1
Fork 0
mirror of https://github.com/wlinator/luminara.git synced 2024-10-02 18:23:12 +00:00

Allow mentions as a prefix and add sync command

This commit is contained in:
wlinator 2024-08-12 03:35:37 -04:00
parent b28c809d8f
commit 9d3f4df03f
8 changed files with 44 additions and 212 deletions

167
Client.py
View file

@ -1,10 +1,8 @@
import os
import platform
from typing import Optional
import discord
from discord.ext import bridge, commands
from discord.ext.commands import EmojiConverter, TextChannelConverter
from discord.ext import bridge
from loguru import logger
from lib.constants import CONST
@ -43,166 +41,3 @@ class LumiBot(bridge.Bot):
if ctx.command:
# await ctx.trigger_typing()
await self.invoke(ctx)
@staticmethod
async def convert_to_user(
ctx: commands.Context | bridge.Context,
user_id: int,
) -> Optional[discord.User]:
"""
Converts a user ID to a User object.
Args:
ctx (commands.Context): The context in which the command was invoked.
user_id (int): The ID of the user to convert.
Returns:
Optional[discord.User]: The User object, or None if conversion fails.
"""
try:
if isinstance(ctx, bridge.BridgeApplicationContext):
return # TODO: Implement this
else:
return await commands.UserConverter().convert(ctx, str(user_id))
except (
discord.HTTPException,
discord.NotFound,
discord.Forbidden,
commands.BadArgument,
):
return None
@staticmethod
async def convert_to_emoji(
ctx: commands.Context | bridge.Context,
emoji: str,
) -> Optional[discord.Emoji]:
"""
Converts a emoji to an Emoji object.
"""
converter = EmojiConverter()
try:
if isinstance(ctx, bridge.BridgeApplicationContext):
return # TODO: Implement this
else:
return await converter.convert(ctx, emoji)
except commands.EmojiNotFound:
logger.warning(f"Emoji not found: {emoji}")
return None
except (
discord.HTTPException,
discord.NotFound,
discord.Forbidden,
commands.BadArgument,
):
return None
@staticmethod
async def convert_to_text_channel(
ctx: commands.Context | bridge.Context,
channel_id: int,
) -> Optional[discord.TextChannel]:
"""
Converts a channel ID to a TextChannel object.
Args:
ctx (commands.Context): The context in which the command was invoked.
channel_id (int): The ID of the channel to convert.
Returns:
Optional[discord.TextChannel]: The TextChannel object, or None if conversion fails.
"""
converter = TextChannelConverter()
try:
if isinstance(ctx, bridge.BridgeApplicationContext):
return # TODO: Implement this
else:
return await converter.convert(ctx, str(channel_id))
except (
discord.HTTPException,
discord.NotFound,
discord.Forbidden,
commands.BadArgument,
):
return None
@staticmethod
async def convert_to_member(
ctx: commands.Context,
user_id: int,
) -> Optional[discord.Member]:
"""
Converts a user ID to a Member object.
Args:
ctx (commands.Context): The context in which the command was invoked.
user_id (int): The ID of the user to convert.
Returns:
Optional[discord.Member]: The Member object, or None if conversion fails.
"""
converter = commands.MemberConverter()
try:
member = await converter.convert(ctx, str(user_id))
except (
discord.HTTPException,
discord.NotFound,
discord.Forbidden,
commands.BadArgument,
):
return None
return member
@staticmethod
async def get_or_fetch_channel(
guild: discord.Guild,
channel_id: int,
) -> Optional[discord.abc.GuildChannel]:
"""
Retrieves a channel from the guild's cache or fetches it from the API if not found.
Args:
guild (discord.Guild): The guild object.
channel_id (int): The ID of the channel to retrieve or fetch.
Returns:
Optional[discord.abc.GuildChannel]: The channel object, or None if not found or an error occurs.
"""
channel = guild.get_channel(channel_id)
if not channel:
try:
channel = await guild.fetch_channel(channel_id)
except (discord.HTTPException, discord.NotFound, discord.Forbidden):
return None
return channel
@staticmethod
async def get_or_fetch_member(
guild: discord.Guild,
user_id: int,
) -> Optional[discord.Member]:
"""
Retrieves a member from the guild's cache or fetches them from the API if not found.
Args:
guild (discord.Guild): The guild object.
user_id (int): The ID of the member to retrieve or fetch.
Returns:
Optional[discord.Member]: The member object, or None if not found or an error occurs.
"""
member = guild.get_member(user_id)
if not member:
try:
member = await guild.fetch_member(user_id)
except (discord.HTTPException, discord.NotFound, discord.Forbidden):
return None
return member

View file

