Files
gomag-vending/api/app/database.py
Claude Agent a0649279cf log
2026-03-16 15:51:15 +00:00

344 lines
12 KiB
Python

import oracledb
import aiosqlite
import sqlite3
import logging
import os
from pathlib import Path
from .config import settings
logger = logging.getLogger(__name__)
# ---- Oracle Pool ----
pool = None
def init_oracle():
"""Initialize Oracle client mode and create connection pool."""
global pool
force_thin = settings.FORCE_THIN_MODE
instantclient_path = settings.INSTANTCLIENTPATH
dsn = settings.ORACLE_DSN
# Ensure TNS_ADMIN is set as OS env var so oracledb can find tnsnames.ora
if settings.TNS_ADMIN:
os.environ['TNS_ADMIN'] = settings.TNS_ADMIN
logger.info(f"Oracle config: DSN={dsn}, TNS_ADMIN={settings.TNS_ADMIN or os.environ.get('TNS_ADMIN', '(not set)')}, INSTANTCLIENTPATH={instantclient_path or '(not set)'}")
if force_thin:
logger.info(f"FORCE_THIN_MODE=true: thin mode for {dsn}")
elif instantclient_path:
try:
oracledb.init_oracle_client(lib_dir=instantclient_path)
logger.info(f"Thick mode activated for {dsn}")
except Exception as e:
logger.error(f"Thick mode error: {e}")
logger.info("Fallback to thin mode")
else:
logger.info(f"Thin mode (default) for {dsn}")
pool = oracledb.create_pool(
user=settings.ORACLE_USER,
password=settings.ORACLE_PASSWORD,
dsn=settings.ORACLE_DSN,
min=2,
max=4,
increment=1
)
logger.info(f"Oracle pool created for {dsn}")
return pool
def get_oracle_connection():
"""Get a connection from the Oracle pool."""
if pool is None:
raise RuntimeError("Oracle pool not initialized")
return pool.acquire()
def close_oracle():
"""Close the Oracle connection pool."""
global pool
if pool:
pool.close()
pool = None
logger.info("Oracle pool closed")
# ---- SQLite ----
SQLITE_SCHEMA = """
CREATE TABLE IF NOT EXISTS sync_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT UNIQUE,
started_at TEXT,
finished_at TEXT,
status TEXT,
total_orders INTEGER DEFAULT 0,
imported INTEGER DEFAULT 0,
skipped INTEGER DEFAULT 0,
errors INTEGER DEFAULT 0,
json_files INTEGER DEFAULT 0,
error_message TEXT,
already_imported INTEGER DEFAULT 0,
new_imported INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS orders (
order_number TEXT PRIMARY KEY,
order_date TEXT,
customer_name TEXT,
status TEXT,
id_comanda INTEGER,
id_partener INTEGER,
id_adresa_facturare INTEGER,
id_adresa_livrare INTEGER,
error_message TEXT,
missing_skus TEXT,
items_count INTEGER,
times_skipped INTEGER DEFAULT 0,
first_seen_at TEXT DEFAULT (datetime('now')),
last_sync_run_id TEXT REFERENCES sync_runs(run_id),
updated_at TEXT DEFAULT (datetime('now')),
shipping_name TEXT,
billing_name TEXT,
payment_method TEXT,
delivery_method TEXT,
factura_serie TEXT,
factura_numar TEXT,
factura_total_fara_tva REAL,
factura_total_tva REAL,
factura_total_cu_tva REAL,
invoice_checked_at TEXT,
order_total REAL,
delivery_cost REAL,
discount_total REAL
);
CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status);
CREATE INDEX IF NOT EXISTS idx_orders_date ON orders(order_date);
CREATE TABLE IF NOT EXISTS sync_run_orders (
sync_run_id TEXT REFERENCES sync_runs(run_id),
order_number TEXT REFERENCES orders(order_number),
status_at_run TEXT,
PRIMARY KEY (sync_run_id, order_number)
);
CREATE TABLE IF NOT EXISTS missing_skus (
sku TEXT PRIMARY KEY,
product_name TEXT,
first_seen TEXT DEFAULT (datetime('now')),
resolved INTEGER DEFAULT 0,
resolved_at TEXT,
order_count INTEGER DEFAULT 0,
order_numbers TEXT,
customers TEXT
);
CREATE TABLE IF NOT EXISTS scheduler_config (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE TABLE IF NOT EXISTS web_products (
sku TEXT PRIMARY KEY,
product_name TEXT,
first_seen TEXT DEFAULT (datetime('now')),
last_seen TEXT DEFAULT (datetime('now')),
order_count INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE TABLE IF NOT EXISTS order_items (
order_number TEXT,
sku TEXT,
product_name TEXT,
quantity REAL,
price REAL,
vat REAL,
mapping_status TEXT,
codmat TEXT,
id_articol INTEGER,
cantitate_roa REAL,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (order_number, sku)
);
CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_number);
"""
_sqlite_db_path = None
def init_sqlite():
"""Initialize SQLite database with schema."""
global _sqlite_db_path
_sqlite_db_path = settings.SQLITE_DB_PATH
# Ensure directory exists
db_dir = os.path.dirname(_sqlite_db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
# Create tables synchronously
conn = sqlite3.connect(_sqlite_db_path)
# Check existing tables before running schema
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
existing_tables = {row[0] for row in cursor.fetchall()}
# Migration: import_orders → orders (one row per order)
if 'import_orders' in existing_tables and 'orders' not in existing_tables:
logger.info("Migrating import_orders → orders schema...")
conn.executescript("""
CREATE TABLE orders (
order_number TEXT PRIMARY KEY,
order_date TEXT,
customer_name TEXT,
status TEXT,
id_comanda INTEGER,
id_partener INTEGER,
id_adresa_facturare INTEGER,
id_adresa_livrare INTEGER,
error_message TEXT,
missing_skus TEXT,
items_count INTEGER,
times_skipped INTEGER DEFAULT 0,
first_seen_at TEXT DEFAULT (datetime('now')),
last_sync_run_id TEXT,
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status);
CREATE INDEX IF NOT EXISTS idx_orders_date ON orders(order_date);
CREATE TABLE sync_run_orders (
sync_run_id TEXT,
order_number TEXT,
status_at_run TEXT,
PRIMARY KEY (sync_run_id, order_number)
);
""")
# Copy latest record per order_number into orders
# Note: old import_orders didn't have address columns — those stay NULL
conn.execute("""
INSERT INTO orders
(order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus,
items_count, last_sync_run_id)
SELECT io.order_number, io.order_date, io.customer_name, io.status,
io.id_comanda, io.id_partener, io.error_message, io.missing_skus,
io.items_count, io.sync_run_id
FROM import_orders io
INNER JOIN (
SELECT order_number, MAX(id) as max_id
FROM import_orders
GROUP BY order_number
) latest ON io.id = latest.max_id
""")
# Populate sync_run_orders from all import_orders rows
conn.execute("""
INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
SELECT sync_run_id, order_number, status
FROM import_orders
WHERE sync_run_id IS NOT NULL
""")
# Migrate order_items: drop sync_run_id, change PK to (order_number, sku)
if 'order_items' in existing_tables:
conn.executescript("""
CREATE TABLE order_items_new (
order_number TEXT,
sku TEXT,
product_name TEXT,
quantity REAL,
price REAL,
vat REAL,
mapping_status TEXT,
codmat TEXT,
id_articol INTEGER,
cantitate_roa REAL,
created_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (order_number, sku)
);
INSERT OR IGNORE INTO order_items_new
(order_number, sku, product_name, quantity, price, vat,
mapping_status, codmat, id_articol, cantitate_roa, created_at)
SELECT order_number, sku, product_name, quantity, price, vat,
mapping_status, codmat, id_articol, cantitate_roa, created_at
FROM order_items;
DROP TABLE order_items;
ALTER TABLE order_items_new RENAME TO order_items;
CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_number);
""")
# Rename old table instead of dropping (safety backup)
conn.execute("ALTER TABLE import_orders RENAME TO import_orders_bak")
conn.commit()
logger.info("Migration complete: import_orders → orders")
conn.executescript(SQLITE_SCHEMA)
# Migrate: add columns if missing (for existing databases)
try:
cursor = conn.execute("PRAGMA table_info(missing_skus)")
cols = {row[1] for row in cursor.fetchall()}
for col, typedef in [("order_count", "INTEGER DEFAULT 0"),
("order_numbers", "TEXT"),
("customers", "TEXT")]:
if col not in cols:
conn.execute(f"ALTER TABLE missing_skus ADD COLUMN {col} {typedef}")
logger.info(f"Migrated missing_skus: added column {col}")
# Migrate sync_runs: add columns
cursor = conn.execute("PRAGMA table_info(sync_runs)")
sync_cols = {row[1] for row in cursor.fetchall()}
if "error_message" not in sync_cols:
conn.execute("ALTER TABLE sync_runs ADD COLUMN error_message TEXT")
logger.info("Migrated sync_runs: added column error_message")
if "already_imported" not in sync_cols:
conn.execute("ALTER TABLE sync_runs ADD COLUMN already_imported INTEGER DEFAULT 0")
logger.info("Migrated sync_runs: added column already_imported")
if "new_imported" not in sync_cols:
conn.execute("ALTER TABLE sync_runs ADD COLUMN new_imported INTEGER DEFAULT 0")
logger.info("Migrated sync_runs: added column new_imported")
# Migrate orders: add shipping/billing/payment/delivery + invoice columns
cursor = conn.execute("PRAGMA table_info(orders)")
order_cols = {row[1] for row in cursor.fetchall()}
for col, typedef in [
("shipping_name", "TEXT"),
("billing_name", "TEXT"),
("payment_method", "TEXT"),
("delivery_method", "TEXT"),
("factura_serie", "TEXT"),
("factura_numar", "TEXT"),
("factura_total_fara_tva", "REAL"),
("factura_total_tva", "REAL"),
("factura_total_cu_tva", "REAL"),
("invoice_checked_at", "TEXT"),
("order_total", "REAL"),
("delivery_cost", "REAL"),
("discount_total", "REAL"),
]:
if col not in order_cols:
conn.execute(f"ALTER TABLE orders ADD COLUMN {col} {typedef}")
logger.info(f"Migrated orders: added column {col}")
conn.commit()
except Exception as e:
logger.warning(f"Migration check failed: {e}")
conn.close()
logger.info(f"SQLite initialized: {_sqlite_db_path}")
async def get_sqlite():
"""Get async SQLite connection."""
if _sqlite_db_path is None:
raise RuntimeError("SQLite not initialized")
db = await aiosqlite.connect(_sqlite_db_path)
db.row_factory = aiosqlite.Row
return db
def get_sqlite_sync():
"""Get synchronous SQLite connection."""
if _sqlite_db_path is None:
raise RuntimeError("SQLite not initialized")
conn = sqlite3.connect(_sqlite_db_path)
conn.row_factory = sqlite3.Row
return conn