Files
gomag-vending/api/app/services/sync_service.py
Claude Agent b221b257a3 fix: price sync kit components + vat_included type bug
- Fix vat_included comparison: GoMag API returns int 1, not str "1",
  causing all prices to be multiplied by TVA again (double TVA)
- Normalize vat_included to string in gomag_client at parse time
- Price sync now processes kit components individually by looking up
  each component's CODMAT as standalone GoMag product
- Add _insert_component_price for components without existing Oracle price
- resolve_mapped_codmats: ROW_NUMBER dedup for CODMATs with multiple
  NOM_ARTICOLE entries, prefer article with current stock
- pack_import_comenzi: merge_or_insert_articol to merge quantities when
  same article appears from kit + individual on same order

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 15:07:53 +00:00

865 lines
45 KiB
Python

import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
_tz_bucharest = ZoneInfo("Europe/Bucharest")


def _now():
    """Give the current Bucharest wall-clock time as a naive datetime.

    The time is taken in the Europe/Bucharest zone and the tzinfo is then
    dropped, so the result is suitable for display and storage but carries
    no timezone information.
    """
    bucharest_aware = datetime.now(tz=_tz_bucharest)
    return bucharest_aware.replace(tzinfo=None)
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service, gomag_client
from ..config import settings
from .. import database
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Sync state
# _sync_lock is held by run_sync() for the whole duration of a run;
# prepare_sync() and run_sync() test .locked() to refuse overlapping runs.
_sync_lock = asyncio.Lock()
# Status dict of the current (or just finished) run, read by get_sync_status():
# run_id, status, started_at/finished_at, phase, phase_text, progress counters
# and counts. None when no run is tracked.
_current_sync: dict | None = None
# In-memory text log buffer per run: run_id -> list of formatted log lines.
_run_logs: dict[str, list[str]] = {}
def _log_line(run_id: str, message: str):
    """Add one "[HH:MM:SS] message" entry to the run's in-memory log.

    The buffer for run_id is created on first use; the timestamp comes
    from _now().
    """
    buffer = _run_logs.setdefault(run_id, [])
    stamp = _now().strftime("%H:%M:%S")
    buffer.append(f"[{stamp}] {message}")
def get_run_text_log(run_id: str) -> str | None:
"""Return the accumulated text log for a run, or None if not found."""
lines = _run_logs.get(run_id)
if lines is None:
return None
return "\n".join(lines)
def _update_progress(phase: str, phase_text: str, current: int = 0, total: int = 0,
counts: dict = None):
"""Update _current_sync with progress details for polling."""
global _current_sync
if _current_sync is None:
return
_current_sync["phase"] = phase
_current_sync["phase_text"] = phase_text
_current_sync["progress_current"] = current
_current_sync["progress_total"] = total
_current_sync["counts"] = counts or {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0}
async def get_sync_status():
    """Report the tracked sync state.

    Returns a shallow copy of the status dict when one is present,
    otherwise {"status": "idle"}.
    """
    if not _current_sync:
        return {"status": "idle"}
    return dict(_current_sync)
async def prepare_sync(id_pol: int = None, id_sectie: int = None) -> dict:
    """Reserve a run_id and publish the initial status for a new sync run.

    id_pol and id_sectie are accepted but not used by this function.

    Returns:
        {"run_id": ..., "status": "starting"} on success, or
        {"error": "Sync already running", "run_id": ...} when the sync lock
        is currently held (run_id is that of the tracked run, or None).
    """
    global _current_sync

    if _sync_lock.locked():
        active_run_id = _current_sync.get("run_id") if _current_sync else None
        return {"error": "Sync already running", "run_id": active_run_id}

    # Timestamp plus a short random suffix keeps ids unique within a second.
    stamp = _now().strftime("%Y%m%d_%H%M%S")
    suffix = uuid.uuid4().hex[:6]
    run_id = f"{stamp}_{suffix}"

    zeroed_counts = {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0, "cancelled": 0}
    _current_sync = dict(
        run_id=run_id,
        status="running",
        started_at=_now().isoformat(),
        finished_at=None,
        phase="starting",
        phase_text="Starting...",
        progress_current=0,
        progress_total=0,
        counts=zeroed_counts,
    )
    return {"run_id": run_id, "status": "starting"}
def _derive_customer_info(order):
    """Work out the display names and methods recorded for an order.

    The customer is the party that appears on the invoice (partner in ROA):
    the billing company name when billing is on a company with a name set,
    otherwise the shipping person's name, falling back to the billing person.

    Returns:
        (shipping_name, billing_name, customer, payment_method,
        delivery_method); the two methods are None when absent or empty.
    """
    def _person_name(contact):
        # Missing or None name parts count as empty strings.
        first = getattr(contact, 'firstname', '') or ''
        last = getattr(contact, 'lastname', '') or ''
        return f"{first} {last}".strip()

    shipping_name = _person_name(order.shipping) if order.shipping else ""
    billing_name = _person_name(order.billing)
    shipping_name = shipping_name or billing_name

    billing = order.billing
    bills_company = billing.is_company and billing.company_name
    customer = billing.company_name if bills_company else (shipping_name or billing_name)

    payment_method = getattr(order, 'payment_name', None) or None
    delivery_method = getattr(order, 'delivery_name', None) or None
    return shipping_name, billing_name, customer, payment_method, delivery_method
