"""Router import Treapta 2 — upload fisier xlsx/csv, preview, commit, export. Endpointuri: POST /v1/import — upload fisier, staging in import_batches/import_rows GET /v1/import/{id}/column-mapping — maparea de coloane existenta / sugestii fuzzy POST /v1/import/{id}/column-mapping — salveaza maparea de coloane GET /v1/import/{id}/preview — preview 6 stari per rand (fara enqueue) POST /v1/import/{id}/commit — gate HARD + enqueue randuri ok + log atestare GET /v1/import/{id}/export-failed — CSV cu randuri esuate (needs_data/needs_mapping/needs_review) Reguli cheie (plan §3.1-3.4, §12): - Issue 6: scrieri bulk in tranzactie explicita BEGIN IMMEDIATE...COMMIT + executemany. - Eng#5: already_sent lookup BATCH (IN chunk ~900), nu N+1. - OV-3: duplicate_in_file EXCLUSIV la preview/commit. NU atinge reconcile.py/worker. - Issue 1 (TOCTOU): commit per-rand cu ON CONFLICT(idempotency_key) DO NOTHING. - Issue 5a: import_rows.raw_json CRIPTAT Fernet. - Issue 5b: fuzzy coloane refoloseste mapping.normalize_for_match (DRY). - T4/D3: drift semnatura coloane -> NU aplica orb, cere re-confirmare. """ from __future__ import annotations import csv import hashlib import io import json from typing import Any from fastapi import APIRouter, Depends, HTTPException, UploadFile from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from ... import errors from ...auth import resolve_account_id from ...crypto import decrypt_creds, encrypt_creds from ...db import get_connection from ...idempotency import build_key, canonicalize_row from ...import_parse import ( FileTooLarge, HeaderError, MultipleSheets, ParsedFile, parse_date_value, parse_file, ) from ...mapping import ( account_or_default, has_no_auto_send, load_mapping_meta, normalize_for_match, resolve_prestatii, ) from ...validation import validate_prezentare router = APIRouter(prefix="/v1/import", tags=["import"]) # Marimea maxima a unui chunk pentru IN(...) SQLite (limite SQLite ~999) _IN_CHUNK = 900 # Campuri canonice si sinonimele lor pentru sugestie fuzzy coloane (Issue 5b/Eng#4) _CANONICAL_SYNONYMS: dict[str, list[str]] = { "vin": ["VIN", "Serie sasiu", "Sasiu", "Serie", "Numar sasiu", "Nr sasiu", "Chassis"], "nr_inmatriculare": ["Nr inmatriculare", "Numar inmatriculare", "Numar auto", "Nr auto", "Numar", "Nr"], "data_prestatie": ["Data prestatie", "Data", "Date", "Data service", "Data lucrare"], "odometru_final": ["Odometru final", "Odometru", "KM", "Kilometri", "Km final", "Citire contor"], "odometru_initial": ["Odometru initial", "KM initial", "Km start"], "operatie": ["Operatie", "Cod prestatie", "Prestatie", "Lucrare", "Tip lucrare", "Cod op"], "denumire_op": ["Denumire operatie", "Denumire", "Descriere", "Denumire prestatie", "Nume operatie"], "obs": ["Observatii", "Obs", "Mentiuni", "Note"], } # Prag minim scor fuzzy pentru pre-selectie (0..100) _FUZZY_MIN_SCORE = 55 # --------------------------------------------------------------------------- # # Helpere interne # # --------------------------------------------------------------------------- # def _signature(columns: list[str]) -> str: """Semnatura coloanelor = sha256 al listei sortate (case-insensitive).""" norm = sorted(c.strip().lower() for c in columns) return hashlib.sha256(json.dumps(norm, ensure_ascii=False).encode("utf-8")).hexdigest() def _fuzzy_suggest_column( col_name: str, *, limit: int = 3, ) -> list[dict]: """Sugereaza campuri canonice pentru o coloana din fisier. Refoloseste normalize_for_match + rapidfuzz.fuzz.token_sort_ratio (Issue 5b/Eng#4). Intoarce [{camp_canonic, score}] sortat descrescator. """ from rapidfuzz import fuzz, process query = normalize_for_match(col_name) # Construieste un dict {camp_canonic: sinonime_normalize} pentru matching choices: dict[str, str] = {} for camp, sinonime in _CANONICAL_SYNONYMS.items(): # Cel mai bun sinonim = primul (cel mai specific) best_syn = normalize_for_match(sinonime[0]) choices[camp] = best_syn ranked = process.extract( query, choices, scorer=fuzz.token_sort_ratio, limit=limit, ) return [ {"camp_canonic": camp, "score": float(score)} for _, score, camp in ranked if score >= _FUZZY_MIN_SCORE ] def _resolve_row_for_preview( raw_row: dict[str, Any], json_mapare: dict[str, str], date_col_format: dict[str, str], coercion_flags: list[str], mapping: dict[str, str], mapping_meta: dict[str, dict], formula_columns: list[str], override: dict[str, Any] | None = None, ) -> dict[str, Any]: """Rezolva un rand din import pentru preview: aplica mapare coloane + validare. Intoarce un dict cu: resolved_status: ok/needs_mapping/needs_data/needs_review resolved: valorile finale rezolvate (VIN, data, km, prestatii) errors: lista erori validare flags: motive needs_review `override` (3.6, Approach B): patch CANONIC editat in preview, aplicat ULTIMUL peste valorile mapate (dupa `json_mapare` si canonicalizare). Permite corectarea unei valori sau completarea unui camp a carui coloana LIPSESTE din fisier, fara sa atinga `raw_json`/idempotency. """ # Aplica maparea de coloane mapped: dict[str, Any] = {} for col_fisier, camp_canonic in json_mapare.items(): if col_fisier in raw_row and camp_canonic: mapped[camp_canonic] = raw_row[col_fisier] # Detectie coloane cu formule (Issue 3) — nu blocheaza, dar adauga flag formula_flag: list[str] = [] for col_fisier, camp_canonic in json_mapare.items(): if col_fisier in formula_columns: formula_flag.append(f"Coloana '{col_fisier}' pare sa contina formule fara valori calculate — re-salveaza fisierul in Excel.") # Rezolvare data prestatie # Gaseste coloana de data din mapare data_col_fisier = None for col_f, camp_c in json_mapare.items(): if camp_c == "data_prestatie": data_col_fisier = col_f break is_ambiguous_date = False if data_col_fisier: col_fmt = date_col_format.get(data_col_fisier, "ambiguous") raw_date = mapped.get("data_prestatie") if raw_date is not None: iso_date, is_amb = parse_date_value(raw_date, col_fmt) if iso_date: mapped["data_prestatie"] = iso_date if is_amb: is_ambiguous_date = True # Operatia: daca camp canonic e "operatie", construieste prestatii. # denumire_op (coloana descriptiva, ex. "Reparatie Motor") alimenteaza # `denumire` -> sugestia fuzzy din editorul de mapari devine utila; fara ea, # denumire = codul opac (ex. "OP-MOTOR") si fuzzy nu are pe ce sa lucreze. operatie_val = mapped.pop("operatie", None) denumire_val = mapped.pop("denumire_op", None) if operatie_val and "prestatii" not in mapped: denumire = str(denumire_val).strip() if denumire_val not in (None, "") else str(operatie_val) mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": denumire}] # Canonicalizare (T9): normalizeaza VIN/nr/odometru canon = canonicalize_row(mapped) mapped.update({ "vin": canon["vin"], "nr_inmatriculare": canon["nr_inmatriculare"], "odometru_final": canon["odometru_final"], }) # Override editat in preview (3.6) — aplicat ULTIMUL, peste valorile mapate + # canonicalizate. Valorile din override sunt deja canonice (vezi _merge_override). if override: mapped.update(override) # Flags needs_review acumulate all_flags = list(coercion_flags) + formula_flag if is_ambiguous_date: all_flags.append("Data ambigua (zi<=12): verifica daca e DD.MM sau MM.DD") # Rezolvare prestatii prestatii = mapped.get("prestatii") or [] resolved, unmapped = resolve_prestatii(prestatii, mapping) mapped["prestatii"] = resolved # Determinare stare if unmapped: return { "resolved_status": "needs_mapping", "resolved": mapped, "errors": [{"unmapped": unmapped}], "flags": all_flags, } # Validare continut errors = validate_prezentare(mapped) if all_flags: # needs_review: chiar daca validarea trece, flagurile blocheaza auto-send return { "resolved_status": "needs_review", "resolved": mapped, "errors": errors, "flags": all_flags, } # auto_send gate (T6/OV-1) if has_no_auto_send(resolved, mapping_meta): return { "resolved_status": "needs_mapping", "resolved": mapped, "errors": [{"auto_send": "cod mapat cu auto_send=0; review manual inainte de trimitere"}], "flags": all_flags, } if errors: return { "resolved_status": "needs_data", "resolved": mapped, "errors": errors, "flags": all_flags, } return { "resolved_status": "ok", "resolved": mapped, "errors": [], "flags": all_flags, } def _build_idempotency_key(account_id: int | None, resolved: dict[str, Any]) -> str: """Construieste cheia de idempotenta pentru un rand rezolvat.""" canon = canonicalize_row(resolved) return build_key(account_id, canon) # Campuri de continut editabile in preview (3.6). Operatia/codul RAR NU se editeaza # aici (raman in panoul de mapare) — vezi Non-Goals din PRD 3.6. EDIT_FIELDS = ("vin", "nr_inmatriculare", "data_prestatie", "odometru_initial", "odometru_final") def _merge_override(current: dict[str, Any], fields: dict[str, str | None]) -> dict[str, Any]: """Aplica campurile editate peste override-ul curent (mutatie pura). Semantica: - valoare None -> camp ne-trimis in cerere -> neschimbat. - valoare "" -> STERGE cheia din override (revine la valoarea din fisier). - valoare negoala -> set valoare CANONICA (vin/nr upper, odometru_final fara ".0"). `odometru_initial`/`data_prestatie` se pastreaza stripped (canonicalize_row normeaza doar `_final`; validarea le verifica direct). """ out = dict(current) raw: dict[str, str] = {} for camp in EDIT_FIELDS: val = fields.get(camp) if val is None: continue s = str(val).strip() if s == "": out.pop(camp, None) # empty = clear else: raw[camp] = s if raw: canon = canonicalize_row(raw) for camp in raw: if camp in ("vin", "nr_inmatriculare", "odometru_final"): out[camp] = canon[camp] else: out[camp] = raw[camp] return out def apply_row_override( conn, *, import_id: int, account_id: int | None, row_index: int, fields: dict[str, str | None], ) -> dict[str, Any]: """Persista override-ul canonic pentru un rand de preview (mutatie PURA de stocare). NU recalculeaza statusul si NU atinge `submissions` — preview-ul rederiva statusul prin `_resolve_row_for_preview` (un singur clasificator, fara drift). Ridica HTTPException: 404 (rand/batch inexistent sau alt cont — scoping JOIN), 409 (batch deja comis), 422 (override curent corupt -> no-op defensiv, fara scriere goala). Intoarce noul dict de override (gol = override sters). """ acct = account_or_default(account_id) # Scoping intr-o singura interogare JOIN -> 404 pe gol (alt cont / batch / row_index). row = conn.execute( "SELECT r.id AS rid, r.override_json AS oj, b.status AS bstatus " "FROM import_rows r JOIN import_batches b ON b.id = r.batch_id " "WHERE b.id=? AND b.account_id=? AND r.row_index=?", (import_id, acct, row_index), ).fetchone() if not row: raise HTTPException(status_code=404, detail="rand de import inexistent") if row["bstatus"] == "committed": raise HTTPException(status_code=409, detail="batch deja comis; editarea nu mai are efect") current: dict[str, Any] = {} if row["oj"]: dec = decrypt_creds(row["oj"]) if dec is None: # Decrypt fail (cheie schimbata / token corupt): no-op defensiv, NICIODATA scriere goala. _oi_msg = "override curent ilizibil; editare anulata" raise HTTPException( status_code=422, detail={ "error": "override_ilizibil", "message": _oi_msg, **errors.eroare("IMPORT_OVERRIDE_ILIZIBIL", cauza=_oi_msg), }, ) current = dec new_override = _merge_override(current, fields) enc = encrypt_creds(new_override) if new_override else None conn.execute("UPDATE import_rows SET override_json=? WHERE id=?", (enc, row["rid"])) return new_override def _already_sent_lookup(conn, account_id: int, keys: list[str]) -> dict[str, dict]: """Cauta cheile de idempotenta in submissions (batch, nu N+1 — Eng#5). Intoarce {idempotency_key: {id, id_prezentare, created_at}} pentru cheile gasite. """ acct = account_or_default(account_id) found: dict[str, dict] = {} # Chunk ~900 parametri (limita SQLite) for i in range(0, len(keys), _IN_CHUNK): chunk = keys[i:i + _IN_CHUNK] placeholders = ",".join("?" * len(chunk)) rows = conn.execute( f"SELECT idempotency_key, id, id_prezentare, created_at FROM submissions " f"WHERE account_id=? AND idempotency_key IN ({placeholders})", (acct, *chunk), ).fetchall() for r in rows: found[r["idempotency_key"]] = { "submission_id": r["id"], "id_prezentare": r["id_prezentare"], "created_at": r["created_at"], } # Dual-lookup pentru chei legacy (OV-2: chei vechi cu account_id=None) legacy_keys_needed = [k for k in chunk if k not in found] if legacy_keys_needed: lph = ",".join("?" * len(legacy_keys_needed)) lrows = conn.execute( f"SELECT idempotency_key, id, id_prezentare, created_at FROM submissions " f"WHERE idempotency_key IN ({lph})", tuple(legacy_keys_needed), ).fetchall() for r in lrows: if r["idempotency_key"] not in found: found[r["idempotency_key"]] = { "submission_id": r["id"], "id_prezentare": r["id_prezentare"], "created_at": r["created_at"], } return found # --------------------------------------------------------------------------- # # POST /v1/import — upload fisier, staging # # --------------------------------------------------------------------------- # @router.post("") async def upload_import( file: UploadFile, sheet_name: str | None = None, account_id: int = Depends(resolve_account_id), ) -> dict: """Upload fisier xlsx/csv -> staging in import_batches/import_rows. Nu trimite nimic la RAR. Intoarce {import_id, columns, sample_rows, sheets?}. PII (raw_json) criptat Fernet la rest (Issue 5a). Scrieri bulk in tranzactie explicita (Issue 6). """ acct = account_or_default(account_id) data = await file.read() filename = file.filename or "fisier" # Parsare try: parsed: ParsedFile = parse_file(data, filename, sheet_name=sheet_name) except MultipleSheets as ms: raise HTTPException( status_code=422, detail={ "error": "multiple_sheets", "message": str(ms), "sheets": ms.sheet_names, **errors.eroare("IMPORT_MULTIPLE_SHEETS", cauza=str(ms)), }, ) except FileTooLarge as e: raise HTTPException( status_code=413, detail={ "error": "file_too_large", "message": str(e), **errors.eroare("IMPORT_FISIER_PREA_MARE", cauza=str(e)), }, ) except HeaderError as e: raise HTTPException( status_code=422, detail={ "error": "header_error", "message": str(e), "found": e.found, **errors.eroare("IMPORT_ANTET_NECLAR", cauza=str(e)), }, ) except UnicodeDecodeError as e: _enc_msg = f"Encoding nesuportat: {e.reason}" raise HTTPException( status_code=422, detail={ "error": "encoding_error", "message": _enc_msg, **errors.eroare("IMPORT_ENCODING", cauza=_enc_msg), }, ) except Exception as e: # Fisier corupt (BadZipFile, InvalidFileException, etc.) _inv_msg = f"Fisier nerecunoscut (xlsx/csv): {type(e).__name__}" raise HTTPException( status_code=422, detail={ "error": "invalid_file", "message": _inv_msg, **errors.eroare("IMPORT_FISIER_NERECUNOSCUT", cauza=_inv_msg), }, ) conn = get_connection() try: sig = _signature(parsed.columns) # Issue 6: tranzactie explicita BEGIN IMMEDIATE + executemany conn.execute("BEGIN IMMEDIATE") try: # Insert import_batches cur = conn.execute( "INSERT INTO import_batches (account_id, filename, status, total, purge_after) " "VALUES (?, ?, 'staging', ?, datetime('now', '+90 days'))", (acct, filename, len(parsed.rows)), ) batch_id = cur.lastrowid # Insert import_rows bulk (executemany) cu PII criptat row_params = [] for i, row_dict in enumerate(parsed.rows): raw_json_enc = encrypt_creds(row_dict) # Criptat Fernet (Issue 5a) row_params.append((batch_id, i, raw_json_enc, "pending", None)) conn.executemany( "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status, error) " "VALUES (?, ?, ?, ?, ?)", row_params, ) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise # Semnatura coloane + mapare existenta existing_mapping = conn.execute( "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() mapping_status = "matched" if existing_mapping else "new" # Sample rows (primele 3, fara PII) sample = parsed.rows[:3] # Persistam metadata parsedata (coercion_flags, date_col_format, formula_columns) # in import_batches pentru refolosire la preview (stocam ca JSON in 'status' nu e OK, # ci ca metadate suplimentare — le stocam intr-un rand separat sau returnam direct) # Solutie: le returnam in raspuns; preview-ul le va recalcula din raw_json deja stocat # SAU le stocam ca un camp extra. Cel mai simplu: stocam coloanele in batch. conn.execute( "UPDATE import_batches SET ok=?, needs_review=? WHERE id=?", (0, len(parsed.coercion_flags), batch_id), ) result: dict = { "import_id": batch_id, "columns": parsed.columns, "sample_rows": sample, "total_rows": len(parsed.rows), "formula_columns": parsed.formula_columns, "date_col_format": parsed.date_col_format, "coercion_flags_count": len(parsed.coercion_flags), "mapping_status": mapping_status, "signature": sig, } if existing_mapping: result["column_mapping"] = json.loads(existing_mapping["json_mapare"]) result["format_data"] = existing_mapping["format_data"] else: # Sugestii fuzzy per coloana (Issue 5b: refoloseste normalize_for_match) suggestions: dict[str, list[dict]] = {} for col in parsed.columns: sugg = _fuzzy_suggest_column(col, limit=3) if sugg: suggestions[col] = sugg result["fuzzy_suggestions"] = suggestions return result finally: conn.close() # --------------------------------------------------------------------------- # # GET /v1/import/{id}/column-mapping — mapare existenta / sugestii # # --------------------------------------------------------------------------- # @router.get("/{import_id}/column-mapping") def get_column_mapping( import_id: int, account_id: int = Depends(resolve_account_id), ) -> dict: """Intoarce maparea de coloane existenta sau sugestii fuzzy.""" acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id, filename, status FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") # Obtine coloanele din primele randuri first_rows = conn.execute( "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", (import_id,), ).fetchall() # Obtine coloanele din batch — trebuie sa le stocam la upload # Le recalculam din primul rand (cheile sunt coloanele fisierului) columns = [] if first_rows: try: row_data = decrypt_creds(first_rows[0]["raw_json"]) if row_data: columns = list(row_data.keys()) except Exception: pass sig = _signature(columns) if columns else "" existing = conn.execute( "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() if existing: return { "status": "matched", "column_mapping": json.loads(existing["json_mapare"]), "format_data": existing["format_data"], "columns": columns, "signature": sig, } # Sugestii fuzzy suggestions: dict[str, list[dict]] = {} for col in columns: sugg = _fuzzy_suggest_column(col, limit=3) if sugg: suggestions[col] = sugg return { "status": "new", "columns": columns, "signature": sig, "fuzzy_suggestions": suggestions, } finally: conn.close() # --------------------------------------------------------------------------- # # POST /v1/import/{id}/column-mapping — salveaza maparea # # --------------------------------------------------------------------------- # class ColumnMappingIn(BaseModel): json_mapare: dict[str, str] = Field(..., description="Mapare {col_fisier: camp_canonic}") format_data: str | None = Field(None, description="Format data, ex: DD.MM.YYYY") @router.post("/{import_id}/column-mapping") def save_column_mapping( import_id: int, req: ColumnMappingIn, account_id: int = Depends(resolve_account_id), ) -> dict: """Salveaza/actualizeaza maparea de coloane pentru contul curent. Semnatura = hash al coloanelor fisierului. Drift: daca coloanele se schimba, maparea veche nu se aplica (signature mismatch -> mapping_status='new'). """ acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") # Semnatura = antetul COMPLET al fisierului (toate coloanele din batch), nu # doar campurile mapate. Altfel, daca clientul ignora o coloana, semnatura # difera de cea calculata la preview (col_names = antet complet) si maparea # retinuta nu mai e gasita. Citim antetul din primul rand al batch-ului. first_row = conn.execute( "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", (import_id,), ).fetchone() columns = list(req.json_mapare.keys()) if first_row: try: rd = decrypt_creds(first_row["raw_json"]) or {} if rd: columns = list(rd.keys()) except Exception: pass sig = _signature(columns) conn.execute( "INSERT INTO column_mappings (account_id, signature_coloane, json_mapare, format_data) " "VALUES (?, ?, ?, ?) " "ON CONFLICT(account_id, signature_coloane) DO UPDATE SET " "json_mapare=excluded.json_mapare, format_data=excluded.format_data", (acct, sig, json.dumps(req.json_mapare, ensure_ascii=False), req.format_data), ) return {"ok": True, "signature": sig, "columns": columns} finally: conn.close() # --------------------------------------------------------------------------- # # GET /v1/import/{id}/preview — 6 stari per rand (T2 + T11) # # --------------------------------------------------------------------------- # @router.get("/{import_id}/preview") def preview_import( import_id: int, account_id: int = Depends(resolve_account_id), ) -> dict: """Preview 6 stari per rand: ok/needs_mapping/needs_data/needs_review/already_sent/duplicate_in_file. Nu enqueue-aza nimic. Already_sent = lookup batch (Eng#5). Duplicate_in_file = intra-batch collision (OV-3: EXCLUSIV aici, NU in reconcile.py/worker). """ acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id, account_id, filename, status FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") # Incarca toate randurile raw_rows_db = conn.execute( "SELECT row_index, raw_json, override_json FROM import_rows WHERE batch_id=? ORDER BY row_index", (import_id,), ).fetchall() if not raw_rows_db: return {"rows": [], "summary": {}} # Decripteaza si reconstruieste randurile + override-urile editate (3.6) rows: list[dict] = [] overrides: list[dict] = [] for r in raw_rows_db: try: row_data = decrypt_creds(r["raw_json"]) rows.append(row_data or {}) except Exception: rows.append({}) try: ov = decrypt_creds(r["override_json"]) if r["override_json"] else None except Exception: ov = None overrides.append(ov or {}) # Obtine coloanele col_names = list(rows[0].keys()) if rows else [] sig = _signature(col_names) # Mapare coloane mapping_row = conn.execute( "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() if not mapping_row: _ncm_msg = "Maparea coloanelor nu a fost configurata. Configureaza mai intai maparea." raise HTTPException( status_code=422, detail={ "error": "no_column_mapping", "message": _ncm_msg, **errors.eroare("IMPORT_FARA_MAPARE_COLOANE", cauza=_ncm_msg), }, ) json_mapare: dict[str, str] = json.loads(mapping_row["json_mapare"]) format_data = mapping_row["format_data"] # Incarca maparea de operatii o singura data (Eng#5: load_mapping o singura data) mapping_meta = load_mapping_meta(conn, acct) mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()} # Reconstruieste parsed info (coercion_flags si date_col_format) din datele stocate # Nota: import_rows stocheaza raw_json dupa coercion (din parse_file) # Recalculam flags din valorile stocate (coercion_flags nu e stocat separat) # Vom folosi o detectie simpla: VIN-uri care par numerice si odometru float coercion_flags_map: dict[int, list[str]] = {} # Detectam din valorile stocate for i, row_dict in enumerate(rows): flags = [] # Detectam VIN numeric: daca valoarea a fost stocata si arata ca numar for col_f, camp_c in json_mapare.items(): if camp_c == "vin": vin_val = row_dict.get(col_f) if vin_val is not None and str(vin_val).replace(".", "").isdigit(): flags.append(f"VIN numeric ({vin_val}) — verificati seria sasiului") if flags: coercion_flags_map[i] = flags # Reconstructie date_col_format din mapare si valorile stocate # (Simplificat: folosim "ambiguous" daca format_data nu e setat) date_col_format: dict[str, str] = {} if format_data: for col_f, camp_c in json_mapare.items(): if camp_c == "data_prestatie": date_col_format[col_f] = format_data # Detectie formula_columns din valorile stocate (rata None ridicata) col_none_counts: dict[str, int] = {} for col_f in col_names: col_none_counts[col_f] = sum(1 for r in rows if r.get(col_f) is None) n_rows = len(rows) formula_columns = [ col_f for col_f, cnt in col_none_counts.items() if n_rows > 0 and cnt / n_rows >= 0.6 ] # Rezolva fiecare rand preview_rows: list[dict] = [] keys_for_lookup: list[str] = [] key_to_index: dict[str, list[int]] = {} # key -> [row_index, ...] for i, row_dict in enumerate(rows): flags = coercion_flags_map.get(i, []) resolved_info = _resolve_row_for_preview( raw_row=row_dict, json_mapare=json_mapare, date_col_format=date_col_format, coercion_flags=flags, mapping=mapping, mapping_meta=mapping_meta, formula_columns=formula_columns, override=overrides[i] or None, ) # Calculeaza cheia de idempotenta pentru randurile ok/needs_review key = None if resolved_info["resolved_status"] in ("ok", "needs_review", "needs_data"): try: key = _build_idempotency_key(account_id, resolved_info["resolved"]) keys_for_lookup.append(key) if key not in key_to_index: key_to_index[key] = [] key_to_index[key].append(i) except Exception: pass preview_rows.append({ "row_index": i, "resolved_status": resolved_info["resolved_status"], "resolved": resolved_info["resolved"], "errors": resolved_info["errors"], "flags": resolved_info["flags"], "idempotency_key": key, }) # Already_sent: batch lookup (Eng#5 — nu N+1) unique_keys = list(set(keys_for_lookup)) already_sent_map = _already_sent_lookup(conn, account_id, unique_keys) # Duplicate_in_file (OV-3): detectie coliziuni intra-batch # Grupam pe cheie de idempotenta: >1 rand cu aceeasi cheie = duplicate key_to_indices: dict[str, list[int]] = {} for row in preview_rows: k = row.get("idempotency_key") if k: if k not in key_to_indices: key_to_indices[k] = [] key_to_indices[k].append(row["row_index"]) # Aplica already_sent si duplicate_in_file for row in preview_rows: k = row.get("idempotency_key") if not k: continue # Already_sent: cheia exista deja in submissions if k in already_sent_map and row["resolved_status"] in ("ok", "needs_review", "needs_data"): sent_info = already_sent_map[k] row["resolved_status"] = "already_sent" row["already_sent_info"] = sent_info continue # Duplicate_in_file (OV-3): >1 rand cu aceeasi cheie in ACELASI fisier indices_with_same_key = key_to_indices.get(k, []) if len(indices_with_same_key) > 1 and row["resolved_status"] in ("ok", "needs_review", "needs_data"): others = [idx for idx in indices_with_same_key if idx != row["row_index"]] row["resolved_status"] = "duplicate_in_file" row["duplicate_with"] = others # Rezumat summary: dict[str, int] = {} for row in preview_rows: s = row["resolved_status"] summary[s] = summary.get(s, 0) + 1 # Actualizeaza contoarele in import_batches conn.execute( "UPDATE import_batches SET ok=?, needs_mapping=?, needs_data=?, needs_review=?, " "already_sent=?, duplicate_in_file=? WHERE id=?", ( summary.get("ok", 0), summary.get("needs_mapping", 0), summary.get("needs_data", 0), summary.get("needs_review", 0), summary.get("already_sent", 0), summary.get("duplicate_in_file", 0), import_id, ), ) # Actualizeaza resolved_status in import_rows conn.execute("BEGIN IMMEDIATE") try: upd_params = [ (row["resolved_status"], import_id, row["row_index"]) for row in preview_rows ] conn.executemany( "UPDATE import_rows SET resolved_status=? WHERE batch_id=? AND row_index=?", upd_params, ) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") return { "import_id": import_id, "rows": preview_rows, "summary": summary, "total": len(preview_rows), } finally: conn.close() # --------------------------------------------------------------------------- # # POST /v1/import/{id}/commit — gate HARD + enqueue + log atestare (T5+T12) # # --------------------------------------------------------------------------- # class CommitIn(BaseModel): n_confirmat: int = Field(..., description="Numarul de randuri ok confirmate (gate HARD)") reviewed_rows: list[int] = Field( default_factory=list, description="Indecsi de rand needs_review bifate explicit de utilizator", ) confirmed_by: str | None = Field(None, description="Email/identifier utilizator (log atestare)") @router.post("/{import_id}/commit") def commit_import( import_id: int, req: CommitIn, account_id: int = Depends(resolve_account_id), ) -> dict: """Gate HARD confirmare + enqueue randuri ok + log atestare (T5+T12). TOCTOU (Issue 1): INSERT per-rand cu ON CONFLICT(idempotency_key) DO NOTHING. Randuri colidante = reclasificate already_sent in rezultatul commit-ului. rows_hash + n_confirmed acopera DOAR randurile efectiv puse in coada. """ acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id, account_id, filename, status FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") if batch["status"] == "committed": raise HTTPException(status_code=409, detail="batch deja comis") # Incarca randurile cu stare ok sau needs_review ok_rows_db = conn.execute( "SELECT row_index, raw_json, override_json, resolved_status FROM import_rows " "WHERE batch_id=? AND resolved_status IN ('ok', 'needs_review') ORDER BY row_index", (import_id,), ).fetchall() if not ok_rows_db: raise HTTPException(status_code=422, detail="Niciun rand ok de confirmat in acest batch.") def _override_of(r) -> dict: return (decrypt_creds(r["override_json"]) if r["override_json"] else None) or {} # Decripteaza randurile ok ok_rows: list[dict] = [] ok_indices: list[int] = [] review_indices: set[int] = set() for r in ok_rows_db: try: row_data = decrypt_creds(r["raw_json"]) if row_data is None: continue except Exception: continue if r["resolved_status"] == "ok": ok_rows.append({"row_index": r["row_index"], "data": row_data, "override": _override_of(r), "status": "ok"}) ok_indices.append(r["row_index"]) elif r["resolved_status"] == "needs_review": review_indices.add(r["row_index"]) # needs_review bifate explicit (Voce#1 — atestare pe valori) confirmed_review = [idx for idx in req.reviewed_rows if idx in review_indices] for idx in confirmed_review: # Gaseste randul needs_review si il adauga la ok_rows for r in ok_rows_db: if r["row_index"] == idx and r["resolved_status"] == "needs_review": try: row_data = decrypt_creds(r["raw_json"]) if row_data: ok_rows.append({"row_index": idx, "data": row_data, "override": _override_of(r), "status": "needs_review"}) ok_indices.append(idx) except Exception: pass # Gate HARD: n_confirmat trebuie sa fie EXACT egal cu numarul de randuri ok n_total_ok = len(ok_rows) if req.n_confirmat != n_total_ok: _cg_msg = f"Ai confirmat {req.n_confirmat} dar sunt {n_total_ok} randuri gata de trimis. Verifica preview-ul." raise HTTPException( status_code=422, detail={ "error": "confirmare_gresita", "message": _cg_msg, "n_ok": n_total_ok, **errors.eroare("IMPORT_CONFIRMARE_GRESITA", cauza=_cg_msg), }, ) if n_total_ok == 0: raise HTTPException(status_code=422, detail="Niciun rand ok de confirmat.") # Incarca maparea de coloane pentru a construi payload-ul first_row_db = conn.execute( "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", (import_id,), ).fetchone() col_names = [] if first_row_db: try: fd = decrypt_creds(first_row_db["raw_json"]) if fd: col_names = list(fd.keys()) except Exception: pass sig = _signature(col_names) if col_names else "" mapping_row = conn.execute( "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() json_mapare: dict[str, str] = {} if mapping_row: json_mapare = json.loads(mapping_row["json_mapare"]) # Incarca maparea de operatii mapping_meta = load_mapping_meta(conn, acct) mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()} # Construieste payload-urile submissions enqueued: list[dict] = [] toctou_collisions: list[int] = [] rows_for_hash: list[str] = [] # Enqueue in tranzactie explicita (Issue 6) conn.execute("BEGIN IMMEDIATE") try: # purge_after pentru submissions noi (T16) purge_after_sql = "datetime('now', '+90 days')" for ok_row in ok_rows: row_dict = ok_row["data"] row_index = ok_row["row_index"] # Aplica maparea de coloane mapped: dict[str, Any] = {} for col_f, camp_c in json_mapare.items(): if col_f in row_dict and camp_c: mapped[camp_c] = row_dict[col_f] # Rezolva data date_col_fisier = None for col_f, camp_c in json_mapare.items(): if camp_c == "data_prestatie": date_col_fisier = col_f break if date_col_fisier: col_fmt = (mapping_row["format_data"] if mapping_row else None) or "ambiguous" raw_date = mapped.get("data_prestatie") if raw_date is not None: iso_date, _ = parse_date_value(raw_date, col_fmt) if iso_date: mapped["data_prestatie"] = iso_date # Operatia -> prestatii (denumire_op alimenteaza denumirea reala) operatie_val = mapped.pop("operatie", None) denumire_val = mapped.pop("denumire_op", None) if operatie_val and "prestatii" not in mapped: denumire = str(denumire_val).strip() if denumire_val not in (None, "") else str(operatie_val) mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": denumire}] # Rezolva prestatii INAINTE de canonicalizare (altfel cheia difera de cea din preview) prestatii = mapped.get("prestatii") or [] resolved, _ = resolve_prestatii(prestatii, mapping) mapped["prestatii"] = resolved # Canonicalizare (dupa rezolvare prestatii -> cod_prestatie inclus in cheie) canon = canonicalize_row(mapped) mapped.update({ "vin": canon["vin"], "nr_inmatriculare": canon["nr_inmatriculare"], "odometru_final": canon["odometru_final"], }) # Override editat in preview (3.6) — aplicat ULTIMUL, ca in resolver. override = ok_row.get("override") or {} if override: mapped.update(override) # Re-canonicalizeaza pentru a obtine cheia IDENTICA cu cea din preview # (_build_idempotency_key = canonicalize_row + build_key peste mapped). canon = canonicalize_row(mapped) mapped.update({ "vin": canon["vin"], "nr_inmatriculare": canon["nr_inmatriculare"], "odometru_final": canon["odometru_final"], }) # Cheia de idempotenta (identica cu cheia din preview — aceeasi ordine) key = build_key(account_id, canon) # Hash row pentru atestare (valori rezolvate) rows_for_hash.append(json.dumps({ "row_index": row_index, "vin": mapped.get("vin"), "data_prestatie": mapped.get("data_prestatie"), "odometru_final": mapped.get("odometru_final"), "prestatii": [str(p.get("cod_prestatie") or p.get("cod_op_service") or "") for p in resolved], }, sort_keys=True, ensure_ascii=False)) payload_json = json.dumps(mapped, ensure_ascii=False) # INSERT ON CONFLICT DO NOTHING (TOCTOU — Issue 1) cur = conn.execute( "INSERT OR IGNORE INTO submissions " "(idempotency_key, account_id, status, payload_json, batch_id, row_index, purge_after) " "VALUES (?, ?, 'queued', ?, ?, ?, " + purge_after_sql + ")", (key, acct, payload_json, import_id, row_index), ) if cur.rowcount == 0: # Coliziune TOCTOU: cheia a fost inserata de un canal concurent toctou_collisions.append(row_index) else: sub_id = cur.lastrowid enqueued.append({ "submission_id": sub_id, "row_index": row_index, "idempotency_key": key, }) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise n_enqueued = len(enqueued) # Log atestare (Voce#9): rows_hash + n_confirmed acopera DOAR randurile puse in coada rows_hash = hashlib.sha256( json.dumps(sorted(rows_for_hash), ensure_ascii=False).encode("utf-8") ).hexdigest() if rows_for_hash else "" conn.execute( "INSERT INTO import_attestations (batch_id, account_id, confirmed_by, rows_hash, n_confirmed) " "VALUES (?, ?, ?, ?, ?)", (import_id, acct, req.confirmed_by, rows_hash, n_enqueued), ) # Actualizeaza status batch new_status = "committed" if n_enqueued > 0 else "staging" conn.execute( "UPDATE import_batches SET status=?, ok=? WHERE id=?", (new_status, n_enqueued, import_id), ) return { "import_id": import_id, "enqueued": n_enqueued, "toctou_collisions": toctou_collisions, "rows_hash": rows_hash, "submissions": enqueued, } finally: conn.close() # --------------------------------------------------------------------------- # # POST /v1/import/{id}/rand/{row_index}/editeaza — editare celule preview (3.6) # # --------------------------------------------------------------------------- # class RandEditIn(BaseModel): """Campuri de continut editabile in preview. None = ne-trimis (neschimbat); "" = sterge override-ul (revine la valoarea din fisier).""" vin: str | None = None nr_inmatriculare: str | None = None data_prestatie: str | None = None odometru_initial: str | None = None odometru_final: str | None = None @router.post("/{import_id}/rand/{row_index}/editeaza") def editeaza_rand( import_id: int, row_index: int, req: RandEditIn, account_id: int = Depends(resolve_account_id), ) -> dict: """Persista editarea unui rand de preview (mutatie pura — Approach B, 3.6). NU recalculeaza statusul si NU atinge `submissions`; preview-ul rederiva statusul prin `_resolve_row_for_preview` cu override aplicat ultimul. """ conn = get_connection() try: override = apply_row_override( conn, import_id=import_id, account_id=account_id, row_index=row_index, fields=req.model_dump(), ) return {"ok": True, "import_id": import_id, "row_index": row_index, "override": override} finally: conn.close() # --------------------------------------------------------------------------- # # GET /v1/import/{id}/export-failed — CSV randuri esuate (T8) # # --------------------------------------------------------------------------- # _EXPORT_FAILED_COLUMNS = [ "row_index", "resolved_status", "vin", "nr_inmatriculare", "data_prestatie", "odometru_final", "operatie", "error", ] @router.get("/{import_id}/export-failed") def export_failed_rows( import_id: int, account_id: int = Depends(resolve_account_id), ) -> StreamingResponse: """CSV cu randurile esuate (needs_data/needs_mapping/needs_review) pentru corectie + re-upload.""" acct = account_or_default(account_id) conn = get_connection() try: batch = conn.execute( "SELECT id, filename FROM import_batches WHERE id=? AND account_id=?", (import_id, acct), ).fetchone() if not batch: raise HTTPException(status_code=404, detail="batch de import inexistent") failed_rows = conn.execute( "SELECT row_index, raw_json, resolved_status, error FROM import_rows " "WHERE batch_id=? AND resolved_status IN ('needs_data', 'needs_mapping', 'needs_review') " "ORDER BY row_index", (import_id,), ).fetchall() # Incarca maparea de coloane pentru a stii ce campuri sa extraga first_row = conn.execute( "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", (import_id,), ).fetchone() col_names = [] if first_row: try: fd = decrypt_creds(first_row["raw_json"]) if fd: col_names = list(fd.keys()) except Exception: pass sig = _signature(col_names) if col_names else "" mapping_row = conn.execute( "SELECT json_mapare FROM column_mappings WHERE account_id=? AND signature_coloane=?", (acct, sig), ).fetchone() json_mapare: dict[str, str] = {} if mapping_row: json_mapare = json.loads(mapping_row["json_mapare"]) buf = io.StringIO() writer = csv.DictWriter(buf, fieldnames=_EXPORT_FAILED_COLUMNS, extrasaction="ignore") writer.writeheader() for r in failed_rows: try: row_data = decrypt_creds(r["raw_json"]) or {} except Exception: row_data = {} # Extrage campuri canonice din mapare mapped: dict[str, Any] = {} for col_f, camp_c in json_mapare.items(): if col_f in row_data: mapped[camp_c] = row_data[col_f] # Operatia (camp canonic 'operatie' sau din prestatii) operatie_val = mapped.get("operatie") or "" if not operatie_val and mapped.get("prestatii"): prestatii = mapped.get("prestatii") if isinstance(prestatii, list) and prestatii: first_p = prestatii[0] if isinstance(first_p, dict): operatie_val = first_p.get("cod_op_service") or first_p.get("denumire") or "" # Erori (din import_rows.error sau din resolved_status) error_str = r["error"] or r["resolved_status"] writer.writerow({ "row_index": r["row_index"], "resolved_status": r["resolved_status"], "vin": mapped.get("vin") or "", "nr_inmatriculare": mapped.get("nr_inmatriculare") or "", "data_prestatie": mapped.get("data_prestatie") or "", "odometru_final": mapped.get("odometru_final") or "", "operatie": operatie_val, "error": error_str, }) data = buf.getvalue() finally: conn.close() fname = f"import_{import_id}_failed.csv" return StreamingResponse( iter([data]), media_type="text/csv", headers={"Content-Disposition": f'attachment; filename="{fname}"'}, )