"""Router import Treapta 2 — upload fisier xlsx/csv, preview, commit, export. Endpointuri: POST /v1/import — upload fisier, staging in import_batches/import_rows GET /v1/import/{id}/column-mapping — maparea de coloane existenta / sugestii fuzzy POST /v1/import/{id}/column-mapping — salveaza maparea de coloane GET /v1/import/{id}/preview — preview 6 stari per rand (fara enqueue) POST /v1/import/{id}/commit — gate HARD + enqueue randuri ok + log atestare GET /v1/import/{id}/export-failed — CSV cu randuri esuate (needs_data/needs_mapping/needs_review) Reguli cheie (plan §3.1-3.4, §12): - Issue 6: scrieri bulk in tranzactie explicita BEGIN IMMEDIATE...COMMIT + executemany. - Eng#5: already_sent lookup BATCH (IN chunk ~900), nu N+1. - OV-3: duplicate_in_file EXCLUSIV la preview/commit. NU atinge reconcile.py/worker. - Issue 1 (TOCTOU): commit per-rand cu ON CONFLICT(idempotency_key) DO NOTHING. - Issue 5a: import_rows.raw_json CRIPTAT Fernet. - Issue 5b: fuzzy coloane refoloseste mapping.normalize_for_match (DRY). - T4/D3: drift semnatura coloane -> NU aplica orb, cere re-confirmare. """ from __future__ import annotations import csv import hashlib import io import json from typing import Any from fastapi import APIRouter, Depends, HTTPException, UploadFile from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from ...auth import resolve_account_id from ...crypto import decrypt_creds, encrypt_creds from ...db import get_connection from ...idempotency import build_key, canonicalize_row from ...import_parse import ( FileTooLarge, HeaderError, MultipleSheets, ParsedFile, parse_date_value, parse_file, ) from ...mapping import ( account_or_default, has_no_auto_send, load_mapping_meta, normalize_for_match, resolve_prestatii, ) from ...validation import validate_prezentare router = APIRouter(prefix="/v1/import", tags=["import"]) # Marimea maxima a unui chunk pentru IN(...) SQLite (limite SQLite ~999) _IN_CHUNK = 900 # Campuri canonice si sinonimele lor pentru sugestie fuzzy coloane (Issue 5b/Eng#4) _CANONICAL_SYNONYMS: dict[str, list[str]] = { "vin": ["VIN", "Serie sasiu", "Sasiu", "Serie", "Numar sasiu", "Nr sasiu", "Chassis"], "nr_inmatriculare": ["Nr inmatriculare", "Numar inmatriculare", "Numar auto", "Nr auto", "Numar", "Nr"], "data_prestatie": ["Data prestatie", "Data", "Date", "Data service", "Data lucrare"], "odometru_final": ["Odometru final", "Odometru", "KM", "Kilometri", "Km final", "Citire contor"], "odometru_initial": ["Odometru initial", "KM initial", "Km start"], "operatie": ["Operatie", "Denumire prestatie", "Prestatie", "Lucrare", "Tip lucrare", "Cod prestatie", "Cod op"], "obs": ["Observatii", "Obs", "Mentiuni", "Note"], } # Prag minim scor fuzzy pentru pre-selectie (0..100) _FUZZY_MIN_SCORE = 55 # --------------------------------------------------------------------------- # # Helpere interne # # --------------------------------------------------------------------------- # def _signature(columns: list[str]) -> str: """Semnatura coloanelor = sha256 al listei sortate (case-insensitive).""" norm = sorted(c.strip().lower() for c in columns) return hashlib.sha256(json.dumps(norm, ensure_ascii=False).encode("utf-8")).hexdigest() def _fuzzy_suggest_column( col_name: str, *, limit: int = 3, ) -> list[dict]: """Sugereaza campuri canonice pentru o coloana din fisier. Refoloseste normalize_for_match + rapidfuzz.fuzz.token_sort_ratio (Issue 5b/Eng#4). Intoarce [{camp_canonic, score}] sortat descrescator. """ from rapidfuzz import fuzz, process query = normalize_for_match(col_name) # Construieste un dict {camp_canonic: sinonime_normalize} pentru matching choices: dict[str, str] = {} for camp, sinonime in _CANONICAL_SYNONYMS.items(): # Cel mai bun sinonim = primul (cel mai specific) best_syn = normalize_for_match(sinonime[0]) choices[camp] = best_syn ranked = process.extract( query, choices, scorer=fuzz.token_sort_ratio, limit=limit, ) return [ {"camp_canonic": camp, "score": float(score)} for _, score, camp in ranked if score >= _FUZZY_MIN_SCORE ] def _resolve_row_for_preview( raw_row: dict[str, Any], json_mapare: dict[str, str], date_col_format: dict[str, str], coercion_flags: list[str], mapping: dict[str, str], mapping_meta: dict[str, dict], formula_columns: list[str], ) -> dict[str, Any]: """Rezolva un rand din import pentru preview: aplica mapare coloane + validare. Intoarce un dict cu: resolved_status: ok/needs_mapping/needs_data/needs_review resolved: valorile finale rezolvate (VIN, data, km, prestatii) errors: lista erori validare flags: motive needs_review """ # Aplica maparea de coloane mapped: dict[str, Any] = {} for col_fisier, camp_canonic in json_mapare.items(): if col_fisier in raw_row and camp_canonic: mapped[camp_canonic] = raw_row[col_fisier] # Detectie coloane cu formule (Issue 3) — nu blocheaza, dar adauga flag formula_flag: list[str] = [] for col_fisier, camp_canonic in json_mapare.items(): if col_fisier in formula_columns: formula_flag.append(f"Coloana '{col_fisier}' pare sa contina formule fara valori calculate — re-salveaza fisierul in Excel.") # Rezolvare data prestatie # Gaseste coloana de data din mapare data_col_fisier = None for col_f, camp_c in json_mapare.items(): if camp_c == "data_prestatie": data_col_fisier = col_f break is_ambiguous_date = False if data_col_fisier: col_fmt = date_col_format.get(data_col_fisier, "ambiguous") raw_date = mapped.get("data_prestatie") if raw_date is not None: iso_date, is_amb = parse_date_value(raw_date, col_fmt) if iso_date: mapped["data_prestatie"] = iso_date if is_amb: is_ambiguous_date = True # Operatia: daca camp canonic e "operatie", construieste prestatii operatie_val = mapped.pop("operatie", None) if operatie_val and "prestatii" not in mapped: # Construieste un item de prestatie din operatie mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": str(operatie_val)}] # Canonicalizare (T9): normalizeaza VIN/nr/odometru canon = canonicalize_row(mapped) mapped.update({ "vin": canon["vin"], "nr_inmatriculare": canon["nr_inmatriculare"], "odometru_final": canon["odometru_final"], }) # Flags needs_review acumulate all_flags = list(coercion_flags) + formula_flag if is_ambiguous_date: all_flags.append("Data ambigua (zi<=12): verifica daca e DD.MM sau MM.DD") # Rezolvare prestatii prestatii = mapped.get("prestatii") or [] resolved, unmapped = resolve_prestatii(prestatii, mapping) mapped["prestatii"] = resolved # Determinare stare if unmapped: return { "resolved_status": "needs_mapping", "resolved": mapped, "errors": [{"unmapped": unmapped}], "flags": all_flags, } # Validare continut errors = validate_prezentare(mapped) if all_flags: # needs_review: chiar daca validarea trece, flagurile blocheaza auto-send return { "resolved_status": "needs_review", "resolved": mapped, "errors": errors, "flags": all_flags, } # auto_send gate (T6/OV-1) if has_no_auto_send(resolved, mapping_meta): return { "resolved_status": "needs_mapping", "resolved": mapped, "errors": [{"auto_send": "cod mapat cu auto_send=0; review manual inainte de trimitere"}], "flags": all_flags, } if errors: return { "resolved_status": "needs_data", "resolved": mapped, "errors": errors, "flags": all_flags, } return { "resolved_status": "ok", "resolved": mapped, "errors": [], "flags": all_flags, } def _build_idempotency_key(account_id: int | None, resolved: dict[str, Any]) -> str: """Construieste cheia de idempotenta pentru un rand rezolvat.""" canon = canonicalize_row(resolved) return build_key(account_id, canon) def _already_sent_lookup(conn, account_id: int, keys: list[str]) -> dict[str, dict]: """Cauta cheile de idempotenta in submissions (batch, nu N+1 — Eng#5). Intoarce {idempotency_key: {id, id_prezentare, created_at}} pentru cheile gasite. """ acct = account_or_default(account_id) found: dict[str, dict] = {} # Chunk ~900 parametri (limita SQLite) for i in range(0, len(keys), _IN_CHUNK): chunk = keys[i:i + _IN_CHUNK] placeholders = ",".join("?" * len(chunk)) rows = conn.execute( f"SELECT idempotency_key, id, id_prezentare, created_at FROM submissions " f"WHERE account_id=? AND idempotency_key IN ({placeholders})", (acct, *chunk), ).fetchall() for r in rows: found[r["idempotency_key"]] = { "submission_id": r["id"], "id_prezentare": r["id_prezentare"], "created_at": r["created_at"], } # Dual-lookup pentru chei legacy (OV-2: chei vechi cu account_id=None) legacy_keys_needed = [k for k in chunk if k not in found] if legacy_keys_needed: lph = ",".join("?" * len(legacy_keys_needed)) lrows = conn.execute( f"SELECT idempotency_key, id, id_prezentare, created_at FROM submissions " f"WHERE idempotency_key IN ({lph})", tuple(legacy_keys_needed), ).fetchall() for r in lrows: if r["idempotency_key"] not in found: found[r["idempotency_key"]] = { "submission_id": r["id"], "id_prezentare": r["id_prezentare"], "created_at": r["created_at"], } return found # --------------------------------------------------------------------------- # # POST /v1/import — upload fisier, staging # # --------------------------------------------------------------------------- # @router.post("") async def upload_import( file: UploadFile, sheet_name: str | None = None, account_id: int = Depends(resolve_account_id), ) -> dict: """Upload fisier xlsx/csv -> staging in import_batches/import_rows. Nu trimite nimic la RAR. Intoarce {import_id, columns, sample_rows, sheets?}. PII (raw_json) criptat Fernet la rest (Issue 5a). Scrieri bulk in tranzactie explicita (Issue 6). """ acct = account_or_default(account_id) data = await file.read() filename = file.filename or "fisier" # Parsare try: parsed: ParsedFile = parse_file(data, filename, sheet_name=sheet_name) except MultipleSheets as ms: raise HTTPException( status_code=422, detail={ "error": "multiple_sheets", "message": str(ms), "sheets": ms.sheet_names, }, ) except FileTooLarge as e: raise HTTPException( status_code=413, detail={ "error": "file_too_large", "message": str(e), }, ) except HeaderError as e: raise HTTPException( status_code=422, detail={ "error": "header_error", "message": str(e), "found": e.found, }, ) except UnicodeDecodeError as e: raise HTTPException( status_code=422, detail={ "error": "encoding_error", "message": f"Encoding nesuportat: {e.reason}", }, ) except Exception as e: # Fisier corupt (BadZipFile, InvalidFileException, etc.) raise HTTPException( status_code=422, detail={ "error": "invalid_file", "message": f"Fisier nerecunoscut (xlsx/csv): {type(e).__name__}", }, ) conn = get_connection() try: sig = _signature(parsed.columns) # Issue 6: tranzactie explicita BEGIN IMMEDIATE + executemany conn.execute("BEGIN IMMEDIATE") try: # Insert import_batches cur = conn.execute( "INSERT INTO import_batches (account_id, filename, status, total, purge_after) " "VALUES (?, ?, 'staging', ?, datetime('now', '+90 days'))", (acct, filename, len(parsed.rows)), ) batch_id = cur.lastrowid # Insert import_rows bulk (executemany) cu PII criptat row_params = [] for i, row_dict in enumerate(parsed.rows): raw_json_enc = encrypt_creds(row_dict) # Criptat Fernet (Issue 5a) row_params.append((batch_id, i, raw_json_enc, "pending", None)) conn.executemany( "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status, error) " "VALUES (?, ?, ?, ?, ?)", row_params, ) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise # Semnatura coloane + mapare existenta existing_mapping = conn.execute( "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() mapping_status = "matched" if existing_mapping else "new" # Sample rows (primele 3, fara PII) sample = parsed.rows[:3] # Persistam metadata parsedata (coercion_flags, date_col_format, formula_columns) # in import_batches pentru refolosire la preview (stocam ca JSON in 'status' nu e OK, # ci ca metadate suplimentare — le stocam intr-un rand separat sau returnam direct) # Solutie: le returnam in raspuns; preview-ul le va recalcula din raw_json deja stocat # SAU le stocam ca un camp extra. Cel mai simplu: stocam coloanele in batch. conn.execute( "UPDATE import_batches SET ok=?, needs_review=? WHERE id=?", (0, len(parsed.coercion_flags), batch_id), ) result: dict = { "import_id": batch_id, "columns": parsed.columns, "sample_rows": sample, "total_rows": len(parsed.rows), "formula_columns": parsed.formula_columns, "date_col_format": parsed.date_col_format, "coercion_flags_count": len(parsed.coercion_flags), "mapping_status": mapping_status, "signature": sig, } if existing_mapping: result["column_mapping"] = json.loads(existing_mapping["json_mapare"]) result["format_data"] = existing_mapping["format_data"] else: # Sugestii fuzzy per coloana (Issue 5b: refoloseste normalize_for_match) suggestions: dict[str, list[dict]] = {} for col in parsed.columns: sugg = _fuzzy_suggest_column(col, limit=3) if sugg: suggestions[col] = sugg result["fuzzy_suggestions"] = suggestions return result finally: conn.close() # --------------------------------------------------------------------------- # # GET /v1/import/{id}/column-mapping — mapare existenta / sugestii # # --------------------------------------------------------------------------- # @router.get("/{import_id}/column-mapping") def get_column_mapping( import_id: int, account_id: int = Depends(resolve_account_id), ) -> dict: """Intoarce maparea de coloane existenta sau sugestii fuzzy.""" acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id, filename, status FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") # Obtine coloanele din primele randuri first_rows = conn.execute( "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", (import_id,), ).fetchall() # Obtine coloanele din batch — trebuie sa le stocam la upload # Le recalculam din primul rand (cheile sunt coloanele fisierului) columns = [] if first_rows: try: row_data = decrypt_creds(first_rows[0]["raw_json"]) if row_data: columns = list(row_data.keys()) except Exception: pass sig = _signature(columns) if columns else "" existing = conn.execute( "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() if existing: return { "status": "matched", "column_mapping": json.loads(existing["json_mapare"]), "format_data": existing["format_data"], "columns": columns, "signature": sig, } # Sugestii fuzzy suggestions: dict[str, list[dict]] = {} for col in columns: sugg = _fuzzy_suggest_column(col, limit=3) if sugg: suggestions[col] = sugg return { "status": "new", "columns": columns, "signature": sig, "fuzzy_suggestions": suggestions, } finally: conn.close() # --------------------------------------------------------------------------- # # POST /v1/import/{id}/column-mapping — salveaza maparea # # --------------------------------------------------------------------------- # class ColumnMappingIn(BaseModel): json_mapare: dict[str, str] = Field(..., description="Mapare {col_fisier: camp_canonic}") format_data: str | None = Field(None, description="Format data, ex: DD.MM.YYYY") @router.post("/{import_id}/column-mapping") def save_column_mapping( import_id: int, req: ColumnMappingIn, account_id: int = Depends(resolve_account_id), ) -> dict: """Salveaza/actualizeaza maparea de coloane pentru contul curent. Semnatura = hash al coloanelor fisierului. Drift: daca coloanele se schimba, maparea veche nu se aplica (signature mismatch -> mapping_status='new'). """ acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") # Recalculeaza semnatura din coloanele fisierului (cheile maparii) columns = list(req.json_mapare.keys()) sig = _signature(columns) conn.execute( "INSERT INTO column_mappings (account_id, signature_coloane, json_mapare, format_data) " "VALUES (?, ?, ?, ?) " "ON CONFLICT(account_id, signature_coloane) DO UPDATE SET " "json_mapare=excluded.json_mapare, format_data=excluded.format_data", (acct, sig, json.dumps(req.json_mapare, ensure_ascii=False), req.format_data), ) return {"ok": True, "signature": sig, "columns": columns} finally: conn.close() # --------------------------------------------------------------------------- # # GET /v1/import/{id}/preview — 6 stari per rand (T2 + T11) # # --------------------------------------------------------------------------- # @router.get("/{import_id}/preview") def preview_import( import_id: int, account_id: int = Depends(resolve_account_id), ) -> dict: """Preview 6 stari per rand: ok/needs_mapping/needs_data/needs_review/already_sent/duplicate_in_file. Nu enqueue-aza nimic. Already_sent = lookup batch (Eng#5). Duplicate_in_file = intra-batch collision (OV-3: EXCLUSIV aici, NU in reconcile.py/worker). """ acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id, account_id, filename, status FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") # Incarca toate randurile raw_rows_db = conn.execute( "SELECT row_index, raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index", (import_id,), ).fetchall() if not raw_rows_db: return {"rows": [], "summary": {}} # Decripteaza si reconstruieste randurile rows: list[dict] = [] for r in raw_rows_db: try: row_data = decrypt_creds(r["raw_json"]) rows.append(row_data or {}) except Exception: rows.append({}) # Obtine coloanele col_names = list(rows[0].keys()) if rows else [] sig = _signature(col_names) # Mapare coloane mapping_row = conn.execute( "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() if not mapping_row: raise HTTPException( status_code=422, detail={ "error": "no_column_mapping", "message": "Maparea coloanelor nu a fost configurata. Configureaza mai intai maparea.", }, ) json_mapare: dict[str, str] = json.loads(mapping_row["json_mapare"]) format_data = mapping_row["format_data"] # Incarca maparea de operatii o singura data (Eng#5: load_mapping o singura data) mapping_meta = load_mapping_meta(conn, acct) mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()} # Reconstruieste parsed info (coercion_flags si date_col_format) din datele stocate # Nota: import_rows stocheaza raw_json dupa coercion (din parse_file) # Recalculam flags din valorile stocate (coercion_flags nu e stocat separat) # Vom folosi o detectie simpla: VIN-uri care par numerice si odometru float coercion_flags_map: dict[int, list[str]] = {} # Detectam din valorile stocate for i, row_dict in enumerate(rows): flags = [] # Detectam VIN numeric: daca valoarea a fost stocata si arata ca numar for col_f, camp_c in json_mapare.items(): if camp_c == "vin": vin_val = row_dict.get(col_f) if vin_val is not None and str(vin_val).replace(".", "").isdigit(): flags.append(f"VIN numeric ({vin_val}) — verificati seria sasiului") if flags: coercion_flags_map[i] = flags # Reconstructie date_col_format din mapare si valorile stocate # (Simplificat: folosim "ambiguous" daca format_data nu e setat) date_col_format: dict[str, str] = {} if format_data: for col_f, camp_c in json_mapare.items(): if camp_c == "data_prestatie": date_col_format[col_f] = format_data # Detectie formula_columns din valorile stocate (rata None ridicata) col_none_counts: dict[str, int] = {} for col_f in col_names: col_none_counts[col_f] = sum(1 for r in rows if r.get(col_f) is None) n_rows = len(rows) formula_columns = [ col_f for col_f, cnt in col_none_counts.items() if n_rows > 0 and cnt / n_rows >= 0.6 ] # Rezolva fiecare rand preview_rows: list[dict] = [] keys_for_lookup: list[str] = [] key_to_index: dict[str, list[int]] = {} # key -> [row_index, ...] for i, row_dict in enumerate(rows): flags = coercion_flags_map.get(i, []) resolved_info = _resolve_row_for_preview( raw_row=row_dict, json_mapare=json_mapare, date_col_format=date_col_format, coercion_flags=flags, mapping=mapping, mapping_meta=mapping_meta, formula_columns=formula_columns, ) # Calculeaza cheia de idempotenta pentru randurile ok/needs_review key = None if resolved_info["resolved_status"] in ("ok", "needs_review", "needs_data"): try: key = _build_idempotency_key(account_id, resolved_info["resolved"]) keys_for_lookup.append(key) if key not in key_to_index: key_to_index[key] = [] key_to_index[key].append(i) except Exception: pass preview_rows.append({ "row_index": i, "resolved_status": resolved_info["resolved_status"], "resolved": resolved_info["resolved"], "errors": resolved_info["errors"], "flags": resolved_info["flags"], "idempotency_key": key, }) # Already_sent: batch lookup (Eng#5 — nu N+1) unique_keys = list(set(keys_for_lookup)) already_sent_map = _already_sent_lookup(conn, account_id, unique_keys) # Duplicate_in_file (OV-3): detectie coliziuni intra-batch # Grupam pe cheie de idempotenta: >1 rand cu aceeasi cheie = duplicate key_to_indices: dict[str, list[int]] = {} for row in preview_rows: k = row.get("idempotency_key") if k: if k not in key_to_indices: key_to_indices[k] = [] key_to_indices[k].append(row["row_index"]) # Aplica already_sent si duplicate_in_file for row in preview_rows: k = row.get("idempotency_key") if not k: continue # Already_sent: cheia exista deja in submissions if k in already_sent_map and row["resolved_status"] in ("ok", "needs_review", "needs_data"): sent_info = already_sent_map[k] row["resolved_status"] = "already_sent" row["already_sent_info"] = sent_info continue # Duplicate_in_file (OV-3): >1 rand cu aceeasi cheie in ACELASI fisier indices_with_same_key = key_to_indices.get(k, []) if len(indices_with_same_key) > 1 and row["resolved_status"] in ("ok", "needs_review", "needs_data"): others = [idx for idx in indices_with_same_key if idx != row["row_index"]] row["resolved_status"] = "duplicate_in_file" row["duplicate_with"] = others # Rezumat summary: dict[str, int] = {} for row in preview_rows: s = row["resolved_status"] summary[s] = summary.get(s, 0) + 1 # Actualizeaza contoarele in import_batches conn.execute( "UPDATE import_batches SET ok=?, needs_mapping=?, needs_data=?, needs_review=?, " "already_sent=?, duplicate_in_file=? WHERE id=?", ( summary.get("ok", 0), summary.get("needs_mapping", 0), summary.get("needs_data", 0), summary.get("needs_review", 0), summary.get("already_sent", 0), summary.get("duplicate_in_file", 0), import_id, ), ) # Actualizeaza resolved_status in import_rows conn.execute("BEGIN IMMEDIATE") try: upd_params = [ (row["resolved_status"], import_id, row["row_index"]) for row in preview_rows ] conn.executemany( "UPDATE import_rows SET resolved_status=? WHERE batch_id=? AND row_index=?", upd_params, ) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") return { "import_id": import_id, "rows": preview_rows, "summary": summary, "total": len(preview_rows), } finally: conn.close() # --------------------------------------------------------------------------- # # POST /v1/import/{id}/commit — gate HARD + enqueue + log atestare (T5+T12) # # --------------------------------------------------------------------------- # class CommitIn(BaseModel): n_confirmat: int = Field(..., description="Numarul de randuri ok confirmate (gate HARD)") reviewed_rows: list[int] = Field( default_factory=list, description="Indecsi de rand needs_review bifate explicit de utilizator", ) confirmed_by: str | None = Field(None, description="Email/identifier utilizator (log atestare)") @router.post("/{import_id}/commit") def commit_import( import_id: int, req: CommitIn, account_id: int = Depends(resolve_account_id), ) -> dict: """Gate HARD confirmare + enqueue randuri ok + log atestare (T5+T12). TOCTOU (Issue 1): INSERT per-rand cu ON CONFLICT(idempotency_key) DO NOTHING. Randuri colidante = reclasificate already_sent in rezultatul commit-ului. rows_hash + n_confirmed acopera DOAR randurile efectiv puse in coada. """ acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id, account_id, filename, status FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") if batch["status"] == "committed": raise HTTPException(status_code=409, detail="batch deja comis") # Incarca randurile cu stare ok sau needs_review ok_rows_db = conn.execute( "SELECT row_index, raw_json, resolved_status FROM import_rows " "WHERE batch_id=? AND resolved_status IN ('ok', 'needs_review') ORDER BY row_index", (import_id,), ).fetchall() if not ok_rows_db: raise HTTPException(status_code=422, detail="Niciun rand ok de confirmat in acest batch.") # Decripteaza randurile ok ok_rows: list[dict] = [] ok_indices: list[int] = [] review_indices: set[int] = set() for r in ok_rows_db: try: row_data = decrypt_creds(r["raw_json"]) if row_data is None: continue except Exception: continue if r["resolved_status"] == "ok": ok_rows.append({"row_index": r["row_index"], "data": row_data, "status": "ok"}) ok_indices.append(r["row_index"]) elif r["resolved_status"] == "needs_review": review_indices.add(r["row_index"]) # needs_review bifate explicit (Voce#1 — atestare pe valori) confirmed_review = [idx for idx in req.reviewed_rows if idx in review_indices] for idx in confirmed_review: # Gaseste randul needs_review si il adauga la ok_rows for r in ok_rows_db: if r["row_index"] == idx and r["resolved_status"] == "needs_review": try: row_data = decrypt_creds(r["raw_json"]) if row_data: ok_rows.append({"row_index": idx, "data": row_data, "status": "needs_review"}) ok_indices.append(idx) except Exception: pass # Gate HARD: n_confirmat trebuie sa fie EXACT egal cu numarul de randuri ok n_total_ok = len(ok_rows) if req.n_confirmat != n_total_ok: raise HTTPException( status_code=422, detail={ "error": "confirmare_gresita", "message": f"Ai confirmat {req.n_confirmat} dar sunt {n_total_ok} randuri gata de trimis. Verifica preview-ul.", "n_ok": n_total_ok, }, ) if n_total_ok == 0: raise HTTPException(status_code=422, detail="Niciun rand ok de confirmat.") # Incarca maparea de coloane pentru a construi payload-ul first_row_db = conn.execute( "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", (import_id,), ).fetchone() col_names = [] if first_row_db: try: fd = decrypt_creds(first_row_db["raw_json"]) if fd: col_names = list(fd.keys()) except Exception: pass sig = _signature(col_names) if col_names else "" mapping_row = conn.execute( "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() json_mapare: dict[str, str] = {} if mapping_row: json_mapare = json.loads(mapping_row["json_mapare"]) # Incarca maparea de operatii mapping_meta = load_mapping_meta(conn, acct) mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()} # Construieste payload-urile submissions enqueued: list[dict] = [] toctou_collisions: list[int] = [] rows_for_hash: list[str] = [] # Enqueue in tranzactie explicita (Issue 6) conn.execute("BEGIN IMMEDIATE") try: # purge_after pentru submissions noi (T16) purge_after_sql = "datetime('now', '+90 days')" for ok_row in ok_rows: row_dict = ok_row["data"] row_index = ok_row["row_index"] # Aplica maparea de coloane mapped: dict[str, Any] = {} for col_f, camp_c in json_mapare.items(): if col_f in row_dict and camp_c: mapped[camp_c] = row_dict[col_f] # Rezolva data date_col_fisier = None for col_f, camp_c in json_mapare.items(): if camp_c == "data_prestatie": date_col_fisier = col_f break if date_col_fisier: col_fmt = (mapping_row["format_data"] if mapping_row else None) or "ambiguous" raw_date = mapped.get("data_prestatie") if raw_date is not None: iso_date, _ = parse_date_value(raw_date, col_fmt) if iso_date: mapped["data_prestatie"] = iso_date # Operatia -> prestatii operatie_val = mapped.pop("operatie", None) if operatie_val and "prestatii" not in mapped: mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": str(operatie_val)}] # Rezolva prestatii INAINTE de canonicalizare (altfel cheia difera de cea din preview) prestatii = mapped.get("prestatii") or [] resolved, _ = resolve_prestatii(prestatii, mapping) mapped["prestatii"] = resolved # Canonicalizare (dupa rezolvare prestatii -> cod_prestatie inclus in cheie) canon = canonicalize_row(mapped) mapped.update({ "vin": canon["vin"], "nr_inmatriculare": canon["nr_inmatriculare"], "odometru_final": canon["odometru_final"], }) # Cheia de idempotenta (identica cu cheia din preview — aceeasi ordine) key = build_key(account_id, canon) # Hash row pentru atestare (valori rezolvate) rows_for_hash.append(json.dumps({ "row_index": row_index, "vin": mapped.get("vin"), "data_prestatie": mapped.get("data_prestatie"), "odometru_final": mapped.get("odometru_final"), "prestatii": [str(p.get("cod_prestatie") or p.get("cod_op_service") or "") for p in resolved], }, sort_keys=True, ensure_ascii=False)) payload_json = json.dumps(mapped, ensure_ascii=False) # INSERT ON CONFLICT DO NOTHING (TOCTOU — Issue 1) cur = conn.execute( "INSERT OR IGNORE INTO submissions " "(idempotency_key, account_id, status, payload_json, batch_id, row_index, purge_after) " "VALUES (?, ?, 'queued', ?, ?, ?, " + purge_after_sql + ")", (key, acct, payload_json, import_id, row_index), ) if cur.rowcount == 0: # Coliziune TOCTOU: cheia a fost inserata de un canal concurent toctou_collisions.append(row_index) else: sub_id = cur.lastrowid enqueued.append({ "submission_id": sub_id, "row_index": row_index, "idempotency_key": key, }) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise n_enqueued = len(enqueued) # Log atestare (Voce#9): rows_hash + n_confirmed acopera DOAR randurile puse in coada rows_hash = hashlib.sha256( json.dumps(sorted(rows_for_hash), ensure_ascii=False).encode("utf-8") ).hexdigest() if rows_for_hash else "" conn.execute( "INSERT INTO import_attestations (batch_id, account_id, confirmed_by, rows_hash, n_confirmed) " "VALUES (?, ?, ?, ?, ?)", (import_id, acct, req.confirmed_by, rows_hash, n_enqueued), ) # Actualizeaza status batch new_status = "committed" if n_enqueued > 0 else "staging" conn.execute( "UPDATE import_batches SET status=?, ok=? WHERE id=?", (new_status, n_enqueued, import_id), ) return { "import_id": import_id, "enqueued": n_enqueued, "toctou_collisions": toctou_collisions, "rows_hash": rows_hash, "submissions": enqueued, } finally: conn.close() # --------------------------------------------------------------------------- # # GET /v1/import/{id}/export-failed — CSV randuri esuate (T8) # # --------------------------------------------------------------------------- # _EXPORT_FAILED_COLUMNS = [ "row_index", "resolved_status", "vin", "nr_inmatriculare", "data_prestatie", "odometru_final", "operatie", "error", ] @router.get("/{import_id}/export-failed") def export_failed_rows( import_id: int, account_id: int = Depends(resolve_account_id), ) -> StreamingResponse: """CSV cu randurile esuate (needs_data/needs_mapping/needs_review) pentru corectie + re-upload.""" acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id, filename FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") failed_rows = conn.execute( "SELECT row_index, raw_json, resolved_status, error FROM import_rows " "WHERE batch_id=? AND resolved_status IN ('needs_data', 'needs_mapping', 'needs_review') " "ORDER BY row_index", (import_id,), ).fetchall() # Incarca maparea de coloane pentru a stii ce campuri sa extraga first_row = conn.execute( "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", (import_id,), ).fetchone() col_names = [] if first_row: try: fd = decrypt_creds(first_row["raw_json"]) if fd: col_names = list(fd.keys()) except Exception: pass sig = _signature(col_names) if col_names else "" mapping_row = conn.execute( "SELECT json_mapare FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() json_mapare: dict[str, str] = {} if mapping_row: json_mapare = json.loads(mapping_row["json_mapare"]) buf = io.StringIO() writer = csv.DictWriter(buf, fieldnames=_EXPORT_FAILED_COLUMNS, extrasaction="ignore") writer.writeheader() for r in failed_rows: try: row_data = decrypt_creds(r["raw_json"]) or {} except Exception: row_data = {} # Extrage campuri canonice din mapare mapped: dict[str, Any] = {} for col_f, camp_c in json_mapare.items(): if col_f in row_data: mapped[camp_c] = row_data[col_f] # Operatia (camp canonic 'operatie' sau din prestatii) operatie_val = mapped.get("operatie") or "" if not operatie_val and mapped.get("prestatii"): prestatii = mapped.get("prestatii") if isinstance(prestatii, list) and prestatii: first_p = prestatii[0] if isinstance(first_p, dict): operatie_val = first_p.get("cod_op_service") or first_p.get("denumire") or "" # Erori (din import_rows.error sau din resolved_status) error_str = r["error"] or r["resolved_status"] writer.writerow({ "row_index": r["row_index"], "resolved_status": r["resolved_status"], "vin": mapped.get("vin") or "", "nr_inmatriculare": mapped.get("nr_inmatriculare") or "", "data_prestatie": mapped.get("data_prestatie") or "", "odometru_final": mapped.get("odometru_final") or "", "operatie": operatie_val, "error": error_str, }) data = buf.getvalue() finally: conn.close() fname = f"import_{import_id}_failed.csv" return StreamingResponse( iter([data]), media_type="text/csv", headers={"Content-Disposition": f'attachment; filename="{fname}"'}, )