feat(T5/dashboard): import DBF idempotent + nomenclator browser + audit CSV + stare RAR

T5 (tools/import_dbf.py): citire prestatii_rar.DBF / mapare_prestatii.DBF cu
dbfread, raport dry-run (randuri valide/duplicate/goale, mapari orfane = cod
necunoscut in nomenclator), --commit cu upsert idempotent in tranzactie.

Dashboard: browser nomenclator, indicator stare RAR (indisponibil? derivat din
ultimul login < 30h, coada arata ultima stare locala), export audit CSV
(/v1/audit/export?status=sent|all&date_from&date_to, b64Image exclus,
coloana purge_after pentru retentia 90z).

Verify: 11 teste noi (test_import_dbf 6, test_dashboard 5), suita 111 pass,
dry-run real pe DBF-urile din repo + smoke live dashboard/CSV.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-06-15 20:32:26 +00:00
parent 6fb92466cb
commit 6ab22ea0fb
8 changed files with 728 additions and 20 deletions

View File

@@ -11,9 +11,12 @@ middleware (CORE) si exportul CSV vin ulterior — marcate TODO unde lipsesc.
from __future__ import annotations
import csv
import io
import json
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from ...auth import resolve_account_id
@@ -152,6 +155,104 @@ def get_nomenclator() -> dict:
conn.close()
AUDIT_COLUMNS = [
"submission_id",
"status",
"id_prezentare",
"account_id",
"vin",
"nr_inmatriculare",
"data_prestatie",
"odometru_final",
"prestatii",
"rar_status_code",
"created_at",
"updated_at",
"purge_after",
]
def _audit_rows(conn, date_from: str | None, date_to: str | None, status: str):
"""Randuri audit (sent implicit) filtrate pe data(updated_at) in [from, to].
payload_json e text in schelet (criptarea PII e P2); citim campurile-cheie
pentru audit. b64_image NU intra in CSV (mare). Daca P2 cripteaza payload-ul,
aici se decripteaza inainte de a construi randul.
"""
sql = "SELECT id, status, id_prezentare, account_id, payload_json, rar_status_code, created_at, updated_at, purge_after FROM submissions"
where = []
params: list = []
if status != "all":
where.append("status=?")
params.append(status)
if date_from:
where.append("date(updated_at) >= date(?)")
params.append(date_from)
if date_to:
where.append("date(updated_at) <= date(?)")
params.append(date_to)
if where:
sql += " WHERE " + " AND ".join(where)
sql += " ORDER BY id"
for r in conn.execute(sql, params).fetchall():
try:
p = json.loads(r["payload_json"]) if r["payload_json"] else {}
except (ValueError, TypeError):
p = {}
codes = ",".join(
(it.get("cod_prestatie") or it.get("cod_op_service") or "")
for it in (p.get("prestatii") or [])
if isinstance(it, dict)
)
yield {
"submission_id": r["id"],
"status": r["status"],
"id_prezentare": r["id_prezentare"] or "",
"account_id": r["account_id"] or "",
"vin": p.get("vin") or "",
"nr_inmatriculare": p.get("nr_inmatriculare") or "",
"data_prestatie": p.get("data_prestatie") or "",
"odometru_final": p.get("odometru_final") or "",
"prestatii": codes,
"rar_status_code": r["rar_status_code"] or "",
"created_at": r["created_at"],
"updated_at": r["updated_at"],
"purge_after": r["purge_after"] or "",
}
@router.get("/audit/export")
def audit_export(
date_from: str | None = None,
date_to: str | None = None,
status: str = "sent",
) -> StreamingResponse:
"""CSV cu ce s-a trimis (audit). Filtre optionale `date_from`/`date_to` (YYYY-MM-DD)
pe data(updated_at). `status` implicit `sent` (ce a ajuns efectiv la RAR);
`status=all` exporta toata coada. Leaga re_tinerea 90 zile prin coloana
`purge_after` (plan.md sect. 4 + 8). b64_image nu se exporta.
"""
conn = get_connection()
try:
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=AUDIT_COLUMNS)
writer.writeheader()
for row in _audit_rows(conn, date_from, date_to, status):
writer.writerow(row)
data = buf.getvalue()
finally:
conn.close()
fname = f"audit_{status}_{date_from or 'inceput'}_{date_to or 'azi'}.csv"
return StreamingResponse(
iter([data]),
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="{fname}"'},
)
@router.get("/mapari")
def get_mapari(account_id: int | None = None) -> dict:
conn = get_connection()

View File

