"""Database helpers: the Oracle connection pool and the local SQLite store."""

import logging
import os
import sqlite3
from pathlib import Path

import aiosqlite
import oracledb

from .config import settings

logger = logging.getLogger(__name__)

# ---- Oracle Pool ----
pool = None


def init_oracle():
    """Select the Oracle client mode and create the module-level pool.

    Thick mode is attempted only when ``settings.FORCE_THIN_MODE`` is falsy
    and ``settings.INSTANTCLIENTPATH`` is set; if initializing the client
    library fails, the error is logged and pool creation still proceeds.

    Returns:
        The pool created by ``oracledb.create_pool``, which is also stored
        in the module-level ``pool`` variable.
    """
    global pool

    force_thin = settings.FORCE_THIN_MODE
    instantclient_path = settings.INSTANTCLIENTPATH
    dsn = settings.ORACLE_DSN

    # Ensure TNS_ADMIN is set as OS env var so oracledb can find tnsnames.ora
    if settings.TNS_ADMIN:
        os.environ['TNS_ADMIN'] = settings.TNS_ADMIN

    logger.info(f"Oracle config: DSN={dsn}, TNS_ADMIN={settings.TNS_ADMIN or os.environ.get('TNS_ADMIN', '(not set)')}, INSTANTCLIENTPATH={instantclient_path or '(not set)'}")

    if force_thin:
        logger.info(f"FORCE_THIN_MODE=true: thin mode for {dsn}")
    elif instantclient_path:
        try:
            oracledb.init_oracle_client(lib_dir=instantclient_path)
            logger.info(f"Thick mode activated for {dsn}")
        except Exception as e:
            # Best effort: a failed client-library load must not prevent
            # the pool from being created.
            logger.error(f"Thick mode error: {e}")
            logger.info("Fallback to thin mode")
    else:
        logger.info(f"Thin mode (default) for {dsn}")

    pool = oracledb.create_pool(
        user=settings.ORACLE_USER,
        password=settings.ORACLE_PASSWORD,
        dsn=settings.ORACLE_DSN,
        min=2,
        max=4,
        increment=1,
    )
    logger.info(f"Oracle pool created for {dsn}")
    return pool


def get_oracle_connection():
    """Acquire a connection from the Oracle pool.

    Raises:
        RuntimeError: If ``init_oracle`` has not created the pool yet.
    """
    if pool is None:
        raise RuntimeError("Oracle pool not initialized")
    return pool.acquire()


def close_oracle() -> None:
    """Close the Oracle pool, if one exists, and reset the module state."""
    global pool
    if pool:
        pool.close()
        pool = None
        logger.info("Oracle pool closed")


# ---- SQLite ----
SQLITE_SCHEMA = """
CREATE TABLE IF NOT EXISTS sync_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    run_id TEXT UNIQUE,
    started_at TEXT,
    finished_at TEXT,
    status TEXT,
    total_orders INTEGER DEFAULT 0,
    imported INTEGER DEFAULT 0,
    skipped INTEGER DEFAULT 0,
    errors INTEGER DEFAULT 0,
    json_files INTEGER DEFAULT 0,
    error_message TEXT,
    already_imported INTEGER DEFAULT 0,
    new_imported INTEGER DEFAULT 0
);

CREATE TABLE IF NOT EXISTS orders (
    order_number TEXT PRIMARY KEY,
    order_date TEXT,
    customer_name TEXT,
    status TEXT,
    id_comanda INTEGER,
    id_partener INTEGER,
    id_adresa_facturare INTEGER,
    id_adresa_livrare INTEGER,
    error_message TEXT,
    missing_skus TEXT,
    items_count INTEGER,
    times_skipped INTEGER DEFAULT 0,
    first_seen_at TEXT DEFAULT (datetime('now')),
    last_sync_run_id TEXT REFERENCES sync_runs(run_id),
    updated_at TEXT DEFAULT (datetime('now')),
    shipping_name TEXT,
    billing_name TEXT,
    payment_method TEXT,
    delivery_method TEXT,
    factura_serie TEXT,
    factura_numar TEXT,
    factura_total_fara_tva REAL,
    factura_total_tva REAL,
    factura_total_cu_tva REAL,
    factura_data TEXT,
    invoice_checked_at TEXT,
    order_total REAL,
    delivery_cost REAL,
    discount_total REAL,
    web_status TEXT
);

CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status);
CREATE INDEX IF NOT EXISTS idx_orders_date ON orders(order_date);

CREATE TABLE IF NOT EXISTS sync_run_orders (
    sync_run_id TEXT REFERENCES sync_runs(run_id),
    order_number TEXT REFERENCES orders(order_number),
    status_at_run TEXT,
    PRIMARY KEY (sync_run_id, order_number)
);

CREATE TABLE IF NOT EXISTS missing_skus (
    sku TEXT PRIMARY KEY,
    product_name TEXT,
    first_seen TEXT DEFAULT (datetime('now')),
    resolved INTEGER DEFAULT 0,
    resolved_at TEXT,
    order_count INTEGER DEFAULT 0,
    order_numbers TEXT,
    customers TEXT
);

CREATE TABLE IF NOT EXISTS scheduler_config (
    key TEXT PRIMARY KEY,
    value TEXT
);

CREATE TABLE IF NOT EXISTS web_products (
    sku TEXT PRIMARY KEY,
    product_name TEXT,
    first_seen TEXT DEFAULT (datetime('now')),
    last_seen TEXT DEFAULT (datetime('now')),
    order_count INTEGER DEFAULT 0
);

CREATE TABLE IF NOT EXISTS app_settings (
    key TEXT PRIMARY KEY,
    value TEXT
);

CREATE TABLE IF NOT EXISTS order_items (
    order_number TEXT,
    sku TEXT,
    product_name TEXT,
    quantity REAL,
    price REAL,
    vat REAL,
    mapping_status TEXT,
    codmat TEXT,
    id_articol INTEGER,
    cantitate_roa REAL,
    created_at TEXT DEFAULT (datetime('now')),
    PRIMARY KEY (order_number, sku)
);

CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_number);
"""

