Kit discount: v_disc_amt is per-kit, not per-unit — remove division by v_cantitate_web so discount lines compute correctly (e.g. -2 x 5 = -10). Price sync: stop auto-inserting missing articles into price policies (was inserting with wrong proc_tvav from GoMag). Log warning instead. Kit detection: extend to single-component repackagings (cantitate_roa > 1) in both PL/SQL package and price sync/validation services. Add repackaging kit pricing test for separate_line and distributed modes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
259 lines
11 KiB
Python
259 lines
11 KiB
Python
"""Catalog price sync service — syncs product prices from GoMag catalog to ROA Oracle."""
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from . import gomag_client, validation_service, sqlite_service
|
|
from .. import database
|
|
from ..config import settings
|
|
|
|
# Module-level logger; sync progress messages are also mirrored here.
logger = logging.getLogger(__name__)

# Time zone used by _now() for run ids and stored timestamps.
_tz = ZoneInfo("Europe/Bucharest")

# Held by run_catalog_price_sync() for the whole duration of a sync;
# prepare_price_sync() refuses to register a new run while it is locked.
_price_sync_lock = asyncio.Lock()

# In-memory snapshot of the active run (dict with run_id, status, started_at,
# finished_at, phase_text) or None when no run is registered.
_current_price_sync = None
|
|
|
|
|
|
def _now():
    """Return the current time in the module time zone as a naive datetime."""
    aware = datetime.now(tz=_tz)
    # Drop the tzinfo so callers get local wall-clock values without an offset.
    return aware.replace(tzinfo=None)
|
|
|
|
|
|
async def prepare_price_sync() -> dict:
    """Register a new price sync run and return its identifier.

    Returns:
        ``{"error": "Price sync already running"}`` when the sync lock is
        currently held; otherwise ``{"run_id": <id>}`` after the in-memory
        status snapshot has been initialised and a ``'running'`` row has been
        inserted into the ``price_sync_runs`` SQLite table.

    Raises:
        Whatever the SQLite layer raises. The in-memory snapshot is cleared
        before re-raising so the status does not keep reporting a run that
        was never recorded.
    """
    global _current_price_sync
    if _price_sync_lock.locked():
        return {"error": "Price sync already running"}

    # One timestamp for the run id and both start markers keeps them consistent.
    started = _now()
    run_id = started.strftime("%Y%m%d_%H%M%S") + "_ps_" + uuid.uuid4().hex[:6]
    state = {
        "run_id": run_id, "status": "running",
        "started_at": started.isoformat(), "finished_at": None,
        "phase_text": "Starting...",
    }
    _current_price_sync = state

    try:
        # Create SQLite record
        db = await sqlite_service.get_sqlite()
        try:
            await db.execute(
                "INSERT INTO price_sync_runs (run_id, started_at, status) VALUES (?, ?, 'running')",
                (run_id, started.strftime("%d.%m.%Y %H:%M:%S")),
            )
            await db.commit()
        finally:
            await db.close()
    except BaseException:
        # Includes task cancellation. Only clear the snapshot if it is still
        # ours; the exception is always re-raised.
        if _current_price_sync is state:
            _current_price_sync = None
        raise

    return {"run_id": run_id}
|
|
|
|
|
|
async def get_price_sync_status() -> dict:
    """Report the running sync, or the most recent run stored in SQLite.

    Returns:
        The in-memory snapshot while a run is marked ``"running"``.
        Otherwise ``{"status": "idle", "last_run": {...}}`` built from the
        newest ``price_sync_runs`` row, or ``{"status": "idle"}`` when the
        table is empty or the lookup fails.
    """
    current = _current_price_sync
    if current and current.get("status") == "running":
        return current

    # Return last run from SQLite
    db = await sqlite_service.get_sqlite()
    try:
        # started_at is stored as "dd.mm.YYYY HH:MM:SS" text, which does not
        # sort chronologically. run_id begins with "YYYYMMDD_HHMMSS", so it
        # orders runs by start time.
        # NOTE(review): assumes every row's run_id uses that prefix — confirm
        # no other writer inserts ids in a different format.
        cursor = await db.execute(
            "SELECT * FROM price_sync_runs ORDER BY run_id DESC LIMIT 1"
        )
        row = await cursor.fetchone()
        if row:
            return {"status": "idle", "last_run": dict(row)}
        return {"status": "idle"}
    except Exception:
        # Best effort: status must not fail, but keep a trace of the problem.
        logger.warning("Could not read last price sync run from SQLite", exc_info=True)
        return {"status": "idle"}
    finally:
        await db.close()
