Files
rar-autopass/app/web/routes.py
Claude Agent 4a1d28749a feat(web): dashboard ergonomic cu tab-uri, stepper import si microcopy uman (3.4)
Reorganizeaza interfata web pe trei principii, fara a atinge backend-ul de
trimitere (worker, mapping, idempotency, masina de stari neatinse):

- US-001 app/web/labels.py: modul pur stari tehnice -> text uman + clasa CSS
- US-002 bara status /_fragments/status: microcopy uman, defalcare blocate, scoped cont
- US-003 shell 6 tab-uri (Acasa/Import/Coada/Mapari/Cont/Nomenclator): deep-link
  ?tab=, panou activ randat server-side, fragmente inactive lazy, ARIA real
- US-004 stepper import 4 pasi (pur vizual; hx-target + csrf pastrate)
- US-005 Acasa onboarding checklist auto-bifat + colaps + empty states prietenoase

Reparat in cursul VERIFY/CLOSE: izolare teste (reset ratelimit._hits in fixturi),
regresie avertisment "cont in asteptare de activare" (re-introdus in bara status),
culori hardcodate -> variabile paleta. 434 teste pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 22:26:10 +00:00

1310 lines
49 KiB
Python

"""Dashboard Jinja2 + HTMX (server-rendered, zero build).
Schelet cu stari explicite: empty (coada goala), banner alerta blocate,
worker viu/mort, ultimul login RAR. Editor mapari + browser nomenclator +
export CSV + stare "RAR indisponibil" = de adaugat (plan.md sect. 4 + design-review).
U5 — Rute web pentru import fisier (upload → mapare coloane → preview → confirma → trimite).
Consuma endpointurile backend din import_router (helper-e interne) fara a le modifica.
Toate rutele /_import/* returneaza fragmente HTML targetate pe #import-section prin HTMX.
"""
from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fastapi import APIRouter, File, Form, Request, UploadFile
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from .. import __version__
from ..auth import rotate_api_key
from ..web.csrf import get_csrf_token, verify_csrf
from .labels import (
ETICHETA_ULTIMA_AUTENTIFICARE_RAR,
eticheta_rar,
eticheta_stare,
eticheta_worker,
)
from ..web.session import require_login
from ..api.v1.import_router import (
_already_sent_lookup,
_build_idempotency_key,
_CANONICAL_SYNONYMS,
_fuzzy_suggest_column,
_resolve_row_for_preview,
_signature,
)
from ..config import get_settings
from ..crypto import decrypt_creds, encrypt_creds
from ..db import get_connection, read_heartbeat
from ..idempotency import build_key, canonicalize_row
from ..import_parse import FileTooLarge, HeaderError, MultipleSheets, parse_date_value, parse_file
from ..users import is_account_admin
from ..mapping import (
DEFAULT_ACCOUNT_ID,
account_or_default,
load_mapping_meta,
load_nomenclator,
pending_unmapped,
reresolve_account,
resolve_prestatii,
save_mapping,
suggest_codes,
)
# Campuri canonice cu eticheta umana pentru dropdown mapare coloane (U5)
_CANONICAL_FIELDS = [(k, v[0]) for k, v in _CANONICAL_SYNONYMS.items()]
router = APIRouter(tags=["web"])
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parent / "templates"))
_BLOCKED = ("error", "needs_data", "needs_mapping")
def _ctx(request: Request, **extra) -> dict:
"""Context de baza pentru template-uri cu formulare: include mereu csrf_token.
Previne lock-out in prod (web_auth_required=True): orice re-randare de eroare
trebuie sa includa csrf_token negol altfel urmatorul submit da 403 (task #8).
"""
return {"request": request, "csrf_token": get_csrf_token(request), **extra}
def _status_counts(conn, account_id: int) -> dict[str, int]:
rows = conn.execute(
"SELECT status, COUNT(*) AS n FROM submissions "
"WHERE (account_id = ? OR (? = 1 AND account_id IS NULL)) "
"GROUP BY status",
(account_id, account_id),
).fetchall()
return {r["status"]: int(r["n"]) for r in rows}
def _account_active(conn, account_id: int) -> bool:
"""True daca contul e activ (sau legacy cu NULL/absent active)."""
row = conn.execute("SELECT active FROM accounts WHERE id=?", (account_id,)).fetchone()
return bool(row["active"]) if row else True
def _worker_alive(hb) -> bool:
if hb is None or not hb["last_beat"]:
return False
try:
last = datetime.fromisoformat(hb["last_beat"])
except ValueError:
return False
age = (datetime.now(timezone.utc) - last).total_seconds()
return age <= get_settings().worker_heartbeat_stale_s
def _rar_state(hb, worker_alive: bool) -> str:
"""Eticheta de disponibilitate RAR, derivata din ultimul login reusit.
Nu interogam RAR live aici (dashboard-ul degradeaza la ultima stare cunoscuta
a cozii). JWT TTL = 30h: un login mai vechi de atat inseamna ca nu mai stim
sigur ca RAR raspunde -> "indisponibil?". Fara niciun login -> necunoscut.
"""
if not worker_alive:
return "necunoscut (worker oprit)"
last = hb["last_rar_login_ok"] if hb else None
if not last:
return "fara login reusit inca"
try:
age = (datetime.now(timezone.utc) - datetime.fromisoformat(last)).total_seconds()
except (ValueError, TypeError):
return "necunoscut"
return "indisponibil?" if age > 108000 else "ok"
_TABS_VALIDE = {"acasa", "import", "coada", "mapari", "cont", "nomenclator"}
def _get_acasa_context(request: Request, conn, account_id: int) -> dict:
"""Calculeaza contextul pentru panoul Acasa (US-005).
Scoped pe contul sesiunii — identic cu pattern-ul din restul rutelor (NULL->1).
"""
from ..mapping import account_or_default
acct = account_or_default(account_id)
# Pas 1: are credentiale RAR configurate?
row = conn.execute(
"SELECT rar_creds_enc FROM accounts WHERE id=?", (acct,)
).fetchone()
are_creds = bool(row and row["rar_creds_enc"])
# Pas 3: are cel putin un submission (trimis sau in coada)?
row_sub = conn.execute(
"SELECT 1 FROM submissions "
"WHERE (account_id = ? OR (? = 1 AND account_id IS NULL)) LIMIT 1",
(acct, acct),
).fetchone()
are_trimiteri = row_sub is not None
# Pas 2 (optional): are cheie API activa?
row_key = conn.execute(
"SELECT 1 FROM api_keys WHERE account_id=? AND active=1 LIMIT 1",
(acct,),
).fetchone()
are_cheie_folosita = row_key is not None
return {
"request": request,
"are_creds": are_creds,
"are_trimiteri": are_trimiteri,
"are_cheie_folosita": are_cheie_folosita,
}
def _render_panel_acasa(request: Request, conn=None, account_id: int = 1) -> str:
"""Randeaza panoul Acasa ca string HTML."""
if conn is None:
return templates.get_template("_acasa.html").render({"request": request})
ctx = _get_acasa_context(request, conn, account_id)
return templates.get_template("_acasa.html").render(ctx)
def _render_panel_import(request: Request) -> str:
"""Randeaza panoul Import ca string HTML (include _upload.html)."""
return templates.get_template("_upload.html").render({
"request": request,
"csrf_token": get_csrf_token(request),
})
def _render_panel_coada(request: Request) -> str:
"""Randeaza panoul Coada ca string HTML."""
return templates.get_template("_coada.html").render({"request": request})
def _render_panel_mapari(request: Request, conn, account_id: int) -> str:
"""Randeaza panoul Mapari ca string HTML."""
return templates.get_template("_mapari.html").render({
"request": request,
"pending": pending_unmapped(conn, account_id),
"nomenclator": load_nomenclator(conn),
"message": None,
"csrf_token": get_csrf_token(request),
})
def _render_panel_cont(request: Request, conn, account_id: int) -> str:
"""Randeaza panoul Cont ca string HTML."""
from ..mapping import account_or_default
acct = account_or_default(account_id)
row = conn.execute("SELECT rar_creds_enc FROM accounts WHERE id=?", (acct,)).fetchone()
are_creds = bool(row and row["rar_creds_enc"])
return templates.get_template("_cont.html").render({
"request": request,
"csrf_token": get_csrf_token(request),
"api_key": None,
"are_creds": are_creds,
"creds_mesaj": None,
"creds_eroare": None,
"rot_eroare": None,
})
def _render_panel_nomenclator(request: Request, conn) -> str:
"""Randeaza panoul Nomenclator ca string HTML."""
rows = conn.execute(
"SELECT cod_prestatie, nume_prestatie, updated_at FROM nomenclator_rar ORDER BY cod_prestatie"
).fetchall()
return templates.get_template("_nomenclator.html").render({
"request": request,
"rows": rows,
})
def _render_panel_for_tab(request: Request, conn, account_id: int, tab: str) -> str:
"""Randeaza panoul corespunzator unui tab ca string HTML."""
if tab == "acasa":
return _render_panel_acasa(request, conn, account_id)
if tab == "import":
return _render_panel_import(request)
if tab == "coada":
return _render_panel_coada(request)
if tab == "mapari":
return _render_panel_mapari(request, conn, account_id)
if tab == "cont":
return _render_panel_cont(request, conn, account_id)
if tab == "nomenclator":
return _render_panel_nomenclator(request, conn)
return _render_panel_acasa(request)
@router.get("/", response_class=HTMLResponse)
def dashboard(request: Request, tab: str = "acasa") -> HTMLResponse:
"""Dashboard principal cu tab-uri (US-003).
Parametrul ?tab= permite deep-link pe orice sectiune; panoul activ e randat
server-side la full load (fara palpaiere la refresh, degradare gratiosa fara JS).
Tab invalid -> fallback la 'acasa'.
"""
account_id = require_login(request)
active_tab = tab if tab in _TABS_VALIDE else "acasa"
conn = get_connection()
try:
panel_html = _render_panel_for_tab(request, conn, account_id, active_tab)
ctx = {
"request": request,
"rar_env": get_settings().rar_env,
"version": __version__,
"active_tab": active_tab,
"panel_html": panel_html,
"is_admin": is_account_admin(conn, account_id),
"csrf_token": get_csrf_token(request),
}
return templates.TemplateResponse("dashboard.html", ctx)
finally:
conn.close()
@router.get("/_fragments/acasa", response_class=HTMLResponse)
def fragment_acasa(request: Request) -> HTMLResponse:
"""Fragment HTMX pentru tab-ul Acasa (US-003, US-005)."""
account_id = require_login(request)
conn = get_connection()
try:
ctx = _get_acasa_context(request, conn, account_id)
return templates.TemplateResponse("_acasa.html", ctx)
finally:
conn.close()
@router.get("/_fragments/import", response_class=HTMLResponse)
def fragment_import(request: Request) -> HTMLResponse:
"""Fragment HTMX pentru tab-ul Import — include zona de upload (US-003)."""
require_login(request)
return templates.TemplateResponse("_upload.html", _ctx(request))
@router.get("/_fragments/coada", response_class=HTMLResponse)
def fragment_coada(request: Request) -> HTMLResponse:
"""Fragment HTMX pentru tab-ul Coada — include coada submissions (US-003)."""
require_login(request)
return templates.TemplateResponse("_coada.html", {"request": request})
@router.get("/_fragments/nomenclator", response_class=HTMLResponse)
def fragment_nomenclator(request: Request) -> HTMLResponse:
"""Browser nomenclator RAR (cache local upsert-at de worker la fiecare login)."""
conn = get_connection()
try:
rows = conn.execute(
"SELECT cod_prestatie, nume_prestatie, updated_at FROM nomenclator_rar ORDER BY cod_prestatie"
).fetchall()
return templates.TemplateResponse(
"_nomenclator.html", {"request": request, "rows": rows}
)
finally:
conn.close()
@router.get("/_fragments/banner", response_class=HTMLResponse)
def fragment_banner(request: Request) -> HTMLResponse:
account_id = require_login(request)
conn = get_connection()
try:
counts = _status_counts(conn, account_id)
blocked = sum(counts.get(s, 0) for s in _BLOCKED)
return templates.TemplateResponse("_banner.html", {
"request": request,
"blocked": blocked,
"account_active": _account_active(conn, account_id),
})
finally:
conn.close()
def _blocate_defalcat(counts: dict[str, int]) -> list[tuple]:
"""Construieste lista [(eticheta, n), ...] pentru starile blocate cu n > 0.
Ordinea: needs_mapping, needs_data, error — aceeasi ca in PRD.
Returneaza lista goala daca nu exista nicio stare blocata.
"""
rezultat = []
for status in ("needs_mapping", "needs_data", "error"):
n = counts.get(status, 0)
if n > 0:
rezultat.append((eticheta_stare(status), n))
return rezultat
@router.get("/_fragments/status", response_class=HTMLResponse)
def fragment_status(request: Request) -> HTMLResponse:
"""Bara de status persistenta cu etichete umane (US-002, PRD 3.4).
Scoped pe contul sesiunii. Expune starea worker, legatura RAR, ultima
autentificare, contorii de coada si defalcarea blocatelor pe motiv.
Logica in routes.py (nu in template) pentru testabilitate.
"""
account_id = require_login(request)
conn = get_connection()
try:
counts = _status_counts(conn, account_id)
hb = read_heartbeat(conn)
worker_alive = _worker_alive(hb)
rar_state = _rar_state(hb, worker_alive)
# Etichete umane pre-calculate (nu logica in template)
worker_lbl = eticheta_worker(worker_alive)
# eticheta_rar accepta "ok" sau orice alt string -> indisponibil/necunoscut
rar_lbl = eticheta_rar("ok" if rar_state == "ok" else rar_state)
return templates.TemplateResponse("_status.html", {
"request": request,
"worker_lbl": worker_lbl,
"rar_lbl": rar_lbl,
"eticheta_ultima_auth": ETICHETA_ULTIMA_AUTENTIFICARE_RAR,
"last_login": hb["last_rar_login_ok"] if hb else None,
"counts_queued": counts.get("queued", 0),
"counts_sent": counts.get("sent", 0),
"blocate_defalcat": _blocate_defalcat(counts),
"account_active": _account_active(conn, account_id),
})
finally:
conn.close()
@router.get("/_fragments/submissions", response_class=HTMLResponse)
def fragment_submissions(request: Request) -> HTMLResponse:
account_id = require_login(request)
conn = get_connection()
try:
rows = conn.execute(
"SELECT id, status, id_prezentare, rar_status_code, rar_error, retry_count, updated_at "
"FROM submissions "
"WHERE (account_id = ? OR (? = 1 AND account_id IS NULL)) "
"ORDER BY id DESC LIMIT 100",
(account_id, account_id),
).fetchall()
return templates.TemplateResponse("_submissions.html", {"request": request, "rows": rows})
finally:
conn.close()
def _render_mapari(
request: Request, conn, account_id: int, *, message: str | None = None
) -> HTMLResponse:
return templates.TemplateResponse(
"_mapari.html",
{
"request": request,
"pending": pending_unmapped(conn, account_id),
"nomenclator": load_nomenclator(conn),
"message": message,
"csrf_token": get_csrf_token(request),
},
)
@router.get("/_fragments/mapari", response_class=HTMLResponse)
def fragment_mapari(request: Request) -> HTMLResponse:
"""Editor mapari: operatii ROAAUTO nemapate + sugestii fuzzy pe nomenclator RAR.
Scoped pe contul sesiunii (C6/task#7): pending_unmapped primeste account_id explicit.
"""
account_id = require_login(request)
conn = get_connection()
try:
return _render_mapari(request, conn, account_id)
finally:
conn.close()
@router.post("/mapari", response_class=HTMLResponse)
def post_mapare(
request: Request,
cod_op_service: str = Form(...),
cod_prestatie: str = Form(...),
csrf_token: str | None = Form(None),
auto_send: bool = Form(False),
) -> HTMLResponse:
"""Salveaza maparea aleasa de user, re-rezolva submission-urile blocate, re-randeaza editorul."""
account_id = require_login(request)
verify_csrf(request, csrf_token)
conn = get_connection()
try:
cod = cod_prestatie.strip().upper()
exists = conn.execute("SELECT 1 FROM nomenclator_rar WHERE cod_prestatie=?", (cod,)).fetchone()
if not exists:
return _render_mapari(request, conn, account_id, message=f"Cod necunoscut: {cod}")
save_mapping(conn, account_id, cod_op_service, cod, auto_send)
stats = reresolve_account(conn, account_id)
msg = (
f"Mapat {cod_op_service.strip()} -> {cod}. "
f"Deblocate: {stats['requeued']} in coada, {stats['needs_data']} cu date lipsa, "
f"{stats['still_blocked']} inca nemapate."
)
return _render_mapari(request, conn, account_id, message=msg)
finally:
conn.close()
# =========================================================================== #
# Import UI (U5) — upload → mapare coloane → preview → confirmare #
# Consuma helper-e din import_router fara a edita fisierul backend. #
# Toate rutele /_import/* returneaza fragmente HTML (target #import-section). #
# =========================================================================== #
def _collect_unmapped_ops(preview_rows: list[dict], nomenclator: list[dict]) -> list[dict]:
"""Operatii distincte nemapate dintr-un preview de import (staging), cu sugestii fuzzy.
Echivalentul lui pending_unmapped() dar pe randuri de PREVIEW (import in staging,
inca neexistente ca submissions). Aduna doar prestatiile fara cod_prestatie
(cele cu auto_send=0 au deja cod -> nu apar aici). Sortare: cele mai blocate intai.
"""
agg: dict[str, dict[str, Any]] = {}
for row in preview_rows:
if row.get("resolved_status") != "needs_mapping":
continue
for item in (row.get("resolved", {}).get("prestatii") or []):
if not isinstance(item, dict) or item.get("cod_prestatie"):
continue
op = (item.get("cod_op_service") or "").strip()
if not op:
continue
entry = agg.setdefault(op, {"cod_op_service": op, "denumire": item.get("denumire"), "blocked": 0})
if not entry["denumire"] and item.get("denumire"):
entry["denumire"] = item.get("denumire")
entry["blocked"] += 1
out: list[dict] = []
for entry in agg.values():
entry["suggestions"] = suggest_codes(entry["denumire"], nomenclator, limit=5)
out.append(entry)
out.sort(key=lambda e: (-e["blocked"], e["cod_op_service"]))
return out
def _web_compute_preview(
conn,
import_id: int,
account_id: int,
) -> dict[str, Any] | str:
"""Calculeaza preview pentru un batch; intoarce date sau str cu mesaj de eroare.
Reutilizeaza _resolve_row_for_preview, _already_sent_lookup, _signature
din import_router. Nu repeta logica de rezolvare — only orchestrare.
"""
acct = account_or_default(account_id)
batch = conn.execute(
"SELECT id, account_id, filename FROM import_batches WHERE id=? AND account_id=?",
(import_id, acct),
).fetchone()
if not batch:
return "Batch de import inexistent sau inaccesibil."
raw_rows_db = conn.execute(
"SELECT row_index, raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index",
(import_id,),
).fetchall()
if not raw_rows_db:
return "Niciun rand in batch."
# Decripteaza randurile
rows: list[dict[str, Any]] = []
for r in raw_rows_db:
try:
row_data = decrypt_creds(r["raw_json"]) or {}
except Exception:
row_data = {}
rows.append(row_data)
col_names = list(rows[0].keys()) if rows else []
sig = _signature(col_names)
mapping_row = conn.execute(
"SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?",
(acct, sig),
).fetchone()
if not mapping_row:
return "Maparea coloanelor nu a fost configurata. Configureaza mai intai maparea."
json_mapare: dict[str, str] = json.loads(mapping_row["json_mapare"])
format_data: str | None = mapping_row["format_data"]
# Mapare operatii (o singura incarcare — Eng#5)
mapping_meta = load_mapping_meta(conn, acct)
mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()}
# Detectie coercion flags din valorile stocate (VIN numeric)
coercion_flags_map: dict[int, list[str]] = {}
for i, row_dict in enumerate(rows):
flags: list[str] = []
for col_f, camp_c in json_mapare.items():
if camp_c == "vin":
vin_val = row_dict.get(col_f)
if vin_val is not None and str(vin_val).replace(".", "").isdigit():
flags.append(f"VIN numeric ({vin_val}) — verificati seria sasiului")
if flags:
coercion_flags_map[i] = flags
# Reconstructie date_col_format din format_data stocat in mapare
date_col_format: dict[str, str] = {}
if format_data:
for col_f, camp_c in json_mapare.items():
if camp_c == "data_prestatie":
date_col_format[col_f] = format_data
# Detectie coloane cu formule (rata None ridicata)
n_rows = len(rows)
formula_columns: list[str] = []
if n_rows > 0:
none_counts = {col_f: sum(1 for r in rows if r.get(col_f) is None) for col_f in col_names}
formula_columns = [col_f for col_f, cnt in none_counts.items() if cnt / n_rows >= 0.6]
# Rezolvare per rand
preview_rows: list[dict[str, Any]] = []
keys_for_lookup: list[str] = []
key_to_indices: dict[str, list[int]] = {}
for i, row_dict in enumerate(rows):
flags_i = coercion_flags_map.get(i, [])
info = _resolve_row_for_preview(
raw_row=row_dict,
json_mapare=json_mapare,
date_col_format=date_col_format,
coercion_flags=flags_i,
mapping=mapping,
mapping_meta=mapping_meta,
formula_columns=formula_columns,
)
key: str | None = None
if info["resolved_status"] in ("ok", "needs_review", "needs_data"):
try:
key = _build_idempotency_key(account_id, info["resolved"])
keys_for_lookup.append(key)
key_to_indices.setdefault(key, []).append(i)
except Exception:
pass
preview_rows.append({
"row_index": i,
"resolved_status": info["resolved_status"],
"resolved": info["resolved"],
"errors": info["errors"],
"flags": info["flags"],
"idempotency_key": key,
})
# Already_sent: batch lookup (Eng#5 — fara N+1)
unique_keys = list(set(keys_for_lookup))
already_sent_map = _already_sent_lookup(conn, account_id, unique_keys)
# Aplica already_sent si duplicate_in_file
for row in preview_rows:
k = row.get("idempotency_key")
if not k:
continue
if k in already_sent_map and row["resolved_status"] in ("ok", "needs_review", "needs_data"):
row["resolved_status"] = "already_sent"
row["already_sent_info"] = already_sent_map[k]
continue
indices_same_key = key_to_indices.get(k, [])
if len(indices_same_key) > 1 and row["resolved_status"] in ("ok", "needs_review", "needs_data"):
row["resolved_status"] = "duplicate_in_file"
row["duplicate_with"] = [idx for idx in indices_same_key if idx != row["row_index"]]
# Rezumat stari
summary: dict[str, int] = {}
for row in preview_rows:
s = row["resolved_status"]
summary[s] = summary.get(s, 0) + 1
# Actualizeaza contoare in import_batches
conn.execute(
"UPDATE import_batches SET ok=?, needs_mapping=?, needs_data=?, needs_review=?, "
"already_sent=?, duplicate_in_file=? WHERE id=?",
(
summary.get("ok", 0),
summary.get("needs_mapping", 0),
summary.get("needs_data", 0),
summary.get("needs_review", 0),
summary.get("already_sent", 0),
summary.get("duplicate_in_file", 0),
import_id,
),
)
# Actualizeaza resolved_status in import_rows
conn.execute("BEGIN IMMEDIATE")
try:
conn.executemany(
"UPDATE import_rows SET resolved_status=? WHERE batch_id=? AND row_index=?",
[(row["resolved_status"], import_id, row["row_index"]) for row in preview_rows],
)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
nomenclator = load_nomenclator(conn)
return {
"rows": preview_rows,
"summary": summary,
"total": len(preview_rows),
"filename": batch["filename"],
"unmapped_ops": _collect_unmapped_ops(preview_rows, nomenclator),
"nomenclator": nomenclator,
}
@router.post("/_import/upload", response_class=HTMLResponse)
async def web_upload_import(
request: Request,
file: UploadFile = File(...),
sheet_name: str | None = Form(None),
csrf_token: str | None = Form(None),
) -> HTMLResponse:
"""Upload fisier xlsx/csv → staging; intoarce fragment HTML.
Daca maparea de coloane exista deja (signature match): computa preview imediat.
Daca nu: intoarce formularul de mapare coloane.
Nu editeaza import_router.py — apeleaza parse_file si DB direct.
"""
account_id = require_login(request)
verify_csrf(request, csrf_token)
acct = account_or_default(account_id)
data = await file.read()
filename = file.filename or "fisier"
# Parsare fisier
try:
parsed = parse_file(data, filename, sheet_name=sheet_name)
except MultipleSheets as ms:
return templates.TemplateResponse("_upload.html", _ctx(request, sheets=ms.sheet_names))
except FileTooLarge as e:
return templates.TemplateResponse("_upload.html", _ctx(request, error=str(e)))
except HeaderError as e:
return templates.TemplateResponse("_upload.html", _ctx(request, error=f"Antet neclar: {e}"))
except UnicodeDecodeError as e:
return templates.TemplateResponse("_upload.html", _ctx(request, error=f"Encoding nesuportat: {e.reason}"))
except Exception as e:
return templates.TemplateResponse("_upload.html", _ctx(request, error=f"Fisier nerecunoscut (xlsx/csv): {type(e).__name__}"))
conn = get_connection()
try:
sig = _signature(parsed.columns)
# Stagingul in DB (tranzactie explicita — Issue 6)
conn.execute("BEGIN IMMEDIATE")
try:
cur = conn.execute(
"INSERT INTO import_batches (account_id, filename, status, total, purge_after) "
"VALUES (?, ?, 'staging', ?, datetime('now', '+90 days'))",
(acct, filename, len(parsed.rows)),
)
batch_id = cur.lastrowid
conn.executemany(
"INSERT INTO import_rows (batch_id, row_index, raw_json, resolved_status, error) "
"VALUES (?, ?, ?, 'pending', NULL)",
[
(batch_id, i, encrypt_creds(row_dict))
for i, row_dict in enumerate(parsed.rows)
],
)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
# Verifica mapare existenta
existing = conn.execute(
"SELECT json_mapare, format_data FROM column_mappings "
"WHERE account_id=? AND signature_coloane=?",
(acct, sig),
).fetchone()
batch_id_int: int = cur.lastrowid or 0 # lastrowid este int dupa INSERT reusit
if existing:
# Mapare retinuta → computa preview imediat
result = _web_compute_preview(conn, batch_id_int, account_id)
if isinstance(result, str):
return templates.TemplateResponse("_upload.html", {
"request": request,
"error": result,
"csrf_token": get_csrf_token(request),
})
return templates.TemplateResponse("_preview_import.html", {
"request": request,
"import_id": batch_id_int,
"message": "Mapare retinuta aplicata automat.",
"csrf_token": get_csrf_token(request),
**result,
})
# Mapare noua — sugestii fuzzy si formular de mapare
fuzzy_suggestions: dict[str, list[dict]] = {}
for col in parsed.columns:
sugg = _fuzzy_suggest_column(col, limit=3)
if sugg:
fuzzy_suggestions[col] = sugg
return templates.TemplateResponse("_mapcoloane.html", {
"request": request,
"import_id": batch_id_int,
"filename": filename,
"columns": parsed.columns,
"sample_rows": parsed.rows[:3],
"fuzzy_suggestions": fuzzy_suggestions,
"canonical_fields": _CANONICAL_FIELDS,
"format_data": None,
"csrf_token": get_csrf_token(request),
})
finally:
conn.close()
@router.post("/_import/{import_id}/mapare-coloane", response_class=HTMLResponse)
async def web_save_mapare_coloane(
request: Request,
import_id: int,
) -> HTMLResponse:
"""Salveaza maparea de coloane si computa preview. Intoarce fragment HTML."""
account_id = require_login(request)
acct = account_or_default(account_id)
form = await request.form()
# Colectare perechi coloana fisier → camp canonic din form
# form.getlist intoarce List[str | UploadFile]; filtram la str (campuri text)
verify_csrf(request, str(form.get("csrf_token") or ""))
colnames = [str(v) for v in form.getlist("colname") if isinstance(v, str)]
canons = [str(v) for v in form.getlist("canon") if isinstance(v, str)]
format_data_val = str(form.get("format_data") or "").strip() or None
# Construieste json_mapare (ignora campurile marcate ca "ignorate")
json_mapare: dict[str, str] = {}
for colname, canon in zip(colnames, canons):
if canon:
json_mapare[colname] = canon
if not json_mapare:
# Nici un camp mapat → re-arata formularul cu eroare
conn = get_connection()
try:
first_row = conn.execute(
"SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1",
(import_id,),
).fetchone()
columns = []
if first_row:
try:
rd = decrypt_creds(first_row["raw_json"]) or {}
columns = list(rd.keys())
except Exception:
pass
fuzzy: dict[str, list[dict]] = {}
for col in columns:
sugg = _fuzzy_suggest_column(col, limit=3)
if sugg:
fuzzy[col] = sugg
return templates.TemplateResponse("_mapcoloane.html", _ctx(
request,
import_id=import_id,
columns=columns,
sample_rows=[],
fuzzy_suggestions=fuzzy,
canonical_fields=_CANONICAL_FIELDS,
format_data=format_data_val,
message="Mapeaza cel putin un camp canonic inainte de a continua.",
error=True,
))
finally:
conn.close()
conn = get_connection()
try:
# Verifica ca batch-ul apartine contului
batch = conn.execute(
"SELECT id FROM import_batches WHERE id=? AND account_id=?",
(import_id, acct),
).fetchone()
if not batch:
return templates.TemplateResponse("_upload.html", _ctx(
request, error="Batch de import inexistent sau expirat."
))
# Semnatura = antetul COMPLET al fisierului (toate coloanele, inclusiv cele
# ignorate), nu doar campurile mapate. Altfel ignorarea unei coloane schimba
# semnatura si maparea nu mai e gasita la preview/re-upload (col_names = antet
# complet peste tot). `colnames` vine din form = toate coloanele randate.
sig = _signature(colnames or list(json_mapare.keys()))
# Salveaza maparea (upsert)
conn.execute(
"INSERT INTO column_mappings (account_id, signature_coloane, json_mapare, format_data) "
"VALUES (?, ?, ?, ?) "
"ON CONFLICT(account_id, signature_coloane) DO UPDATE SET "
"json_mapare=excluded.json_mapare, format_data=excluded.format_data",
(acct, sig, json.dumps(json_mapare, ensure_ascii=False), format_data_val),
)
# Computa preview
result = _web_compute_preview(conn, import_id, account_id)
if isinstance(result, str):
return templates.TemplateResponse("_upload.html", _ctx(request, error=result))
return templates.TemplateResponse("_preview_import.html", _ctx(
request, import_id=import_id, **result
))
finally:
conn.close()
@router.get("/_import/{import_id}/preview", response_class=HTMLResponse)
def web_preview_import(
request: Request,
import_id: int,
) -> HTMLResponse:
"""Preview 6 stari per rand. Tinta HTMX dupa mapare retinuta sau navigare directa."""
account_id = require_login(request)
conn = get_connection()
try:
result = _web_compute_preview(conn, import_id, account_id)
if isinstance(result, str):
return templates.TemplateResponse("_upload.html", {
"request": request,
"error": result,
"csrf_token": get_csrf_token(request),
})
return templates.TemplateResponse("_preview_import.html", {
"request": request,
"import_id": import_id,
"csrf_token": get_csrf_token(request),
**result,
})
finally:
conn.close()
@router.post("/_import/{import_id}/mapare-operatie", response_class=HTMLResponse)
async def web_mapare_operatie(
request: Request,
import_id: int,
) -> HTMLResponse:
"""Mapeaza o operatie nemapata din preview-ul de import la un cod RAR, in flux.
Salveaza maparea (persistenta, operations_mapping) si re-randeaza preview-ul:
_web_compute_preview recalculeaza cu noua mapare si re-scrie resolved_status in
import_rows, deci randurile afectate trec din needs_mapping in ok fara re-upload.
"""
account_id = require_login(request)
conn = get_connection()
try:
form = await request.form()
verify_csrf(request, str(form.get("csrf_token") or ""))
cod_op_service = str(form.get("cod_op_service") or "").strip()
cod_prestatie = str(form.get("cod_prestatie") or "").strip().upper()
auto_send = bool(form.get("auto_send"))
def _render(message: str | None = None, error: bool = False) -> HTMLResponse:
result = _web_compute_preview(conn, import_id, account_id)
if isinstance(result, str):
return templates.TemplateResponse("_upload.html", _ctx(request, error=result))
return templates.TemplateResponse("_preview_import.html", _ctx(
request, import_id=import_id, message=message, error=error, **result
))
if not cod_op_service or not cod_prestatie:
return _render("Alege un cod RAR pentru operatie.", error=True)
exists = conn.execute(
"SELECT 1 FROM nomenclator_rar WHERE cod_prestatie=?", (cod_prestatie,)
).fetchone()
if not exists:
return _render(f"Cod RAR necunoscut: {cod_prestatie}", error=True)
save_mapping(conn, account_id, cod_op_service, cod_prestatie, auto_send)
return _render(f"Mapat {cod_op_service} -> {cod_prestatie}.")
finally:
conn.close()
@router.get("/_import/reset", response_class=HTMLResponse)
def web_import_reset(request: Request) -> HTMLResponse:
"""Reseteaza sectiunea de import la starea initiala (drop zone gol)."""
return templates.TemplateResponse("_upload.html", {
"request": request,
"csrf_token": get_csrf_token(request),
})
@router.post("/_import/{import_id}/confirma", response_class=HTMLResponse)
async def web_confirma_import(
request: Request,
import_id: int,
) -> HTMLResponse:
"""Gate HARD confirmare + enqueue randuri ok + log atestare. Intoarce fragment HTML.
Replica logica din import_router.commit_import dar cu input din form HTML
si raspuns HTML (nu JSON). INSERT per-rand ON CONFLICT DO NOTHING (TOCTOU).
C8/OV-2: account_id din sesiune propagat consecvent la build_key si toate lookup-urile.
C12: require_login — pe scrieri NICIODATA fallback cont 1 in prod.
"""
account_id = require_login(request)
acct = account_or_default(account_id)
form = await request.form()
verify_csrf(request, str(form.get("csrf_token") or ""))
# Parseaza n_confirmat (form.get intoarce str | UploadFile | None → cast la str)
try:
n_confirmat = int(str(form.get("n_confirmat") or "0"))
except (ValueError, TypeError):
n_confirmat = 0
# Randuri needs_review bifate explicit
reviewed_rows: set[int] = set()
for v in form.getlist("reviewed_rows"):
if isinstance(v, str):
try:
reviewed_rows.add(int(v))
except (ValueError, TypeError):
pass
confirmed_by = str(form.get("confirmed_by") or "").strip() or None
conn = get_connection()
try:
batch = conn.execute(
"SELECT id, filename, status FROM import_batches WHERE id=? AND account_id=?",
(import_id, acct),
).fetchone()
if not batch:
return templates.TemplateResponse("_upload.html", _ctx(
request, error="Batch de import inexistent sau expirat."
))
if batch["status"] == "committed":
return templates.TemplateResponse("_upload.html", _ctx(
request, message="Acest batch a fost deja comis."
))
# Incarca randurile cu stare ok si needs_review
ok_rows_db = conn.execute(
"SELECT row_index, raw_json, resolved_status FROM import_rows "
"WHERE batch_id=? AND resolved_status IN ('ok', 'needs_review') ORDER BY row_index",
(import_id,),
).fetchall()
if not ok_rows_db:
# Re-arata preview cu eroare
result = _web_compute_preview(conn, import_id, account_id)
if isinstance(result, str):
return templates.TemplateResponse("_upload.html", _ctx(request, error=result))
return templates.TemplateResponse("_preview_import.html", _ctx(
request,
import_id=import_id,
message="Niciun rand ok de confirmat in acest batch.",
error=True,
**result,
))
# Decripteaza si construieste lista de randuri de trimis
to_enqueue: list[dict[str, Any]] = []
review_indices: set[int] = set()
for r in ok_rows_db:
try:
row_data = decrypt_creds(r["raw_json"]) or {}
except Exception:
continue
if r["resolved_status"] == "ok":
to_enqueue.append({"row_index": r["row_index"], "data": row_data, "status": "ok"})
elif r["resolved_status"] == "needs_review":
review_indices.add(r["row_index"])
# Adauga randurile needs_review bifate explicit
for r in ok_rows_db:
if r["resolved_status"] == "needs_review" and r["row_index"] in reviewed_rows:
try:
row_data = decrypt_creds(r["raw_json"]) or {}
to_enqueue.append({"row_index": r["row_index"], "data": row_data, "status": "needs_review"})
except Exception:
pass
n_total_ok = len(to_enqueue)
# Gate HARD: n_confirmat trebuie sa fie exact egal cu numarul de randuri de trimis
if n_confirmat != n_total_ok:
result = _web_compute_preview(conn, import_id, account_id)
msg = (
f"Numarul confirmat ({n_confirmat}) difera de randurile gata de trimis ({n_total_ok}). "
f"Verifica preview-ul si retasteaza numarul corect."
)
if isinstance(result, str):
return templates.TemplateResponse("_upload.html", _ctx(request, error=msg))
return templates.TemplateResponse("_preview_import.html", _ctx(
request, import_id=import_id, message=msg, error=True, **result
))
if n_total_ok == 0:
result = _web_compute_preview(conn, import_id, account_id)
if isinstance(result, str):
return templates.TemplateResponse("_upload.html", _ctx(request, error=result))
return templates.TemplateResponse("_preview_import.html", _ctx(
request,
import_id=import_id,
message="Niciun rand ok de confirmat.",
error=True,
**result,
))
# Incarca maparea de coloane pentru payload
first_row_db = conn.execute(
"SELECT raw_json FROM import_rows WHERE batch_id=? ORDER BY row_index LIMIT 1",
(import_id,),
).fetchone()
col_names: list[str] = []
if first_row_db:
try:
fd = decrypt_creds(first_row_db["raw_json"]) or {}
col_names = list(fd.keys())
except Exception:
pass
sig = _signature(col_names) if col_names else ""
mapping_row = conn.execute(
"SELECT json_mapare, format_data FROM column_mappings WHERE account_id=? AND signature_coloane=?",
(acct, sig),
).fetchone()
json_mapare: dict[str, str] = json.loads(mapping_row["json_mapare"]) if mapping_row else {}
fmt = mapping_row["format_data"] if mapping_row else None
# Mapare operatii
mapping_meta = load_mapping_meta(conn, acct)
mapping_ops = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()}
# Enqueue in tranzactie explicita (Issue 6) — INSERT ON CONFLICT DO NOTHING (TOCTOU)
enqueued: list[dict] = []
toctou: list[int] = []
rows_for_hash: list[str] = []
conn.execute("BEGIN IMMEDIATE")
try:
for item in to_enqueue:
row_dict = item["data"]
row_index = item["row_index"]
# Aplica maparea de coloane
mapped: dict[str, Any] = {}
for col_f, camp_c in json_mapare.items():
if col_f in row_dict and camp_c:
mapped[camp_c] = row_dict[col_f]
# Rezolva data
for col_f, camp_c in json_mapare.items():
if camp_c == "data_prestatie":
col_fmt = fmt or "ambiguous"
raw_date = mapped.get("data_prestatie")
if raw_date is not None:
iso_date, _ = parse_date_value(raw_date, col_fmt)
if iso_date:
mapped["data_prestatie"] = iso_date
break
# Operatia → prestatii (denumire_op alimenteaza denumirea reala)
operatie_val = mapped.pop("operatie", None)
denumire_val = mapped.pop("denumire_op", None)
if operatie_val and "prestatii" not in mapped:
denumire = str(denumire_val).strip() if denumire_val not in (None, "") else str(operatie_val)
mapped["prestatii"] = [{"cod_op_service": str(operatie_val), "denumire": denumire}]
# Rezolva prestatii
prestatii = mapped.get("prestatii") or []
resolved_p, _ = resolve_prestatii(prestatii, mapping_ops)
mapped["prestatii"] = resolved_p
# Canonicalizare
canon = canonicalize_row(mapped)
mapped.update({
"vin": canon["vin"],
"nr_inmatriculare": canon["nr_inmatriculare"],
"odometru_final": canon["odometru_final"],
})
key = build_key(account_id, canon)
rows_for_hash.append(json.dumps({
"row_index": row_index,
"vin": mapped.get("vin"),
"data_prestatie": mapped.get("data_prestatie"),
"odometru_final": mapped.get("odometru_final"),
"prestatii": [
str(p.get("cod_prestatie") or p.get("cod_op_service") or "")
for p in resolved_p
],
}, sort_keys=True, ensure_ascii=False))
cur = conn.execute(
"INSERT OR IGNORE INTO submissions "
"(idempotency_key, account_id, status, payload_json, batch_id, row_index, purge_after) "
"VALUES (?, ?, 'queued', ?, ?, ?, datetime('now', '+90 days'))",
(key, acct, json.dumps(mapped, ensure_ascii=False), import_id, row_index),
)
if cur.rowcount == 0:
toctou.append(row_index)
else:
enqueued.append({"submission_id": cur.lastrowid, "row_index": row_index})
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
n_enqueued = len(enqueued)
# Log atestare (Voce#9)
rows_hash = hashlib.sha256(
json.dumps(sorted(rows_for_hash), ensure_ascii=False).encode("utf-8")
).hexdigest() if rows_for_hash else ""
conn.execute(
"INSERT INTO import_attestations (batch_id, account_id, confirmed_by, rows_hash, n_confirmed) "
"VALUES (?, ?, ?, ?, ?)",
(import_id, acct, confirmed_by, rows_hash, n_enqueued),
)
conn.execute(
"UPDATE import_batches SET status='committed', ok=? WHERE id=?",
(n_enqueued, import_id),
)
# Succes → drop zone cu mesaj de confirmare
toctou_msg = f" ({len(toctou)} coliziuni TOCTOU excluse)" if toctou else ""
return templates.TemplateResponse("_upload.html", _ctx(
request,
message=(
f"S-au pus in coada {n_enqueued} prezentari{toctou_msg}. "
f"Procesarea incepe in cateva secunde — urmareste coada de mai jos."
),
))
finally:
conn.close()
# =========================================================================== #
# US-007 — Sectiune "Contul meu": rotire cheie API + creds RAR din UI #
# Rute web proprii scoped pe sesiune (C13: nu reutilizeaza /v1/conturi/rar-creds
# care cere cheie API; sesiunea web e suficienta ca identitate). #
# =========================================================================== #
def _render_cont(
request: Request,
*,
api_key: str | None = None,
are_creds: bool = False,
creds_mesaj: str | None = None,
creds_eroare: str | None = None,
rot_eroare: str | None = None,
) -> HTMLResponse:
"""Randeaza cardul 'Contul meu'. Parola niciodata in value=."""
return templates.TemplateResponse(
"_cont.html",
_ctx(
request,
api_key=api_key,
are_creds=are_creds,
creds_mesaj=creds_mesaj,
creds_eroare=creds_eroare,
rot_eroare=rot_eroare,
),
)
@router.get("/_fragments/cont", response_class=HTMLResponse)
def fragment_cont(request: Request) -> HTMLResponse:
"""Fragment HTMX card 'Contul meu': stare cheie + creds RAR (fara a le expune)."""
account_id = require_login(request)
acct = account_or_default(account_id)
conn = get_connection()
try:
row = conn.execute(
"SELECT rar_creds_enc FROM accounts WHERE id=?", (acct,)
).fetchone()
are_creds = bool(row and row["rar_creds_enc"])
return _render_cont(request, are_creds=are_creds)
finally:
conn.close()
@router.post("/cont/roteste-cheie", response_class=HTMLResponse)
def cont_roteste_cheie(
request: Request,
csrf_token: str | None = Form(None),
) -> HTMLResponse:
"""Revoca toate cheile active si emite una noua. Afisata O SINGURA DATA."""
account_id = require_login(request)
verify_csrf(request, csrf_token)
acct = account_or_default(account_id)
conn = get_connection()
try:
new_key = rotate_api_key(conn, acct)
row = conn.execute(
"SELECT rar_creds_enc FROM accounts WHERE id=?", (acct,)
).fetchone()
are_creds = bool(row and row["rar_creds_enc"])
return _render_cont(request, api_key=new_key, are_creds=are_creds)
finally:
conn.close()
@router.post("/cont/rar-creds", response_class=HTMLResponse)
def cont_rar_creds(
request: Request,
rar_email: str = Form(""),
rar_parola: str = Form(""),
csrf_token: str | None = Form(None),
) -> HTMLResponse:
"""Seteaza creds RAR per cont din sesiune (ruta web proprie, C13).
Camp parola NICIODATA re-pus in value= la re-randare.
Validare minima: email si parola negoale.
"""
account_id = require_login(request)
verify_csrf(request, csrf_token)
acct = account_or_default(account_id)
email = rar_email.strip()
parola = rar_parola.strip()
if not email or not parola:
conn = get_connection()
try:
row = conn.execute(
"SELECT rar_creds_enc FROM accounts WHERE id=?", (acct,)
).fetchone()
are_creds = bool(row and row["rar_creds_enc"])
finally:
conn.close()
return _render_cont(
request,
are_creds=are_creds,
creds_eroare="Email si parola sunt obligatorii.",
)
enc = encrypt_creds({"email": email, "password": parola})
conn = get_connection()
try:
conn.execute(
"UPDATE accounts SET rar_creds_enc=? WHERE id=?",
(enc, acct),
)
return _render_cont(
request,
are_creds=True,
creds_mesaj="Credentialele RAR au fost salvate cu succes.",
)
finally:
conn.close()