mirror of
https://github.com/wlinator/luminara.git
synced 2024-10-02 18:23:12 +00:00
Add xp handler
This commit is contained in:
parent
2d96944a88
commit
dafaff56bd
1 changed files with 265 additions and 0 deletions
265
handlers/xp.py
Normal file
265
handlers/xp.py
Normal file
|
@ -0,0 +1,265 @@
|
|||
import asyncio
|
||||
import contextlib
|
||||
import random
|
||||
import time
|
||||
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
import lib.format
|
||||
from lib.client import Luminara
|
||||
from lib.const import CONST
|
||||
from services.blacklist_service import BlacklistUserService
|
||||
from services.config_service import GuildConfig
|
||||
from services.xp_service import XpRewardService, XpService
|
||||
|
||||
|
||||
class XPHandler:
|
||||
def __init__(self, client: Luminara, message: discord.Message) -> None:
|
||||
"""
|
||||
Initializes the XPHandler with the given client and message.
|
||||
|
||||
Args:
|
||||
client (Luminara): The bot client.
|
||||
message (discord.Message): The message object.
|
||||
"""
|
||||
self.client = client
|
||||
self.message: discord.Message = message
|
||||
self.author: discord.Member | discord.User = message.author
|
||||
self.guild: discord.Guild | None = message.guild
|
||||
self.xp_conf: XpService = XpService(
|
||||
self.author.id,
|
||||
self.guild.id if self.guild else 0,
|
||||
)
|
||||
self.guild_conf: GuildConfig | None = None
|
||||
|
||||
def process(self) -> bool:
|
||||
"""
|
||||
Processes the XP gain and level up for the user.
|
||||
|
||||
Returns:
|
||||
bool: True if the user leveled up, False otherwise.
|
||||
"""
|
||||
_xp: XpService = self.xp_conf
|
||||
_now: float = time.time()
|
||||
leveled_up: bool = False
|
||||
|
||||
if _xp.cooldown_time and _now < _xp.cooldown_time:
|
||||
return False
|
||||
|
||||
# Award the amount of XP specified in .env
|
||||
_xp.xp += _xp.xp_gain
|
||||
|
||||
# Check if total XP now exceeds the XP required to level up
|
||||
if _xp.xp >= XpService.xp_needed_for_next_level(_xp.level):
|
||||
_xp.level += 1
|
||||
_xp.xp = 0
|
||||
leveled_up = True
|
||||
|
||||
_xp.cooldown_time = _now + _xp.new_cooldown
|
||||
_xp.push()
|
||||
return leveled_up
|
||||
|
||||
async def notify(self) -> None:
|
||||
"""
|
||||
Notifies the user and the guild about the level up.
|
||||
"""
|
||||
if self.guild is None:
|
||||
return
|
||||
|
||||
_xp: XpService = self.xp_conf
|
||||
_gd: GuildConfig = GuildConfig(self.guild.id)
|
||||
|
||||
level_message: str | None = None # Initialize level_message
|
||||
|
||||
if isinstance(self.author, discord.Member):
|
||||
level_message = await self.get_level_message(_gd, _xp, self.author)
|
||||
|
||||
if level_message:
|
||||
level_channel: discord.TextChannel | None = await self.get_level_channel(
|
||||
self.message,
|
||||
_gd,
|
||||
)
|
||||
|
||||
if level_channel:
|
||||
await level_channel.send(content=level_message)
|
||||
else:
|
||||
await self.message.reply(content=level_message)
|
||||
|
||||
async def reward(self) -> None:
|
||||
"""
|
||||
Rewards the user with a role for leveling up.
|
||||
"""
|
||||
if self.guild is None:
|
||||
return
|
||||
|
||||
_xp: XpService = self.xp_conf
|
||||
_rew: XpRewardService = XpRewardService(self.guild.id)
|
||||
|
||||
if role_id := _rew.get_role(_xp.level):
|
||||
reason: str = "Automated Level Reward"
|
||||
|
||||
if role := self.guild.get_role(role_id):
|
||||
with contextlib.suppress(
|
||||
discord.Forbidden,
|
||||
discord.NotFound,
|
||||
discord.HTTPException,
|
||||
):
|
||||
if isinstance(self.author, discord.Member):
|
||||
await self.author.add_roles(role, reason=reason)
|
||||
previous, replace = _rew.should_replace_previous_reward(_xp.level)
|
||||
|
||||
if (
|
||||
replace
|
||||
and isinstance(self.author, discord.Member)
|
||||
and (role := self.guild.get_role(previous or role_id))
|
||||
):
|
||||
with contextlib.suppress(
|
||||
discord.Forbidden,
|
||||
discord.NotFound,
|
||||
discord.HTTPException,
|
||||
):
|
||||
await self.author.remove_roles(role, reason=reason)
|
||||
|
||||
async def get_level_channel(
|
||||
self,
|
||||
message: discord.Message,
|
||||
guild_config: GuildConfig,
|
||||
) -> discord.TextChannel | None:
|
||||
"""
|
||||
Retrieves the level up notification channel for the guild.
|
||||
|
||||
Args:
|
||||
message (discord.Message): The message object.
|
||||
guild_config (GuildConfig): The guild configuration.
|
||||
|
||||
Returns:
|
||||
Optional[discord.TextChannel]: The level up notification channel, or None if not found.
|
||||
"""
|
||||
if guild_config.level_channel_id and message.guild:
|
||||
context = await self.client.get_context(message)
|
||||
|
||||
with contextlib.suppress(commands.BadArgument, commands.CommandError):
|
||||
return await commands.TextChannelConverter().convert(
|
||||
context,
|
||||
str(guild_config.level_channel_id),
|
||||
)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
async def get_level_message(
|
||||
guild_config: GuildConfig,
|
||||
level_config: XpService,
|
||||
author: discord.Member,
|
||||
) -> str | None:
|
||||
"""
|
||||
Retrieves the level up message for the user.
|
||||
|
||||
Args:
|
||||
guild_config (GuildConfig): The guild configuration.
|
||||
level_config (XpService): The XP service configuration.
|
||||
author (discord.Member): The user who leveled up.
|
||||
|
||||
Returns:
|
||||
Optional[str]: The level up message, or None if not found.
|
||||
"""
|
||||
match guild_config.level_message_type:
|
||||
case 0:
|
||||
level_message = None
|
||||
case 1:
|
||||
level_message = XPHandler.messages_whimsical(level_config.level, author)
|
||||
case 2:
|
||||
if not guild_config.level_message:
|
||||
level_message = XPHandler.level_message_generic(
|
||||
level_config.level,
|
||||
author,
|
||||
)
|
||||
else:
|
||||
level_message = lib.format.template(
|
||||
guild_config.level_message,
|
||||
author.name,
|
||||
level_config.level,
|
||||
)
|
||||
case _:
|
||||
msg = "Invalid level message type"
|
||||
raise ValueError(msg)
|
||||
|
||||
return level_message
|
||||
|
||||
@staticmethod
|
||||
def level_message_generic(level: int, author: discord.Member) -> str:
|
||||
"""
|
||||
Generates a generic level up message.
|
||||
|
||||
Args:
|
||||
level (int): The new level of the user.
|
||||
author (discord.Member): The user who leveled up.
|
||||
|
||||
Returns:
|
||||
str: The generic level up message.
|
||||
"""
|
||||
return CONST.STRINGS["level_up"].format(author.name, level)
|
||||
|
||||
@staticmethod
|
||||
def messages_whimsical(level: int, author: discord.Member) -> str:
|
||||
"""
|
||||
Generates a whimsical level up message.
|
||||
|
||||
Args:
|
||||
level (int): The new level of the user.
|
||||
author (discord.Member): The user who leveled up.
|
||||
|
||||
Returns:
|
||||
str: The whimsical level up message.
|
||||
"""
|
||||
level_range: str | None = None
|
||||
for key in CONST.LEVEL_MESSAGES:
|
||||
start, end = map(int, key.split("-"))
|
||||
if start <= level <= end:
|
||||
level_range = key
|
||||
break
|
||||
|
||||
if level_range is None:
|
||||
# Generic fallback
|
||||
return XPHandler.level_message_generic(level, author)
|
||||
|
||||
message_list = CONST.LEVEL_MESSAGES[level_range]
|
||||
random_message = random.choice(message_list)
|
||||
start_string = CONST.STRINGS["level_up_prefix"].format(author.name)
|
||||
return start_string + random_message.format(level)
|
||||
|
||||
|
||||
class XpListener(commands.Cog):
|
||||
def __init__(self, client: Luminara) -> None:
|
||||
"""
|
||||
Initializes the XpListener with the given client.
|
||||
|
||||
Args:
|
||||
client (Luminara): The bot client.
|
||||
"""
|
||||
self.client: Luminara = client
|
||||
|
||||
@commands.Cog.listener("on_message")
|
||||
async def xp_listener(self, message: discord.Message) -> None:
|
||||
"""
|
||||
Listens for messages and processes XP gain and level up.
|
||||
|
||||
Args:
|
||||
message (discord.Message): The message object.
|
||||
"""
|
||||
if BlacklistUserService.is_user_blacklisted(message.author.id):
|
||||
return
|
||||
|
||||
if message.author.bot or message.guild is None:
|
||||
return
|
||||
|
||||
_xp: XPHandler = XPHandler(self.client, message)
|
||||
if _xp.process():
|
||||
await asyncio.gather(
|
||||
_xp.notify(),
|
||||
_xp.reward(),
|
||||
)
|
||||
|
||||
|
||||
async def setup(client: Luminara) -> None:
|
||||
await client.add_cog(XpListener(client))
|
Loading…
Reference in a new issue