from datetime import UTC, datetime from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession SYNCABLE_TABLES = [ "vehicles", "orders", "order_lines", "invoices", "appointments", "catalog_marci", "catalog_modele", "catalog_ansamble", "catalog_norme", "catalog_preturi", "catalog_tipuri_deviz", "catalog_tipuri_motoare", "mecanici", ] # Tables that don't have tenant_id directly NO_TENANT_TABLES = {"catalog_modele"} async def get_full(db: AsyncSession, tenant_id: str) -> dict: result = {} for table in SYNCABLE_TABLES: if table == "catalog_modele": rows = await db.execute( text( "SELECT cm.* FROM catalog_modele cm " "JOIN catalog_marci marc ON cm.marca_id = marc.id " "WHERE marc.tenant_id = :tid" ), {"tid": tenant_id}, ) else: rows = await db.execute( text(f"SELECT * FROM {table} WHERE tenant_id = :tid"), {"tid": tenant_id}, ) result[table] = [dict(r._mapping) for r in rows] return result async def get_changes(db: AsyncSession, tenant_id: str, since: str) -> dict: result = {} for table in SYNCABLE_TABLES: if table == "catalog_modele": rows = await db.execute( text( "SELECT cm.* FROM catalog_modele cm " "JOIN catalog_marci marc ON cm.marca_id = marc.id " "WHERE marc.tenant_id = :tid AND cm.updated_at > :since" ), {"tid": tenant_id, "since": since}, ) else: rows = await db.execute( text( f"SELECT * FROM {table} WHERE tenant_id = :tid AND updated_at > :since" ), {"tid": tenant_id, "since": since}, ) rows_list = [dict(r._mapping) for r in rows] if rows_list: result[table] = rows_list return result async def apply_push( db: AsyncSession, tenant_id: str, operations: list ) -> dict: applied = 0 for op in operations: table = op["table"] if table not in SYNCABLE_TABLES: continue data = op.get("data", {}) # Enforce tenant isolation (except for no-tenant tables) if table not in NO_TENANT_TABLES: if data.get("tenant_id") and data["tenant_id"] != tenant_id: continue data["tenant_id"] = tenant_id if op["operation"] in ("INSERT", "UPDATE"): cols = ", ".join(data.keys()) ph = ", ".join(f":{k}" for k in data.keys()) await db.execute( text(f"INSERT OR REPLACE INTO {table} ({cols}) VALUES ({ph})"), data, ) applied += 1 elif op["operation"] == "DELETE": if table in NO_TENANT_TABLES: await db.execute( text(f"DELETE FROM {table} WHERE id = :id"), {"id": op["id"]}, ) else: await db.execute( text( f"DELETE FROM {table} WHERE id = :id AND tenant_id = :tid" ), {"id": op["id"], "tid": tenant_id}, ) applied += 1 await db.commit() return {"applied": applied, "conflicts": []}