mirror of
https://github.com/allthingslinux/tux.git
synced 2024-10-02 16:43:12 +00:00
Merge pull request #564 from allthingslinux/gif_limiter
Added a configurable GIF ratelimiter cog
This commit is contained in:
commit
6568a7a077
4 changed files with 151 additions and 0 deletions
|
@ -40,3 +40,14 @@ EMBED_ICONS:
|
|||
KICK: "https://github.com/allthingslinux/tux/blob/main/assets/emojis/kick.png?raw=true"
|
||||
TIMEOUT: "https://github.com/allthingslinux/tux/blob/main/assets/emojis/timeout.png?raw=true"
|
||||
WARN: "https://github.com/allthingslinux/tux/blob/main/assets/emojis/warn.png?raw=true"
|
||||
|
||||
GIF_LIMITER:
|
||||
RECENT_GIF_AGE: 60
|
||||
|
||||
GIF_LIMIT_EXCLUDE:
|
||||
- 123456789012345
|
||||
|
||||
GIF_LIMITS_USER:
|
||||
"123456789012345": 2
|
||||
GIF_LIMITS_CHANNEL:
|
||||
"123456789012345": 3
|
||||
|
|
114
tux/cogs/services/gif_limiter.py
Normal file
114
tux/cogs/services/gif_limiter.py
Normal file
|
@ -0,0 +1,114 @@
|
|||
import asyncio
|
||||
from collections import defaultdict
|
||||
from time import time
|
||||
|
||||
import discord
|
||||
from discord.ext import commands, tasks
|
||||
|
||||
from tux.bot import Tux
|
||||
from tux.utils.constants import CONST
|
||||
|
||||
|
||||
class GifLimiter(commands.Cog):
|
||||
"""
|
||||
This class is a handler for GIF ratelimiting.
|
||||
It keeps a list of GIF send times and routinely removes old times.
|
||||
It will prevent people from posting GIFs if the quotas are exceeded.
|
||||
"""
|
||||
|
||||
def __init__(self, bot: Tux) -> None:
|
||||
self.bot = bot
|
||||
|
||||
# Max age for a GIF to be considered a recent post
|
||||
self.recent_gif_age: int = CONST.RECENT_GIF_AGE
|
||||
|
||||
# Max number of GIFs sent recently in a channel
|
||||
self.channelwide_gif_limits: dict[int, int] = CONST.GIF_LIMITS_CHANNEL
|
||||
# Max number of GIFs sent recently by a user to be able to post one in specified channels
|
||||
self.user_gif_limits: dict[int, int] = CONST.GIF_LIMITS
|
||||
|
||||
# list of channels in which not to count GIFs
|
||||
self.gif_limit_exclude: list[int] = CONST.GIF_LIMIT_EXCLUDE
|
||||
|
||||
# Timestamps for recently-sent GIFs for the server, and channels
|
||||
|
||||
# UID, list of timestamps
|
||||
self.recent_gifs_by_user: defaultdict[int, list[int]] = defaultdict(list)
|
||||
# Channel ID, list of timestamps
|
||||
self.recent_gifs_by_channel: defaultdict[int, list[int]] = defaultdict(list)
|
||||
|
||||
# Lock to prevent race conditions
|
||||
self.gif_lock = asyncio.Lock()
|
||||
|
||||
self.old_gif_remover.start()
|
||||
|
||||
async def _should_process_message(self, message: discord.Message) -> bool:
|
||||
"""Checks if a message contains a GIF and was not sent in a blacklisted channel"""
|
||||
return not (
|
||||
len(message.embeds) == 0
|
||||
or "gif" not in message.content.lower()
|
||||
or message.channel.id in self.gif_limit_exclude
|
||||
)
|
||||
|
||||
async def _handle_gif_message(self, message: discord.Message) -> None:
|
||||
"""Checks for ratelimit infringements"""
|
||||
async with self.gif_lock:
|
||||
channel: int = message.channel.id
|
||||
user: int = message.author.id
|
||||
|
||||
if (
|
||||
channel in self.channelwide_gif_limits
|
||||
and channel in self.recent_gifs_by_channel
|
||||
and len(self.recent_gifs_by_channel[channel]) >= self.channelwide_gif_limits[channel]
|
||||
):
|
||||
await self._delete_message(message, "for channel")
|
||||
return
|
||||
|
||||
if (
|
||||
user in self.recent_gifs_by_user
|
||||
and channel in self.user_gif_limits
|
||||
and len(self.recent_gifs_by_user[user]) >= self.user_gif_limits[channel]
|
||||
):
|
||||
await self._delete_message(message, "for user")
|
||||
return
|
||||
|
||||
# Add message to recent GIFs if it doesn't infringe on ratelimits
|
||||
current_time: int = int(time())
|
||||
self.recent_gifs_by_channel[channel].append(current_time)
|
||||
self.recent_gifs_by_user[user].append(current_time)
|
||||
|
||||
async def _delete_message(self, message: discord.Message, epilogue: str) -> None:
|
||||
"""
|
||||
Deletes the message passed as an argument, and sends a self-deleting message with the reason
|
||||
"""
|
||||
await message.delete()
|
||||
await message.channel.send(f"-# GIF ratelimit exceeded {epilogue}", delete_after=3)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_message(self, message: discord.Message) -> None:
|
||||
"""Checks for GIFs in every sent message"""
|
||||
|
||||
if await self._should_process_message(message):
|
||||
await self._handle_gif_message(message)
|
||||
|
||||
@tasks.loop(seconds=20)
|
||||
async def old_gif_remover(self) -> None:
|
||||
"""Regularly cleans old GIF timestamps"""
|
||||
current_time: int = int(time())
|
||||
|
||||
async with self.gif_lock:
|
||||
for channel_id, timestamps in self.recent_gifs_by_channel.items():
|
||||
self.recent_gifs_by_channel[channel_id] = [
|
||||
t for t in timestamps if current_time - t < self.recent_gif_age
|
||||
]
|
||||
|
||||
for user_id, timestamps in self.recent_gifs_by_user.items():
|
||||
self.recent_gifs_by_user[user_id] = [t for t in timestamps if current_time - t < self.recent_gif_age]
|
||||
|
||||
# Delete user key if no GIF has recently been sent by them
|
||||
if len(self.recent_gifs_by_user[user_id]) == 0:
|
||||
del self.recent_gifs_by_user[user_id]
|
||||
|
||||
|
||||
async def setup(bot: Tux) -> None:
|
||||
await bot.add_cog(GifLimiter(bot))
|
|
@ -6,6 +6,8 @@ from typing import Final
|
|||
import yaml
|
||||
from dotenv import load_dotenv, set_key
|
||||
|
||||
from tux.utils.functions import convert_dict_str_to_int
|
||||
|
||||
load_dotenv(verbose=True)
|
||||
|
||||
config_file = Path("config/settings.yml")
|
||||
|
@ -77,6 +79,13 @@ class Constants:
|
|||
# Icon constants
|
||||
EMBED_ICONS: Final[dict[str, str]] = config["EMBED_ICONS"]
|
||||
|
||||
# GIF ratelimit constants
|
||||
RECENT_GIF_AGE: Final[int] = config["GIF_LIMITER"]["RECENT_GIF_AGE"]
|
||||
GIF_LIMIT_EXCLUDE: Final[list[int]] = config["GIF_LIMITER"]["GIF_LIMIT_EXCLUDE"]
|
||||
|
||||
GIF_LIMITS: Final[dict[int, int]] = convert_dict_str_to_int(config["GIF_LIMITER"]["GIF_LIMITS_USER"])
|
||||
GIF_LIMITS_CHANNEL: Final[dict[int, int]] = convert_dict_str_to_int(config["GIF_LIMITER"]["GIF_LIMITS_CHANNEL"])
|
||||
|
||||
# Embed limit constants
|
||||
EMBED_MAX_NAME_LENGTH = 256
|
||||
EMBED_MAX_DESC_LENGTH = 4096
|
||||
|
|
|
@ -3,6 +3,7 @@ from datetime import UTC, datetime, timedelta
|
|||
from typing import Any
|
||||
|
||||
import discord
|
||||
from loguru import logger
|
||||
|
||||
harmful_command_pattern = r"(?:sudo\s+|doas\s+|run0\s+)?rm\s+(-[frR]*|--force|--recursive|--no-preserve-root|\s+)*([/\∕~]\s*|\*|/bin|/boot|/etc|/lib|/proc|/root|/sbin|/sys|/tmp|/usr|/var|/var/log|/network.|/system)(\s+--no-preserve-root|\s+\*)*|:\(\)\{ :|:& \};:" # noqa: RUF001
|
||||
|
||||
|
@ -300,3 +301,19 @@ def extract_member_attrs(member: discord.Member) -> dict[str, Any]:
|
|||
"status": member.status,
|
||||
"activity": member.activity,
|
||||
}
|
||||
|
||||
|
||||
def convert_dict_str_to_int(original_dict: dict[str, int]) -> dict[int, int]:
|
||||
"""Helper function used for GIF Limiter constants.
|
||||
Required as YAML keys are str. Channel and user IDs are int."""
|
||||
|
||||
converted_dict: dict[int, int] = {}
|
||||
|
||||
for key, value in original_dict.items():
|
||||
try:
|
||||
int_key: int = int(key)
|
||||
converted_dict[int_key] = value
|
||||
except ValueError:
|
||||
logger.exception(f"An error occurred when loading the GIF ratelimiter configuration at key {key}")
|
||||
|
||||
return converted_dict
|
||||
|
|
Loading…
Reference in a new issue