import json import logging from datetime import datetime from ..database import get_sqlite, get_sqlite_sync logger = logging.getLogger(__name__) async def create_sync_run(run_id: str, json_files: int = 0): """Create a new sync run record.""" db = await get_sqlite() try: await db.execute(""" INSERT INTO sync_runs (run_id, started_at, status, json_files) VALUES (?, datetime('now'), 'running', ?) """, (run_id, json_files)) await db.commit() finally: await db.close() async def update_sync_run(run_id: str, status: str, total_orders: int = 0, imported: int = 0, skipped: int = 0, errors: int = 0, error_message: str = None, already_imported: int = 0, new_imported: int = 0): """Update sync run with results.""" db = await get_sqlite() try: await db.execute(""" UPDATE sync_runs SET finished_at = datetime('now'), status = ?, total_orders = ?, imported = ?, skipped = ?, errors = ?, error_message = ?, already_imported = ?, new_imported = ? WHERE run_id = ? """, (status, total_orders, imported, skipped, errors, error_message, already_imported, new_imported, run_id)) await db.commit() finally: await db.close() async def upsert_order(sync_run_id: str, order_number: str, order_date: str, customer_name: str, status: str, id_comanda: int = None, id_partener: int = None, error_message: str = None, missing_skus: list = None, items_count: int = 0, shipping_name: str = None, billing_name: str = None, payment_method: str = None, delivery_method: str = None): """Upsert a single order — one row per order_number, status updated in place.""" db = await get_sqlite() try: await db.execute(""" INSERT INTO orders (order_number, order_date, customer_name, status, id_comanda, id_partener, error_message, missing_skus, items_count, last_sync_run_id, shipping_name, billing_name, payment_method, delivery_method) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(order_number) DO UPDATE SET status = CASE WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED' THEN orders.status ELSE excluded.status END, error_message = excluded.error_message, missing_skus = excluded.missing_skus, items_count = excluded.items_count, id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda), id_partener = COALESCE(excluded.id_partener, orders.id_partener), times_skipped = CASE WHEN excluded.status = 'SKIPPED' THEN orders.times_skipped + 1 ELSE orders.times_skipped END, last_sync_run_id = excluded.last_sync_run_id, shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name), billing_name = COALESCE(excluded.billing_name, orders.billing_name), payment_method = COALESCE(excluded.payment_method, orders.payment_method), delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method), updated_at = datetime('now') """, (order_number, order_date, customer_name, status, id_comanda, id_partener, error_message, json.dumps(missing_skus) if missing_skus else None, items_count, sync_run_id, shipping_name, billing_name, payment_method, delivery_method)) await db.commit() finally: await db.close() async def add_sync_run_order(sync_run_id: str, order_number: str, status_at_run: str): """Record that this run processed this order (junction table).""" db = await get_sqlite() try: await db.execute(""" INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?) """, (sync_run_id, order_number, status_at_run)) await db.commit() finally: await db.close() async def save_orders_batch(orders_data: list[dict]): """Batch save a list of orders + their sync_run_orders + order_items in one transaction. Each dict must have: sync_run_id, order_number, order_date, customer_name, status, id_comanda, id_partener, error_message, missing_skus (list|None), items_count, shipping_name, billing_name, payment_method, delivery_method, status_at_run, items (list of item dicts). """ if not orders_data: return db = await get_sqlite() try: # 1. Upsert orders await db.executemany(""" INSERT INTO orders (order_number, order_date, customer_name, status, id_comanda, id_partener, error_message, missing_skus, items_count, last_sync_run_id, shipping_name, billing_name, payment_method, delivery_method) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(order_number) DO UPDATE SET status = CASE WHEN orders.status = 'IMPORTED' AND excluded.status = 'ALREADY_IMPORTED' THEN orders.status ELSE excluded.status END, error_message = excluded.error_message, missing_skus = excluded.missing_skus, items_count = excluded.items_count, id_comanda = COALESCE(excluded.id_comanda, orders.id_comanda), id_partener = COALESCE(excluded.id_partener, orders.id_partener), times_skipped = CASE WHEN excluded.status = 'SKIPPED' THEN orders.times_skipped + 1 ELSE orders.times_skipped END, last_sync_run_id = excluded.last_sync_run_id, shipping_name = COALESCE(excluded.shipping_name, orders.shipping_name), billing_name = COALESCE(excluded.billing_name, orders.billing_name), payment_method = COALESCE(excluded.payment_method, orders.payment_method), delivery_method = COALESCE(excluded.delivery_method, orders.delivery_method), updated_at = datetime('now') """, [ (d["order_number"], d["order_date"], d["customer_name"], d["status"], d.get("id_comanda"), d.get("id_partener"), d.get("error_message"), json.dumps(d["missing_skus"]) if d.get("missing_skus") else None, d.get("items_count", 0), d["sync_run_id"], d.get("shipping_name"), d.get("billing_name"), d.get("payment_method"), d.get("delivery_method")) for d in orders_data ]) # 2. Sync run orders await db.executemany(""" INSERT OR IGNORE INTO sync_run_orders (sync_run_id, order_number, status_at_run) VALUES (?, ?, ?) """, [(d["sync_run_id"], d["order_number"], d["status_at_run"]) for d in orders_data]) # 3. Order items all_items = [] for d in orders_data: for item in d.get("items", []): all_items.append(( d["order_number"], item.get("sku"), item.get("product_name"), item.get("quantity"), item.get("price"), item.get("vat"), item.get("mapping_status"), item.get("codmat"), item.get("id_articol"), item.get("cantitate_roa") )) if all_items: await db.executemany(""" INSERT OR IGNORE INTO order_items (order_number, sku, product_name, quantity, price, vat, mapping_status, codmat, id_articol, cantitate_roa) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, all_items) await db.commit() finally: await db.close() async def track_missing_sku(sku: str, product_name: str = "", order_count: int = 0, order_numbers: str = None, customers: str = None): """Track a missing SKU with order context.""" db = await get_sqlite() try: await db.execute(""" INSERT OR IGNORE INTO missing_skus (sku, product_name) VALUES (?, ?) """, (sku, product_name)) if order_count or order_numbers or customers: await db.execute(""" UPDATE missing_skus SET order_count = ?, order_numbers = ?, customers = ? WHERE sku = ? """, (order_count, order_numbers, customers, sku)) await db.commit() finally: await db.close() async def resolve_missing_sku(sku: str): """Mark a missing SKU as resolved.""" db = await get_sqlite() try: await db.execute(""" UPDATE missing_skus SET resolved = 1, resolved_at = datetime('now') WHERE sku = ? """, (sku,)) await db.commit() finally: await db.close() async def get_missing_skus_paginated(page: int = 1, per_page: int = 20, resolved: int = 0, search: str = None): """Get paginated missing SKUs. resolved=-1 means show all. Optional search filters by sku or product_name (LIKE).""" db = await get_sqlite() try: offset = (page - 1) * per_page # Build WHERE clause parts where_parts = [] params_count = [] params_data = [] if resolved != -1: where_parts.append("resolved = ?") params_count.append(resolved) params_data.append(resolved) if search: like = f"%{search}%" where_parts.append("(LOWER(sku) LIKE LOWER(?) OR LOWER(COALESCE(product_name,'')) LIKE LOWER(?))") params_count.extend([like, like]) params_data.extend([like, like]) where_clause = ("WHERE " + " AND ".join(where_parts)) if where_parts else "" order_clause = ( "ORDER BY resolved ASC, order_count DESC, first_seen DESC" if resolved == -1 else "ORDER BY order_count DESC, first_seen DESC" ) cursor = await db.execute( f"SELECT COUNT(*) FROM missing_skus {where_clause}", params_count ) total = (await cursor.fetchone())[0] cursor = await db.execute(f""" SELECT sku, product_name, first_seen, resolved, resolved_at, order_count, order_numbers, customers FROM missing_skus {where_clause} {order_clause} LIMIT ? OFFSET ? """, params_data + [per_page, offset]) rows = await cursor.fetchall() return { "missing_skus": [dict(row) for row in rows], "total": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page if total > 0 else 0 } finally: await db.close() async def get_sync_runs(page: int = 1, per_page: int = 20): """Get paginated sync run history.""" db = await get_sqlite() try: offset = (page - 1) * per_page cursor = await db.execute("SELECT COUNT(*) FROM sync_runs") total = (await cursor.fetchone())[0] cursor = await db.execute(""" SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT ? OFFSET ? """, (per_page, offset)) rows = await cursor.fetchall() return { "runs": [dict(row) for row in rows], "total": total, "page": page, "pages": (total + per_page - 1) // per_page if total > 0 else 0 } finally: await db.close() async def get_sync_run_detail(run_id: str): """Get details for a specific sync run including its orders via sync_run_orders.""" db = await get_sqlite() try: cursor = await db.execute( "SELECT * FROM sync_runs WHERE run_id = ?", (run_id,) ) run = await cursor.fetchone() if not run: return None cursor = await db.execute(""" SELECT o.* FROM orders o INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number WHERE sro.sync_run_id = ? ORDER BY o.order_date """, (run_id,)) orders = await cursor.fetchall() return { "run": dict(run), "orders": [dict(o) for o in orders] } finally: await db.close() async def get_dashboard_stats(): """Get stats for the dashboard.""" db = await get_sqlite() try: cursor = await db.execute( "SELECT COUNT(*) FROM orders WHERE status = 'IMPORTED'" ) imported = (await cursor.fetchone())[0] cursor = await db.execute( "SELECT COUNT(*) FROM orders WHERE status = 'SKIPPED'" ) skipped = (await cursor.fetchone())[0] cursor = await db.execute( "SELECT COUNT(*) FROM orders WHERE status = 'ERROR'" ) errors = (await cursor.fetchone())[0] cursor = await db.execute( "SELECT COUNT(*) FROM missing_skus WHERE resolved = 0" ) missing = (await cursor.fetchone())[0] cursor = await db.execute("SELECT COUNT(DISTINCT sku) FROM missing_skus") total_missing_skus = (await cursor.fetchone())[0] cursor = await db.execute( "SELECT COUNT(DISTINCT sku) FROM missing_skus WHERE resolved = 0" ) unresolved_skus = (await cursor.fetchone())[0] cursor = await db.execute(""" SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT 1 """) last_run = await cursor.fetchone() return { "imported": imported, "skipped": skipped, "errors": errors, "missing_skus": missing, "total_tracked_skus": total_missing_skus, "unresolved_skus": unresolved_skus, "last_run": dict(last_run) if last_run else None } finally: await db.close() async def get_scheduler_config(): """Get scheduler configuration from SQLite.""" db = await get_sqlite() try: cursor = await db.execute("SELECT key, value FROM scheduler_config") rows = await cursor.fetchall() return {row["key"]: row["value"] for row in rows} finally: await db.close() async def set_scheduler_config(key: str, value: str): """Set a scheduler configuration value.""" db = await get_sqlite() try: await db.execute(""" INSERT OR REPLACE INTO scheduler_config (key, value) VALUES (?, ?) """, (key, value)) await db.commit() finally: await db.close() # ── web_products ───────────────────────────────── async def upsert_web_product(sku: str, product_name: str): """Insert or update a web product, incrementing order_count.""" db = await get_sqlite() try: await db.execute(""" INSERT INTO web_products (sku, product_name, order_count) VALUES (?, ?, 1) ON CONFLICT(sku) DO UPDATE SET product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name), last_seen = datetime('now'), order_count = web_products.order_count + 1 """, (sku, product_name)) await db.commit() finally: await db.close() async def upsert_web_products_batch(items: list[tuple[str, str]]): """Batch upsert web products in a single transaction. items: list of (sku, product_name).""" if not items: return db = await get_sqlite() try: await db.executemany(""" INSERT INTO web_products (sku, product_name, order_count) VALUES (?, ?, 1) ON CONFLICT(sku) DO UPDATE SET product_name = COALESCE(NULLIF(excluded.product_name, ''), web_products.product_name), last_seen = datetime('now'), order_count = web_products.order_count + 1 """, items) await db.commit() finally: await db.close() async def get_web_product_name(sku: str) -> str: """Lookup product name by SKU.""" db = await get_sqlite() try: cursor = await db.execute( "SELECT product_name FROM web_products WHERE sku = ?", (sku,) ) row = await cursor.fetchone() return row["product_name"] if row else "" finally: await db.close() async def get_web_products_batch(skus: list) -> dict: """Batch lookup product names by SKU list. Returns {sku: product_name}.""" if not skus: return {} db = await get_sqlite() try: placeholders = ",".join("?" for _ in skus) cursor = await db.execute( f"SELECT sku, product_name FROM web_products WHERE sku IN ({placeholders})", list(skus) ) rows = await cursor.fetchall() return {row["sku"]: row["product_name"] for row in rows} finally: await db.close() # ── order_items ────────────────────────────────── async def add_order_items(order_number: str, items: list): """Bulk insert order items. Uses INSERT OR IGNORE — PK is (order_number, sku).""" if not items: return db = await get_sqlite() try: await db.executemany(""" INSERT OR IGNORE INTO order_items (order_number, sku, product_name, quantity, price, vat, mapping_status, codmat, id_articol, cantitate_roa) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ (order_number, item.get("sku"), item.get("product_name"), item.get("quantity"), item.get("price"), item.get("vat"), item.get("mapping_status"), item.get("codmat"), item.get("id_articol"), item.get("cantitate_roa")) for item in items ]) await db.commit() finally: await db.close() async def get_order_items(order_number: str) -> list: """Fetch items for one order.""" db = await get_sqlite() try: cursor = await db.execute(""" SELECT * FROM order_items WHERE order_number = ? ORDER BY sku """, (order_number,)) rows = await cursor.fetchall() return [dict(row) for row in rows] finally: await db.close() async def get_order_detail(order_number: str) -> dict: """Get full order detail: order metadata + items.""" db = await get_sqlite() try: cursor = await db.execute(""" SELECT * FROM orders WHERE order_number = ? """, (order_number,)) order = await cursor.fetchone() if not order: return None cursor = await db.execute(""" SELECT * FROM order_items WHERE order_number = ? ORDER BY sku """, (order_number,)) items = await cursor.fetchall() return { "order": dict(order), "items": [dict(i) for i in items] } finally: await db.close() async def get_run_orders_filtered(run_id: str, status_filter: str = "all", page: int = 1, per_page: int = 50, sort_by: str = "order_date", sort_dir: str = "asc"): """Get paginated orders for a run via sync_run_orders junction table.""" db = await get_sqlite() try: where = "WHERE sro.sync_run_id = ?" params = [run_id] if status_filter and status_filter != "all": where += " AND UPPER(sro.status_at_run) = ?" params.append(status_filter.upper()) allowed_sort = {"order_date", "order_number", "customer_name", "items_count", "status", "first_seen_at", "updated_at"} if sort_by not in allowed_sort: sort_by = "order_date" if sort_dir.lower() not in ("asc", "desc"): sort_dir = "asc" cursor = await db.execute( f"SELECT COUNT(*) FROM orders o INNER JOIN sync_run_orders sro " f"ON sro.order_number = o.order_number {where}", params ) total = (await cursor.fetchone())[0] offset = (page - 1) * per_page cursor = await db.execute(f""" SELECT o.*, sro.status_at_run AS run_status FROM orders o INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number {where} ORDER BY o.{sort_by} {sort_dir} LIMIT ? OFFSET ? """, params + [per_page, offset]) rows = await cursor.fetchall() cursor = await db.execute(""" SELECT sro.status_at_run AS status, COUNT(*) as cnt FROM orders o INNER JOIN sync_run_orders sro ON sro.order_number = o.order_number WHERE sro.sync_run_id = ? GROUP BY sro.status_at_run """, (run_id,)) status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()} # Use run_status (status_at_run) as the status field for each order row order_rows = [] for r in rows: d = dict(r) d["status"] = d.pop("run_status", d.get("status")) order_rows.append(d) return { "orders": order_rows, "total": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page if total > 0 else 0, "counts": { "imported": status_counts.get("IMPORTED", 0), "skipped": status_counts.get("SKIPPED", 0), "error": status_counts.get("ERROR", 0), "already_imported": status_counts.get("ALREADY_IMPORTED", 0), "total": sum(status_counts.values()) } } finally: await db.close() async def get_orders(page: int = 1, per_page: int = 50, search: str = "", status_filter: str = "all", sort_by: str = "order_date", sort_dir: str = "desc", period_days: int = 7, period_start: str = "", period_end: str = ""): """Get orders with filters, sorting, and period. period_days=0 with period_start/period_end uses custom date range. period_days=0 without dates means all time. """ db = await get_sqlite() try: # Period + search clauses (used for counts — never include status filter) base_clauses = [] base_params = [] if period_days and period_days > 0: base_clauses.append("order_date >= date('now', ?)") base_params.append(f"-{period_days} days") elif period_days == 0 and period_start and period_end: base_clauses.append("order_date BETWEEN ? AND ?") base_params.extend([period_start, period_end]) if search: base_clauses.append("(order_number LIKE ? OR customer_name LIKE ?)") base_params.extend([f"%{search}%", f"%{search}%"]) # Data query adds status filter on top of base filters data_clauses = list(base_clauses) data_params = list(base_params) if status_filter and status_filter not in ("all", "UNINVOICED"): if status_filter.upper() == "IMPORTED": data_clauses.append("UPPER(status) IN ('IMPORTED', 'ALREADY_IMPORTED')") else: data_clauses.append("UPPER(status) = ?") data_params.append(status_filter.upper()) where = ("WHERE " + " AND ".join(data_clauses)) if data_clauses else "" counts_where = ("WHERE " + " AND ".join(base_clauses)) if base_clauses else "" allowed_sort = {"order_date", "order_number", "customer_name", "items_count", "status", "first_seen_at", "updated_at"} if sort_by not in allowed_sort: sort_by = "order_date" if sort_dir.lower() not in ("asc", "desc"): sort_dir = "desc" cursor = await db.execute(f"SELECT COUNT(*) FROM orders {where}", data_params) total = (await cursor.fetchone())[0] offset = (page - 1) * per_page cursor = await db.execute(f""" SELECT * FROM orders {where} ORDER BY {sort_by} {sort_dir} LIMIT ? OFFSET ? """, data_params + [per_page, offset]) rows = await cursor.fetchall() # Counts by status — always on full period+search, never filtered by status cursor = await db.execute(f""" SELECT status, COUNT(*) as cnt FROM orders {counts_where} GROUP BY status """, base_params) status_counts = {row["status"]: row["cnt"] for row in await cursor.fetchall()} # Uninvoiced count: IMPORTED/ALREADY_IMPORTED with no cached invoice, same period+search uninv_clauses = list(base_clauses) + [ "UPPER(status) IN ('IMPORTED', 'ALREADY_IMPORTED')", "(factura_numar IS NULL OR factura_numar = '')", ] uninv_where = "WHERE " + " AND ".join(uninv_clauses) cursor = await db.execute(f"SELECT COUNT(*) FROM orders {uninv_where}", base_params) uninvoiced_sqlite = (await cursor.fetchone())[0] return { "orders": [dict(r) for r in rows], "total": total, "page": page, "per_page": per_page, "pages": (total + per_page - 1) // per_page if total > 0 else 0, "counts": { "imported": status_counts.get("IMPORTED", 0), "already_imported": status_counts.get("ALREADY_IMPORTED", 0), "imported_all": status_counts.get("IMPORTED", 0) + status_counts.get("ALREADY_IMPORTED", 0), "skipped": status_counts.get("SKIPPED", 0), "error": status_counts.get("ERROR", 0), "total": sum(status_counts.values()), "uninvoiced_sqlite": uninvoiced_sqlite, } } finally: await db.close() async def update_import_order_addresses(order_number: str, id_adresa_facturare: int = None, id_adresa_livrare: int = None): """Update ROA address IDs on an order record.""" db = await get_sqlite() try: await db.execute(""" UPDATE orders SET id_adresa_facturare = ?, id_adresa_livrare = ?, updated_at = datetime('now') WHERE order_number = ? """, (id_adresa_facturare, id_adresa_livrare, order_number)) await db.commit() finally: await db.close() # ── Invoice cache ──────────────────────────────── async def get_uninvoiced_imported_orders() -> list: """Get all imported orders that don't yet have invoice data cached.""" db = await get_sqlite() try: cursor = await db.execute(""" SELECT order_number, id_comanda FROM orders WHERE status IN ('IMPORTED', 'ALREADY_IMPORTED') AND id_comanda IS NOT NULL AND factura_numar IS NULL """) rows = await cursor.fetchall() return [dict(r) for r in rows] finally: await db.close() async def update_order_invoice(order_number: str, serie: str = None, numar: str = None, total_fara_tva: float = None, total_tva: float = None, total_cu_tva: float = None): """Cache invoice data from Oracle onto the order record.""" db = await get_sqlite() try: await db.execute(""" UPDATE orders SET factura_serie = ?, factura_numar = ?, factura_total_fara_tva = ?, factura_total_tva = ?, factura_total_cu_tva = ?, invoice_checked_at = datetime('now'), updated_at = datetime('now') WHERE order_number = ? """, (serie, numar, total_fara_tva, total_tva, total_cu_tva, order_number)) await db.commit() finally: await db.close()