Files
gomag-vending/api/app/services/order_reader.py
Claude Agent c4fa643eca feat(sync): add order_total field to SQLite tracking
Parse order total from GoMag JSON, store in SQLite orders table,
and expose via sync run API. Enables total display in mobile flat rows.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 21:20:57 +00:00

181 lines
5.7 KiB
Python

import json
import glob
import os
import logging
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
from ..config import settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
@dataclass
class OrderItem:
    """One product line of an order.

    All fields are required and positional, in the order declared below.
    """

    # Product SKU identifying the line.
    sku: str
    # Product name as given in the order JSON.
    name: str
    # Price value from the order JSON (unit vs. line price not visible here).
    price: float
    # Ordered quantity; a float, so fractional quantities are representable.
    quantity: float
    # VAT value from the order JSON.
    # NOTE(review): rate vs. amount cannot be told from this module - confirm.
    vat: float
@dataclass
class OrderBilling:
    """Billing contact, address and optional company details of an order.

    Every field has a default, so ``OrderBilling()`` yields an empty,
    non-company billing record.
    """

    # Contact person.
    firstname: str = ""
    lastname: str = ""
    phone: str = ""
    email: str = ""

    # Postal address.
    address: str = ""
    city: str = ""
    region: str = ""
    country: str = ""

    # Company details; left empty when the order is not billed to a company.
    company_name: str = ""
    company_code: str = ""
    company_reg: str = ""

    # True when the order is billed to a company rather than a person.
    is_company: bool = False
@dataclass
class OrderShipping:
    """Shipping contact and address of an order.

    Every field defaults to the empty string.
    """

    # Recipient contact.
    firstname: str = ""
    lastname: str = ""
    phone: str = ""
    email: str = ""

    # Delivery address.
    address: str = ""
    city: str = ""
    region: str = ""
    country: str = ""
@dataclass
class OrderData:
    """A parsed order: header fields, line items, billing and shipping.

    ``id``, ``number`` and ``date`` are required; everything else defaults
    to an empty value.
    """

    # Order identifier, number and date, all kept as strings.
    id: str
    number: str
    date: str

    # Order status text and its identifier.
    status: str = ""
    status_id: str = ""

    # Product lines; a fresh list per instance.
    items: list[OrderItem] = field(default_factory=list)
    # Billing details; a fresh empty OrderBilling per instance by default.
    billing: OrderBilling = field(default_factory=OrderBilling)
    # Shipping details, or None when the order carries none.
    shipping: Optional[OrderShipping] = None

    # Order total.
    total: float = 0.0

    # Names of the payment and delivery methods.
    payment_name: str = ""
    delivery_name: str = ""

    # Name of the JSON file the order was read from.
    source_file: str = ""
def read_json_orders(json_dir: Optional[str] = None) -> tuple[list[OrderData], int]:
    """Read all GoMag order JSON files from the output directory.

    Files matching ``gomag_orders*.json`` directly inside ``json_dir`` are
    processed in sorted filename order. Each file is expected to contain a
    JSON object whose ``"orders"`` member maps order ids to order objects.

    Reading is best-effort: a file that cannot be opened or decoded is logged
    and skipped, and an order that fails to parse is logged and skipped
    without affecting the other orders in the same file.

    Args:
        json_dir: Directory to scan. When ``None``, ``settings.JSON_OUTPUT_DIR``
            is used.

    Returns:
        ``(orders, file_count)`` where ``orders`` is the list of successfully
        parsed orders and ``file_count`` is the number of matching files
        found (files that failed to load are still counted). ``([], 0)`` is
        returned when the directory is unset or missing, or no file matches.
    """
    if json_dir is None:
        json_dir = settings.JSON_OUTPUT_DIR

    if not json_dir or not os.path.isdir(json_dir):
        logger.warning("JSON output directory not found: %s", json_dir)
        return [], 0

    # Find all gomag_orders*.json files; sorted so processing order is stable.
    pattern = os.path.join(json_dir, "gomag_orders*.json")
    json_files = sorted(glob.glob(pattern))
    if not json_files:
        logger.info("No JSON files found in %s", json_dir)
        return [], 0

    orders: list[OrderData] = []
    for filepath in json_files:
        # Only the load itself is guarded here; the catch is deliberately
        # broad so one unreadable or corrupt file never aborts the whole run.
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            logger.error("Error reading %s: %s", filepath, e)
            continue

        if not isinstance(data, dict):
            # Valid JSON, but not the expected top-level object.
            logger.warning(
                "Skipping %s: top-level JSON value is not an object", filepath
            )
            continue

        raw_orders = data.get("orders", {})
        if not isinstance(raw_orders, dict):
            # Nothing usable in this file (e.g. an empty list instead of a map).
            continue

        source_file = os.path.basename(filepath)
        for order_id, order_data in raw_orders.items():
            # A malformed order is reported and skipped; its siblings are kept.
            try:
                orders.append(_parse_order(order_id, order_data, source_file))
            except Exception as e:
                logger.warning(
                    "Error parsing order %s from %s: %s", order_id, filepath, e
                )

    logger.info("Read %d orders from %d JSON files", len(orders), len(json_files))
    return orders, len(json_files)
