Previously, orders deleted from Oracle (sters=1) remained as IMPORTED in SQLite, and deleted invoices kept stale cache data. Now the refresh button and sync cycle re-verify all imported orders against Oracle: - Deleted orders → marked DELETED_IN_ROA with cleared id_comanda - Deleted invoices → invoice cache fields cleared - New status badge for DELETED_IN_ROA in dashboard and logs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
885 lines
33 KiB
Python
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from ..database import get_sqlite, get_sqlite_sync
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def create_sync_run(run_id: str, json_files: int = 0):
    """Insert a fresh sync_runs row in 'running' state with a start timestamp."""
    conn = await get_sqlite()
    try:
        await conn.execute("""
            INSERT INTO sync_runs (run_id, started_at, status, json_files)
            VALUES (?, datetime('now'), 'running', ?)
        """, (run_id, json_files))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def update_sync_run(run_id: str, status: str, total_orders: int = 0,
                          imported: int = 0, skipped: int = 0, errors: int = 0,
                          error_message: str = None,
                          already_imported: int = 0, new_imported: int = 0):
    """Finalize a sync run: stamp finished_at and store the result counters.

    Args:
        run_id: Identifier of the run created by create_sync_run().
        status: Terminal status value for the run.
        total_orders, imported, skipped, errors: Aggregate result counters.
        error_message: Optional failure description; None clears the column.
        already_imported, new_imported: Breakdown of the imported total
            (orders seen before vs. imported for the first time this run).
    """
    db = await get_sqlite()
    try:
        await db.execute("""
            UPDATE sync_runs SET
                finished_at = datetime('now'),
                status = ?,
                total_orders = ?,
                imported = ?,
                skipped = ?,
                errors = ?,
                error_message = ?,
                already_imported = ?,
                new_imported = ?
            WHERE run_id = ?
        """, (status, total_orders, imported, skipped, errors, error_message,
              already_imported, new_imported, run_id))
        await db.commit()
    finally:
        await db.close()
|
|
|
|
|
|
async def upsert_order(sync_run_id: str, order_number: str, order_date: str,
                       customer_name: str, status: str, id_comanda: int = None,
                       id_partener: int = None, error_message: str = None,
                       missing_skus: list = None, items_count: int = 0,
                       shipping_name: str = None, billing_name: str = None,
                       payment_method: str = None, delivery_method: str = None,
                       order_total: float = None,
                       delivery_cost: float = None, discount_total: float = None):
    """Upsert a single order — one row per order_number, status updated in place.

    Merge rules applied on conflict:
    - status: an existing 'IMPORTED' row is never downgraded to
      'ALREADY_IMPORTED'; any other incoming status wins.
    - id_comanda/id_partener and the shipping/billing/payment/total columns
      keep their stored value when the incoming value is NULL (COALESCE).
    - times_skipped increments only when the incoming status is 'SKIPPED'.
    - error_message/missing_skus/items_count are overwritten unconditionally.
    missing_skus is serialized to JSON; an empty/None list stores NULL.
    """
    db = await get_sqlite()
    try:
        await db.execute("""
            INSERT INTO orders
                (order_number, order_date, customer_name, status,
                 id_comanda, id_partener, error_message, missing_skus, items_count,
                 last_sync_run_id, shipping_name, billing_name,
                 payment_method, delivery_method, order_total,
                 delivery_cost, discount_total)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(order_number) DO UPDATE SET
                status = CASE
                    WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED'
                    THEN orders.status
                    ELSE excluded.status
                END,
                error_message = excluded.error_message,
                missing_skus = excluded.missing_skus,
                items_count = excluded.items_count,
                id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
                id_partener = COALESCE(excluded.id_partener, orders.id_partener),
                times_skipped = CASE WHEN excluded.status = 'SKIPPED'
                                     THEN orders.times_skipped + 1
                                     ELSE orders.times_skipped END,
                last_sync_run_id = excluded.last_sync_run_id,
                shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
                billing_name = COALESCE(excluded.billing_name, orders.billing_name),
                payment_method = COALESCE(excluded.payment_method, orders.payment_method),
                delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
                order_total = COALESCE(excluded.order_total, orders.order_total),
                delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost),
                discount_total = COALESCE(excluded.discount_total, orders.discount_total),
                updated_at = datetime('now')
        """, (order_number, order_date, customer_name, status,
              id_comanda, id_partener, error_message,
              json.dumps(missing_skus) if missing_skus else None,
              items_count, sync_run_id, shipping_name, billing_name,
              payment_method, delivery_method, order_total,
              delivery_cost, discount_total))
        await db.commit()
    finally:
        await db.close()
