1
Fork 0
mirror of https://github.com/wlinator/luminara.git synced 2024-10-02 22:23:13 +00:00
Lumi/db/database.py

111 lines
3.7 KiB
Python
Raw Normal View History

2024-08-28 14:44:07 +00:00
import os
import pathlib
import re
2024-08-29 08:48:14 +00:00
from typing import Any
2024-08-28 14:44:07 +00:00
import mysql.connector
from loguru import logger
from mysql.connector import pooling
from lib.const import CONST
def create_connection_pool(name: str, size: int) -> pooling.MySQLConnectionPool:
return pooling.MySQLConnectionPool(
pool_name=name,
pool_size=size,
host="db",
port=3306,
database=CONST.MARIADB_DATABASE,
user=CONST.MARIADB_USER,
password=CONST.MARIADB_PASSWORD,
charset="utf8mb4",
collation="utf8mb4_unicode_ci",
)
try:
_cnxpool = create_connection_pool("core-pool", 25)
except mysql.connector.Error as e:
logger.critical(f"Couldn't create the MySQL connection pool: {e}")
2024-08-29 08:48:14 +00:00
raise
2024-08-28 14:44:07 +00:00
2024-08-29 08:48:14 +00:00
def execute_query(query: str, values: tuple[Any, ...] | None = None) -> None:
with _cnxpool.get_connection() as conn, conn.cursor() as cursor:
cursor.execute(query, values)
conn.commit()
return cursor
2024-08-28 14:44:07 +00:00
2024-08-29 08:48:14 +00:00
def select_query(query: str, values: tuple[Any, ...] | None = None) -> list[Any]:
with _cnxpool.get_connection() as conn, conn.cursor() as cursor:
cursor.execute(query, values)
return cursor.fetchall()
2024-08-28 14:44:07 +00:00
2024-08-29 08:48:14 +00:00
def select_query_one(query: str, values: tuple[Any, ...] | None = None) -> Any | None:
with _cnxpool.get_connection() as conn, conn.cursor() as cursor:
cursor.execute(query, values)
output = cursor.fetchone()
return output[0] if output else None
2024-08-28 14:44:07 +00:00
2024-08-29 08:48:14 +00:00
def select_query_dict(query: str, values: tuple[Any, ...] | None = None) -> list[dict[str, Any]]:
with _cnxpool.get_connection() as conn, conn.cursor(dictionary=True) as cursor:
cursor.execute(query, values)
return cursor.fetchall()
2024-08-28 14:44:07 +00:00
def run_migrations():
migrations_dir = "db/migrations"
migration_files = sorted(
[f for f in os.listdir(migrations_dir) if f.endswith(".sql")],
)
2024-08-29 08:48:14 +00:00
with _cnxpool.get_connection() as conn, conn.cursor() as cursor:
# Create migrations table if it doesn't exist
cursor.execute("""
2024-08-28 14:44:07 +00:00
CREATE TABLE IF NOT EXISTS migrations (
id INT AUTO_INCREMENT PRIMARY KEY,
filename VARCHAR(255) NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
2024-08-29 08:48:14 +00:00
for migration_file in migration_files:
# Check if migration has already been applied
cursor.execute(
"SELECT COUNT(*) FROM migrations WHERE filename = %s",
(migration_file,),
)
if cursor.fetchone()[0] > 0:
logger.debug(
f"Migration {migration_file} already applied, skipping.",
)
continue
# Read and execute migration file
migration_sql = pathlib.Path(migrations_dir) / migration_file
migration_sql = migration_sql.read_text()
try:
# Split the migration file into individual statements
statements = re.split(r";\s*$", migration_sql, flags=re.MULTILINE)
for statement in statements:
if statement.strip():
cursor.execute(statement)
# Record successful migration
2024-08-28 14:44:07 +00:00
cursor.execute(
2024-08-29 08:48:14 +00:00
"INSERT INTO migrations (filename) VALUES (%s)",
2024-08-28 14:44:07 +00:00
(migration_file,),
)
2024-08-29 08:48:14 +00:00
conn.commit()
logger.debug(f"Successfully applied migration: {migration_file}")
except mysql.connector.Error as e:
conn.rollback()
logger.error(f"Error applying migration {migration_file}: {e}")
raise
2024-08-28 14:44:07 +00:00
logger.success("All database migrations completed.")