from datetime import UTC, datetime

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# Tables exposed to the sync endpoints. Table names are interpolated into SQL
# below, so this list doubles as the whitelist of names that may reach a query.
SYNCABLE_TABLES = [
    "vehicles",
    "orders",
    "order_lines",
    "invoices",
    "appointments",
    "catalog_marci",
    "catalog_modele",
    "catalog_ansamble",
    "catalog_norme",
    "catalog_preturi",
    "catalog_tipuri_deviz",
    "catalog_tipuri_motoare",
    "mecanici",
]

# Tables that don't have tenant_id directly. catalog_modele is scoped to a
# tenant through its parent row in catalog_marci (catalog_modele.marca_id).
NO_TENANT_TABLES = {"catalog_modele"}


async def _get_table_columns(db: AsyncSession, table: str) -> set[str]:
    """Return the set of column names for a given table using PRAGMA table_info.

    ``table`` is interpolated into the statement verbatim, so callers must
    only pass trusted names (in this module: members of SYNCABLE_TABLES).
    """
    rows = await db.execute(text(f"PRAGMA table_info({table})"))
    # Index 1 of each row is read as the column name.
    return {row[1] for row in rows}


def _tenant_select_sql(table: str, *, changed_only: bool) -> str:
    """Build the tenant-scoped SELECT for ``table``.

    The statement expects a ``:tid`` bind parameter and, when ``changed_only``
    is true, an additional ``:since`` parameter compared against ``updated_at``.
    """
    if table == "catalog_modele":
        # No tenant_id column here: resolve the tenant through catalog_marci.
        sql = (
            "SELECT cm.* FROM catalog_modele cm "
            "JOIN catalog_marci marc ON cm.marca_id = marc.id "
            "WHERE marc.tenant_id = :tid"
        )
        return f"{sql} AND cm.updated_at > :since" if changed_only else sql

    sql = f"SELECT * FROM {table} WHERE tenant_id = :tid"
    return f"{sql} AND updated_at > :since" if changed_only else sql


async def get_full(db: AsyncSession, tenant_id: str) -> dict:
    """Return every row of every syncable table that belongs to ``tenant_id``.

    The result maps each table name in SYNCABLE_TABLES to a list of row dicts;
    tables without matching rows map to an empty list.
    """
    result = {}
    for table in SYNCABLE_TABLES:
        rows = await db.execute(
            text(_tenant_select_sql(table, changed_only=False)),
            {"tid": tenant_id},
        )
        result[table] = [dict(r._mapping) for r in rows]
    return result


async def get_changes(db: AsyncSession, tenant_id: str, since: str) -> dict:
    """Return the tenant's rows whose ``updated_at`` is greater than ``since``.

    Only tables with at least one changed row appear in the result. ``since``
    is passed to the database as-is and compared with ``updated_at`` there.
    """
    result = {}
    for table in SYNCABLE_TABLES:
        rows = await db.execute(
            text(_tenant_select_sql(table, changed_only=True)),
            {"tid": tenant_id, "since": since},
        )
        rows_list = [dict(r._mapping) for r in rows]
        if rows_list:
            result[table] = rows_list
    return result


async def _apply_operation(
    db: AsyncSession,
    tenant_id: str,
    table: str,
    op: dict,
    data: dict,
    columns_cache: dict[str, set[str]],
) -> bool:
    """Execute a single pushed operation; return True if a statement was issued.

    Raises KeyError when ``op`` lacks ``"operation"`` (or ``"id"`` for a
    DELETE), and propagates any database error to the caller.
    """
    operation = op["operation"]

    if operation in ("INSERT", "UPDATE"):
        # Fetch and cache the valid column names for this table.
        if table not in columns_cache:
            columns_cache[table] = await _get_table_columns(db, table)
        valid_cols = columns_cache[table]

        # Keep only keys that are real columns: this drops unknown fields and
        # guarantees that no client-supplied identifier reaches the SQL text.
        filtered = {k: v for k, v in data.items() if k in valid_cols}
        if not filtered:
            return False

        # NOTE(review): for catalog_modele this path is not tenant-checked
        # (the row carries no tenant_id) — consider validating marca_id
        # against the caller's catalog_marci rows.
        cols = ", ".join(filtered)
        placeholders = ", ".join(f":{k}" for k in filtered)
        await db.execute(
            text(f"INSERT OR REPLACE INTO {table} ({cols}) VALUES ({placeholders})"),
            filtered,
        )
        return True

    if operation == "DELETE":
        if table == "catalog_modele":
            # Scope the delete through the parent brand so one tenant cannot
            # remove another tenant's models by guessing ids.
            sql = (
                "DELETE FROM catalog_modele WHERE id = :id AND marca_id IN "
                "(SELECT id FROM catalog_marci WHERE tenant_id = :tid)"
            )
        else:
            # Any other table must carry tenant_id; if it does not, the
            # statement fails and is reported as a conflict rather than
            # running unscoped.
            sql = f"DELETE FROM {table} WHERE id = :id AND tenant_id = :tid"
        await db.execute(text(sql), {"id": op["id"], "tid": tenant_id})
        return True

    # Unknown operation kinds are ignored.
    return False


async def apply_push(
    db: AsyncSession, tenant_id: str, operations: list
) -> dict:
    """Apply client-pushed operations for ``tenant_id`` and commit them.

    Each item of ``operations`` is a dict with ``"table"``, ``"operation"``
    (``"INSERT"``, ``"UPDATE"`` or ``"DELETE"``), optional ``"data"`` and, for
    deletes, ``"id"``. Operations on tables outside SYNCABLE_TABLES, or whose
    data names a different tenant, are skipped silently.

    Every operation runs inside its own SAVEPOINT so that a failing operation
    is undone without discarding the ones that already succeeded; the returned
    ``"applied"`` count therefore matches what is actually committed.

    Returns ``{"applied": <int>, "conflicts": [<error dict>, ...]}`` where each
    error dict has ``"table"``, ``"id"`` and ``"error"`` keys.
    """
    applied = 0
    errors = []
    # Cache column sets per table to avoid repeated PRAGMA calls.
    table_columns_cache: dict[str, set[str]] = {}

    for op in operations:
        table = op.get("table")
        if table not in SYNCABLE_TABLES:
            continue

        data = dict(op.get("data") or {})

        # Enforce tenant isolation (except for no-tenant tables).
        if table not in NO_TENANT_TABLES:
            if data.get("tenant_id") and data["tenant_id"] != tenant_id:
                continue
            data["tenant_id"] = tenant_id

        try:
            # NOTE(review): relies on the SQLite driver honouring SAVEPOINT;
            # confirm the engine setup if the pysqlite/aiosqlite legacy
            # transaction workaround is required in this deployment.
            async with db.begin_nested():
                executed = await _apply_operation(
                    db, tenant_id, table, op, data, table_columns_cache
                )
        except Exception as exc:  # noqa: BLE001
            # The savepoint has already been rolled back by the context
            # manager; earlier operations remain pending in the transaction.
            errors.append({"table": table, "id": op.get("id"), "error": str(exc)})
        else:
            if executed:
                applied += 1

    await db.commit()
    return {"applied": applied, "conflicts": errors}