GoMag sync integration:
- New gomag_client.py service: async httpx client that downloads orders from the GoMag API with full pagination and a 1s rate-limit sleep.
- config.py: add GOMAG_API_KEY, GOMAG_API_SHOP, GOMAG_ORDER_DAYS_BACK, GOMAG_LIMIT, and GOMAG_API_URL settings.
- sync_service.py: Phase 0 downloads fresh orders before reading JSONs; gracefully skipped if API keys are not configured.
- start.sh: auto-detect INSTANTCLIENTPATH from .env, falling back to thin mode.
- .env.example: document the GoMag API variables.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
504 lines
24 KiB
Python
504 lines
24 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
|
|
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service, gomag_client
|
|
from ..config import settings
|
|
from .. import database
|
|
|
|
logger = logging.getLogger(__name__)

# Sync state: the module-level lock ensures at most one sync runs at a time;
# _current_sync holds polling/progress info for the active (or just-finished) run.
_sync_lock = asyncio.Lock()
_current_sync = None  # dict with run_id, status, progress info

# In-memory text log buffer per run (run_id -> list of formatted log lines)
_run_logs: dict[str, list[str]] = {}
|
|
|
|
|
|
def _log_line(run_id: str, message: str):
    """Record one timestamped message in the run's in-memory log buffer."""
    stamp = datetime.now().strftime("%H:%M:%S")
    _run_logs.setdefault(run_id, []).append(f"[{stamp}] {message}")
|
|
|
|
|
|
def get_run_text_log(run_id: str) -> str | None:
    """Return the accumulated text log for a run, or None if the run is unknown."""
    entries = _run_logs.get(run_id)
    return None if entries is None else "\n".join(entries)
|
|
|
|
|
|
def _update_progress(phase: str, phase_text: str, current: int = 0, total: int = 0,
                     counts: dict | None = None):
    """Update _current_sync with progress details for polling.

    Args:
        phase: Machine-readable phase key (e.g. "reading", "validation", "import").
        phase_text: Human-readable message shown by the status endpoint.
        current: Number of items processed so far in this phase.
        total: Total items expected for this phase (0 when unknown).
        counts: Optional running tally of imported/skipped/errors/already_imported;
            falls back to all-zero counters when omitted or falsy.
    """
    global _current_sync
    # No active sync (e.g. status already cleared) -> nothing to update.
    if _current_sync is None:
        return
    _current_sync["phase"] = phase
    _current_sync["phase_text"] = phase_text
    _current_sync["progress_current"] = current
    _current_sync["progress_total"] = total
    _current_sync["counts"] = counts or {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0}
|
|
|
|
|
|
async def get_sync_status():
    """Return a snapshot of the currently running sync, or an idle marker."""
    if not _current_sync:
        return {"status": "idle"}
    # Shallow copy so callers cannot mutate the live progress dict.
    return dict(_current_sync)
|
|
|
|
|
|
async def prepare_sync(id_pol: int = None, id_sectie: int = None) -> dict:
    """Prepare a sync run: allocate a run_id and seed the shared progress state.

    Returns {"run_id": ..., "status": "starting"}, or an error payload
    (including the active run's id, when known) if a sync is already running.
    """
    global _current_sync

    if _sync_lock.locked():
        active_id = _current_sync.get("run_id") if _current_sync else None
        return {"error": "Sync already running", "run_id": active_id}

    run_id = "{}_{}".format(
        datetime.now().strftime("%Y%m%d_%H%M%S"), uuid.uuid4().hex[:6]
    )
    _current_sync = {
        "run_id": run_id,
        "status": "running",
        "started_at": datetime.now().isoformat(),
        "finished_at": None,
        "phase": "starting",
        "phase_text": "Starting...",
        "progress_current": 0,
        "progress_total": 0,
        "counts": {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0},
    }
    return {"run_id": run_id, "status": "starting"}