|
|
|
|
|
|
def _gross_price(item):
    """Return the VAT-inclusive price of a GoMag product dict, or None if it has no positive price."""
    raw_price = item.get("price", "0")
    price = float(raw_price) if raw_price else 0
    if price <= 0:
        return None
    # vat_included can be int 1 or str "1"
    if str(item.get("vat_included", "1")) == "1":
        return price
    # The VAT rate is only parsed when it is actually needed, so a missing or
    # malformed rate cannot fail a product whose price already includes VAT.
    vat = float(item.get("vat", "19"))
    return price * (1 + vat / 100)


def _pick_policy(cont, id_pol, id_pol_productie):
    """Return the production policy for accounts 341/345 when configured, else the default policy."""
    cont_str = str(cont or "").strip()
    if cont_str in ("341", "345") and id_pol_productie:
        return id_pol_productie
    return id_pol


def _load_direct_articles(conn, skus, mapped_data, batch_size=500):
    """Look up SKUs directly in NOM_ARTICOLE (by codmat), skipping SKUs present in mapped_data."""
    direct = {}
    sku_list = list(skus)
    with conn.cursor() as cur:
        for start in range(0, len(sku_list), batch_size):
            batch = sku_list[start:start + batch_size]
            # Only generated bind names are interpolated; values go in as binds.
            placeholders = ",".join(f":s{j}" for j in range(len(batch)))
            params = {f"s{j}": sku for j, sku in enumerate(batch)}
            cur.execute(f"""
                SELECT codmat, id_articol, cont FROM nom_articole
                WHERE codmat IN ({placeholders}) AND sters = 0 AND inactiv = 0
            """, params)
            for codmat, id_articol, cont in cur:
                if codmat not in mapped_data:
                    direct[codmat] = {"id_articol": id_articol, "cont": cont}
    return direct


