import oracledb import aiosqlite import sqlite3 import logging import os from pathlib import Path from .config import settings logger = logging.getLogger(__name__) # ---- Oracle Pool ---- pool = None def init_oracle(): """Initialize Oracle client mode and create connection pool.""" global pool force_thin = settings.FORCE_THIN_MODE instantclient_path = settings.INSTANTCLIENTPATH dsn = settings.ORACLE_DSN if force_thin: logger.info(f"FORCE_THIN_MODE=true: thin mode for {dsn}") elif instantclient_path: try: oracledb.init_oracle_client(lib_dir=instantclient_path) logger.info(f"Thick mode activated for {dsn}") except Exception as e: logger.error(f"Thick mode error: {e}") logger.info("Fallback to thin mode") else: logger.info(f"Thin mode (default) for {dsn}") pool = oracledb.create_pool( user=settings.ORACLE_USER, password=settings.ORACLE_PASSWORD, dsn=settings.ORACLE_DSN, min=2, max=4, increment=1 ) logger.info(f"Oracle pool created for {dsn}") return pool def get_oracle_connection(): """Get a connection from the Oracle pool.""" if pool is None: raise RuntimeError("Oracle pool not initialized") return pool.acquire() def close_oracle(): """Close the Oracle connection pool.""" global pool if pool: pool.close() pool = None logger.info("Oracle pool closed") # ---- SQLite ---- SQLITE_SCHEMA = """ CREATE TABLE IF NOT EXISTS sync_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT UNIQUE, started_at TEXT, finished_at TEXT, status TEXT, total_orders INTEGER DEFAULT 0, imported INTEGER DEFAULT 0, skipped INTEGER DEFAULT 0, errors INTEGER DEFAULT 0, json_files INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS import_orders ( id INTEGER PRIMARY KEY AUTOINCREMENT, sync_run_id TEXT REFERENCES sync_runs(run_id), order_number TEXT, order_date TEXT, customer_name TEXT, status TEXT, id_comanda INTEGER, id_partener INTEGER, error_message TEXT, missing_skus TEXT, items_count INTEGER, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS missing_skus ( sku TEXT PRIMARY KEY, product_name TEXT, first_seen TEXT DEFAULT (datetime('now')), resolved INTEGER DEFAULT 0, resolved_at TEXT, order_count INTEGER DEFAULT 0, order_numbers TEXT, customers TEXT ); CREATE TABLE IF NOT EXISTS scheduler_config ( key TEXT PRIMARY KEY, value TEXT ); """ _sqlite_db_path = None def init_sqlite(): """Initialize SQLite database with schema.""" global _sqlite_db_path _sqlite_db_path = settings.SQLITE_DB_PATH # Ensure directory exists db_dir = os.path.dirname(_sqlite_db_path) if db_dir: os.makedirs(db_dir, exist_ok=True) # Create tables synchronously conn = sqlite3.connect(_sqlite_db_path) conn.executescript(SQLITE_SCHEMA) # Migrate: add columns if missing (for existing databases) try: cursor = conn.execute("PRAGMA table_info(missing_skus)") cols = {row[1] for row in cursor.fetchall()} for col, typedef in [("order_count", "INTEGER DEFAULT 0"), ("order_numbers", "TEXT"), ("customers", "TEXT")]: if col not in cols: conn.execute(f"ALTER TABLE missing_skus ADD COLUMN {col} {typedef}") logger.info(f"Migrated missing_skus: added column {col}") conn.commit() except Exception as e: logger.warning(f"Migration check failed: {e}") conn.close() logger.info(f"SQLite initialized: {_sqlite_db_path}") async def get_sqlite(): """Get async SQLite connection.""" if _sqlite_db_path is None: raise RuntimeError("SQLite not initialized") db = await aiosqlite.connect(_sqlite_db_path) db.row_factory = aiosqlite.Row return db def get_sqlite_sync(): """Get synchronous SQLite connection.""" if _sqlite_db_path is None: raise RuntimeError("SQLite not initialized") conn = sqlite3.connect(_sqlite_db_path) conn.row_factory = sqlite3.Row return conn