1
Fork 0
mirror of https://github.com/wlinator/luminara.git synced 2024-10-02 18:23:12 +00:00

Rewrite XP handling

This commit is contained in:
wlinator 2024-07-07 16:31:51 +02:00
parent a888dd42e1
commit 30d000924b
3 changed files with 363 additions and 173 deletions

View file

@ -3,7 +3,9 @@ import platform
from loguru import logger
import discord
from discord.ext import bridge
from discord.ext import bridge, commands
from discord.ext.commands import TextChannelConverter
from typing import Optional
from lib import metadata
@ -11,7 +13,7 @@ from lib import metadata
class LumiBot(bridge.Bot):
async def on_ready(self):
logger.info(f"{metadata.__title__} v{metadata.__version__}")
logger.info(f"Logged in with ID {self.user.id}")
logger.info(f"Logged in with ID {self.user.id if self.user else 'Unknown'}")
logger.info(f"discord.py API version: {discord.__version__}")
logger.info(f"Python version: {platform.python_version()}")
logger.info(f"Running on: {platform.system()} {platform.release()} ({os.name})")
@ -32,6 +34,40 @@ class LumiBot(bridge.Bot):
await ctx.trigger_typing()
await self.invoke(ctx)
@staticmethod
async def convert_to_text_channel(
ctx: commands.Context, channel_id: int
) -> Optional[discord.TextChannel]:
converter = TextChannelConverter()
try:
return await converter.convert(ctx, str(channel_id))
except (
discord.HTTPException,
discord.NotFound,
discord.Forbidden,
commands.BadArgument,
):
return None
@staticmethod
async def convert_to_member(
ctx: commands.Context, user_id: int
) -> Optional[discord.Member]:
converter = commands.MemberConverter()
try:
member = await converter.convert(ctx, str(user_id))
except (
discord.HTTPException,
discord.NotFound,
discord.Forbidden,
commands.BadArgument,
):
return None
return member
@staticmethod
async def get_or_fetch_channel(guild, channel_id):
channel = guild.get_channel(channel_id)

View file