@ -2,6 +2,7 @@ import os
import sys
import discord
from discord.ext import commands
from loguru import logger
import Client
@ -26,10 +27,8 @@ logger.add(sys.stdout, format=log_format, colorize=True, level="DEBUG")
async def get_prefix(bot, message):
try:
return services.config_service.GuildConfig.get_prefix(message.guild.id)
except AttributeError:
return "."
extras = services.config_service.GuildConfig.get_prefix(message.guild.id)
return commands.when_mentioned_or(*extras)(bot, message)
client = Client.LumiBot(

View file

@ -66,7 +66,7 @@ class EventHandler(Cog):
@Cog.listener()
async def on_command_completion(self, ctx) -> None:
log_msg = "{} executed .{}".format(ctx.author.name, ctx.command.qualified_name)
log_msg = f"{ctx.author.name} executed .{ctx.command.qualified_name}"
if ctx.guild is not None:
logger.debug(f"{log_msg} | guild: {ctx.guild.name} ")
@ -75,7 +75,7 @@ class EventHandler(Cog):
@Cog.listener()
async def on_application_command_completion(self, ctx) -> None:
log_msg = "{} executed /{}".format(ctx.author.name, ctx.command.qualified_name)
log_msg = f"{ctx.author.name} executed /{ctx.command.qualified_name}"
if ctx.guild is not None:
logger.debug(f"{log_msg} | guild: {ctx.guild.name} ")

View file

@ -1,3 +1,4 @@
import contextlib
from discord import Message
from discord.ext.commands import Cog
from loguru import logger
@ -45,28 +46,21 @@ class ReactionHandler:
"""
Tries to respond to the message.
"""
response = data.get("response")
if response:
try:
if response := data.get("response"):
with contextlib.suppress(Exception):
await self.message.reply(response)
return True
except Exception:
pass
return False
async def try_react(self, data) -> bool:
"""
Tries to react to the message.
"""
emoji_id = data.get("emoji_id")
if emoji_id:
try:
emoji = self.client.get_emoji(emoji_id)
if emoji:
if emoji_id := data.get("emoji_id"):
with contextlib.suppress(Exception):
if emoji := self.client.get_emoji(emoji_id):
await self.message.add_reaction(emoji)
return True
except Exception:
pass
return False

View file

@ -6,6 +6,7 @@ from typing import Optional
import discord
from discord.ext import commands
from discord.ext.commands import TextChannelConverter
from Client import LumiBot
from config.parser import JsonCache
@ -140,10 +141,10 @@ class XPHandler:
if guild_config.level_channel_id and message.guild:
context = await self.client.get_context(message)
with contextlib.suppress(discord.HTTPException):
return await self.client.convert_to_text_channel(
with contextlib.suppress(commands.BadArgument, commands.CommandError):
return await TextChannelConverter().convert(
context,
guild_config.level_channel_id,
str(guild_config.level_channel_id),
)
return None

View file

@ -1,9 +1,9 @@
from typing import Optional
import discord
from discord.ext import bridge, commands
from discord.ext import commands
from modules.admin import award, blacklist, sql
from modules.admin import award, blacklist, sql, sync
class BotAdmin(commands.Cog, name="Bot Admin"):
@ -15,45 +15,32 @@ class BotAdmin(commands.Cog, name="Bot Admin"):
def __init__(self, client):
self.client = client
@bridge.bridge_command(
name="award",
description="This command can only be performed by a bot administrator.",
help="Awards cash to a specific user. This command can only be performed by a bot administrator.",
guild_only=True,
)
@commands.command(name="award")
@commands.guild_only()
@commands.is_owner()
async def award_command(self, ctx, user: discord.User, *, amount: int):
return await award.cmd(ctx, user, amount)
@bridge.bridge_command(
name="sqlselect",
aliases=["sqls"],
description="This command can only be performed by a bot administrator.",
help="Perform a SELECT query in the database. This command can only be performed by a bot administrator.",
)
@commands.command(name="sqlselect", aliases=["sqls"])
@commands.is_owner()
async def select(self, ctx, *, query: str):
return await sql.select_cmd(ctx, query)
@bridge.bridge_command(
name="sqlinject",
aliases=["sqli"],
description="This command can only be performed by a bot administrator.",
help="Change a value in the database. This command can only be performed by a bot administrator.",
)
@commands.command(name="sqlinject", aliases=["sqli"])
@commands.is_owner()
async def inject(self, ctx, *, query: str):
return await sql.inject_cmd(ctx, query)
@commands.command(
name="blacklist",
help="Add or remove a user from the blacklist. This command can only be performed by a bot administrator.",
)
@commands.command(name="blacklist")
@commands.is_owner()
async def blacklist(self, ctx, user: discord.User, *, reason: Optional[str] = None):
return await blacklist.blacklist_user(ctx, user, reason)
@commands.command(name="sync")
@commands.is_owner()
async def sync_command(self, ctx):
await sync.sync_commands(self.client, ctx)
def setup(client):
client.add_cog(BotAdmin(client))

16
modules/admin/sync.py Normal file
View file

@ -0,0 +1,16 @@
import discord
from lib.embed_builder import EmbedBuilder
from lib.exceptions.LumiExceptions import LumiException
async def sync_commands(client, ctx):
try:
await client.sync_commands()
embed = EmbedBuilder.create_success_embed(
ctx,
author_text="Sync Successful",
description="command tree synced successfully.",
)
await ctx.send(embed=embed)
except discord.HTTPException as e:
raise LumiException(f"An error occurred while syncing: {e}") from e

View file

@ -112,7 +112,7 @@ class GuildConfig:
prefix = database.select_query_one(query, (guild_id,))
return prefix if prefix else "."
return prefix or "."
@staticmethod
def set_prefix(guild_id, prefix):