mirror of
https://github.com/wlinator/luminara.git
synced 2024-10-02 20:43:12 +00:00
110 lines
3.7 KiB
Python
110 lines
3.7 KiB
Python
import os
|
|
import pathlib
|
|
import re
|
|
from typing import Any
|
|
|
|
import mysql.connector
|
|
from loguru import logger
|
|
from mysql.connector import pooling
|
|
|
|
from lib.const import CONST
|
|
|
|
|
|
def create_connection_pool(name: str, size: int) -> pooling.MySQLConnectionPool:
    """Build a connection pool for the configured MariaDB database.

    Args:
        name: Value passed to the pool as ``pool_name``.
        size: Value passed to the pool as ``pool_size``.

    Returns:
        The newly constructed ``MySQLConnectionPool``.
    """
    # Host and port are fixed; the database name and credentials are the
    # only settings read from CONST.
    connection_settings = {
        "host": "db",
        "port": 3306,
        "database": CONST.MARIADB_DATABASE,
        "user": CONST.MARIADB_USER,
        "password": CONST.MARIADB_PASSWORD,
        "charset": "utf8mb4",
        "collation": "utf8mb4_unicode_ci",
    }
    return pooling.MySQLConnectionPool(
        pool_name=name,
        pool_size=size,
        **connection_settings,
    )
|
|
|
|
|
|
# The shared pool is created once, when this module is imported. A connector
# error here is logged at critical level and re-raised, so the import itself
# fails rather than leaving the query helpers without a pool.
try:
    _cnxpool = create_connection_pool(name="core-pool", size=25)
except mysql.connector.Error as e:
    logger.critical(f"Couldn't create the MySQL connection pool: {e}")
    raise
|
|
|
|
|
|
def execute_query(query: str, values: tuple[Any, ...] | None = None) -> Any:
    """Execute a single statement and commit it.

    A connection is borrowed from the module-level pool for the duration of
    this call only.

    Args:
        query: SQL text to execute.
        values: Parameters forwarded to ``cursor.execute`` alongside the
            query, or ``None`` when the query takes none.

    Returns:
        The cursor the statement was executed on. Both ``with`` context
        managers have already exited by the time the caller receives it, so
        it should be treated as finished rather than used to run or fetch
        anything further.
    """
    # The return annotation is ``Any`` because the cursor object is handed
    # back to the caller and its concrete class comes from the connector.
    with _cnxpool.get_connection() as conn, conn.cursor() as cursor:
        cursor.execute(query, values)
        conn.commit()
        return cursor
|
|
|
|
|
|
def select_query(query: str, values: tuple[Any, ...] | None = None) -> list[Any]:
    """Run a query and return every row of its result.

    Args:
        query: SQL text to execute.
        values: Parameters forwarded to ``cursor.execute``, or ``None``.

    Returns:
        Whatever ``cursor.fetchall()`` yields for the executed query.
    """
    with _cnxpool.get_connection() as connection:
        with connection.cursor() as cur:
            cur.execute(query, values)
            rows = cur.fetchall()
            return rows
|
|
|
|
|
|
def select_query_one(query: str, values: tuple[Any, ...] | None = None) -> Any | None:
    """Run a query and return the first column of its first row.

    Args:
        query: SQL text to execute.
        values: Parameters forwarded to ``cursor.execute``, or ``None``.

    Returns:
        Element ``0`` of the row returned by ``cursor.fetchone()``, or
        ``None`` when that row is falsy (for example, no row was returned).
    """
    with _cnxpool.get_connection() as connection:
        with connection.cursor() as cur:
            cur.execute(query, values)
            first_row = cur.fetchone()
            if not first_row:
                return None
            return first_row[0]
|
|
|
|
|
|
def select_query_dict(query: str, values: tuple[Any, ...] | None = None) -> list[dict[str, Any]]:
    """Run a query on a ``dictionary=True`` cursor and return all rows.

    Args:
        query: SQL text to execute.
        values: Parameters forwarded to ``cursor.execute``, or ``None``.

    Returns:
        Whatever ``cursor.fetchall()`` yields for a cursor opened with
        ``dictionary=True``.
    """
    with _cnxpool.get_connection() as connection:
        with connection.cursor(dictionary=True) as dict_cursor:
            dict_cursor.execute(query, values)
            rows = dict_cursor.fetchall()
            return rows
|
|
|
|
|
|
def run_migrations() -> None:
    """Apply every not-yet-applied ``.sql`` file from ``db/migrations``.

    Files are processed in sorted filename order. A ``migrations`` table is
    created if missing and used as the record of which filenames have already
    been applied; files found there are skipped. Each remaining file is split
    into statements, executed, recorded, and committed before moving on to
    the next file.

    Raises:
        mysql.connector.Error: If any statement of a migration fails. The
            connection is rolled back and the error is logged before it is
            re-raised, so later migration files are not attempted.
    """
    migrations_dir = "db/migrations"
    migration_files = sorted(
        f for f in os.listdir(migrations_dir) if f.endswith(".sql")
    )

    # A statement ends at a ";" followed only by whitespace up to the end of
    # a line. Compiled once here instead of once per migration file.
    statement_end = re.compile(r";\s*$", flags=re.MULTILINE)

    with _cnxpool.get_connection() as conn, conn.cursor() as cursor:
        # Create migrations table if it doesn't exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS migrations (
                id INT AUTO_INCREMENT PRIMARY KEY,
                filename VARCHAR(255) NOT NULL,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        for migration_file in migration_files:
            # Check if migration has already been applied
            cursor.execute(
                "SELECT COUNT(*) FROM migrations WHERE filename = %s",
                (migration_file,),
            )
            if cursor.fetchone()[0] > 0:
                logger.debug(
                    f"Migration {migration_file} already applied, skipping.",
                )
                continue

            # Read the migration file. The encoding is pinned to UTF-8 so the
            # SQL text does not depend on the platform's default encoding.
            migration_path = pathlib.Path(migrations_dir) / migration_file
            migration_sql = migration_path.read_text(encoding="utf-8")

            try:
                # Split the migration file into individual statements and
                # skip the empty fragments the split leaves behind.
                for statement in statement_end.split(migration_sql):
                    if statement.strip():
                        cursor.execute(statement)

                # Record successful migration
                cursor.execute(
                    "INSERT INTO migrations (filename) VALUES (%s)",
                    (migration_file,),
                )
                conn.commit()
                logger.debug(f"Successfully applied migration: {migration_file}")
            except mysql.connector.Error as e:
                # NOTE(review): rollback can only undo what the server treats
                # as transactional; confirm how schema-changing statements in
                # a partially applied migration file behave.
                conn.rollback()
                logger.error(f"Error applying migration {migration_file}: {e}")
                raise

    logger.success("All database migrations completed.")
|