async def _fix_stale_error_orders(existing_map: dict, run_id: str):
    """Repair SQLite orders marked ERROR that are present in existing_map.

    Such rows can appear when an earlier import committed partially (no
    rollback on error). Every ERROR order whose number is a key of
    existing_map is switched to ALREADY_IMPORTED, gets the mapped id_comanda
    and has its error message cleared; one line per repaired order is added
    to the run log. Changes are committed only when at least one row was
    repaired, and the SQLite connection is always closed.

    Args:
        existing_map: order_number -> id_comanda of orders found in Oracle.
        run_id: Run whose text log receives the repair messages.
    """
    from ..database import get_sqlite

    # Same statement text as before, spelled out line by line.
    repair_sql = (
        "\n"
        "UPDATE orders SET\n"
        "status = 'ALREADY_IMPORTED',\n"
        "id_comanda = ?,\n"
        "error_message = NULL,\n"
        "updated_at = datetime('now')\n"
        "WHERE order_number = ? AND status = 'ERROR'\n"
    )

    conn = await get_sqlite()
    try:
        result = await conn.execute(
            "SELECT order_number FROM orders WHERE status = 'ERROR'"
        )
        rows = await result.fetchall()
        repairable = [
            number
            for number in (row["order_number"] for row in rows)
            if number in existing_map
        ]

        repaired = 0
        for number in repairable:
            roa_id = existing_map[number]
            await conn.execute(repair_sql, (roa_id, number))
            repaired += 1
            _log_line(run_id, f"#{number} → status corectat ERROR → ALREADY_IMPORTED (ID: {roa_id})")

        if repaired:
            await conn.commit()
            logger.info(f"Fixed {repaired} stale ERROR orders that exist in Oracle")
    finally:
        await conn.close()
def _mark_sync_finished(status: str, error: str | None = None) -> None:
    """Record the final status and finish time on the shared status dict.

    Does nothing when no status dict is present. When error is given it is
    stored under "error".
    """
    if not _current_sync:
        return
    _current_sync["status"] = status
    _current_sync["finished_at"] = _now().isoformat()
    if error is not None:
        _current_sync["error"] = error


def _order_items_payload(order, validation: dict | None = None, *,
                         flag_missing: bool = False) -> list[dict]:
    """Build the per-item dicts stored alongside an order in SQLite.

    mapping_status is "unknown" when no validation result is given; otherwise
    it is "mapped" or "direct", and additionally "missing" when flag_missing
    is set and the SKU is listed in validation["missing"].
    """
    def _status(sku):
        if validation is None:
            return "unknown"
        if flag_missing and sku in validation["missing"]:
            return "missing"
        return "mapped" if sku in validation["mapped"] else "direct"

    return [
        {"sku": item.sku, "product_name": item.name,
         "quantity": item.quantity, "price": item.price, "vat": item.vat,
         "mapping_status": _status(item.sku), "codmat": None,
         "id_articol": None, "cantitate_roa": None}
        for item in order.items
    ]


def _order_batch_row(run_id: str, order, status: str, items: list[dict], *,
                     id_comanda=None, error_message: str | None = None,
                     missing_skus=None, default_web_status: str | None = None) -> dict:
    """Build one row for sqlite_service.save_orders_batch.

    status is stored both as "status" and "status_at_run"; web_status falls
    back to default_web_status when the order carries no status text.
    """
    shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
    return {
        "sync_run_id": run_id, "order_number": order.number,
        "order_date": order.date, "customer_name": customer,
        "status": status, "status_at_run": status,
        "id_comanda": id_comanda, "id_partener": None,
        "error_message": error_message, "missing_skus": missing_skus,
        "items_count": len(order.items),
        "shipping_name": shipping_name, "billing_name": billing_name,
        "payment_method": payment_method, "delivery_method": delivery_method,
        "order_total": order.total or None,
        "delivery_cost": order.delivery_cost or None,
        "discount_total": order.discount_total or None,
        "web_status": order.status or default_web_status,
        "items": items,
    }