@@ -41,6 +41,25 @@ def _worker_alive(hb) -> bool:
return age <= get_settings().worker_heartbeat_stale_s
def _rar_state(hb, worker_alive: bool) -> str:
"""Eticheta de disponibilitate RAR, derivata din ultimul login reusit.
Nu interogam RAR live aici (dashboard-ul degradeaza la ultima stare cunoscuta
a cozii). JWT TTL = 30h: un login mai vechi de atat inseamna ca nu mai stim
sigur ca RAR raspunde -> "indisponibil?". Fara niciun login -> necunoscut.
"""
if not worker_alive:
return "necunoscut (worker oprit)"
last = hb["last_rar_login_ok"] if hb else None
if not last:
return "fara login reusit inca"
try:
age = (datetime.now(timezone.utc) - datetime.fromisoformat(last)).total_seconds()
except (ValueError, TypeError):
return "necunoscut"
return "indisponibil?" if age > 108000 else "ok"
@router.get("/", response_class=HTMLResponse)
def dashboard(request: Request) -> HTMLResponse:
conn = get_connection()
@@ -48,20 +67,37 @@ def dashboard(request: Request) -> HTMLResponse:
counts = _status_counts(conn)
hb = read_heartbeat(conn)
blocked = sum(counts.get(s, 0) for s in _BLOCKED)
worker_alive = _worker_alive(hb)
ctx = {
"request": request,
"rar_env": get_settings().rar_env,
"version": __version__,
"counts": counts,
"blocked": blocked,
"worker_alive": _worker_alive(hb),
"worker_alive": worker_alive,
"last_login": hb["last_rar_login_ok"] if hb else None,
"rar_state": _rar_state(hb, worker_alive),
}
return templates.TemplateResponse("dashboard.html", ctx)
finally:
conn.close()
@router.get("/_fragments/nomenclator", response_class=HTMLResponse)
def fragment_nomenclator(request: Request) -> HTMLResponse:
"""Browser nomenclator RAR (cache local upsert-at de worker la fiecare login)."""
conn = get_connection()
try:
rows = conn.execute(
"SELECT cod_prestatie, nume_prestatie, updated_at FROM nomenclator_rar ORDER BY cod_prestatie"
).fetchall()
return templates.TemplateResponse(
"_nomenclator.html", {"request": request, "rows": rows}
)
finally:
conn.close()
@router.get("/_fragments/banner", response_class=HTMLResponse)
def fragment_banner(request: Request) -> HTMLResponse:
conn = get_connection()

View File

