feat(sync): already_imported tracking, invoice cache, path fixes, remove vfp
- Track already_imported/new_imported counts separately in sync_runs and surface them in status API + dashboard last-run card - Cache invoice data in SQLite orders table (factura_* columns); dashboard falls back to Oracle only for uncached imported orders - Resolve JSON_OUTPUT_DIR and SQLITE_DB_PATH relative to known anchored roots in config.py, independent of CWD (fixes WSL2 start) - Use single Oracle connection for entire validation phase (perf) - Batch upsert web_products instead of one-by-one - Remove stale VFP scripts (replaced by gomag-vending.prg workflow) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,10 +2,11 @@ import asyncio
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from . import order_reader, validation_service, import_service, sqlite_service
|
||||
from . import order_reader, validation_service, import_service, sqlite_service, invoice_service
|
||||
from ..config import settings
|
||||
from .. import database
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -43,7 +44,7 @@ def _update_progress(phase: str, phase_text: str, current: int = 0, total: int =
|
||||
_current_sync["phase_text"] = phase_text
|
||||
_current_sync["progress_current"] = current
|
||||
_current_sync["progress_total"] = total
|
||||
_current_sync["counts"] = counts or {"imported": 0, "skipped": 0, "errors": 0}
|
||||
_current_sync["counts"] = counts or {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0}
|
||||
|
||||
|
||||
async def get_sync_status():
|
||||
@@ -71,11 +72,25 @@ async def prepare_sync(id_pol: int = None, id_sectie: int = None) -> dict:
|
||||
"phase_text": "Starting...",
|
||||
"progress_current": 0,
|
||||
"progress_total": 0,
|
||||
"counts": {"imported": 0, "skipped": 0, "errors": 0},
|
||||
"counts": {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0},
|
||||
}
|
||||
return {"run_id": run_id, "status": "starting"}
|
||||
|
||||
|
||||
def _derive_customer_info(order):
|
||||
"""Extract shipping/billing names and customer from an order."""
|
||||
shipping_name = ""
|
||||
if order.shipping:
|
||||
shipping_name = f"{getattr(order.shipping, 'firstname', '') or ''} {getattr(order.shipping, 'lastname', '') or ''}".strip()
|
||||
billing_name = f"{getattr(order.billing, 'firstname', '') or ''} {getattr(order.billing, 'lastname', '') or ''}".strip()
|
||||
if not shipping_name:
|
||||
shipping_name = billing_name
|
||||
customer = shipping_name or order.billing.company_name or billing_name
|
||||
payment_method = getattr(order, 'payment_name', None) or None
|
||||
delivery_method = getattr(order, 'delivery_name', None) or None
|
||||
return shipping_name, billing_name, customer, payment_method, delivery_method
|
||||
|
||||
|
||||
async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None) -> dict:
|
||||
"""Run a full sync cycle. Returns summary dict."""
|
||||
global _current_sync
|
||||
@@ -96,7 +111,7 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
|
||||
"phase_text": "Reading JSON files...",
|
||||
"progress_current": 0,
|
||||
"progress_total": 0,
|
||||
"counts": {"imported": 0, "skipped": 0, "errors": 0},
|
||||
"counts": {"imported": 0, "skipped": 0, "errors": 0, "already_imported": 0},
|
||||
}
|
||||
|
||||
_update_progress("reading", "Reading JSON files...")
|
||||
@@ -118,10 +133,13 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
|
||||
_log_line(run_id, f"Gasite {len(orders)} comenzi in {json_count} fisiere")
|
||||
|
||||
# Populate web_products catalog from all orders (R4)
|
||||
for order in orders:
|
||||
for item in order.items:
|
||||
if item.sku and item.name:
|
||||
await sqlite_service.upsert_web_product(item.sku, item.name)
|
||||
web_product_items = [
|
||||
(item.sku, item.name)
|
||||
for order in orders
|
||||
for item in order.items
|
||||
if item.sku and item.name
|
||||
]
|
||||
await sqlite_service.upsert_web_products_batch(web_product_items)
|
||||
|
||||
if not orders:
|
||||
_log_line(run_id, "Nicio comanda gasita.")
|
||||
@@ -132,150 +150,176 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
|
||||
|
||||
_update_progress("validation", f"Validating {len(orders)} orders...", 0, len(orders))
|
||||
|
||||
# Step 2a: Find new orders (not yet in Oracle)
|
||||
all_order_numbers = [o.number for o in orders]
|
||||
new_orders = await asyncio.to_thread(
|
||||
validation_service.find_new_orders, all_order_numbers
|
||||
)
|
||||
# ── Single Oracle connection for entire validation phase ──
|
||||
conn = await asyncio.to_thread(database.get_oracle_connection)
|
||||
try:
|
||||
# Step 2a: Find orders already in Oracle (date-range query)
|
||||
order_dates = [o.date for o in orders if o.date]
|
||||
if order_dates:
|
||||
min_date_str = min(order_dates)
|
||||
try:
|
||||
min_date = datetime.strptime(min_date_str[:10], "%Y-%m-%d") - timedelta(days=1)
|
||||
except (ValueError, TypeError):
|
||||
min_date = datetime.now() - timedelta(days=90)
|
||||
else:
|
||||
min_date = datetime.now() - timedelta(days=90)
|
||||
|
||||
# Step 2b: Validate SKUs (blocking Oracle call -> run in thread)
|
||||
all_skus = order_reader.get_all_skus(orders)
|
||||
validation = await asyncio.to_thread(validation_service.validate_skus, all_skus)
|
||||
importable, skipped = validation_service.classify_orders(orders, validation)
|
||||
|
||||
_update_progress("validation", f"{len(importable)} importable, {len(skipped)} skipped (missing SKUs)",
|
||||
0, len(importable))
|
||||
_log_line(run_id, f"Validare SKU-uri: {len(importable)} importabile, {len(skipped)} nemapate")
|
||||
|
||||
# Step 2c: Build SKU context from skipped orders
|
||||
sku_context = {} # {sku: {"orders": [], "customers": []}}
|
||||
for order, missing_skus_list in skipped:
|
||||
customer = order.billing.company_name or \
|
||||
f"{order.billing.firstname} {order.billing.lastname}"
|
||||
for sku in missing_skus_list:
|
||||
if sku not in sku_context:
|
||||
sku_context[sku] = {"orders": [], "customers": []}
|
||||
if order.number not in sku_context[sku]["orders"]:
|
||||
sku_context[sku]["orders"].append(order.number)
|
||||
if customer not in sku_context[sku]["customers"]:
|
||||
sku_context[sku]["customers"].append(customer)
|
||||
|
||||
# Track missing SKUs with context
|
||||
for sku in validation["missing"]:
|
||||
product_name = ""
|
||||
for order in orders:
|
||||
for item in order.items:
|
||||
if item.sku == sku:
|
||||
product_name = item.name
|
||||
break
|
||||
if product_name:
|
||||
break
|
||||
ctx = sku_context.get(sku, {})
|
||||
await sqlite_service.track_missing_sku(
|
||||
sku, product_name,
|
||||
order_count=len(ctx.get("orders", [])),
|
||||
order_numbers=json.dumps(ctx.get("orders", [])) if ctx.get("orders") else None,
|
||||
customers=json.dumps(ctx.get("customers", [])) if ctx.get("customers") else None,
|
||||
existing_map = await asyncio.to_thread(
|
||||
validation_service.check_orders_in_roa, min_date, conn
|
||||
)
|
||||
|
||||
# Step 2d: Pre-validate prices for importable articles
|
||||
id_pol = id_pol or settings.ID_POL
|
||||
id_sectie = id_sectie or settings.ID_SECTIE
|
||||
logger.info(f"Sync params: ID_POL={id_pol}, ID_SECTIE={id_sectie}")
|
||||
_log_line(run_id, f"Parametri import: ID_POL={id_pol}, ID_SECTIE={id_sectie}")
|
||||
if id_pol and importable:
|
||||
_update_progress("validation", "Validating prices...", 0, len(importable))
|
||||
_log_line(run_id, "Validare preturi...")
|
||||
# Gather all CODMATs from importable orders
|
||||
all_codmats = set()
|
||||
# Step 2b: Validate SKUs (reuse same connection)
|
||||
all_skus = order_reader.get_all_skus(orders)
|
||||
validation = await asyncio.to_thread(validation_service.validate_skus, all_skus, conn)
|
||||
importable, skipped = validation_service.classify_orders(orders, validation)
|
||||
|
||||
# ── Split importable into truly_importable vs already_in_roa ──
|
||||
truly_importable = []
|
||||
already_in_roa = []
|
||||
for order in importable:
|
||||
for item in order.items:
|
||||
if item.sku in validation["mapped"]:
|
||||
# Mapped SKUs resolve to codmat via ARTICOLE_TERTI (handled by import)
|
||||
pass
|
||||
elif item.sku in validation["direct"]:
|
||||
all_codmats.add(item.sku)
|
||||
# For mapped SKUs, we'd need the ARTICOLE_TERTI lookup - direct SKUs = codmat
|
||||
if all_codmats:
|
||||
price_result = await asyncio.to_thread(
|
||||
validation_service.validate_prices, all_codmats, id_pol
|
||||
if order.number in existing_map:
|
||||
already_in_roa.append(order)
|
||||
else:
|
||||
truly_importable.append(order)
|
||||
|
||||
_update_progress("validation",
|
||||
f"{len(truly_importable)} new, {len(already_in_roa)} already imported, {len(skipped)} skipped",
|
||||
0, len(truly_importable))
|
||||
_log_line(run_id, f"Validare: {len(truly_importable)} noi, {len(already_in_roa)} deja importate, {len(skipped)} nemapate")
|
||||
|
||||
# Step 2c: Build SKU context from skipped orders
|
||||
sku_context = {}
|
||||
for order, missing_skus_list in skipped:
|
||||
customer = order.billing.company_name or \
|
||||
f"{order.billing.firstname} {order.billing.lastname}"
|
||||
for sku in missing_skus_list:
|
||||
if sku not in sku_context:
|
||||
sku_context[sku] = {"orders": [], "customers": []}
|
||||
if order.number not in sku_context[sku]["orders"]:
|
||||
sku_context[sku]["orders"].append(order.number)
|
||||
if customer not in sku_context[sku]["customers"]:
|
||||
sku_context[sku]["customers"].append(customer)
|
||||
|
||||
# Track missing SKUs with context
|
||||
for sku in validation["missing"]:
|
||||
product_name = ""
|
||||
for order in orders:
|
||||
for item in order.items:
|
||||
if item.sku == sku:
|
||||
product_name = item.name
|
||||
break
|
||||
if product_name:
|
||||
break
|
||||
ctx = sku_context.get(sku, {})
|
||||
await sqlite_service.track_missing_sku(
|
||||
sku, product_name,
|
||||
order_count=len(ctx.get("orders", [])),
|
||||
order_numbers=json.dumps(ctx.get("orders", [])) if ctx.get("orders") else None,
|
||||
customers=json.dumps(ctx.get("customers", [])) if ctx.get("customers") else None,
|
||||
)
|
||||
if price_result["missing_price"]:
|
||||
logger.info(
|
||||
f"Auto-adding price 0 for {len(price_result['missing_price'])} "
|
||||
f"direct articles in policy {id_pol}"
|
||||
)
|
||||
await asyncio.to_thread(
|
||||
validation_service.ensure_prices,
|
||||
price_result["missing_price"], id_pol
|
||||
)
|
||||
|
||||
# Step 3: Record skipped orders + store items
|
||||
skipped_count = 0
|
||||
# Step 2d: Pre-validate prices for importable articles
|
||||
id_pol = id_pol or settings.ID_POL
|
||||
id_sectie = id_sectie or settings.ID_SECTIE
|
||||
logger.info(f"Sync params: ID_POL={id_pol}, ID_SECTIE={id_sectie}")
|
||||
_log_line(run_id, f"Parametri import: ID_POL={id_pol}, ID_SECTIE={id_sectie}")
|
||||
if id_pol and (truly_importable or already_in_roa):
|
||||
_update_progress("validation", "Validating prices...", 0, len(truly_importable))
|
||||
_log_line(run_id, "Validare preturi...")
|
||||
all_codmats = set()
|
||||
for order in (truly_importable + already_in_roa):
|
||||
for item in order.items:
|
||||
if item.sku in validation["mapped"]:
|
||||
pass
|
||||
elif item.sku in validation["direct"]:
|
||||
all_codmats.add(item.sku)
|
||||
if all_codmats:
|
||||
price_result = await asyncio.to_thread(
|
||||
validation_service.validate_prices, all_codmats, id_pol,
|
||||
conn, validation.get("direct_id_map")
|
||||
)
|
||||
if price_result["missing_price"]:
|
||||
logger.info(
|
||||
f"Auto-adding price 0 for {len(price_result['missing_price'])} "
|
||||
f"direct articles in policy {id_pol}"
|
||||
)
|
||||
await asyncio.to_thread(
|
||||
validation_service.ensure_prices,
|
||||
price_result["missing_price"], id_pol,
|
||||
conn, validation.get("direct_id_map")
|
||||
)
|
||||
finally:
|
||||
await asyncio.to_thread(database.pool.release, conn)
|
||||
|
||||
# Step 3a: Record already-imported orders (batch)
|
||||
already_imported_count = len(already_in_roa)
|
||||
already_batch = []
|
||||
for order in already_in_roa:
|
||||
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
|
||||
id_comanda_roa = existing_map.get(order.number)
|
||||
order_items_data = [
|
||||
{"sku": item.sku, "product_name": item.name,
|
||||
"quantity": item.quantity, "price": item.price, "vat": item.vat,
|
||||
"mapping_status": "mapped" if item.sku in validation["mapped"] else "direct",
|
||||
"codmat": None, "id_articol": None, "cantitate_roa": None}
|
||||
for item in order.items
|
||||
]
|
||||
already_batch.append({
|
||||
"sync_run_id": run_id, "order_number": order.number,
|
||||
"order_date": order.date, "customer_name": customer,
|
||||
"status": "ALREADY_IMPORTED", "status_at_run": "ALREADY_IMPORTED",
|
||||
"id_comanda": id_comanda_roa, "id_partener": None,
|
||||
"error_message": None, "missing_skus": None,
|
||||
"items_count": len(order.items),
|
||||
"shipping_name": shipping_name, "billing_name": billing_name,
|
||||
"payment_method": payment_method, "delivery_method": delivery_method,
|
||||
"items": order_items_data,
|
||||
})
|
||||
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → DEJA IMPORTAT (ID: {id_comanda_roa})")
|
||||
await sqlite_service.save_orders_batch(already_batch)
|
||||
|
||||
# Step 3b: Record skipped orders + store items (batch)
|
||||
skipped_count = len(skipped)
|
||||
skipped_batch = []
|
||||
for order, missing_skus in skipped:
|
||||
skipped_count += 1
|
||||
# Derive shipping / billing names
|
||||
shipping_name = ""
|
||||
if order.shipping:
|
||||
shipping_name = f"{getattr(order.shipping, 'firstname', '') or ''} {getattr(order.shipping, 'lastname', '') or ''}".strip()
|
||||
billing_name = f"{getattr(order.billing, 'firstname', '') or ''} {getattr(order.billing, 'lastname', '') or ''}".strip()
|
||||
if not shipping_name:
|
||||
shipping_name = billing_name
|
||||
customer = shipping_name or order.billing.company_name or billing_name
|
||||
payment_method = getattr(order, 'payment_name', None) or None
|
||||
delivery_method = getattr(order, 'delivery_name', None) or None
|
||||
|
||||
await sqlite_service.upsert_order(
|
||||
sync_run_id=run_id,
|
||||
order_number=order.number,
|
||||
order_date=order.date,
|
||||
customer_name=customer,
|
||||
status="SKIPPED",
|
||||
missing_skus=missing_skus,
|
||||
items_count=len(order.items),
|
||||
shipping_name=shipping_name,
|
||||
billing_name=billing_name,
|
||||
payment_method=payment_method,
|
||||
delivery_method=delivery_method,
|
||||
)
|
||||
await sqlite_service.add_sync_run_order(run_id, order.number, "SKIPPED")
|
||||
# Store order items with mapping status (R9)
|
||||
order_items_data = []
|
||||
for item in order.items:
|
||||
ms = "missing" if item.sku in validation["missing"] else \
|
||||
"mapped" if item.sku in validation["mapped"] else "direct"
|
||||
order_items_data.append({
|
||||
"sku": item.sku, "product_name": item.name,
|
||||
"quantity": item.quantity, "price": item.price, "vat": item.vat,
|
||||
"mapping_status": ms, "codmat": None, "id_articol": None,
|
||||
"cantitate_roa": None
|
||||
})
|
||||
await sqlite_service.add_order_items(order.number, order_items_data)
|
||||
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
|
||||
order_items_data = [
|
||||
{"sku": item.sku, "product_name": item.name,
|
||||
"quantity": item.quantity, "price": item.price, "vat": item.vat,
|
||||
"mapping_status": "missing" if item.sku in validation["missing"] else
|
||||
"mapped" if item.sku in validation["mapped"] else "direct",
|
||||
"codmat": None, "id_articol": None, "cantitate_roa": None}
|
||||
for item in order.items
|
||||
]
|
||||
skipped_batch.append({
|
||||
"sync_run_id": run_id, "order_number": order.number,
|
||||
"order_date": order.date, "customer_name": customer,
|
||||
"status": "SKIPPED", "status_at_run": "SKIPPED",
|
||||
"id_comanda": None, "id_partener": None,
|
||||
"error_message": None, "missing_skus": missing_skus,
|
||||
"items_count": len(order.items),
|
||||
"shipping_name": shipping_name, "billing_name": billing_name,
|
||||
"payment_method": payment_method, "delivery_method": delivery_method,
|
||||
"items": order_items_data,
|
||||
})
|
||||
_log_line(run_id, f"#{order.number} [{order.date or '?'}] {customer} → OMIS (lipsa: {', '.join(missing_skus)})")
|
||||
_update_progress("skipped", f"Skipped {skipped_count}/{len(skipped)}: #{order.number} {customer}",
|
||||
0, len(importable),
|
||||
{"imported": 0, "skipped": skipped_count, "errors": 0})
|
||||
await sqlite_service.save_orders_batch(skipped_batch)
|
||||
_update_progress("skipped", f"Skipped {skipped_count}",
|
||||
0, len(truly_importable),
|
||||
{"imported": 0, "skipped": skipped_count, "errors": 0, "already_imported": already_imported_count})
|
||||
|
||||
# Step 4: Import valid orders
|
||||
# Step 4: Import only truly new orders
|
||||
imported_count = 0
|
||||
error_count = 0
|
||||
|
||||
for i, order in enumerate(importable):
|
||||
# Derive shipping / billing names
|
||||
shipping_name = ""
|
||||
if order.shipping:
|
||||
shipping_name = f"{getattr(order.shipping, 'firstname', '') or ''} {getattr(order.shipping, 'lastname', '') or ''}".strip()
|
||||
billing_name = f"{getattr(order.billing, 'firstname', '') or ''} {getattr(order.billing, 'lastname', '') or ''}".strip()
|
||||
if not shipping_name:
|
||||
shipping_name = billing_name
|
||||
customer = shipping_name or order.billing.company_name or billing_name
|
||||
payment_method = getattr(order, 'payment_name', None) or None
|
||||
delivery_method = getattr(order, 'delivery_name', None) or None
|
||||
for i, order in enumerate(truly_importable):
|
||||
shipping_name, billing_name, customer, payment_method, delivery_method = _derive_customer_info(order)
|
||||
|
||||
_update_progress("import",
|
||||
f"Import {i+1}/{len(importable)}: #{order.number} {customer}",
|
||||
i + 1, len(importable),
|
||||
{"imported": imported_count, "skipped": len(skipped), "errors": error_count})
|
||||
f"Import {i+1}/{len(truly_importable)}: #{order.number} {customer}",
|
||||
i + 1, len(truly_importable),
|
||||
{"imported": imported_count, "skipped": len(skipped), "errors": error_count,
|
||||
"already_imported": already_imported_count})
|
||||
|
||||
result = await asyncio.to_thread(
|
||||
import_service.import_single_order,
|
||||
@@ -343,10 +387,41 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
|
||||
logger.warning("Too many errors, stopping sync")
|
||||
break
|
||||
|
||||
# Step 4b: Invoice check — update cached invoice data
|
||||
_update_progress("invoices", "Checking invoices...", 0, 0)
|
||||
invoices_updated = 0
|
||||
try:
|
||||
uninvoiced = await sqlite_service.get_uninvoiced_imported_orders()
|
||||
if uninvoiced:
|
||||
id_comanda_list = [o["id_comanda"] for o in uninvoiced]
|
||||
invoice_data = await asyncio.to_thread(
|
||||
invoice_service.check_invoices_for_orders, id_comanda_list
|
||||
)
|
||||
# Build reverse map: id_comanda → order_number
|
||||
id_to_order = {o["id_comanda"]: o["order_number"] for o in uninvoiced}
|
||||
for idc, inv in invoice_data.items():
|
||||
order_num = id_to_order.get(idc)
|
||||
if order_num and inv.get("facturat"):
|
||||
await sqlite_service.update_order_invoice(
|
||||
order_num,
|
||||
serie=inv.get("serie_act"),
|
||||
numar=str(inv.get("numar_act", "")),
|
||||
total_fara_tva=inv.get("total_fara_tva"),
|
||||
total_tva=inv.get("total_tva"),
|
||||
total_cu_tva=inv.get("total_cu_tva"),
|
||||
)
|
||||
invoices_updated += 1
|
||||
if invoices_updated:
|
||||
_log_line(run_id, f"Facturi actualizate: {invoices_updated} comenzi facturate")
|
||||
except Exception as e:
|
||||
logger.warning(f"Invoice check failed: {e}")
|
||||
|
||||
# Step 5: Update sync run
|
||||
total_imported = imported_count + already_imported_count # backward-compat
|
||||
status = "completed" if error_count <= 10 else "failed"
|
||||
await sqlite_service.update_sync_run(
|
||||
run_id, status, len(orders), imported_count, len(skipped), error_count
|
||||
run_id, status, len(orders), total_imported, len(skipped), error_count,
|
||||
already_imported=already_imported_count, new_imported=imported_count
|
||||
)
|
||||
|
||||
summary = {
|
||||
@@ -354,29 +429,36 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
|
||||
"status": status,
|
||||
"json_files": json_count,
|
||||
"total_orders": len(orders),
|
||||
"new_orders": len(new_orders),
|
||||
"imported": imported_count,
|
||||
"new_orders": len(truly_importable),
|
||||
"imported": total_imported,
|
||||
"new_imported": imported_count,
|
||||
"already_imported": already_imported_count,
|
||||
"skipped": len(skipped),
|
||||
"errors": error_count,
|
||||
"missing_skus": len(validation["missing"])
|
||||
"missing_skus": len(validation["missing"]),
|
||||
"invoices_updated": invoices_updated,
|
||||
}
|
||||
|
||||
_update_progress("completed",
|
||||
f"Completed: {imported_count} imported, {len(skipped)} skipped, {error_count} errors",
|
||||
len(importable), len(importable),
|
||||
{"imported": imported_count, "skipped": len(skipped), "errors": error_count})
|
||||
f"Completed: {imported_count} new, {already_imported_count} already, {len(skipped)} skipped, {error_count} errors",
|
||||
len(truly_importable), len(truly_importable),
|
||||
{"imported": imported_count, "skipped": len(skipped), "errors": error_count,
|
||||
"already_imported": already_imported_count})
|
||||
if _current_sync:
|
||||
_current_sync["status"] = status
|
||||
_current_sync["finished_at"] = datetime.now().isoformat()
|
||||
|
||||
logger.info(
|
||||
f"Sync {run_id} completed: {imported_count} imported, "
|
||||
f"Sync {run_id} completed: {imported_count} new, {already_imported_count} already imported, "
|
||||
f"{len(skipped)} skipped, {error_count} errors"
|
||||
)
|
||||
|
||||
duration = (datetime.now() - started_dt).total_seconds()
|
||||
_log_line(run_id, "")
|
||||
_run_logs[run_id].append(f"Finalizat: {imported_count} importate, {len(skipped)} nemapate, {error_count} erori din {len(orders)} comenzi | Durata: {int(duration)}s")
|
||||
_run_logs[run_id].append(
|
||||
f"Finalizat: {imported_count} importate, {already_imported_count} deja importate, "
|
||||
f"{len(skipped)} nemapate, {error_count} erori din {len(orders)} comenzi | Durata: {int(duration)}s"
|
||||
)
|
||||
|
||||
return summary
|
||||
|
||||
@@ -405,6 +487,4 @@ async def run_sync(id_pol: int = None, id_sectie: int = None, run_id: str = None
|
||||
|
||||
def stop_sync():
|
||||
"""Signal sync to stop. Currently sync runs to completion."""
|
||||
# For now, sync runs are not cancellable mid-flight.
|
||||
# Future: use an asyncio.Event for cooperative cancellation.
|
||||
pass
|
||||
|
||||
@@ -1,23 +1,53 @@
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from .. import database
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def validate_skus(skus: set[str]) -> dict:
|
||||
def check_orders_in_roa(min_date, conn) -> dict:
|
||||
"""Check which orders already exist in Oracle COMENZI by date range.
|
||||
Returns: {comanda_externa: id_comanda} for all existing orders.
|
||||
Much faster than IN-clause batching — single query using date index.
|
||||
"""
|
||||
if conn is None:
|
||||
return {}
|
||||
|
||||
existing = {}
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
SELECT comanda_externa, id_comanda FROM COMENZI
|
||||
WHERE data_comanda >= :min_date
|
||||
AND comanda_externa IS NOT NULL AND sters = 0
|
||||
""", {"min_date": min_date})
|
||||
for row in cur:
|
||||
existing[str(row[0])] = row[1]
|
||||
except Exception as e:
|
||||
logger.error(f"check_orders_in_roa failed: {e}")
|
||||
|
||||
logger.info(f"ROA order check (since {min_date}): {len(existing)} existing orders found")
|
||||
return existing
|
||||
|
||||
|
||||
def validate_skus(skus: set[str], conn=None) -> dict:
|
||||
"""Validate a set of SKUs against Oracle.
|
||||
Returns: {mapped: set, direct: set, missing: set}
|
||||
Returns: {mapped: set, direct: set, missing: set, direct_id_map: {codmat: id_articol}}
|
||||
- mapped: found in ARTICOLE_TERTI (active)
|
||||
- direct: found in NOM_ARTICOLE by codmat (not in ARTICOLE_TERTI)
|
||||
- missing: not found anywhere
|
||||
- direct_id_map: {codmat: id_articol} for direct SKUs (saves a round-trip in validate_prices)
|
||||
"""
|
||||
if not skus:
|
||||
return {"mapped": set(), "direct": set(), "missing": set()}
|
||||
return {"mapped": set(), "direct": set(), "missing": set(), "direct_id_map": {}}
|
||||
|
||||
mapped = set()
|
||||
direct = set()
|
||||
direct_id_map = {}
|
||||
sku_list = list(skus)
|
||||
|
||||
conn = database.get_oracle_connection()
|
||||
own_conn = conn is None
|
||||
if own_conn:
|
||||
conn = database.get_oracle_connection()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
# Check in batches of 500
|
||||
@@ -34,24 +64,26 @@ def validate_skus(skus: set[str]) -> dict:
|
||||
for row in cur:
|
||||
mapped.add(row[0])
|
||||
|
||||
# Check NOM_ARTICOLE for remaining
|
||||
# Check NOM_ARTICOLE for remaining — also fetch id_articol
|
||||
remaining = [s for s in batch if s not in mapped]
|
||||
if remaining:
|
||||
placeholders2 = ",".join([f":n{j}" for j in range(len(remaining))])
|
||||
params2 = {f"n{j}": sku for j, sku in enumerate(remaining)}
|
||||
cur.execute(f"""
|
||||
SELECT DISTINCT codmat FROM NOM_ARTICOLE
|
||||
SELECT codmat, id_articol FROM NOM_ARTICOLE
|
||||
WHERE codmat IN ({placeholders2}) AND sters = 0 AND inactiv = 0
|
||||
""", params2)
|
||||
for row in cur:
|
||||
direct.add(row[0])
|
||||
direct_id_map[row[0]] = row[1]
|
||||
finally:
|
||||
database.pool.release(conn)
|
||||
if own_conn:
|
||||
database.pool.release(conn)
|
||||
|
||||
missing = skus - mapped - direct
|
||||
|
||||
logger.info(f"SKU validation: {len(mapped)} mapped, {len(direct)} direct, {len(missing)} missing")
|
||||
return {"mapped": mapped, "direct": direct, "missing": missing}
|
||||
return {"mapped": mapped, "direct": direct, "missing": missing, "direct_id_map": direct_id_map}
|
||||
|
||||
def classify_orders(orders, validation_result):
|
||||
"""Classify orders as importable or skipped based on SKU validation.
|
||||
@@ -73,39 +105,9 @@ def classify_orders(orders, validation_result):
|
||||
|
||||
return importable, skipped
|
||||
|
||||
def find_new_orders(order_numbers: list[str]) -> set[str]:
|
||||
"""Check which order numbers do NOT already exist in Oracle COMENZI.
|
||||
Returns: set of order numbers that are truly new (not yet imported).
|
||||
"""
|
||||
if not order_numbers:
|
||||
return set()
|
||||
|
||||
existing = set()
|
||||
num_list = list(order_numbers)
|
||||
|
||||
conn = database.get_oracle_connection()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
for i in range(0, len(num_list), 500):
|
||||
batch = num_list[i:i+500]
|
||||
placeholders = ",".join([f":o{j}" for j in range(len(batch))])
|
||||
params = {f"o{j}": num for j, num in enumerate(batch)}
|
||||
|
||||
cur.execute(f"""
|
||||
SELECT DISTINCT comanda_externa FROM COMENZI
|
||||
WHERE comanda_externa IN ({placeholders}) AND sters = 0
|
||||
""", params)
|
||||
for row in cur:
|
||||
existing.add(row[0])
|
||||
finally:
|
||||
database.pool.release(conn)
|
||||
|
||||
new_orders = set(order_numbers) - existing
|
||||
logger.info(f"Order check: {len(new_orders)} new, {len(existing)} already exist out of {len(order_numbers)} total")
|
||||
return new_orders
|
||||
|
||||
def validate_prices(codmats: set[str], id_pol: int) -> dict:
|
||||
def validate_prices(codmats: set[str], id_pol: int, conn=None, direct_id_map: dict=None) -> dict:
|
||||
"""Check which CODMATs have a price entry in CRM_POLITICI_PRET_ART for the given policy.
|
||||
If direct_id_map is provided, skips the NOM_ARTICOLE lookup for those CODMATs.
|
||||
Returns: {"has_price": set_of_codmats, "missing_price": set_of_codmats}
|
||||
"""
|
||||
if not codmats:
|
||||
@@ -115,21 +117,31 @@ def validate_prices(codmats: set[str], id_pol: int) -> dict:
|
||||
ids_with_price = set()
|
||||
codmat_list = list(codmats)
|
||||
|
||||
conn = database.get_oracle_connection()
|
||||
# Pre-populate from direct_id_map if available
|
||||
if direct_id_map:
|
||||
for cm in codmat_list:
|
||||
if cm in direct_id_map:
|
||||
codmat_to_id[cm] = direct_id_map[cm]
|
||||
|
||||
own_conn = conn is None
|
||||
if own_conn:
|
||||
conn = database.get_oracle_connection()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
# Step 1: Get ID_ARTICOL for each CODMAT
|
||||
for i in range(0, len(codmat_list), 500):
|
||||
batch = codmat_list[i:i+500]
|
||||
placeholders = ",".join([f":c{j}" for j in range(len(batch))])
|
||||
params = {f"c{j}": cm for j, cm in enumerate(batch)}
|
||||
# Step 1: Get ID_ARTICOL for CODMATs not already in direct_id_map
|
||||
remaining = [cm for cm in codmat_list if cm not in codmat_to_id]
|
||||
if remaining:
|
||||
for i in range(0, len(remaining), 500):
|
||||
batch = remaining[i:i+500]
|
||||
placeholders = ",".join([f":c{j}" for j in range(len(batch))])
|
||||
params = {f"c{j}": cm for j, cm in enumerate(batch)}
|
||||
|
||||
cur.execute(f"""
|
||||
SELECT id_articol, codmat FROM NOM_ARTICOLE
|
||||
WHERE codmat IN ({placeholders})
|
||||
""", params)
|
||||
for row in cur:
|
||||
codmat_to_id[row[1]] = row[0]
|
||||
cur.execute(f"""
|
||||
SELECT id_articol, codmat FROM NOM_ARTICOLE
|
||||
WHERE codmat IN ({placeholders})
|
||||
""", params)
|
||||
for row in cur:
|
||||
codmat_to_id[row[1]] = row[0]
|
||||
|
||||
# Step 2: Check which ID_ARTICOLs have a price in the policy
|
||||
id_list = list(codmat_to_id.values())
|
||||
@@ -146,7 +158,8 @@ def validate_prices(codmats: set[str], id_pol: int) -> dict:
|
||||
for row in cur:
|
||||
ids_with_price.add(row[0])
|
||||
finally:
|
||||
database.pool.release(conn)
|
||||
if own_conn:
|
||||
database.pool.release(conn)
|
||||
|
||||
# Map back to CODMATs
|
||||
has_price = {cm for cm, aid in codmat_to_id.items() if aid in ids_with_price}
|
||||
@@ -155,12 +168,17 @@ def validate_prices(codmats: set[str], id_pol: int) -> dict:
|
||||
logger.info(f"Price validation (policy {id_pol}): {len(has_price)} have price, {len(missing_price)} missing price")
|
||||
return {"has_price": has_price, "missing_price": missing_price}
|
||||
|
||||
def ensure_prices(codmats: set[str], id_pol: int):
|
||||
"""Insert price 0 entries for CODMATs missing from the given price policy."""
|
||||
def ensure_prices(codmats: set[str], id_pol: int, conn=None, direct_id_map: dict=None):
|
||||
"""Insert price 0 entries for CODMATs missing from the given price policy.
|
||||
Uses batch executemany instead of individual INSERTs.
|
||||
Relies on TRG_CRM_POLITICI_PRET_ART trigger for ID_POL_ART sequence.
|
||||
"""
|
||||
if not codmats:
|
||||
return
|
||||
|
||||
conn = database.get_oracle_connection()
|
||||
own_conn = conn is None
|
||||
if own_conn:
|
||||
conn = database.get_oracle_connection()
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
# Get ID_VALUTA for this policy
|
||||
@@ -173,31 +191,53 @@ def ensure_prices(codmats: set[str], id_pol: int):
|
||||
return
|
||||
id_valuta = row[0]
|
||||
|
||||
# Build batch params using direct_id_map where available
|
||||
batch_params = []
|
||||
need_lookup = []
|
||||
codmat_id_map = dict(direct_id_map) if direct_id_map else {}
|
||||
|
||||
for codmat in codmats:
|
||||
# Get ID_ARTICOL
|
||||
cur.execute("""
|
||||
SELECT id_articol FROM NOM_ARTICOLE WHERE codmat = :codmat
|
||||
""", {"codmat": codmat})
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
if codmat not in codmat_id_map:
|
||||
need_lookup.append(codmat)
|
||||
|
||||
# Batch lookup remaining CODMATs
|
||||
if need_lookup:
|
||||
for i in range(0, len(need_lookup), 500):
|
||||
batch = need_lookup[i:i+500]
|
||||
placeholders = ",".join([f":c{j}" for j in range(len(batch))])
|
||||
params = {f"c{j}": cm for j, cm in enumerate(batch)}
|
||||
cur.execute(f"""
|
||||
SELECT codmat, id_articol FROM NOM_ARTICOLE
|
||||
WHERE codmat IN ({placeholders}) AND sters = 0 AND inactiv = 0
|
||||
""", params)
|
||||
for r in cur:
|
||||
codmat_id_map[r[0]] = r[1]
|
||||
|
||||
for codmat in codmats:
|
||||
id_articol = codmat_id_map.get(codmat)
|
||||
if not id_articol:
|
||||
logger.warning(f"CODMAT {codmat} not found in NOM_ARTICOLE, skipping price insert")
|
||||
continue
|
||||
id_articol = row[0]
|
||||
batch_params.append({
|
||||
"id_pol": id_pol,
|
||||
"id_articol": id_articol,
|
||||
"id_valuta": id_valuta
|
||||
})
|
||||
|
||||
cur.execute("""
|
||||
if batch_params:
|
||||
cur.executemany("""
|
||||
INSERT INTO CRM_POLITICI_PRET_ART
|
||||
(ID_POL_ART, ID_POL, ID_ARTICOL, PRET, ID_VALUTA,
|
||||
ID_UTIL, DATAORA, PROC_TVAV,
|
||||
PRETFTVA, PRETCTVA)
|
||||
(ID_POL, ID_ARTICOL, PRET, ID_VALUTA,
|
||||
ID_UTIL, DATAORA, PROC_TVAV, PRETFTVA, PRETCTVA)
|
||||
VALUES
|
||||
(SEQ_CRM_POLITICI_PRET_ART.NEXTVAL, :id_pol, :id_articol, 0, :id_valuta,
|
||||
-3, SYSDATE, 1.19,
|
||||
0, 0)
|
||||
""", {"id_pol": id_pol, "id_articol": id_articol, "id_valuta": id_valuta})
|
||||
logger.info(f"Pret 0 adaugat pentru CODMAT {codmat} in politica {id_pol}")
|
||||
(:id_pol, :id_articol, 0, :id_valuta,
|
||||
-3, SYSDATE, 1.19, 0, 0)
|
||||
""", batch_params)
|
||||
logger.info(f"Batch inserted {len(batch_params)} price entries for policy {id_pol}")
|
||||
|
||||
conn.commit()
|
||||
finally:
|
||||
database.pool.release(conn)
|
||||
if own_conn:
|
||||
database.pool.release(conn)
|
||||
|
||||
logger.info(f"Ensure prices done: {len(codmats)} CODMATs processed for policy {id_pol}")
|
||||
|
||||
Reference in New Issue
Block a user