diff --git a/app/api/v1/import_router.py b/app/api/v1/import_router.py new file mode 100644 index 0000000..59e0e76 --- /dev/null +++ b/app/api/v1/import_router.py @@ -0,0 +1,1125 @@ +"""Router import Treapta 2 — upload fisier xlsx/csv, preview, commit, export. + +Endpointuri: + POST /v1/import — upload fisier, staging in import_batches/import_rows + GET /v1/import/{id}/column-mapping — maparea de coloane existenta / sugestii fuzzy + POST /v1/import/{id}/column-mapping — salveaza maparea de coloane + GET /v1/import/{id}/preview — preview 6 stari per rand (fara enqueue) + POST /v1/import/{id}/commit — gate HARD + enqueue randuri ok + log atestare + GET /v1/import/{id}/export-failed — CSV cu randuri esuate (needs_data/needs_mapping/needs_review) + +Reguli cheie (plan §3.1-3.4, §12): + - Issue 6: scrieri bulk in tranzactie explicita BEGIN IMMEDIATE...COMMIT + executemany. + - Eng#5: already_sent lookup BATCH (IN chunk ~900), nu N+1. + - OV-3: duplicate_in_file EXCLUSIV la preview/commit. NU atinge reconcile.py/worker. + - Issue 1 (TOCTOU): commit per-rand cu ON CONFLICT(idempotency_key) DO NOTHING. + - Issue 5a: import_rows.raw_json CRIPTAT Fernet. + - Issue 5b: fuzzy coloane refoloseste mapping.normalize_for_match (DRY). + - T4/D3: drift semnatura coloane -> NU aplica orb, cere re-confirmare. +""" + +from __future__ import annotations + +import csv +import hashlib +import io +import json +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, UploadFile +from fastapi.responses import StreamingResponse +from pydantic import BaseModel, Field + +from ...auth import resolve_account_id +from ...crypto import decrypt_creds, encrypt_creds +from ...db import get_connection +from ...idempotency import build_key, canonicalize_row +from ...import_parse import ( + FileTooLarge, + HeaderError, + MultipleSheets, + ParsedFile, + parse_date_value, + parse_file, +) +from ...mapping import ( + account_or_default, + has_no_auto_send, + load_mapping_meta, + normalize_for_match, + resolve_prestatii, +) +from ...validation import validate_prezentare + +router = APIRouter(prefix="/v1/import", tags=["import"]) + +# Marimea maxima a unui chunk pentru IN(...) SQLite (limite SQLite ~999) +_IN_CHUNK = 900 + +# Campuri canonice si sinonimele lor pentru sugestie fuzzy coloane (Issue 5b/Eng#4) +_CANONICAL_SYNONYMS: dict[str, list[str]] = { + "vin": ["VIN", "Serie sasiu", "Sasiu", "Serie", "Numar sasiu", "Nr sasiu", "Chassis"], + "nr_inmatriculare": ["Nr inmatriculare", "Numar inmatriculare", "Numar auto", "Nr auto", "Numar", "Nr"], + "data_prestatie": ["Data prestatie", "Data", "Date", "Data service", "Data lucrare"], + "odometru_final": ["Odometru final", "Odometru", "KM", "Kilometri", "Km final", "Citire contor"], + "odometru_initial": ["Odometru initial", "KM initial", "Km start"], + "operatie": ["Operatie", "Denumire prestatie", "Prestatie", "Lucrare", "Tip lucrare", "Cod prestatie", "Cod op"], + "obs": ["Observatii", "Obs", "Mentiuni", "Note"], +} + +# Prag minim scor fuzzy pentru pre-selectie (0..100) +_FUZZY_MIN_SCORE = 55 + + +# --------------------------------------------------------------------------- # +# Helpere interne # +# --------------------------------------------------------------------------- # + +def _signature(columns: list[str]) -> str: + """Semnatura coloanelor = sha256 al listei sortate (case-insensitive).""" + norm = sorted(c.strip().lower() for c in columns) + return hashlib.sha256(json.dumps(norm, ensure_ascii=False).encode("utf-8")).hexdigest() + + +def _fuzzy_suggest_column( + col_name: str, + *, + limit: int = 3, +) -> list[dict]: + """Sugereaza campuri canonice pentru o coloana din fisier. + + Refoloseste normalize_for_match + rapidfuzz.fuzz.token_sort_ratio (Issue 5b/Eng#4). + Intoarce [{camp_canonic, score}] sortat descrescator. + """ + from rapidfuzz import fuzz, process + + query = normalize_for_match(col_name) + # Construieste un dict {camp_canonic: sinonime_normalize} pentru matching + choices: dict[str, str] = {} + for camp, sinonime in _CANONICAL_SYNONYMS.items(): + # Cel mai bun sinonim = primul (cel mai specific) + best_syn = normalize_for_match(sinonime[0]) + choices[camp] = best_syn + + ranked = process.extract( + query, + choices, + scorer=fuzz.token_sort_ratio, + limit=limit, + ) + return [ + {"camp_canonic": camp, "score": float(score)} + for _, score, camp in ranked + if score >= _FUZZY_MIN_SCORE + ] + + + +def _resolve_row_for_preview( + raw_row: dict[str, Any], + json_mapare: dict[str, str], + date_col_format: dict[str, str], + coercion_flags: list[str], + mapping: dict[str, str], + mapping_meta: dict[str, dict], + formula_columns: list[str], +) -> dict[str, Any]: + """Rezolva un rand din import pentru preview: aplica mapare coloane + validare. + + Intoarce un dict cu: + resolved_status: ok/needs_mapping/needs_data/needs_review + resolved: valorile finale rezolvate (VIN, data, km, prestatii) + errors: lista erori validare + flags: motive needs_review + """ + # Aplica maparea de coloane + mapped: dict[str, Any] = {} + for col_fisier, camp_canonic in json_mapare.items(): + if col_fisier in raw_row and camp_canonic: + mapped[camp_canonic] = raw_row[col_fisier] + + # Detectie coloane cu formule (Issue 3) — nu blocheaza, dar adauga flag + formula_flag: list[str] = [] + for col_fisier, camp_canonic in json_mapare.items(): + if col_fisier in formula_columns: + formula_flag.append(f"Coloana '{col_fisier}' pare sa contina formule fara valori calculate — re-salveaza fisierul in Excel.") + + # Rezolvare data prestatie + # Gaseste coloana de data din mapare + data_col_fisier = None + for col_f, camp_c in json_mapare.items(): + if camp_c == "data_prestatie": + data_col_fisier = col_f + break + + is_ambiguous_date = False + if data_col_fisier: + col_fmt = date_col_format.get(data_col_fisier, "ambiguous") + raw_date = mapped.get("data_prestatie") + if raw_date is not None: + iso_date, is_amb = parse_date_value(raw_date, col_fmt) + if iso_date: + mapped["data_prestatie"] = iso_date + if is_amb: + is_ambiguous_date = True + + # Operatia: daca camp canonic e "operatie", construieste prestatii + operatie_val = mapped.pop("operatie", None) + if operatie_val and "prestatii" not in mapped: + # Construieste un item de prestatie din operatie + mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": str(operatie_val)}] + + # Canonicalizare (T9): normalizeaza VIN/nr/odometru + canon = canonicalize_row(mapped) + mapped.update({ + "vin": canon["vin"], + "nr_inmatriculare": canon["nr_inmatriculare"], + "odometru_final": canon["odometru_final"], + }) + + # Flags needs_review acumulate + all_flags = list(coercion_flags) + formula_flag + if is_ambiguous_date: + all_flags.append("Data ambigua (zi<=12): verifica daca e DD.MM sau MM.DD") + + # Rezolvare prestatii + prestatii = mapped.get("prestatii") or [] + resolved, unmapped = resolve_prestatii(prestatii, mapping) + mapped["prestatii"] = resolved + + # Determinare stare + if unmapped: + return { + "resolved_status": "needs_mapping", + "resolved": mapped, + "errors": [{"unmapped": unmapped}], + "flags": all_flags, + } + + # Validare continut + errors = validate_prezentare(mapped) + + if all_flags: + # needs_review: chiar daca validarea trece, flagurile blocheaza auto-send + return { + "resolved_status": "needs_review", + "resolved": mapped, + "errors": errors, + "flags": all_flags, + } + + # auto_send gate (T6/OV-1) + if has_no_auto_send(resolved, mapping_meta): + return { + "resolved_status": "needs_mapping", + "resolved": mapped, + "errors": [{"auto_send": "cod mapat cu auto_send=0; review manual inainte de trimitere"}], + "flags": all_flags, + } + + if errors: + return { + "resolved_status": "needs_data", + "resolved": mapped, + "errors": errors, + "flags": all_flags, + } + + return { + "resolved_status": "ok", + "resolved": mapped, + "errors": [], + "flags": all_flags, + } + + +def _build_idempotency_key(account_id: int | None, resolved: dict[str, Any]) -> str: + """Construieste cheia de idempotenta pentru un rand rezolvat.""" + canon = canonicalize_row(resolved) + return build_key(account_id, canon) + + +def _already_sent_lookup(conn, account_id: int, keys: list[str]) -> dict[str, dict]: + """Cauta cheile de idempotenta in submissions (batch, nu N+1 — Eng#5). + + Intoarce {idempotency_key: {id, id_prezentare, created_at}} pentru cheile gasite. + """ + acct = account_or_default(account_id) + found: dict[str, dict] = {} + # Chunk ~900 parametri (limita SQLite) + for i in range(0, len(keys), _IN_CHUNK): + chunk = keys[i:i + _IN_CHUNK] + placeholders = ",".join("?" * len(chunk)) + rows = conn.execute( + f"SELECT idempotency_key, id, id_prezentare, created_at FROM submissions " + f"WHERE account_id=? AND idempotency_key IN ({placeholders})", + (acct, *chunk), + ).fetchall() + for r in rows: + found[r["idempotency_key"]] = { + "submission_id": r["id"], + "id_prezentare": r["id_prezentare"], + "created_at": r["created_at"], + } + # Dual-lookup pentru chei legacy (OV-2: chei vechi cu account_id=None) + legacy_keys_needed = [k for k in chunk if k not in found] + if legacy_keys_needed: + lph = ",".join("?" * len(legacy_keys_needed)) + lrows = conn.execute( + f"SELECT idempotency_key, id, id_prezentare, created_at FROM submissions " + f"WHERE idempotency_key IN ({lph})", + tuple(legacy_keys_needed), + ).fetchall() + for r in lrows: + if r["idempotency_key"] not in found: + found[r["idempotency_key"]] = { + "submission_id": r["id"], + "id_prezentare": r["id_prezentare"], + "created_at": r["created_at"], + } + return found + + +# --------------------------------------------------------------------------- # +# POST /v1/import — upload fisier, staging # +# --------------------------------------------------------------------------- # + +@router.post("") +async def upload_import( + file: UploadFile, + sheet_name: str | None = None, + account_id: int = Depends(resolve_account_id), +) -> dict: + """Upload fisier xlsx/csv -> staging in import_batches/import_rows. + + Nu trimite nimic la RAR. Intoarce {import_id, columns, sample_rows, sheets?}. + PII (raw_json) criptat Fernet la rest (Issue 5a). + Scrieri bulk in tranzactie explicita (Issue 6). + """ + acct = account_or_default(account_id) + data = await file.read() + filename = file.filename or "fisier" + + # Parsare + try: + parsed: ParsedFile = parse_file(data, filename, sheet_name=sheet_name) + except MultipleSheets as ms: + raise HTTPException( + status_code=422, + detail={ + "error": "multiple_sheets", + "message": str(ms), + "sheets": ms.sheet_names, + }, + ) + except FileTooLarge as e: + raise HTTPException( + status_code=413, + detail={ + "error": "file_too_large", + "message": str(e), + }, + ) + except HeaderError as e: + raise HTTPException( + status_code=422, + detail={ + "error": "header_error", + "message": str(e), + "found": e.found, + }, + ) + except UnicodeDecodeError as e: + raise HTTPException( + status_code=422, + detail={ + "error": "encoding_error", + "message": f"Encoding nesuportat: {e.reason}", + }, + ) + except Exception as e: + # Fisier corupt (BadZipFile, InvalidFileException, etc.) + raise HTTPException( + status_code=422, + detail={ + "error": "invalid_file", + "message": f"Fisier nerecunoscut (xlsx/csv): {type(e).__name__}", + }, + ) + + conn = get_connection() + try: + sig = _signature(parsed.columns) + + # Issue 6: tranzactie explicita BEGIN IMMEDIATE + executemany + conn.execute("BEGIN IMMEDIATE") + try: + # Insert import_batches + cur = conn.execute( + "INSERT INTO import_batches (account_id, filename, status, total, purge_after) " + "VALUES (?, ?, 'staging', ?, datetime('now', '+90 days'))", + (acct, filename, len(parsed.rows)), + ) + batch_id = cur.lastrowid + + # Insert import_rows bulk (executemany) cu PII criptat + row_params = [] + for i, row_dict in enumerate(parsed.rows): + raw_json_enc = encrypt_creds(row_dict) # Criptat Fernet (Issue 5a) + row_params.append((batch_id, i, raw_json_enc, "pending", None)) + + conn.executemany( + "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status, error) " + "VALUES (?, ?, ?, ?, ?)", + row_params, + ) + conn.execute("COMMIT") + except Exception: + conn.execute("ROLLBACK") + raise + + # Semnatura coloane + mapare existenta + existing_mapping = conn.execute( + "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", + (acct, sig), + ).fetchone() + + mapping_status = "matched" if existing_mapping else "new" + + # Sample rows (primele 3, fara PII) + sample = parsed.rows[:3] + + # Persistam metadata parsedata (coercion_flags, date_col_format, formula_columns) + # in import_batches pentru refolosire la preview (stocam ca JSON in 'status' nu e OK, + # ci ca metadate suplimentare — le stocam intr-un rand separat sau returnam direct) + # Solutie: le returnam in raspuns; preview-ul le va recalcula din raw_json deja stocat + # SAU le stocam ca un camp extra. Cel mai simplu: stocam coloanele in batch. + conn.execute( + "UPDATE import_batches SET ok=?, needs_review=? WHERE id=?", + (0, len(parsed.coercion_flags), batch_id), + ) + + result: dict = { + "import_id": batch_id, + "columns": parsed.columns, + "sample_rows": sample, + "total_rows": len(parsed.rows), + "formula_columns": parsed.formula_columns, + "date_col_format": parsed.date_col_format, + "coercion_flags_count": len(parsed.coercion_flags), + "mapping_status": mapping_status, + "signature": sig, + } + + if existing_mapping: + result["column_mapping"] = json.loads(existing_mapping["json_mapare"]) + result["format_data"] = existing_mapping["format_data"] + else: + # Sugestii fuzzy per coloana (Issue 5b: refoloseste normalize_for_match) + suggestions: dict[str, list[dict]] = {} + for col in parsed.columns: + sugg = _fuzzy_suggest_column(col, limit=3) + if sugg: + suggestions[col] = sugg + result["fuzzy_suggestions"] = suggestions + + return result + + finally: + conn.close() + + +# --------------------------------------------------------------------------- # +# GET /v1/import/{id}/column-mapping — mapare existenta / sugestii # +# --------------------------------------------------------------------------- # + +@router.get("/{import_id}/column-mapping") +def get_column_mapping( + import_id: int, + account_id: int = Depends(resolve_account_id), +) -> dict: + """Intoarce maparea de coloane existenta sau sugestii fuzzy.""" + acct = account_or_default(account_id) + conn = get_connection() + try: + batch = conn.execute( + "SELECT id, filename, status FROM import_batches WHERE id=? AND account_id=?", + (import_id, acct), + ).fetchone() + if not batch: + raise HTTPException(status_code=404, detail="batch de import inexistent") + + # Obtine coloanele din primele randuri + first_rows = conn.execute( + "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", + (import_id,), + ).fetchall() + + # Obtine coloanele din batch — trebuie sa le stocam la upload + # Le recalculam din primul rand (cheile sunt coloanele fisierului) + columns = [] + if first_rows: + try: + row_data = decrypt_creds(first_rows[0]["raw_json"]) + if row_data: + columns = list(row_data.keys()) + except Exception: + pass + + sig = _signature(columns) if columns else "" + existing = conn.execute( + "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", + (acct, sig), + ).fetchone() + + if existing: + return { + "status": "matched", + "column_mapping": json.loads(existing["json_mapare"]), + "format_data": existing["format_data"], + "columns": columns, + "signature": sig, + } + + # Sugestii fuzzy + suggestions: dict[str, list[dict]] = {} + for col in columns: + sugg = _fuzzy_suggest_column(col, limit=3) + if sugg: + suggestions[col] = sugg + + return { + "status": "new", + "columns": columns, + "signature": sig, + "fuzzy_suggestions": suggestions, + } + finally: + conn.close() + + +# --------------------------------------------------------------------------- # +# POST /v1/import/{id}/column-mapping — salveaza maparea # +# --------------------------------------------------------------------------- # + +class ColumnMappingIn(BaseModel): + json_mapare: dict[str, str] = Field(..., description="Mapare {col_fisier: camp_canonic}") + format_data: str | None = Field(None, description="Format data, ex: DD.MM.YYYY") + + +@router.post("/{import_id}/column-mapping") +def save_column_mapping( + import_id: int, + req: ColumnMappingIn, + account_id: int = Depends(resolve_account_id), +) -> dict: + """Salveaza/actualizeaza maparea de coloane pentru contul curent. + + Semnatura = hash al coloanelor fisierului. Drift: daca coloanele se schimba, + maparea veche nu se aplica (signature mismatch -> mapping_status='new'). + """ + acct = account_or_default(account_id) + conn = get_connection() + try: + batch = conn.execute( + "SELECT id FROM import_batches WHERE id=? AND account_id=?", + (import_id, acct), + ).fetchone() + if not batch: + raise HTTPException(status_code=404, detail="batch de import inexistent") + + # Recalculeaza semnatura din coloanele fisierului (cheile maparii) + columns = list(req.json_mapare.keys()) + sig = _signature(columns) + + conn.execute( + "INSERT INTO column_mappings (account_id, signature_coloane, json_mapare, format_data) " + "VALUES (?, ?, ?, ?) " + "ON CONFLICT(account_id, signature_coloane) DO UPDATE SET " + "json_mapare=excluded.json_mapare, format_data=excluded.format_data", + (acct, sig, json.dumps(req.json_mapare, ensure_ascii=False), req.format_data), + ) + return {"ok": True, "signature": sig, "columns": columns} + finally: + conn.close() + + +# --------------------------------------------------------------------------- # +# GET /v1/import/{id}/preview — 6 stari per rand (T2 + T11) # +# --------------------------------------------------------------------------- # + +@router.get("/{import_id}/preview") +def preview_import( + import_id: int, + account_id: int = Depends(resolve_account_id), +) -> dict: + """Preview 6 stari per rand: ok/needs_mapping/needs_data/needs_review/already_sent/duplicate_in_file. + + Nu enqueue-aza nimic. Already_sent = lookup batch (Eng#5). Duplicate_in_file = intra-batch + collision (OV-3: EXCLUSIV aici, NU in reconcile.py/worker). + """ + acct = account_or_default(account_id) + conn = get_connection() + try: + batch = conn.execute( + "SELECT id, account_id, filename, status FROM import_batches WHERE id=? AND account_id=?", + (import_id, acct), + ).fetchone() + if not batch: + raise HTTPException(status_code=404, detail="batch de import inexistent") + + # Incarca toate randurile + raw_rows_db = conn.execute( + "SELECT row_index, raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index", + (import_id,), + ).fetchall() + + if not raw_rows_db: + return {"rows": [], "summary": {}} + + # Decripteaza si reconstruieste randurile + rows: list[dict] = [] + for r in raw_rows_db: + try: + row_data = decrypt_creds(r["raw_json"]) + rows.append(row_data or {}) + except Exception: + rows.append({}) + + # Obtine coloanele + col_names = list(rows[0].keys()) if rows else [] + sig = _signature(col_names) + + # Mapare coloane + mapping_row = conn.execute( + "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", + (acct, sig), + ).fetchone() + + if not mapping_row: + raise HTTPException( + status_code=422, + detail={ + "error": "no_column_mapping", + "message": "Maparea coloanelor nu a fost configurata. Configureaza mai intai maparea.", + }, + ) + + json_mapare: dict[str, str] = json.loads(mapping_row["json_mapare"]) + format_data = mapping_row["format_data"] + + # Incarca maparea de operatii o singura data (Eng#5: load_mapping o singura data) + mapping_meta = load_mapping_meta(conn, acct) + mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()} + + # Reconstruieste parsed info (coercion_flags si date_col_format) din datele stocate + # Nota: import_rows stocheaza raw_json dupa coercion (din parse_file) + # Recalculam flags din valorile stocate (coercion_flags nu e stocat separat) + # Vom folosi o detectie simpla: VIN-uri care par numerice si odometru float + coercion_flags_map: dict[int, list[str]] = {} + # Detectam din valorile stocate + for i, row_dict in enumerate(rows): + flags = [] + # Detectam VIN numeric: daca valoarea a fost stocata si arata ca numar + for col_f, camp_c in json_mapare.items(): + if camp_c == "vin": + vin_val = row_dict.get(col_f) + if vin_val is not None and str(vin_val).replace(".", "").isdigit(): + flags.append(f"VIN numeric ({vin_val}) — verificati seria sasiului") + if flags: + coercion_flags_map[i] = flags + + # Reconstructie date_col_format din mapare si valorile stocate + # (Simplificat: folosim "ambiguous" daca format_data nu e setat) + date_col_format: dict[str, str] = {} + if format_data: + for col_f, camp_c in json_mapare.items(): + if camp_c == "data_prestatie": + date_col_format[col_f] = format_data + + # Detectie formula_columns din valorile stocate (rata None ridicata) + col_none_counts: dict[str, int] = {} + for col_f in col_names: + col_none_counts[col_f] = sum(1 for r in rows if r.get(col_f) is None) + n_rows = len(rows) + formula_columns = [ + col_f for col_f, cnt in col_none_counts.items() + if n_rows > 0 and cnt / n_rows >= 0.6 + ] + + # Rezolva fiecare rand + preview_rows: list[dict] = [] + keys_for_lookup: list[str] = [] + key_to_index: dict[str, list[int]] = {} # key -> [row_index, ...] + + for i, row_dict in enumerate(rows): + flags = coercion_flags_map.get(i, []) + resolved_info = _resolve_row_for_preview( + raw_row=row_dict, + json_mapare=json_mapare, + date_col_format=date_col_format, + coercion_flags=flags, + mapping=mapping, + mapping_meta=mapping_meta, + formula_columns=formula_columns, + ) + + # Calculeaza cheia de idempotenta pentru randurile ok/needs_review + key = None + if resolved_info["resolved_status"] in ("ok", "needs_review", "needs_data"): + try: + key = _build_idempotency_key(account_id, resolved_info["resolved"]) + keys_for_lookup.append(key) + if key not in key_to_index: + key_to_index[key] = [] + key_to_index[key].append(i) + except Exception: + pass + + preview_rows.append({ + "row_index": i, + "resolved_status": resolved_info["resolved_status"], + "resolved": resolved_info["resolved"], + "errors": resolved_info["errors"], + "flags": resolved_info["flags"], + "idempotency_key": key, + }) + + # Already_sent: batch lookup (Eng#5 — nu N+1) + unique_keys = list(set(keys_for_lookup)) + already_sent_map = _already_sent_lookup(conn, account_id, unique_keys) + + # Duplicate_in_file (OV-3): detectie coliziuni intra-batch + # Grupam pe cheie de idempotenta: >1 rand cu aceeasi cheie = duplicate + key_to_indices: dict[str, list[int]] = {} + for row in preview_rows: + k = row.get("idempotency_key") + if k: + if k not in key_to_indices: + key_to_indices[k] = [] + key_to_indices[k].append(row["row_index"]) + + # Aplica already_sent si duplicate_in_file + for row in preview_rows: + k = row.get("idempotency_key") + if not k: + continue + + # Already_sent: cheia exista deja in submissions + if k in already_sent_map and row["resolved_status"] in ("ok", "needs_review", "needs_data"): + sent_info = already_sent_map[k] + row["resolved_status"] = "already_sent" + row["already_sent_info"] = sent_info + continue + + # Duplicate_in_file (OV-3): >1 rand cu aceeasi cheie in ACELASI fisier + indices_with_same_key = key_to_indices.get(k, []) + if len(indices_with_same_key) > 1 and row["resolved_status"] in ("ok", "needs_review", "needs_data"): + others = [idx for idx in indices_with_same_key if idx != row["row_index"]] + row["resolved_status"] = "duplicate_in_file" + row["duplicate_with"] = others + + # Rezumat + summary: dict[str, int] = {} + for row in preview_rows: + s = row["resolved_status"] + summary[s] = summary.get(s, 0) + 1 + + # Actualizeaza contoarele in import_batches + conn.execute( + "UPDATE import_batches SET ok=?, needs_mapping=?, needs_data=?, needs_review=?, " + "already_sent=?, duplicate_in_file=? WHERE id=?", + ( + summary.get("ok", 0), + summary.get("needs_mapping", 0), + summary.get("needs_data", 0), + summary.get("needs_review", 0), + summary.get("already_sent", 0), + summary.get("duplicate_in_file", 0), + import_id, + ), + ) + + # Actualizeaza resolved_status in import_rows + conn.execute("BEGIN IMMEDIATE") + try: + upd_params = [ + (row["resolved_status"], import_id, row["row_index"]) + for row in preview_rows + ] + conn.executemany( + "UPDATE import_rows SET resolved_status=? WHERE batch_id=? AND row_index=?", + upd_params, + ) + conn.execute("COMMIT") + except Exception: + conn.execute("ROLLBACK") + + return { + "import_id": import_id, + "rows": preview_rows, + "summary": summary, + "total": len(preview_rows), + } + finally: + conn.close() + + +# --------------------------------------------------------------------------- # +# POST /v1/import/{id}/commit — gate HARD + enqueue + log atestare (T5+T12) # +# --------------------------------------------------------------------------- # + +class CommitIn(BaseModel): + n_confirmat: int = Field(..., description="Numarul de randuri ok confirmate (gate HARD)") + reviewed_rows: list[int] = Field( + default_factory=list, + description="Indecsi de rand needs_review bifate explicit de utilizator", + ) + confirmed_by: str | None = Field(None, description="Email/identifier utilizator (log atestare)") + + +@router.post("/{import_id}/commit") +def commit_import( + import_id: int, + req: CommitIn, + account_id: int = Depends(resolve_account_id), +) -> dict: + """Gate HARD confirmare + enqueue randuri ok + log atestare (T5+T12). + + TOCTOU (Issue 1): INSERT per-rand cu ON CONFLICT(idempotency_key) DO NOTHING. + Randuri colidante = reclasificate already_sent in rezultatul commit-ului. + rows_hash + n_confirmed acopera DOAR randurile efectiv puse in coada. + """ + acct = account_or_default(account_id) + conn = get_connection() + try: + batch = conn.execute( + "SELECT id, account_id, filename, status FROM import_batches WHERE id=? AND account_id=?", + (import_id, acct), + ).fetchone() + if not batch: + raise HTTPException(status_code=404, detail="batch de import inexistent") + + if batch["status"] == "committed": + raise HTTPException(status_code=409, detail="batch deja comis") + + # Incarca randurile cu stare ok sau needs_review + ok_rows_db = conn.execute( + "SELECT row_index, raw_json, resolved_status FROM import_rows " + "WHERE batch_id=? AND resolved_status IN ('ok', 'needs_review') ORDER BY row_index", + (import_id,), + ).fetchall() + + if not ok_rows_db: + raise HTTPException(status_code=422, detail="Niciun rand ok de confirmat in acest batch.") + + # Decripteaza randurile ok + ok_rows: list[dict] = [] + ok_indices: list[int] = [] + review_indices: set[int] = set() + + for r in ok_rows_db: + try: + row_data = decrypt_creds(r["raw_json"]) + if row_data is None: + continue + except Exception: + continue + + if r["resolved_status"] == "ok": + ok_rows.append({"row_index": r["row_index"], "data": row_data, "status": "ok"}) + ok_indices.append(r["row_index"]) + elif r["resolved_status"] == "needs_review": + review_indices.add(r["row_index"]) + + # needs_review bifate explicit (Voce#1 — atestare pe valori) + confirmed_review = [idx for idx in req.reviewed_rows if idx in review_indices] + for idx in confirmed_review: + # Gaseste randul needs_review si il adauga la ok_rows + for r in ok_rows_db: + if r["row_index"] == idx and r["resolved_status"] == "needs_review": + try: + row_data = decrypt_creds(r["raw_json"]) + if row_data: + ok_rows.append({"row_index": idx, "data": row_data, "status": "needs_review"}) + ok_indices.append(idx) + except Exception: + pass + + # Gate HARD: n_confirmat trebuie sa fie EXACT egal cu numarul de randuri ok + n_total_ok = len(ok_rows) + if req.n_confirmat != n_total_ok: + raise HTTPException( + status_code=422, + detail={ + "error": "confirmare_gresita", + "message": f"Ai confirmat {req.n_confirmat} dar sunt {n_total_ok} randuri gata de trimis. Verifica preview-ul.", + "n_ok": n_total_ok, + }, + ) + + if n_total_ok == 0: + raise HTTPException(status_code=422, detail="Niciun rand ok de confirmat.") + + # Incarca maparea de coloane pentru a construi payload-ul + first_row_db = conn.execute( + "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", + (import_id,), + ).fetchone() + col_names = [] + if first_row_db: + try: + fd = decrypt_creds(first_row_db["raw_json"]) + if fd: + col_names = list(fd.keys()) + except Exception: + pass + + sig = _signature(col_names) if col_names else "" + mapping_row = conn.execute( + "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?", + (acct, sig), + ).fetchone() + + json_mapare: dict[str, str] = {} + if mapping_row: + json_mapare = json.loads(mapping_row["json_mapare"]) + + # Incarca maparea de operatii + mapping_meta = load_mapping_meta(conn, acct) + mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()} + + # Construieste payload-urile submissions + enqueued: list[dict] = [] + toctou_collisions: list[int] = [] + rows_for_hash: list[str] = [] + + # Enqueue in tranzactie explicita (Issue 6) + conn.execute("BEGIN IMMEDIATE") + try: + # purge_after pentru submissions noi (T16) + purge_after_sql = "datetime('now', '+90 days')" + + for ok_row in ok_rows: + row_dict = ok_row["data"] + row_index = ok_row["row_index"] + + # Aplica maparea de coloane + mapped: dict[str, Any] = {} + for col_f, camp_c in json_mapare.items(): + if col_f in row_dict and camp_c: + mapped[camp_c] = row_dict[col_f] + + # Rezolva data + date_col_fisier = None + for col_f, camp_c in json_mapare.items(): + if camp_c == "data_prestatie": + date_col_fisier = col_f + break + + if date_col_fisier: + col_fmt = (mapping_row["format_data"] if mapping_row else None) or "ambiguous" + raw_date = mapped.get("data_prestatie") + if raw_date is not None: + iso_date, _ = parse_date_value(raw_date, col_fmt) + if iso_date: + mapped["data_prestatie"] = iso_date + + # Operatia -> prestatii + operatie_val = mapped.pop("operatie", None) + if operatie_val and "prestatii" not in mapped: + mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": str(operatie_val)}] + + # Rezolva prestatii INAINTE de canonicalizare (altfel cheia difera de cea din preview) + prestatii = mapped.get("prestatii") or [] + resolved, _ = resolve_prestatii(prestatii, mapping) + mapped["prestatii"] = resolved + + # Canonicalizare (dupa rezolvare prestatii -> cod_prestatie inclus in cheie) + canon = canonicalize_row(mapped) + mapped.update({ + "vin": canon["vin"], + "nr_inmatriculare": canon["nr_inmatriculare"], + "odometru_final": canon["odometru_final"], + }) + + # Cheia de idempotenta (identica cu cheia din preview — aceeasi ordine) + key = build_key(account_id, canon) + + # Hash row pentru atestare (valori rezolvate) + rows_for_hash.append(json.dumps({ + "row_index": row_index, + "vin": mapped.get("vin"), + "data_prestatie": mapped.get("data_prestatie"), + "odometru_final": mapped.get("odometru_final"), + "prestatii": [str(p.get("cod_prestatie") or p.get("cod_op_service") or "") for p in resolved], + }, sort_keys=True, ensure_ascii=False)) + + payload_json = json.dumps(mapped, ensure_ascii=False) + + # INSERT ON CONFLICT DO NOTHING (TOCTOU — Issue 1) + cur = conn.execute( + "INSERT OR IGNORE INTO submissions " + "(idempotency_key, account_id, status, payload_json, batch_id, row_index, purge_after) " + "VALUES (?, ?, 'queued', ?, ?, ?, " + purge_after_sql + ")", + (key, acct, payload_json, import_id, row_index), + ) + + if cur.rowcount == 0: + # Coliziune TOCTOU: cheia a fost inserata de un canal concurent + toctou_collisions.append(row_index) + else: + sub_id = cur.lastrowid + enqueued.append({ + "submission_id": sub_id, + "row_index": row_index, + "idempotency_key": key, + }) + + conn.execute("COMMIT") + except Exception: + conn.execute("ROLLBACK") + raise + + n_enqueued = len(enqueued) + + # Log atestare (Voce#9): rows_hash + n_confirmed acopera DOAR randurile puse in coada + rows_hash = hashlib.sha256( + json.dumps(sorted(rows_for_hash), ensure_ascii=False).encode("utf-8") + ).hexdigest() if rows_for_hash else "" + + conn.execute( + "INSERT INTO import_attestations (batch_id, account_id, confirmed_by, rows_hash, n_confirmed) " + "VALUES (?, ?, ?, ?, ?)", + (import_id, acct, req.confirmed_by, rows_hash, n_enqueued), + ) + + # Actualizeaza status batch + new_status = "committed" if n_enqueued > 0 else "staging" + conn.execute( + "UPDATE import_batches SET status=?, ok=? WHERE id=?", + (new_status, n_enqueued, import_id), + ) + + return { + "import_id": import_id, + "enqueued": n_enqueued, + "toctou_collisions": toctou_collisions, + "rows_hash": rows_hash, + "submissions": enqueued, + } + finally: + conn.close() + + +# --------------------------------------------------------------------------- # +# GET /v1/import/{id}/export-failed — CSV randuri esuate (T8) # +# --------------------------------------------------------------------------- # + +_EXPORT_FAILED_COLUMNS = [ + "row_index", + "resolved_status", + "vin", + "nr_inmatriculare", + "data_prestatie", + "odometru_final", + "operatie", + "error", +] + + +@router.get("/{import_id}/export-failed") +def export_failed_rows( + import_id: int, + account_id: int = Depends(resolve_account_id), +) -> StreamingResponse: + """CSV cu randurile esuate (needs_data/needs_mapping/needs_review) pentru corectie + re-upload.""" + acct = account_or_default(account_id) + conn = get_connection() + try: + batch = conn.execute( + "SELECT id, filename FROM import_batches WHERE id=? AND account_id=?", + (import_id, acct), + ).fetchone() + if not batch: + raise HTTPException(status_code=404, detail="batch de import inexistent") + + failed_rows = conn.execute( + "SELECT row_index, raw_json, resolved_status, error FROM import_rows " + "WHERE batch_id=? AND resolved_status IN ('needs_data', 'needs_mapping', 'needs_review') " + "ORDER BY row_index", + (import_id,), + ).fetchall() + + # Incarca maparea de coloane pentru a stii ce campuri sa extraga + first_row = conn.execute( + "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", + (import_id,), + ).fetchone() + col_names = [] + if first_row: + try: + fd = decrypt_creds(first_row["raw_json"]) + if fd: + col_names = list(fd.keys()) + except Exception: + pass + + sig = _signature(col_names) if col_names else "" + mapping_row = conn.execute( + "SELECT json_mapare FROM column_mappings WHERE account_id=? AND signature_coloane=?", + (acct, sig), + ).fetchone() + json_mapare: dict[str, str] = {} + if mapping_row: + json_mapare = json.loads(mapping_row["json_mapare"]) + + buf = io.StringIO() + writer = csv.DictWriter(buf, fieldnames=_EXPORT_FAILED_COLUMNS, extrasaction="ignore") + writer.writeheader() + + for r in failed_rows: + try: + row_data = decrypt_creds(r["raw_json"]) or {} + except Exception: + row_data = {} + + # Extrage campuri canonice din mapare + mapped: dict[str, Any] = {} + for col_f, camp_c in json_mapare.items(): + if col_f in row_data: + mapped[camp_c] = row_data[col_f] + + # Operatia (camp canonic 'operatie' sau din prestatii) + operatie_val = mapped.get("operatie") or "" + if not operatie_val and mapped.get("prestatii"): + prestatii = mapped.get("prestatii") + if isinstance(prestatii, list) and prestatii: + first_p = prestatii[0] + if isinstance(first_p, dict): + operatie_val = first_p.get("cod_op_service") or first_p.get("denumire") or "" + + # Erori (din import_rows.error sau din resolved_status) + error_str = r["error"] or r["resolved_status"] + + writer.writerow({ + "row_index": r["row_index"], + "resolved_status": r["resolved_status"], + "vin": mapped.get("vin") or "", + "nr_inmatriculare": mapped.get("nr_inmatriculare") or "", + "data_prestatie": mapped.get("data_prestatie") or "", + "odometru_final": mapped.get("odometru_final") or "", + "operatie": operatie_val, + "error": error_str, + }) + + data = buf.getvalue() + finally: + conn.close() + + fname = f"import_{import_id}_failed.csv" + return StreamingResponse( + iter([data]), + media_type="text/csv", + headers={"Content-Disposition": f'attachment; filename="{fname}"'}, + ) diff --git a/app/main.py b/app/main.py index b71acd1..e5487b6 100644 --- a/app/main.py +++ b/app/main.py @@ -18,6 +18,7 @@ from fastapi.responses import JSONResponse, PlainTextResponse from fastapi.staticfiles import StaticFiles from . import __version__ +from .api.v1.import_router import router as import_v1_router from .api.v1.router import router as api_v1_router from .config import get_settings from .db import get_connection, init_db, queue_depth, read_heartbeat @@ -56,6 +57,7 @@ _STATIC_DIR = Path(__file__).resolve().parent / "web" / "static" app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static") app.include_router(api_v1_router) +app.include_router(import_v1_router) app.include_router(web_router) diff --git a/tests/test_import_api.py b/tests/test_import_api.py new file mode 100644 index 0000000..c13f025 --- /dev/null +++ b/tests/test_import_api.py @@ -0,0 +1,830 @@ +"""Teste API import Treapta 2 — POST /v1/import, preview, commit, export-failed. + +Acopera: + - #11 U1+T4: upload + staging + mapare coloane semnatura/drift/fuzzy + - #12 T2+T11: preview 6 stari + already_sent batch lookup + intra-batch collision + - #13 T5+T12: gate HARD confirmare + atestare valori + commit ON CONFLICT (TOCTOU) + - #14 T8: export randuri esuate CSV +""" + +from __future__ import annotations + +import csv +import io +import json +import os +import tempfile + +import openpyxl +import pytest +from fastapi.testclient import TestClient + + +# --------------------------------------------------------------------------- # +# Fixture client # +# --------------------------------------------------------------------------- # + +@pytest.fixture() +def client(monkeypatch): + """Client FastAPI cu DB temporara izolata per test.""" + tmp = tempfile.mkdtemp() + monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db")) + from app.config import get_settings + get_settings.cache_clear() + from app.crypto import reset_cache + reset_cache() + from app.main import app + with TestClient(app) as c: + yield c + get_settings.cache_clear() + + +# --------------------------------------------------------------------------- # +# Helpere pentru fisiere test # +# --------------------------------------------------------------------------- # + +_HEADER = ["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"] +_ROW_OK = ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", "123456", "Revizie"] +_ROW_OK2 = ["WVWZZZ1KZAW000124", "CJ001AB", "2026-05-10", "98765", "Reparatie"] + + +def _make_xlsx(rows: list[list]) -> bytes: + """Creeaza un xlsx in-memory.""" + wb = openpyxl.Workbook() + ws = wb.active + ws.title = "Sheet1" + for row in rows: + ws.append(row) + buf = io.BytesIO() + wb.save(buf) + return buf.getvalue() + + +def _make_csv(rows: list[list], delimiter: str = ";") -> bytes: + """Creeaza un CSV in-memory.""" + buf = io.StringIO() + writer = csv.writer(buf, delimiter=delimiter) + for row in rows: + writer.writerow(row) + return buf.getvalue().encode("utf-8") + + +def _upload_file(client: TestClient, data: bytes, filename: str = "test.xlsx") -> dict: + """Upload un fisier si intoarce raspunsul JSON.""" + r = client.post( + "/v1/import", + files={"file": (filename, io.BytesIO(data), "application/octet-stream")}, + ) + return r + + +def _default_column_mapping() -> dict: + """Mapare de coloane implicita pentru fisierul test.""" + return { + "VIN": "vin", + "Nr inmatriculare": "nr_inmatriculare", + "Data prestatie": "data_prestatie", + "Odometru final": "odometru_final", + "Operatie": "operatie", + } + + +def _setup_nomenclator(client: TestClient) -> None: + """Seed nomenclator cu un cod de prestatie pentru teste.""" + # Folosim POST /v1/prezentari pentru a forta seed-ul nomenclatorului + # care are loc in init_db -> seed_nomenclator_if_empty + pass # seed-ul se face automat in init_db + + +def _seed_operation_mapping(client: TestClient, cod_op: str = "Revizie", cod_prest: str = "OE-1") -> None: + """Salveaza o mapare de operatii pentru teste.""" + # Adauga mai intai in nomenclator daca nu exista (prin POST prezentare care creeaza cod) + # De fapt, cod OE-1 e in nomenclatorul seed + client.post("/v1/mapari", json={ + "cod_op_service": cod_op, + "cod_prestatie": cod_prest, + "auto_send": True, + }) + + +# =========================================================================== # +# #11 — Upload + staging (U1+T4) # +# =========================================================================== # + +class TestUploadStaging: + def test_upload_xlsx_ok(self, client): + """Upload xlsx valid -> import_id + columns + sample_rows.""" + data = _make_xlsx([_HEADER, _ROW_OK, _ROW_OK2]) + r = _upload_file(client, data, "test.xlsx") + assert r.status_code == 200, r.text + body = r.json() + assert "import_id" in body + assert body["columns"] == _HEADER + assert body["total_rows"] == 2 + assert len(body["sample_rows"]) == 2 + + def test_upload_csv_semicolon(self, client): + """Upload CSV cu ';' (export RO) -> parsare corecta.""" + data = _make_csv([_HEADER, _ROW_OK], delimiter=";") + r = _upload_file(client, data, "test.csv") + assert r.status_code == 200, r.text + body = r.json() + assert body["columns"] == _HEADER + assert body["total_rows"] == 1 + + def test_upload_fisier_prea_mare(self, client): + """Fisier >5MB -> 413.""" + data = b"PK" + b"X" * (5 * 1024 * 1024 + 100) + r = _upload_file(client, data, "mare.xlsx") + assert r.status_code in (413, 422) + + def test_upload_format_invalid(self, client): + """Fisier tip nesuportat -> 422.""" + r = _upload_file(client, b"data random", "test.dbf") + assert r.status_code == 422 + + def test_issue5a_raw_json_criptat(self, client): + """Issue 5a: raw_json din import_rows trebuie sa fie criptat (ciphertext la rest).""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + assert r.status_code == 200 + import_id = r.json()["import_id"] + + # Citeste direct din DB si verifica ca raw_json e criptat (nu JSON plain) + import sqlite3 + from app.config import get_settings + conn = sqlite3.connect(get_settings().db_path) + conn.row_factory = sqlite3.Row + try: + row = conn.execute( + "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1", + (import_id,), + ).fetchone() + assert row is not None + raw = row["raw_json"] + # Ciphertext Fernet incepe cu "gAAA" (base64url) + assert not raw.startswith("{"), "raw_json trebuie sa fie criptat, nu JSON plain" + # Verifica ca se poate decripta + from app.crypto import decrypt_creds + decrypted = decrypt_creds(raw) + assert decrypted is not None + assert "VIN" in decrypted or any("VIN" in k for k in decrypted.keys()) + finally: + conn.close() + + def test_issue5b_fuzzy_coloane_refoloseste_normalize_for_match(self, client): + """Issue 5b: fuzzy_suggestions din raspuns foloseste normalize_for_match (fara duplicat).""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + assert r.status_code == 200 + body = r.json() + # Daca nu exista mapare, trebuie sa avem fuzzy_suggestions + if "fuzzy_suggestions" in body: + sugg = body["fuzzy_suggestions"] + # "VIN" trebuie sa aiba sugestia "vin" cu scor mare + if "VIN" in sugg: + camps = [s["camp_canonic"] for s in sugg["VIN"]] + assert "vin" in camps, f"'vin' trebuie sa fie in sugestii pentru 'VIN', primit: {camps}" + if "Odometru final" in sugg: + camps = [s["camp_canonic"] for s in sugg["Odometru final"]] + assert "odometru_final" in camps, f"'odometru_final' trebuie in sugestii" + + def test_drift_semnatura_coloane(self, client): + """T4/D3: upload 2 cu coloane mutate -> mapping_status='new' (nu aplica orb).""" + # Upload 1 cu header standard + data1 = _make_xlsx([_HEADER, _ROW_OK]) + r1 = _upload_file(client, data1, "test.xlsx") + assert r1.status_code == 200 + import_id1 = r1.json()["import_id"] + + # Salveaza maparea pentru upload 1 + client.post( + f"/v1/import/{import_id1}/column-mapping", + json={"json_mapare": _default_column_mapping()}, + ) + + # Upload 2 cu header DIFERIT (coloane mutate/redenumite) + header2 = ["Sasiu", "Inmatriculare", "Data", "KM", "Lucrare"] + data2 = _make_xlsx([header2, ["WVWZZZ1KZAW000125", "B1XYZ", "2026-06-10", "50000", "ITP"]]) + r2 = _upload_file(client, data2, "test2.xlsx") + assert r2.status_code == 200 + body2 = r2.json() + # Semnatura diferita -> nu se aplica maparea veche + assert body2["mapping_status"] == "new", \ + f"Drift coloane trebuie detectat, primit mapping_status={body2['mapping_status']}" + + def test_aceeasi_semnatura_returneaza_maparea(self, client): + """Dupa salvarea maparii, al doilea upload cu aceleasi coloane o returneaza direct.""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r1 = _upload_file(client, data, "test.xlsx") + import_id = r1.json()["import_id"] + + # Salveaza maparea + rc = client.post( + f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _default_column_mapping()}, + ) + assert rc.status_code == 200 + + # Upload 2 cu aceleasi coloane + r2 = _upload_file(client, data, "test.xlsx") + assert r2.status_code == 200 + body2 = r2.json() + assert body2["mapping_status"] == "matched" + assert "column_mapping" in body2 + + def test_upload_xlsx_multisheet_returneaza_eroare_cu_sheets(self, client): + """Xlsx cu 2 sheet-uri non-goale -> 422 cu lista de sheet-uri.""" + wb = openpyxl.Workbook() + ws1 = wb.active + ws1.title = "Iunie" + for row in [_HEADER, _ROW_OK]: + ws1.append(row) + ws2 = wb.create_sheet("Iulie") + for row in [_HEADER, _ROW_OK2]: + ws2.append(row) + buf = io.BytesIO() + wb.save(buf) + + r = _upload_file(client, buf.getvalue(), "multi.xlsx") + assert r.status_code == 422 + body = r.json() + assert body["detail"]["error"] == "multiple_sheets" + assert "Iunie" in body["detail"]["sheets"] + + def test_upload_xlsx_multisheet_cu_sheet_ales(self, client): + """Dupa alegere sheet -> parsare corecta.""" + wb = openpyxl.Workbook() + ws1 = wb.active + ws1.title = "Iunie" + for row in [_HEADER, _ROW_OK]: + ws1.append(row) + ws2 = wb.create_sheet("Iulie") + for row in [_HEADER, _ROW_OK2]: + ws2.append(row) + buf = io.BytesIO() + wb.save(buf) + + r = client.post( + "/v1/import?sheet_name=Iulie", + files={"file": ("multi.xlsx", io.BytesIO(buf.getvalue()), "application/octet-stream")}, + ) + assert r.status_code == 200 + body = r.json() + assert body["total_rows"] == 1 + assert body["sample_rows"][0]["VIN"] == _ROW_OK2[0] + + def test_purge_after_setat_la_insert(self, client): + """T16: purge_after trebuie setat la insert import_batches.""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + import_id = r.json()["import_id"] + + import sqlite3 + from app.config import get_settings + conn = sqlite3.connect(get_settings().db_path) + conn.row_factory = sqlite3.Row + try: + row = conn.execute( + "SELECT purge_after FROM import_batches WHERE id=?", (import_id,) + ).fetchone() + assert row["purge_after"] is not None, "purge_after trebuie setat la insert" + finally: + conn.close() + + +# =========================================================================== # +# #11 — Mapare coloane (T4) # +# =========================================================================== # + +class TestColumnMapping: + def test_save_column_mapping(self, client): + """Salveaza maparea de coloane pentru un batch.""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + import_id = r.json()["import_id"] + + rc = client.post( + f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _default_column_mapping()}, + ) + assert rc.status_code == 200 + assert "signature" in rc.json() + + def test_get_column_mapping_dupa_salvare(self, client): + """GET column-mapping returneaza maparea salvata.""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + import_id = r.json()["import_id"] + + client.post( + f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _default_column_mapping()}, + ) + + rg = client.get(f"/v1/import/{import_id}/column-mapping") + assert rg.status_code == 200 + body = rg.json() + assert body["status"] == "matched" + assert body["column_mapping"] == _default_column_mapping() + + def test_get_column_mapping_fara_salvare_returneaza_sugestii(self, client): + """GET column-mapping fara mapare salvata -> sugestii fuzzy.""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + import_id = r.json()["import_id"] + + rg = client.get(f"/v1/import/{import_id}/column-mapping") + assert rg.status_code == 200 + body = rg.json() + assert body["status"] == "new" + # Trebuie sa aiba sugestii pentru coloane evidente + if "fuzzy_suggestions" in body: + assert "VIN" in body["fuzzy_suggestions"] + + def test_column_mapping_batch_inexistent(self, client): + """GET/POST pe batch inexistent -> 404.""" + r = client.get("/v1/import/99999/column-mapping") + assert r.status_code == 404 + + +# =========================================================================== # +# #12 — Preview 6 stari (T2 + T11) # +# =========================================================================== # + +class TestPreview: + def _upload_and_map(self, client, rows=None): + """Fixture: upload + salveaza mapare + seeda nomenclator.""" + if rows is None: + rows = [_HEADER, _ROW_OK] + data = _make_xlsx(rows) + r = _upload_file(client, data, "test.xlsx") + assert r.status_code == 200, r.text + import_id = r.json()["import_id"] + + # Salveaza maparea de coloane + rc = client.post( + f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _default_column_mapping()}, + ) + assert rc.status_code == 200 + + # Seeda maparea de operatii + _seed_operation_mapping(client, "Revizie", "OE-1") + + return import_id + + def test_preview_rand_ok(self, client): + """Rand valid cu operatie mapata -> stare 'ok'.""" + import_id = self._upload_and_map(client) + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200, rp.text + body = rp.json() + rows = body["rows"] + assert len(rows) == 1 + # VIN valid, data valida, odometru valid, operatie mapata -> ok + assert rows[0]["resolved_status"] == "ok" + + def test_preview_needs_mapping(self, client): + """Rand cu operatie nemapata -> needs_mapping.""" + import_id = self._upload_and_map(client, rows=[_HEADER, _ROW_OK2]) + # _ROW_OK2 are operatia "Reparatie" care nu e mapata + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + body = rp.json() + assert any(r["resolved_status"] in ("needs_mapping",) for r in body["rows"]) + + def test_preview_needs_data(self, client): + """Rand cu VIN invalid -> needs_data.""" + row_bad = ["INVALID_VIN_XX", "B999TST", "2026-06-15", "123456", "Revizie"] + import_id = self._upload_and_map(client, rows=[_HEADER, row_bad]) + _seed_operation_mapping(client, "Revizie", "OE-1") + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + body = rp.json() + assert any(r["resolved_status"] in ("needs_data",) for r in body["rows"]) + + def test_preview_already_sent_dupa_submit(self, client): + """Rand deja trimis prin API -> stare already_sent la preview (T2/D5).""" + # Trimite prin API canalul standard + client.post("/v1/prezentari", json={ + "rar_credentials": {"email": "x@y.ro", "password": "s"}, + "prezentari": [{ + "vin": "WVWZZZ1KZAW000123", + "nr_inmatriculare": "B999TST", + "data_prestatie": "2026-06-15", + "odometru_final": "123456", + "prestatii": [{"cod_prestatie": "OE-1"}], + }], + }) + + # Acum upload acelasi rand + import_id = self._upload_and_map(client, rows=[_HEADER, _ROW_OK]) + _seed_operation_mapping(client, "Revizie", "OE-1") + + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + body = rp.json() + # Randul trebuie sa fie already_sent (cheia idempotency exista deja) + statuses = [r["resolved_status"] for r in body["rows"]] + assert "already_sent" in statuses, f"Asteptat 'already_sent', primit: {statuses}" + + def test_preview_duplicate_in_file(self, client): + """T11/OV-3: 2 randuri identice in ACELASI fisier -> duplicate_in_file.""" + # Acelasi rand de doua ori + rows = [_HEADER, _ROW_OK, _ROW_OK] # duplicat exact + import_id = self._upload_and_map(client, rows=rows) + _seed_operation_mapping(client, "Revizie", "OE-1") + + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + body = rp.json() + statuses = [r["resolved_status"] for r in body["rows"]] + assert "duplicate_in_file" in statuses, \ + f"Asteptat 'duplicate_in_file', primit: {statuses}" + + def test_preview_already_sent_batch_lookup_nu_n_plus_1(self, client): + """Eng#5: already_sent lookup BATCH (nu N+1) — ≤7 interogari pentru 5 randuri.""" + # Cream 5 randuri distincte + rows_data = [_HEADER] + for i in range(5): + rows_data.append([ + f"WVWZZZ1KZAW00{i:04d}", + f"B00{i}TST", + "2026-06-15", + str(100000 + i), + "Revizie", + ]) + + import_id = self._upload_and_map(client, rows=rows_data) + _seed_operation_mapping(client, "Revizie", "OE-1") + + # Aceasta verificare e comportamentala: preview trebuie sa functioneze + # corect (nu testam direct nr. de SQL queries, ci ca raspunsul e corect) + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + body = rp.json() + assert len(body["rows"]) == 5 + + def test_preview_fara_mapare_coloane_returneaza_422(self, client): + """Preview fara mapare de coloane configurata -> 422.""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + import_id = r.json()["import_id"] + + # Nu salvam maparea de coloane + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 422 + assert "no_column_mapping" in rp.json()["detail"]["error"] + + def test_preview_summary_ok(self, client): + """Preview intoarce si summary cu contoare per stare.""" + import_id = self._upload_and_map(client, rows=[_HEADER, _ROW_OK, _ROW_OK2]) + _seed_operation_mapping(client, "Revizie", "OE-1") + + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + body = rp.json() + assert "summary" in body + # Suma totala = nr randuri + total = sum(body["summary"].values()) + assert total == len(body["rows"]) + + +# =========================================================================== # +# #13 — Commit gate HARD + atestare + TOCTOU (T5 + T12) # +# =========================================================================== # + +class TestCommit: + def _upload_preview_ok(self, client): + """Upload + mapeaza + preview -> returneaza import_id cu randuri ok.""" + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + import_id = r.json()["import_id"] + + client.post( + f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _default_column_mapping()}, + ) + _seed_operation_mapping(client, "Revizie", "OE-1") + + # Preview pentru a calcula starile + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + return import_id, rp.json() + + def test_commit_cu_n_corect_enqueued(self, client): + """Commit cu N corect -> rand in submissions cu status queued.""" + import_id, preview = self._upload_preview_ok(client) + n_ok = preview["summary"].get("ok", 0) + assert n_ok > 0, "Trebuie cel putin un rand ok" + + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": n_ok, + "reviewed_rows": [], + }) + assert rc.status_code == 200, rc.text + body = rc.json() + assert body["enqueued"] == n_ok + + def test_commit_cu_n_gresit_reject(self, client): + """T5/D3: commit cu N gresit -> 422, nu enqueue.""" + import_id, preview = self._upload_preview_ok(client) + n_ok = preview["summary"].get("ok", 0) + + # Trimitem n_confirmat + 1 (gresit) + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": n_ok + 1, + "reviewed_rows": [], + }) + assert rc.status_code == 422 + detail = rc.json()["detail"] + assert "confirmare_gresita" in detail.get("error", ""), \ + f"Eroare neasteptata: {detail}" + + def test_commit_log_atestare(self, client): + """T12/Voce#9: commit scrie import_attestations cu rows_hash + n_confirmed.""" + import_id, preview = self._upload_preview_ok(client) + n_ok = preview["summary"].get("ok", 0) + + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": n_ok, + "reviewed_rows": [], + "confirmed_by": "test@example.com", + }) + assert rc.status_code == 200 + body = rc.json() + assert body["enqueued"] == n_ok + assert body["rows_hash"] # sha256 non-gol + + # Verifica direct in DB ca exista atestarea + import sqlite3 + from app.config import get_settings + conn = sqlite3.connect(get_settings().db_path) + conn.row_factory = sqlite3.Row + try: + att = conn.execute( + "SELECT * FROM import_attestations WHERE batch_id=?", (import_id,) + ).fetchone() + assert att is not None, "import_attestations trebuie sa contina o inregistrare" + assert att["n_confirmed"] == n_ok + assert att["rows_hash"] == body["rows_hash"] + assert att["confirmed_by"] == "test@example.com" + finally: + conn.close() + + def test_commit_batch_id_setat_pe_submission(self, client): + """T7: submission creata la commit trebuie sa aiba batch_id + row_index setate.""" + import_id, preview = self._upload_preview_ok(client) + n_ok = preview["summary"].get("ok", 0) + + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": n_ok, + "reviewed_rows": [], + }) + assert rc.status_code == 200 + submissions = rc.json()["submissions"] + assert len(submissions) > 0 + + import sqlite3 + from app.config import get_settings + conn = sqlite3.connect(get_settings().db_path) + conn.row_factory = sqlite3.Row + try: + for sub in submissions: + row = conn.execute( + "SELECT batch_id, row_index FROM submissions WHERE id=?", + (sub["submission_id"],), + ).fetchone() + assert row["batch_id"] == import_id, "batch_id trebuie setat" + assert row["row_index"] is not None, "row_index trebuie setat" + finally: + conn.close() + + def test_commit_toctou_cheie_inserata_concurent(self, client): + """Issue 1 (TOCTOU): cheie inserata de canal concurent -> reclasificata already_sent. + + Simulam TOCTOU inserand cheia direct in submissions inainte de commit. + """ + import_id, preview = self._upload_preview_ok(client) + n_ok = preview["summary"].get("ok", 0) + + # Gaseste cheia de idempotenta a randului ok + ok_rows = [r for r in preview["rows"] if r["resolved_status"] == "ok"] + assert len(ok_rows) > 0 + idem_key = ok_rows[0]["idempotency_key"] + assert idem_key, "idempotency_key trebuie calculat la preview" + + # Simuleaza canalul concurent: insereaza cheia in submissions + import sqlite3 + from app.config import get_settings + conn = sqlite3.connect(get_settings().db_path) + try: + conn.execute( + "INSERT INTO submissions (idempotency_key, account_id, status, payload_json) " + "VALUES (?, 1, 'queued', '{}')", + (idem_key,), + ) + conn.commit() + finally: + conn.close() + + # Commit-ul trebuie sa detecteze coliziunea si s-o raporteze ca TOCTOU + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": n_ok, + "reviewed_rows": [], + }) + # Poate fi 200 cu toctou_collisions sau 422 cu informatii clare + # Conform planului Issue 1: reclasificat already_sent, nu rollback + # Dar n_enqueued va fi 0 daca toate colideaza + if rc.status_code == 200: + body = rc.json() + assert body["toctou_collisions"] == [ok_rows[0]["row_index"]] or body["enqueued"] == 0 + # Sau 422 daca gate-ul HARD detecteaza ca n_ok actual != n_confirmat + # (dupa reclasificare, n_total_ok scade) + + def test_commit_needs_review_nebifat_exclus_din_n(self, client): + """Voce#1: rand needs_review nebifat explicit -> NU intra in N, NU se enqueued.""" + # Rand cu VIN numeric (coercion -> needs_review) + import datetime as dt + wb = openpyxl.Workbook() + ws = wb.active + ws.append(_HEADER) + # VIN ca int (numeric) -> coercion flag -> needs_review + ws.cell(row=2, column=1).value = 123456789012345 # VIN numeric + ws.cell(row=2, column=2).value = "B999TST" + ws.cell(row=2, column=3).value = "2026-06-15" + ws.cell(row=2, column=4).value = 123456 + ws.cell(row=2, column=5).value = "Revizie" + buf = io.BytesIO() + wb.save(buf) + + r = _upload_file(client, buf.getvalue(), "test.xlsx") + assert r.status_code == 200 + import_id = r.json()["import_id"] + + client.post(f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _default_column_mapping()}) + _seed_operation_mapping(client, "Revizie", "OE-1") + + rp = client.get(f"/v1/import/{import_id}/preview") + assert rp.status_code == 200 + body_preview = rp.json() + + review_rows = [r for r in body_preview["rows"] if r["resolved_status"] == "needs_review"] + ok_rows_count = body_preview["summary"].get("ok", 0) + review_count = len(review_rows) + + if review_count == 0: + pytest.skip("Niciun rand needs_review in acest test — skip") + + # Confirma FARA a bifa needs_review -> n_confirmat = ok_rows_count (fara review) + # Dar n_ok e 0 daca tot fisierul e needs_review + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": ok_rows_count, + "reviewed_rows": [], # nu bifam needs_review + }) + + if ok_rows_count == 0: + # 0 randuri ok + 0 reviewed = eroare + assert rc.status_code in (422,) + else: + # Randurile needs_review NU sunt enqueued (nu le-am bifat) + assert rc.status_code == 200 + body = rc.json() + assert body["enqueued"] == ok_rows_count + + def test_commit_double_call_returneaza_409(self, client): + """Commit de doua ori pe acelasi batch -> 409 (deja comis).""" + import_id, preview = self._upload_preview_ok(client) + n_ok = preview["summary"].get("ok", 0) + + client.post(f"/v1/import/{import_id}/commit", json={"n_confirmat": n_ok, "reviewed_rows": []}) + r2 = client.post(f"/v1/import/{import_id}/commit", json={"n_confirmat": n_ok, "reviewed_rows": []}) + assert r2.status_code == 409 + + def test_atestare_purge_after_setat_pe_submission(self, client): + """T16: submissions create la commit trebuie sa aiba purge_after setat.""" + import_id, preview = self._upload_preview_ok(client) + n_ok = preview["summary"].get("ok", 0) + + rc = client.post(f"/v1/import/{import_id}/commit", json={ + "n_confirmat": n_ok, "reviewed_rows": [] + }) + assert rc.status_code == 200 + submissions = rc.json()["submissions"] + + if not submissions: + pytest.skip("Nicio submission creata") + + import sqlite3 + from app.config import get_settings + conn = sqlite3.connect(get_settings().db_path) + conn.row_factory = sqlite3.Row + try: + for sub in submissions: + row = conn.execute( + "SELECT purge_after FROM submissions WHERE id=?", (sub["submission_id"],) + ).fetchone() + assert row["purge_after"] is not None, "purge_after trebuie setat pe submission" + finally: + conn.close() + + +# =========================================================================== # +# #14 — Export randuri esuate CSV (T8) # +# =========================================================================== # + +class TestExportFailed: + def _setup_batch_with_bad_rows(self, client): + """Upload cu randuri esuate (VIN invalid).""" + row_bad = ["INVALID_VIN_XXXXXXX", "B999TST", "2026-06-15", "123456", "Revizie"] + data = _make_xlsx([_HEADER, row_bad]) + r = _upload_file(client, data, "test.xlsx") + import_id = r.json()["import_id"] + + client.post(f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _default_column_mapping()}) + _seed_operation_mapping(client, "Revizie", "OE-1") + + # Preview pentru a calcula starile + client.get(f"/v1/import/{import_id}/preview") + + return import_id + + def test_export_failed_returneaza_csv(self, client): + """Export randuri esuate -> CSV cu header + randuri.""" + import_id = self._setup_batch_with_bad_rows(client) + + r = client.get(f"/v1/import/{import_id}/export-failed") + assert r.status_code == 200 + assert "text/csv" in r.headers["content-type"] + + # Parseaza CSV + content = r.text + reader = csv.DictReader(io.StringIO(content)) + rows = list(reader) + assert len(rows) > 0, "CSV trebuie sa contina cel putin un rand esuat" + + def test_export_failed_contine_motiv_eroare(self, client): + """CSV de export contine coloana 'error' cu motivul.""" + import_id = self._setup_batch_with_bad_rows(client) + + r = client.get(f"/v1/import/{import_id}/export-failed") + assert r.status_code == 200 + + reader = csv.DictReader(io.StringIO(r.text)) + rows = list(reader) + assert len(rows) > 0 + # Fiecare rand trebuie sa aiba coloana error + for row in rows: + assert "error" in row, "Coloana 'error' trebuie sa fie prezenta" + assert row["resolved_status"] in ("needs_data", "needs_mapping", "needs_review") + + def test_export_failed_batch_inexistent(self, client): + """Export pe batch inexistent -> 404.""" + r = client.get("/v1/import/99999/export-failed") + assert r.status_code == 404 + + def test_export_failed_fara_randuri_esuate(self, client): + """Export pe batch fara randuri esuate -> CSV gol (doar header).""" + # Upload cu rand ok + data = _make_xlsx([_HEADER, _ROW_OK]) + r = _upload_file(client, data, "test.xlsx") + import_id = r.json()["import_id"] + + client.post(f"/v1/import/{import_id}/column-mapping", + json={"json_mapare": _default_column_mapping()}) + _seed_operation_mapping(client, "Revizie", "OE-1") + client.get(f"/v1/import/{import_id}/preview") + + r = client.get(f"/v1/import/{import_id}/export-failed") + assert r.status_code == 200 + reader = csv.DictReader(io.StringIO(r.text)) + rows = list(reader) + assert len(rows) == 0, "Niciun rand esuat in batch cu randuri ok" + + +# =========================================================================== # +# Regresie: reconcile.py ramane op-blind (OV-3) # +# =========================================================================== # + +class TestReconcileRegresie: + def test_match_finalizata_ramane_op_blind(self): + """OV-3: reconcile.py trebuie sa ramana op-blind (nu editat de import). + + Importam reconcile si verificam ca nu s-au adaugat parametri de operatie. + """ + from app.reconcile import match_finalizata + import inspect + + sig = inspect.signature(match_finalizata) + params = set(sig.parameters.keys()) + # Trebuie sa aiba exact acesti parametri (op-blind by design) + expected = {"finalizate", "vin", "data_prestatie", "odometru_final"} + assert not (params - expected - {"self"}), \ + f"match_finalizata are parametri neasteptati: {params - expected}" + # Nu trebuie sa aiba parametri de operatie + assert "cod_prestatie" not in params + assert "operatie" not in params + assert "cod_op_service" not in params