async def run_catalog_price_sync(run_id: str):
    """Sync GoMag catalog prices into the configured ROA price policies.

    Runs under the module sync lock. Steps:

    1. Read ``id_pol`` / ``id_pol_productie`` from the app settings; finish
       the run with status ``"error"`` if no default policy is configured.
    2. Download the GoMag products; finish as ``"completed"`` if none.
    3. Resolve SKU mappings and direct NOM_ARTICOLE matches in Oracle.
    4. For each product with a positive price, call
       ``validation_service.compare_and_update_price``: per component for
       kits (several components, or one component with ``cantitate_roa`` > 1),
       otherwise for the single mapped or direct article.
    5. Record counters and the collected log via ``_finish_run``.

    Per-product failures are counted and logged without aborting the run;
    any other exception finishes the run with status ``"error"``.

    Args:
        run_id: identifier of the run row to update when finished.
    """
    async with _price_sync_lock:
        log_lines = []

        def _log(msg):
            logger.info(msg)
            log_lines.append(f"[{_now().strftime('%H:%M:%S')}] {msg}")
            if _current_price_sync:
                _current_price_sync["phase_text"] = msg

        try:
            app_settings = await sqlite_service.get_app_settings()
            id_pol = int(app_settings.get("id_pol") or 0) or None
            id_pol_productie = int(app_settings.get("id_pol_productie") or 0) or None

            if not id_pol:
                _log("Politica de preț nu e configurată — skip sync")
                await _finish_run(run_id, "error", log_lines, error="No price policy")
                return

            # Fetch products from GoMag
            _log("Descărcare produse din GoMag API...")
            products = await gomag_client.download_products(
                api_key=app_settings.get("gomag_api_key"),
                api_shop=app_settings.get("gomag_api_shop"),
                products_url=app_settings.get("gomag_products_url") or None,
                log_fn=_log,
            )

            if not products:
                _log("Niciun produs descărcat")
                await _finish_run(run_id, "completed", log_lines, products_total=0)
                return

            # Index products by SKU for kit component lookup
            products_by_sku = {p["sku"]: p for p in products}
            skus = set(products_by_sku)

            # Connect to Oracle
            conn = await asyncio.to_thread(database.get_oracle_connection)
            try:
                # Get all mappings from ARTICOLE_TERTI
                _log("Citire mapări ARTICOLE_TERTI...")
                mapped_data = await asyncio.to_thread(
                    validation_service.resolve_mapped_codmats, skus, conn
                )

                # Get direct articles from NOM_ARTICOLE. Run in a worker
                # thread like the other Oracle calls so the blocking queries
                # do not stall the event loop.
                _log("Identificare articole directe...")
                direct_id_map = await asyncio.to_thread(
                    _load_direct_articles, conn, skus, mapped_data
                )

                matched = 0
                updated = 0
                errors = 0

                for product in products:
                    sku = product["sku"]
                    try:
                        price_cu_tva = _gross_price(product)
                        if price_cu_tva is None:
                            continue

                        # For kits, sync each component individually from standalone GoMag prices
                        mapped_comps = mapped_data.get(sku, [])
                        is_kit = len(mapped_comps) > 1 or (
                            len(mapped_comps) == 1
                            and (mapped_comps[0].get("cantitate_roa") or 1) > 1
                        )
                        if is_kit:
                            for comp in mapped_comps:
                                comp_codmat = comp["codmat"]
                                comp_product = products_by_sku.get(comp_codmat)
                                if not comp_product:
                                    continue  # Component not in GoMag as standalone product

                                comp_price_cu_tva = _gross_price(comp_product)
                                if comp_price_cu_tva is None:
                                    continue

                                comp_pol = _pick_policy(comp.get("cont"), id_pol, id_pol_productie)

                                matched += 1
                                result = await asyncio.to_thread(
                                    validation_service.compare_and_update_price,
                                    comp["id_articol"], comp_pol, comp_price_cu_tva, conn
                                )
                                if result and result["updated"]:
                                    updated += 1
                                    _log(f"  {comp_codmat}: {result['old_price']:.2f} → {result['new_price']:.2f} (kit {sku})")
                                elif result is None:
                                    _log(f"  {comp_codmat}: LIPSESTE din politica {comp_pol} — adauga manual in ROA (kit {sku})")
                            continue

                        # Determine id_articol, quantity and account. Past the
                        # kit check a single mapping always has cantitate_roa <= 1.
                        if len(mapped_comps) == 1:
                            comp = mapped_comps[0]
                            id_articol = comp["id_articol"]
                            cantitate_roa = comp.get("cantitate_roa") or 1
                            cont = comp.get("cont")
                        elif sku in direct_id_map:
                            id_articol = direct_id_map[sku]["id_articol"]
                            cantitate_roa = 1
                            cont = direct_id_map[sku].get("cont")
                        else:
                            continue  # SKU not in ROA

                        matched += 1
                        price_per_unit = price_cu_tva / cantitate_roa if cantitate_roa != 1 else price_cu_tva
                        pol = _pick_policy(cont, id_pol, id_pol_productie)

                        result = await asyncio.to_thread(
                            validation_service.compare_and_update_price,
                            id_articol, pol, price_per_unit, conn
                        )
                        # NOTE(review): the kit branch logs a warning when the
                        # result is None; this branch does not — confirm intended.
                        if result and result["updated"]:
                            updated += 1
                            _log(f"  {result['codmat']}: {result['old_price']:.2f} → {result['new_price']:.2f}")

                    except Exception as e:
                        # One bad product must not abort the whole sync.
                        errors += 1
                        _log(f"Eroare produs {sku}: {e}")

                _log(f"Sync complet: {len(products)} produse, {matched} potrivite, {updated} actualizate, {errors} erori")

            finally:
                await asyncio.to_thread(database.pool.release, conn)

            await _finish_run(run_id, "completed", log_lines,
                              products_total=len(products), matched=matched,
                              updated=updated, errors=errors)

        except Exception as e:
            _log(f"Eroare critică: {e}")
            logger.error(f"Catalog price sync error: {e}", exc_info=True)
            await _finish_run(run_id, "error", log_lines, error=str(e))
|
|
|
|
|
|
async def _finish_run(run_id, status, log_lines, products_total=0,
                      matched=0, updated=0, errors=0, error=None):
    """Persist the final state of a run and clear the in-memory snapshot.

    Args:
        run_id: identifier of the ``price_sync_runs`` row to update.
        status: final status string stored in the row.
        log_lines: collected log lines, stored newline-joined in ``log_text``.
        products_total: value stored in the ``products_total`` column.
        matched: value stored in the ``matched`` column.
        updated: value stored in the ``updated`` column.
        errors: value stored in the ``errors`` column.
        error: accepted from callers but not written to the database by this
            function (the UPDATE statement has no column for it).

    Raises:
        Whatever the SQLite layer raises. The in-memory snapshot is cleared
        regardless, so a failed write cannot leave the status stuck on
        ``"running"``.
    """
    global _current_price_sync
    try:
        db = await sqlite_service.get_sqlite()
        try:
            await db.execute("""
                UPDATE price_sync_runs SET
                    finished_at = ?, status = ?, products_total = ?,
                    matched = ?, updated = ?, errors = ?,
                    log_text = ?
                WHERE run_id = ?
            """, (_now().strftime("%d.%m.%Y %H:%M:%S"), status, products_total, matched, updated, errors,
                  "\n".join(log_lines), run_id))
            await db.commit()
        finally:
            await db.close()
    finally:
        # Always reset, even if the SQLite update failed.
        _current_price_sync = None
|