1
Fork 0
mirror of https://github.com/wlinator/luminara.git synced 2024-10-03 00:23:13 +00:00
Lumi/db/database.py

116 lines
3.7 KiB
Python
Raw Normal View History

import os
import pathlib
import re
2024-03-29 17:17:51 +00:00
2024-08-18 09:16:18 +00:00
import mysql.connector
from loguru import logger
from mysql.connector import pooling
2024-07-17 12:01:12 +00:00
from lib.constants import CONST
2023-06-19 14:20:17 +00:00
2024-07-14 16:07:00 +00:00
def create_connection_pool(name: str, size: int) -> pooling.MySQLConnectionPool:
return pooling.MySQLConnectionPool(
2024-07-14 16:07:00 +00:00
pool_name=name,
pool_size=size,
host="db",
port=3306,
database=CONST.MARIADB_DATABASE,
user=CONST.MARIADB_USER,
password=CONST.MARIADB_PASSWORD,
2024-07-14 16:07:00 +00:00
charset="utf8mb4",
collation="utf8mb4_unicode_ci",
)
try:
_cnxpool = create_connection_pool("core-pool", 25)
2024-07-14 16:07:00 +00:00
except mysql.connector.Error as e:
logger.critical(f"Couldn't create the MySQL connection pool: {e}")
raise e
2023-06-19 14:20:17 +00:00
def execute_query(query, values=None):
with _cnxpool.get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(query, values)
conn.commit()
return cursor
2023-06-19 14:20:17 +00:00
def select_query(query, values=None):
with _cnxpool.get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(query, values)
return cursor.fetchall()
2023-06-19 14:20:17 +00:00
def select_query_one(query, values=None):
with _cnxpool.get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(query, values)
output = cursor.fetchone()
return output[0] if output else None
2024-07-18 15:26:36 +00:00
def select_query_dict(query, values=None):
2024-07-18 15:26:36 +00:00
with _cnxpool.get_connection() as conn:
with conn.cursor(dictionary=True) as cursor:
2024-07-18 15:26:36 +00:00
cursor.execute(query, values)
return cursor.fetchall()
2024-08-01 13:53:09 +00:00
2024-08-01 13:53:46 +00:00
def run_migrations():
migrations_dir = "db/migrations"
migration_files = sorted(
[f for f in os.listdir(migrations_dir) if f.endswith(".sql")],
)
with _cnxpool.get_connection() as conn:
with conn.cursor() as cursor:
# Create migrations table if it doesn't exist
cursor.execute("""
CREATE TABLE IF NOT EXISTS migrations (
id INT AUTO_INCREMENT PRIMARY KEY,
filename VARCHAR(255) NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
for migration_file in migration_files:
# Check if migration has already been applied
cursor.execute(
"SELECT COUNT(*) FROM migrations WHERE filename = %s",
(migration_file,),
)
if cursor.fetchone()[0] > 0:
logger.debug(
f"Migration {migration_file} already applied, skipping.",
)
continue
# Read and execute migration file
migration_sql = pathlib.Path(
os.path.join(migrations_dir, migration_file),
).read_text()
try:
# Split the migration file into individual statements
statements = re.split(r";\s*$", migration_sql, flags=re.MULTILINE)
for statement in statements:
if statement.strip():
cursor.execute(statement)
# Record successful migration
cursor.execute(
"INSERT INTO migrations (filename) VALUES (%s)",
(migration_file,),
)
conn.commit()
logger.debug(f"Successfully applied migration: {migration_file}")
except mysql.connector.Error as e:
conn.rollback()
logger.error(f"Error applying migration {migration_file}: {e}")
raise
logger.debug("All migrations completed.")