1
Fork 0
mirror of https://github.com/wlinator/luminara.git synced 2024-10-02 18:23:12 +00:00

First case implementation

This commit is contained in:
wlinator 2024-07-18 11:26:36 -04:00
parent e4048e0648
commit b1200bc0ab
7 changed files with 167 additions and 78 deletions

View file

@ -1,5 +1,11 @@
{
"bet_limit": "❌ | **{0}** you cannot place any bets above **${1}**.",
"case_new_case_author": "New Case",
"case_case_field": "Case:",
"case_type_field": "Type:",
"case_moderator_field": "Moderator:",
"case_target_field": "Target:",
"case_reason_field": "Reason:",
"daily_already_claimed_author": "Already Claimed",
"daily_already_claimed_description": "you can claim your daily reward again <t:{0}:R>.",
"daily_already_claimed_footer": "Daily reset is at 7 AM EST",

View file

@ -47,3 +47,11 @@ def select_query_one(query, values=None):
cursor.execute(query, values)
output = cursor.fetchone()
return output[0] if output else None
def execute_query_return_value(query, values=None):
with _cnxpool.get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(query, values)
conn.commit()
return cursor.fetchone()[0] if cursor.rowcount > 0 else None

View file

@ -7,9 +7,13 @@ CREATE TABLE IF NOT EXISTS mod_log (
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS action_types (
CREATE TABLE IF NOT EXISTS cases (
id SERIAL PRIMARY KEY,
name ENUM(
guild_id BIGINT UNSIGNED NOT NULL,
case_number INT UNSIGNED NOT NULL,
target_id BIGINT UNSIGNED NOT NULL,
moderator_id BIGINT UNSIGNED NOT NULL,
action_type ENUM(
'WARN',
'TIMEOUT',
'UNTIMEOUT',
@ -23,16 +27,7 @@ CREATE TABLE IF NOT EXISTS action_types (
'UNMUTE',
'DEAFEN',
'UNDEAFEN'
) NOT NULL
);
CREATE TABLE IF NOT EXISTS cases (
id SERIAL PRIMARY KEY,
guild_id BIGINT UNSIGNED NOT NULL,
case_number INT UNSIGNED NOT NULL,
target_id BIGINT UNSIGNED NOT NULL,
moderator_id BIGINT UNSIGNED NOT NULL,
action_type_id INT UNSIGNED NOT NULL,
) NOT NULL,
reason TEXT,
duration INT UNSIGNED, -- for timeouts
expires_at TIMESTAMP, -- for tempbans & mutes
@ -40,42 +35,11 @@ CREATE TABLE IF NOT EXISTS cases (
is_closed BOOLEAN NOT NULL DEFAULT FALSE, -- to indicate if the case is closed
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (action_type_id) REFERENCES action_types(id)
UNIQUE KEY unique_case (guild_id, case_number)
);
DELIMITER //
CREATE PROCEDURE insert_case(
IN p_guild_id BIGINT UNSIGNED,
IN p_target_id BIGINT UNSIGNED,
IN p_moderator_id BIGINT UNSIGNED,
IN p_action_type_id INT UNSIGNED,
IN p_reason TEXT,
IN p_duration INT UNSIGNED,
IN p_expires_at TIMESTAMP,
IN p_modlog_message_id BIGINT UNSIGNED
)
BEGIN
DECLARE v_case_number INT UNSIGNED;
-- Get the next case number for the guild
SELECT IFNULL(MAX(case_number), 0) + 1 INTO v_case_number
FROM cases
WHERE guild_id = p_guild_id;
-- Insert the new case
INSERT INTO cases (
guild_id, case_number, target_id, moderator_id, action_type_id, reason, duration, expires_at, modlog_message_id
) VALUES (
p_guild_id, v_case_number, p_target_id, p_moderator_id, p_action_type_id, p_reason, p_duration, p_expires_at, p_modlog_message_id
);
END //
DELIMITER ;
CREATE INDEX idx_cases_guild_id ON cases(guild_id);
CREATE INDEX idx_cases_target_id ON cases(target_id);
CREATE INDEX idx_cases_moderator_id ON cases(moderator_id);
CREATE INDEX idx_cases_action_type_id ON cases(action_type_id);
CREATE INDEX idx_cases_action_type ON cases(action_type);

View file

@ -1,17 +1,19 @@
import asyncio
import discord
from lib import formatter
from lib.constants import CONST
from lib.embed_builder import EmbedBuilder
from modules.moderation import functions
from modules.moderation.case_handler import create_case
from typing import Optional
async def ban_user(cog, ctx, target: discord.User, reason):
async def ban_user(cog, ctx, target: discord.User, reason: Optional[str] = None):
# see if user is in guild
member = await cog.client.get_or_fetch_member(ctx.guild, target.id)
if not reason:
reason = CONST.STRINGS["mod_no_reason"]
output_reason = reason if reason else CONST.STRINGS["mod_no_reason"]
# member -> user is in the guild, check role hierarchy
if member:
@ -26,7 +28,7 @@ async def ban_user(cog, ctx, target: discord.User, reason):
description=CONST.STRINGS["mod_ban_dm"].format(
target.name,
ctx.guild.name,
reason,
output_reason,
),
show_name=False,
),
@ -39,10 +41,11 @@ async def ban_user(cog, ctx, target: discord.User, reason):
await member.ban(
reason=CONST.STRINGS["mod_reason"].format(
ctx.author.name,
formatter.shorten(reason, 200),
formatter.shorten(output_reason, 200),
),
)
return await ctx.respond(
respond_task = ctx.respond(
embed=EmbedBuilder.create_success_embed(
ctx,
author_text=CONST.STRINGS["mod_banned_author"],
@ -52,6 +55,8 @@ async def ban_user(cog, ctx, target: discord.User, reason):
else CONST.STRINGS["mod_dm_not_sent"],
),
)
create_case_task = create_case(ctx, target, "BAN", reason)
await asyncio.gather(respond_task, create_case_task, return_exceptions=True)
# not a member in this guild, so ban right away
else:
@ -59,37 +64,42 @@ async def ban_user(cog, ctx, target: discord.User, reason):
target,
reason=CONST.STRINGS["mod_reason"].format(
ctx.author.name,
formatter.shorten(reason, 200),
formatter.shorten(output_reason, 200),
),
)
return await ctx.respond(
respond_task = ctx.respond(
embed=EmbedBuilder.create_success_embed(
ctx,
author_text=CONST.STRINGS["mod_banned_author"],
description=CONST.STRINGS["mod_banned_user"].format(target.id),
),
)
create_case_task = create_case(ctx, target, "BAN", reason)
await asyncio.gather(respond_task, create_case_task)
async def unban_user(ctx, target: discord.User, reason):
if not reason:
reason = CONST.STRINGS["mod_no_reason"]
async def unban_user(ctx, target: discord.User, reason: Optional[str] = None):
output_reason = reason if reason else CONST.STRINGS["mod_no_reason"]
try:
await ctx.guild.unban(
target,
reason=CONST.STRINGS["mod_reason"].format(
ctx.author.name,
formatter.shorten(reason, 200),
formatter.shorten(output_reason, 200),
),
)
return await ctx.respond(
respond_task = ctx.respond(
embed=EmbedBuilder.create_success_embed(
ctx,
author_text=CONST.STRINGS["mod_unbanned_author"],
description=CONST.STRINGS["mod_unbanned"].format(target.id),
),
)
create_case_task = create_case(ctx, target, "UNBAN", reason)
await asyncio.gather(respond_task, create_case_task)
except (discord.NotFound, discord.HTTPException):
return await ctx.respond(

View file

@ -0,0 +1,88 @@
import discord
from loguru import logger
from services.moderation.case_service import CaseService
from services.moderation.modlog_service import ModLogService
from lib.embed_builder import EmbedBuilder
from lib.constants import CONST
from typing import Optional
from discord.ext.commands import TextChannelConverter
case_service = CaseService()
modlog_service = ModLogService()
async def create_case(
ctx,
target: discord.User,
action_type: str,
reason: Optional[str] = None,
duration: Optional[int] = None,
expires_at: Optional[str] = None,
):
guild_id = ctx.guild.id
moderator_id = ctx.author.id
target_id = target.id
# Create the case
case_number: int = case_service.create_case(
guild_id=guild_id,
target_id=target_id,
moderator_id=moderator_id,
action_type=action_type,
reason=reason,
duration=duration,
expires_at=expires_at,
modlog_message_id=None,
)
logger.info(f"Created case {case_number} for {target.name} in guild {guild_id}")
# Send the case to the modlog if configured
mod_log_channel_id = modlog_service.fetch_modlog_channel_id(guild_id)
if mod_log_channel_id:
mod_log_channel = await TextChannelConverter().convert(
ctx,
str(mod_log_channel_id),
)
if mod_log_channel:
embed = EmbedBuilder.create_warning_embed(
ctx,
author_text=CONST.STRINGS["case_new_case_author"],
thumbnail_url=target.display_avatar.url,
show_name=False,
)
embed.add_field(
name=CONST.STRINGS["case_case_field"],
value=f"`{case_number}`",
inline=True,
)
embed.add_field(
name=CONST.STRINGS["case_type_field"],
value=f"`{action_type.lower().capitalize()}`",
inline=True,
)
embed.add_field(
name=CONST.STRINGS["case_moderator_field"],
value=f"`{ctx.author.name}`",
inline=True,
)
embed.add_field(
name=CONST.STRINGS["case_target_field"],
value=f"➡️ `{target.name}`",
inline=False,
)
embed.add_field(
name=CONST.STRINGS["case_reason_field"],
value=f"`{reason or CONST.STRINGS['mod_no_reason']}`",
inline=False,
)
message = await mod_log_channel.send(embed=embed)
# Update the case with the modlog_message_id
case_service.edit_case(
guild_id=guild_id,
case_number=case_number,
changes={"modlog_message_id": message.id},
)

View file

@ -16,23 +16,34 @@ class CaseService:
duration: Optional[int] = None,
expires_at: Optional[str] = None,
modlog_message_id: Optional[int] = None,
) -> None:
# Resolve action type id from action type name
action_type_id_query: str = """
SELECT id FROM action_types WHERE name = %s
"""
action_type_id = select_query_one(action_type_id_query, (action_type.upper(),))
) -> int:
# Get the next case number for the guild
query: str = """
CALL insert_case(%s, %s, %s, %s, %s, %s, %s, %s)
SELECT IFNULL(MAX(case_number), 0) + 1
FROM cases
WHERE guild_id = %s
"""
case_number = select_query_one(query, (guild_id,))
if case_number is None:
raise ValueError("Failed to retrieve the next case number.")
# Insert the new case
query: str = """
INSERT INTO cases (
guild_id, case_number, target_id, moderator_id, action_type, reason, duration, expires_at, modlog_message_id
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s
)
"""
execute_query(
query,
(
guild_id,
case_number,
target_id,
moderator_id,
action_type_id,
action_type.upper(),
reason,
duration,
expires_at,
@ -40,22 +51,24 @@ class CaseService:
),
)
def close_case(self, case_id):
return int(case_number)
def close_case(self, guild_id, case_number):
query = """
UPDATE cases
SET is_closed = TRUE, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
WHERE guild_id = %s AND case_number = %s
"""
execute_query(query, (case_id,))
execute_query(query, (guild_id, case_number))
def edit_case(self, case_id, changes: dict):
def edit_case(self, guild_id, case_number, changes: dict):
set_clause = ", ".join([f"{key} = %s" for key in changes.keys()])
query = f"""
UPDATE cases
SET {set_clause}, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
WHERE guild_id = %s AND case_number = %s
"""
execute_query(query, (*changes.values(), case_id))
execute_query(query, (*changes.values(), guild_id, case_number))
def fetch_case_by_id(self, case_id):
query = """
@ -101,11 +114,11 @@ class CaseService:
results = select_query(query, (guild_id, moderator_id))
return [dict(row) for row in results]
def fetch_cases_by_action_type(self, guild_id, action_type_id):
def fetch_cases_by_action_type(self, guild_id, action_type):
query = """
SELECT * FROM cases
WHERE guild_id = %s AND action_type_id = %s
WHERE guild_id = %s AND action_type = %s
ORDER BY case_number DESC
"""
results = select_query(query, (guild_id, action_type_id))
results = select_query(query, (guild_id, action_type.upper()))
return [dict(row) for row in results]

View file

@ -22,10 +22,10 @@ class ModLogService:
"""
execute_query(query, (guild_id,))
def fetch_modlog_channel_id(self, guild_id: int) -> Optional[dict]:
def fetch_modlog_channel_id(self, guild_id: int) -> Optional[int]:
query: str = """
SELECT * FROM mod_log
SELECT channel_id FROM mod_log
WHERE guild_id = %s AND is_enabled = TRUE
"""
result = select_query_one(query, (guild_id,))
return dict(result) if result else None
return result or None