# Keys shared by the billing and shipping JSON objects; each one maps to the
# identically named field on OrderBilling / OrderShipping.
_ADDRESS_KEYS = (
    "firstname",
    "lastname",
    "phone",
    "email",
    "address",
    "city",
    "region",
    "country",
)


def _as_text(value) -> str:
    """Convert a JSON value to ``str``, mapping ``None`` (missing or null) to ``""``."""
    # str(None) would store the literal text "None" for a JSON null.
    return "" if value is None else str(value)


def _as_number(value) -> float:
    """Convert a JSON value to ``float``, treating any falsy value as ``0.0``."""
    return float(value or 0)


def _address_fields(source: dict) -> dict:
    """Extract the shared contact/address fields from ``source`` as keyword arguments."""
    return {key: _as_text(source.get(key)) for key in _ADDRESS_KEYS}


def _parse_items(raw_items) -> list:
    """Build OrderItem objects from the raw ``items`` value, skipping unusable entries."""
    if not isinstance(raw_items, list):
        return []

    items = []
    for raw in raw_items:
        if not isinstance(raw, dict):
            continue
        raw_sku = raw.get("sku")
        if not raw_sku:
            continue
        # Strip before the final check so a whitespace-only SKU is skipped
        # instead of producing an item with an empty SKU.
        sku = str(raw_sku).strip()
        if not sku:
            continue
        items.append(OrderItem(
            sku=sku,
            name=_as_text(raw.get("name")),
            price=_as_number(raw.get("price")),
            quantity=_as_number(raw.get("quantity")),
            vat=_as_number(raw.get("vat")),
        ))
    return items


def _parse_order(order_id: str, data: dict, source_file: str) -> OrderData:
    """Parse a single order from JSON data.

    Text fields that are missing or JSON ``null`` become ``""``; numeric
    fields that are missing or falsy become ``0.0``.

    Args:
        order_id: Key of the order in the file's ``"orders"`` map; used as the
            order id when ``data`` has no (or a null) ``"id"``.
        data: The order's JSON object.
        source_file: Name of the file the order came from, stored as-is on
            the result.

    Returns:
        The populated OrderData. ``shipping`` is ``None`` unless ``data``
        carries a ``"shipping"`` object. ``billing.is_company`` is true only
        when the billing object has a ``"company"`` object with a non-empty
        ``"name"``.

    Raises:
        AttributeError: If ``data`` is not a dict, or ``"billing"`` is a
            truthy non-object value.
        ValueError, TypeError: If a numeric field cannot be converted to float.
    """
    # Billing, with optional company details.
    billing_data = data.get("billing") or {}
    company = billing_data.get("company")
    is_company = isinstance(company, dict) and bool(company.get("name"))
    billing = OrderBilling(
        **_address_fields(billing_data),
        company_name=_as_text(company.get("name")) if is_company else "",
        company_code=_as_text(company.get("code")) if is_company else "",
        company_reg=_as_text(company.get("registrationNo")) if is_company else "",
        is_company=is_company,
    )

    # Shipping is optional; anything other than an object means "none".
    shipping_data = data.get("shipping")
    shipping = None
    if isinstance(shipping_data, dict):
        shipping = OrderShipping(**_address_fields(shipping_data))

    # Payment/delivery: only an object can carry a name.
    payment = data.get("payment")
    delivery = data.get("delivery")

    # Fall back to the map key only when the order has no usable "id" at all.
    raw_id = data.get("id")

    return OrderData(
        id=_as_text(order_id if raw_id is None else raw_id),
        number=_as_text(data.get("number")),
        date=_as_text(data.get("date")),
        status=_as_text(data.get("status")),
        status_id=_as_text(data.get("statusId")),
        items=_parse_items(data.get("items", [])),
        billing=billing,
        shipping=shipping,
        total=_as_number(data.get("total")),
        payment_name=_as_text(payment.get("name")) if isinstance(payment, dict) else "",
        delivery_name=_as_text(delivery.get("name")) if isinstance(delivery, dict) else "",
        source_file=source_file,
    )
def get_all_skus(orders: list[OrderData]) -> set[str]:
    """Extract unique SKUs from all orders.

    Args:
        orders: Orders whose ``items`` are scanned.

    Returns:
        The set of distinct SKUs across every item of every order; items
        with an empty SKU are ignored. Empty input yields an empty set.
    """
    return {item.sku for order in orders for item in order.items if item.sku}