Reguli text per cont (operation_text_rules), resolve_prestatii cu param aditiv text_rules + precedenta stricta, threadat pe toate cele 6 callsite-uri + valid_codes + seam classify_prezentare. UI Mapari: sectiune reguli + preview pre-salvare + overlap + telemetrie text_rule_hit. UX tabel: cod_rar sub operatie, pill eticheta scurta, fara scroll orizontal (scopat .tabel-trimiteri + carduri <768px), detaliu inline expandabil (a11y + pauza poll). code-review: reparat regula auto_send=0 care trimitea automat la RAR in loc sa tina randul pentru review. 814 passed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1338 lines
52 KiB
Python
1338 lines
52 KiB
Python
"""Router import Treapta 2 — upload fisier xlsx/csv, preview, commit, export.
|
|
|
|
Endpointuri:
|
|
POST /v1/import — upload fisier, staging in import_batches/import_rows
|
|
GET /v1/import/{id}/column-mapping — maparea de coloane existenta / sugestii fuzzy
|
|
POST /v1/import/{id}/column-mapping — salveaza maparea de coloane
|
|
GET /v1/import/{id}/preview — preview 6 stari per rand (fara enqueue)
|
|
POST /v1/import/{id}/commit — gate HARD + enqueue randuri ok + log atestare
|
|
GET /v1/import/{id}/export-failed — CSV cu randuri esuate (needs_data/needs_mapping/needs_review)
|
|
|
|
Reguli cheie (plan §3.1-3.4, §12):
|
|
- Issue 6: scrieri bulk in tranzactie explicita BEGIN IMMEDIATE...COMMIT + executemany.
|
|
- Eng#5: already_sent lookup BATCH (IN chunk ~900), nu N+1.
|
|
- OV-3: duplicate_in_file EXCLUSIV la preview/commit. NU atinge reconcile.py/worker.
|
|
- Issue 1 (TOCTOU): commit per-rand cu ON CONFLICT(idempotency_key) DO NOTHING.
|
|
- Issue 5a: import_rows.raw_json CRIPTAT Fernet.
|
|
- Issue 5b: fuzzy coloane refoloseste mapping.normalize_for_match (DRY).
|
|
- T4/D3: drift semnatura coloane -> NU aplica orb, cere re-confirmare.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import hashlib
|
|
import io
|
|
import json
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, UploadFile
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel, Field
|
|
|
|
from ... import errors
|
|
from ...auth import resolve_account_id
|
|
from ...crypto import decrypt_creds, encrypt_creds
|
|
from ...db import get_connection
|
|
from ...idempotency import build_key, canonicalize_row
|
|
from ...import_parse import (
|
|
FileTooLarge,
|
|
HeaderError,
|
|
MultipleSheets,
|
|
ParsedFile,
|
|
parse_date_value,
|
|
parse_file,
|
|
)
|
|
from ...mapping import (
|
|
_emite_text_rule_hits,
|
|
account_or_default,
|
|
has_no_auto_send,
|
|
load_mapping_meta,
|
|
load_nomenclator_codes,
|
|
load_text_rules,
|
|
normalize_for_match,
|
|
resolve_prestatii,
|
|
)
|
|
from ...validation import validate_prezentare
|
|
|
|
# Every endpoint in this module is mounted under /v1/import and grouped as "import".
router = APIRouter(tags=["import"], prefix="/v1/import")
|
|
|
|
# Maximum number of keys bound in one SQL IN (...) list; kept below SQLite's
# ~999 bound-parameter limit.
_IN_CHUNK = 900

# Canonical import fields and the header synonyms used to suggest a column
# mapping (Issue 5b / Eng#4). Each list starts with the most specific synonym.
_CANONICAL_SYNONYMS: dict[str, list[str]] = {
    "vin": [
        "VIN", "Serie sasiu", "Sasiu", "Serie", "Numar sasiu", "Nr sasiu", "Chassis",
    ],
    "nr_inmatriculare": [
        "Nr inmatriculare", "Numar inmatriculare", "Numar auto", "Nr auto", "Numar", "Nr",
    ],
    "data_prestatie": [
        "Data prestatie", "Data", "Date", "Data service", "Data lucrare",
    ],
    "odometru_final": [
        "Odometru final", "Odometru", "KM", "Kilometri", "Km final", "Citire contor",
    ],
    "odometru_initial": [
        "Odometru initial", "KM initial", "Km start",
    ],
    "operatie": [
        "Operatie", "Cod prestatie", "Prestatie", "Lucrare", "Tip lucrare", "Cod op",
    ],
    "denumire_op": [
        "Denumire operatie", "Denumire", "Descriere", "Denumire prestatie", "Nume operatie",
    ],
    "obs": [
        "Observatii", "Obs", "Mentiuni", "Note",
    ],
}

# Minimum fuzzy score (0..100) for a canonical field to be suggested.
_FUZZY_MIN_SCORE = 55
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Helpere interne #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
def _signature(columns: list[str]) -> str:
    """Return an order- and case-insensitive fingerprint of a file header.

    Each column name is stripped and lower-cased, and the names are sorted. The
    result is the hex SHA-256 of that list serialized with ``json.dumps``
    (``ensure_ascii=False``, UTF-8 encoded).
    """
    normalized = [name.strip().lower() for name in columns]
    normalized.sort()
    payload = json.dumps(normalized, ensure_ascii=False)
    digest = hashlib.sha256(payload.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def _fuzzy_suggest_column(
    col_name: str,
    *,
    limit: int = 3,
) -> list[dict]:
    """Suggest canonical fields for one column header of the uploaded file.

    The header and every synonym in ``_CANONICAL_SYNONYMS`` are normalized with
    ``normalize_for_match`` (shared with the operation mapper, Issue 5b / Eng#4).
    Each pair is scored with ``rapidfuzz.fuzz.token_sort_ratio``. A canonical
    field's score is the BEST score over ALL of its synonyms. Previously only the
    first synonym was scored, so headers such as "Data" or "Kilometri" never
    matched their field.

    Args:
        col_name: header text as found in the file.
        limit: maximum number of suggestions returned.

    Returns:
        Up to ``limit`` dicts ``{"camp_canonic": str, "score": float}``, sorted by
        descending score. Only scores >= ``_FUZZY_MIN_SCORE`` are kept.
    """
    from rapidfuzz import fuzz

    query = normalize_for_match(col_name)

    scored: list[tuple[str, float]] = []
    for camp, sinonime in _CANONICAL_SYNONYMS.items():
        best = max(
            (fuzz.token_sort_ratio(query, normalize_for_match(syn)) for syn in sinonime),
            default=0.0,
        )
        if best >= _FUZZY_MIN_SCORE:
            scored.append((camp, float(best)))

    # sort() is stable, so ties keep the declaration order of _CANONICAL_SYNONYMS.
    scored.sort(key=lambda item: item[1], reverse=True)
    return [{"camp_canonic": camp, "score": score} for camp, score in scored[:limit]]
|
|
|
|
|
|
|
|
def _resolve_row_for_preview(
    raw_row: dict[str, Any],
    json_mapare: dict[str, str],
    date_col_format: dict[str, str],
    coercion_flags: list[str],
    mapping: dict[str, str],
    mapping_meta: dict[str, dict],
    formula_columns: list[str],
    override: dict[str, Any] | None = None,
    valid_codes: set[str] | None = None,
    text_rules: list[dict] | None = None,
) -> dict[str, Any]:
    """Resolve one imported row for the preview: apply the column mapping, then classify it.

    Returns a dict with:
        resolved_status: "ok", "needs_mapping", "needs_data" or "needs_review"
        resolved: the final values (VIN, date, odometer, prestatii, ...)
        errors: mapping / validation errors
        flags: the reasons the row needs review

    Classification order, as implemented below:
        1. operations ``resolve_prestatii`` could not map   -> needs_mapping
        2. any review flag                                   -> needs_review
        3. a mapped code with auto_send=0 (has_no_auto_send) -> needs_mapping
        4. validation errors from validate_prezentare        -> needs_data
        5. otherwise                                         -> ok

    ``override`` (3.6, Approach B) is a CANONICAL patch edited in the preview and
    applied LAST, after the column mapping and canonicalization. It can correct a
    value or fill a field whose column is missing from the file, without touching
    ``raw_json`` / idempotency.

    ``valid_codes`` and ``text_rules`` are passed through to ``resolve_prestatii``.
    """
    # Apply the column mapping; a column mapped to an empty canonical name is ignored.
    mapped: dict[str, Any] = {}
    for col_fisier, camp_canonic in json_mapare.items():
        if col_fisier in raw_row and camp_canonic:
            mapped[camp_canonic] = raw_row[col_fisier]

    # Formula detection (Issue 3): does not block, only adds a review flag. Only
    # columns that actually feed a canonical field count. A column the user chose
    # to ignore must not push every row to needs_review.
    formula_flag: list[str] = [
        f"Coloana '{col_fisier}' pare sa contina formule fara valori calculate — re-salveaza fisierul in Excel."
        for col_fisier, camp_canonic in json_mapare.items()
        if camp_canonic and col_fisier in formula_columns
    ]

    # Service date: find the file column mapped to data_prestatie and parse it
    # with that column's format ("ambiguous" when none is known).
    data_col_fisier = next(
        (col_f for col_f, camp_c in json_mapare.items() if camp_c == "data_prestatie"),
        None,
    )
    is_ambiguous_date = False
    if data_col_fisier:
        col_fmt = date_col_format.get(data_col_fisier, "ambiguous")
        raw_date = mapped.get("data_prestatie")
        if raw_date is not None:
            iso_date, is_amb = parse_date_value(raw_date, col_fmt)
            if iso_date:
                mapped["data_prestatie"] = iso_date
            if is_amb:
                is_ambiguous_date = True

    # Build prestatii from the "operatie" column. The descriptive denumire_op column
    # (e.g. "Reparatie Motor") feeds `denumire`, which is what the mapping editor's
    # fuzzy suggestion works on. Without it, denumire falls back to the opaque code
    # (e.g. "OP-MOTOR").
    operatie_val = mapped.pop("operatie", None)
    denumire_val = mapped.pop("denumire_op", None)
    if operatie_val and "prestatii" not in mapped:
        denumire = str(denumire_val).strip() if denumire_val not in (None, "") else str(operatie_val)
        mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": denumire}]

    # Canonicalization (T9): normalize VIN / plate number / final odometer.
    canon = canonicalize_row(mapped)
    mapped.update({
        "vin": canon["vin"],
        "nr_inmatriculare": canon["nr_inmatriculare"],
        "odometru_final": canon["odometru_final"],
    })

    # Preview override (3.6), applied LAST. Its values are already canonical
    # (see _merge_override).
    if override:
        mapped.update(override)

    # Accumulated review flags.
    all_flags = list(coercion_flags) + formula_flag
    if is_ambiguous_date:
        all_flags.append("Data ambigua (zi<=12): verifica daca e DD.MM sau MM.DD")

    # Resolve operation codes.
    prestatii = mapped.get("prestatii") or []
    resolved, unmapped = resolve_prestatii(prestatii, mapping, valid_codes, text_rules)
    mapped["prestatii"] = resolved

    if unmapped:
        return {
            "resolved_status": "needs_mapping",
            "resolved": mapped,
            "errors": [{"unmapped": unmapped}],
            "flags": all_flags,
        }

    # Content validation. This local is named val_errors so it does not shadow
    # the imported `errors` module.
    val_errors = validate_prezentare(mapped)

    # NOTE(review): flags are checked before the auto_send gate and before
    # val_errors. A flagged row is therefore reported as needs_review even when it
    # also has validation errors or an auto_send=0 code, and the commit endpoint
    # selects needs_review rows. Confirm this precedence is intended.
    if all_flags:
        # needs_review: flags block automatic sending even when validation passes.
        return {
            "resolved_status": "needs_review",
            "resolved": mapped,
            "errors": val_errors,
            "flags": all_flags,
        }

    # auto_send gate (T6/OV-1).
    if has_no_auto_send(resolved, mapping_meta):
        return {
            "resolved_status": "needs_mapping",
            "resolved": mapped,
            "errors": [{"auto_send": "cod mapat cu auto_send=0; review manual inainte de trimitere"}],
            "flags": all_flags,
        }

    if val_errors:
        return {
            "resolved_status": "needs_data",
            "resolved": mapped,
            "errors": val_errors,
            "flags": all_flags,
        }

    return {
        "resolved_status": "ok",
        "resolved": mapped,
        "errors": [],
        "flags": all_flags,
    }
|
|
|
|
|
|
def _build_idempotency_key(account_id: int | None, resolved: dict[str, Any]) -> str:
    """Return the idempotency key for a resolved row.

    The row is first normalized with ``canonicalize_row``. The normalized row and
    ``account_id`` are then turned into the key by ``build_key``.
    """
    return build_key(account_id, canonicalize_row(resolved))
|
|
|
|
|
|
# Content fields that can be edited in the preview (3.6). The operation / RAR code
# is deliberately NOT editable here; it stays in the mapping panel (PRD 3.6 Non-Goals).
EDIT_FIELDS = (
    "vin",
    "nr_inmatriculare",
    "data_prestatie",
    "odometru_initial",
    "odometru_final",
)
|
|
|
|
|
|
def _merge_override(current: dict[str, Any], fields: dict[str, str | None]) -> dict[str, Any]:
    """Merge preview edits into an existing override and return a NEW dict.

    ``current`` itself is not mutated. For each field in ``EDIT_FIELDS``:
      - value ``None`` (field not sent in the request): left unchanged;
      - blank after ``strip()``: the key is REMOVED, so the file value applies again;
      - anything else: stored. ``vin``, ``nr_inmatriculare`` and ``odometru_final``
        are stored in the form ``canonicalize_row`` returns. ``odometru_initial``
        and ``data_prestatie`` keep the stripped text, since validation checks them
        directly.
    """
    merged = dict(current)
    pending: dict[str, str] = {}

    for camp in EDIT_FIELDS:
        value = fields.get(camp)
        if value is None:
            continue
        text = str(value).strip()
        if text:
            pending[camp] = text
        else:
            merged.pop(camp, None)  # blank input clears the override for this field

    if not pending:
        return merged

    canonical = canonicalize_row(pending)
    canonical_fields = ("vin", "nr_inmatriculare", "odometru_final")
    merged.update({
        camp: canonical[camp] if camp in canonical_fields else text
        for camp, text in pending.items()
    })
    return merged
|
|
|
|
|
|
def apply_row_override(
    conn,
    *,
    import_id: int,
    account_id: int | None,
    row_index: int,
    fields: dict[str, str | None],
) -> dict[str, Any]:
    """Persist the canonical preview override of one import row (storage-only mutation).

    Does NOT recompute the row status and does NOT touch ``submissions``. The
    preview re-derives the status through ``_resolve_row_for_preview``, a single
    classifier, so there is no drift. No COMMIT is issued here.

    Raises:
        HTTPException 404: row / batch missing or owned by another account
            (scoped in a single JOIN).
        HTTPException 409: the batch is already committed.
        HTTPException 422: the stored override cannot be decrypted. This is a
            defensive no-op: the stored override is never overwritten with an
            empty one.

    Returns:
        The new override dict. Empty means the override was cleared (column set to NULL).
    """
    acct = account_or_default(account_id)

    # One JOIN scopes row, batch and account at once; no match -> 404.
    target = conn.execute(
        "SELECT r.id AS rid, r.override_json AS oj, b.status AS bstatus "
        "FROM import_rows r JOIN import_batches b ON b.id = r.batch_id "
        "WHERE b.id=? AND b.account_id=? AND r.row_index=?",
        (import_id, acct, row_index),
    ).fetchone()
    if not target:
        raise HTTPException(status_code=404, detail="rand de import inexistent")
    if target["bstatus"] == "committed":
        raise HTTPException(status_code=409, detail="batch deja comis; editarea nu mai are efect")

    existing: dict[str, Any] = {}
    stored = target["oj"]
    if stored:
        decoded = decrypt_creds(stored)
        if decoded is None:
            # Decryption failed (key rotated / corrupt token): abort rather than
            # overwrite the stored override with an empty one.
            message = "override curent ilizibil; editare anulata"
            raise HTTPException(
                status_code=422,
                detail={
                    "error": "override_ilizibil",
                    "message": message,
                    **errors.eroare("IMPORT_OVERRIDE_ILIZIBIL", cauza=message),
                },
            )
        existing = decoded

    merged = _merge_override(existing, fields)
    ciphertext = encrypt_creds(merged) if merged else None
    conn.execute("UPDATE import_rows SET override_json=? WHERE id=?", (ciphertext, target["rid"]))
    return merged
|
|
|
|
|
|
def _already_sent_lookup(conn, account_id: int, keys: list[str]) -> dict[str, dict]:
    """Find which idempotency keys already exist in ``submissions`` (batched, not N+1 — Eng#5).

    Keys are queried in chunks of ``_IN_CHUNK`` to stay under SQLite's bound
    parameter limit. Each chunk is first looked up scoped to the account. Keys
    still missing are then looked up again without the account filter; this is the
    legacy dual lookup (OV-2: old keys with account_id=None).

    Returns:
        ``{idempotency_key: {"submission_id", "id_prezentare", "created_at"}}`` for
        every key that was found. Account-scoped hits take precedence over legacy hits.
    """
    acct = account_or_default(account_id)
    select_head = "SELECT idempotency_key, id, id_prezentare, created_at FROM submissions "

    def _info(rec) -> dict:
        # Public shape of one hit.
        return {
            "submission_id": rec["id"],
            "id_prezentare": rec["id_prezentare"],
            "created_at": rec["created_at"],
        }

    found: dict[str, dict] = {}
    for start in range(0, len(keys), _IN_CHUNK):
        chunk = keys[start:start + _IN_CHUNK]

        marks = ",".join("?" * len(chunk))
        scoped_rows = conn.execute(
            select_head + f"WHERE account_id=? AND idempotency_key IN ({marks})",
            (acct, *chunk),
        ).fetchall()
        for rec in scoped_rows:
            found[rec["idempotency_key"]] = _info(rec)

        # NOTE(review): this legacy query has no account_id condition at all. It is
        # only safe if legacy keys cannot collide across accounts; confirm against
        # build_key / OV-2.
        missing = [k for k in chunk if k not in found]
        if not missing:
            continue
        legacy_marks = ",".join("?" * len(missing))
        legacy_rows = conn.execute(
            select_head + f"WHERE idempotency_key IN ({legacy_marks})",
            tuple(missing),
        ).fetchall()
        for rec in legacy_rows:
            found.setdefault(rec["idempotency_key"], _info(rec))

    return found
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# POST /v1/import — upload fisier, staging #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
@router.post("")
async def upload_import(
    file: UploadFile,
    sheet_name: str | None = None,
    account_id: int = Depends(resolve_account_id),
) -> dict:
    """Upload an xlsx/csv file and stage it in import_batches / import_rows.

    Nothing is sent to RAR. The response contains ``import_id``, ``columns``,
    ``sample_rows``, ``total_rows``, parse metadata (``formula_columns``,
    ``date_col_format``, ``coercion_flags_count``), ``signature`` and
    ``mapping_status``. When a column mapping is already saved for this header
    (``"matched"``), it also contains ``column_mapping`` and ``format_data``.
    Otherwise (``"new"``) it contains ``fuzzy_suggestions``.

    Each row's raw_json is encrypted with ``encrypt_creds`` before storage
    (Issue 5a). The batch, its rows and its initial counters are written in ONE
    explicit transaction (Issue 6), so a failure cannot leave a staged batch with
    unset counters.

    Raises:
        HTTPException 413: file too large.
        HTTPException 422: several sheets and no ``sheet_name``, unclear header,
            unsupported encoding, or unrecognized / corrupt file.
    """
    acct = account_or_default(account_id)
    data = await file.read()
    filename = file.filename or "fisier"

    # Parse; every parser failure is mapped to a typed HTTP error for the UI.
    try:
        parsed: ParsedFile = parse_file(data, filename, sheet_name=sheet_name)
    except MultipleSheets as ms:
        raise HTTPException(
            status_code=422,
            detail={
                "error": "multiple_sheets",
                "message": str(ms),
                "sheets": ms.sheet_names,
                **errors.eroare("IMPORT_MULTIPLE_SHEETS", cauza=str(ms)),
            },
        )
    except FileTooLarge as e:
        raise HTTPException(
            status_code=413,
            detail={
                "error": "file_too_large",
                "message": str(e),
                **errors.eroare("IMPORT_FISIER_PREA_MARE", cauza=str(e)),
            },
        )
    except HeaderError as e:
        raise HTTPException(
            status_code=422,
            detail={
                "error": "header_error",
                "message": str(e),
                "found": e.found,
                **errors.eroare("IMPORT_ANTET_NECLAR", cauza=str(e)),
            },
        )
    except UnicodeDecodeError as e:
        _enc_msg = f"Encoding nesuportat: {e.reason}"
        raise HTTPException(
            status_code=422,
            detail={
                "error": "encoding_error",
                "message": _enc_msg,
                **errors.eroare("IMPORT_ENCODING", cauza=_enc_msg),
            },
        )
    except Exception as e:
        # Corrupt or unsupported file (BadZipFile, InvalidFileException, ...).
        _inv_msg = f"Fisier nerecunoscut (xlsx/csv): {type(e).__name__}"
        raise HTTPException(
            status_code=422,
            detail={
                "error": "invalid_file",
                "message": _inv_msg,
                **errors.eroare("IMPORT_FISIER_NERECUNOSCUT", cauza=_inv_msg),
            },
        )

    conn = get_connection()
    try:
        sig = _signature(parsed.columns)

        # Issue 6: one explicit transaction (BEGIN IMMEDIATE + executemany) for the
        # batch row, its rows and its initial counters.
        conn.execute("BEGIN IMMEDIATE")
        try:
            cur = conn.execute(
                "INSERT INTO import_batches (account_id, filename, status, total, purge_after) "
                "VALUES (?, ?, 'staging', ?, datetime('now', '+90 days'))",
                (acct, filename, len(parsed.rows)),
            )
            batch_id = cur.lastrowid

            # Issue 5a: raw_json is encrypted before it is stored.
            row_params = [
                (batch_id, i, encrypt_creds(row_dict), "pending", None)
                for i, row_dict in enumerate(parsed.rows)
            ]
            conn.executemany(
                "INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status, error) "
                "VALUES (?, ?, ?, ?, ?)",
                row_params,
            )

            # Initial counters, written in the same transaction as the rows. The
            # parse metadata (coercion_flags, date_col_format, formula_columns) is
            # not persisted; only the number of coercion flags is stored, as
            # needs_review. The preview endpoint recomputes the real per-row
            # counters from the stored raw_json.
            conn.execute(
                "UPDATE import_batches SET ok=?, needs_review=? WHERE id=?",
                (0, len(parsed.coercion_flags), batch_id),
            )
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise

        # Look up a column mapping already saved for this exact header signature.
        existing_mapping = conn.execute(
            "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?",
            (acct, sig),
        ).fetchone()
        mapping_status = "matched" if existing_mapping else "new"

        # First 3 parsed rows, returned as-is for the UI sample. These are the same
        # row dicts that are encrypted at rest above, so they are NOT PII-free.
        sample = parsed.rows[:3]

        result: dict = {
            "import_id": batch_id,
            "columns": parsed.columns,
            "sample_rows": sample,
            "total_rows": len(parsed.rows),
            "formula_columns": parsed.formula_columns,
            "date_col_format": parsed.date_col_format,
            "coercion_flags_count": len(parsed.coercion_flags),
            "mapping_status": mapping_status,
            "signature": sig,
        }

        if existing_mapping:
            result["column_mapping"] = json.loads(existing_mapping["json_mapare"])
            result["format_data"] = existing_mapping["format_data"]
        else:
            # Fuzzy suggestions per column (Issue 5b: reuses normalize_for_match).
            suggestions: dict[str, list[dict]] = {}
            for col in parsed.columns:
                sugg = _fuzzy_suggest_column(col, limit=3)
                if sugg:
                    suggestions[col] = sugg
            result["fuzzy_suggestions"] = suggestions

        return result
    finally:
        conn.close()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# GET /v1/import/{id}/column-mapping — mapare existenta / sugestii #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
@router.get("/{import_id}/column-mapping")
def get_column_mapping(
    import_id: int,
    account_id: int = Depends(resolve_account_id),
) -> dict:
    """Return the saved column mapping for this batch's header, or fuzzy suggestions.

    The header is the key list of the batch's first stored row (decrypted). A
    decryption failure yields an empty header. When a mapping is saved for
    (account, header signature), the response has ``status="matched"``. Otherwise
    it has ``status="new"`` plus per-column ``fuzzy_suggestions``.

    Raises HTTPException 404 when the batch does not exist for this account.
    """
    acct = account_or_default(account_id)
    conn = get_connection()
    try:
        owned_batch = conn.execute(
            "SELECT id, filename, status FROM import_batches WHERE id=? AND account_id=?",
            (import_id, acct),
        ).fetchone()
        if not owned_batch:
            raise HTTPException(status_code=404, detail="batch de import inexistent")

        # The header is not stored separately: rebuild it from the first row's keys.
        head = conn.execute(
            "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1",
            (import_id,),
        ).fetchone()

        columns = []
        if head is not None:
            try:
                decoded = decrypt_creds(head["raw_json"])
                if decoded:
                    columns = list(decoded.keys())
            except Exception:
                pass  # unreadable row -> behave as if the header were empty

        sig = "" if not columns else _signature(columns)
        saved = conn.execute(
            "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?",
            (acct, sig),
        ).fetchone()

        if not saved:
            suggestions: dict[str, list[dict]] = {
                col: hits
                for col in columns
                if (hits := _fuzzy_suggest_column(col, limit=3))
            }
            return {
                "status": "new",
                "columns": columns,
                "signature": sig,
                "fuzzy_suggestions": suggestions,
            }

        return {
            "status": "matched",
            "column_mapping": json.loads(saved["json_mapare"]),
            "format_data": saved["format_data"],
            "columns": columns,
            "signature": sig,
        }
    finally:
        conn.close()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# POST /v1/import/{id}/column-mapping — salveaza maparea #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
# Request body for POST /v1/import/{id}/column-mapping. Documented with comments
# rather than a class docstring, because pydantic would copy a docstring into the
# OpenAPI schema.
class ColumnMappingIn(BaseModel):
    # {file column name: canonical field}. The preview skips columns mapped to an
    # empty canonical name.
    json_mapare: dict[str, str] = Field(..., description="Mapare {col_fisier: camp_canonic}")
    # Date format of the data_prestatie column (e.g. "DD.MM.YYYY"); optional.
    format_data: str | None = Field(None, description="Format data, ex: DD.MM.YYYY")
|
|
|
|
|
|
@router.post("/{import_id}/column-mapping")
def save_column_mapping(
    import_id: int,
    req: ColumnMappingIn,
    account_id: int = Depends(resolve_account_id),
) -> dict:
    """Create or update the column mapping of the current account for this batch's header.

    The mapping is keyed by (account, header signature). When the file's columns
    change, the signature changes too, so an old mapping is not applied blindly
    (drift -> ``mapping_status='new'``).

    Raises HTTPException 404 when the batch does not exist for this account.
    Returns ``{"ok": True, "signature": ..., "columns": ...}``.
    """
    acct = account_or_default(account_id)
    conn = get_connection()
    try:
        owned_batch = conn.execute(
            "SELECT id FROM import_batches WHERE id=? AND account_id=?",
            (import_id, acct),
        ).fetchone()
        if not owned_batch:
            raise HTTPException(status_code=404, detail="batch de import inexistent")

        # The signature must use the FULL file header (every column of the batch),
        # not only the mapped fields. Otherwise, if the client leaves a column out,
        # the signature differs from the one the preview computes (full header) and
        # the saved mapping is never found. The header is read from the first row.
        # If that row is missing or unreadable, fall back to the mapped columns.
        head = conn.execute(
            "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1",
            (import_id,),
        ).fetchone()

        header = list(req.json_mapare.keys())
        if head:
            try:
                decoded = decrypt_creds(head["raw_json"]) or {}
                if decoded:
                    header = list(decoded.keys())
            except Exception:
                pass  # keep the mapped columns as the fallback header

        sig = _signature(header)
        payload = json.dumps(req.json_mapare, ensure_ascii=False)
        conn.execute(
            "INSERT INTO column_mappings (account_id, signature_coloane, json_mapare, format_data) "
            "VALUES (?, ?, ?, ?) "
            "ON CONFLICT(account_id, signature_coloane) DO UPDATE SET "
            "json_mapare=excluded.json_mapare, format_data=excluded.format_data",
            (acct, sig, payload, req.format_data),
        )
        return {"ok": True, "signature": sig, "columns": header}
    finally:
        conn.close()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# GET /v1/import/{id}/preview — 6 stari per rand (T2 + T11) #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
@router.get("/{import_id}/preview")
def preview_import(
    import_id: int,
    account_id: int = Depends(resolve_account_id),
) -> dict:
    """Preview every row of an import batch in one of six states.

    States: ok / needs_mapping / needs_data / needs_review / already_sent /
    duplicate_in_file. Nothing is enqueued.
      - ``already_sent`` comes from ONE batched lookup in ``submissions`` (Eng#5).
      - ``duplicate_in_file`` is an intra-batch idempotency-key collision and is
        computed ONLY here (OV-3: never in reconcile.py / the worker).

    The computed statuses and the batch counters are persisted in a single
    transaction. The commit endpoint selects rows by the stored
    ``resolved_status``, so a failed write raises instead of being swallowed.

    Raises:
        HTTPException 404: batch missing or owned by another account.
        HTTPException 422: no column mapping saved for this header signature.
    """
    acct = account_or_default(account_id)
    conn = get_connection()
    try:
        batch = conn.execute(
            "SELECT id, account_id, filename, status FROM import_batches WHERE id=? AND account_id=?",
            (import_id, acct),
        ).fetchone()
        if not batch:
            raise HTTPException(status_code=404, detail="batch de import inexistent")

        raw_rows_db = conn.execute(
            "SELECT row_index, raw_json, override_json FROM import_rows WHERE batch_id=? ORDER BY row_index",
            (import_id,),
        ).fetchall()
        if not raw_rows_db:
            # Same keys as the normal response (the extra keys are backward compatible).
            return {"import_id": import_id, "rows": [], "summary": {}, "total": 0}

        # Decrypt the raw rows and their preview overrides (3.6). An unreadable
        # payload degrades to {} so one corrupt row does not break the preview.
        rows: list[dict] = []
        overrides: list[dict] = []
        for r in raw_rows_db:
            try:
                row_data = decrypt_creds(r["raw_json"])
            except Exception:
                row_data = None
            rows.append(row_data or {})
            try:
                ov = decrypt_creds(r["override_json"]) if r["override_json"] else None
            except Exception:
                ov = None
            overrides.append(ov or {})

        # Header = keys of the first row; its signature selects the column mapping.
        col_names = list(rows[0].keys()) if rows else []
        sig = _signature(col_names)

        mapping_row = conn.execute(
            "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?",
            (acct, sig),
        ).fetchone()
        if not mapping_row:
            _ncm_msg = "Maparea coloanelor nu a fost configurata. Configureaza mai intai maparea."
            raise HTTPException(
                status_code=422,
                detail={
                    "error": "no_column_mapping",
                    "message": _ncm_msg,
                    **errors.eroare("IMPORT_FARA_MAPARE_COLOANE", cauza=_ncm_msg),
                },
            )

        json_mapare: dict[str, str] = json.loads(mapping_row["json_mapare"])
        format_data = mapping_row["format_data"]

        # Operation mapping, nomenclator codes and text rules are loaded ONCE,
        # before the row loop (Eng#5 / T2).
        mapping_meta = load_mapping_meta(conn, acct)
        mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()}
        valid_codes = load_nomenclator_codes(conn) or None
        text_rules = load_text_rules(conn, acct)

        # coercion_flags from parsing are not persisted, so re-derive the one that
        # is visible in the stored values: a VIN column whose value looks numeric.
        vin_cols = [col_f for col_f, camp_c in json_mapare.items() if camp_c == "vin"]
        coercion_flags_map: dict[int, list[str]] = {}
        for i, row_dict in enumerate(rows):
            flags = []
            for col_f in vin_cols:
                vin_val = row_dict.get(col_f)
                if vin_val is not None and str(vin_val).replace(".", "").isdigit():
                    flags.append(f"VIN numeric ({vin_val}) — verificati seria sasiului")
            if flags:
                coercion_flags_map[i] = flags

        # date_col_format: the saved format_data applies to the date column(s).
        # Without it, _resolve_row_for_preview treats the column as "ambiguous".
        date_col_format: dict[str, str] = {}
        if format_data:
            for col_f, camp_c in json_mapare.items():
                if camp_c == "data_prestatie":
                    date_col_format[col_f] = format_data

        # formula_columns heuristic: columns that are None in >= 60% of the rows.
        n_rows = len(rows)
        formula_columns = [
            col_f
            for col_f in col_names
            if n_rows > 0 and sum(1 for r in rows if r.get(col_f) is None) / n_rows >= 0.6
        ]

        # Classify each row; rows in keyed_states also get an idempotency key.
        keyed_states = ("ok", "needs_review", "needs_data")
        preview_rows: list[dict] = []
        key_to_indices: dict[str, list[int]] = {}  # key -> row indices sharing it
        for i, row_dict in enumerate(rows):
            resolved_info = _resolve_row_for_preview(
                raw_row=row_dict,
                json_mapare=json_mapare,
                date_col_format=date_col_format,
                coercion_flags=coercion_flags_map.get(i, []),
                mapping=mapping,
                mapping_meta=mapping_meta,
                formula_columns=formula_columns,
                override=overrides[i] or None,
                valid_codes=valid_codes,
                text_rules=text_rules,
            )

            key = None
            if resolved_info["resolved_status"] in keyed_states:
                try:
                    key = _build_idempotency_key(account_id, resolved_info["resolved"])
                except Exception:
                    key = None  # best effort: a row without a key skips both checks below
            if key:
                key_to_indices.setdefault(key, []).append(i)

            preview_rows.append({
                "row_index": i,
                "resolved_status": resolved_info["resolved_status"],
                "resolved": resolved_info["resolved"],
                "errors": resolved_info["errors"],
                "flags": resolved_info["flags"],
                "idempotency_key": key,
            })

        # already_sent: one batched lookup (Eng#5, not N+1).
        already_sent_map = _already_sent_lookup(conn, account_id, list(key_to_indices))

        # already_sent takes precedence; otherwise >1 row with the same key in THIS
        # file -> duplicate_in_file (OV-3).
        for row in preview_rows:
            k = row["idempotency_key"]
            if not k or row["resolved_status"] not in keyed_states:
                continue
            if k in already_sent_map:
                row["resolved_status"] = "already_sent"
                row["already_sent_info"] = already_sent_map[k]
                continue
            same_key = key_to_indices[k]
            if len(same_key) > 1:
                row["resolved_status"] = "duplicate_in_file"
                row["duplicate_with"] = [idx for idx in same_key if idx != row["row_index"]]

        summary: dict[str, int] = {}
        for row in preview_rows:
            status = row["resolved_status"]
            summary[status] = summary.get(status, 0) + 1

        # Persist counters and per-row statuses atomically. Previously a failure here
        # was rolled back and silently ignored. Commit would then act on stale
        # statuses, e.g. enqueue a row that is no longer "ok".
        conn.execute("BEGIN IMMEDIATE")
        try:
            conn.execute(
                "UPDATE import_batches SET ok=?, needs_mapping=?, needs_data=?, needs_review=?, "
                "already_sent=?, duplicate_in_file=? WHERE id=?",
                (
                    summary.get("ok", 0),
                    summary.get("needs_mapping", 0),
                    summary.get("needs_data", 0),
                    summary.get("needs_review", 0),
                    summary.get("already_sent", 0),
                    summary.get("duplicate_in_file", 0),
                    import_id,
                ),
            )
            conn.executemany(
                "UPDATE import_rows SET resolved_status=? WHERE batch_id=? AND row_index=?",
                [(row["resolved_status"], import_id, row["row_index"]) for row in preview_rows],
            )
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise

        return {
            "import_id": import_id,
            "rows": preview_rows,
            "summary": summary,
            "total": len(preview_rows),
        }
    finally:
        conn.close()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# POST /v1/import/{id}/commit — gate HARD + enqueue + log atestare (T5+T12) #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
class CommitIn(BaseModel):
    # Request body of POST /v1/import/{id}/commit.

    # Gate HARD: the endpoint refuses the commit unless this equals the number of
    # rows it is about to enqueue.
    n_confirmat: int = Field(
        ...,
        description="Numarul de randuri ok confirmate (gate HARD)",
    )
    # Indices of needs_review rows the user ticked explicitly; each instance gets
    # its own fresh list.
    reviewed_rows: list[int] = Field(
        description="Indecsi de rand needs_review bifate explicit de utilizator",
        default_factory=list,
    )
    # Optional identity (e-mail / identifier) stored in the attestation log.
    confirmed_by: str | None = Field(
        default=None,
        description="Email/identifier utilizator (log atestare)",
    )
|
|
|
|
|
|
@router.post("/{import_id}/commit")
def commit_import(
    import_id: int,
    req: CommitIn,
    account_id: int = Depends(resolve_account_id),
) -> dict:
    """Gate HARD confirmation + enqueue of ready rows + attestation log (T5+T12).

    Rows staged as ``ok`` are always candidates; ``needs_review`` rows are candidates
    only when their index is listed in ``req.reviewed_rows`` (Voce#1 — attestation
    on values). ``req.n_confirmat`` must equal the number of candidates.

    TOCTOU (Issue 1): one ``INSERT OR IGNORE`` per row on ``idempotency_key``;
    colliding rows are reported in ``toctou_collisions`` instead of being enqueued.
    ``rows_hash`` and ``n_confirmed`` cover ONLY the rows actually enqueued.

    The batch-status re-check, the enqueue, the attestation row and the batch
    status update all run inside ONE ``BEGIN IMMEDIATE`` transaction, so two
    concurrent commits cannot both proceed and a failure cannot leave enqueued
    submissions without their attestation.

    Raises:
        HTTPException(404): batch missing or not owned by the account.
        HTTPException(409): batch already committed.
        HTTPException(422): nothing committable, or ``n_confirmat`` mismatch.
    """
    acct = account_or_default(account_id)
    conn = get_connection()
    try:
        batch = conn.execute(
            "SELECT id, account_id, filename, status FROM import_batches WHERE id=? AND account_id=?",
            (import_id, acct),
        ).fetchone()
        if not batch:
            raise HTTPException(status_code=404, detail="batch de import inexistent")
        if batch["status"] == "committed":
            raise HTTPException(status_code=409, detail="batch deja comis")

        # Rows the preview left as 'ok' or 'needs_review' (the only committable states).
        ok_rows_db = conn.execute(
            "SELECT row_index, raw_json, override_json, resolved_status FROM import_rows "
            "WHERE batch_id=? AND resolved_status IN ('ok', 'needs_review') ORDER BY row_index",
            (import_id,),
        ).fetchall()
        if not ok_rows_db:
            raise HTTPException(status_code=422, detail="Niciun rand ok de confirmat in acest batch.")

        def _override_of(r) -> dict:
            # Preview edits (3.6) are stored encrypted; no override means "no edit".
            return (decrypt_creds(r["override_json"]) if r["override_json"] else None) or {}

        def _apply_canonical(payload: dict[str, Any]) -> dict:
            # Canonicalize vin / plate / odometer in place; return the canonical row
            # (the key is built from it).
            canon_row = canonicalize_row(payload)
            payload.update({
                "vin": canon_row["vin"],
                "nr_inmatriculare": canon_row["nr_inmatriculare"],
                "odometru_final": canon_row["odometru_final"],
            })
            return canon_row

        # Rows to enqueue, in enqueue order.
        ok_rows: list[dict] = []
        # Decrypted needs_review rows keyed by row_index, awaiting explicit confirmation.
        review_rows: dict[int, tuple[Any, dict]] = {}

        for r in ok_rows_db:
            try:
                row_data = decrypt_creds(r["raw_json"])
            except Exception:
                # Best-effort: an undecryptable row is left out (and not counted by the gate).
                continue
            if row_data is None:
                continue
            if r["resolved_status"] == "ok":
                ok_rows.append({"row_index": r["row_index"], "data": row_data,
                                "override": _override_of(r), "status": "ok"})
            elif r["resolved_status"] == "needs_review" and row_data:
                review_rows[r["row_index"]] = (r, row_data)

        # needs_review rows ticked explicitly (Voce#1). dict.fromkeys drops repeated
        # indices while keeping the client's order, so a row is never enqueued twice.
        for idx in dict.fromkeys(req.reviewed_rows):
            entry = review_rows.get(idx)
            if entry is None:
                continue  # not a (decryptable) needs_review row of this batch
            r, row_data = entry
            try:
                override = _override_of(r)
            except Exception:
                continue  # same best-effort skip as an undecryptable row
            ok_rows.append({"row_index": idx, "data": row_data,
                            "override": override, "status": "needs_review"})

        # Gate HARD: n_confirmat must be EXACTLY the number of rows ready to send.
        n_total_ok = len(ok_rows)
        if req.n_confirmat != n_total_ok:
            _cg_msg = f"Ai confirmat {req.n_confirmat} dar sunt {n_total_ok} randuri gata de trimis. Verifica preview-ul."
            raise HTTPException(
                status_code=422,
                detail={
                    "error": "confirmare_gresita",
                    "message": _cg_msg,
                    "n_ok": n_total_ok,
                    **errors.eroare("IMPORT_CONFIRMARE_GRESITA", cauza=_cg_msg),
                },
            )
        if n_total_ok == 0:
            raise HTTPException(status_code=422, detail="Niciun rand ok de confirmat.")

        # Column mapping, looked up by the signature of the first staged row's columns.
        first_row_db = conn.execute(
            "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1",
            (import_id,),
        ).fetchone()
        col_names: list[str] = []
        if first_row_db:
            try:
                fd = decrypt_creds(first_row_db["raw_json"])
                if fd:
                    col_names = list(fd.keys())
            except Exception:
                pass  # best-effort: falls back to the empty signature

        sig = _signature(col_names) if col_names else ""
        mapping_row = conn.execute(
            "SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?",
            (acct, sig),
        ).fetchone()
        json_mapare: dict[str, str] = json.loads(mapping_row["json_mapare"]) if mapping_row else {}

        # Loop invariants: which file column feeds data_prestatie, and its date format.
        date_col_fisier = next(
            (col_f for col_f, camp_c in json_mapare.items() if camp_c == "data_prestatie"),
            None,
        )
        col_fmt = (mapping_row["format_data"] if mapping_row else None) or "ambiguous"

        # Operation mapping; T2: nomenclator + text rules loaded ONCE, before the row loop.
        mapping_meta = load_mapping_meta(conn, acct)
        mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()}
        valid_codes = load_nomenclator_codes(conn) or None
        text_rules = load_text_rules(conn, acct)

        enqueued: list[dict] = []
        toctou_collisions: list[int] = []
        rows_for_hash: list[str] = []
        # purge_after for new submissions (T16).
        purge_after_sql = "datetime('now', '+90 days')"

        # Enqueue + attestation + status in one explicit transaction (Issue 6).
        conn.execute("BEGIN IMMEDIATE")
        try:
            # Re-check under the write lock: a concurrent commit may have finished
            # since the check above; proceeding would flip the batch back to staging.
            current = conn.execute(
                "SELECT status FROM import_batches WHERE id=? AND account_id=?",
                (import_id, acct),
            ).fetchone()
            if current is None:
                raise HTTPException(status_code=404, detail="batch de import inexistent")
            if current["status"] == "committed":
                raise HTTPException(status_code=409, detail="batch deja comis")

            for ok_row in ok_rows:
                row_dict = ok_row["data"]
                row_index = ok_row["row_index"]

                # Column mapping: file column -> canonical field (unmapped targets skipped).
                mapped: dict[str, Any] = {
                    camp_c: row_dict[col_f]
                    for col_f, camp_c in json_mapare.items()
                    if col_f in row_dict and camp_c
                }

                # Date normalization (kept as-is when the parser cannot resolve it).
                if date_col_fisier:
                    raw_date = mapped.get("data_prestatie")
                    if raw_date is not None:
                        iso_date, _ = parse_date_value(raw_date, col_fmt)
                        if iso_date:
                            mapped["data_prestatie"] = iso_date

                # Operation -> prestatii (denumire_op supplies the real name).
                operatie_val = mapped.pop("operatie", None)
                denumire_val = mapped.pop("denumire_op", None)
                if operatie_val and "prestatii" not in mapped:
                    denumire = str(denumire_val).strip() if denumire_val not in (None, "") else str(operatie_val)
                    mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": denumire}]

                # Resolve prestatii BEFORE canonicalization (otherwise the key differs from preview).
                prestatii = mapped.get("prestatii") or []
                resolved, _ = resolve_prestatii(prestatii, mapping, valid_codes, text_rules)
                mapped["prestatii"] = resolved

                # Canonicalize (cod_prestatie is now part of the key).
                canon = _apply_canonical(mapped)

                # Preview override (3.6) is applied LAST, as in the resolver, then
                # re-canonicalized so the key matches the preview's.
                override = ok_row.get("override") or {}
                if override:
                    mapped.update(override)
                    canon = _apply_canonical(mapped)

                # NOTE(review): the key uses the raw `account_id` while the row is stored
                # under `acct`; kept unchanged because it must equal the preview's key —
                # confirm both sides use the same account value.
                key = build_key(account_id, canon)

                payload_json = json.dumps(mapped, ensure_ascii=False)

                # INSERT OR IGNORE on idempotency_key (TOCTOU — Issue 1).
                cur = conn.execute(
                    "INSERT OR IGNORE INTO submissions "
                    "(idempotency_key, account_id, status, payload_json, batch_id, row_index, purge_after) "
                    "VALUES (?, ?, 'queued', ?, ?, ?, " + purge_after_sql + ")",
                    (key, acct, payload_json, import_id, row_index),
                )
                if cur.rowcount == 0:
                    # Key already inserted by a concurrent channel: not enqueued, not attested.
                    toctou_collisions.append(row_index)
                    continue

                # Attestation hash entry (resolved values) — only for enqueued rows.
                rows_for_hash.append(json.dumps({
                    "row_index": row_index,
                    "vin": mapped.get("vin"),
                    "data_prestatie": mapped.get("data_prestatie"),
                    "odometru_final": mapped.get("odometru_final"),
                    "prestatii": [str(p.get("cod_prestatie") or p.get("cod_op_service") or "") for p in resolved],
                }, sort_keys=True, ensure_ascii=False))

                sub_id = cur.lastrowid
                # US-010: telemetry for items resolved through a text rule.
                _emite_text_rule_hits(conn, acct, int(sub_id), resolved)
                enqueued.append({
                    "submission_id": sub_id,
                    "row_index": row_index,
                    "idempotency_key": key,
                })

            n_enqueued = len(enqueued)

            # Attestation log (Voce#9): rows_hash + n_confirmed cover ONLY enqueued rows.
            rows_hash = hashlib.sha256(
                json.dumps(sorted(rows_for_hash), ensure_ascii=False).encode("utf-8")
            ).hexdigest() if rows_for_hash else ""

            conn.execute(
                "INSERT INTO import_attestations (batch_id, account_id, confirmed_by, rows_hash, n_confirmed) "
                "VALUES (?, ?, ?, ?, ?)",
                (import_id, acct, req.confirmed_by, rows_hash, n_enqueued),
            )

            new_status = "committed" if n_enqueued > 0 else "staging"
            conn.execute(
                "UPDATE import_batches SET status=?, ok=? WHERE id=?",
                (new_status, n_enqueued, import_id),
            )

            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise

        return {
            "import_id": import_id,
            "enqueued": n_enqueued,
            "toctou_collisions": toctou_collisions,
            "rows_hash": rows_hash,
            "submissions": enqueued,
        }
    finally:
        conn.close()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# POST /v1/import/{id}/rand/{row_index}/editeaza — editare celule preview (3.6) #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
class RandEditIn(BaseModel):
    """Content fields editable in the preview. None = not sent (unchanged);
    "" = clear the override (fall back to the value from the file)."""

    # Declaration order is kept: it is the key order of model_dump().
    vin: str | None = Field(default=None)
    nr_inmatriculare: str | None = Field(default=None)
    data_prestatie: str | None = Field(default=None)
    odometru_initial: str | None = Field(default=None)
    odometru_final: str | None = Field(default=None)
|
|
|
|
|
|
@router.post("/{import_id}/rand/{row_index}/editeaza")
def editeaza_rand(
    import_id: int,
    row_index: int,
    req: RandEditIn,
    account_id: int = Depends(resolve_account_id),
) -> dict:
    """Persist the edit of one preview row (pure mutation — Approach B, 3.6).

    Does NOT recompute the row status and does NOT touch `submissions`; the preview
    re-derives the status via `_resolve_row_for_preview` with the override applied last.

    The account is normalized with `account_or_default`, exactly like the commit and
    export endpoints, so the edit is scoped to the same batch owner they look up.
    """
    acct = account_or_default(account_id)
    conn = get_connection()
    try:
        override = apply_row_override(
            conn,
            import_id=import_id,
            account_id=acct,
            row_index=row_index,
            fields=req.model_dump(),
        )
        return {"ok": True, "import_id": import_id, "row_index": row_index, "override": override}
    finally:
        conn.close()
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# GET /v1/import/{id}/export-failed — CSV randuri esuate (T8) #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
_EXPORT_FAILED_COLUMNS = [
|
|
"row_index",
|
|
"resolved_status",
|
|
"vin",
|
|
"nr_inmatriculare",
|
|
"data_prestatie",
|
|
"odometru_final",
|
|
"operatie",
|
|
"error",
|
|
]
|
|
|
|
|
|
@router.get("/{import_id}/export-failed")
def export_failed_rows(
    import_id: int,
    account_id: int = Depends(resolve_account_id),
) -> StreamingResponse:
    """CSV with the failed rows (needs_data/needs_mapping/needs_review) for correction + re-upload.

    Values come from the staged file row through the saved column mapping; preview
    overrides (override_json) are not applied. A missing value becomes an empty
    cell, but falsy data such as an odometer of 0 is exported as-is.

    Raises:
        HTTPException(404): batch missing or not owned by the account.
    """
    acct = account_or_default(account_id)
    conn = get_connection()
    try:
        batch = conn.execute(
            "SELECT id, filename FROM import_batches WHERE id=? AND account_id=?",
            (import_id, acct),
        ).fetchone()
        if not batch:
            raise HTTPException(status_code=404, detail="batch de import inexistent")

        failed_rows = conn.execute(
            "SELECT row_index, raw_json, resolved_status, error FROM import_rows "
            "WHERE batch_id=? AND resolved_status IN ('needs_data', 'needs_mapping', 'needs_review') "
            "ORDER BY row_index",
            (import_id,),
        ).fetchall()

        # Column mapping, looked up by the signature of the first staged row's columns.
        first_row = conn.execute(
            "SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1",
            (import_id,),
        ).fetchone()
        col_names: list[str] = []
        if first_row:
            try:
                fd = decrypt_creds(first_row["raw_json"])
                if fd:
                    col_names = list(fd.keys())
            except Exception:
                pass  # best-effort: falls back to the empty signature

        sig = _signature(col_names) if col_names else ""
        mapping_row = conn.execute(
            "SELECT json_mapare FROM column_mappings WHERE account_id=? AND signature_coloane=?",
            (acct, sig),
        ).fetchone()
        json_mapare: dict[str, str] = json.loads(mapping_row["json_mapare"]) if mapping_row else {}

        def _cell(value: Any) -> Any:
            # Only a missing value becomes an empty cell; `x or ""` would blank 0 too.
            return "" if value is None else value

        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=_EXPORT_FAILED_COLUMNS, extrasaction="ignore")
        writer.writeheader()

        for r in failed_rows:
            try:
                row_data = decrypt_creds(r["raw_json"]) or {}
            except Exception:
                # Best-effort: the row is still exported with its status / error.
                row_data = {}

            # Canonical fields via the column mapping (unmapped targets skipped).
            mapped: dict[str, Any] = {
                camp_c: row_data[col_f]
                for col_f, camp_c in json_mapare.items()
                if camp_c and col_f in row_data
            }

            # Operation: canonical 'operatie', else the first prestatie's code/name.
            operatie_val = mapped.get("operatie")
            if operatie_val is None or operatie_val == "":
                operatie_val = ""
                prestatii = mapped.get("prestatii")
                if isinstance(prestatii, list) and prestatii and isinstance(prestatii[0], dict):
                    first_p = prestatii[0]
                    operatie_val = first_p.get("cod_op_service") or first_p.get("denumire") or ""

            writer.writerow({
                "row_index": r["row_index"],
                "resolved_status": r["resolved_status"],
                "vin": _cell(mapped.get("vin")),
                "nr_inmatriculare": _cell(mapped.get("nr_inmatriculare")),
                "data_prestatie": _cell(mapped.get("data_prestatie")),
                "odometru_final": _cell(mapped.get("odometru_final")),
                "operatie": operatie_val,
                # Stored error text, or the status itself when no error was recorded.
                "error": r["error"] or r["resolved_status"],
            })

        data = buf.getvalue()
    finally:
        conn.close()

    fname = f"import_{import_id}_failed.csv"
    return StreamingResponse(
        iter([data]),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="{fname}"'},
    )
|