async def _process_cancelled_orders(run_id: str, cancelled_orders: list) -> None:
    """Record cancelled orders and soft-delete the uninvoiced ones from Oracle.

    All cancelled orders are saved to SQLite with status CANCELLED. Those that
    still carry an id_comanda afterwards are checked for invoices: invoiced
    orders are only logged, the others are soft-deleted through
    import_service and, on success, marked cancelled in SQLite.
    """
    _log_line(run_id, f"Comenzi anulate in GoMag: {len(cancelled_orders)}")

    # Record cancelled orders in SQLite
    cancelled_batch = []
    for order in cancelled_orders:
        row = _order_batch_row(
            run_id, order, "CANCELLED", _order_items_payload(order),
            error_message="Comanda anulata in GoMag",
            default_web_status="Anulata",
        )
        cancelled_batch.append(row)
        _log_line(run_id, f"#{order.number} [{order.date or '?'}] {row['customer_name']} → ANULAT in GoMag")
    await sqlite_service.save_orders_batch(cancelled_batch)

    # Check if any cancelled orders were previously imported
    from ..database import get_sqlite as _get_sqlite
    db_check = await _get_sqlite()
    try:
        cancelled_numbers = [o.number for o in cancelled_orders]
        placeholders = ",".join("?" for _ in cancelled_numbers)
        # Only the "?" placeholders are interpolated; values are bound.
        cursor = await db_check.execute(
            "\n"
            "SELECT order_number, id_comanda FROM orders\n"
            f"WHERE order_number IN ({placeholders})\n"
            "AND id_comanda IS NOT NULL\n"
            "AND status = 'CANCELLED'\n",
            cancelled_numbers,
        )
        previously_imported = [dict(r) for r in await cursor.fetchall()]
    finally:
        await db_check.close()

    if not previously_imported:
        return

    _log_line(run_id, f"Verificare {len(previously_imported)} comenzi anulate care erau importate in Oracle...")
    # Check which have invoices
    id_comanda_list = [o["id_comanda"] for o in previously_imported]
    invoice_data = await asyncio.to_thread(
        invoice_service.check_invoices_for_orders, id_comanda_list
    )
    for o in previously_imported:
        idc = o["id_comanda"]
        order_num = o["order_number"]
        if idc in invoice_data:
            # Invoiced — keep in Oracle, just log a warning; the CANCELLED
            # status was already stored by the batch save above.
            _log_line(run_id,
                      f"#{order_num} → ANULAT dar FACTURAT (factura {invoice_data[idc].get('serie_act', '')}"
                      f"{invoice_data[idc].get('numar_act', '')}) — NU se sterge din Oracle")
            continue
        # Not invoiced — soft-delete in Oracle
        del_result = await asyncio.to_thread(
            import_service.soft_delete_order_in_roa, idc
        )
        if del_result["success"]:
            # Clear id_comanda via mark_order_cancelled
            await sqlite_service.mark_order_cancelled(order_num, "Anulata")
            _log_line(run_id,
                      f"#{order_num} → ANULAT + STERS din Oracle (ID: {idc}, "
                      f"{del_result['details_deleted']} detalii)")
        else:
            _log_line(run_id,
                      f"#{order_num} → ANULAT dar EROARE la stergere Oracle: {del_result['error']}")


async def _check_invoices_and_order_status(run_id: str) -> tuple[int, int, int]:
    """Reconcile cached invoice and order state in SQLite with Oracle.

    Best effort: any exception is logged as a warning and the counts reached
    so far are returned.

    Returns:
        (invoices_updated, invoices_cleared, orders_deleted).
    """
    invoices_updated = 0
    invoices_cleared = 0
    orders_deleted = 0
    try:
        # 4b-1: Uninvoiced → check for new invoices
        uninvoiced = await sqlite_service.get_uninvoiced_imported_orders()
        if uninvoiced:
            id_comanda_list = [o["id_comanda"] for o in uninvoiced]
            invoice_data = await asyncio.to_thread(
                invoice_service.check_invoices_for_orders, id_comanda_list
            )
            id_to_order = {o["id_comanda"]: o["order_number"] for o in uninvoiced}
            for idc, inv in invoice_data.items():
                order_num = id_to_order.get(idc)
                if order_num and inv.get("facturat"):
                    await sqlite_service.update_order_invoice(
                        order_num,
                        serie=inv.get("serie_act"),
                        numar=str(inv.get("numar_act", "")),
                        total_fara_tva=inv.get("total_fara_tva"),
                        total_tva=inv.get("total_tva"),
                        total_cu_tva=inv.get("total_cu_tva"),
                        data_act=inv.get("data_act"),
                    )
                    invoices_updated += 1

        # 4b-2: Invoiced → check for deleted invoices
        invoiced = await sqlite_service.get_invoiced_imported_orders()
        if invoiced:
            id_comanda_list = [o["id_comanda"] for o in invoiced]
            invoice_data = await asyncio.to_thread(
                invoice_service.check_invoices_for_orders, id_comanda_list
            )
            for o in invoiced:
                if o["id_comanda"] not in invoice_data:
                    await sqlite_service.clear_order_invoice(o["order_number"])
                    invoices_cleared += 1

        # 4b-3: All imported → check for deleted orders in ROA
        all_imported = await sqlite_service.get_all_imported_orders()
        if all_imported:
            id_comanda_list = [o["id_comanda"] for o in all_imported]
            existing_ids = await asyncio.to_thread(
                invoice_service.check_orders_exist, id_comanda_list
            )
            for o in all_imported:
                if o["id_comanda"] not in existing_ids:
                    await sqlite_service.mark_order_deleted_in_roa(o["order_number"])
                    orders_deleted += 1

        if invoices_updated:
            _log_line(run_id, f"Facturi noi: {invoices_updated} comenzi facturate")
        if invoices_cleared:
            _log_line(run_id, f"Facturi sterse: {invoices_cleared} facturi eliminate din cache")
        if orders_deleted:
            _log_line(run_id, f"Comenzi sterse din ROA: {orders_deleted}")
    except Exception as e:
        logger.warning(f"Invoice/order status check failed: {e}")
    return invoices_updated, invoices_cleared, orders_deleted