|
|
|
|
|
|
async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: str):
    """Link an order to a sync run in the junction table (idempotent insert)."""
    conn = await get_sqlite()
    try:
        await conn.execute("""
            INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
            VALUES (?, ?, ?)
        """, (sync_run_id, order_number, status_at_run))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def save_orders_batch(orders_data: list[dict]):
    """Batch save a list of orders + their sync_run_orders + order_items in one transaction.

    Each dict must have: sync_run_id, order_number, order_date, customer_name, status,
    id_comanda, id_partener, error_message, missing_skus (list|None), items_count,
    shipping_name, billing_name, payment_method, delivery_method, status_at_run,
    items (list of item dicts), delivery_cost (optional), discount_total (optional).

    All three statements share one connection and are committed together,
    so a failure anywhere rolls back the whole batch.
    """
    if not orders_data:
        return
    db = await get_sqlite()
    try:
        # 1. Upsert orders — same merge semantics as upsert_order():
        #    'IMPORTED' never downgrades to 'ALREADY_IMPORTED', NULL incoming
        #    values keep the stored value (COALESCE), times_skipped counts
        #    SKIPPED occurrences.
        await db.executemany("""
            INSERT INTO orders
                (order_number, order_date, customer_name, status,
                 id_comanda, id_partener, error_message, missing_skus, items_count,
                 last_sync_run_id, shipping_name, billing_name,
                 payment_method, delivery_method, order_total,
                 delivery_cost, discount_total)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(order_number) DO UPDATE SET
                status = CASE
                    WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED'
                    THEN orders.status
                    ELSE excluded.status
                END,
                error_message = excluded.error_message,
                missing_skus = excluded.missing_skus,
                items_count = excluded.items_count,
                id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda),
                id_partener = COALESCE(excluded.id_partener, orders.id_partener),
                times_skipped = CASE WHEN excluded.status = 'SKIPPED'
                                     THEN orders.times_skipped + 1
                                     ELSE orders.times_skipped END,
                last_sync_run_id = excluded.last_sync_run_id,
                shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name),
                billing_name = COALESCE(excluded.billing_name, orders.billing_name),
                payment_method = COALESCE(excluded.payment_method, orders.payment_method),
                delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method),
                order_total = COALESCE(excluded.order_total, orders.order_total),
                delivery_cost = COALESCE(excluded.delivery_cost, orders.delivery_cost),
                discount_total = COALESCE(excluded.discount_total, orders.discount_total),
                updated_at = datetime('now')
        """, [
            (d["order_number"], d["order_date"], d["customer_name"], d["status"],
             d.get("id_comanda"), d.get("id_partener"), d.get("error_message"),
             json.dumps(d["missing_skus"]) if d.get("missing_skus") else None,
             d.get("items_count", 0), d["sync_run_id"],
             d.get("shipping_name"), d.get("billing_name"),
             d.get("payment_method"), d.get("delivery_method"),
             d.get("order_total"),
             d.get("delivery_cost"), d.get("discount_total"))
            for d in orders_data
        ])

        # 2. Sync run orders — junction rows; OR IGNORE makes re-runs safe.
        await db.executemany("""
            INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run)
            VALUES (?, ?, ?)
        """, [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders_data])

        # 3. Order items — flattened across all orders; duplicates ignored
        #    (PK is order_number + sku per add_order_items).
        all_items = []
        for d in orders_data:
            for item in d.get("items", []):
                all_items.append((
                    d["order_number"],
                    item.get("sku"), item.get("product_name"),
                    item.get("quantity"), item.get("price"), item.get("vat"),
                    item.get("mapping_status"), item.get("codmat"),
                    item.get("id_articol"), item.get("cantitate_roa")
                ))
        if all_items:
            await db.executemany("""
                INSERT OR IGNORE INTO order_items
                    (order_number, sku, product_name, quantity, price, vat,
                     mapping_status, codmat, id_articol, cantitate_roa)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, all_items)

        await db.commit()
    finally:
        await db.close()
|
|
|
|
|
|
async def track_missing_sku(sku: str, product_name: str = "",
                            order_count: int = 0, order_numbers: str = None,
                            customers: str = None):
    """Record a missing SKU, optionally attaching its order context."""
    conn = await get_sqlite()
    try:
        # Ensure the SKU row exists; no-op when it is already tracked.
        await conn.execute("""
            INSERT OR IGNORE INTO missing_skus (sku, product_name)
            VALUES (?, ?)
        """, (sku, product_name))
        # Only touch the context columns when some context was supplied.
        has_context = bool(order_count) or bool(order_numbers) or bool(customers)
        if has_context:
            await conn.execute("""
                UPDATE missing_skus SET
                    order_count = ?,
                    order_numbers = ?,
                    customers = ?
                WHERE sku = ?
            """, (order_count, order_numbers, customers, sku))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def resolve_missing_sku(sku: str):
    """Flag a tracked SKU as resolved and stamp the resolution time."""
    conn = await get_sqlite()
    try:
        await conn.execute("""
            UPDATE missing_skus SET resolved = 1, resolved_at = datetime('now')
            WHERE sku = ?
        """, (sku,))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_missing_skus_paginated(page: int = 1, per_page: int = 20,
                                     resolved: int = 0, search: str = None):
    """Get paginated missing SKUs. resolved=-1 means show all.
    Optional search filters by sku or product_name (LIKE).

    Returns a dict with the page of rows plus total/page/per_page/pages.

    Fix: the original kept two parameter lists (params_count / params_data)
    that were built identically in lockstep — consolidated into one list
    shared by the COUNT and data queries.
    """
    db = await get_sqlite()
    try:
        offset = (page - 1) * per_page

        # Build WHERE clause parts; the same filter params feed both the
        # COUNT query and the data query.
        where_parts = []
        params = []

        if resolved != -1:
            where_parts.append("resolved = ?")
            params.append(resolved)

        if search:
            like = f"%{search}%"
            where_parts.append("(LOWER(sku) LIKE LOWER(?) OR LOWER(COALESCE(product_name,'')) LIKE LOWER(?))")
            params.extend([like, like])

        where_clause = ("WHERE " + " AND ".join(where_parts)) if where_parts else ""

        # When showing everything, surface unresolved entries first.
        order_clause = (
            "ORDER BY resolved ASC, order_count DESC, first_seen DESC"
            if resolved == -1
            else "ORDER BY order_count DESC, first_seen DESC"
        )

        cursor = await db.execute(
            f"SELECT COUNT(*) FROM missing_skus {where_clause}",
            params
        )
        total = (await cursor.fetchone())[0]

        cursor = await db.execute(f"""
            SELECT sku, product_name, first_seen, resolved, resolved_at,
                   order_count, order_numbers, customers
            FROM missing_skus
            {where_clause}
            {order_clause}
            LIMIT ? OFFSET ?
        """, params + [per_page, offset])

        rows = await cursor.fetchall()

        return {
            "missing_skus": [dict(row) for row in rows],
            "total": total,
            "page": page,
            "per_page": per_page,
            "pages": (total + per_page - 1) // per_page if total > 0 else 0
        }
    finally:
        await db.close()
|
|
|
|
|
|
async def get_sync_runs(page: int = 1, per_page: int = 20):
    """Return one page of sync run history, newest first."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("SELECT COUNT(*) FROM sync_runs")
        total = (await cur.fetchone())[0]

        cur = await conn.execute("""
            SELECT * FROM sync_runs
            ORDER BY started_at DESC
            LIMIT ? OFFSET ?
        """, (per_page, (page - 1) * per_page))
        records = await cur.fetchall()

        page_count = (total + per_page - 1) // per_page if total > 0 else 0
        return {
            "runs": [dict(rec) for rec in records],
            "total": total,
            "page": page,
            "pages": page_count
        }
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_sync_run_detail(run_id: str):
    """Fetch one sync run plus every order it touched (via the junction table).

    Returns None when the run_id is unknown.
    """
    conn = await get_sqlite()
    try:
        cur = await conn.execute(
            "SELECT * FROM sync_runs WHERE run_id = ?", (run_id,)
        )
        run_row = await cur.fetchone()
        if run_row is None:
            return None

        cur = await conn.execute("""
            SELECT o.* FROM orders o
            INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
            WHERE sro.sync_run_id = ?
            ORDER BY o.order_date
        """, (run_id,))
        order_rows = await cur.fetchall()

        return {
            "run": dict(run_row),
            "orders": [dict(row) for row in order_rows]
        }
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_dashboard_stats():
    """Collect the counters and latest run shown on the dashboard."""
    conn = await get_sqlite()
    try:
        async def _scalar(sql: str) -> int:
            # Run a single-value COUNT query and unwrap the result.
            cur = await conn.execute(sql)
            return (await cur.fetchone())[0]

        imported = await _scalar("SELECT COUNT(*) FROM orders WHERE status = 'IMPORTED'")
        skipped = await _scalar("SELECT COUNT(*) FROM orders WHERE status = 'SKIPPED'")
        errors = await _scalar("SELECT COUNT(*) FROM orders WHERE status = 'ERROR'")
        missing = await _scalar("SELECT COUNT(*) FROM missing_skus WHERE resolved = 0")
        total_missing_skus = await _scalar("SELECT COUNT(DISTINCT sku) FROM missing_skus")
        unresolved_skus = await _scalar("SELECT COUNT(DISTINCT sku) FROM missing_skus WHERE resolved = 0")

        cur = await conn.execute("""
            SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT 1
        """)
        latest = await cur.fetchone()

        return {
            "imported": imported,
            "skipped": skipped,
            "errors": errors,
            "missing_skus": missing,
            "total_tracked_skus": total_missing_skus,
            "unresolved_skus": unresolved_skus,
            "last_run": dict(latest) if latest else None
        }
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_scheduler_config():
    """Return the scheduler configuration as a {key: value} dict."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("SELECT key, value FROM scheduler_config")
        return {r["key"]: r["value"] for r in await cur.fetchall()}
    finally:
        await conn.close()
|
|
|
|
|
|
async def set_scheduler_config(key: str, value: str):
    """Write one scheduler config entry, replacing any existing value."""
    conn = await get_sqlite()
    try:
        await conn.execute("""
            INSERT OR REPLACE INTO scheduler_config (key, value)
            VALUES (?, ?)
        """, (key, value))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
# ── web_products ─────────────────────────────────
|
|
|
|
async def upsert_web_product(sku: str, product_name: str):
    """Insert a web product, or on conflict refresh its name/last_seen and bump order_count."""
    conn = await get_sqlite()
    try:
        await conn.execute("""
            INSERT INTO web_products (sku, product_name, order_count)
            VALUES (?, ?, 1)
            ON CONFLICT(sku) DO UPDATE SET
                product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name),
                last_seen = datetime('now'),
                order_count = web_products.order_count + 1
        """, (sku, product_name))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def upsert_web_products_batch(items: list[tuple[str, str]]):
    """Upsert many (sku, product_name) pairs in one transaction.

    Same conflict behavior as upsert_web_product: an empty incoming name
    never overwrites a stored one, and order_count is incremented per hit.
    """
    if not items:
        return
    conn = await get_sqlite()
    try:
        await conn.executemany("""
            INSERT INTO web_products (sku, product_name, order_count)
            VALUES (?, ?, 1)
            ON CONFLICT(sku) DO UPDATE SET
                product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name),
                last_seen = datetime('now'),
                order_count = web_products.order_count + 1
        """, items)
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_web_product_name(sku: str) -> str:
    """Resolve one SKU to its product name; empty string when unknown."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute(
            "SELECT product_name FROM web_products WHERE sku = ?", (sku,)
        )
        record = await cur.fetchone()
        return "" if record is None else record["product_name"]
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_web_products_batch(skus: list) -> dict:
    """Resolve many SKUs to product names in one query. Returns {sku: product_name}."""
    if not skus:
        return {}
    conn = await get_sqlite()
    try:
        # One '?' per SKU for the IN clause; values are still bound safely.
        marks = ",".join(["?"] * len(skus))
        cur = await conn.execute(
            f"SELECT sku, product_name FROM web_products WHERE sku IN ({marks})",
            list(skus)
        )
        return {r["sku"]: r["product_name"] for r in await cur.fetchall()}
    finally:
        await conn.close()
