2023-06-19 14:20:17 +00:00
|
|
|
import os
|
|
|
|
import time
|
|
|
|
|
2024-04-04 11:09:11 +00:00
|
|
|
from discord.ext import commands
|
2024-04-04 20:41:42 +00:00
|
|
|
from dotenv import load_dotenv
|
2023-06-19 14:20:17 +00:00
|
|
|
|
|
|
|
from db import database
|
|
|
|
|
|
|
|
load_dotenv('.env')
|
2024-03-15 20:01:33 +00:00
|
|
|
xp_gain_per_message = int(os.getenv("XP_GAIN_PER_MESSAGE"))
|
|
|
|
xp_gain_cooldown = int(os.getenv("XP_GAIN_COOLDOWN"))
|
2023-06-19 14:20:17 +00:00
|
|
|
|
2024-04-04 20:41:42 +00:00
|
|
|
|
2024-04-04 11:03:51 +00:00
|
|
|
class XpService:
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
|
|
|
Stores and retrieves XP from the database for a given user.
|
|
|
|
"""
|
2024-03-15 20:01:33 +00:00
|
|
|
|
|
|
|
def __init__(self, user_id, guild_id):
|
2023-06-29 11:21:17 +00:00
|
|
|
self.user_id = user_id
|
2024-03-15 20:01:33 +00:00
|
|
|
self.guild_id = guild_id
|
2024-03-15 22:44:27 +00:00
|
|
|
self.xp = None
|
|
|
|
self.level = None
|
2024-04-04 20:40:59 +00:00
|
|
|
self.cooldown_time = None
|
2024-03-15 20:01:33 +00:00
|
|
|
self.xp_gain = xp_gain_per_message
|
|
|
|
self.new_cooldown = xp_gain_cooldown
|
2023-06-29 11:21:17 +00:00
|
|
|
|
|
|
|
self.fetch_or_create_xp()
|
|
|
|
|
|
|
|
def push(self):
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
|
|
|
Updates the XP and cooldown for a user.
|
|
|
|
"""
|
2023-06-29 11:21:17 +00:00
|
|
|
query = """
|
|
|
|
UPDATE xp
|
2023-10-23 12:49:46 +00:00
|
|
|
SET user_xp = %s, user_level = %s, cooldown = %s
|
2024-03-15 20:01:33 +00:00
|
|
|
WHERE user_id = %s AND guild_id = %s
|
2023-06-29 11:21:17 +00:00
|
|
|
"""
|
2024-04-04 20:40:59 +00:00
|
|
|
database.execute_query(query, (self.xp, self.level, self.cooldown_time, self.user_id, self.guild_id))
|
2023-06-29 11:21:17 +00:00
|
|
|
|
|
|
|
def fetch_or_create_xp(self):
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
|
|
|
Gets a user's XP from the database or inserts a new row if it doesn't exist yet.
|
|
|
|
"""
|
2024-03-15 20:01:33 +00:00
|
|
|
query = "SELECT user_xp, user_level, cooldown FROM xp WHERE user_id = %s AND guild_id = %s"
|
2023-06-29 11:21:17 +00:00
|
|
|
|
|
|
|
try:
|
2024-03-15 22:44:27 +00:00
|
|
|
(user_xp, user_level, cooldown) = database.select_query(query, (self.user_id, self.guild_id))[0]
|
2023-06-29 11:21:17 +00:00
|
|
|
except (IndexError, TypeError):
|
|
|
|
(user_xp, user_level, cooldown) = (None, None, None)
|
|
|
|
|
|
|
|
if any(var is None for var in [user_xp, user_level, cooldown]):
|
|
|
|
query = """
|
2024-03-15 20:01:33 +00:00
|
|
|
INSERT INTO xp (user_id, guild_id, user_xp, user_level, cooldown)
|
|
|
|
VALUES (%s, %s, 0, 0, %s)
|
2023-06-29 11:21:17 +00:00
|
|
|
"""
|
2024-03-15 20:01:33 +00:00
|
|
|
database.execute_query(query, (self.user_id, self.guild_id, time.time()))
|
2023-06-29 11:21:17 +00:00
|
|
|
(user_xp, user_level, cooldown) = (0, 0, time.time())
|
|
|
|
|
2024-03-15 22:44:27 +00:00
|
|
|
self.xp = user_xp
|
|
|
|
self.level = user_level
|
2024-04-04 20:40:59 +00:00
|
|
|
self.cooldown_time = cooldown
|
2023-06-29 11:21:17 +00:00
|
|
|
|
|
|
|
def calculate_rank(self):
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
2024-03-15 20:01:33 +00:00
|
|
|
Checks which rank a user is in the guild
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
2024-03-15 20:01:33 +00:00
|
|
|
query = """
|
|
|
|
SELECT user_id, user_xp, user_level
|
|
|
|
FROM xp
|
|
|
|
WHERE guild_id = %s
|
|
|
|
ORDER BY user_level DESC, user_xp DESC
|
|
|
|
"""
|
|
|
|
data = database.select_query(query, (self.guild_id,))
|
2023-06-19 14:20:17 +00:00
|
|
|
|
|
|
|
leaderboard = []
|
|
|
|
rank = 1
|
|
|
|
for row in data:
|
|
|
|
row_user_id = row[0]
|
|
|
|
user_xp = row[1]
|
|
|
|
user_level = row[2]
|
|
|
|
leaderboard.append((row_user_id, user_xp, user_level, rank))
|
|
|
|
rank += 1
|
|
|
|
|
|
|
|
user_rank = None
|
|
|
|
for entry in leaderboard:
|
2023-06-29 11:21:17 +00:00
|
|
|
if entry[0] == self.user_id:
|
2023-06-19 14:20:17 +00:00
|
|
|
user_rank = entry[3]
|
|
|
|
break
|
|
|
|
|
|
|
|
return user_rank
|
|
|
|
|
|
|
|
@staticmethod
|
2024-03-15 20:01:33 +00:00
|
|
|
def load_leaderboard(guild_id):
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
2024-03-15 20:01:33 +00:00
|
|
|
Returns the guild's XP leaderboard
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
2024-03-15 20:01:33 +00:00
|
|
|
query = """
|
|
|
|
SELECT user_id, user_xp, user_level
|
|
|
|
FROM xp
|
|
|
|
WHERE guild_id = %s
|
2024-03-15 22:44:27 +00:00
|
|
|
ORDER BY user_level DESC, user_xp DESC
|
2024-03-15 20:01:33 +00:00
|
|
|
"""
|
|
|
|
data = database.select_query(query, (guild_id,))
|
2023-06-19 14:20:17 +00:00
|
|
|
|
|
|
|
leaderboard = []
|
|
|
|
for row in data:
|
|
|
|
row_user_id = row[0]
|
|
|
|
user_xp = row[1]
|
|
|
|
user_level = row[2]
|
2024-04-04 11:03:51 +00:00
|
|
|
needed_xp_for_next_level = XpService.xp_needed_for_next_level(user_level)
|
2024-03-15 20:01:33 +00:00
|
|
|
|
2024-03-15 22:44:27 +00:00
|
|
|
leaderboard.append((row_user_id, user_xp, user_level, needed_xp_for_next_level))
|
2023-06-19 14:20:17 +00:00
|
|
|
|
|
|
|
return leaderboard
|
|
|
|
|
|
|
|
@staticmethod
|
2023-06-29 11:21:17 +00:00
|
|
|
def generate_progress_bar(current_value, target_value, bar_length=10):
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
|
|
|
Generates an XP progress bar based on the current level and XP.
|
|
|
|
"""
|
2023-06-29 11:21:17 +00:00
|
|
|
progress = current_value / target_value
|
|
|
|
filled_length = int(bar_length * progress)
|
|
|
|
empty_length = bar_length - filled_length
|
|
|
|
bar = "â–°" * filled_length + "â–±" * empty_length
|
|
|
|
return f"`{bar}` {current_value}/{target_value}"
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def xp_needed_for_next_level(current_level):
|
2024-01-02 15:35:55 +00:00
|
|
|
"""
|
|
|
|
Calculates the amount of XP needed to go to the next level, based on the current level.
|
|
|
|
"""
|
2023-06-29 11:21:17 +00:00
|
|
|
formula_mapping = {
|
2024-03-15 20:04:30 +00:00
|
|
|
(10, 19): lambda level: 12 * level + 28,
|
|
|
|
(20, 29): lambda level: 15 * level + 29,
|
|
|
|
(30, 39): lambda level: 18 * level + 30,
|
|
|
|
(40, 49): lambda level: 21 * level + 31,
|
|
|
|
(50, 59): lambda level: 24 * level + 32,
|
|
|
|
(60, 69): lambda level: 27 * level + 33,
|
|
|
|
(70, 79): lambda level: 30 * level + 34,
|
|
|
|
(80, 89): lambda level: 33 * level + 35,
|
|
|
|
(90, 99): lambda level: 36 * level + 36,
|
2023-06-29 11:21:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
for level_range, formula in formula_mapping.items():
|
|
|
|
if level_range[0] <= current_level <= level_range[1]:
|
|
|
|
return formula(current_level)
|
2023-06-19 14:20:17 +00:00
|
|
|
|
2023-06-29 11:21:17 +00:00
|
|
|
# For levels below 10 and levels 110 and above
|
2024-03-15 20:04:30 +00:00
|
|
|
return 10 * current_level + 27 if current_level < 10 else 42 * current_level + 37
|
2024-04-04 11:03:51 +00:00
|
|
|
|
2024-04-04 20:41:42 +00:00
|
|
|
|
2024-04-04 11:03:51 +00:00
|
|
|
class XpRewardService:
|
|
|
|
def __init__(self, guild_id):
|
|
|
|
self.guild_id = guild_id
|
|
|
|
self.rewards = self.get_rewards()
|
|
|
|
|
|
|
|
def get_rewards(self) -> dict:
|
|
|
|
query = """
|
|
|
|
SELECT level, role_id, persistent
|
|
|
|
FROM level_rewards
|
|
|
|
WHERE guild_id = %s
|
|
|
|
ORDER BY level DESC
|
|
|
|
"""
|
|
|
|
data = database.select_query(query, (self.guild_id,))
|
|
|
|
|
|
|
|
rewards = {}
|
|
|
|
for row in data:
|
|
|
|
rewards[int(row[0])] = [int(row[1]), bool(row[2])]
|
|
|
|
|
|
|
|
return rewards
|
|
|
|
|
|
|
|
def add_reward(self, level: int, role_id: int, persistent: bool):
|
2024-04-04 11:09:11 +00:00
|
|
|
|
|
|
|
if len(self.rewards) >= 25:
|
|
|
|
raise commands.BadArgument("a server can't have more than 25 xp rewards.")
|
|
|
|
|
2024-04-04 11:03:51 +00:00
|
|
|
query = """
|
|
|
|
INSERT INTO level_rewards (guild_id, level, role_id, persistent)
|
|
|
|
VALUES (%s, %s, %s, %s)
|
|
|
|
ON DUPLICATE KEY UPDATE role_id = %s, persistent = %s;
|
|
|
|
"""
|
|
|
|
|
|
|
|
database.execute_query(query, (self.guild_id, level, role_id, persistent, role_id, persistent))
|
|
|
|
|
|
|
|
def remove_reward(self, level: int):
|
|
|
|
query = """
|
|
|
|
DELETE FROM level_rewards
|
|
|
|
WHERE guild_id = %s
|
|
|
|
AND level = %s;
|
|
|
|
"""
|
|
|
|
|
|
|
|
database.execute_query(query, (self.guild_id, level))
|
|
|
|
|
|
|
|
def role(self, level: int):
|
|
|
|
if self.rewards:
|
|
|
|
|
|
|
|
if level in self.rewards:
|
|
|
|
role_id = self.rewards.get(level)[0]
|
|
|
|
return role_id
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
def replace_previous_reward(self, level):
|
|
|
|
replace = False
|
|
|
|
previous_reward = None
|
|
|
|
levels = sorted(self.rewards.keys())
|
|
|
|
|
|
|
|
if level in levels:
|
|
|
|
values_below = [x for x in levels if x < level]
|
|
|
|
|
|
|
|
if values_below:
|
|
|
|
replace = not bool(self.rewards.get(max(values_below))[1])
|
|
|
|
|
|
|
|
if replace:
|
|
|
|
previous_reward = self.rewards.get(max(values_below))[0]
|
|
|
|
|
|
|
|
return previous_reward, replace
|