Files
gomag-vending/api/app/services/sqlite_service.py
Claude Agent 1d59f1a484 refactor(price): remove price comparison UI and catalog sync
GoMag vs ROA price comparison generated too many false positives
(kits, volume discounts, special prices). Removes comparison columns,
dots, badges, catalog sync endpoints, and ~950 lines of dead code.
Keeps WRITE path (sync_prices_from_order) for kit pricing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 20:30:34 +00:00

1480 lines
56 KiB
Python

import json
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
from ..database import get_sqlite, get_sqlite_sync
# Re-export so other services can import get_sqlite from sqlite_service
__all__ = ["get_sqlite", "get_sqlite_sync"]
# Timezone used by _now_str() to produce Bucharest wall-clock timestamps.
_tz_bucharest = ZoneInfo("Europe/Bucharest")
def _now_str():
    """Return the current Bucharest wall-clock time as a naive ISO string."""
    local_now = datetime.now(_tz_bucharest)
    # Drop tzinfo so the ISO string carries no UTC-offset suffix.
    naive_now = local_now.replace(tzinfo=None)
    return naive_now.isoformat()
logger = logging.getLogger(__name__)
async def create_sync_run(run_id: str, json_files: int = 0):
    """Insert a new ``sync_runs`` row with status 'running'.

    Args:
        run_id: Identifier stored in the ``run_id`` column.
        json_files: Value stored in the ``json_files`` column.
    """
    insert_sql = """
        INSERT INTO sync_runs (run_id, started_at, status, json_files)
        VALUES (?, ?, 'running', ?)
    """
    conn = await get_sqlite()
    try:
        started_at = _now_str()
        await conn.execute(insert_sql, (run_id, started_at, json_files))
        await conn.commit()
    finally:
        await conn.close()
async def update_sync_run(run_id: str, status: str, total_orders: int = 0,
                          imported: int = 0, skipped: int = 0, errors: int = 0,
                          error_message: str = None,
                          already_imported: int = 0, new_imported: int = 0):
    """Write the outcome of a sync run onto its ``sync_runs`` row.

    Sets ``finished_at`` to the current Bucharest time and overwrites the
    status, counters and error message for the row matching ``run_id``.
    """
    update_sql = """
        UPDATE sync_runs SET
            finished_at = ?,
            status = ?,
            total_orders = ?,
            imported = ?,
            skipped = ?,
            errors = ?,
            error_message = ?,
            already_imported = ?,
            new_imported = ?
        WHERE run_id = ?
    """
    conn = await get_sqlite()
    try:
        finished_at = _now_str()
        values = (
            finished_at, status, total_orders, imported, skipped, errors,
            error_message, already_imported, new_imported, run_id,
        )
        await conn.execute(update_sql, values)
        await conn.commit()
    finally:
        await conn.close()
async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
                       customer_name: str, status: str, id_comanda: int = None,
                       id_partener: int = None, error_message: str = None,
                       missing_skus: list = None, items_count: int = 0,
                       shipping_name: str = None, billing_name: str = None,
                       payment_method: str = None, delivery_method: str = None,
                       order_total: float = None,
                       delivery_cost: float = None, discount_total: float = None,
                       web_status: str = None, discount_split: str = None):
    """Insert or update the single ``orders`` row for ``order_number``.

    On conflict the existing row is updated in place:
      * an 'IMPORTED' row keeps its status when the incoming status is
        'ALREADY_IMPORTED'; otherwise the incoming status wins;
      * ``times_skipped`` is incremented when the incoming status is 'SKIPPED';
      * IDs, names, payment/delivery data and totals keep their stored value
        when the incoming value is NULL (COALESCE);
      * ``order_date`` is not touched by the update branch.
    """
    upsert_sql = """
        INSERT INTO orders
            (order_number, order_date, customer_name, status,
             id_comanda, id_partener, error_message, missing_skus, items_count,
             last_sync_run_id, shipping_name, billing_name,
             payment_method, delivery_method, order_total,
             delivery_cost, discount_total, web_status, discount_split)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(order_number) DO UPDATE SET
            customer_name = excluded.customer_name,
            status = CASE
                WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED'
                    THEN orders.status
                ELSE excluded.status
            END,
            error_message = excluded.error_message,
            missing_skus = excluded.missing_skus,
            items_count = excluded.items_count,
            id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
            id_partener = COALESCE(excluded.id_partener, orders.id_partener),
            times_skipped = CASE WHEN excluded.status = 'SKIPPED'
                THEN orders.times_skipped + 1
                ELSE orders.times_skipped END,
            last_sync_run_id = excluded.last_sync_run_id,
            shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
            billing_name = COALESCE(excluded.billing_name, orders.billing_name),
            payment_method = COALESCE(excluded.payment_method, orders.payment_method),
            delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
            order_total = COALESCE(excluded.order_total, orders.order_total),
            delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost),
            discount_total = COALESCE(excluded.discount_total, orders.discount_total),
            web_status = COALESCE(excluded.web_status, orders.web_status),
            discount_split = COALESCE(excluded.discount_split, orders.discount_split),
            updated_at = datetime('now')
    """
    conn = await get_sqlite()
    try:
        # An empty or missing SKU list is stored as NULL, not as "[]".
        missing_json = json.dumps(missing_skus) if missing_skus else None
        row = (
            order_number, order_date, customer_name, status,
            id_comanda, id_partener, error_message,
            missing_json,
            items_count, sync_run_id, shipping_name, billing_name,
            payment_method, delivery_method, order_total,
            delivery_cost, discount_total, web_status, discount_split,
        )
        await conn.execute(upsert_sql, row)
        await conn.commit()
    finally:
        await conn.close()
async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: str):
    """Link an order to a sync run in the ``sync_run_orders`` junction table.

    Uses INSERT OR IGNORE, so a row that conflicts with an existing one is
    silently skipped.
    """
    link_sql = """
        INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
        VALUES (?, ?, ?)
    """
    conn = await get_sqlite()
    try:
        await conn.execute(link_sql, (sync_run_id, order_number, status_at_run))
        await conn.commit()
    finally:
        await conn.close()
async def save_orders_batch(orders_data: list[dict]):
    """Persist many orders, their run links and their items with one commit.

    Each dict must provide: sync_run_id, order_number, order_date,
    customer_name, status and status_at_run. Optional keys (read with
    ``.get``): id_comanda, id_partener, error_message, missing_skus
    (list|None), items_count, shipping_name, billing_name, payment_method,
    delivery_method, order_total, delivery_cost, discount_total, web_status,
    discount_split, and items (list of item dicts).

    Does nothing when ``orders_data`` is empty.
    """
    if not orders_data:
        return

    orders_sql = """
        INSERT INTO orders
            (order_number, order_date, customer_name, status,
             id_comanda, id_partener, error_message, missing_skus, items_count,
             last_sync_run_id, shipping_name, billing_name,
             payment_method, delivery_method, order_total,
             delivery_cost, discount_total, web_status, discount_split)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(order_number) DO UPDATE SET
            customer_name = excluded.customer_name,
            status = CASE
                WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED'
                    THEN orders.status
                ELSE excluded.status
            END,
            error_message = excluded.error_message,
            missing_skus = excluded.missing_skus,
            items_count = excluded.items_count,
            id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
            id_partener = COALESCE(excluded.id_partener, orders.id_partener),
            times_skipped = CASE WHEN excluded.status = 'SKIPPED'
                THEN orders.times_skipped + 1
                ELSE orders.times_skipped END,
            last_sync_run_id = excluded.last_sync_run_id,
            shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
            billing_name = COALESCE(excluded.billing_name, orders.billing_name),
            payment_method = COALESCE(excluded.payment_method, orders.payment_method),
            delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
            order_total = COALESCE(excluded.order_total, orders.order_total),
            delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost),
            discount_total = COALESCE(excluded.discount_total, orders.discount_total),
            web_status = COALESCE(excluded.web_status, orders.web_status),
            discount_split = COALESCE(excluded.discount_split, orders.discount_split),
            updated_at = datetime('now')
    """
    run_links_sql = """
        INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
        VALUES (?, ?, ?)
    """
    items_sql = """
        INSERT OR IGNORE INTO order_items
            (order_number, sku, product_name, quantity, price, baseprice,
             vat, mapping_status, codmat, id_articol, cantitate_roa)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    conn = await get_sqlite()
    try:
        # 1. Upsert the order rows themselves.
        order_rows = []
        for entry in orders_data:
            # An empty or missing SKU list is stored as NULL.
            missing_json = (
                json.dumps(entry["missing_skus"]) if entry.get("missing_skus") else None
            )
            order_rows.append((
                entry["order_number"], entry["order_date"],
                entry["customer_name"], entry["status"],
                entry.get("id_comanda"), entry.get("id_partener"),
                entry.get("error_message"),
                missing_json,
                entry.get("items_count", 0), entry["sync_run_id"],
                entry.get("shipping_name"), entry.get("billing_name"),
                entry.get("payment_method"), entry.get("delivery_method"),
                entry.get("order_total"),
                entry.get("delivery_cost"), entry.get("discount_total"),
                entry.get("web_status"), entry.get("discount_split"),
            ))
        await conn.executemany(orders_sql, order_rows)

        # 2. Record which run processed which order.
        run_links = [
            (entry["sync_run_id"], entry["order_number"], entry["status_at_run"])
            for entry in orders_data
        ]
        await conn.executemany(run_links_sql, run_links)

        # 3. Flatten every order's items into one list of rows.
        item_rows = [
            (
                entry["order_number"],
                line.get("sku"), line.get("product_name"),
                line.get("quantity"), line.get("price"), line.get("baseprice"),
                line.get("vat"),
                line.get("mapping_status"), line.get("codmat"),
                line.get("id_articol"), line.get("cantitate_roa"),
            )
            for entry in orders_data
            for line in entry.get("items", [])
        ]
        if item_rows:
            await conn.executemany(items_sql, item_rows)

        await conn.commit()
    finally:
        await conn.close()
async def track_missing_sku(sku: str, product_name: str = "",
                            order_count: int = 0, order_numbers: str = None,
                            customers: str = None):
    """Register a missing SKU and optionally refresh its order context.

    The SKU row is created with INSERT OR IGNORE. When any of
    ``order_count``, ``order_numbers`` or ``customers`` is truthy, all three
    columns are overwritten with the supplied values.
    """
    insert_sql = """
        INSERT OR IGNORE INTO missing_skus (sku, product_name)
        VALUES (?, ?)
    """
    context_sql = """
        UPDATE missing_skus SET
            order_count = ?,
            order_numbers = ?,
            customers = ?
        WHERE sku = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(insert_sql, (sku, product_name))
        has_context = bool(order_count or order_numbers or customers)
        if has_context:
            await conn.execute(
                context_sql, (order_count, order_numbers, customers, sku)
            )
        await conn.commit()
    finally:
        await conn.close()
async def resolve_missing_skus_batch(skus: set):
    """Mark multiple missing SKUs as resolved (they now have mappings).

    Only rows that are still unresolved (``resolved = 0``) are updated, so
    ``resolved_at`` of already-resolved SKUs is left alone.

    Args:
        skus: SKUs to mark as resolved.

    Returns:
        Number of rows updated (0 when ``skus`` is empty).
    """
    if not skus:
        return 0
    # SQLite caps the number of bound parameters per statement (999 on builds
    # older than 3.32), so large sets are updated in chunks. All chunks share
    # one transaction and a single commit.
    chunk_size = 500
    sku_list = list(skus)
    db = await get_sqlite()
    try:
        resolved_total = 0
        for start in range(0, len(sku_list), chunk_size):
            chunk = sku_list[start:start + chunk_size]
            placeholders = ",".join("?" for _ in chunk)
            cursor = await db.execute(f"""
                UPDATE missing_skus SET resolved = 1, resolved_at = datetime('now')
                WHERE sku IN ({placeholders}) AND resolved = 0
            """, chunk)
            resolved_total += cursor.rowcount
        await db.commit()
        return resolved_total
    finally:
        await db.close()
async def resolve_missing_sku(sku: str):
    """Flag one missing SKU as resolved and stamp ``resolved_at`` with now."""
    resolve_sql = """
        UPDATE missing_skus SET resolved = 1, resolved_at = datetime('now')
        WHERE sku = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(resolve_sql, (sku,))
        await conn.commit()
    finally:
        await conn.close()
async def get_missing_skus_paginated(page: int = 1, per_page: int = 20,
                                     resolved: int = 0, search: str = None):
    """Return one page of ``missing_skus`` rows plus pagination metadata.

    Args:
        page: 1-based page number.
        per_page: Page size.
        resolved: Value to match on the ``resolved`` column; -1 disables the
            filter and additionally sorts unresolved rows first.
        search: Optional case-insensitive substring matched against ``sku``
            or ``product_name``.

    Returns:
        Dict with keys ``missing_skus``, ``total``, ``page``, ``per_page``
        and ``pages``.
    """
    conn = await get_sqlite()
    try:
        skip = (page - 1) * per_page
        show_all = resolved == -1

        # The same filter (and bound values) drives both the COUNT and the
        # page query.
        conditions = []
        bind = []
        if not show_all:
            conditions.append("resolved = ?")
            bind.append(resolved)
        if search:
            pattern = f"%{search}%"
            conditions.append(
                "(LOWER(sku) LIKE LOWER(?) OR LOWER(COALESCE(product_name,'')) LIKE LOWER(?))"
            )
            bind += [pattern, pattern]

        where_clause = ("WHERE " + " AND ".join(conditions)) if conditions else ""
        if show_all:
            order_clause = "ORDER BY resolved ASC, order_count DESC, first_seen DESC"
        else:
            order_clause = "ORDER BY order_count DESC, first_seen DESC"

        count_cur = await conn.execute(
            f"SELECT COUNT(*) FROM missing_skus {where_clause}",
            list(bind),
        )
        total = (await count_cur.fetchone())[0]

        page_cur = await conn.execute(f"""
            SELECT sku, product_name, first_seen, resolved, resolved_at,
                   order_count, order_numbers, customers
            FROM missing_skus
            {where_clause}
            {order_clause}
            LIMIT ? OFFSET ?
        """, bind + [per_page, skip])
        records = await page_cur.fetchall()

        page_count = (total + per_page - 1) // per_page if total > 0 else 0
        return {
            "missing_skus": [dict(rec) for rec in records],
            "total": total,
            "page": page,
            "per_page": per_page,
            "pages": page_count,
        }
    finally:
        await conn.close()
async def get_sync_runs(page: int = 1, per_page: int = 20):
    """Get paginated sync run history, newest ``started_at`` first.

    Args:
        page: 1-based page number.
        per_page: Page size.

    Returns:
        Dict with keys ``runs`` (list of row dicts), ``total``, ``page``,
        ``per_page`` and ``pages``. ``per_page`` is included so the result has
        the same pagination shape as the other paginated queries in this
        module.
    """
    db = await get_sqlite()
    try:
        offset = (page - 1) * per_page
        cursor = await db.execute("SELECT COUNT(*) FROM sync_runs")
        total = (await cursor.fetchone())[0]
        cursor = await db.execute("""
            SELECT * FROM sync_runs
            ORDER BY started_at DESC
            LIMIT ? OFFSET ?
        """, (per_page, offset))
        rows = await cursor.fetchall()
        return {
            "runs": [dict(row) for row in rows],
            "total": total,
            "page": page,
            "per_page": per_page,
            "pages": (total + per_page - 1) // per_page if total > 0 else 0
        }
    finally:
        await db.close()
async def get_sync_run_detail(run_id: str):
    """Load one sync run together with the orders linked to it.

    Orders are resolved through the ``sync_run_orders`` junction table and
    sorted by ``order_date``.

    Returns:
        ``{"run": {...}, "orders": [...]}``, or ``None`` when no run has the
        given ``run_id``.
    """
    orders_sql = """
        SELECT o.* FROM orders o
        INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
        WHERE sro.sync_run_id = ?
        ORDER BY o.order_date
    """
    conn = await get_sqlite()
    try:
        run_cur = await conn.execute(
            "SELECT * FROM sync_runs WHERE run_id = ?", (run_id,)
        )
        run_row = await run_cur.fetchone()
        if not run_row:
            return None

        orders_cur = await conn.execute(orders_sql, (run_id,))
        order_rows = await orders_cur.fetchall()
        return {
            "run": dict(run_row),
            "orders": [dict(rec) for rec in order_rows],
        }
    finally:
        await conn.close()
async def get_dashboard_stats():
    """Collect the counters shown on the dashboard.

    Returns:
        Dict with order counts per status (``imported``, ``skipped``,
        ``errors``), missing-SKU counters (``missing_skus``,
        ``total_tracked_skus``, ``unresolved_skus``) and ``last_run`` (the most
        recently started sync run as a dict, or ``None``).
    """
    conn = await get_sqlite()
    try:
        async def _scalar(sql):
            # Run a single-value query and return its first column.
            cur = await conn.execute(sql)
            return (await cur.fetchone())[0]

        imported = await _scalar(
            "SELECT COUNT(*) FROM orders WHERE status = 'IMPORTED'"
        )
        skipped = await _scalar(
            "SELECT COUNT(*) FROM orders WHERE status = 'SKIPPED'"
        )
        errors = await _scalar(
            "SELECT COUNT(*) FROM orders WHERE status = 'ERROR'"
        )
        missing = await _scalar(
            "SELECT COUNT(*) FROM missing_skus WHERE resolved = 0"
        )
        total_missing_skus = await _scalar(
            "SELECT COUNT(DISTINCT sku) FROM missing_skus"
        )
        unresolved_skus = await _scalar(
            "SELECT COUNT(DISTINCT sku) FROM missing_skus WHERE resolved = 0"
        )

        last_cur = await conn.execute("""
            SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT 1
        """)
        last_run = await last_cur.fetchone()

        return {
            "imported": imported,
            "skipped": skipped,
            "errors": errors,
            "missing_skus": missing,
            "total_tracked_skus": total_missing_skus,
            "unresolved_skus": unresolved_skus,
            "last_run": dict(last_run) if last_run else None,
        }
    finally:
        await conn.close()
async def get_scheduler_config():
    """Read every ``scheduler_config`` row into a ``{key: value}`` dict."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("SELECT key, value FROM scheduler_config")
        config = {}
        for entry in await cur.fetchall():
            config[entry["key"]] = entry["value"]
        return config
    finally:
        await conn.close()
async def set_scheduler_config(key: str, value: str):
    """Store one scheduler setting, replacing any existing row for ``key``."""
    store_sql = """
        INSERT OR REPLACE INTO scheduler_config (key, value)
        VALUES (?, ?)
    """
    conn = await get_sqlite()
    try:
        await conn.execute(store_sql, (key, value))
        await conn.commit()
    finally:
        await conn.close()
# ── web_products ─────────────────────────────────
async def upsert_web_product(sku: str, product_name: str):
    """Record that a web product was seen in an order.

    A new SKU is inserted with ``order_count = 1``. For an existing SKU the
    counter is incremented, ``last_seen`` is refreshed, and the stored name is
    replaced only when the incoming name is non-empty.
    """
    upsert_sql = """
        INSERT INTO web_products (sku, product_name, order_count)
        VALUES (?, ?, 1)
        ON CONFLICT(sku) DO UPDATE SET
            product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name),
            last_seen = datetime('now'),
            order_count = web_products.order_count + 1
    """
    conn = await get_sqlite()
    try:
        await conn.execute(upsert_sql, (sku, product_name))
        await conn.commit()
    finally:
        await conn.close()
async def upsert_web_products_batch(items: list[tuple[str, str]]):
    """Upsert many web products with a single commit.

    Args:
        items: ``(sku, product_name)`` pairs. Each pair is applied with the
            same insert-or-increment statement; an empty list is a no-op.
    """
    if not items:
        return
    upsert_sql = """
        INSERT INTO web_products (sku, product_name, order_count)
        VALUES (?, ?, 1)
        ON CONFLICT(sku) DO UPDATE SET
            product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name),
            last_seen = datetime('now'),
            order_count = web_products.order_count + 1
    """
    conn = await get_sqlite()
    try:
        await conn.executemany(upsert_sql, items)
        await conn.commit()
    finally:
        await conn.close()
async def get_web_product_name(sku: str) -> str:
    """Return the stored product name for ``sku``, or "" when the SKU is unknown."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute(
            "SELECT product_name FROM web_products WHERE sku = ?", (sku,)
        )
        found = await cur.fetchone()
        if found:
            return found["product_name"]
        return ""
    finally:
        await conn.close()
async def get_web_products_batch(skus: list) -> dict:
    """Batch lookup product names by SKU list.

    Args:
        skus: SKUs to look up.

    Returns:
        ``{sku: product_name}`` for every SKU found in ``web_products``;
        SKUs that are not stored are simply absent. Empty input yields ``{}``.
    """
    if not skus:
        return {}
    # SQLite caps the number of bound parameters per statement (999 on builds
    # older than 3.32), so long SKU lists are looked up in chunks.
    chunk_size = 500
    sku_list = list(skus)
    names = {}
    db = await get_sqlite()
    try:
        for start in range(0, len(sku_list), chunk_size):
            chunk = sku_list[start:start + chunk_size]
            placeholders = ",".join("?" for _ in chunk)
            cursor = await db.execute(
                f"SELECT sku, product_name FROM web_products WHERE sku IN ({placeholders})",
                chunk
            )
            for row in await cursor.fetchall():
                names[row["sku"]] = row["product_name"]
        return names
    finally:
        await db.close()
# ── order_items ──────────────────────────────────
async def add_order_items(order_number: str, items: list):
    """Insert the line items of one order with a single commit.

    Rows are written with INSERT OR IGNORE (PK is (order_number, sku)), so a
    conflicting item is skipped rather than updated. Each item is a dict read
    with ``.get``; missing keys are stored as NULL. An empty list is a no-op.
    """
    if not items:
        return
    insert_sql = """
        INSERT OR IGNORE INTO order_items
            (order_number, sku, product_name, quantity, price, baseprice,
             vat, mapping_status, codmat, id_articol, cantitate_roa)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    item_fields = (
        "sku", "product_name", "quantity", "price", "baseprice",
        "vat", "mapping_status", "codmat", "id_articol", "cantitate_roa",
    )
    conn = await get_sqlite()
    try:
        item_rows = []
        for line in items:
            values = tuple(line.get(field) for field in item_fields)
            item_rows.append((order_number,) + values)
        await conn.executemany(insert_sql, item_rows)
        await conn.commit()
    finally:
        await conn.close()
async def get_order_items(order_number: str) -> list:
    """Return the items of one order as a list of dicts, sorted by SKU."""
    items_sql = """
        SELECT * FROM order_items
        WHERE order_number = ?
        ORDER BY sku
    """
    conn = await get_sqlite()
    try:
        cur = await conn.execute(items_sql, (order_number,))
        records = await cur.fetchall()
        return [dict(rec) for rec in records]
    finally:
        await conn.close()
async def get_order_detail(order_number: str) -> dict:
    """Load one order with its items.

    Returns:
        ``{"order": {...}, "items": [...]}`` with items sorted by SKU, or
        ``None`` when the order does not exist.
    """
    order_sql = """
        SELECT * FROM orders WHERE order_number = ?
    """
    items_sql = """
        SELECT * FROM order_items WHERE order_number = ?
        ORDER BY sku
    """
    conn = await get_sqlite()
    try:
        order_cur = await conn.execute(order_sql, (order_number,))
        order_row = await order_cur.fetchone()
        if not order_row:
            return None

        items_cur = await conn.execute(items_sql, (order_number,))
        item_rows = await items_cur.fetchall()
        return {
            "order": dict(order_row),
            "items": [dict(rec) for rec in item_rows],
        }
    finally:
        await conn.close()
async def get_run_orders_filtered(run_id: str, status_filter: str = "all",
                                  page: int = 1, per_page: int = 50,
                                  sort_by: str = "order_date", sort_dir: str = "asc"):
    """Return one page of the orders processed by a sync run.

    Orders are joined through ``sync_run_orders``; each returned order's
    ``status`` is the status recorded for that run (``status_at_run``), not
    the order's current status.

    Args:
        run_id: Sync run whose orders are listed.
        status_filter: "all" (or empty) for no filter, otherwise matched
            case-insensitively against ``status_at_run``.
        page: 1-based page number.
        per_page: Page size.
        sort_by: Column to sort on; values outside the allow-list fall back
            to ``order_date``.
        sort_dir: "asc" or "desc" (any case); anything else falls back to
            "asc".

    Returns:
        Dict with ``orders``, ``total``, ``page``, ``per_page``, ``pages`` and
        ``counts`` (per-status totals for the whole run, unaffected by
        ``status_filter``).
    """
    sortable_columns = {"order_date", "order_number", "customer_name", "items_count",
                        "status", "first_seen_at", "updated_at"}
    conn = await get_sqlite()
    try:
        conditions = ["sro.sync_run_id = ?"]
        bind = [run_id]
        if status_filter and status_filter != "all":
            conditions.append("UPPER(sro.status_at_run) = ?")
            bind.append(status_filter.upper())
        where = "WHERE " + " AND ".join(conditions)

        # Only allow-listed identifiers are interpolated into ORDER BY.
        order_column = sort_by if sort_by in sortable_columns else "order_date"
        direction = sort_dir if sort_dir.lower() in ("asc", "desc") else "asc"

        count_cur = await conn.execute(
            f"SELECT COUNT(*) FROM orders o INNER JOIN sync_run_orders sro "
            f"ON sro.order_number = o.order_number {where}", bind
        )
        total = (await count_cur.fetchone())[0]

        skip = (page - 1) * per_page
        page_cur = await conn.execute(f"""
            SELECT o.*, sro.status_at_run AS run_status FROM orders o
            INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
            {where}
            ORDER BY o.{order_column} {direction}
            LIMIT ? OFFSET ?
        """, bind + [per_page, skip])
        records = await page_cur.fetchall()

        tally_cur = await conn.execute("""
            SELECT sro.status_at_run AS status, COUNT(*) as cnt
            FROM orders o
            INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
            WHERE sro.sync_run_id = ?
            GROUP BY sro.status_at_run
        """, (run_id,))
        tally = {}
        for entry in await tally_cur.fetchall():
            tally[entry["status"]] = entry["cnt"]

        # Expose the per-run status under "status" and drop the helper column.
        orders_out = []
        for rec in records:
            as_dict = dict(rec)
            current_status = as_dict.get("status")
            as_dict["status"] = as_dict.pop("run_status", current_status)
            orders_out.append(as_dict)

        page_count = (total + per_page - 1) // per_page if total > 0 else 0
        return {
            "orders": orders_out,
            "total": total,
            "page": page,
            "per_page": per_page,
            "pages": page_count,
            "counts": {
                "imported": tally.get("IMPORTED", 0),
                "skipped": tally.get("SKIPPED", 0),
                "error": tally.get("ERROR", 0),
                "already_imported": tally.get("ALREADY_IMPORTED", 0),
                "cancelled": tally.get("CANCELLED", 0),
                "total": sum(tally.values()),
            },
        }
    finally:
        await conn.close()
# SQL predicates shared by the status filter and the counters in get_orders(),
# kept in one place so the filter and its counter cannot drift apart.
# "Diffs": ANAF adjustments, ANAF name mismatch, partner mismatch, or a
# RO-prefix / TVA-payer disagreement on an unadjusted fiscal code.
_ORDERS_DIFFS_PREDICATE = (
    "(anaf_cod_fiscal_adjusted = 1 OR anaf_denumire_mismatch = 1"
    " OR partner_mismatch = 1"
    " OR (cod_fiscal_gomag IS NOT NULL AND cod_fiscal_gomag != '' AND anaf_platitor_tva IS NOT NULL"
    " AND anaf_cod_fiscal_adjusted != 1"
    " AND ((UPPER(cod_fiscal_gomag) LIKE 'RO%' AND anaf_platitor_tva = 0)"
    " OR (UPPER(cod_fiscal_gomag) NOT LIKE 'RO%' AND anaf_platitor_tva = 1))))"
)
_ORDERS_IMPORTED_PREDICATE = "UPPER(status) IN ('IMPORTED', 'ALREADY_IMPORTED')"
_ORDERS_NO_INVOICE_PREDICATE = "(factura_numar IS NULL OR factura_numar = '')"


async def get_orders(page: int = 1, per_page: int = 50,
                     search: str = "", status_filter: str = "all",
                     sort_by: str = "order_date", sort_dir: str = "desc",
                     period_days: int = 7,
                     period_start: str = "", period_end: str = ""):
    """Get orders with filters, sorting, and period.

    period_days > 0 limits to the last N days.
    period_days=0 with period_start/period_end uses custom date range.
    period_days=0 without dates means all time.

    Args:
        page: 1-based page number.
        per_page: Page size.
        search: Substring matched (LIKE) against order_number or customer_name.
        status_filter: "all" and "UNINVOICED" add no SQL filter here;
            "IMPORTED" matches IMPORTED and ALREADY_IMPORTED; "DIFFS" applies
            the diffs predicate; any other value is matched case-insensitively
            against ``status``.
        sort_by: Column to sort on; values outside the allow-list fall back
            to ``order_date``.
        sort_dir: "asc" or "desc" (any case); anything else falls back to
            "desc".

    Returns:
        Dict with ``orders``, ``total``, ``page``, ``per_page``, ``pages`` and
        ``counts``. All counters are computed on period+search only and are
        never narrowed by ``status_filter``.
    """
    db = await get_sqlite()
    try:
        # Period + search clauses (used for counts — never include status filter)
        base_clauses = []
        base_params = []
        if period_days and period_days > 0:
            base_clauses.append("order_date >= date('now', ?)")
            base_params.append(f"-{period_days} days")
        elif period_days == 0 and period_start and period_end:
            base_clauses.append("order_date BETWEEN ? AND ?")
            base_params.extend([period_start, period_end])
        if search:
            base_clauses.append("(order_number LIKE ? OR customer_name LIKE ?)")
            base_params.extend([f"%{search}%", f"%{search}%"])

        # Data query adds status filter on top of base filters
        data_clauses = list(base_clauses)
        data_params = list(base_params)
        if status_filter and status_filter not in ("all", "UNINVOICED"):
            wanted = status_filter.upper()
            if wanted == "IMPORTED":
                data_clauses.append(_ORDERS_IMPORTED_PREDICATE)
            elif wanted == "DIFFS":
                data_clauses.append(_ORDERS_DIFFS_PREDICATE)
            else:
                data_clauses.append("UPPER(status) = ?")
                data_params.append(wanted)

        where = ("WHERE " + " AND ".join(data_clauses)) if data_clauses else ""
        counts_where = ("WHERE " + " AND ".join(base_clauses)) if base_clauses else ""

        # Only allow-listed identifiers are interpolated into ORDER BY.
        allowed_sort = {"order_date", "order_number", "customer_name", "items_count",
                        "status", "first_seen_at", "updated_at"}
        if sort_by not in allowed_sort:
            sort_by = "order_date"
        if sort_dir.lower() not in ("asc", "desc"):
            sort_dir = "desc"

        async def _count_in_period(*extra_clauses):
            # COUNT(*) of orders matching period+search plus the extra
            # (parameter-free) predicates.
            clauses = base_clauses + list(extra_clauses)
            where_sql = "WHERE " + " AND ".join(clauses)
            cur = await db.execute(
                f"SELECT COUNT(*) FROM orders {where_sql}", base_params
            )
            return (await cur.fetchone())[0]

        cursor = await db.execute(f"SELECT COUNT(*) FROM orders {where}", data_params)
        total = (await cursor.fetchone())[0]

        offset = (page - 1) * per_page
        cursor = await db.execute(f"""
            SELECT * FROM orders
            {where}
            ORDER BY {sort_by} {sort_dir}
            LIMIT ? OFFSET ?
        """, data_params + [per_page, offset])
        rows = await cursor.fetchall()

        # Counts by status — always on full period+search, never filtered by status
        cursor = await db.execute(f"""
            SELECT status, COUNT(*) as cnt FROM orders
            {counts_where}
            GROUP BY status
        """, base_params)
        status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()}

        # Uninvoiced count: IMPORTED/ALREADY_IMPORTED with no cached invoice
        uninvoiced_sqlite = await _count_in_period(
            _ORDERS_IMPORTED_PREDICATE,
            _ORDERS_NO_INVOICE_PREDICATE,
        )
        # Uninvoiced > 3 days old
        uninvoiced_old = await _count_in_period(
            _ORDERS_IMPORTED_PREDICATE,
            _ORDERS_NO_INVOICE_PREDICATE,
            "order_date < datetime('now', '-3 days')",
        )
        # Diffs count: orders with ANAF adjustments, TVA mismatch, or partner mismatch
        diffs_count = await _count_in_period(_ORDERS_DIFFS_PREDICATE)
        # Partner mismatches count
        partner_mismatches_count = await _count_in_period("partner_mismatch = 1")

        return {
            "orders": [dict(r) for r in rows],
            "total": total,
            "page": page,
            "per_page": per_page,
            "pages": (total + per_page - 1) // per_page if total > 0 else 0,
            "counts": {
                "imported": status_counts.get("IMPORTED", 0),
                "already_imported": status_counts.get("ALREADY_IMPORTED", 0),
                "imported_all": status_counts.get("IMPORTED", 0) + status_counts.get("ALREADY_IMPORTED", 0),
                "skipped": status_counts.get("SKIPPED", 0),
                "error": status_counts.get("ERROR", 0),
                "cancelled": status_counts.get("CANCELLED", 0),
                "total": sum(status_counts.values()),
                "uninvoiced_sqlite": uninvoiced_sqlite,
                "uninvoiced_old": uninvoiced_old,
                "diffs": diffs_count,
                "partner_mismatches": partner_mismatches_count,
            }
        }
    finally:
        await db.close()
async def update_import_order_addresses(order_number: str,
                                        id_adresa_facturare: int = None,
                                        id_adresa_livrare: int = None):
    """Store the ROA billing and delivery address IDs on an order.

    Both columns are overwritten with the given values (including ``None``)
    and ``updated_at`` is refreshed.
    """
    update_sql = """
        UPDATE orders SET
            id_adresa_facturare = ?,
            id_adresa_livrare = ?,
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(
            update_sql, (id_adresa_facturare, id_adresa_livrare, order_number)
        )
        await conn.commit()
    finally:
        await conn.close()
# ── Invoice cache ────────────────────────────────
async def get_uninvoiced_imported_orders() -> list:
    """Get all imported orders that don't yet have invoice data cached.

    An order counts as uninvoiced when ``factura_numar`` is NULL *or* an empty
    string. This matches the uninvoiced counter in ``get_orders`` and is the
    exact complement of ``get_invoiced_imported_orders``, so a row holding an
    empty invoice number is not left out of both lists.

    Returns:
        List of ``{"order_number": ..., "id_comanda": ...}`` dicts for orders
        with status IMPORTED/ALREADY_IMPORTED and a non-NULL ``id_comanda``.
    """
    db = await get_sqlite()
    try:
        cursor = await db.execute("""
            SELECT order_number, id_comanda FROM orders
            WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
              AND id_comanda IS NOT NULL
              AND (factura_numar IS NULL OR factura_numar = '')
        """)
        rows = await cursor.fetchall()
        return [dict(r) for r in rows]
    finally:
        await db.close()
async def update_order_invoice(order_number: str, serie: str = None,
                               numar: str = None, total_fara_tva: float = None,
                               total_tva: float = None, total_cu_tva: float = None,
                               data_act: str = None):
    """Store invoice data (from Oracle) on the order row.

    All six ``factura_*`` columns are overwritten with the given values and
    both ``invoice_checked_at`` and ``updated_at`` are set to now.
    """
    update_sql = """
        UPDATE orders SET
            factura_serie = ?,
            factura_numar = ?,
            factura_total_fara_tva = ?,
            factura_total_tva = ?,
            factura_total_cu_tva = ?,
            factura_data = ?,
            invoice_checked_at = datetime('now'),
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    values = (
        serie, numar, total_fara_tva, total_tva, total_cu_tva,
        data_act, order_number,
    )
    conn = await get_sqlite()
    try:
        await conn.execute(update_sql, values)
        await conn.commit()
    finally:
        await conn.close()
async def update_order_price_match(order_number: str, match: bool | None):
    """Store the price-match verdict on an order.

    ``match`` is persisted as 1 (truthy), 0 (falsy) or NULL (``None``,
    meaning the result is unavailable).
    """
    conn = await get_sqlite()
    try:
        if match is None:
            stored = None
        elif match:
            stored = 1
        else:
            stored = 0
        await conn.execute(
            "UPDATE orders SET price_match = ?, updated_at = datetime('now') WHERE order_number = ?",
            (stored, order_number),
        )
        await conn.commit()
    finally:
        await conn.close()
async def get_invoiced_imported_orders() -> list:
    """List imported orders that already have a cached invoice number.

    Returns ``order_number``/``id_comanda`` dicts for orders with status
    IMPORTED or ALREADY_IMPORTED, a non-NULL ``id_comanda`` and a non-empty
    ``factura_numar`` (used for re-verification).
    """
    select_sql = """
        SELECT order_number, id_comanda FROM orders
        WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
          AND id_comanda IS NOT NULL
          AND factura_numar IS NOT NULL AND factura_numar != ''
    """
    conn = await get_sqlite()
    try:
        cur = await conn.execute(select_sql)
        return [dict(rec) for rec in await cur.fetchall()]
    finally:
        await conn.close()
async def get_all_imported_orders() -> list:
    """List every imported order that has an ``id_comanda``.

    Returns ``order_number``/``id_comanda`` dicts for orders with status
    IMPORTED or ALREADY_IMPORTED (used to check whether they were deleted in
    ROA).
    """
    select_sql = """
        SELECT order_number, id_comanda FROM orders
        WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
          AND id_comanda IS NOT NULL
    """
    conn = await get_sqlite()
    try:
        cur = await conn.execute(select_sql)
        return [dict(rec) for rec in await cur.fetchall()]
    finally:
        await conn.close()
async def clear_order_invoice(order_number: str):
    """Wipe the cached invoice columns of an order (invoice deleted in ROA).

    All ``factura_*`` columns become NULL; ``invoice_checked_at`` and
    ``updated_at`` are set to now.
    """
    clear_sql = """
        UPDATE orders SET
            factura_serie = NULL,
            factura_numar = NULL,
            factura_total_fara_tva = NULL,
            factura_total_tva = NULL,
            factura_total_cu_tva = NULL,
            factura_data = NULL,
            invoice_checked_at = datetime('now'),
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(clear_sql, (order_number,))
        await conn.commit()
    finally:
        await conn.close()
async def mark_order_deleted_in_roa(order_number: str):
    """Flag an order as deleted in ROA.

    Sets status 'DELETED_IN_ROA', clears ``id_comanda``, ``id_partener``, all
    cached invoice columns and ``invoice_checked_at``, and records a fixed
    error message.
    """
    mark_sql = """
        UPDATE orders SET
            status = 'DELETED_IN_ROA',
            id_comanda = NULL,
            id_partener = NULL,
            factura_serie = NULL,
            factura_numar = NULL,
            factura_total_fara_tva = NULL,
            factura_total_tva = NULL,
            factura_total_cu_tva = NULL,
            factura_data = NULL,
            invoice_checked_at = NULL,
            error_message = 'Comanda stearsa din ROA',
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(mark_sql, (order_number,))
        await conn.commit()
    finally:
        await conn.close()
async def mark_order_cancelled(order_number: str, web_status: str = "Anulata"):
    """Flag an order as cancelled in GoMag.

    Sets status 'CANCELLED', stores ``web_status``, clears ``id_comanda``,
    ``id_partener``, all cached invoice columns and ``invoice_checked_at``,
    and records a fixed error message.
    """
    mark_sql = """
        UPDATE orders SET
            status = 'CANCELLED',
            id_comanda = NULL,
            id_partener = NULL,
            factura_serie = NULL,
            factura_numar = NULL,
            factura_total_fara_tva = NULL,
            factura_total_tva = NULL,
            factura_total_cu_tva = NULL,
            factura_data = NULL,
            invoice_checked_at = NULL,
            web_status = ?,
            error_message = 'Comanda anulata in GoMag',
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(mark_sql, (web_status, order_number))
        await conn.commit()
    finally:
        await conn.close()
# ── App Settings ─────────────────────────────────
async def get_app_settings() -> dict:
    """Read every ``app_settings`` row into a ``{key: value}`` dict."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("SELECT key, value FROM app_settings")
        settings = {}
        for entry in await cur.fetchall():
            settings[entry["key"]] = entry["value"]
        return settings
    finally:
        await conn.close()
async def set_app_setting(key: str, value: str):
    """Store one app setting, replacing any existing row for ``key``."""
    store_sql = """
        INSERT OR REPLACE INTO app_settings (key, value)
        VALUES (?, ?)
    """
    conn = await get_sqlite()
    try:
        await conn.execute(store_sql, (key, value))
        await conn.commit()
    finally:
        await conn.close()
# ── SKU-based order lookup ────────────────────────
async def get_skipped_orders_with_sku(sku: str) -> list[str]:
    """Return the distinct order numbers of SKIPPED orders containing ``sku``."""
    lookup_sql = """
        SELECT DISTINCT oi.order_number
        FROM order_items oi
        JOIN orders o ON o.order_number = oi.order_number
        WHERE oi.sku = ? AND o.status = 'SKIPPED'
    """
    conn = await get_sqlite()
    try:
        cur = await conn.execute(lookup_sql, (sku,))
        order_numbers = []
        for rec in await cur.fetchall():
            order_numbers.append(rec[0])
        return order_numbers
    finally:
        await conn.close()
# ── Price Sync Runs ───────────────────────────────
# ── ANAF Cache ───────────────────────────────────
async def get_anaf_cache(bare_cui: str) -> dict | None:
    """Look up a still-valid ANAF cache entry for a CUI.

    An entry is valid when its ``checked_at`` is newer than seven days ago.

    Returns:
        ``{"scpTVA": bool|None, "denumire_anaf": str, "checked_at": ...}``,
        or ``None`` when there is no valid entry.
    """
    lookup_sql = """
        SELECT scp_tva, denumire_anaf, checked_at
        FROM anaf_cache
        WHERE cui = ? AND checked_at > datetime('now', '-7 days')
    """
    conn = await get_sqlite()
    try:
        cur = await conn.execute(lookup_sql, (bare_cui,))
        entry = await cur.fetchone()
        if entry:
            raw_flag = entry["scp_tva"]
            # NULL in the cache means the TVA status is unknown.
            scp_tva = None if raw_flag is None else bool(raw_flag)
            return {
                "scpTVA": scp_tva,
                "denumire_anaf": entry["denumire_anaf"] or "",
                "checked_at": entry["checked_at"],
            }
        return None
    finally:
        await conn.close()
async def upsert_anaf_cache(cui: str, scp_tva: int | None, denumire_anaf: str):
    """Store an ANAF cache entry for ``cui``, replacing any existing one.

    ``checked_at`` is set to SQLite's ``datetime('now')``.
    """
    store_sql = """
        INSERT OR REPLACE INTO anaf_cache (cui, scp_tva, denumire_anaf, checked_at)
        VALUES (?, ?, ?, datetime('now'))
    """
    conn = await get_sqlite()
    try:
        await conn.execute(store_sql, (cui, scp_tva, denumire_anaf))
        await conn.commit()
    finally:
        await conn.close()
async def bulk_populate_anaf_cache(results: dict[str, dict]):
    """Batch insert/update ANAF cache entries.

    Args:
        results: Mapping of the form
            ``{cui: {"scpTVA": bool|None, "denumire_anaf": str, "checked_at": str}, ...}``.
            ``scpTVA`` is stored as 1/0 for ``True``/``False`` and as NULL for
            anything else. A missing ``denumire_anaf`` is stored as ``""``.
            A missing or empty ``checked_at`` is stamped by SQLite with
            ``datetime('now')``.

    Does nothing when ``results`` is empty.
    """
    if not results:
        return

    rows = []
    for cui, data in results.items():
        vat_flag = data.get("scpTVA")
        # Only genuine booleans map to 1/0; None and other values become NULL.
        scp = int(vat_flag) if isinstance(vat_flag, bool) else None
        rows.append((
            cui,
            scp,
            data.get("denumire_anaf", ""),
            # None lets the COALESCE below fill in the timestamp, so the
            # fallback uses the same clock and text format as
            # upsert_anaf_cache and the 7-day freshness queries.
            data.get("checked_at") or None,
        ))

    db = await get_sqlite()
    try:
        await db.executemany("""
            INSERT OR REPLACE INTO anaf_cache (cui, scp_tva, denumire_anaf, checked_at)
            VALUES (?, ?, ?, COALESCE(?, datetime('now')))
        """, rows)
        await db.commit()
    finally:
        await db.close()
async def get_expired_cuis_for_prepopulate() -> list[str]:
    """Get CUIs from recent orders that need ANAF cache refresh.

    Reads the distinct non-empty ``cod_fiscal_gomag`` values of orders dated
    within the last 3 months, normalises each with
    ``anaf_service.strip_ro_prefix``, drops those rejected by
    ``anaf_service.validate_cui``, and keeps the ones that have no fresh
    ANAF cache entry.

    Returns:
        Bare CUIs without a valid cache entry, each listed once, in the
        order they were first encountered.
    """
    from . import anaf_service

    db = await get_sqlite()
    try:
        cursor = await db.execute("""
            SELECT DISTINCT cod_fiscal_gomag FROM orders
            WHERE cod_fiscal_gomag IS NOT NULL
              AND cod_fiscal_gomag != ''
              AND order_date >= date('now', '-3 months')
        """)
        rows = await cursor.fetchall()
    finally:
        # Release the connection before the cache lookups open their own.
        await db.close()

    # Several raw values can normalise to the same bare CUI; a dict keeps
    # first-seen order while dropping the duplicates.
    candidates = {}
    for row in rows:
        bare = anaf_service.strip_ro_prefix(row["cod_fiscal_gomag"])
        if anaf_service.validate_cui(bare):
            candidates.setdefault(bare, None)
    unique_cuis = list(candidates)

    # One batched lookup per 500 CUIs instead of one query (and one
    # connection) per order row; 500 keeps the IN list well under SQLite's
    # bound-variable limit.
    expired = []
    for start in range(0, len(unique_cuis), 500):
        chunk = unique_cuis[start:start + 500]
        cached = await get_anaf_cache_batch(chunk)
        expired.extend(cui for cui in chunk if cui not in cached)
    return expired
# ── Partner/Address Data on Orders ─────────────────
async def update_order_partner_data(order_number: str, partner_data: dict):
    """Overwrite the partner/ANAF/address comparison columns of one order.

    Every column listed below is written on each call; a key missing from
    ``partner_data`` is stored as NULL, except the three flag columns
    (``anaf_cod_fiscal_adjusted``, ``anaf_denumire_mismatch``,
    ``address_mismatch``) which default to 0.

    Args:
        order_number: Order whose row is updated.
        partner_data: Keys read: cod_fiscal_gomag, cod_fiscal_roa, denumire_roa,
            anaf_platitor_tva, anaf_checked_at, anaf_cod_fiscal_adjusted,
            adresa_livrare_gomag, adresa_facturare_gomag, adresa_livrare_roa,
            adresa_facturare_roa, anaf_denumire_mismatch, denumire_anaf,
            address_mismatch.
    """
    # (key, default) pairs in the exact order of the SET placeholders below.
    column_defaults = (
        ("cod_fiscal_gomag", None),
        ("cod_fiscal_roa", None),
        ("denumire_roa", None),
        ("anaf_platitor_tva", None),
        ("anaf_checked_at", None),
        ("anaf_cod_fiscal_adjusted", 0),
        ("adresa_livrare_gomag", None),
        ("adresa_facturare_gomag", None),
        ("adresa_livrare_roa", None),
        ("adresa_facturare_roa", None),
        ("anaf_denumire_mismatch", 0),
        ("denumire_anaf", None),
        ("address_mismatch", 0),
    )
    params = tuple(
        partner_data.get(name, fallback) for name, fallback in column_defaults
    ) + (order_number,)

    update_sql = """
        UPDATE orders SET
            cod_fiscal_gomag = ?,
            cod_fiscal_roa = ?,
            denumire_roa = ?,
            anaf_platitor_tva = ?,
            anaf_checked_at = ?,
            anaf_cod_fiscal_adjusted = ?,
            adresa_livrare_gomag = ?,
            adresa_facturare_gomag = ?,
            adresa_livrare_roa = ?,
            adresa_facturare_roa = ?,
            anaf_denumire_mismatch = ?,
            denumire_anaf = ?,
            address_mismatch = ?,
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(update_sql, params)
        await conn.commit()
    finally:
        await conn.close()
async def update_gomag_addresses_batch(updates: list[dict]):
    """Store GoMag addresses for several orders and refresh ``address_mismatch``.

    For each entry the GoMag address columns are updated (a ``None`` value
    keeps what is already stored, via COALESCE). The stored addresses are then
    re-read and, when the order has at least one ROA address,
    ``address_mismatch`` is set to 1 if either the delivery or the billing
    pair fails ``_addr_match``, else 0. All changes are committed once at the
    end.

    Args:
        updates: Dicts with keys ``order_number`` (required),
            ``adresa_livrare_gomag`` and ``adresa_facturare_gomag``.
    """
    if not updates:
        return
    from ..services.sync_service import _addr_match

    conn = await get_sqlite()
    try:
        for item in updates:
            number = item["order_number"]
            new_delivery = item.get("adresa_livrare_gomag")
            new_billing = item.get("adresa_facturare_gomag")

            # Step 1: persist the GoMag side, leaving columns untouched on None.
            await conn.execute("""
                UPDATE orders SET
                    adresa_livrare_gomag = COALESCE(?, adresa_livrare_gomag),
                    adresa_facturare_gomag = COALESCE(?, adresa_facturare_gomag),
                    updated_at = datetime('now')
                WHERE order_number = ?
            """, (new_delivery, new_billing, number))

            # Step 2: compare what is now stored against the ROA side.
            cur = await conn.execute(
                "SELECT adresa_livrare_gomag, adresa_livrare_roa, "
                "adresa_facturare_gomag, adresa_facturare_roa FROM orders WHERE order_number = ?",
                (number,)
            )
            stored = await cur.fetchone()
            if not stored:
                continue
            delivery_gomag, delivery_roa = stored[0], stored[1]
            billing_gomag, billing_roa = stored[2], stored[3]
            if not (delivery_roa or billing_roa):
                # Nothing on the ROA side to compare with; keep the old flag.
                continue

            delivery_ok = _addr_match(delivery_gomag, delivery_roa)
            billing_ok = _addr_match(billing_gomag, billing_roa)
            mismatch_flag = 0 if (delivery_ok and billing_ok) else 1
            await conn.execute(
                "UPDATE orders SET address_mismatch = ? WHERE order_number = ?",
                (mismatch_flag, number)
            )
        await conn.commit()
    finally:
        await conn.close()
async def get_order_address_ids(order_number: str) -> dict | None:
    """Fetch the address IDs and cached address columns of one order.

    Returns:
        A dict with ``id_adresa_livrare``, ``id_adresa_facturare``,
        ``adresa_livrare_gomag``, ``adresa_facturare_gomag`` and
        ``adresa_livrare_roa``, or ``None`` when the order does not exist.
    """
    lookup_sql = """SELECT id_adresa_livrare, id_adresa_facturare,
        adresa_livrare_gomag, adresa_facturare_gomag,
        adresa_livrare_roa
        FROM orders WHERE order_number = ?"""
    conn = await get_sqlite()
    try:
        cur = await conn.execute(lookup_sql, [order_number])
        found = await cur.fetchone()
        if not found:
            return None
        return dict(found)
    finally:
        await conn.close()
async def update_order_address_cache(order_number: str, livr_roa: dict | None,
                                     fact_roa: dict | None, mismatch: bool):
    """Write only the ROA address cache columns and the mismatch flag of an order.

    ANAF/partner columns are left untouched.

    Args:
        order_number: Order whose row is updated.
        livr_roa: Delivery address, stored as JSON; a falsy value stores NULL.
        fact_roa: Billing address, stored as JSON; a falsy value stores NULL.
        mismatch: Stored as 1 when truthy, else 0.
    """
    delivery_json = None if not livr_roa else json.dumps(livr_roa)
    billing_json = None if not fact_roa else json.dumps(fact_roa)
    mismatch_flag = int(bool(mismatch))

    update_sql = """
        UPDATE orders SET
            adresa_livrare_roa = ?,
            adresa_facturare_roa = ?,
            address_mismatch = ?,
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(
            update_sql,
            (delivery_json, billing_json, mismatch_flag, order_number),
        )
        await conn.commit()
    finally:
        await conn.close()
async def get_orders_with_address_ids() -> list[dict]:
    """List the orders that have a delivery or billing address ID stored.

    Returns:
        One dict per order with ``order_number``, ``id_adresa_livrare``,
        ``id_adresa_facturare``, ``adresa_livrare_gomag`` and
        ``adresa_facturare_gomag``.
    """
    select_sql = """
        SELECT order_number, id_adresa_livrare, id_adresa_facturare,
               adresa_livrare_gomag, adresa_facturare_gomag
        FROM orders
        WHERE id_adresa_livrare IS NOT NULL OR id_adresa_facturare IS NOT NULL
    """
    conn = await get_sqlite()
    try:
        cur = await conn.execute(select_sql)
        records = await cur.fetchall()
        return list(map(dict, records))
    finally:
        await conn.close()
async def get_orders_missing_anaf() -> list[dict]:
    """List imported orders that have a ROA fiscal code but no ANAF VAT status yet.

    Selects orders whose ``cod_fiscal_roa`` is non-empty, whose
    ``anaf_platitor_tva`` is NULL, and whose status is ``IMPORTED`` or
    ``ALREADY_IMPORTED``.

    Returns:
        One dict per order with ``order_number``, ``cod_fiscal_roa``,
        ``denumire_roa`` and ``customer_name``.
    """
    select_sql = """
        SELECT order_number, cod_fiscal_roa, denumire_roa, customer_name
        FROM orders
        WHERE cod_fiscal_roa IS NOT NULL
          AND cod_fiscal_roa != ''
          AND anaf_platitor_tva IS NULL
          AND status IN ('IMPORTED', 'ALREADY_IMPORTED')
    """
    conn = await get_sqlite()
    try:
        cur = await conn.execute(select_sql)
        records = await cur.fetchall()
        return list(map(dict, records))
    finally:
        await conn.close()
async def get_anaf_cache_batch(bare_cuis: list[str]) -> dict[str, dict]:
    """Get cached ANAF data for multiple CUIs (valid for 7 days).

    The lookup runs in chunks of 500 CUIs so the ``IN (...)`` list never
    exceeds SQLite's bound-variable limit, matching the batching used by
    ``get_orders_partner_data_batch``.

    Args:
        bare_cuis: CUIs matched against ``anaf_cache.cui``. Any iterable is
            accepted; it is copied into a list before querying.

    Returns:
        ``{cui: {"scpTVA": bool | None, "denumire_anaf": str, "checked_at": ...}}``
        containing only the CUIs that have a row whose ``checked_at`` compares
        greater than SQLite's ``datetime('now', '-7 days')``. Empty dict when
        ``bare_cuis`` is empty.
    """
    if not bare_cuis:
        return {}

    cuis = list(bare_cuis)
    found: dict[str, dict] = {}
    db = await get_sqlite()
    try:
        for start in range(0, len(cuis), 500):
            chunk = cuis[start:start + 500]
            placeholders = ",".join("?" * len(chunk))
            cursor = await db.execute(f"""
                SELECT cui, scp_tva, denumire_anaf, checked_at
                FROM anaf_cache
                WHERE cui IN ({placeholders}) AND checked_at > datetime('now', '-7 days')
            """, chunk)
            for r in await cursor.fetchall():
                found[r["cui"]] = {
                    # NULL stays None so "unknown" is distinguishable from False.
                    "scpTVA": bool(r["scp_tva"]) if r["scp_tva"] is not None else None,
                    "denumire_anaf": r["denumire_anaf"] or "",
                    "checked_at": r["checked_at"],
                }
        return found
    finally:
        await db.close()