|
|
|
|
|
|
def _derive_customer_info(order):
|
|
"""Extract shipping/billing names and customer from an order."""
|
|
shipping_name = ""
|
|
if order.shipping:
|
|
shipping_name = f"{getattr(order.shipping, 'firstname', '') or ''} {getattr(order.shipping, 'lastname', '') or ''}".strip()
|
|
billing_name = f"{getattr(order.billing, 'firstname', '') or ''} {getattr(order.billing, 'lastname', '') or ''}".strip()
|
|
if not shipping_name:
|
|
shipping_name = billing_name
|
|
customer = shipping_name or order.billing.company_name or billing_name
|
|
payment_method = getattr(order, 'payment_name', None) or None
|
|
delivery_method = getattr(order, 'delivery_name', None) or None
|
|
return shipping_name, billing_name, customer, payment_method, delivery_method
|
|
|
|
|
|
async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None) -> dict:
    """Run a full sync cycle and return a summary dict.

    Pipeline (single-flight, guarded by _sync_lock):
      Phase 0 - download fresh orders from the GoMag API into the JSON dir.
      Step 1  - read + chronologically sort JSON orders; seed the sync-run row
                and the web_products catalog.
      Step 2  - over one shared Oracle connection: detect orders already in
                ROA, validate SKUs, split importable orders into new vs
                already-imported, record missing-SKU context, and pre-validate
                prices (auto-adding price 0 for direct articles when absent).
      Step 3  - batch-record already-imported and skipped orders in SQLite.
      Step 4  - import the truly new orders one by one (aborts after >10
                errors), then refresh cached invoice data (best-effort).
      Step 5  - persist final counters and return the summary.

    Args:
        id_pol: Price-policy id; falls back to settings.ID_POL.
        id_sectie: Section id; falls back to settings.ID_SECTIE.
        run_id: Optional pre-allocated run id (from prepare_sync); a fresh
            timestamp+uuid id is generated when omitted.

    Returns:
        Summary dict with run_id/status/counters, or {"error": ...} when a
        sync is already running, or a failed-status dict on fatal error.
    """
    global _current_sync

    # Fast-path reject: another sync holds the lock.
    if _sync_lock.locked():
        return {"error": "Sync already running"}

    async with _sync_lock:
        # Use provided run_id or generate one
        if not run_id:
            run_id = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
        # Reset shared progress state for pollers (overwrites any prepare_sync state).
        _current_sync = {
            "run_id": run_id,
            "status": "running",
            "started_at": datetime.now().isoformat(),
            "finished_at": None,
            "phase": "reading",
            "phase_text": "Reading JSON files...",
            "progress_current": 0,
            "progress_total": 0,
            "counts": {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0},
        }

        _update_progress("reading", "Reading JSON files...")

        started_dt = datetime.now()
        # Seed the human-readable text log (header + start timestamp).
        _run_logs[run_id] = [
            f"=== Sync Run {run_id} ===",
            f"Inceput: {started_dt.strftime('%d.%m.%Y %H:%M:%S')}",
            ""
        ]

        json_dir = settings.JSON_OUTPUT_DIR

        try:
            # Phase 0: Download orders from GoMag API
            # (gomag_client is expected to no-op gracefully when API keys are
            #  not configured — per the module header note; confirm in client.)
            _update_progress("downloading", "Descărcare comenzi din GoMag API...")
            _log_line(run_id, "Descărcare comenzi din GoMag API...")
            dl_result = await gomag_client.download_orders(
                json_dir, log_fn=lambda msg: _log_line(run_id, msg)
            )
            if dl_result["files"]:
                _log_line(run_id, f"GoMag: {dl_result['total']} comenzi în {dl_result['pages']} pagini → {len(dl_result['files'])} fișiere")

            _update_progress("reading", "Citire fisiere JSON...")
            _log_line(run_id, "Citire fisiere JSON...")

            # Step 1: Read orders and sort chronologically (oldest first - R3)
            orders, json_count = order_reader.read_json_orders()
            orders.sort(key=lambda o: o.date or '')
            await sqlite_service.create_sync_run(run_id, json_count)
            _update_progress("reading", f"Found {len(orders)} orders in {json_count} files", 0, len(orders))
            _log_line(run_id, f"Gasite {len(orders)} comenzi in {json_count} fisiere")

            # Populate web_products catalog from all orders (R4)
            web_product_items = [
                (item.sku, item.name)
                for order in orders
                for item in order.items
                if item.sku and item.name
            ]
            await sqlite_service.upsert_web_products_batch(web_product_items)

            # Nothing to do: close the run as completed with zero counters.
            if not orders:
                _log_line(run_id, "Nicio comanda gasita.")
                await sqlite_service.update_sync_run(run_id, "completed", 0, 0, 0, 0)
                _update_progress("completed", "No orders found")
                summary = {"run_id": run_id, "status": "completed", "message": "No orders found", "json_files": json_count}
                return summary

            _update_progress("validation", f"Validating {len(orders)} orders...", 0, len(orders))

            # ── Single Oracle connection for entire validation phase ──
            # Blocking DB work runs in a thread so the event loop stays responsive.
            conn = await asyncio.to_thread(database.get_oracle_connection)
            try:
                # Step 2a: Find orders already in Oracle (date-range query)
                order_dates = [o.date for o in orders if o.date]
                if order_dates:
                    min_date_str = min(order_dates)
                    try:
                        # One-day margin before the oldest order covers timezone/date-boundary drift.
                        min_date = datetime.strptime(min_date_str[:10], "%Y-%m-%d") - timedelta(days=1)
                    except (ValueError, TypeError):
                        # Unparseable date -> fall back to a generous 90-day window.
                        min_date = datetime.now() - timedelta(days=90)
                else:
                    min_date = datetime.now() - timedelta(days=90)

                # existing_map: order number -> ROA order id (presumably; see validation_service).
                existing_map = await asyncio.to_thread(
                    validation_service.check_orders_in_roa, min_date, conn
                )

                # Step 2b: Validate SKUs (reuse same connection)
                all_skus = order_reader.get_all_skus(orders)
                validation = await asyncio.to_thread(validation_service.validate_skus, all_skus, conn)
                importable, skipped = validation_service.classify_orders(orders, validation)

                # ── Split importable into truly_importable vs already_in_roa ──
                truly_importable = []
                already_in_roa = []
                for order in importable:
                    if order.number in existing_map:
                        already_in_roa.append(order)
                    else:
                        truly_importable.append(order)

                _update_progress("validation",
                                 f"{len(truly_importable)} new, {len(already_in_roa)} already imported, {len(skipped)} skipped",
                                 0, len(truly_importable))
                _log_line(run_id, f"Validare: {len(truly_importable)} noi, {len(already_in_roa)} deja importate, {len(skipped)} nemapate")

                # Step 2c: Build SKU context from skipped orders
                # Maps each missing SKU to the distinct order numbers and
                # customer names it appears under, for operator triage.
                sku_context = {}
                for order, missing_skus_list in skipped:
                    customer = order.billing.company_name or \
                        f"{order.billing.firstname} {order.billing.lastname}"
                    for sku in missing_skus_list:
                        if sku not in sku_context:
                            sku_context[sku] = {"orders": [], "customers": []}
                        if order.number not in sku_context[sku]["orders"]:
                            sku_context[sku]["orders"].append(order.number)
                        if customer not in sku_context[sku]["customers"]:
                            sku_context[sku]["customers"].append(customer)

                # Track missing SKUs with context
                for sku in validation["missing"]:
                    # First product name found across all orders for this SKU.
                    product_name = ""
                    for order in orders:
                        for item in order.items:
                            if item.sku == sku:
                                product_name = item.name
                                break
                        if product_name:
                            break
                    ctx = sku_context.get(sku, {})
                    await sqlite_service.track_missing_sku(
                        sku, product_name,
                        order_count=len(ctx.get("orders", [])),
                        order_numbers=json.dumps(ctx.get("orders", [])) if ctx.get("orders") else None,
                        customers=json.dumps(ctx.get("customers", [])) if ctx.get("customers") else None,
                    )

                # Step 2d: Pre-validate prices for importable articles
                id_pol = id_pol or settings.ID_POL
                id_sectie = id_sectie or settings.ID_SECTIE
                logger.info(f"Sync params: ID_POL={id_pol}, ID_SECTIE={id_sectie}")
                _log_line(run_id, f"Parametri import: ID_POL={id_pol}, ID_SECTIE={id_sectie}")
                if id_pol and (truly_importable or already_in_roa):
                    _update_progress("validation", "Validating prices...", 0, len(truly_importable))
                    _log_line(run_id, "Validare preturi...")
                    # Only "direct" SKUs need price checks; "mapped" SKUs are
                    # deliberately excluded (presumably priced via their mapping).
                    all_codmats = set()
                    for order in (truly_importable + already_in_roa):
                        for item in order.items:
                            if item.sku in validation["mapped"]:
                                pass
                            elif item.sku in validation["direct"]:
                                all_codmats.add(item.sku)
                    if all_codmats:
                        price_result = await asyncio.to_thread(
                            validation_service.validate_prices, all_codmats, id_pol,
                            conn, validation.get("direct_id_map")
                        )
                        if price_result["missing_price"]:
                            logger.info(
                                f"Auto-adding price 0 for {len(price_result['missing_price'])} "
                                f"direct articles in policy {id_pol}"
                            )
                            await asyncio.to_thread(
                                validation_service.ensure_prices,
                                price_result["missing_price"], id_pol,
                                conn, validation.get("direct_id_map")
                            )
            finally:
                # Always return the Oracle connection to the pool.
                await asyncio.to_thread(database.pool.release, conn)

            # Step 3a: Record already-imported orders (batch)
            already_imported_count = len(already_in_roa)
            already_batch = []
            for order in already_in_roa:
                shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
                id_comanda_roa = existing_map.get(order.number)
                order_items_data = [
                    {"sku": item.sku, "product_name": item.name,
                     "quantity": item.quantity, "price": item.price, "vat": item.vat,
                     "mapping_status": "mapped" if item.sku in validation["mapped"] else "direct",
                     "codmat": None, "id_articol": None, "cantitate_roa": None}
                    for item in order.items
                ]
                already_batch.append({
                    "sync_run_id": run_id, "order_number": order.number,
                    "order_date": order.date, "customer_name": customer,
                    "status": "ALREADY_IMPORTED", "status_at_run": "ALREADY_IMPORTED",
                    "id_comanda": id_comanda_roa, "id_partener": None,
                    "error_message": None, "missing_skus": None,
                    "items_count": len(order.items),
                    "shipping_name": shipping_name, "billing_name": billing_name,
                    "payment_method": payment_method, "delivery_method": delivery_method,
                    "items": order_items_data,
                })
                _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → DEJA IMPORTAT (ID: {id_comanda_roa})")
            await sqlite_service.save_orders_batch(already_batch)

            # Step 3b: Record skipped orders + store items (batch)
            skipped_count = len(skipped)
            skipped_batch = []
            for order, missing_skus in skipped:
                shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
                order_items_data = [
                    {"sku": item.sku, "product_name": item.name,
                     "quantity": item.quantity, "price": item.price, "vat": item.vat,
                     "mapping_status": "missing" if item.sku in validation["missing"] else
                                       "mapped" if item.sku in validation["mapped"] else "direct",
                     "codmat": None, "id_articol": None, "cantitate_roa": None}
                    for item in order.items
                ]
                skipped_batch.append({
                    "sync_run_id": run_id, "order_number": order.number,
                    "order_date": order.date, "customer_name": customer,
                    "status": "SKIPPED", "status_at_run": "SKIPPED",
                    "id_comanda": None, "id_partener": None,
                    "error_message": None, "missing_skus": missing_skus,
                    "items_count": len(order.items),
                    "shipping_name": shipping_name, "billing_name": billing_name,
                    "payment_method": payment_method, "delivery_method": delivery_method,
                    "items": order_items_data,
                })
                _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})")
            await sqlite_service.save_orders_batch(skipped_batch)
            _update_progress("skipped", f"Skipped {skipped_count}",
                             0, len(truly_importable),
                             {"imported": 0, "skipped": skipped_count, "errors": 0, "already_imported": already_imported_count})

            # Step 4: Import only truly new orders
            imported_count = 0
            error_count = 0

            for i, order in enumerate(truly_importable):
                shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)

                _update_progress("import",
                                 f"Import {i+1}/{len(truly_importable)}: #{order.number} {customer}",
                                 i + 1, len(truly_importable),
                                 {"imported": imported_count, "skipped": len(skipped), "errors": error_count,
                                  "already_imported": already_imported_count})

                # import_single_order opens its own DB resources (runs in a thread).
                result = await asyncio.to_thread(
                    import_service.import_single_order,
                    order, id_pol=id_pol, id_sectie=id_sectie
                )

                # Build order items data for storage (R9)
                order_items_data = []
                for item in order.items:
                    ms = "mapped" if item.sku in validation["mapped"] else "direct"
                    order_items_data.append({
                        "sku": item.sku, "product_name": item.name,
                        "quantity": item.quantity, "price": item.price, "vat": item.vat,
                        "mapping_status": ms, "codmat": None, "id_articol": None,
                        "cantitate_roa": None
                    })

                if result["success"]:
                    imported_count += 1
                    await sqlite_service.upsert_order(
                        sync_run_id=run_id,
                        order_number=order.number,
                        order_date=order.date,
                        customer_name=customer,
                        status="IMPORTED",
                        id_comanda=result["id_comanda"],
                        id_partener=result["id_partener"],
                        items_count=len(order.items),
                        shipping_name=shipping_name,
                        billing_name=billing_name,
                        payment_method=payment_method,
                        delivery_method=delivery_method,
                    )
                    await sqlite_service.add_sync_run_order(run_id, order.number, "IMPORTED")
                    # Store ROA address IDs (R9)
                    await sqlite_service.update_import_order_addresses(
                        order.number,
                        id_adresa_facturare=result.get("id_adresa_facturare"),
                        id_adresa_livrare=result.get("id_adresa_livrare")
                    )
                    await sqlite_service.add_order_items(order.number, order_items_data)
                    _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → IMPORTAT (ID: {result['id_comanda']})")
                else:
                    error_count += 1
                    await sqlite_service.upsert_order(
                        sync_run_id=run_id,
                        order_number=order.number,
                        order_date=order.date,
                        customer_name=customer,
                        status="ERROR",
                        id_partener=result.get("id_partener"),
                        error_message=result["error"],
                        items_count=len(order.items),
                        shipping_name=shipping_name,
                        billing_name=billing_name,
                        payment_method=payment_method,
                        delivery_method=delivery_method,
                    )
                    await sqlite_service.add_sync_run_order(run_id, order.number, "ERROR")
                    await sqlite_service.add_order_items(order.number, order_items_data)
                    _log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → EROARE: {result['error']}")

                # Safety: stop if too many errors
                if error_count > 10:
                    logger.warning("Too many errors, stopping sync")
                    break

            # Step 4b: Invoice check — update cached invoice data
            # Best-effort: failures here only log a warning and never fail the run.
            _update_progress("invoices", "Checking invoices...", 0, 0)
            invoices_updated = 0
            try:
                uninvoiced = await sqlite_service.get_uninvoiced_imported_orders()
                if uninvoiced:
                    id_comanda_list = [o["id_comanda"] for o in uninvoiced]
                    invoice_data = await asyncio.to_thread(
                        invoice_service.check_invoices_for_orders, id_comanda_list
                    )
                    # Build reverse map: id_comanda → order_number
                    id_to_order = {o["id_comanda"]: o["order_number"] for o in uninvoiced}
                    for idc, inv in invoice_data.items():
                        order_num = id_to_order.get(idc)
                        if order_num and inv.get("facturat"):
                            await sqlite_service.update_order_invoice(
                                order_num,
                                serie=inv.get("serie_act"),
                                numar=str(inv.get("numar_act", "")),
                                total_fara_tva=inv.get("total_fara_tva"),
                                total_tva=inv.get("total_tva"),
                                total_cu_tva=inv.get("total_cu_tva"),
                            )
                            invoices_updated += 1
                    if invoices_updated:
                        _log_line(run_id, f"Facturi actualizate: {invoices_updated} comenzi facturate")
            except Exception as e:
                logger.warning(f"Invoice check failed: {e}")

            # Step 5: Update sync run
            total_imported = imported_count + already_imported_count  # backward-compat
            status = "completed" if error_count <= 10 else "failed"
            await sqlite_service.update_sync_run(
                run_id, status, len(orders), total_imported, len(skipped), error_count,
                already_imported=already_imported_count, new_imported=imported_count
            )

            summary = {
                "run_id": run_id,
                "status": status,
                "json_files": json_count,
                "total_orders": len(orders),
                "new_orders": len(truly_importable),
                "imported": total_imported,
                "new_imported": imported_count,
                "already_imported": already_imported_count,
                "skipped": len(skipped),
                "errors": error_count,
                "missing_skus": len(validation["missing"]),
                "invoices_updated": invoices_updated,
            }

            _update_progress("completed",
                             f"Completed: {imported_count} new, {already_imported_count} already, {len(skipped)} skipped, {error_count} errors",
                             len(truly_importable), len(truly_importable),
                             {"imported": imported_count, "skipped": len(skipped), "errors": error_count,
                              "already_imported": already_imported_count})
            if _current_sync:
                _current_sync["status"] = status
                _current_sync["finished_at"] = datetime.now().isoformat()

            logger.info(
                f"Sync {run_id} completed: {imported_count} new, {already_imported_count} already imported, "
                f"{len(skipped)} skipped, {error_count} errors"
            )

            duration = (datetime.now() - started_dt).total_seconds()
            _log_line(run_id, "")
            _run_logs[run_id].append(
                f"Finalizat: {imported_count} importate, {already_imported_count} deja importate, "
                f"{len(skipped)} nemapate, {error_count} erori din {len(orders)} comenzi | Durata: {int(duration)}s"
            )

            return summary

        except Exception as e:
            # Fatal error: mark the run failed in SQLite and in the shared state.
            logger.error(f"Sync {run_id} failed: {e}")
            _log_line(run_id, f"EROARE FATALA: {e}")
            await sqlite_service.update_sync_run(run_id, "failed", 0, 0, 0, 1, error_message=str(e))
            if _current_sync:
                _current_sync["status"] = "failed"
                _current_sync["finished_at"] = datetime.now().isoformat()
                _current_sync["error"] = str(e)
            return {"run_id": run_id, "status": "failed", "error": str(e)}
        finally:
            # Keep _current_sync for 10 seconds so status endpoint can show final result
            async def _clear_current_sync():
                await asyncio.sleep(10)
                global _current_sync
                _current_sync = None
            asyncio.ensure_future(_clear_current_sync())

            # Text logs are kept longer so the UI can still fetch them after the run.
            async def _clear_run_logs():
                await asyncio.sleep(300)  # 5 minutes
                _run_logs.pop(run_id, None)
            asyncio.ensure_future(_clear_run_logs())
|
|
|
|
|
|
def stop_sync():
    """Signal sync to stop. Currently a no-op: the sync runs to completion."""
    return None
|