1
Fork 0
mirror of https://github.com/wlinator/luminara.git synced 2024-10-02 18:23:12 +00:00

Add .blacklist command.

This commit is contained in:
wlinator 2024-07-08 21:50:07 +02:00
parent 2ba645fac5
commit 7e2a5c18a6
8 changed files with 132 additions and 2 deletions

View file

@ -37,8 +37,35 @@ class LumiBot(bridge.Bot):
ctx = await self.get_context(message)
if ctx.command:
await ctx.trigger_typing()
# await ctx.trigger_typing()
await self.invoke(ctx)
@staticmethod
async def convert_to_user(ctx: commands.Context | bridge.Context, user_id: int) -> Optional[discord.User]:
"""
Converts a user ID to a User object.
Args:
ctx (commands.Context): The context in which the command was invoked.
user_id (int): The ID of the user to convert.
Returns:
Optional[discord.User]: The User object, or None if conversion fails.
"""
try:
if isinstance(ctx, bridge.BridgeApplicationContext):
return # TODO: Implement this
else:
return await commands.UserConverter().convert(ctx, str(user_id))
except (
discord.HTTPException,
discord.NotFound,
discord.Forbidden,
commands.BadArgument,
):
return None
@staticmethod
async def convert_to_text_channel(

View file

@ -8,6 +8,7 @@ import Client
import config.parser
import services.config_service
import services.help_service
from services.blacklist_service import BlacklistUserService
# Remove the default logger configuration
logger.remove()
@ -44,6 +45,12 @@ client = Client.LumiBot(
help_command=services.help_service.LumiHelp()
)
@client.check
async def blacklist_check(ctx):
if BlacklistUserService.is_user_blacklisted(ctx.author.id):
return False
return True
def load_modules():
loaded = set()

View file

@ -5,6 +5,7 @@ from discord.ext.commands import Cog
import lib.embeds.boost
from lib.embeds.greet import Greet
from services.config_service import GuildConfig
from services.blacklist_service import BlacklistUserService
class EventHandler(Cog):
@ -13,6 +14,9 @@ class EventHandler(Cog):
@Cog.listener()
async def on_member_join(self, member):
if BlacklistUserService.is_user_blacklisted(member.id):
return
config = GuildConfig(member.guild.id)
if not config.welcome_channel_id:
@ -27,6 +31,9 @@ class EventHandler(Cog):
@Cog.listener()
async def on_member_update(self, before, after):
if BlacklistUserService.is_user_blacklisted(after.id):
return
if before.premium_since is None and after.premium_since is not None:
await self.on_nitro_boost(after)

View file

@ -3,6 +3,7 @@ import random
from discord.ext.commands import Cog
from config.parser import JsonCache
from services.blacklist_service import BlacklistUserService
_reactions = JsonCache.read_json("reactions")
_8ball = _reactions["eightball"]
@ -30,6 +31,9 @@ class ReactionListener(Cog):
@Cog.listener('on_message')
async def reaction_listener(self, message):
if BlacklistUserService.is_user_blacklisted(message.author.id):
return
if not message.author.bot:
await ReactionHandler.respond(message)

View file

@ -12,6 +12,7 @@ from config.parser import JsonCache
from lib import formatter
from services.config_service import GuildConfig
from services.xp_service import XpService, XpRewardService
from services.blacklist_service import BlacklistUserService
_strings = JsonCache.read_json("strings")
_messages = JsonCache.read_json("levels")
@ -232,6 +233,9 @@ class XpListener(commands.Cog):
Args:
message (discord.Message): The message object.
"""
if BlacklistUserService.is_user_blacklisted(message.author.id):
return
if message.author.bot or message.guild is None:
return

View file

@ -1,7 +1,8 @@
import discord
from discord.ext import commands, bridge
from modules.admin import award, sql
from modules.admin import award, sql, blacklist
from typing import Optional
class BotAdmin(commands.Cog, name="Bot Admin"):
@ -43,6 +44,14 @@ class BotAdmin(commands.Cog, name="Bot Admin"):
@commands.is_owner()
async def inject(self, ctx, *, query: str):
return await sql.inject_cmd(ctx, query)
@commands.command(
name="blacklist",
help="Add or remove a user from the blacklist. This command can only be performed by a bot administrator."
)
@commands.is_owner()
async def blacklist(self, ctx, user: discord.User, *, reason: Optional[str] = None):
return await blacklist.blacklist_user(ctx, user, reason)
def setup(client):

View file

@ -0,0 +1,31 @@
import discord
from services.blacklist_service import BlacklistUserService
from typing import Optional
from config.parser import JsonCache
resources = JsonCache.read_json("art")
exclaim_icon = resources["icons"]["exclaim"]
hammer_icon = resources["icons"]["hammer"]
async def blacklist_user(
ctx, user: discord.User, reason: Optional[str] = None
) -> None:
"""
Blacklists a user with an optional reason.
Args:
user_id (int): The ID of the user to blacklist.
reason (str, optional): The reason for blacklisting the user. Defaults to "No reason was given".
"""
blacklist_service = BlacklistUserService(user.id)
blacklist_service.add_to_blacklist(reason)
embed = discord.Embed(
description=f"User `{user.name}` has been blacklisted from Luminara.",
color=discord.Color.red(),
)
embed.set_author(name="User Blacklisted", icon_url=hammer_icon)
embed.set_footer(text="There is no process to reinstate a blacklisted user. Appeals are not considered.", icon_url=exclaim_icon)
await ctx.send(embed=embed)

View file

@ -0,0 +1,41 @@
from db import database
from typing import List, Tuple, Optional
class BlacklistUserService:
def __init__(self, user_id: int) -> None:
self.user_id: int = user_id
def add_to_blacklist(self, reason: Optional[str] = None) -> None:
"""
Adds a user to the blacklist with the given reason.
Args:
reason (str): The reason for blacklisting the user.
"""
query: str = """
INSERT INTO blacklist_user (user_id, reason)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE reason = VALUES(reason)
"""
database.execute_query(query, (self.user_id, reason))
@staticmethod
def is_user_blacklisted(user_id: int) -> bool:
"""
Checks if a user is currently blacklisted.
Args:
user_id (int): The ID of the user to check.
Returns:
bool: True if the user is blacklisted, False otherwise.
"""
query: str = """
SELECT active
FROM blacklist_user
WHERE user_id = %s
"""
result: List[Tuple[bool]] = database.select_query(query, (user_id,))
return any(active for (active,) in result)