diff --git a/modules/economy/blackjack.py b/modules/economy/blackjack.py new file mode 100644 index 0000000..74c88a8 --- /dev/null +++ b/modules/economy/blackjack.py @@ -0,0 +1,299 @@ +import random + +import discord +import pytz +from discord.ext import commands +from loguru import logger + +from lib.const import CONST +from lib.exceptions import LumiException +from services.currency_service import Currency +from services.stats_service import BlackJackStats +from ui.embeds import Builder +from ui.views.blackjack import BlackJackButtons + +EST = pytz.timezone("US/Eastern") +ACTIVE_BLACKJACK_GAMES: dict[int, bool] = {} + +Card = str +Hand = list[Card] + + +class Blackjack(commands.Cog): + def __init__(self, bot: commands.Bot) -> None: + self.bot: commands.Bot = bot + + @commands.hybrid_command( + name="blackjack", + aliases=["bj"], + ) + @commands.guild_only() + async def blackjack( + self, + ctx: commands.Context[commands.Bot], + bet: int, + ) -> None: + """ + Play a game of blackjack. + + Parameters + ---------- + ctx : commands.Context[commands.Bot] + The context of the command. + bet : int + The amount to bet. + """ + if ctx.author.id in ACTIVE_BLACKJACK_GAMES: + raise LumiException(CONST.STRINGS["error_already_playing_blackjack"]) + + currency = Currency(ctx.author.id) + if bet > currency.balance: + raise LumiException(CONST.STRINGS["error_not_enough_cash"]) + if bet <= 0: + raise LumiException(CONST.STRINGS["error_invalid_bet"]) + + ACTIVE_BLACKJACK_GAMES[ctx.author.id] = True + + try: + await self.play_blackjack(ctx, currency, bet) + except Exception as e: + logger.exception(f"Error in blackjack game: {e}") + raise LumiException(CONST.STRINGS["error_blackjack_game_error"]) from e + finally: + del ACTIVE_BLACKJACK_GAMES[ctx.author.id] + + async def play_blackjack(self, ctx: commands.Context[commands.Bot], currency: Currency, bet: int) -> None: + deck = self.get_new_deck() + player_hand, dealer_hand = self.initial_deal(deck) + multiplier = CONST.BLACKJACK_MULTIPLIER + + player_value = self.calculate_hand_value(player_hand) + status = 5 if player_value == 21 else 0 + view = BlackJackButtons(ctx) + playing_embed = False + response_message: discord.Message | None = None + + while status == 0: + dealer_value = self.calculate_hand_value(dealer_hand) + + embed = self.create_game_embed( + ctx, + bet, + player_hand, + dealer_hand, + player_value, + dealer_value, + ) + if not playing_embed: + response_message = await ctx.reply(embed=embed, view=view) + playing_embed = True + else: + assert response_message + await response_message.edit(embed=embed, view=view) + + await view.wait() + + if view.clickedHit: + player_hand.append(self.deal_card(deck)) + player_value = self.calculate_hand_value(player_hand) + if player_value > 21: + status = 1 + break + if player_value == 21: + status = 2 + break + elif view.clickedStand: + status = self.dealer_play(deck, dealer_hand, player_value) + break + else: + currency.take_balance(bet) + currency.push() + raise LumiException(CONST.STRINGS["error_out_of_time_economy"]) + + view = BlackJackButtons(ctx) + + await self.handle_game_end( + ctx, + response_message, + currency, + bet, + player_hand, + dealer_hand, + status, + multiplier, + playing_embed, + ) + + def initial_deal(self, deck: list[Card]) -> tuple[Hand, Hand]: + return [self.deal_card(deck) for _ in range(2)], [self.deal_card(deck)] + + def dealer_play(self, deck: list[Card], dealer_hand: Hand, player_value: int) -> int: + while self.calculate_hand_value(dealer_hand) <= player_value: + dealer_hand.append(self.deal_card(deck)) + return 3 if self.calculate_hand_value(dealer_hand) > 21 else 4 + + async def handle_game_end( + self, + ctx: commands.Context[commands.Bot], + response_message: discord.Message | None, + currency: Currency, + bet: int, + player_hand: Hand, + dealer_hand: Hand, + status: int, + multiplier: float, + playing_embed: bool, + ) -> None: + player_value = self.calculate_hand_value(player_hand) + dealer_value = self.calculate_hand_value(dealer_hand) + payout = bet * (2 if status == 5 else multiplier) + is_won = status not in [1, 4] + + embed = self.create_end_game_embed(ctx, bet, player_value, dealer_value, payout, status) + if playing_embed: + assert response_message + await response_message.edit(embed=embed) + else: + await ctx.reply(embed=embed) + + if is_won: + currency.add_balance(int(payout)) + else: + currency.take_balance(bet) + currency.push() + + BlackJackStats( + user_id=ctx.author.id, + is_won=is_won, + bet=bet, + payout=int(payout) if is_won else 0, + hand_player=player_hand, + hand_dealer=dealer_hand, + ).push() + + def create_game_embed( + self, + ctx: commands.Context[commands.Bot], + bet: int, + player_hand: Hand, + dealer_hand: Hand, + player_value: int, + dealer_value: int, + ) -> discord.Embed: + player_hand_str = " + ".join(player_hand) + dealer_hand_str = f"{dealer_hand[0]} + " + ( + CONST.STRINGS["blackjack_dealer_hidden"] if len(dealer_hand) < 2 else " + ".join(dealer_hand[1:]) + ) + + description = ( + f"{CONST.STRINGS['blackjack_player_hand'].format(player_value, player_hand_str)}\n\n" + f"{CONST.STRINGS['blackjack_dealer_hand'].format(dealer_value, dealer_hand_str)}" + ) + + footer_text = ( + f"{CONST.STRINGS['blackjack_bet'].format(Currency.format_human(bet))} • " + f"{CONST.STRINGS['blackjack_deck_shuffled']}" + ) + + return Builder.create_embed( + theme="default", + user_name=ctx.author.name, + title=CONST.STRINGS["blackjack_title"], + description=description, + footer_text=footer_text, + footer_icon_url=CONST.MUFFIN_ART, + ) + + def create_end_game_embed( + self, + ctx: commands.Context[commands.Bot], + bet: int, + player_value: int, + dealer_value: int, + payout: int | float, + status: int, + ) -> discord.Embed: + embed = Builder.create_embed( + theme="default", + user_name=ctx.author.name, + title=CONST.STRINGS["blackjack_title"], + description=CONST.STRINGS["blackjack_description"].format( + player_value, + dealer_value, + ), + footer_text=CONST.STRINGS["blackjack_footer"], + footer_icon_url=CONST.MUFFIN_ART, + ) + + result = { + 1: ( + CONST.STRINGS["blackjack_busted"], + CONST.STRINGS["blackjack_lost"].format(Currency.format_human(bet)), + discord.Color.red(), + CONST.CLOUD_ART, + ), + 2: ( + CONST.STRINGS["blackjack_won_21"], + CONST.STRINGS["blackjack_won_payout"].format(Currency.format_human(int(payout))), + discord.Color.green(), + CONST.TROPHY_ART, + ), + 3: ( + CONST.STRINGS["blackjack_dealer_busted"], + CONST.STRINGS["blackjack_won_payout"].format(Currency.format_human(int(payout))), + discord.Color.green(), + CONST.TROPHY_ART, + ), + 4: ( + CONST.STRINGS["blackjack_lost_generic"], + CONST.STRINGS["blackjack_lost"].format(Currency.format_human(bet)), + discord.Color.red(), + CONST.CLOUD_ART, + ), + 5: ( + CONST.STRINGS["blackjack_won_natural"], + CONST.STRINGS["blackjack_won_payout"].format(Currency.format_human(int(payout))), + discord.Color.green(), + CONST.TROPHY_ART, + ), + }.get( + status, + ( + CONST.STRINGS["blackjack_error"], + CONST.STRINGS["blackjack_error_description"], + discord.Color.red(), + None, + ), + ) + + name, value, color, thumbnail_url = result + embed.add_field(name=name, value=value, inline=False) + embed.colour = color + if thumbnail_url: + embed.set_thumbnail(url=thumbnail_url) + + return embed + + def get_new_deck(self) -> list[Card]: + deck = [ + rank + suit + for suit in ["♠", "♡", "♢", "♣"] + for rank in ["A", "2", "3", "4", "5", "6", "7", "8", "9", "10", "J", "Q", "K"] + ] + random.shuffle(deck) + return deck + + def deal_card(self, deck: list[Card]) -> Card: + return deck.pop() + + def calculate_hand_value(self, hand: Hand) -> int: + value = sum(10 if rank in "JQK" else 11 if rank == "A" else int(rank) for card in hand for rank in card[:-1]) + aces = sum(card[0] == "A" for card in hand) + while value > 21 and aces: + value -= 10 + aces -= 1 + return value + + +async def setup(bot: commands.Bot) -> None: + await bot.add_cog(Blackjack(bot)) diff --git a/services/stats_service.py b/services/stats_service.py new file mode 100644 index 0000000..e12f139 --- /dev/null +++ b/services/stats_service.py @@ -0,0 +1,150 @@ +import json + +from db import database + + +class BlackJackStats: + def __init__( + self, + user_id: int, + is_won: bool, + bet: int, + payout: int, + hand_player: list[str], + hand_dealer: list[str], + ): + self.user_id: int = user_id + self.is_won: bool = is_won + self.bet: int = bet + self.payout: int = payout + self.hand_player: str = json.dumps(hand_player) + self.hand_dealer: str = json.dumps(hand_dealer) + + def push(self) -> None: + query: str = """ + INSERT INTO blackjack (user_id, is_won, bet, payout, hand_player, hand_dealer) + VALUES (%s, %s, %s, %s, %s, %s) + """ + + values: tuple[int, bool, int, int, str, str] = ( + self.user_id, + self.is_won, + self.bet, + self.payout, + self.hand_player, + self.hand_dealer, + ) + + database.execute_query(query, values) + + @staticmethod + def get_user_stats(user_id: int) -> dict[str, int]: + query: str = """ + SELECT + COUNT(*) AS amount_of_games, + SUM(bet) AS total_bet, + SUM(payout) AS total_payout, + SUM(CASE WHEN is_won = 1 THEN 1 ELSE 0 END) AS winning, + SUM(CASE WHEN is_won = 0 THEN 1 ELSE 0 END) AS losing + FROM blackjack + WHERE user_id = %s; + """ + result: tuple[int, int, int, int, int] = database.select_query(query, (user_id,))[0] + ( + amount_of_games, + total_bet, + total_payout, + winning_amount, + losing_amount, + ) = result + + return { + "amount_of_games": amount_of_games, + "total_bet": total_bet, + "total_payout": total_payout, + "winning_amount": winning_amount, + "losing_amount": losing_amount, + } + + @staticmethod + def get_total_rows_count() -> int: + query: str = """ + SELECT SUM(TABLE_ROWS) + FROM INFORMATION_SCHEMA.TABLES + """ + + result = database.select_query_one(query) + return int(result) if result is not None else 0 + + +class SlotsStats: + """ + Handles statistics for the /slots command + """ + + def __init__(self, user_id: int, is_won: bool, bet: int, payout: int, spin_type: str, icons: list[str]): + self.user_id: int = user_id + self.is_won: bool = is_won + self.bet: int = bet + self.payout: int = payout + self.spin_type: str = spin_type + self.icons: str = json.dumps(icons) + + def push(self) -> None: + """ + Insert the services from any given slots game into the database + """ + query: str = """ + INSERT INTO slots (user_id, is_won, bet, payout, spin_type, icons) + VALUES (%s, %s, %s, %s, %s, %s) + """ + + values: tuple[int, bool, int, int, str, str] = ( + self.user_id, + self.is_won, + self.bet, + self.payout, + self.spin_type, + self.icons, + ) + + database.execute_query(query, values) + + @staticmethod + def get_user_stats(user_id: int) -> dict[str, int]: + """ + Retrieve the Slots stats for a given user from the database. + """ + query: str = """ + SELECT + COUNT(*) AS amount_of_games, + SUM(bet) AS total_bet, + SUM(payout) AS total_payout, + SUM(CASE WHEN spin_type = 'pair' AND is_won = 1 THEN 1 ELSE 0 END) AS games_won_pair, + SUM(CASE WHEN spin_type = 'three_of_a_kind' AND is_won = 1 THEN 1 ELSE 0 END) AS games_won_three_of_a_kind, + SUM(CASE WHEN spin_type = 'three_diamonds' AND is_won = 1 THEN 1 ELSE 0 END) AS games_won_three_diamonds, + SUM(CASE WHEN spin_type = 'jackpot' AND is_won = 1 THEN 1 ELSE 0 END) AS games_won_jackpot + FROM slots + WHERE user_id = %s + """ + + result: tuple[int, int, int, int, int, int, int] = database.select_query(query, (user_id,))[0] + ( + amount_of_games, + total_bet, + total_payout, + games_won_pair, + games_won_three_of_a_kind, + games_won_three_diamonds, + games_won_jackpot, + ) = result + + return { + "amount_of_games": amount_of_games, + "total_bet": total_bet, + "total_payout": total_payout, + "games_won_pair": games_won_pair, + "games_won_three_of_a_kind": games_won_three_of_a_kind, + "games_won_three_diamonds": games_won_three_diamonds, + "games_won_jackpot": games_won_jackpot, + } diff --git a/ui/embeds.py b/ui/embeds.py index ae0a457..4fcf49a 100644 --- a/ui/embeds.py +++ b/ui/embeds.py @@ -37,6 +37,7 @@ class Builder: "warning": (CONST.COLOR_WARNING, CONST.WARNING_ICON), "default": (color or CONST.COLOR_DEFAULT, None), } + color, author_icon_url = theme_settings[theme] if user_name and not hide_name_in_description: