Files
gomag-vending/api/app/services/sqlite_service.py
Marius Mutu 5f8b9b6003 feat(dashboard): redesign UI with smart polling, unified sync card, filter bar
Replace SSE with smart polling (30s idle / 3s when running). Unify sync
panel into single two-row card with live progress text. Add unified filter
bar (period dropdown, status pills, search) with period-total counts.
Add Client/Cont tooltip for different shipping/billing persons. Add SKU
mappings pct_total badges + complete/incomplete filter + 409 duplicate
check. Add missing SKUs search + rescan progress UX. Migrate SQLite
orders schema (shipping_name, billing_name, payment_method,
delivery_method). Fix JSON_OUTPUT_DIR path for server running from
project root. Fix pagination controls showing top+bottom with per-page
selector (25/50/100/250).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 17:55:36 +02:00

591 lines
21 KiB
Python

import json
import logging
from datetime import datetime
from ..database import get_sqlite, get_sqlite_sync
logger = logging.getLogger(__name__)
async def create_sync_run(run_id: str, json_files: int = 0):
"""Create a new sync run record."""
db = await get_sqlite()
try:
await db.execute("""
INSERT INTO sync_runs (run_id, started_at, status, json_files)
VALUES (?, datetime('now'), 'running', ?)
""", (run_id, json_files))
await db.commit()
finally:
await db.close()
async def update_sync_run(run_id: str, status: str, total_orders: int = 0,
imported: int = 0, skipped: int = 0, errors: int = 0,
error_message: str = None):
"""Update sync run with results."""
db = await get_sqlite()
try:
await db.execute("""
UPDATE sync_runs SET
finished_at = datetime('now'),
status = ?,
total_orders = ?,
imported = ?,
skipped = ?,
errors = ?,
error_message = ?
WHERE run_id = ?
""", (status, total_orders, imported, skipped, errors, error_message, run_id))
await db.commit()
finally:
await db.close()
async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
customer_name: str, status: str, id_comanda: int = None,
id_partener: int = None, error_message: str = None,
missing_skus: list = None, items_count: int = 0,
shipping_name: str = None, billing_name: str = None,
payment_method: str = None, delivery_method: str = None):
"""Upsert a single order — one row per order_number, status updated in place."""
db = await get_sqlite()
try:
await db.execute("""
INSERT INTO orders
(order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message, missing_skus, items_count,
last_sync_run_id, shipping_name, billing_name,
payment_method, delivery_method)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(order_number) DO UPDATE SET
status = excluded.status,
error_message = excluded.error_message,
missing_skus = excluded.missing_skus,
items_count = excluded.items_count,
id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
id_partener = COALESCE(excluded.id_partener, orders.id_partener),
times_skipped = CASE WHEN excluded.status = 'SKIPPED'
THEN orders.times_skipped + 1
ELSE orders.times_skipped END,
last_sync_run_id = excluded.last_sync_run_id,
shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
billing_name = COALESCE(excluded.billing_name, orders.billing_name),
payment_method = COALESCE(excluded.payment_method, orders.payment_method),
delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
updated_at = datetime('now')
""", (order_number, order_date, customer_name, status,
id_comanda, id_partener, error_message,
json.dumps(missing_skus) if missing_skus else None,
items_count, sync_run_id, shipping_name, billing_name,
payment_method, delivery_method))
await db.commit()
finally:
await db.close()
async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: str):
"""Record that this run processed this order (junction table)."""
db = await get_sqlite()
try:
await db.execute("""
INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
VALUES (?, ?, ?)
""", (sync_run_id, order_number, status_at_run))
await db.commit()
finally:
await db.close()
async def track_missing_sku(sku: str, product_name: str = "",
order_count: int = 0, order_numbers: str = None,
customers: str = None):
"""Track a missing SKU with order context."""
db = await get_sqlite()
try:
await db.execute("""
INSERT OR IGNORE INTO missing_skus (sku, product_name)
VALUES (?, ?)
""", (sku, product_name))
if order_count or order_numbers or customers:
await db.execute("""
UPDATE missing_skus SET
order_count = ?,
order_numbers = ?,
customers = ?
WHERE sku = ?
""", (order_count, order_numbers, customers, sku))
await db.commit()
finally:
await db.close()
async def resolve_missing_sku(sku: str):
"""Mark a missing SKU as resolved."""
db = await get_sqlite()
try:
await db.execute("""
UPDATE missing_skus SET resolved = 1, resolved_at = datetime('now')
WHERE sku = ?
""", (sku,))
await db.commit()
finally:
await db.close()
async def get_missing_skus_paginated(page: int = 1, per_page: int = 20,
resolved: int = 0, search: str = None):
"""Get paginated missing SKUs. resolved=-1 means show all.
Optional search filters by sku or product_name (LIKE)."""
db = await get_sqlite()
try:
offset = (page - 1) * per_page
# Build WHERE clause parts
where_parts = []
params_count = []
params_data = []
if resolved != -1:
where_parts.append("resolved = ?")
params_count.append(resolved)
params_data.append(resolved)
if search:
like = f"%{search}%"
where_parts.append("(LOWER(sku) LIKE LOWER(?) OR LOWER(COALESCE(product_name,'')) LIKE LOWER(?))")
params_count.extend([like, like])
params_data.extend([like, like])
where_clause = ("WHERE " + " AND ".join(where_parts)) if where_parts else ""
order_clause = (
"ORDER BY resolved ASC, order_count DESC, first_seen DESC"
if resolved == -1
else "ORDER BY order_count DESC, first_seen DESC"
)
cursor = await db.execute(
f"SELECT COUNT(*) FROM missing_skus {where_clause}",
params_count
)
total = (await cursor.fetchone())[0]
cursor = await db.execute(f"""
SELECT sku, product_name, first_seen, resolved, resolved_at,
order_count, order_numbers, customers
FROM missing_skus
{where_clause}
{order_clause}
LIMIT ? OFFSET ?
""", params_data + [per_page, offset])
rows = await cursor.fetchall()
return {
"missing_skus": [dict(row) for row in rows],
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page if total > 0 else 0
}
finally:
await db.close()
async def get_sync_runs(page: int = 1, per_page: int = 20):
"""Get paginated sync run history."""
db = await get_sqlite()
try:
offset = (page - 1) * per_page
cursor = await db.execute("SELECT COUNT(*) FROM sync_runs")
total = (await cursor.fetchone())[0]
cursor = await db.execute("""
SELECT * FROM sync_runs
ORDER BY started_at DESC
LIMIT ? OFFSET ?
""", (per_page, offset))
rows = await cursor.fetchall()
return {
"runs": [dict(row) for row in rows],
"total": total,
"page": page,
"pages": (total + per_page - 1) // per_page if total > 0 else 0
}
finally:
await db.close()
async def get_sync_run_detail(run_id: str):
"""Get details for a specific sync run including its orders via sync_run_orders."""
db = await get_sqlite()
try:
cursor = await db.execute(
"SELECT * FROM sync_runs WHERE run_id = ?", (run_id,)
)
run = await cursor.fetchone()
if not run:
return None
cursor = await db.execute("""
SELECT o.* FROM orders o
INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
WHERE sro.sync_run_id = ?
ORDER BY o.order_date
""", (run_id,))
orders = await cursor.fetchall()
return {
"run": dict(run),
"orders": [dict(o) for o in orders]
}
finally:
await db.close()
async def get_dashboard_stats():
"""Get stats for the dashboard."""
db = await get_sqlite()
try:
cursor = await db.execute(
"SELECT COUNT(*) FROM orders WHERE status = 'IMPORTED'"
)
imported = (await cursor.fetchone())[0]
cursor = await db.execute(
"SELECT COUNT(*) FROM orders WHERE status = 'SKIPPED'"
)
skipped = (await cursor.fetchone())[0]
cursor = await db.execute(
"SELECT COUNT(*) FROM orders WHERE status = 'ERROR'"
)
errors = (await cursor.fetchone())[0]
cursor = await db.execute(
"SELECT COUNT(*) FROM missing_skus WHERE resolved = 0"
)
missing = (await cursor.fetchone())[0]
cursor = await db.execute("SELECT COUNT(DISTINCT sku) FROM missing_skus")
total_missing_skus = (await cursor.fetchone())[0]
cursor = await db.execute(
"SELECT COUNT(DISTINCT sku) FROM missing_skus WHERE resolved = 0"
)
unresolved_skus = (await cursor.fetchone())[0]
cursor = await db.execute("""
SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT 1
""")
last_run = await cursor.fetchone()
return {
"imported": imported,
"skipped": skipped,
"errors": errors,
"missing_skus": missing,
"total_tracked_skus": total_missing_skus,
"unresolved_skus": unresolved_skus,
"last_run": dict(last_run) if last_run else None
}
finally:
await db.close()
async def get_scheduler_config():
"""Get scheduler configuration from SQLite."""
db = await get_sqlite()
try:
cursor = await db.execute("SELECT key, value FROM scheduler_config")
rows = await cursor.fetchall()
return {row["key"]: row["value"] for row in rows}
finally:
await db.close()
async def set_scheduler_config(key: str, value: str):
"""Set a scheduler configuration value."""
db = await get_sqlite()
try:
await db.execute("""
INSERT OR REPLACE INTO scheduler_config (key, value)
VALUES (?, ?)
""", (key, value))
await db.commit()
finally:
await db.close()
# ── web_products ─────────────────────────────────
async def upsert_web_product(sku: str, product_name: str):
"""Insert or update a web product, incrementing order_count."""
db = await get_sqlite()
try:
await db.execute("""
INSERT INTO web_products (sku, product_name, order_count)
VALUES (?, ?, 1)
ON CONFLICT(sku) DO UPDATE SET
product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name),
last_seen = datetime('now'),
order_count = web_products.order_count + 1
""", (sku, product_name))
await db.commit()
finally:
await db.close()
async def get_web_product_name(sku: str) -> str:
"""Lookup product name by SKU."""
db = await get_sqlite()
try:
cursor = await db.execute(
"SELECT product_name FROM web_products WHERE sku = ?", (sku,)
)
row = await cursor.fetchone()
return row["product_name"] if row else ""
finally:
await db.close()
async def get_web_products_batch(skus: list) -> dict:
"""Batch lookup product names by SKU list. Returns {sku: product_name}."""
if not skus:
return {}
db = await get_sqlite()
try:
placeholders = ",".join("?" for _ in skus)
cursor = await db.execute(
f"SELECT sku, product_name FROM web_products WHERE sku IN ({placeholders})",
list(skus)
)
rows = await cursor.fetchall()
return {row["sku"]: row["product_name"] for row in rows}
finally:
await db.close()
# ── order_items ──────────────────────────────────
async def add_order_items(order_number: str, items: list):
"""Bulk insert order items. Uses INSERT OR IGNORE — PK is (order_number, sku)."""
if not items:
return
db = await get_sqlite()
try:
await db.executemany("""
INSERT OR IGNORE INTO order_items
(order_number, sku, product_name, quantity, price, vat,
mapping_status, codmat, id_articol, cantitate_roa)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [
(order_number,
item.get("sku"), item.get("product_name"),
item.get("quantity"), item.get("price"), item.get("vat"),
item.get("mapping_status"), item.get("codmat"),
item.get("id_articol"), item.get("cantitate_roa"))
for item in items
])
await db.commit()
finally:
await db.close()
async def get_order_items(order_number: str) -> list:
"""Fetch items for one order."""
db = await get_sqlite()
try:
cursor = await db.execute("""
SELECT * FROM order_items
WHERE order_number = ?
ORDER BY sku
""", (order_number,))
rows = await cursor.fetchall()
return [dict(row) for row in rows]
finally:
await db.close()
async def get_order_detail(order_number: str) -> dict:
"""Get full order detail: order metadata + items."""
db = await get_sqlite()
try:
cursor = await db.execute("""
SELECT * FROM orders WHERE order_number = ?
""", (order_number,))
order = await cursor.fetchone()
if not order:
return None
cursor = await db.execute("""
SELECT * FROM order_items WHERE order_number = ?
ORDER BY sku
""", (order_number,))
items = await cursor.fetchall()
return {
"order": dict(order),
"items": [dict(i) for i in items]
}
finally:
await db.close()
async def get_run_orders_filtered(run_id: str, status_filter: str = "all",
page: int = 1, per_page: int = 50,
sort_by: str = "order_date", sort_dir: str = "asc"):
"""Get paginated orders for a run via sync_run_orders junction table."""
db = await get_sqlite()
try:
where = "WHERE sro.sync_run_id = ?"
params = [run_id]
if status_filter and status_filter != "all":
where += " AND UPPER(o.status) = ?"
params.append(status_filter.upper())
allowed_sort = {"order_date", "order_number", "customer_name", "items_count",
"status", "first_seen_at", "updated_at"}
if sort_by not in allowed_sort:
sort_by = "order_date"
if sort_dir.lower() not in ("asc", "desc"):
sort_dir = "asc"
cursor = await db.execute(
f"SELECT COUNT(*) FROM orders o INNER JOIN sync_run_orders sro "
f"ON sro.order_number = o.order_number {where}", params
)
total = (await cursor.fetchone())[0]
offset = (page - 1) * per_page
cursor = await db.execute(f"""
SELECT o.* FROM orders o
INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
{where}
ORDER BY o.{sort_by} {sort_dir}
LIMIT ? OFFSET ?
""", params + [per_page, offset])
rows = await cursor.fetchall()
cursor = await db.execute("""
SELECT o.status, COUNT(*) as cnt
FROM orders o
INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
WHERE sro.sync_run_id = ?
GROUP BY o.status
""", (run_id,))
status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()}
return {
"orders": [dict(r) for r in rows],
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page if total > 0 else 0,
"counts": {
"imported": status_counts.get("IMPORTED", 0),
"skipped": status_counts.get("SKIPPED", 0),
"error": status_counts.get("ERROR", 0),
"total": sum(status_counts.values())
}
}
finally:
await db.close()
async def get_orders(page: int = 1, per_page: int = 50,
search: str = "", status_filter: str = "all",
sort_by: str = "order_date", sort_dir: str = "desc",
period_days: int = 7,
period_start: str = "", period_end: str = ""):
"""Get orders with filters, sorting, and period.
period_days=0 with period_start/period_end uses custom date range.
period_days=0 without dates means all time.
"""
db = await get_sqlite()
try:
where_clauses = []
params = []
if period_days and period_days > 0:
where_clauses.append("order_date >= date('now', ?)")
params.append(f"-{period_days} days")
elif period_days == 0 and period_start and period_end:
where_clauses.append("order_date BETWEEN ? AND ?")
params.extend([period_start, period_end])
if search:
where_clauses.append("(order_number LIKE ? OR customer_name LIKE ?)")
params.extend([f"%{search}%", f"%{search}%"])
if status_filter and status_filter not in ("all", "UNINVOICED"):
where_clauses.append("UPPER(status) = ?")
params.append(status_filter.upper())
where = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
allowed_sort = {"order_date", "order_number", "customer_name", "items_count",
"status", "first_seen_at", "updated_at"}
if sort_by not in allowed_sort:
sort_by = "order_date"
if sort_dir.lower() not in ("asc", "desc"):
sort_dir = "desc"
cursor = await db.execute(f"SELECT COUNT(*) FROM orders {where}", params)
total = (await cursor.fetchone())[0]
offset = (page - 1) * per_page
cursor = await db.execute(f"""
SELECT * FROM orders
{where}
ORDER BY {sort_by} {sort_dir}
LIMIT ? OFFSET ?
""", params + [per_page, offset])
rows = await cursor.fetchall()
# Counts by status (on full period, not just this page)
cursor = await db.execute(f"""
SELECT status, COUNT(*) as cnt FROM orders
{where}
GROUP BY status
""", params)
status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()}
return {
"orders": [dict(r) for r in rows],
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page if total > 0 else 0,
"counts": {
"imported": status_counts.get("IMPORTED", 0),
"skipped": status_counts.get("SKIPPED", 0),
"error": status_counts.get("ERROR", 0),
"total": sum(status_counts.values())
}
}
finally:
await db.close()
async def update_import_order_addresses(order_number: str,
id_adresa_facturare: int = None,
id_adresa_livrare: int = None):
"""Update ROA address IDs on an order record."""
db = await get_sqlite()
try:
await db.execute("""
UPDATE orders SET
id_adresa_facturare = ?,
id_adresa_livrare = ?,
updated_at = datetime('now')
WHERE order_number = ?
""", (id_adresa_facturare, id_adresa_livrare, order_number))
await db.commit()
finally:
await db.close()