@ -1,9 +1,12 @@
import asyncio
import contextlib
import random
import time
from typing import Optional
import discord
from discord.ext.commands import Cog
from discord.ext import commands
from Client import LumiBot
from config.parser import JsonCache
from lib import formatter
@ -15,19 +18,32 @@ _messages = JsonCache.read_json("levels")
class XPHandler:
def __init__(self, message):
self.message = message
self.author = message.author
self.guild = message.guild
self.channel = message.channel
def __init__(self, client: LumiBot, message: discord.Message) -> None:
"""
Initializes the XPHandler with the given message.
self.xp_conf = XpService(self.author.id, self.guild.id)
self.guild_conf = None
Args:
message (discord.Message): The message object.
"""
self.client = client
self.message: discord.Message = message
self.author: discord.Member | discord.User = message.author
self.guild: discord.Guild | None = message.guild
self.xp_conf: XpService = XpService(
self.author.id, self.guild.id if self.guild else 0
)
self.guild_conf: Optional[GuildConfig] = None
def process(self) -> bool:
_xp = self.xp_conf
_now = time.time()
leveled_up = False
"""
Processes the XP gain and level up for the user.
Returns:
bool: True if the user leveled up, False otherwise.
"""
_xp: XpService = self.xp_conf
_now: float = time.time()
leveled_up: bool = False
if _xp.cooldown_time and _now < _xp.cooldown_time:
return False
@ -46,13 +62,24 @@ class XPHandler:
return leveled_up
async def notify(self) -> None:
_xp = self.xp_conf
_gd = GuildConfig(self.guild.id)
"""
Notifies the user and the guild about the level up.
"""
if self.guild is None:
return
level_message = await self.get_level_message(_gd, _xp, self.author)
_xp: XpService = self.xp_conf
_gd: GuildConfig = GuildConfig(self.guild.id)
level_message: Optional[str] = None # Initialize level_message
if isinstance(self.author, discord.Member):
level_message = await self.get_level_message(_gd, _xp, self.author)
if level_message:
level_channel = await self.get_level_channel(self.message, _gd)
level_channel: Optional[discord.TextChannel] = await self.get_level_channel(
self.message, _gd
)
if level_channel:
await level_channel.send(content=level_message)
@ -60,42 +87,70 @@ class XPHandler:
await self.message.reply(content=level_message)
async def reward(self) -> None:
_xp = self.xp_conf
_rew = XpRewardService(self.guild.id)
"""
Rewards the user with a role for leveling up.
"""
if self.guild is None:
return
role_id = _rew.role(_xp.level)
reason = 'Automated Level Reward'
_xp: XpService = self.xp_conf
_rew: XpRewardService = XpRewardService(self.guild.id)
if role_id:
if role_id := _rew.get_role(_xp.level):
reason: str = "Automated Level Reward"
role = self.guild.get_role(role_id)
if role:
try:
await self.author.add_roles(role, reason=reason)
except (discord.Forbidden, discord.NotFound, discord.HTTPException):
pass
if role := self.guild.get_role(role_id):
with contextlib.suppress(
discord.Forbidden, discord.NotFound, discord.HTTPException
):
if isinstance(self.author, discord.Member):
await self.author.add_roles(role, reason=reason)
previous, replace = _rew.should_replace_previous_reward(_xp.level)
previous = _rew.replace_previous_reward(_xp.level)
if previous[1]:
role = self.guild.get_role(previous[0])
if role:
try:
if replace and isinstance(self.author, discord.Member):
if role := self.guild.get_role(previous or role_id):
with contextlib.suppress(
discord.Forbidden, discord.NotFound, discord.HTTPException
):
await self.author.remove_roles(role, reason=reason)
except (discord.Forbidden, discord.NotFound, discord.HTTPException):
pass
@staticmethod
async def get_level_channel(message, guild_config):
if guild_config.level_channel_id:
try:
return message.guild.get_channel(guild_config.level_channel_id)
except discord.HTTPException:
pass # channel not found
async def get_level_channel(
self, message: discord.Message, guild_config: GuildConfig
) -> Optional[discord.TextChannel]:
"""
Retrieves the level up notification channel for the guild.
Args:
message (discord.Message): The message object.
guild_config (GuildConfig): The guild configuration.
Returns:
Optional[discord.abc.GuildChannel]: The level up notification channel, or None if not found.
"""
if guild_config.level_channel_id and message.guild:
context = await self.client.get_context(message)
with contextlib.suppress(discord.HTTPException):
return await self.client.convert_to_text_channel(
context, guild_config.level_channel_id
)
return None
@staticmethod
async def get_level_message(guild_config, level_config, author):
async def get_level_message(
guild_config: GuildConfig, level_config: XpService, author: discord.Member
) -> Optional[str]:
"""
Retrieves the level up message for the user.
Args:
guild_config (GuildConfig): The guild configuration.
level_config (XpService): The XP service configuration.
author (discord.Member): The user who leveled up.
Returns:
Optional[str]: The level up message, or None if not found.
"""
match guild_config.level_message_type:
case 0:
level_message = None
@ -103,30 +158,47 @@ class XPHandler:
level_message = XPHandler.messages_whimsical(level_config.level, author)
case 2:
if not guild_config.level_message:
level_message = XPHandler.level_message_generic(level_config.level, author)
level_message = XPHandler.level_message_generic(
level_config.level, author
)
else:
level_message = formatter.template(guild_config.level_message, author.name, level_config.level)
level_message = formatter.template(
guild_config.level_message, author.name, level_config.level
)
case _:
raise Exception
raise ValueError("Invalid level message type")
return level_message
@staticmethod
def level_message_generic(level, author):
def level_message_generic(level: int, author: discord.Member) -> str:
"""
Generates a generic level up message.
Args:
level (int): The new level of the user.
author (discord.Member): The user who leveled up.
Returns:
str: The generic level up message.
"""
return _strings["level_up"].format(author.name, level)
@staticmethod
def messages_whimsical(level, author):
"""
v2 of the level messages, randomized output from levels.en-US.JSON.
:param level:
:param author:
:return:
def messages_whimsical(level: int, author: discord.Member) -> str:
"""
Generates a whimsical level up message.
level_range = None
Args:
level (int): The new level of the user.
author (discord.Member): The user who leveled up.
Returns:
str: The whimsical level up message.
"""
level_range: Optional[str] = None
for key in _messages.keys():
start, end = map(int, key.split('-'))
start, end = map(int, key.split("-"))
if start <= level <= end:
level_range = key
break
@ -141,28 +213,34 @@ class XPHandler:
return start_string + random_message.format(level)
class XpListener(Cog):
def __init__(self, client):
self.client = client
class XpListener(commands.Cog):
def __init__(self, client: LumiBot) -> None:
"""
Initializes the XpListener with the given client.
@Cog.listener('on_message')
async def xp_listener(self, message):
if (
message.author.bot or
message.guild is None
):
Args:
client (commands.Bot): The bot client.
"""
self.client: LumiBot = client
@commands.Cog.listener("on_message")
async def xp_listener(self, message: discord.Message) -> None:
"""
Listens for messages and processes XP gain and level up.
Args:
message (discord.Message): The message object.
"""
if message.author.bot or message.guild is None:
return
_xp = XPHandler(message)
leveled_up = _xp.process()
if leveled_up:
coroutines = [
asyncio.create_task(_xp.notify()),
asyncio.create_task(_xp.reward())
]
await asyncio.wait(coroutines)
_xp: XPHandler = XPHandler(self.client, message)
if _xp.process():
await asyncio.gather(
_xp.notify(),
_xp.reward(),
)
def setup(client):
def setup(client: LumiBot) -> None:
client.add_cog(XpListener(client))

View file

@ -1,12 +1,14 @@
import os
import time
from typing import Callable, Dict, List, Optional, Tuple
from discord.ext import commands
from db import database
xp_gain_per_message = int(os.environ.get("LUMI_XP_GAIN_PER_MESSAGE"))
xp_gain_cooldown = int(os.environ.get("LUMI_XP_GAIN_COOLDOWN"))
xp_gain_per_message: int = int(os.environ.get("LUMI_XP_GAIN_PER_MESSAGE", 1))
xp_gain_cooldown: int = int(os.environ.get("LUMI_XP_GAIN_COOLDOWN", 8))
class XpService:
@ -14,38 +16,50 @@ class XpService:
Stores and retrieves XP from the database for a given user.
"""
def __init__(self, user_id, guild_id):
self.user_id = user_id
self.guild_id = guild_id
self.xp = None
self.level = None
self.cooldown_time = None
self.xp_gain = xp_gain_per_message
self.new_cooldown = xp_gain_cooldown
def __init__(self, user_id: int, guild_id: int) -> None:
"""
Initializes the XpService with user and guild IDs, and fetches or creates XP data.
Args:
user_id (int): The ID of the user.
guild_id (int): The ID of the guild.
"""
self.user_id: int = user_id
self.guild_id: int = guild_id
self.xp: int = 0
self.level: int = 0
self.cooldown_time: Optional[float] = None
self.xp_gain: int = xp_gain_per_message
self.new_cooldown: int = xp_gain_cooldown
self.fetch_or_create_xp()
def push(self):
def push(self) -> None:
"""
Updates the XP and cooldown for a user.
Updates the XP and cooldown for a user in the database.
"""
query = """
query: str = """
UPDATE xp
SET user_xp = %s, user_level = %s, cooldown = %s
WHERE user_id = %s AND guild_id = %s
"""
database.execute_query(query, (self.xp, self.level, self.cooldown_time, self.user_id, self.guild_id))
database.execute_query(
query,
(self.xp, self.level, self.cooldown_time, self.user_id, self.guild_id),
)
def fetch_or_create_xp(self):
def fetch_or_create_xp(self) -> None:
"""
Gets a user's XP from the database or inserts a new row if it doesn't exist yet.
"""
query = "SELECT user_xp, user_level, cooldown FROM xp WHERE user_id = %s AND guild_id = %s"
query: str = "SELECT user_xp, user_level, cooldown FROM xp WHERE user_id = %s AND guild_id = %s"
try:
(user_xp, user_level, cooldown) = database.select_query(query, (self.user_id, self.guild_id))[0]
user_xp, user_level, cooldown = database.select_query(
query, (self.user_id, self.guild_id)
)[0]
except (IndexError, TypeError):
(user_xp, user_level, cooldown) = (None, None, None)
user_xp, user_level, cooldown = 0, 0, None
if any(var is None for var in [user_xp, user_level, cooldown]):
query = """
@ -53,82 +67,103 @@ class XpService:
VALUES (%s, %s, 0, 0, %s)
"""
database.execute_query(query, (self.user_id, self.guild_id, time.time()))
(user_xp, user_level, cooldown) = (0, 0, time.time())
user_xp, user_level, cooldown = 0, 0, time.time()
self.xp = user_xp
self.level = user_level
self.cooldown_time = cooldown
def calculate_rank(self):
def calculate_rank(self) -> Optional[int]:
"""
Checks which rank a user is in the guild
Checks which rank a user is in the guild.
Returns:
int | None: The rank of the user in the guild, or None if not found.
"""
query = """
query: str = """
SELECT user_id, user_xp, user_level
FROM xp
WHERE guild_id = %s
ORDER BY user_level DESC, user_xp DESC
"""
data = database.select_query(query, (self.guild_id,))
data: List[Tuple[int, int, int]] = database.select_query(
query, (self.guild_id,)
)
leaderboard = []
rank = 1
for row in data:
row_user_id = row[0]
user_xp = row[1]
user_level = row[2]
leaderboard.append((row_user_id, user_xp, user_level, rank))
rank += 1
user_rank = None
for entry in leaderboard:
if entry[0] == self.user_id:
user_rank = entry[3]
break
return user_rank
leaderboard: List[Tuple[int, int, int, int]] = [
(row[0], row[1], row[2], rank) for rank, row in enumerate(data, start=1)
]
return next(
(entry[3] for entry in leaderboard if entry[0] == self.user_id), None
)
@staticmethod
def load_leaderboard(guild_id):
def load_leaderboard(guild_id: int) -> List[Tuple[int, int, int, int]]:
"""
Returns the guild's XP leaderboard
Returns the guild's XP leaderboard.
Args:
guild_id (int): The ID of the guild.
Returns:
list: A list of tuples containing user_id, user_xp, user_level, and needed_xp_for_next_level.
"""
query = """
query: str = """
SELECT user_id, user_xp, user_level
FROM xp
WHERE guild_id = %s
ORDER BY user_level DESC, user_xp DESC
"""
data = database.select_query(query, (guild_id,))
data: List[Tuple[int, int, int]] = database.select_query(query, (guild_id,))
leaderboard = []
leaderboard: List[Tuple[int, int, int, int]] = []
for row in data:
row_user_id = row[0]
user_xp = row[1]
user_level = row[2]
needed_xp_for_next_level = XpService.xp_needed_for_next_level(user_level)
row_user_id: int = row[0]
user_xp: int = row[1]
user_level: int = row[2]
needed_xp_for_next_level: int = XpService.xp_needed_for_next_level(
user_level
)
leaderboard.append((row_user_id, user_xp, user_level, needed_xp_for_next_level))
leaderboard.append(
(row_user_id, user_xp, user_level, needed_xp_for_next_level)
)
return leaderboard
@staticmethod
def generate_progress_bar(current_value, target_value, bar_length=10):
def generate_progress_bar(
current_value: int, target_value: int, bar_length: int = 10
) -> str:
"""
Generates an XP progress bar based on the current level and XP.
Args:
current_value (int): The current XP value.
target_value (int): The target XP value.
bar_length (int, optional): The length of the progress bar. Defaults to 10.
Returns:
str: The formatted progress bar.
"""
progress = current_value / target_value
filled_length = int(bar_length * progress)
empty_length = bar_length - filled_length
bar = "" * filled_length + "" * empty_length
progress: float = current_value / target_value
filled_length: int = int(bar_length * progress)
empty_length: int = bar_length - filled_length
bar: str = "" * filled_length + "" * empty_length
return f"`{bar}` {current_value}/{target_value}"
@staticmethod
def xp_needed_for_next_level(current_level):
def xp_needed_for_next_level(current_level: int) -> int:
"""
Calculates the amount of XP needed to go to the next level, based on the current level.
Args:
current_level (int): The current level of the user.
Returns:
int: The amount of XP needed for the next level.
"""
formula_mapping = {
formula_mapping: Dict[Tuple[int, int], Callable[[int], int]] = {
(10, 19): lambda level: 12 * level + 28,
(20, 29): lambda level: 15 * level + 29,
(30, 39): lambda level: 18 * level + 30,
@ -140,77 +175,118 @@ class XpService:
(90, 99): lambda level: 36 * level + 36,
}
for level_range, formula in formula_mapping.items():
if level_range[0] <= current_level <= level_range[1]:
return formula(current_level)
# For levels below 10 and levels 110 and above
return 10 * current_level + 27 if current_level < 10 else 42 * current_level + 37
return next(
(
formula(current_level)
for level_range, formula in formula_mapping.items()
if level_range[0] <= current_level <= level_range[1]
),
(
10 * current_level + 27
if current_level < 10
else 42 * current_level + 37
),
)
class XpRewardService:
def __init__(self, guild_id):
self.guild_id = guild_id
self.rewards = self.get_rewards()
"""
Manages XP rewards for a guild.
"""
def get_rewards(self) -> dict:
query = """
def __init__(self, guild_id: int) -> None:
"""
Initializes the XpRewardService with the guild ID and fetches rewards.
Args:
guild_id (int): The ID of the guild.
"""
self.guild_id: int = guild_id
self.rewards: Dict[int, Tuple[int, bool]] = self._fetch_rewards()
def _fetch_rewards(self) -> Dict[int, Tuple[int, bool]]:
"""
Retrieves the XP rewards for the guild from the database.
Returns:
Dict[int, Tuple[int, bool]]: A dictionary of rewards with levels as keys and (role_id, persistent) as values.
"""
query: str = """
SELECT level, role_id, persistent
FROM level_rewards
WHERE guild_id = %s
ORDER BY level DESC
"""
data = database.select_query(query, (self.guild_id,))
data: List[Tuple[int, int, bool]] = database.select_query(
query, (self.guild_id,)
)
return {level: (role_id, persistent) for level, role_id, persistent in data}
rewards = {}
for row in data:
rewards[int(row[0])] = [int(row[1]), bool(row[2])]
def add_reward(self, level: int, role_id: int, persistent: bool) -> None:
"""
Adds a new XP reward for the guild.
return rewards
def add_reward(self, level: int, role_id: int, persistent: bool):
Args:
level (int): The level at which the reward is given.
role_id (int): The ID of the role to be awarded.
persistent (bool): Whether the reward is persistent.
Raises:
commands.BadArgument: If the server has more than 25 XP rewards.
"""
if len(self.rewards) >= 25:
raise commands.BadArgument("a server can't have more than 25 xp rewards.")
raise commands.BadArgument("A server can't have more than 25 XP rewards.")
query = """
query: str = """
INSERT INTO level_rewards (guild_id, level, role_id, persistent)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE role_id = %s, persistent = %s;
"""
database.execute_query(
query, (self.guild_id, level, role_id, persistent, role_id, persistent)
)
self.rewards[level] = (role_id, persistent)
database.execute_query(query, (self.guild_id, level, role_id, persistent, role_id, persistent))
def remove_reward(self, level: int) -> None:
"""
Removes an XP reward for the guild.
def remove_reward(self, level: int):
query = """
Args:
level (int): The level at which the reward is to be removed.
"""
query: str = """
DELETE FROM level_rewards
WHERE guild_id = %s
AND level = %s;
WHERE guild_id = %s AND level = %s;
"""
database.execute_query(query, (self.guild_id, level))
self.rewards.pop(level, None)
def role(self, level: int):
if self.rewards:
def get_role(self, level: int) -> Optional[int]:
"""
Retrieves the role ID for a given level.
if level in self.rewards:
role_id = self.rewards.get(level)[0]
return role_id
Args:
level (int): The level for which to retrieve the role ID.
return None
Returns:
Optional[int]: The role ID if found, otherwise None.
"""
return self.rewards.get(level, (None,))[0]
def replace_previous_reward(self, level):
replace = False
previous_reward = None
levels = sorted(self.rewards.keys())
def should_replace_previous_reward(self, level: int) -> Tuple[Optional[int], bool]:
"""
Checks if the previous reward should be replaced based on the given level.
if level in levels:
values_below = [x for x in levels if x < level]
Args:
level (int): The level to check for replacement.
if values_below:
replace = not bool(self.rewards.get(max(values_below))[1])
if replace:
previous_reward = self.rewards.get(max(values_below))[0]
Returns:
Tuple[Optional[int], bool]: A tuple containing the previous reward and a boolean indicating if it should be replaced.
"""
previous_reward, replace = None, False
if levels_below := [lvl for lvl in sorted(self.rewards) if lvl < level]:
highest_level_below = max(levels_below)
previous_reward, persistent = self.rewards[highest_level_below]
replace = not persistent
return previous_reward, replace