|
|
|
|
|
|
# ── order_items ──────────────────────────────────
|
|
|
|
async def add_order_items(order_number: str, items: list):
    """Bulk insert order items. Uses INSERT OR IGNORE — PK is (order_number, sku)."""
    if not items:
        return
    conn = await get_sqlite()
    try:
        rows = []
        for entry in items:
            rows.append((
                order_number,
                entry.get("sku"), entry.get("product_name"),
                entry.get("quantity"), entry.get("price"), entry.get("vat"),
                entry.get("mapping_status"), entry.get("codmat"),
                entry.get("id_articol"), entry.get("cantitate_roa")
            ))
        await conn.executemany("""
            INSERT OR IGNORE INTO order_items
                (order_number, sku, product_name, quantity, price, vat,
                 mapping_status, codmat, id_articol, cantitate_roa)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, rows)
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_order_items(order_number: str) -> list:
    """Return all items belonging to one order, sorted by SKU."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("""
            SELECT * FROM order_items
            WHERE order_number = ?
            ORDER BY sku
        """, (order_number,))
        return [dict(r) for r in await cur.fetchall()]
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_order_detail(order_number: str) -> dict:
    """Return {'order': ..., 'items': [...]} for one order, or None if unknown."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("""
            SELECT * FROM orders WHERE order_number = ?
        """, (order_number,))
        header = await cur.fetchone()
        if header is None:
            return None

        cur = await conn.execute("""
            SELECT * FROM order_items WHERE order_number = ?
            ORDER BY sku
        """, (order_number,))
        item_rows = await cur.fetchall()

        return {
            "order": dict(header),
            "items": [dict(row) for row in item_rows]
        }
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_run_orders_filtered(run_id: str, status_filter: str = "all",
                                  page: int = 1, per_page: int = 50,
                                  sort_by: str = "order_date", sort_dir: str = "asc"):
    """Get paginated orders for a run via sync_run_orders junction table.

    Both the status filter and the per-status counts use the status the
    order had AT THE TIME of the run (sync_run_orders.status_at_run), not
    its current status; each returned row's 'status' field is likewise
    replaced by status_at_run.
    """
    db = await get_sqlite()
    try:
        where = "WHERE sro.sync_run_id = ?"
        params = [run_id]

        if status_filter and status_filter != "all":
            where += " AND UPPER(sro.status_at_run) = ?"
            params.append(status_filter.upper())

        # Whitelist sort inputs — they are interpolated into the SQL below,
        # so only known column names / directions may pass through.
        allowed_sort = {"order_date", "order_number", "customer_name", "items_count",
                        "status", "first_seen_at", "updated_at"}
        if sort_by not in allowed_sort:
            sort_by = "order_date"
        if sort_dir.lower() not in ("asc", "desc"):
            sort_dir = "asc"

        cursor = await db.execute(
            f"SELECT COUNT(*) FROM orders o INNER JOIN sync_run_orders sro "
            f"ON sro.order_number = o.order_number {where}", params
        )
        total = (await cursor.fetchone())[0]

        offset = (page - 1) * per_page
        cursor = await db.execute(f"""
            SELECT o.*, sro.status_at_run AS run_status FROM orders o
            INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
            {where}
            ORDER BY o.{sort_by} {sort_dir}
            LIMIT ? OFFSET ?
        """, params + [per_page, offset])
        rows = await cursor.fetchall()

        # Per-status counts over the WHOLE run, unaffected by status_filter.
        cursor = await db.execute("""
            SELECT sro.status_at_run AS status, COUNT(*) as cnt
            FROM orders o
            INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number
            WHERE sro.sync_run_id = ?
            GROUP BY sro.status_at_run
        """, (run_id,))
        status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()}

        # Use run_status (status_at_run) as the status field for each order row
        order_rows = []
        for r in rows:
            d = dict(r)
            d["status"] = d.pop("run_status", d.get("status"))
            order_rows.append(d)

        return {
            "orders": order_rows,
            "total": total,
            "page": page,
            "per_page": per_page,
            "pages": (total + per_page - 1) // per_page if total > 0 else 0,
            "counts": {
                "imported": status_counts.get("IMPORTED", 0),
                "skipped": status_counts.get("SKIPPED", 0),
                "error": status_counts.get("ERROR", 0),
                "already_imported": status_counts.get("ALREADY_IMPORTED", 0),
                "total": sum(status_counts.values())
            }
        }
    finally:
        await db.close()
|
|
|
|
|
|
async def get_orders(page: int = 1, per_page: int = 50,
                     search: str = "", status_filter: str = "all",
                     sort_by: str = "order_date", sort_dir: str = "desc",
                     period_days: int = 7,
                     period_start: str = "", period_end: str = ""):
    """Get orders with filters, sorting, and period.

    period_days=0 with period_start/period_end uses custom date range.
    period_days=0 without dates means all time.

    Returns the page of orders plus pagination info and per-status counts.
    The counts are always computed over the period+search scope only (never
    narrowed by status_filter), so status tabs keep showing full totals.
    """
    db = await get_sqlite()
    try:
        # Period + search clauses (used for counts — never include status filter)
        base_clauses = []
        base_params = []

        if period_days and period_days > 0:
            # Rolling window relative to today.
            base_clauses.append("order_date >= date('now', ?)")
            base_params.append(f"-{period_days} days")
        elif period_days == 0 and period_start and period_end:
            # Explicit custom date range.
            base_clauses.append("order_date BETWEEN ? AND ?")
            base_params.extend([period_start, period_end])

        if search:
            base_clauses.append("(order_number LIKE ? OR customer_name LIKE ?)")
            base_params.extend([f"%{search}%", f"%{search}%"])

        # Data query adds status filter on top of base filters.
        # 'UNINVOICED' is excluded here — it is not a value of orders.status
        # (presumably a pseudo-filter resolved by the caller — verify).
        data_clauses = list(base_clauses)
        data_params = list(base_params)

        if status_filter and status_filter not in ("all", "UNINVOICED"):
            if status_filter.upper() == "IMPORTED":
                # The IMPORTED view also includes ALREADY_IMPORTED rows.
                data_clauses.append("UPPER(status) IN ('IMPORTED', 'ALREADY_IMPORTED')")
            else:
                data_clauses.append("UPPER(status) = ?")
                data_params.append(status_filter.upper())

        where = ("WHERE " + " AND ".join(data_clauses)) if data_clauses else ""
        counts_where = ("WHERE " + " AND ".join(base_clauses)) if base_clauses else ""

        # Whitelist sort inputs — they are interpolated into the SQL below.
        allowed_sort = {"order_date", "order_number", "customer_name", "items_count",
                        "status", "first_seen_at", "updated_at"}
        if sort_by not in allowed_sort:
            sort_by = "order_date"
        if sort_dir.lower() not in ("asc", "desc"):
            sort_dir = "desc"

        cursor = await db.execute(f"SELECT COUNT(*) FROM orders {where}", data_params)
        total = (await cursor.fetchone())[0]

        offset = (page - 1) * per_page
        cursor = await db.execute(f"""
            SELECT * FROM orders
            {where}
            ORDER BY {sort_by} {sort_dir}
            LIMIT ? OFFSET ?
        """, data_params + [per_page, offset])
        rows = await cursor.fetchall()

        # Counts by status — always on full period+search, never filtered by status
        cursor = await db.execute(f"""
            SELECT status, COUNT(*) as cnt FROM orders
            {counts_where}
            GROUP BY status
        """, base_params)
        status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()}

        # Uninvoiced count: IMPORTED/ALREADY_IMPORTED with no cached invoice, same period+search
        uninv_clauses = list(base_clauses) + [
            "UPPER(status) IN ('IMPORTED', 'ALREADY_IMPORTED')",
            "(factura_numar IS NULL OR factura_numar = '')",
        ]
        uninv_where = "WHERE " + " AND ".join(uninv_clauses)
        cursor = await db.execute(f"SELECT COUNT(*) FROM orders {uninv_where}", base_params)
        uninvoiced_sqlite = (await cursor.fetchone())[0]

        return {
            "orders": [dict(r) for r in rows],
            "total": total,
            "page": page,
            "per_page": per_page,
            "pages": (total + per_page - 1) // per_page if total > 0 else 0,
            "counts": {
                "imported": status_counts.get("IMPORTED", 0),
                "already_imported": status_counts.get("ALREADY_IMPORTED", 0),
                "imported_all": status_counts.get("IMPORTED", 0) + status_counts.get("ALREADY_IMPORTED", 0),
                "skipped": status_counts.get("SKIPPED", 0),
                "error": status_counts.get("ERROR", 0),
                "total": sum(status_counts.values()),
                "uninvoiced_sqlite": uninvoiced_sqlite,
            }
        }
    finally:
        await db.close()
|
|
|
|
|
|
async def update_import_order_addresses(order_number: str,
                                        id_adresa_facturare: int = None,
                                        id_adresa_livrare: int = None):
    """Store the ROA billing/delivery address IDs on an order row."""
    conn = await get_sqlite()
    try:
        await conn.execute("""
            UPDATE orders SET
                id_adresa_facturare = ?,
                id_adresa_livrare = ?,
                updated_at = datetime('now')
            WHERE order_number = ?
        """, (id_adresa_facturare, id_adresa_livrare, order_number))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
# ── Invoice cache ────────────────────────────────
|
|
|
|
async def get_uninvoiced_imported_orders() -> list:
    """List imported orders (with an id_comanda) whose invoice is not cached yet."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("""
            SELECT order_number, id_comanda FROM orders
            WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
              AND id_comanda IS NOT NULL
              AND factura_numar IS NULL
        """)
        return [dict(row) for row in await cur.fetchall()]
    finally:
        await conn.close()
|
|
|
|
|
|
async def update_order_invoice(order_number: str, serie: str = None,
                               numar: str = None, total_fara_tva: float = None,
                               total_tva: float = None, total_cu_tva: float = None,
                               data_act: str = None):
    """Write Oracle invoice data onto the order row and stamp the check time."""
    conn = await get_sqlite()
    try:
        await conn.execute("""
            UPDATE orders SET
                factura_serie = ?,
                factura_numar = ?,
                factura_total_fara_tva = ?,
                factura_total_tva = ?,
                factura_total_cu_tva = ?,
                factura_data = ?,
                invoice_checked_at = datetime('now'),
                updated_at = datetime('now')
            WHERE order_number = ?
        """, (serie, numar, total_fara_tva, total_tva, total_cu_tva, data_act, order_number))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_invoiced_imported_orders() -> list:
    """List imported orders that already carry cached invoice data (for re-verification)."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("""
            SELECT order_number, id_comanda FROM orders
            WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
              AND id_comanda IS NOT NULL
              AND factura_numar IS NOT NULL AND factura_numar != ''
        """)
        return [dict(row) for row in await cur.fetchall()]
    finally:
        await conn.close()
|
|
|
|
|
|
async def get_all_imported_orders() -> list:
    """List every imported order that still has an id_comanda (deletion check in ROA)."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("""
            SELECT order_number, id_comanda FROM orders
            WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED')
              AND id_comanda IS NOT NULL
        """)
        return [dict(row) for row in await cur.fetchall()]
    finally:
        await conn.close()
|
|
|
|
|
|
async def clear_order_invoice(order_number: str):
    """Wipe the cached invoice fields after the invoice was deleted in ROA.

    invoice_checked_at is refreshed (not cleared) so the row records when
    the deletion was detected.
    """
    conn = await get_sqlite()
    try:
        await conn.execute("""
            UPDATE orders SET
                factura_serie = NULL,
                factura_numar = NULL,
                factura_total_fara_tva = NULL,
                factura_total_tva = NULL,
                factura_total_cu_tva = NULL,
                factura_data = NULL,
                invoice_checked_at = datetime('now'),
                updated_at = datetime('now')
            WHERE order_number = ?
        """, (order_number,))
        await conn.commit()
    finally:
        await conn.close()
|
|
|
|
|
|
async def mark_order_deleted_in_roa(order_number: str):
    """Mark an order as deleted in ROA — clears id_comanda and invoice cache.

    Sets status to 'DELETED_IN_ROA' and NULLs every ROA-side identifier
    (id_comanda, id_partener) plus all cached invoice fields, so stale data
    cannot be served for an order that no longer exists in Oracle.
    invoice_checked_at is cleared too, so a later re-import starts fresh.
    """
    db = await get_sqlite()
    try:
        await db.execute("""
            UPDATE orders SET
                status = 'DELETED_IN_ROA',
                id_comanda = NULL,
                id_partener = NULL,
                factura_serie = NULL,
                factura_numar = NULL,
                factura_total_fara_tva = NULL,
                factura_total_tva = NULL,
                factura_total_cu_tva = NULL,
                factura_data = NULL,
                invoice_checked_at = NULL,
                error_message = 'Comanda stearsa din ROA',
                updated_at = datetime('now')
            WHERE order_number = ?
        """, (order_number,))
        await db.commit()
    finally:
        await db.close()
|
|
|
|
|
|
# ── App Settings ─────────────────────────────────
|
|
|
|
async def get_app_settings() -> dict:
    """Return every app setting as a {key: value} dict."""
    conn = await get_sqlite()
    try:
        cur = await conn.execute("SELECT key, value FROM app_settings")
        return {r["key"]: r["value"] for r in await cur.fetchall()}
    finally:
        await conn.close()
|
|
|
|
|
|
async def set_app_setting(key: str, value: str):
    """Write one app setting, replacing any existing value for the key."""
    conn = await get_sqlite()
    try:
        await conn.execute("""
            INSERT OR REPLACE INTO app_settings (key, value)
            VALUES (?, ?)
        """, (key, value))
        await conn.commit()
    finally:
        await conn.close()
|