async def run_sync(id_pol: int | None = None, id_sectie: int | None = None,
                   run_id: str | None = None) -> dict:
    """Run a full sync cycle and return a summary dict.

    Phases: download orders from GoMag, read the JSON files, record cancelled
    orders, validate SKUs and prices against Oracle, record already-imported
    and skipped orders, optionally sync prices, import the new orders, then
    reconcile invoice/order status.

    Args:
        id_pol: Price policy id; falls back to app settings, then config.
        id_sectie: Section id; falls back to app settings, then config.
        run_id: Id reserved by prepare_sync(); generated when omitted.

    Returns:
        {"error": "Sync already running"} when the sync lock is held,
        a summary with "status" "completed"/"failed" and the run counters
        otherwise. Unexpected exceptions are caught and reported as
        {"run_id", "status": "failed", "error"}.
    """
    global _current_sync
    if _sync_lock.locked():
        return {"error": "Sync already running"}

    async with _sync_lock:
        # Use provided run_id or generate one
        if not run_id:
            run_id = _now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
        _current_sync = {
            "run_id": run_id,
            "status": "running",
            "started_at": _now().isoformat(),
            "finished_at": None,
            "phase": "reading",
            "phase_text": "Reading JSON files...",
            "progress_current": 0,
            "progress_total": 0,
            "counts": {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0, "cancelled": 0},
        }
        _update_progress("reading", "Reading JSON files...")
        started_dt = _now()
        _run_logs[run_id] = [
            f"=== Sync Run {run_id} ===",
            f"Inceput: {started_dt.strftime('%d.%m.%Y %H:%M:%S')}",
            ""
        ]
        json_dir = settings.JSON_OUTPUT_DIR

        try:
            # Phase 0: Download orders from GoMag API
            _update_progress("downloading", "Descărcare comenzi din GoMag API...")
            _log_line(run_id, "Descărcare comenzi din GoMag API...")
            # Read GoMag settings from SQLite (override config defaults)
            dl_settings = await sqlite_service.get_app_settings()
            gomag_key = dl_settings.get("gomag_api_key") or None
            gomag_shop = dl_settings.get("gomag_api_shop") or None
            gomag_days_str = dl_settings.get("gomag_order_days_back")
            gomag_days = int(gomag_days_str) if gomag_days_str else None
            gomag_limit_str = dl_settings.get("gomag_limit")
            gomag_limit = int(gomag_limit_str) if gomag_limit_str else None
            dl_result = await gomag_client.download_orders(
                json_dir, log_fn=lambda msg: _log_line(run_id, msg),
                api_key=gomag_key, api_shop=gomag_shop,
                days_back=gomag_days, limit=gomag_limit,
            )
            if dl_result["files"]:
                _log_line(run_id, f"GoMag: {dl_result['total']} comenzi în {dl_result['pages']} pagini → {len(dl_result['files'])} fișiere")

            _update_progress("reading", "Citire fisiere JSON...")
            _log_line(run_id, "Citire fisiere JSON...")

            # Step 1: Read orders and sort chronologically (oldest first - R3)
            orders, json_count = order_reader.read_json_orders()
            orders.sort(key=lambda o: o.date or '')
            await sqlite_service.create_sync_run(run_id, json_count)
            _update_progress("reading", f"Found {len(orders)} orders in {json_count} files", 0, len(orders))
            _log_line(run_id, f"Gasite {len(orders)} comenzi in {json_count} fisiere")

            # Populate web_products catalog from all orders (R4)
            web_product_items = [
                (item.sku, item.name)
                for order in orders
                for item in order.items
                if item.sku and item.name
            ]
            await sqlite_service.upsert_web_products_batch(web_product_items)

            if not orders:
                _log_line(run_id, "Nicio comanda gasita.")
                await sqlite_service.update_sync_run(run_id, "completed", 0, 0, 0, 0)
                _update_progress("completed", "No orders found")
                # Publish the final status on this early exit as well.
                _mark_sync_finished("completed")
                return {"run_id": run_id, "status": "completed", "message": "No orders found", "json_files": json_count}

            # ── Separate cancelled orders (GoMag status "Anulata" / statusId "7") ──
            # Single pass instead of a list-membership test per order.
            cancelled_orders = []
            active_orders = []
            for o in orders:
                is_cancelled = o.status_id == "7" or (o.status and o.status.lower() == "anulata")
                (cancelled_orders if is_cancelled else active_orders).append(o)
            cancelled_count = len(cancelled_orders)
            if cancelled_orders:
                await _process_cancelled_orders(run_id, cancelled_orders)

            orders = active_orders
            if not orders:
                _log_line(run_id, "Nicio comanda activa dupa filtrare anulate.")
                await sqlite_service.update_sync_run(run_id, "completed", cancelled_count, 0, 0, 0)
                _update_progress("completed", f"No active orders ({cancelled_count} cancelled)")
                _mark_sync_finished("completed")
                return {"run_id": run_id, "status": "completed",
                        "message": f"No active orders ({cancelled_count} cancelled)",
                        "json_files": json_count, "cancelled": cancelled_count}

            _update_progress("validation", f"Validating {len(orders)} orders...", 0, len(orders))

            # Bound up front so the price-sync step further down can always
            # read them, even when price validation (Step 2d) is skipped.
            codmat_policy_map = {}
            mapped_codmat_data = {}

            # ── Single Oracle connection for entire validation phase ──
            conn = await asyncio.to_thread(database.get_oracle_connection)
            try:
                # Step 2a: Find orders already in Oracle (date-range query)
                order_dates = [o.date for o in orders if o.date]
                if order_dates:
                    min_date_str = min(order_dates)
                    try:
                        min_date = datetime.strptime(min_date_str[:10], "%Y-%m-%d") - timedelta(days=1)
                    except (ValueError, TypeError):
                        min_date = _now() - timedelta(days=90)
                else:
                    min_date = _now() - timedelta(days=90)
                existing_map = await asyncio.to_thread(
                    validation_service.check_orders_in_roa, min_date, conn
                )

                # Step 2a-fix: Fix ERROR orders that are actually in Oracle
                # (can happen if previous import committed partially without rollback)
                await _fix_stale_error_orders(existing_map, run_id)

                # Load app settings early (needed for id_gestiune in SKU validation)
                app_settings = await sqlite_service.get_app_settings()
                id_pol = id_pol or int(app_settings.get("id_pol") or 0) or settings.ID_POL
                id_sectie = id_sectie or int(app_settings.get("id_sectie") or 0) or settings.ID_SECTIE
                # Parse multi-gestiune CSV: "1,3" → [1, 3], "" → None
                id_gestiune_raw = (app_settings.get("id_gestiune") or "").strip()
                if id_gestiune_raw and id_gestiune_raw != "0":
                    id_gestiuni = [int(g) for g in id_gestiune_raw.split(",") if g.strip()]
                else:
                    id_gestiuni = None  # None = any gestiune
                logger.info(f"Sync params: ID_POL={id_pol}, ID_SECTIE={id_sectie}, ID_GESTIUNI={id_gestiuni}")
                _log_line(run_id, f"Parametri import: ID_POL={id_pol}, ID_SECTIE={id_sectie}, ID_GESTIUNI={id_gestiuni}")

                # Step 2b: Validate SKUs (reuse same connection)
                all_skus = order_reader.get_all_skus(orders)
                validation = await asyncio.to_thread(validation_service.validate_skus, all_skus, conn, id_gestiuni)
                importable, skipped = validation_service.classify_orders(orders, validation)

                # ── Split importable into truly_importable vs already_in_roa ──
                truly_importable = []
                already_in_roa = []
                for order in importable:
                    if order.number in existing_map:
                        already_in_roa.append(order)
                    else:
                        truly_importable.append(order)
                _update_progress("validation",
                                 f"{len(truly_importable)} new, {len(already_in_roa)} already imported, {len(skipped)} skipped",
                                 0, len(truly_importable))
                _log_line(run_id, f"Validare: {len(truly_importable)} noi, {len(already_in_roa)} deja importate, {len(skipped)} nemapate")

                # Step 2c: Build SKU context from skipped orders
                sku_context = {}
                for order, missing_skus_list in skipped:
                    # Same customer derivation as everywhere else in the run.
                    customer = _derive_customer_info(order)[2]
                    for sku in missing_skus_list:
                        ctx = sku_context.setdefault(sku, {"orders": [], "customers": []})
                        if order.number not in ctx["orders"]:
                            ctx["orders"].append(order.number)
                        if customer not in ctx["customers"]:
                            ctx["customers"].append(customer)

                # Track missing SKUs with context.
                # One pass over the orders collects a product name per missing
                # SKU: within an order only the first item with that SKU
                # counts, and later orders are consulted only while the name
                # found so far is empty.
                missing_lookup = set(validation["missing"])
                sku_names = {}
                if missing_lookup:
                    for order in orders:
                        seen_in_order = set()
                        for item in order.items:
                            sku = item.sku
                            if sku not in missing_lookup or sku in seen_in_order:
                                continue
                            seen_in_order.add(sku)
                            if not sku_names.get(sku):
                                sku_names[sku] = item.name
                for sku in validation["missing"]:
                    ctx = sku_context.get(sku, {})
                    await sqlite_service.track_missing_sku(
                        sku, sku_names.get(sku, ""),
                        order_count=len(ctx.get("orders", [])),
                        order_numbers=json.dumps(ctx.get("orders", [])) if ctx.get("orders") else None,
                        customers=json.dumps(ctx.get("customers", [])) if ctx.get("customers") else None,
                    )

                # Step 2d: Pre-validate prices for importable articles
                if id_pol and (truly_importable or already_in_roa):
                    _update_progress("validation", "Validating prices...", 0, len(truly_importable))
                    _log_line(run_id, "Validare preturi...")
                    price_candidates = truly_importable + already_in_roa
                    # Direct (unmapped) SKUs are used as CODMATs themselves.
                    all_codmats = {
                        item.sku
                        for order in price_candidates
                        for item in order.items
                        if item.sku not in validation["mapped"] and item.sku in validation["direct"]
                    }
                    # Get standard VAT rate from settings for PROC_TVAV metadata
                    cota_tva = float(app_settings.get("discount_vat") or 21)
                    # Dual pricing policy support
                    id_pol_productie = int(app_settings.get("id_pol_productie") or 0) or None

                    if all_codmats:
                        if id_pol_productie:
                            # Dual-policy: classify articles by cont (sales vs production)
                            codmat_policy_map = await asyncio.to_thread(
                                validation_service.validate_and_ensure_prices_dual,
                                all_codmats, id_pol, id_pol_productie,
                                conn, validation.get("direct_id_map"),
                                cota_tva=cota_tva
                            )
                            _log_line(run_id,
                                      f"Politici duale: {sum(1 for v in codmat_policy_map.values() if v == id_pol)} vanzare, "
                                      f"{sum(1 for v in codmat_policy_map.values() if v == id_pol_productie)} productie")
                        else:
                            # Single-policy (backward compatible)
                            price_result = await asyncio.to_thread(
                                validation_service.validate_prices, all_codmats, id_pol,
                                conn, validation.get("direct_id_map")
                            )
                            if price_result["missing_price"]:
                                logger.info(
                                    f"Auto-adding price 0 for {len(price_result['missing_price'])} "
                                    f"direct articles in policy {id_pol}"
                                )
                                await asyncio.to_thread(
                                    validation_service.ensure_prices,
                                    price_result["missing_price"], id_pol,
                                    conn, validation.get("direct_id_map"),
                                    cota_tva=cota_tva
                                )

                    # Also validate mapped SKU prices (cherry-pick 1)
                    mapped_skus_in_orders = {
                        item.sku
                        for order in price_candidates
                        for item in order.items
                        if item.sku in validation["mapped"]
                    }
                    if mapped_skus_in_orders:
                        mapped_codmat_data = await asyncio.to_thread(
                            validation_service.resolve_mapped_codmats, mapped_skus_in_orders, conn,
                            id_gestiuni=id_gestiuni
                        )
                        # Build id_map for mapped codmats and validate/ensure their prices
                        mapped_id_map = {}
                        for sku, entries in mapped_codmat_data.items():
                            for entry in entries:
                                mapped_id_map[entry["codmat"]] = {
                                    "id_articol": entry["id_articol"],
                                    "cont": entry.get("cont")
                                }
                        mapped_codmats = set(mapped_id_map.keys())
                        if mapped_codmats:
                            if id_pol_productie:
                                mapped_policy_map = await asyncio.to_thread(
                                    validation_service.validate_and_ensure_prices_dual,
                                    mapped_codmats, id_pol, id_pol_productie,
                                    conn, mapped_id_map, cota_tva=cota_tva
                                )
                                codmat_policy_map.update(mapped_policy_map)
                            else:
                                mp_result = await asyncio.to_thread(
                                    validation_service.validate_prices,
                                    mapped_codmats, id_pol, conn, mapped_id_map
                                )
                                if mp_result["missing_price"]:
                                    await asyncio.to_thread(
                                        validation_service.ensure_prices,
                                        mp_result["missing_price"], id_pol,
                                        conn, mapped_id_map, cota_tva=cota_tva
                                    )

                    # Add SKU → policy entries for mapped articles (1:1 and kits)
                    # codmat_policy_map has CODMAT keys, but build_articles_json
                    # looks up by GoMag SKU — bridge the gap here
                    if codmat_policy_map and mapped_codmat_data:
                        for sku, entries in mapped_codmat_data.items():
                            if len(entries) == 1:
                                # 1:1 mapping: SKU inherits the CODMAT's policy
                                codmat = entries[0]["codmat"]
                                if codmat in codmat_policy_map:
                                    codmat_policy_map[sku] = codmat_policy_map[codmat]

                    # Pass codmat_policy_map to import via app_settings
                    if codmat_policy_map:
                        app_settings["_codmat_policy_map"] = codmat_policy_map

                    # ── Kit component price validation ──
                    kit_pricing_mode = app_settings.get("kit_pricing_mode")
                    if kit_pricing_mode and mapped_codmat_data:
                        kit_missing = await asyncio.to_thread(
                            validation_service.validate_kit_component_prices,
                            mapped_codmat_data, id_pol, id_pol_productie, conn
                        )
                        if kit_missing:
                            kit_skus_missing = set(kit_missing.keys())
                            for sku, missing_codmats in kit_missing.items():
                                _log_line(run_id, f"Kit {sku}: prețuri lipsă pentru {', '.join(missing_codmats)}")
                            # Orders containing a kit with unpriced components
                            # move from "importable" to "skipped".
                            new_truly = []
                            for order in truly_importable:
                                order_skus = {item.sku for item in order.items}
                                blocked = order_skus & kit_skus_missing
                                if blocked:
                                    skipped.append((order, list(blocked)))
                                else:
                                    new_truly.append(order)
                            truly_importable = new_truly

                        # Mode B config validation
                        if kit_pricing_mode == "separate_line":
                            if not app_settings.get("kit_discount_codmat"):
                                _log_line(run_id, "EROARE: Kit mode 'separate_line' dar kit_discount_codmat nu e configurat!")
            finally:
                await asyncio.to_thread(database.pool.release, conn)

            # Step 3a: Record already-imported orders (batch)
            already_imported_count = len(already_in_roa)
            already_batch = []
            for order in already_in_roa:
                id_comanda_roa = existing_map.get(order.number)
                row = _order_batch_row(
                    run_id, order, "ALREADY_IMPORTED",
                    _order_items_payload(order, validation),
                    id_comanda=id_comanda_roa,
                )
                already_batch.append(row)
                _log_line(run_id, f"#{order.number} [{order.date or '?'}] {row['customer_name']} → DEJA IMPORTAT (ID: {id_comanda_roa})")
            await sqlite_service.save_orders_batch(already_batch)

            # Step 3b: Record skipped orders + store items (batch)
            skipped_count = len(skipped)
            skipped_batch = []
            for order, missing_skus in skipped:
                row = _order_batch_row(
                    run_id, order, "SKIPPED",
                    _order_items_payload(order, validation, flag_missing=True),
                    missing_skus=missing_skus,
                )
                skipped_batch.append(row)
                _log_line(run_id, f"#{order.number} [{order.date or '?'}] {row['customer_name']} → OMIS (lipsa: {', '.join(missing_skus)})")
            await sqlite_service.save_orders_batch(skipped_batch)

            # ── Price sync from orders ──
            if app_settings.get("price_sync_enabled") == "1":
                try:
                    all_sync_orders = truly_importable + already_in_roa
                    direct_id_map = validation.get("direct_id_map", {})
                    id_pol_prod = int(app_settings.get("id_pol_productie") or 0) or None
                    price_updates = await asyncio.to_thread(
                        validation_service.sync_prices_from_order,
                        all_sync_orders, mapped_codmat_data,
                        direct_id_map, codmat_policy_map, id_pol,
                        id_pol_productie=id_pol_prod,
                        settings=app_settings
                    )
                    if price_updates:
                        _log_line(run_id, f"Sync prețuri: {len(price_updates)} prețuri actualizate")
                        for pu in price_updates:
                            # Old and new price are separated so the line is readable.
                            _log_line(run_id, f" {pu['codmat']}: {pu['old_price']:.2f} → {pu['new_price']:.2f}")
                except Exception as e:
                    # Price sync is best effort; the import continues.
                    _log_line(run_id, f"Eroare sync prețuri din comenzi: {e}")
                    logger.error(f"Price sync error: {e}")

            _update_progress("skipped", f"Skipped {skipped_count}",
                             0, len(truly_importable),
                             {"imported": 0, "skipped": skipped_count, "errors": 0, "already_imported": already_imported_count})

            # Step 4: Import only truly new orders
            imported_count = 0
            error_count = 0
            for i, order in enumerate(truly_importable):
                shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
                _update_progress("import",
                                 f"Import {i+1}/{len(truly_importable)}: #{order.number} {customer}",
                                 i + 1, len(truly_importable),
                                 {"imported": imported_count, "skipped": len(skipped), "errors": error_count,
                                  "already_imported": already_imported_count})
                result = await asyncio.to_thread(
                    import_service.import_single_order,
                    order, id_pol=id_pol, id_sectie=id_sectie,
                    app_settings=app_settings, id_gestiuni=id_gestiuni
                )
                # Build order items data for storage (R9)
                order_items_data = _order_items_payload(order, validation)
                # Compute discount split for SQLite storage
                ds = import_service.compute_discount_split(order, app_settings)
                discount_split_json = json.dumps(ds) if ds else None

                # Fields stored identically for imported and failed orders.
                order_fields = dict(
                    sync_run_id=run_id,
                    order_number=order.number,
                    order_date=order.date,
                    customer_name=customer,
                    items_count=len(order.items),
                    shipping_name=shipping_name,
                    billing_name=billing_name,
                    payment_method=payment_method,
                    delivery_method=delivery_method,
                    order_total=order.total or None,
                    delivery_cost=order.delivery_cost or None,
                    discount_total=order.discount_total or None,
                    web_status=order.status or None,
                    discount_split=discount_split_json,
                )

                if result["success"]:
                    imported_count += 1
                    await sqlite_service.upsert_order(
                        status="IMPORTED",
                        id_comanda=result["id_comanda"],
                        id_partener=result["id_partener"],
                        **order_fields,
                    )
                    await sqlite_service.add_sync_run_order(run_id, order.number, "IMPORTED")
                    # Store ROA address IDs (R9)
                    await sqlite_service.update_import_order_addresses(
                        order.number,
                        id_adresa_facturare=result.get("id_adresa_facturare"),
                        id_adresa_livrare=result.get("id_adresa_livrare")
                    )
                    await sqlite_service.add_order_items(order.number, order_items_data)
                    _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → IMPORTAT (ID: {result['id_comanda']})")
                else:
                    error_count += 1
                    await sqlite_service.upsert_order(
                        status="ERROR",
                        id_partener=result.get("id_partener"),
                        error_message=result["error"],
                        **order_fields,
                    )
                    await sqlite_service.add_sync_run_order(run_id, order.number, "ERROR")
                    await sqlite_service.add_order_items(order.number, order_items_data)
                    _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → EROARE: {result['error']}")

                # Safety: stop if too many errors
                if error_count > 10:
                    logger.warning("Too many errors, stopping sync")
                    break

            # Step 4b: Invoice & order status check — sync with Oracle
            _update_progress("invoices", "Checking invoices & order status...", 0, 0)
            invoices_updated, invoices_cleared, orders_deleted = (
                await _check_invoices_and_order_status(run_id)
            )

            # Step 5: Update sync run
            total_imported = imported_count + already_imported_count  # backward-compat
            status = "completed" if error_count <= 10 else "failed"
            await sqlite_service.update_sync_run(
                run_id, status, len(orders), total_imported, len(skipped), error_count,
                already_imported=already_imported_count, new_imported=imported_count
            )
            summary = {
                "run_id": run_id,
                "status": status,
                "json_files": json_count,
                "total_orders": len(orders) + cancelled_count,
                "new_orders": len(truly_importable),
                "imported": total_imported,
                "new_imported": imported_count,
                "already_imported": already_imported_count,
                "skipped": len(skipped),
                "errors": error_count,
                "cancelled": cancelled_count,
                "missing_skus": len(validation["missing"]),
                "invoices_updated": invoices_updated,
                "invoices_cleared": invoices_cleared,
                "orders_deleted_in_roa": orders_deleted,
            }
            _update_progress("completed",
                             f"Completed: {imported_count} new, {already_imported_count} already, {len(skipped)} skipped, {error_count} errors, {cancelled_count} cancelled",
                             len(truly_importable), len(truly_importable),
                             {"imported": imported_count, "skipped": len(skipped), "errors": error_count,
                              "already_imported": already_imported_count, "cancelled": cancelled_count})
            _mark_sync_finished(status)
            logger.info(
                f"Sync {run_id} completed: {imported_count} new, {already_imported_count} already imported, "
                f"{len(skipped)} skipped, {error_count} errors, {cancelled_count} cancelled"
            )
            duration = (_now() - started_dt).total_seconds()
            _log_line(run_id, "")
            cancelled_text = f", {cancelled_count} anulate" if cancelled_count else ""
            _run_logs[run_id].append(
                f"Finalizat: {imported_count} importate, {already_imported_count} deja importate, "
                f"{len(skipped)} nemapate, {error_count} erori{cancelled_text} din {len(orders) + cancelled_count} comenzi | Durata: {int(duration)}s"
            )
            return summary
        except Exception as e:
            # logger.exception keeps the traceback of the fatal error.
            logger.exception(f"Sync {run_id} failed: {e}")
            _log_line(run_id, f"EROARE FATALA: {e}")
            try:
                await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1, error_message=str(e))
            except Exception:
                # The failure may have happened before the run row existed;
                # still publish the failed status below instead of raising.
                logger.exception(f"Could not record failure of sync {run_id}")
            _mark_sync_finished("failed", str(e))
            return {"run_id": run_id, "status": "failed", "error": str(e)}
        finally:
            # The lock is still held here, so this is this run's status dict.
            finished_state = _current_sync

            # Keep _current_sync for 10 seconds so status endpoint can show final result
            async def _clear_current_sync():
                global _current_sync
                await asyncio.sleep(10)
                # Only clear our own state: a run prepared or started in the
                # meantime has installed a different dict that must survive.
                if _current_sync is finished_state:
                    _current_sync = None
            asyncio.ensure_future(_clear_current_sync())

            async def _clear_run_logs():
                await asyncio.sleep(300)  # 5 minutes
                _run_logs.pop(run_id, None)
            asyncio.ensure_future(_clear_run_logs())
def stop_sync():
    """Placeholder for requesting a stop; a sync currently runs to completion.

    Takes no action and returns None.
    """
    return None