# Columns added to existing databases after their tables were first created.
# Processed in this order by init_sqlite: (table, ((column, type definition), ...)).
_COLUMN_MIGRATIONS = (
    ("missing_skus", (
        ("order_count", "INTEGER DEFAULT 0"),
        ("order_numbers", "TEXT"),
        ("customers", "TEXT"),
    )),
    ("sync_runs", (
        ("error_message", "TEXT"),
        ("already_imported", "INTEGER DEFAULT 0"),
        ("new_imported", "INTEGER DEFAULT 0"),
    )),
    # shipping/billing/payment/delivery + invoice columns
    ("orders", (
        ("shipping_name", "TEXT"),
        ("billing_name", "TEXT"),
        ("payment_method", "TEXT"),
        ("delivery_method", "TEXT"),
        ("factura_serie", "TEXT"),
        ("factura_numar", "TEXT"),
        ("factura_total_fara_tva", "REAL"),
        ("factura_total_tva", "REAL"),
        ("factura_total_cu_tva", "REAL"),
        ("factura_data", "TEXT"),
        ("invoice_checked_at", "TEXT"),
        ("order_total", "REAL"),
        ("delivery_cost", "REAL"),
        ("discount_total", "REAL"),
        ("web_status", "TEXT"),
    )),
)

_sqlite_db_path = None


def _migrate_import_orders(conn: sqlite3.Connection, existing_tables: set) -> None:
    """Convert the legacy ``import_orders`` layout to ``orders`` (one row per order).

    Args:
        conn: Open SQLite connection to migrate.
        existing_tables: Table names present before any schema change; used
            to decide whether ``order_items`` must be rebuilt as well.
    """
    logger.info("Migrating import_orders → orders schema...")
    conn.executescript("""
        CREATE TABLE orders (
            order_number TEXT PRIMARY KEY,
            order_date TEXT,
            customer_name TEXT,
            status TEXT,
            id_comanda INTEGER,
            id_partener INTEGER,
            id_adresa_facturare INTEGER,
            id_adresa_livrare INTEGER,
            error_message TEXT,
            missing_skus TEXT,
            items_count INTEGER,
            times_skipped INTEGER DEFAULT 0,
            first_seen_at TEXT DEFAULT (datetime('now')),
            last_sync_run_id TEXT,
            updated_at TEXT DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status);
        CREATE INDEX IF NOT EXISTS idx_orders_date ON orders(order_date);

        CREATE TABLE sync_run_orders (
            sync_run_id TEXT,
            order_number TEXT,
            status_at_run TEXT,
            PRIMARY KEY (sync_run_id, order_number)
        );
    """)

    # Copy latest record per order_number into orders
    # Note: old import_orders didn't have address columns — those stay NULL
    conn.execute("""
        INSERT INTO orders
            (order_number, order_date, customer_name, status,
             id_comanda, id_partener, error_message, missing_skus,
             items_count, last_sync_run_id)
        SELECT io.order_number, io.order_date, io.customer_name, io.status,
               io.id_comanda, io.id_partener, io.error_message, io.missing_skus,
               io.items_count, io.sync_run_id
        FROM import_orders io
        INNER JOIN (
            SELECT order_number, MAX(id) as max_id
            FROM import_orders
            GROUP BY order_number
        ) latest ON io.id = latest.max_id
    """)

    # Populate sync_run_orders from all import_orders rows
    conn.execute("""
        INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
        SELECT sync_run_id, order_number, status
        FROM import_orders
        WHERE sync_run_id IS NOT NULL
    """)

    # Migrate order_items: drop sync_run_id, change PK to (order_number, sku)
    if 'order_items' in existing_tables:
        conn.executescript("""
            CREATE TABLE order_items_new (
                order_number TEXT,
                sku TEXT,
                product_name TEXT,
                quantity REAL,
                price REAL,
                vat REAL,
                mapping_status TEXT,
                codmat TEXT,
                id_articol INTEGER,
                cantitate_roa REAL,
                created_at TEXT DEFAULT (datetime('now')),
                PRIMARY KEY (order_number, sku)
            );
            INSERT OR IGNORE INTO order_items_new
                (order_number, sku, product_name, quantity, price, vat,
                 mapping_status, codmat, id_articol, cantitate_roa, created_at)
            SELECT order_number, sku, product_name, quantity, price, vat,
                   mapping_status, codmat, id_articol, cantitate_roa, created_at
            FROM order_items;
            DROP TABLE order_items;
            ALTER TABLE order_items_new RENAME TO order_items;
            CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_number);
        """)

    # Rename old table instead of dropping (safety backup)
    conn.execute("ALTER TABLE import_orders RENAME TO import_orders_bak")
    conn.commit()
    logger.info("Migration complete: import_orders → orders")