async def bulk_update_order_anaf_data(updates: list[tuple]):
    """Apply ANAF results to many orders in one ``executemany`` call.

    Args:
        updates: Tuples shaped as
            ``(anaf_platitor_tva, anaf_checked_at, anaf_denumire_mismatch,
            denumire_anaf, order_number)`` — the order must match the
            placeholders of the UPDATE statement.

    Does nothing when ``updates`` is empty.
    """
    if not updates:
        return

    update_sql = """
        UPDATE orders SET
            anaf_platitor_tva = ?,
            anaf_checked_at = ?,
            anaf_denumire_mismatch = ?,
            denumire_anaf = ?,
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    conn = await get_sqlite()
    try:
        await conn.executemany(update_sql, updates)
        await conn.commit()
    finally:
        await conn.close()
# ── Address Quality Cache (via app_settings) ──────
async def get_incomplete_addresses_count() -> int:
    """Get cached count of orders with incomplete ROA addresses.

    The cache lives in ``app_settings`` under the keys
    ``incomplete_addresses_count`` and ``incomplete_addresses_checked_at``.

    Returns:
        The cached count, or 0 when the timestamp is fresh but no count is
        stored. Returns -1 when the cache is not set, is more than 1 hour old,
        or holds a timestamp/count that cannot be parsed.
    """
    from datetime import datetime, timedelta

    db = await get_sqlite()
    try:
        cursor = await db.execute(
            "SELECT value FROM app_settings WHERE key = 'incomplete_addresses_checked_at'"
        )
        row = await cursor.fetchone()
        if not row or not row["value"]:
            return -1

        # Check freshness. The timestamp is written with _now_str(), so "now"
        # must come from the same clock; a plain datetime.now() would be off
        # by the UTC offset on hosts not running in that timezone.
        try:
            checked_at = datetime.fromisoformat(row["value"])
            now = datetime.fromisoformat(_now_str())
            if now - checked_at > timedelta(hours=1):
                return -1
        except (ValueError, TypeError):
            return -1

        cursor = await db.execute(
            "SELECT value FROM app_settings WHERE key = 'incomplete_addresses_count'"
        )
        row = await cursor.fetchone()
        if not row or not row["value"]:
            return 0
        try:
            return int(row["value"])
        except (ValueError, TypeError):
            # A corrupt count is reported as stale rather than raised.
            return -1
    finally:
        await db.close()
async def set_incomplete_addresses_count(count: int):
    """Store the incomplete-addresses count and the time it was computed.

    Writes two ``app_settings`` rows in one transaction:
    ``incomplete_addresses_count`` (the count as text) and
    ``incomplete_addresses_checked_at`` (the value of ``_now_str()``).

    Args:
        count: Number to cache.
    """
    count_sql = (
        "INSERT OR REPLACE INTO app_settings (key, value) VALUES ('incomplete_addresses_count', ?)"
    )
    stamp_sql = (
        "INSERT OR REPLACE INTO app_settings (key, value) VALUES ('incomplete_addresses_checked_at', ?)"
    )
    conn = await get_sqlite()
    try:
        await conn.execute(count_sql, (str(count),))
        await conn.execute(stamp_sql, (_now_str(),))
        # Single commit so the count and its timestamp land together.
        await conn.commit()
    finally:
        await conn.close()
# ── Partner Mismatch ──────────────────────────────
async def get_orders_partner_data_batch(order_numbers: list) -> dict:
    """Load partner-related columns for many orders, keyed by order number.

    The lookup is issued in slices of 500 order numbers per query.

    Args:
        order_numbers: Order numbers to look up.

    Returns:
        ``{order_number: {cod_fiscal_gomag, denumire_roa, id_partener,
        factura_numar, id_comanda}}`` for the orders found; empty dict when
        ``order_numbers`` is empty.
    """
    if not order_numbers:
        return {}

    # Result keys, in the column order of the SELECT after order_number.
    field_names = (
        "cod_fiscal_gomag",
        "denumire_roa",
        "id_partener",
        "factura_numar",
        "id_comanda",
    )
    step = 500
    by_order = {}
    conn = await get_sqlite()
    try:
        offset = 0
        while offset < len(order_numbers):
            batch = order_numbers[offset:offset + step]
            offset += step
            placeholders = ",".join("?" * len(batch))
            cur = await conn.execute(
                f"SELECT order_number, cod_fiscal_gomag, denumire_roa, id_partener, "
                f"factura_numar, id_comanda FROM orders WHERE order_number IN ({placeholders})",
                batch
            )
            records = await cur.fetchall()
            for rec in records:
                by_order[rec[0]] = {
                    name: rec[pos] for pos, name in enumerate(field_names, start=1)
                }
        return by_order
    finally:
        await conn.close()
async def update_partner_mismatch_batch(updates: list) -> None:
    """Write the ``partner_mismatch`` flag for several orders at once.

    Args:
        updates: Items shaped as ``{order_number, partner_mismatch: 0|1}``.

    Does nothing when ``updates`` is empty.
    """
    if not updates:
        return

    update_sql = (
        "UPDATE orders SET partner_mismatch = ?, updated_at = datetime('now') WHERE order_number = ?"
    )
    conn = await get_sqlite()
    try:
        params = []
        for item in updates:
            params.append((item["partner_mismatch"], item["order_number"]))
        await conn.executemany(update_sql, params)
        await conn.commit()
    finally:
        await conn.close()
async def clear_stale_partner_mismatches_no_cui(exclude_numbers: set) -> int:
    """Clear partner_mismatch=1 for orders with cod_fiscal_gomag=NULL that are NOT in the
    current sync batch. These were flagged by old code (before the no-CUI fix) and will
    never self-correct because they fall outside the active sync window.

    The flagged orders are read first, the excluded ones are filtered out in
    Python, and the remainder is updated in chunks of 500. This avoids binding
    one SQL variable per excluded order, which can exceed SQLite's
    bound-variable limit on a large sync batch.

    Args:
        exclude_numbers: Order numbers of the current sync batch; these are
            left untouched. May be empty or ``None``.

    Returns:
        Number of rows cleared.
    """
    # Compare as text so an int/str mix in the exclusion set still matches.
    excluded = {str(number) for number in (exclude_numbers or ())}

    db = await get_sqlite()
    try:
        cursor = await db.execute("""
            SELECT order_number FROM orders
            WHERE partner_mismatch = 1 AND cod_fiscal_gomag IS NULL
        """)
        stale = [
            row[0] for row in await cursor.fetchall()
            if row[0] is not None and str(row[0]) not in excluded
        ]

        cleared = 0
        for start in range(0, len(stale), 500):
            chunk = stale[start:start + 500]
            placeholders = ",".join("?" * len(chunk))
            # The flag conditions are repeated so a row changed since the
            # SELECT is not overwritten.
            cursor = await db.execute(f"""
                UPDATE orders SET partner_mismatch = 0, updated_at = datetime('now')
                WHERE partner_mismatch = 1
                  AND cod_fiscal_gomag IS NULL
                  AND order_number IN ({placeholders})
            """, chunk)
            cleared += cursor.rowcount
        await db.commit()
        return cleared
    finally:
        await db.close()
async def update_partner_resync_data(order_number: str, data: dict) -> None:
    """Write the partner columns of one order after a resync.

    All five columns are overwritten on every call. A key missing from
    ``data`` is stored as NULL, except ``partner_mismatch`` which defaults
    to 0 (i.e. the mismatch flag is cleared unless the caller sets it).

    Args:
        order_number: Order whose row is updated.
        data: Keys read: id_partener, cod_fiscal_gomag, cod_fiscal_roa,
            denumire_roa, partner_mismatch.
    """
    partner_id = data.get("id_partener")
    cui_gomag = data.get("cod_fiscal_gomag")
    cui_roa = data.get("cod_fiscal_roa")
    name_roa = data.get("denumire_roa")
    mismatch_flag = data.get("partner_mismatch", 0)

    update_sql = """
        UPDATE orders SET
            id_partener = ?,
            cod_fiscal_gomag = ?,
            cod_fiscal_roa = ?,
            denumire_roa = ?,
            partner_mismatch = ?,
            updated_at = datetime('now')
        WHERE order_number = ?
    """
    conn = await get_sqlite()
    try:
        await conn.execute(
            update_sql,
            (partner_id, cui_gomag, cui_roa, name_roa, mismatch_flag, order_number),
        )
        await conn.commit()
    finally:
        await conn.close()