1
Fork 0
mirror of https://github.com/wlinator/luminara.git synced 2024-10-02 18:03:12 +00:00
Lumi/db/database.py
2024-08-29 04:48:14 -04:00

110 lines
3.7 KiB
Python

import os
import pathlib
import re
from typing import Any
import mysql.connector
from loguru import logger
from mysql.connector import pooling
from lib.const import CONST
def create_connection_pool(name: str, size: int) -> pooling.MySQLConnectionPool:
    """Create a MySQL connection pool for the configured MariaDB database.

    Args:
        name: Passed to the pool as ``pool_name``.
        size: Passed to the pool as ``pool_size``.

    Returns:
        A ``MySQLConnectionPool`` targeting host ``db`` on port 3306, with the
        database name and credentials read from ``CONST`` and a utf8mb4
        charset/collation.
    """
    pool_settings = {
        "pool_name": name,
        "pool_size": size,
        "host": "db",
        "port": 3306,
        "database": CONST.MARIADB_DATABASE,
        "user": CONST.MARIADB_USER,
        "password": CONST.MARIADB_PASSWORD,
        "charset": "utf8mb4",
        "collation": "utf8mb4_unicode_ci",
    }
    return pooling.MySQLConnectionPool(**pool_settings)
# Build the shared module-level pool ("core-pool", 25 connections) at import
# time; the query helpers in this module take their connections from it.
try:
    _cnxpool = create_connection_pool("core-pool", 25)
except mysql.connector.Error as e:
    # Log the connector error, then re-raise it so the failure is not hidden.
    logger.critical(f"Couldn't create the MySQL connection pool: {e}")
    raise
def execute_query(query: str, values: tuple[Any, ...] | None = None) -> Any:
    """Execute a statement on a pooled connection and commit it.

    Args:
        query: SQL text to execute.
        values: Parameters passed to ``cursor.execute`` alongside ``query``,
            or ``None`` when the statement takes none.

    Returns:
        The cursor the statement was executed on. The previous ``-> None``
        annotation contradicted this, so it is now ``Any``. The cursor has
        already left its ``with`` block when the caller receives it, so it
        should not be used to fetch rows or run further statements.

    Errors raised by the connector are not caught here and propagate to the
    caller.
    """
    with _cnxpool.get_connection() as conn, conn.cursor() as cursor:
        cursor.execute(query, values)
        conn.commit()
        # Returned for backward compatibility with callers that use the result.
        return cursor
def select_query(query: str, values: tuple[Any, ...] | None = None) -> list[Any]:
    """Run a query on a pooled connection and return all of its rows.

    Args:
        query: SQL text to execute.
        values: Parameters passed to ``cursor.execute`` alongside ``query``,
            or ``None`` when the query takes none.

    Returns:
        Whatever ``cursor.fetchall()`` yields for the query.
    """
    with _cnxpool.get_connection() as connection:
        with connection.cursor() as cur:
            cur.execute(query, values)
            rows = cur.fetchall()
            return rows
def select_query_one(query: str, values: tuple[Any, ...] | None = None) -> Any | None:
    """Run a query and return the first column of its first row.

    Args:
        query: SQL text to execute.
        values: Parameters passed to ``cursor.execute`` alongside ``query``,
            or ``None`` when the query takes none.

    Returns:
        The first element of the row returned by ``cursor.fetchone()``, or
        ``None`` when that row is missing or empty.
    """
    with _cnxpool.get_connection() as connection, connection.cursor() as cur:
        cur.execute(query, values)
        first_row = cur.fetchone()
        if not first_row:
            return None
        return first_row[0]
def select_query_dict(query: str, values: tuple[Any, ...] | None = None) -> list[dict[str, Any]]:
    """Run a query with a dictionary cursor and return all of its rows.

    Args:
        query: SQL text to execute.
        values: Parameters passed to ``cursor.execute`` alongside ``query``,
            or ``None`` when the query takes none.

    Returns:
        Whatever ``cursor.fetchall()`` yields on a cursor opened with
        ``dictionary=True``.
    """
    with _cnxpool.get_connection() as connection:
        with connection.cursor(dictionary=True) as dict_cur:
            dict_cur.execute(query, values)
            records = dict_cur.fetchall()
            return records
def run_migrations() -> None:
    """Apply every not-yet-applied ``.sql`` file in ``db/migrations``.

    Files are processed in sorted filename order. Applied filenames are
    tracked in a ``migrations`` table, which is created if it does not exist.
    Each file is split into statements at a ``;`` that ends a line, the
    statements are executed one by one, and the filename is then recorded and
    committed.

    Raises:
        mysql.connector.Error: Re-raised after a rollback and an error log
            when a statement in a migration fails. Later migrations are not
            attempted.
    """
    migrations_dir = pathlib.Path("db/migrations")
    migration_files = sorted(
        name for name in os.listdir(migrations_dir) if name.endswith(".sql")
    )
    # Compiled once: a semicolon followed only by whitespace up to a line end.
    statement_terminator = re.compile(r";\s*$", flags=re.MULTILINE)

    with _cnxpool.get_connection() as conn, conn.cursor() as cursor:
        # Create migrations table if it doesn't exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS migrations (
                id INT AUTO_INCREMENT PRIMARY KEY,
                filename VARCHAR(255) NOT NULL,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        for migration_file in migration_files:
            # Check if migration has already been applied
            cursor.execute(
                "SELECT COUNT(*) FROM migrations WHERE filename = %s",
                (migration_file,),
            )
            if cursor.fetchone()[0] > 0:
                logger.debug(
                    f"Migration {migration_file} already applied, skipping.",
                )
                continue

            # Read with an explicit encoding so the result does not depend on
            # the process locale.
            migration_sql = (migrations_dir / migration_file).read_text(
                encoding="utf-8",
            )

            try:
                # Execute each non-empty statement of the migration in order
                for statement in statement_terminator.split(migration_sql):
                    if statement.strip():
                        cursor.execute(statement)

                # Record successful migration
                cursor.execute(
                    "INSERT INTO migrations (filename) VALUES (%s)",
                    (migration_file,),
                )
                conn.commit()
                logger.debug(f"Successfully applied migration: {migration_file}")
            except mysql.connector.Error as e:
                # NOTE(review): DDL statements are implicitly committed by
                # MySQL/MariaDB, so this rollback presumably only undoes
                # uncommitted data changes - confirm before relying on it.
                conn.rollback()
                logger.error(f"Error applying migration {migration_file}: {e}")
                raise

    logger.success("All database migrations completed.")