import json import glob import os import logging from pathlib import Path from dataclasses import dataclass, field from typing import Optional from ..config import settings logger = logging.getLogger(__name__) @dataclass class OrderItem: sku: str name: str price: float quantity: float vat: float @dataclass class OrderBilling: firstname: str = "" lastname: str = "" phone: str = "" email: str = "" address: str = "" city: str = "" region: str = "" country: str = "" company_name: str = "" company_code: str = "" company_reg: str = "" is_company: bool = False @dataclass class OrderShipping: firstname: str = "" lastname: str = "" phone: str = "" email: str = "" address: str = "" city: str = "" region: str = "" country: str = "" @dataclass class OrderData: id: str number: str date: str status: str = "" status_id: str = "" items: list = field(default_factory=list) # list of OrderItem billing: OrderBilling = field(default_factory=OrderBilling) shipping: Optional[OrderShipping] = None total: float = 0.0 payment_name: str = "" delivery_name: str = "" source_file: str = "" def read_json_orders(json_dir: str = None) -> tuple[list[OrderData], int]: """Read all GoMag order JSON files from the output directory. Returns (list of OrderData, number of JSON files read). """ if json_dir is None: json_dir = settings.JSON_OUTPUT_DIR if not json_dir or not os.path.isdir(json_dir): logger.warning(f"JSON output directory not found: {json_dir}") return [], 0 # Find all gomag_orders*.json files pattern = os.path.join(json_dir, "gomag_orders*.json") json_files = sorted(glob.glob(pattern)) if not json_files: logger.info(f"No JSON files found in {json_dir}") return [], 0 orders = [] for filepath in json_files: try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) raw_orders = data.get("orders", {}) if not isinstance(raw_orders, dict): continue for order_id, order_data in raw_orders.items(): try: order = _parse_order(order_id, order_data, os.path.basename(filepath)) orders.append(order) except Exception as e: logger.warning(f"Error parsing order {order_id} from {filepath}: {e}") except Exception as e: logger.error(f"Error reading {filepath}: {e}") logger.info(f"Read {len(orders)} orders from {len(json_files)} JSON files") return orders, len(json_files) def _parse_order(order_id: str, data: dict, source_file: str) -> OrderData: """Parse a single order from JSON data.""" # Parse items items = [] raw_items = data.get("items", []) if isinstance(raw_items, list): for item in raw_items: if isinstance(item, dict) and item.get("sku"): items.append(OrderItem( sku=str(item.get("sku", "")).strip(), name=str(item.get("name", "")), price=float(item.get("price", 0) or 0), quantity=float(item.get("quantity", 0) or 0), vat=float(item.get("vat", 0) or 0) )) # Parse billing billing_data = data.get("billing", {}) or {} company = billing_data.get("company") is_company = isinstance(company, dict) and bool(company.get("name")) billing = OrderBilling( firstname=str(billing_data.get("firstname", "")), lastname=str(billing_data.get("lastname", "")), phone=str(billing_data.get("phone", "")), email=str(billing_data.get("email", "")), address=str(billing_data.get("address", "")), city=str(billing_data.get("city", "")), region=str(billing_data.get("region", "")), country=str(billing_data.get("country", "")), company_name=str(company.get("name", "")) if is_company else "", company_code=str(company.get("code", "")) if is_company else "", company_reg=str(company.get("registrationNo", "")) if is_company else "", is_company=is_company ) # Parse shipping shipping_data = data.get("shipping") shipping = None if isinstance(shipping_data, dict): shipping = OrderShipping( firstname=str(shipping_data.get("firstname", "")), lastname=str(shipping_data.get("lastname", "")), phone=str(shipping_data.get("phone", "")), email=str(shipping_data.get("email", "")), address=str(shipping_data.get("address", "")), city=str(shipping_data.get("city", "")), region=str(shipping_data.get("region", "")), country=str(shipping_data.get("country", "")) ) # Payment/delivery payment = data.get("payment", {}) or {} delivery = data.get("delivery", {}) or {} return OrderData( id=str(data.get("id", order_id)), number=str(data.get("number", "")), date=str(data.get("date", "")), status=str(data.get("status", "")), status_id=str(data.get("statusId", "")), items=items, billing=billing, shipping=shipping, total=float(data.get("total", 0) or 0), payment_name=str(payment.get("name", "")) if isinstance(payment, dict) else "", delivery_name=str(delivery.get("name", "")) if isinstance(delivery, dict) else "", source_file=source_file ) def get_all_skus(orders: list[OrderData]) -> set[str]: """Extract unique SKUs from all orders.""" skus = set() for order in orders: for item in order.items: if item.sku: skus.add(item.sku) return skus