@@ -0,0 +1,16 @@
{% if rows %}
<table>
<thead><tr><th>Cod</th><th>Denumire</th><th>Actualizat</th></tr></thead>
<tbody>
{% for r in rows %}
<tr>
<td><span class="pill">{{ r.cod_prestatie }}</span></td>
<td>{{ r.nume_prestatie }}</td>
<td class="muted">{{ r.updated_at }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<div class="empty">Nomenclator gol. Worker-ul il umple la primul login RAR reusit.</div>
{% endif %}

View File

@@ -11,11 +11,17 @@
<div style="display:flex; gap:24px; flex-wrap:wrap;">
<div><div class="muted">Worker</div><div class="{{ 's-sent' if worker_alive else 's-error' }}">
{{ 'viu' if worker_alive else 'mort' }}</div></div>
<div><div class="muted">RAR</div><div class="{{ 's-sent' if rar_state == 'ok' else 's-error' if 'indisponibil' in rar_state else 'muted' }}">{{ rar_state }}</div></div>
<div><div class="muted">Ultimul login RAR</div><div>{{ last_login or '—' }}</div></div>
<div><div class="muted">In coada</div><div>{{ counts.get('queued', 0) }}</div></div>
<div><div class="muted">Trimise</div><div class="s-sent">{{ counts.get('sent', 0) }}</div></div>
<div><div class="muted">Blocate</div><div class="{{ 's-error' if blocked else '' }}">{{ blocked }}</div></div>
</div>
{% if rar_state != 'ok' %}
<p class="muted" style="margin:12px 0 0; font-size:12px;">
RAR posibil indisponibil — coada de mai jos arata ultima stare cunoscuta (local), nu live din RAR.
</p>
{% endif %}
</div>
<!-- incarcat o data; NU poll (sa nu stergem o selectie in curs). Se re-randeaza la salvare. -->
@@ -24,10 +30,23 @@
</div>
<div class="card">
<h2 style="font-size:14px; margin:0 0 12px;">Coada submissions</h2>
<div style="display:flex; align-items:center; gap:12px; margin:0 0 12px;">
<h2 style="font-size:14px; margin:0;">Coada submissions</h2>
<a href="/v1/audit/export?status=sent" style="margin-left:auto; font-size:13px;" download>export audit CSV (trimise)</a>
<a href="/v1/audit/export?status=all" style="font-size:13px;" download>tot</a>
</div>
<div hx-get="/_fragments/submissions" hx-trigger="load, every 10s" hx-swap="innerHTML">
<div class="empty">se incarca…</div>
</div>
</div>
<div class="card">
<details>
<summary style="cursor:pointer; font-size:14px; font-weight:600;">Nomenclator RAR (coduri prestatii)</summary>
<div style="margin-top:12px;" hx-get="/_fragments/nomenclator" hx-trigger="load" hx-swap="innerHTML">
<div class="empty">se incarca…</div>
</div>
</details>
</div>
{% endblock %}

View File

@@ -190,8 +190,11 @@ Nimic din cod nu e scris încă (`app/`, `tools/` nu există). Ordine recomandat
Capturate: format eroare `data:[{field,message}]` + 3 mesaje exacte (VIN O/I/Q, dată veche, dată viitoare),
forma răspuns success, `idPrezentare==id`, `idAgent` server-side, `sistemReparat:"null"` acceptat, `b64Image`/`odometruInitial` omise OK.
**Descoperire: WAF cere `User-Agent` (altfel 403).** Toate detaliile în `docs/api-rar-contract.md`.
- [ ] **T5 (P1) — `tools/import_dbf.py`** dry-run + raport pe `mapare_prestatii.DBF` / `prestatii_rar.DBF`, apoi import în SQLite.
Verify: raport rânduri valide/orfane/coduri necunoscute; import idempotent. (Stub creat — neimplementat.)
- [x] **T5 (P1) — `tools/import_dbf.py`** ✅ 2026-06-15. `dbfread` → raport (rânduri valide, duplicate, goale, mapări
orfane = cod necunoscut în nomenclator) pe `prestatii_rar.DBF` (20 coduri) + `mapare_prestatii.DBF` (gol în arhivă).
Default dry-run; `--commit` scrie idempotent (upsert pe `nomenclator_rar` PK + `operations_mapping` UNIQUE), tranzacție
`BEGIN IMMEDIATE`/ROLLBACK. Verify: 6 teste (`tests/test_import_dbf.py`, cu writer dBASE III minimal pentru fixturi) +
dry-run real pe DBF-urile din repo.
- [x] **Schelet repo** — ✅ 2026-06-15. `app/api/v1`, `app/rar_client.py` (cu User-Agent), `app/worker`, `app/web`, SQLite (WAL),
`Dockerfile` + `docker compose`, `/healthz` verde. Verificat: login prin client OK, nomenclator 18 coduri,
worker heartbeat → `worker_alive=True`, enqueue + dedup idempotency funcționale.
@@ -230,7 +233,12 @@ Nimic din cod nu e scris încă (`app/`, `tools/` nu există). Ordine recomandat
documentează env-urile. **Fix critic descoperit la split-ul în 2 containere:** `AUTOPASS_CREDS_KEY` trebuie PARTAJATĂ
api↔worker (altfel worker nu decriptează creds) — acum impusă în compose (`${...:?}` → fail explicit dacă lipsește).
Verify: 2 teste (`tests/test_deploy.py`).
- [ ] **Dashboard** (Jinja2+HTMX) cu stările empty/error/RAR-indisponibil + banner alertă. Apoi `/design-review` pe UI-ul live.
- [x] **Dashboard** (Jinja2+HTMX) ✅ 2026-06-15. Stări explicite: empty (coadă/mapări goale cu CTA), error (needs_mapping
în editor + motiv pe submission), **RAR indisponibil** (indicator stare RAR derivat din ultimul login < 30h coada arată
ultima stare cunoscută local, nu live), banner alertă blocate (poll 15s). Componente: status worker/RAR, editor mapări
fuzzy, **browser nomenclator**, coadă (poll 10s), **export audit CSV** (`GET /v1/audit/export?status=sent|all&date_from&date_to`,
b64Image exclus, coloană `purge_after`). Verify: 5 teste (`tests/test_dashboard.py`) + smoke live. **Rămas: `/design-review`
pe UI-ul live** (cosmetic, neblocant).
### De decis ulterior (urmărit, nu blocant)
- **[P2]** Defer criptare-at-rest + purjare 90z până după primul postPrezentare real reușit? (gold-plating vs. privacy-argument-de-adopție).

109
tests/test_dashboard.py Normal file
View File

@@ -0,0 +1,109 @@
"""Teste dashboard + audit CSV: nomenclator browser, stare RAR, export CSV."""
from __future__ import annotations
import csv
import io
import os
import tempfile
import pytest
from fastapi.testclient import TestClient
@pytest.fixture()
def client(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
from app.main import app
with TestClient(app) as c:
yield c
get_settings.cache_clear()
def _body(**over):
prez = {
"vin": "WVWZZZ1KZAW000123",
"nr_inmatriculare": "B999TST",
"data_prestatie": "2026-06-15",
"odometru_final": "123456",
"prestatii": [{"cod_prestatie": "OE-1"}],
}
prez.update(over)
return {"rar_credentials": {"email": "x@y.ro", "password": "s"}, "prezentari": [prez]}
# --------------------------------------------------------------------------- #
# Dashboard render + fragmente #
# --------------------------------------------------------------------------- #
def test_dashboard_renders_with_rar_state(client):
r = client.get("/")
assert r.status_code == 200
# worker neavand heartbeat -> stare RAR necunoscuta (worker oprit)
assert "worker oprit" in r.text
assert "Nomenclator RAR" in r.text
def test_nomenclator_fragment_lists_seed(client):
r = client.get("/_fragments/nomenclator")
assert r.status_code == 200
# seed fallback are 18 coduri; OE-1 + R-ODO trebuie sa apara
assert "OE-1" in r.text
assert "R-ODO" in r.text
def test_submissions_fragment_empty_state(client):
r = client.get("/_fragments/submissions")
assert r.status_code == 200
assert "Coada e goala" in r.text
# --------------------------------------------------------------------------- #
# Audit CSV export #
# --------------------------------------------------------------------------- #
def test_audit_export_sent_only(client):
# un submission queued (validare ok) + unul needs_data
client.post("/v1/prezentari", json=_body())
client.post("/v1/prezentari", json=_body(vin="BAD"))
# status implicit = sent -> niciun rand (nimic trimis inca), doar header
r = client.get("/v1/audit/export")
assert r.status_code == 200
assert r.headers["content-type"].startswith("text/csv")
assert "attachment" in r.headers["content-disposition"]
rows = list(csv.DictReader(io.StringIO(r.text)))
assert rows == []
# status=all -> ambele, cu coloane-cheie populate, fara b64_image
r = client.get("/v1/audit/export?status=all")
rows = list(csv.DictReader(io.StringIO(r.text)))
assert len(rows) == 2
assert "vin" in rows[0]
assert "b64_image" not in rows[0]
vins = {row["vin"] for row in rows}
assert "WVWZZZ1KZAW000123" in vins
# prestatii = coduri concatenate
assert any(row["prestatii"] == "OE-1" for row in rows)
def test_audit_export_marks_sent_after_update(client):
client.post("/v1/prezentari", json=_body())
# marcam manual sent (worker ar face asta dupa postPrezentare reusit)
from app.db import get_connection
conn = get_connection()
try:
conn.execute("UPDATE submissions SET status='sent', id_prezentare=68514 WHERE id=1")
finally:
conn.close()
rows = list(csv.DictReader(io.StringIO(client.get("/v1/audit/export").text)))
assert len(rows) == 1
assert rows[0]["status"] == "sent"
assert rows[0]["id_prezentare"] == "68514"

205
tests/test_import_dbf.py Normal file
View File

@@ -0,0 +1,205 @@
"""Teste T5: import DBF -> SQLite (dry-run/raport + commit idempotent).
`mapare_prestatii.DBF` real e gol, deci pentru maparile reale scriem fixturi DBF
dBASE III minimale. Nomenclatorul real (`prestatii_rar.DBF`, 20 randuri) e citit
direct pentru un test de read pe date reale.
"""
from __future__ import annotations
import os
import struct
import tempfile
from pathlib import Path
import pytest
from app.config import ROOT
# --------------------------------------------------------------------------- #
# Helper: scriitor dBASE III minimal pentru fixturi #
# --------------------------------------------------------------------------- #
def write_dbf(path: Path, fields: list[tuple[str, str, int]], records: list[dict]) -> None:
"""Scrie un DBF dBASE III. fields = [(nume, tip C/L, lungime)]."""
header = bytearray(32)
header[0] = 0x03 # dBASE III, fara memo
header[1:4] = bytes((25, 1, 1)) # data ultimei actualizari (fictiva)
n_fields = len(fields)
header_len = 32 + 32 * n_fields + 1
record_len = 1 + sum(length for _, _, length in fields)
struct.pack_into("<I", header, 4, len(records))
struct.pack_into("<H", header, 8, header_len)
struct.pack_into("<H", header, 10, record_len)
field_descs = bytearray()
for name, ftype, length in fields:
fd = bytearray(32)
fd[0:11] = name.encode("ascii").ljust(11, b"\x00")[:11]
fd[11] = ord(ftype)
fd[16] = length
field_descs += fd
body = bytearray()
for rec in records:
body += b"\x20" # flag stergere = spatiu (activ)
for name, ftype, length in fields:
val = rec.get(name)
if ftype == "L":
body += (b"T" if val else b"F")
else:
s = "" if val is None else str(val)
body += s.encode("cp1252", "replace").ljust(length, b" ")[:length]
path.write_bytes(bytes(header) + bytes(field_descs) + b"\x0d" + bytes(body) + b"\x1a")
MAPARE_FIELDS = [("COD_OP", "C", 10), ("DESCR_OP", "C", 40), ("COD_RAR", "C", 10), ("AUTO_SEND", "L", 1)]
PREST_FIELDS = [("COD_PREST", "C", 10), ("NUME_PREST", "C", 60)]
@pytest.fixture()
def env(monkeypatch):
tmp = tempfile.mkdtemp()
monkeypatch.setenv("AUTOPASS_DB_PATH", os.path.join(tmp, "t.db"))
from app.config import get_settings
get_settings.cache_clear()
yield Path(tmp)
get_settings.cache_clear()
# --------------------------------------------------------------------------- #
# Read pe DBF #
# --------------------------------------------------------------------------- #
def test_read_nomenclator_real_dbf():
from tools.import_dbf import read_nomenclator
rep = read_nomenclator(ROOT / "prestatii_rar.DBF")
assert len(rep["rows"]) == 20
codes = {r["cod_prestatie"] for r in rep["rows"]}
assert "OE-1" in codes and "R-ODO" in codes
# codurile sunt normalizate upper
assert all(r["cod_prestatie"] == r["cod_prestatie"].upper() for r in rep["rows"])
def test_read_mapari_valid_blank_duplicate(tmp_path):
from tools.import_dbf import read_mapari
p = tmp_path / "m.DBF"
write_dbf(
p,
MAPARE_FIELDS,
[
{"COD_OP": "OP1", "DESCR_OP": "Reparatie motor", "COD_RAR": "oe-1", "AUTO_SEND": True},
{"COD_OP": "OP2", "DESCR_OP": "Schimb ulei", "COD_RAR": "OE-2", "AUTO_SEND": False},
{"COD_OP": "", "DESCR_OP": "fara cod op", "COD_RAR": "OE-3", "AUTO_SEND": True}, # blank
{"COD_OP": "OP4", "DESCR_OP": "fara cod rar", "COD_RAR": "", "AUTO_SEND": True}, # blank
{"COD_OP": "OP1", "DESCR_OP": "duplicat", "COD_RAR": "OE-9", "AUTO_SEND": True}, # duplicate
],
)
rep = read_mapari(p)
assert len(rep["rows"]) == 2
assert rep["blanks"] == 2
assert rep["duplicates"] == ["OP1"]
# cod_rar normalizat upper, auto_send pastrat
assert rep["rows"][0]["cod_prestatie"] == "OE-1"
assert rep["rows"][0]["auto_send"] is True
assert rep["rows"][1]["auto_send"] is False
# --------------------------------------------------------------------------- #
# Orfane #
# --------------------------------------------------------------------------- #
def test_find_orphans():
from tools.import_dbf import find_orphans
mapari = [
{"cod_op_service": "OP1", "cod_prestatie": "OE-1", "denumire": "x"},
{"cod_op_service": "OP2", "cod_prestatie": "ZZZ", "denumire": "y"},
]
orphans = find_orphans(mapari, {"OE-1", "OE-2"})
assert len(orphans) == 1
assert orphans[0]["cod_op_service"] == "OP2"
# --------------------------------------------------------------------------- #
# End-to-end dry-run + commit + idempotenta #
# --------------------------------------------------------------------------- #
def _fixtures(dirpath: Path) -> tuple[Path, Path]:
mp = dirpath / "mapare.DBF"
np_ = dirpath / "prest.DBF"
write_dbf(
np_,
PREST_FIELDS,
[{"COD_PREST": "OE-1", "NUME_PREST": "REPARATIE"}, {"COD_PREST": "R-ODO", "NUME_PREST": "REPARATIE ODOMETRU"}],
)
write_dbf(
mp,
MAPARE_FIELDS,
[
{"COD_OP": "OP1", "DESCR_OP": "Reparatie", "COD_RAR": "OE-1", "AUTO_SEND": True},
{"COD_OP": "OP2", "DESCR_OP": "Operatie necunoscuta", "COD_RAR": "XYZ", "AUTO_SEND": False}, # orfan
],
)
return mp, np_
def test_dry_run_does_not_write(env):
from tools.import_dbf import run
from app.db import get_connection
mp, np_ = _fixtures(env)
res = run(commit=False, mapare_path=mp, prest_path=np_)
assert res["written"] == {"nomenclator": 0, "mapari": 0}
assert len(res["orphans"]) == 1
assert res["orphans"][0]["cod_op_service"] == "OP2"
conn = get_connection()
try:
n = conn.execute("SELECT COUNT(*) AS n FROM operations_mapping").fetchone()["n"]
assert n == 0
finally:
conn.close()
def test_commit_writes_and_is_idempotent(env):
from tools.import_dbf import run
from app.db import get_connection
mp, np_ = _fixtures(env)
res1 = run(commit=True, mapare_path=mp, prest_path=np_)
assert res1["written"] == {"nomenclator": 2, "mapari": 2}
conn = get_connection()
try:
maps = conn.execute(
"SELECT cod_op_service, cod_prestatie, auto_send FROM operations_mapping ORDER BY cod_op_service"
).fetchall()
assert [(m["cod_op_service"], m["cod_prestatie"], m["auto_send"]) for m in maps] == [
("OP1", "OE-1", 1),
("OP2", "XYZ", 0),
]
nom = conn.execute("SELECT COUNT(*) AS n FROM nomenclator_rar").fetchone()["n"]
finally:
conn.close()
# A doua rulare nu duplica (upsert pe cheile UNIQUE).
run(commit=True, mapare_path=mp, prest_path=np_)
conn = get_connection()
try:
assert conn.execute("SELECT COUNT(*) AS n FROM operations_mapping").fetchone()["n"] == 2
assert conn.execute("SELECT COUNT(*) AS n FROM nomenclator_rar").fetchone()["n"] == nom
finally:
conn.close()
def test_missing_dbf_raises(env):
from tools.import_dbf import run
with pytest.raises(FileNotFoundError):
run(commit=False, mapare_path=env / "nope.DBF", prest_path=env / "nope2.DBF")

View File

@@ -1,15 +1,18 @@
#!/usr/bin/env python3
"""Import DBF -> SQLite (T5 — SCHELET, neimplementat inca).
"""Import DBF ROAAUTO -> SQLite gateway (T5).
Plan.md sect. 7: dry-run + raport intai (randuri valide, mapari orfane, coduri
necunoscute in nomenclator), apoi scrie in SQLite. Surse:
- mapare_prestatii.DBF -> operations_mapping
- prestatii_rar.DBF -> nomenclator_rar
(rar_log.DBF NU se migreaza.)
- prestatii_rar.DBF (COD_PREST, NUME_PREST) -> nomenclator_rar
- mapare_prestatii.DBF (COD_OP, DESCR_OP, COD_RAR, AUTO_SEND) -> operations_mapping
(rar_log.DBF NU se migreaza — jurnalul nou e `submissions` + live din RAR.)
Utilizare (cand e implementat):
python -m tools.import_dbf --dry-run
python -m tools.import_dbf --commit
Importul e IDEMPOTENT (upsert pe cheile UNIQUE), deci rularea repetata nu duplica.
Default = dry-run (raport, fara scriere). `--commit` scrie efectiv.
Utilizare:
python -m tools.import_dbf # dry-run + raport
python -m tools.import_dbf --commit # scrie in SQLite
Necesita: pip install dbfread
"""
@@ -18,19 +21,230 @@ from __future__ import annotations
import argparse
import sys
from pathlib import Path
from typing import Any
from dbfread import DBF
from app.config import ROOT
from app.db import get_connection, init_db
from app.mapping import DEFAULT_ACCOUNT_ID
# DBF-urile vin din arhiva ROAAUTO din radacina repo-ului.
MAPARE_DBF = ROOT / "mapare_prestatii.DBF"
PREST_DBF = ROOT / "prestatii_rar.DBF"
# Language driver al DBF-urilor = 0x03 (Windows ANSI / cp1252). Diacriticele
# scrise ca literal '?' sunt in sursa, nu un artefact de encoding.
DBF_ENCODING = "cp1252"
# --------------------------------------------------------------------------- #
# Citire DBF -> randuri normalizate (pur, fara DB) #
# --------------------------------------------------------------------------- #
def _field(rec: dict, *names: str) -> Any:
"""Primul camp negol dintr-o lista de nume alternative (tolerant la schema)."""
for n in names:
if n in rec and rec[n] is not None:
return rec[n]
return None
def read_nomenclator(path: Path, *, encoding: str = DBF_ENCODING) -> dict[str, Any]:
"""Citeste prestatii_rar.DBF. Intoarce raport + randuri valide.
{rows: [{cod_prestatie, nume_prestatie}], duplicates: [cod...], blanks: int}
cod_prestatie normalizat strip().upper(); duplicate = acelasi cod de 2+ ori
(pastram prima aparitie).
"""
rows: list[dict[str, str]] = []
seen: set[str] = set()
duplicates: list[str] = []
blanks = 0
for rec in DBF(str(path), encoding=encoding, char_decode_errors="replace"):
cod = str(_field(rec, "COD_PREST", "COD_PRESTATIE", "COD") or "").strip().upper()
nume = str(_field(rec, "NUME_PREST", "NUME_PRESTATIE", "NUME") or "").strip()
if not cod:
blanks += 1
continue
if cod in seen:
duplicates.append(cod)
continue
seen.add(cod)
rows.append({"cod_prestatie": cod, "nume_prestatie": nume})
return {"rows": rows, "duplicates": duplicates, "blanks": blanks}
def read_mapari(path: Path, *, encoding: str = DBF_ENCODING) -> dict[str, Any]:
"""Citeste mapare_prestatii.DBF. Intoarce raport + randuri valide.
{rows: [{cod_op_service, denumire, cod_prestatie, auto_send}],
duplicates: [cod_op...], blanks: int}
Rand valid = are si COD_OP si COD_RAR. blanks = randuri carora le lipseste
unul din ele. duplicate = acelasi COD_OP de 2+ ori (pastram prima aparitie).
"""
rows: list[dict[str, Any]] = []
seen: set[str] = set()
duplicates: list[str] = []
blanks = 0
for rec in DBF(str(path), encoding=encoding, char_decode_errors="replace"):
op = str(_field(rec, "COD_OP", "COD_OP_SERVICE") or "").strip()
cod = str(_field(rec, "COD_RAR", "COD_PRESTATIE", "COD_PREST") or "").strip().upper()
denumire = str(_field(rec, "DESCR_OP", "DENUMIRE", "DESCRIERE") or "").strip()
auto = _field(rec, "AUTO_SEND")
auto_send = bool(auto) if auto is not None else True
if not op or not cod:
blanks += 1
continue
if op in seen:
duplicates.append(op)
continue
seen.add(op)
rows.append(
{"cod_op_service": op, "denumire": denumire, "cod_prestatie": cod, "auto_send": auto_send}
)
return {"rows": rows, "duplicates": duplicates, "blanks": blanks}
def find_orphans(mapari: list[dict], known_codes: set[str]) -> list[dict]:
"""Mapari al caror cod_prestatie nu exista in nomenclator (nu pot fi trimise)."""
return [m for m in mapari if m["cod_prestatie"] not in known_codes]
# --------------------------------------------------------------------------- #
# Scriere SQLite (idempotenta) #
# --------------------------------------------------------------------------- #
def write_nomenclator(conn, rows: list[dict]) -> int:
conn.executemany(
"INSERT INTO nomenclator_rar (cod_prestatie, nume_prestatie, updated_at) "
"VALUES (?, ?, datetime('now')) "
"ON CONFLICT(cod_prestatie) DO UPDATE SET nume_prestatie=excluded.nume_prestatie, "
"updated_at=datetime('now')",
[(r["cod_prestatie"], r["nume_prestatie"]) for r in rows],
)
return len(rows)
def write_mapari(conn, rows: list[dict], account_id: int) -> int:
conn.executemany(
"INSERT INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) "
"VALUES (?, ?, ?, ?) "
"ON CONFLICT(account_id, cod_op_service) DO UPDATE SET "
"cod_prestatie=excluded.cod_prestatie, auto_send=excluded.auto_send",
[(account_id, r["cod_op_service"], r["cod_prestatie"], 1 if r["auto_send"] else 0) for r in rows],
)
return len(rows)
# --------------------------------------------------------------------------- #
# Raport + CLI #
# --------------------------------------------------------------------------- #
def build_report(
nomenclator: dict, mapari: dict, orphans: list[dict], *, account_id: int
) -> str:
lines: list[str] = []
lines.append("=== Import DBF ROAAUTO -> SQLite (raport) ===")
lines.append(f"Cont tinta: account_id={account_id}")
lines.append("")
lines.append("nomenclator_rar (<- prestatii_rar.DBF):")
lines.append(f" randuri valide : {len(nomenclator['rows'])}")
lines.append(f" duplicate cod : {len(nomenclator['duplicates'])} {sorted(set(nomenclator['duplicates'])) or ''}".rstrip())
lines.append(f" randuri goale : {nomenclator['blanks']}")
lines.append("")
lines.append("operations_mapping (<- mapare_prestatii.DBF):")
lines.append(f" randuri valide : {len(mapari['rows'])}")
lines.append(f" duplicate COD_OP: {len(mapari['duplicates'])} {sorted(set(mapari['duplicates'])) or ''}".rstrip())
lines.append(f" randuri goale : {mapari['blanks']} (lipsa COD_OP sau COD_RAR)")
lines.append(f" mapari ORFANE : {len(orphans)} (cod_prestatie necunoscut in nomenclator)")
for m in orphans:
lines.append(f" - {m['cod_op_service']} -> {m['cod_prestatie']} ({m['denumire'] or 'fara denumire'})")
return "\n".join(lines)
def run(
*,
commit: bool,
account_id: int = DEFAULT_ACCOUNT_ID,
mapare_path: Path = MAPARE_DBF,
prest_path: Path = PREST_DBF,
encoding: str = DBF_ENCODING,
) -> dict[str, Any]:
"""Citeste DBF-urile, construieste raportul si (optional) scrie in SQLite.
Intoarce {report, nomenclator, mapari, orphans, written:{nomenclator,mapari}}.
"""
missing = [str(p) for p in (prest_path, mapare_path) if not p.exists()]
if missing:
raise FileNotFoundError("DBF lipsa: " + ", ".join(missing))
nomenclator = read_nomenclator(prest_path, encoding=encoding)
mapari = read_mapari(mapare_path, encoding=encoding)
init_db()
conn = get_connection()
try:
# Coduri cunoscute = nomenclatorul ce urmeaza importat + ce e deja in DB
# (seed fallback / live din worker). Asa orfanele sunt detectate corect
# chiar daca prestatii_rar.DBF nu acopera toate codurile.
db_codes = {r["cod_prestatie"] for r in conn.execute("SELECT cod_prestatie FROM nomenclator_rar")}
known = db_codes | {r["cod_prestatie"] for r in nomenclator["rows"]}
orphans = find_orphans(mapari["rows"], known)
written = {"nomenclator": 0, "mapari": 0}
if commit:
conn.execute("BEGIN IMMEDIATE")
try:
written["nomenclator"] = write_nomenclator(conn, nomenclator["rows"])
written["mapari"] = write_mapari(conn, mapari["rows"], account_id)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
report = build_report(nomenclator, mapari, orphans, account_id=account_id)
return {
"report": report,
"nomenclator": nomenclator,
"mapari": mapari,
"orphans": orphans,
"written": written,
}
finally:
conn.close()
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Import DBF ROAAUTO -> SQLite gateway (T5)")
parser.add_argument("--dry-run", action="store_true", help="raport fara scriere (default)")
parser.add_argument("--commit", action="store_true", help="scrie in SQLite dupa confirmare")
parser.parse_args(argv)
parser.add_argument("--commit", action="store_true", help="scrie in SQLite (implicit: doar raport)")
parser.add_argument("--account-id", type=int, default=DEFAULT_ACCOUNT_ID, help="cont tinta pentru mapari")
parser.add_argument("--mapare", type=Path, default=MAPARE_DBF, help="cale mapare_prestatii.DBF")
parser.add_argument("--nomenclator", type=Path, default=PREST_DBF, help="cale prestatii_rar.DBF")
parser.add_argument("--encoding", default=DBF_ENCODING, help=f"encoding DBF (implicit {DBF_ENCODING})")
args = parser.parse_args(argv)
print("tools/import_dbf.py este SCHELET (T5). De implementat:")
print(" 1. citeste mapare_prestatii.DBF + prestatii_rar.DBF cu dbfread")
print(" 2. raport: randuri valide, mapari orfane, coduri necunoscute in nomenclator")
print(" 3. la --commit: INSERT idempotent in operations_mapping / nomenclator_rar")
return 1
try:
result = run(
commit=args.commit,
account_id=args.account_id,
mapare_path=args.mapare,
prest_path=args.nomenclator,
encoding=args.encoding,
)
except FileNotFoundError as exc:
print(f"EROARE: {exc}", file=sys.stderr)
return 2
print(result["report"])
print("")
if args.commit:
w = result["written"]
print(f"COMMIT: scris {w['nomenclator']} coduri nomenclator, {w['mapari']} mapari (idempotent).")
else:
print("DRY-RUN: nimic scris. Reia cu --commit dupa ce verifici raportul.")
return 0
if __name__ == "__main__":