"""Catalog price sync service — syncs product prices from GoMag catalog to ROA Oracle.""" import asyncio import logging import uuid from datetime import datetime from zoneinfo import ZoneInfo from . import gomag_client, validation_service, sqlite_service from .. import database from ..config import settings logger = logging.getLogger(__name__) _tz = ZoneInfo("Europe/Bucharest") _price_sync_lock = asyncio.Lock() _current_price_sync = None def _now(): return datetime.now(_tz).replace(tzinfo=None) async def prepare_price_sync() -> dict: global _current_price_sync if _price_sync_lock.locked(): return {"error": "Price sync already running"} run_id = _now().strftime("%Y%m%d_%H%M%S") + "_ps_" + uuid.uuid4().hex[:6] _current_price_sync = { "run_id": run_id, "status": "running", "started_at": _now().isoformat(), "finished_at": None, "phase_text": "Starting...", } # Create SQLite record db = await sqlite_service.get_sqlite() try: await db.execute( "INSERT INTO price_sync_runs (run_id, started_at, status) VALUES (?, ?, 'running')", (run_id, _now().strftime("%d.%m.%Y %H:%M:%S")) ) await db.commit() finally: await db.close() return {"run_id": run_id} async def get_price_sync_status() -> dict: if _current_price_sync and _current_price_sync.get("status") == "running": return _current_price_sync # Return last run from SQLite db = await sqlite_service.get_sqlite() try: cursor = await db.execute( "SELECT * FROM price_sync_runs ORDER BY started_at DESC LIMIT 1" ) row = await cursor.fetchone() if row: return {"status": "idle", "last_run": dict(row)} return {"status": "idle"} except Exception: return {"status": "idle"} finally: await db.close() async def run_catalog_price_sync(run_id: str): global _current_price_sync async with _price_sync_lock: log_lines = [] def _log(msg): logger.info(msg) log_lines.append(f"[{_now().strftime('%H:%M:%S')}] {msg}") if _current_price_sync: _current_price_sync["phase_text"] = msg try: app_settings = await sqlite_service.get_app_settings() id_pol = int(app_settings.get("id_pol") or 0) or None id_pol_productie = int(app_settings.get("id_pol_productie") or 0) or None if not id_pol: _log("Politica de preț nu e configurată — skip sync") await _finish_run(run_id, "error", log_lines, error="No price policy") return # Fetch products from GoMag _log("Descărcare produse din GoMag API...") products = await gomag_client.download_products( api_key=app_settings.get("gomag_api_key"), api_shop=app_settings.get("gomag_api_shop"), products_url=app_settings.get("gomag_products_url") or None, log_fn=_log, ) if not products: _log("Niciun produs descărcat") await _finish_run(run_id, "completed", log_lines, products_total=0) return # Connect to Oracle conn = await asyncio.to_thread(database.get_oracle_connection) try: # Get all mappings from ARTICOLE_TERTI _log("Citire mapări ARTICOLE_TERTI...") mapped_data = await asyncio.to_thread( validation_service.resolve_mapped_codmats, {p["sku"] for p in products}, conn ) # Get direct articles from NOM_ARTICOLE _log("Identificare articole directe...") direct_id_map = {} with conn.cursor() as cur: all_skus = list({p["sku"] for p in products}) for i in range(0, len(all_skus), 500): batch = all_skus[i:i+500] placeholders = ",".join([f":s{j}" for j in range(len(batch))]) params = {f"s{j}": sku for j, sku in enumerate(batch)} cur.execute(f""" SELECT codmat, id_articol, cont FROM nom_articole WHERE codmat IN ({placeholders}) AND sters = 0 AND inactiv = 0 """, params) for row in cur: if row[0] not in mapped_data: direct_id_map[row[0]] = {"id_articol": row[1], "cont": row[2]} matched = 0 updated = 0 errors = 0 for product in products: sku = product["sku"] try: price_str = product.get("price", "0") price = float(price_str) if price_str else 0 if price <= 0: continue vat = float(product.get("vat", "19")) vat_included = product.get("vat_included", "1") # Calculate price with TVA if vat_included == "1": price_cu_tva = price else: price_cu_tva = price * (1 + vat / 100) # Skip kits (>1 CODMAT) if sku in mapped_data and len(mapped_data[sku]) > 1: continue # Determine id_articol and policy id_articol = None cantitate_roa = 1 if sku in mapped_data and len(mapped_data[sku]) == 1: comp = mapped_data[sku][0] id_articol = comp["id_articol"] cantitate_roa = comp.get("cantitate_roa") or 1 elif sku in direct_id_map: id_articol = direct_id_map[sku]["id_articol"] else: continue # SKU not in ROA matched += 1 price_per_unit = price_cu_tva / cantitate_roa if cantitate_roa != 1 else price_cu_tva # Determine policy cont = None if sku in mapped_data and len(mapped_data[sku]) == 1: cont = mapped_data[sku][0].get("cont") elif sku in direct_id_map: cont = direct_id_map[sku].get("cont") cont_str = str(cont or "").strip() pol = id_pol_productie if (cont_str in ("341", "345") and id_pol_productie) else id_pol result = await asyncio.to_thread( validation_service.compare_and_update_price, id_articol, pol, price_per_unit, conn ) if result and result["updated"]: updated += 1 _log(f" {result['codmat']}: {result['old_price']:.2f} → {result['new_price']:.2f}") except Exception as e: errors += 1 _log(f"Eroare produs {sku}: {e}") _log(f"Sync complet: {len(products)} produse, {matched} potrivite, {updated} actualizate, {errors} erori") finally: await asyncio.to_thread(database.pool.release, conn) await _finish_run(run_id, "completed", log_lines, products_total=len(products), matched=matched, updated=updated, errors=errors) except Exception as e: _log(f"Eroare critică: {e}") logger.error(f"Catalog price sync error: {e}", exc_info=True) await _finish_run(run_id, "error", log_lines, error=str(e)) async def _finish_run(run_id, status, log_lines, products_total=0, matched=0, updated=0, errors=0, error=None): global _current_price_sync db = await sqlite_service.get_sqlite() try: await db.execute(""" UPDATE price_sync_runs SET finished_at = ?, status = ?, products_total = ?, matched = ?, updated = ?, errors = ?, log_text = ? WHERE run_id = ? """, (_now().strftime("%d.%m.%Y %H:%M:%S"), status, products_total, matched, updated, errors, "\n".join(log_lines), run_id)) await db.commit() finally: await db.close() _current_price_sync = None