def _add_missing_columns(conn: sqlite3.Connection, table: str, columns) -> None:
    """Add each ``(name, typedef)`` in *columns* that *table* does not yet have.

    *table* and *columns* come only from the hard-coded ``_COLUMN_MIGRATIONS``
    constant, which is why they may be interpolated into the ALTER statement.
    """
    cursor = conn.execute(f"PRAGMA table_info({table})")
    # Index 1 of each table_info row is the column name.
    present = {row[1] for row in cursor.fetchall()}
    for col, typedef in columns:
        if col not in present:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {col} {typedef}")
            logger.info(f"Migrated {table}: added column {col}")


def init_sqlite() -> None:
    """Create or upgrade the SQLite database at ``settings.SQLITE_DB_PATH``.

    Steps, in order: create the parent directory if needed, run the legacy
    ``import_orders`` migration when applicable, apply ``SQLITE_SCHEMA``,
    then add any columns missing from pre-existing tables. A failure in the
    last step is logged as a warning and does not abort initialization;
    failures in the earlier steps propagate to the caller.
    """
    global _sqlite_db_path
    _sqlite_db_path = settings.SQLITE_DB_PATH

    # Ensure directory exists
    db_dir = os.path.dirname(_sqlite_db_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    # Create tables synchronously
    conn = sqlite3.connect(_sqlite_db_path)
    try:
        # Check existing tables before running schema
        cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
        existing_tables = {row[0] for row in cursor.fetchall()}

        # Migration: import_orders → orders (one row per order)
        if 'import_orders' in existing_tables and 'orders' not in existing_tables:
            _migrate_import_orders(conn, existing_tables)

        conn.executescript(SQLITE_SCHEMA)

        # Migrate: add columns if missing (for existing databases)
        try:
            for table, columns in _COLUMN_MIGRATIONS:
                _add_missing_columns(conn, table, columns)
            conn.commit()
        except Exception as e:
            # Deliberately best effort: keep starting up on a partially
            # migrated database rather than failing initialization.
            logger.warning(f"Migration check failed: {e}")
    finally:
        # Release the connection even when a migration or the schema
        # script raises, so the database file handle is not leaked.
        conn.close()

    logger.info(f"SQLite initialized: {_sqlite_db_path}")


async def get_sqlite() -> aiosqlite.Connection:
    """Open a new async SQLite connection with ``aiosqlite.Row`` rows.

    The caller owns the returned connection and is responsible for closing it.

    Raises:
        RuntimeError: If ``init_sqlite`` has not been called yet.
    """
    if _sqlite_db_path is None:
        raise RuntimeError("SQLite not initialized")
    db = await aiosqlite.connect(_sqlite_db_path)
    db.row_factory = aiosqlite.Row
    return db


def get_sqlite_sync() -> sqlite3.Connection:
    """Open a new synchronous SQLite connection with ``sqlite3.Row`` rows.

    The caller owns the returned connection and is responsible for closing it.

    Raises:
        RuntimeError: If ``init_sqlite`` has not been called yet.
    """
    if _sqlite_db_path is None:
        raise RuntimeError("SQLite not initialized")
    conn = sqlite3.connect(_sqlite_db_path)
    conn.row_factory = sqlite3.Row
    return conn