Files
rar-autopass/app/mapping.py
Claude Agent 3fc53534e2 feat(5.15+5.14): CLOSE — fix-uri code-review + embeddings functional
5.15 (propagare design + dashboard editare) si 5.14 (mapare LLM distilata)
inchise dupa /code-review high. 8 buguri reparate TDD:

- HIGH modal nu se deschidea pe randul slim (base.html: trimitere-slim)
- HIGH /repune trunchia prestatii (declaratie incompleta la RAR) -> iterare
  peste existing, codes pozitional
- HIGH embeddings incarca model ~230MB degeaba pe corpus gol -> poarta has_corpus()
- HIGH picker chips gol pe re-render eroare -> conn/account_id pe toate ramurile
- MED obs re-derivat dupa stergere explicita -> _merge_override pastreaza obs=''
- MED mapare salvata fara denumire polua GOLD -> _record_gold_validation guard
- MED typo nome_prestatie -> nume_prestatie in select /repune
- MED bucketare timp +3h gresita iarna -> SQLite localtime + TZ=Europe/Bucharest

Embeddings WIRE-uit functional (PRD #15, decizie user): ensure_embeddings_corpus
construieste corpus din nomenclator, gated pe AUTOPASS_EMBEDDINGS_ENABLED (default
off). Marime model corectata ~50MB->~230MB (estimare PRD gresita).

Cleanup: hoist load_* din bucla bulk-fix; import re la top.
Regresie: 1256 passed, 1 deselected (live), 0 failed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 20:48:34 +00:00

820 lines
34 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Mapare operatie ROAAUTO -> cod prestatie RAR + fuzzy lookup pentru editor.
Contract (varianta hibrida): un item de prestatie poate veni
fie cu `cod_prestatie` (cod RAR direct), fie cu `cod_op_service`
(cod intern ROAAUTO) + `denumire`. La ingestie incercam sa rezolvam codul intern
prin `operations_mapping`; daca nu exista mapare -> submission `needs_mapping`
(nu se trimite la RAR), iar operatia apare in editorul web unde userul o mapeaza
cu ajutorul unei sugestii fuzzy pe nomenclatorul RAR. La salvarea maparii,
submission-urile blocate pe acel cod se re-rezolva automat.
Functiile de la inceput (normalize/suggest/resolve) sunt PURE (fara DB/HTTP),
unit-testabile direct. Cele cu `conn` sunt helpere de persistenta.
"""
from __future__ import annotations
import hashlib
import json
import unicodedata
from typing import Any
from rapidfuzz import fuzz, process
from . import errors as err_mod
from .nomenclator_seed import FALLBACK_NOMENCLATOR
from .validation import validate_prezentare
# Default account while API-key auth (CORE) is not implemented: ingestions arrive
# with account_id NULL and are attributed to the account seeded in the schema (id=1).
DEFAULT_ACCOUNT_ID = 1
# Below this score (0..100) no suggestion is preselected — the user picks manually.
SUGGEST_MIN_SCORE = 60
# --------------------------------------------------------------------------- #
# Pur: normalizare + fuzzy + rezolvare #
# --------------------------------------------------------------------------- #
def normalize_for_match(value: object) -> str:
    """Return *value* uppercased, without diacritics, with whitespace collapsed.

    Makes matching robust: 'Reparație motor' -> 'REPARATIE MOTOR', so Romanian
    diacritics (ă/â/î/ș/ț) and encoding artefacts do not skew the fuzzy score.
    Any falsy value (None, "", 0) normalizes to the empty string.
    """
    # NFKD splits accented letters into base letter + combining mark, so the
    # marks can be filtered out individually.
    decomposed = unicodedata.normalize("NFKD", str(value or ""))
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    words = "".join(base_chars).upper().split()
    return " ".join(words)
def suggest_codes(
    denumire: object,
    nomenclator: list[dict],
    *,
    limit: int = 5,
) -> list[dict]:
    """Rank RAR codes by fuzzy similarity to a ROAAUTO operation name.

    Args:
        denumire: operation name to match; normalized with `normalize_for_match`.
        nomenclator: rows shaped {cod_prestatie, nume_prestatie}; rows without a
            code are ignored.
        limit: maximum number of suggestions returned.

    Returns:
        [{cod_prestatie, nume_prestatie, score}] in the order produced by
        `process.extract` (best match first). When the name normalizes to an empty
        string, the first `limit` nomenclator rows are returned as-is with score 0.0.
    """
    candidates = [row for row in nomenclator if (row.get("cod_prestatie") or "")]
    needle = normalize_for_match(denumire)
    if not needle:
        return [{**row, "score": 0.0} for row in candidates[:limit]]

    rows_by_code: dict = {}
    haystack: dict = {}
    for row in candidates:
        code = row["cod_prestatie"]
        rows_by_code[code] = row
        haystack[code] = normalize_for_match(row.get("nume_prestatie"))

    # token_sort_ratio (not token_set_ratio): rewards covering as many words of the
    # name as possible instead of scoring 100 for any subset (e.g. "REPARATIE" and
    # "REPARATIE ODOMETRU" would tie under set_ratio).
    matches = process.extract(
        needle,
        haystack,
        scorer=fuzz.token_sort_ratio,
        limit=limit,
    )

    # With a mapping as choices, process.extract yields (value, score, key);
    # the key is the cod_prestatie.
    suggestions: list[dict] = []
    for _text, score, code in matches:
        suggestions.append(
            {
                "cod_prestatie": code,
                "nume_prestatie": rows_by_code[code].get("nume_prestatie"),
                "score": float(score),
            }
        )
    return suggestions
# Prefix stored in `cod_sursa` when an item is resolved through a text rule.
# Format: "text_rule:<original pattern of the winning rule>". Harmless to the payload —
# RAR reads only `cod_prestatie`; `cod_sursa` stays in payload_json with no effect.
COD_SURSA_TEXT_RULE_PREFIX = "text_rule:"
def _rezolva_din_reguli_text(
    item: dict,
    text_rules: list[dict] | None,
    valid_codes: set[str] | None,
) -> tuple[str | None, str | None, bool | None]:
    """Resolve an item through the first text rule whose pattern occurs in its text.

    The item text is `denumire` when present, otherwise `cod_op_service`. Both the
    text and each rule pattern go through `normalize_for_match`, so the substring
    test ignores case, diacritics and repeated whitespace.

    Rules are tried in the order given and the first one that matches wins; later
    rules are never consulted, even when the winner turns out to be unusable.

    Args:
        item: service item being resolved.
        text_rules: rule dicts with `pattern`, `cod_prestatie`, `auto_send`.
        valid_codes: allowed codes, or None to skip the validity check.

    Returns:
        (uppercased code, ORIGINAL un-normalized pattern, bool(auto_send)) of the
        winning rule, or (None, None, None) when there are no rules, the item has
        no text, nothing matches, or the winning rule's code is empty or absent
        from `valid_codes`.
    """
    no_match = (None, None, None)
    if not text_rules:
        return no_match

    haystack = normalize_for_match(item.get("denumire") or item.get("cod_op_service"))
    if not haystack:
        return no_match

    winner = None
    for rule in text_rules:
        needle = normalize_for_match(rule.get("pattern"))
        if needle and needle in haystack:
            winner = rule
            break
    if winner is None:
        return no_match

    code = (winner.get("cod_prestatie") or "").strip().upper()
    # Never hand back an empty or out-of-nomenclator code: the item stays unmapped.
    if not code or (valid_codes is not None and code not in valid_codes):
        return no_match
    return code, winner.get("pattern"), bool(winner.get("auto_send"))
def text_rule_hits(resolved: list[dict] | None) -> list[dict]:
    """Collect the resolved items whose code came from a text rule.

    An item counts as a hit when its `cod_sursa` is a string starting with
    `COD_SURSA_TEXT_RULE_PREFIX`. Pure (no DB access).

    Returns:
        [{pattern, cod_prestatie}] in input order, where `pattern` is `cod_sursa`
        with the prefix removed. Empty list for None/empty input.
    """
    hits: list[dict] = []
    for entry in resolved or []:
        source = entry.get("cod_sursa")
        if not isinstance(source, str):
            continue
        if not source.startswith(COD_SURSA_TEXT_RULE_PREFIX):
            continue
        hits.append(
            {
                "pattern": source[len(COD_SURSA_TEXT_RULE_PREFIX):],
                "cod_prestatie": entry.get("cod_prestatie"),
            }
        )
    return hits
def text_rules_overlap(pattern: str, existing_rules: list[dict] | None) -> list[dict]:
    """Return the existing text rules that overlap with `pattern`.

    Two patterns overlap when, after `normalize_for_match`, one is a substring of
    the other (either direction). A pattern that is IDENTICAL after normalization
    is not reported: the original notes treat that case as an upsert rather than
    an overlap. Pure and deterministic, no DB access.

    Returns:
        The original dicts from `existing_rules` that overlap, in input order.
        Empty list when `pattern` normalizes to an empty string.
    """
    needle = normalize_for_match(pattern)
    if not needle:
        return []

    def _overlaps(rule: dict) -> bool:
        other = normalize_for_match(rule.get("pattern"))
        if not other or other == needle:
            return False  # empty or identical -> not an overlap
        return needle in other or other in needle

    return [rule for rule in existing_rules or [] if _overlaps(rule)]
def resolve_prestatii(
    prestatii: list[dict] | None,
    mapping: dict[str, str],
    valid_codes: set[str] | None = None,
    text_rules: list[dict] | None = None,
) -> tuple[list[dict], list[dict]]:
    """Resolve each service item, filling `cod_prestatie` where it is missing.

    Precedence (strict): directly supplied valid `cod_prestatie` > exact
    `cod_op_service` entry in `mapping` > first matching text rule > unmapped.

    Per-item rules:
    - A `cod_prestatie` that is valid is kept (stripped and uppercased).
    - A `cod_prestatie` that is NOT in `valid_codes` is treated as an operation to
      be mapped: if the item has no `cod_op_service`, the unknown code is promoted
      to `cod_op_service` (and to `denumire` when that is empty) so it enters the
      needs-mapping flow. Per the original notes, RAR accepts only nomenclator
      codes, so an unknown code is never passed through raw.
    - With an operation code present in `mapping`, the mapped code is filled in.
    - Otherwise the text rules are tried; a hit also sets `cod_sursa` to
      `COD_SURSA_TEXT_RULE_PREFIX` + the rule's original pattern.
    - With an operation code but no mapping and no rule hit, `cod_prestatie` is set
      to None and the item is reported as unmapped.
    - An item with neither code nor operation code is left untouched.

    Args:
        prestatii: items to resolve; None is treated as empty. Inputs are copied,
            never mutated.
        mapping: {cod_op_service -> cod_prestatie}.
        valid_codes: uppercase set of valid RAR codes; None disables validation
            (any non-empty `cod_prestatie` passes through).
        text_rules: ordered rule dicts as consumed by `_rezolva_din_reguli_text`;
            None means no text rules.

    Returns:
        (resolved, unmapped). `resolved` keeps the original fields
        (cod_op_service/denumire) so a later re-resolution has context;
        `unmapped` is [{cod_op_service, denumire}].
    """
    resolved_items: list[dict] = []
    pending: list[dict] = []

    for raw in prestatii or []:
        entry = dict(raw)
        resolved_items.append(entry)

        # Resolution annotations are recomputed from scratch on every pass;
        # otherwise an item now resolved through another path would keep a stale
        # cod_sursa/hold flag from the stored payload.
        for stale_key in ("cod_sursa", "regula_fara_autosend"):
            entry.pop(stale_key, None)

        direct_code = (entry.get("cod_prestatie") or "").strip().upper()
        op_code = (entry.get("cod_op_service") or "").strip()

        if direct_code and (valid_codes is None or direct_code in valid_codes):
            entry["cod_prestatie"] = direct_code
            continue

        # Missing or unknown code from here on -> mapping route.
        if direct_code and not op_code:
            # Promote the unknown direct code so it can be mapped in the editor
            # (with the code doubling as the name for the fuzzy suggestion).
            op_code = direct_code
            entry["cod_op_service"] = op_code
            if not entry.get("denumire"):
                entry["denumire"] = direct_code

        if not op_code:
            # Neither code nor operation: left as-is for content validation.
            continue

        if op_code in mapping:
            entry["cod_prestatie"] = mapping[op_code]
            continue

        # No exact mapping -> try the substring text rules.
        rule_code, rule_pattern, _auto_send = _rezolva_din_reguli_text(
            entry, text_rules, valid_codes
        )
        if rule_code is None:
            entry["cod_prestatie"] = None
            pending.append({"cod_op_service": op_code, "denumire": entry.get("denumire")})
            continue

        entry["cod_prestatie"] = rule_code
        # Additive annotation: marks the item as rule-resolved, with the source
        # pattern. The rule's auto_send flag is intentionally not recorded
        # (US-001, PRD 5.11: auto_send no longer holds a row).
        entry["cod_sursa"] = f"{COD_SURSA_TEXT_RULE_PREFIX}{rule_pattern or ''}"

    return resolved_items, pending
# --------------------------------------------------------------------------- #
# Persistenta (conn) #
# --------------------------------------------------------------------------- #
def account_or_default(account_id: int | None) -> int:
    """Return `account_id`, or `DEFAULT_ACCOUNT_ID` when it is None."""
    if account_id is None:
        return DEFAULT_ACCOUNT_ID
    return account_id
def account_scope_clause(account_id: int) -> tuple[str, list]:
    """Build the SQL fragment + params that filter a nullable `account_id` column.

    Encodes the rule that a NULL account belongs to account 1 (legacy rows).
    Per the original notes: use it ONLY on `submissions` (nullable account_id),
    not on `operations_mapping` (account_id NOT NULL), where a plain
    `WHERE account_id=?` is enough.

    Returns:
        (clause, params) — the clause has two placeholders, both bound to
        `account_id`.
    """
    clause = "(account_id = ? OR (account_id IS NULL AND ? = 1))"
    params = [account_id] * 2
    return clause, params
def seed_nomenclator_if_empty(conn) -> int:
    """Seed `nomenclator_rar` from `FALLBACK_NOMENCLATOR` ONLY when the table is empty.

    Per the original notes the worker overwrites the table with live data; the
    seed just guarantees the fuzzy editor works offline.

    Returns:
        0 when the table already has rows, otherwise `len(FALLBACK_NOMENCLATOR)`
        (the number of rows submitted for insert).
    """
    existing = conn.execute("SELECT COUNT(*) AS n FROM nomenclator_rar").fetchone()["n"]
    if existing:
        return 0
    insert_sql = (
        "INSERT OR IGNORE INTO nomenclator_rar (cod_prestatie, nume_prestatie) VALUES (?, ?)"
    )
    conn.executemany(insert_sql, FALLBACK_NOMENCLATOR)
    return len(FALLBACK_NOMENCLATOR)
def upsert_nomenclator(conn, items: list[dict]) -> int:
    """Upsert the live RAR nomenclator into `nomenclator_rar`.

    Tolerant of key spelling: the code is read from codPrestatie/cod_prestatie/cod
    and the name from numePrestatie/nume_prestatie/nume. Codes are stripped and
    uppercased, names stripped. Non-dict entries and entries whose code is missing
    or blank (including whitespace-only) are skipped, so an empty-string code is
    never written to the table.

    Args:
        conn: DB connection exposing `executemany`.
        items: API-shaped rows; None/empty is accepted.

    Returns:
        Number of rows submitted to the upsert (0 when nothing usable was given).
    """
    rows: list[tuple[str, str]] = []
    for it in items or []:
        if not isinstance(it, dict):
            continue
        cod_raw = it.get("codPrestatie") or it.get("cod_prestatie") or it.get("cod")
        nume_raw = it.get("numePrestatie") or it.get("nume_prestatie") or it.get("nume")
        # Strip BEFORE the emptiness check: "   " is truthy but is not a code.
        cod = str(cod_raw or "").strip().upper()
        if not cod:
            continue
        rows.append((cod, str(nume_raw or "").strip()))
    if not rows:
        return 0
    conn.executemany(
        "INSERT INTO nomenclator_rar (cod_prestatie, nume_prestatie, updated_at) "
        "VALUES (?, ?, datetime('now')) "
        "ON CONFLICT(cod_prestatie) DO UPDATE SET nume_prestatie=excluded.nume_prestatie, "
        "updated_at=datetime('now')",
        rows,
    )
    return len(rows)
def load_nomenclator(conn) -> list[dict]:
    """Return every nomenclator row as {cod_prestatie, nume_prestatie}, ordered by code."""
    cursor = conn.execute(
        "SELECT cod_prestatie, nume_prestatie FROM nomenclator_rar ORDER BY cod_prestatie"
    )
    return list(map(dict, cursor.fetchall()))
def load_nomenclator_codes(conn) -> set[str]:
    """Return the set of valid RAR codes (stripped, uppercase) from `nomenclator_rar`.

    NULL/blank codes are dropped. An empty set means the nomenclator is empty; the
    caller must then NOT validate against it (otherwise everything would be
    rejected). Per the original notes the table is normally populated by the
    fallback seed at boot and by the worker's live upsert.
    """
    codes: set[str] = set()
    for row in conn.execute("SELECT cod_prestatie FROM nomenclator_rar").fetchall():
        code = (row["cod_prestatie"] or "").strip()
        if code:
            codes.add(code.upper())
    return codes
def load_mapping(conn, account_id: int | None) -> dict[str, str]:
    """Return {cod_op_service -> cod_prestatie} for one account (None -> default account)."""
    cursor = conn.execute(
        "SELECT cod_op_service, cod_prestatie FROM operations_mapping WHERE account_id=?",
        (account_or_default(account_id),),
    )
    return dict((row["cod_op_service"], row["cod_prestatie"]) for row in cursor.fetchall())
def load_mapping_meta(conn, account_id: int | None) -> dict[str, dict]:
    """Return {cod_op_service -> {cod_prestatie, auto_send}} for one account.

    Extended variant of the plain op->code mapping that also carries the
    per-operation `auto_send` flag, coerced to bool. None -> default account.
    """
    acct = account_or_default(account_id)
    cursor = conn.execute(
        "SELECT cod_op_service, cod_prestatie, auto_send FROM operations_mapping WHERE account_id=?",
        (acct,),
    )
    meta: dict[str, dict] = {}
    for row in cursor.fetchall():
        meta[row["cod_op_service"]] = {
            "cod_prestatie": row["cod_prestatie"],
            "auto_send": bool(row["auto_send"]),
        }
    return meta
def classify_prezentare(
    content: dict,
    mapping: dict[str, str],
    mapping_meta: dict[str, dict],
    valid_codes: set[str] | None = None,
    text_rules: list[dict] | None = None,
) -> dict:
    """Classify one presentation without touching the DB.

    Per the original notes this is shared by POST /v1/prezentari and
    POST /v1/prezentari/valideaza so that the dry-run and the real ingestion reach
    the same verdict.

    Steps: copy `content`, overwrite vin/nr_inmatriculare/odometru_final with the
    values from `canonicalize_row`, resolve the service items, then:
    - any unmapped item -> status "needs_mapping", `rar_error` = JSON with the
      unmapped list plus the COD_NEMAPAT error payload;
    - otherwise content validation errors -> status "needs_data", `rar_error` =
      JSON of the errors;
    - otherwise -> status "queued", `rar_error` None (US-001, PRD 5.11: a resolved
      code is queued directly; there is no auto_send hold).

    `mapping_meta` is accepted for interface compatibility and is not read here.

    Returns:
        {"status", "rar_error", "resolved", "unmapped", "errors", "content"},
        where "content" is the updated copy.
    """
    from .idempotency import canonicalize_row  # local import: avoids an import cycle

    updated = dict(content)
    canonical = canonicalize_row(updated)
    updated.update(
        {field: canonical[field] for field in ("vin", "nr_inmatriculare", "odometru_final")}
    )

    resolved, unmapped = resolve_prestatii(
        updated.get("prestatii"), mapping, valid_codes, text_rules
    )
    updated["prestatii"] = resolved

    errors: list[dict] = []
    if unmapped:
        status = "needs_mapping"
        coduri = ", ".join((entry.get("cod_op_service") or "") for entry in unmapped)
        error_payload = {
            "unmapped": unmapped,
            **err_mod.eroare("COD_NEMAPAT", cauza=f"Coduri fara mapare RAR: {coduri}"),
        }
        rar_error = json.dumps(error_payload, ensure_ascii=False)
    else:
        errors = validate_prezentare(updated)
        if errors:
            status, rar_error = "needs_data", json.dumps(errors, ensure_ascii=False)
        else:
            status, rar_error = "queued", None

    return {
        "status": status,
        "rar_error": rar_error,
        "resolved": resolved,
        "unmapped": unmapped,
        "errors": errors,
        "content": updated,
    }
def has_no_auto_send(resolved: list[dict], mapping_meta: dict[str, dict]) -> bool:
    """Always return False (neutralized after US-001, PRD 5.11).

    auto_send no longer holds rows in needs_mapping: a resolved code goes straight
    to queued regardless of the flag in `mapping_meta`. Per the original notes the
    symbol is kept because routes.py and import_router.py import it, and removing
    it would raise ImportError at boot. Both arguments are ignored.
    """
    return False
def pending_unmapped(conn, account_id=None) -> list[dict]:
    """List the distinct unmapped operations found in `needs_mapping` submissions.

    Args:
        conn: DB connection.
        account_id: None (default) scans ALL accounts — per the original notes this
            is kept for the internal web dashboard only; API callers must pass an
            explicit account, otherwise data leaks across accounts. An int filters
            in SQL via `account_scope_clause`, which also includes legacy rows
            whose account_id is NULL (owned by account 1).

    Returns:
        One dict per (account, cod_op_service) with keys account_id,
        cod_op_service, denumire (first non-empty name seen), blocked (number of
        distinct submissions held by that operation), suggestions (fuzzy ranking),
        sugestie_principala and surse_sugestie (from `enrich_suggestions`).
        Sorted by `blocked` descending, then by operation code.

    Submissions whose payload is not valid JSON, or is valid JSON but not an
    object, are skipped instead of aborting the whole listing.
    """
    nomenclator = load_nomenclator(conn)
    if account_id is not None:
        scope_sql, scope_params = account_scope_clause(account_id)
        rows = conn.execute(
            f"SELECT id, account_id, payload_json FROM submissions "
            f"WHERE status='needs_mapping' AND {scope_sql}",
            scope_params,
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT id, account_id, payload_json FROM submissions WHERE status='needs_mapping'"
        ).fetchall()

    agg: dict[tuple[int, str], dict[str, Any]] = {}
    for r in rows:
        acct = r["account_id"] if r["account_id"] is not None else DEFAULT_ACCOUNT_ID
        try:
            content = json.loads(r["payload_json"])
        except (ValueError, TypeError):
            continue
        if not isinstance(content, dict):
            # Valid JSON but not an object (list/null/number): nothing to aggregate.
            continue
        for item in content.get("prestatii") or []:
            if not isinstance(item, dict):
                continue
            if (item.get("cod_prestatie") or ""):
                continue  # already resolved
            op = (item.get("cod_op_service") or "").strip()
            if not op:
                continue
            key = (acct, op)
            entry = agg.setdefault(
                key,
                {"account_id": acct, "cod_op_service": op, "denumire": item.get("denumire"), "blocked": 0, "_ids": set()},
            )
            if not entry["denumire"] and item.get("denumire"):
                entry["denumire"] = item.get("denumire")
            # A set, so a submission repeating the same operation counts once.
            entry["_ids"].add(r["id"])

    # Index the embeddings corpus once, before the loop (no-op when the flag is off).
    ensure_embeddings_corpus(conn, nomenclator)

    out: list[dict] = []
    for entry in agg.values():
        entry["blocked"] = len(entry.pop("_ids"))
        entry["suggestions"] = suggest_codes(entry["denumire"], nomenclator, limit=5)
        # Enrichment is SUGGESTION-ONLY: these sources never feed
        # resolve_prestatii/load_mapping.
        enriched = enrich_suggestions(conn, entry["denumire"])
        entry["sugestie_principala"] = enriched["sugestie_principala"]
        entry["surse_sugestie"] = enriched["surse"]
        out.append(entry)
    out.sort(key=lambda e: (-e["blocked"], e["cod_op_service"]))
    return out
def save_mapping(conn, account_id: int | None, cod_op_service: str, cod_prestatie: str, auto_send: bool) -> None:
    """Upsert one op->code mapping, keyed on (account_id, cod_op_service).

    The operation code is stripped; the RAR code is stripped and uppercased;
    `auto_send` is stored as 0/1. None account -> default account.

    Raises:
        ValueError: when either code is empty after stripping.
    """
    acct = account_or_default(account_id)
    op_code = (cod_op_service or "").strip()
    rar_code = (cod_prestatie or "").strip().upper()
    if not (op_code and rar_code):
        raise ValueError("cod_op_service si cod_prestatie sunt obligatorii")
    upsert_sql = (
        "INSERT INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) "
        "VALUES (?, ?, ?, ?) "
        "ON CONFLICT(account_id, cod_op_service) DO UPDATE SET "
        "cod_prestatie=excluded.cod_prestatie, auto_send=excluded.auto_send"
    )
    conn.execute(upsert_sql, (acct, op_code, rar_code, int(bool(auto_send))))
def load_text_rules(conn, account_id: int | None) -> list[dict]:
    """Return an account's text rules ordered by priority ASC, then id ASC.

    Each element is {pattern, cod_prestatie, auto_send, priority}.
    None account -> default account.
    """
    query = (
        "SELECT pattern, cod_prestatie, auto_send, priority "
        "FROM operation_text_rules "
        "WHERE account_id=? "
        "ORDER BY priority ASC, id ASC"
    )
    cursor = conn.execute(query, (account_or_default(account_id),))
    return list(map(dict, cursor.fetchall()))
def save_text_rule(
    conn,
    account_id: int | None,
    pattern: str,
    cod_prestatie: str,
    auto_send: bool,
) -> None:
    """Upsert a text rule keyed on (account_id, pattern).

    The pattern is stripped (not otherwise normalized); the code is stripped and
    uppercased; `auto_send` is stored as 0/1. When the rule already exists for the
    same account + pattern, its code and auto_send are updated.

    Raises:
        ValueError: when the pattern or the code is empty after stripping.
    """
    acct = account_or_default(account_id)
    rule_pattern = (pattern or "").strip()
    rar_code = (cod_prestatie or "").strip().upper()
    if not (rule_pattern and rar_code):
        raise ValueError("pattern si cod_prestatie sunt obligatorii")
    upsert_sql = (
        "INSERT INTO operation_text_rules (account_id, pattern, cod_prestatie, auto_send) "
        "VALUES (?, ?, ?, ?) "
        "ON CONFLICT(account_id, pattern) DO UPDATE SET "
        "cod_prestatie=excluded.cod_prestatie, auto_send=excluded.auto_send"
    )
    conn.execute(upsert_sql, (acct, rule_pattern, rar_code, int(bool(auto_send))))
def delete_text_rule(conn, account_id: int | None, pattern: str) -> None:
    """Delete the rule with this (account_id, stripped pattern), if it exists."""
    params = (account_or_default(account_id), (pattern or "").strip())
    conn.execute(
        "DELETE FROM operation_text_rules WHERE account_id=? AND pattern=?",
        params,
    )
# Minimum cosine similarity for the embeddings nearest-neighbour suggestion.
# Below this score the NN suggestion is too uncertain and is not shown (prevents
# irrelevant recommendations when the corpus is small or not indexed correctly).
EMB_MIN_SIMILARITATE = 0.5
def _corpus_signature(nomenclator: list[dict]) -> str:
"""Semnatura stabila a nomenclatorului pentru cache-ul corpusului embeddings.
Hash pe perechile (cod, denumire) sortate dupa cod -> se schimba la orice
add/remove/redenumire de cod, ramane stabila altfel (evita re-embed inutil).
"""
pairs = sorted(
(str(n.get("cod_prestatie") or ""), str(n.get("nume_prestatie") or ""))
for n in nomenclator
)
blob = "".join(f"{c}{d}" for c, d in pairs)
return hashlib.sha256(blob.encode("utf-8")).hexdigest()
def ensure_embeddings_corpus(conn, nomenclator: list[dict] | None = None) -> None:
    """Build or refresh the embeddings corpus from the nomenclator.

    Gated on the `embeddings_enabled` setting: when it is off this returns
    immediately, before the embeddings module is imported or the nomenclator is
    queried.

    When enabled, the corpus is {denumire=nume_prestatie, cod=cod_prestatie} for
    every row that has both fields. Indexing is skipped when the stored corpus
    signature equals the current nomenclator signature and a corpus is present.

    Graceful degradation: any exception raised while importing the embeddings
    module or indexing is swallowed, so a failure never blocks the editor.

    Per the original notes, callers that enrich suggestions invoke this BEFORE
    their `enrich_suggestions` loop; `enrich_suggestions` itself never builds the
    corpus.

    Args:
        conn: DB connection, used only when `nomenclator` is None.
        nomenclator: pre-loaded rows; None loads them via `load_nomenclator`.
    """
    from .config import get_settings

    settings = get_settings()
    if not settings.embeddings_enabled:
        return

    try:
        from . import embeddings as _emb

        rows = load_nomenclator(conn) if nomenclator is None else nomenclator
        if not rows:
            return

        signature = _corpus_signature(rows)
        already_indexed = _emb.corpus_signature() == signature and _emb.has_corpus()
        if already_indexed:
            return  # same nomenclator already indexed -> nothing to do

        corpus: list[dict] = []
        for row in rows:
            if row.get("nume_prestatie") and row.get("cod_prestatie"):
                corpus.append(
                    {"denumire": str(row["nume_prestatie"]), "cod": str(row["cod_prestatie"])}
                )
        _emb.index_corpus(corpus, signature=signature)
    except Exception:
        pass  # deliberate best-effort: an indexing failure must not block the editor
def enrich_suggestions(
    conn,
    denumire: str | None,
    *,
    include_embeddings: bool = True,
) -> dict:
    """Enrich suggestions with shared GOLD, SILVER (LLM) and embeddings NN sources.

    All three sources are collected (no short-circuit) into `surse`, so the editor
    can display each of them; precedence applies only to the primary suggestion:
    shared GOLD > SILVER > embeddings.

    Returns:
        {
          'sugestie_principala': {'cod_prestatie': str, 'sursa': str} | None,
          'surse': {'gold_partajat': str|None, 'silver': str|None, 'embedding': str|None}
        }
        For an empty/None `denumire`, everything is None.

    Invariants:
    - Every source is SUGGESTION-ONLY; nothing here feeds
      resolve_prestatii/load_mapping.
    - A SILVER row with a truthy `is_nul` produces no suggestion.
    - Each source is best-effort: any exception from a lookup (including a failed
      import) is swallowed and the remaining sources are still used.
    - shared_store/embeddings are imported locally; per the original notes this
      avoids an import-time cycle (shared_store imports from this module).
    """
    surse: dict = {"gold_partajat": None, "silver": None, "embedding": None}
    if not denumire:
        return {"sugestie_principala": None, "surse": surse}

    # 1. Shared cross-account GOLD (described in the original notes as
    #    human-validated, the most trusted source).
    try:
        from .shared_store import lookup_shared_gold

        gold_row = lookup_shared_gold(conn, denumire)
        if gold_row and gold_row["cod_prestatie"]:
            surse["gold_partajat"] = str(gold_row["cod_prestatie"])
    except Exception:
        pass  # graceful degradation

    # 2. SILVER LLM suggestion; is_nul suppresses it.
    try:
        from .shared_store import lookup_suggestion

        silver_row = lookup_suggestion(conn, denumire)
        if silver_row and not silver_row["is_nul"] and silver_row["cod_prestatie"]:
            surse["silver"] = str(silver_row["cod_prestatie"])
    except Exception:
        pass  # graceful degradation

    # 3. Embeddings nearest neighbour.
    if include_embeddings:
        try:
            from . import embeddings as _emb

            # Cheap gate: only has_corpus() is consulted first. Per the original
            # notes, touching is_available()/suggest_nearest on an empty corpus
            # would lazy-load the large model inside the request thread; the corpus
            # is built by the caller through ensure_embeddings_corpus.
            if _emb.has_corpus():
                nearest = _emb.suggest_nearest(str(denumire), top_k=1)
                # Minimum threshold: a low-similarity neighbour is not worth showing.
                if nearest and nearest[0].get("similaritate", 0) >= EMB_MIN_SIMILARITATE:
                    surse["embedding"] = str(nearest[0]["cod"])
        except Exception:
            pass  # graceful degradation: a missing engine must not block

    # Primary suggestion: first available source in precedence order.
    sugestie_principala: dict | None = None
    for sursa in ("gold_partajat", "silver", "embedding"):
        if surse[sursa]:
            sugestie_principala = {"cod_prestatie": surse[sursa], "sursa": sursa}
            break
    return {"sugestie_principala": sugestie_principala, "surse": surse}
def _emite_text_rule_hits(conn, account_id: int, submission_id: int, resolved: list[dict] | None) -> None:
    """Emit one `text_rule_hit` event per item that was resolved through a text rule.

    Telemetry answering "which rule resolved which submission". The event context
    is {submission_id, account_id, pattern, cod_prestatie}. Nothing is emitted (and
    the logging module is not imported) when there are no hits. Per the original
    notes, `log_event` is best-effort and swallows its own exceptions.
    """
    hits = text_rule_hits(resolved)
    if not hits:
        return

    from .observ import log_event  # local import: avoids an import-time cycle

    for hit in hits:
        code = hit.get("cod_prestatie")
        event_context = {
            "submission_id": submission_id,
            "account_id": account_id,
            "pattern": hit.get("pattern"),
            "cod_prestatie": code,
        }
        log_event(
            "text_rule_hit",
            account_id=account_id,
            cod=code,
            conn=conn,
            context=event_context,
        )
def reresolve_account(conn, account_id: int | None, batch_id: int | None = None) -> dict[str, int]:
    """Re-resolve an account's `needs_mapping` submissions after a mapping change.

    For each selected submission the current mapping and text rules are applied:
    - operations still unmapped -> stays `needs_mapping`, payload and reason updated;
    - fully mapped but content validation fails -> `needs_data` with the errors;
    - fully mapped and valid -> `queued`, with retry_count/next_attempt_at reset.

    Since US-001 (PRD 5.11) auto_send no longer holds a row: a resolved code is
    queued directly, so `review_manual` is always 0 (the key is kept for callers).

    Scope:
    - `batch_id` given -> only that import batch (never cross-batch).
    - `batch_id` None  -> ONLY the API channel (rows with batch_id IS NULL); import
      batches are re-resolved solely on their explicit commit.
    Account filtering goes through `account_scope_clause`, so legacy rows with
    account_id NULL (owned by account 1) are re-resolved together with account 1 —
    the same rows `pending_unmapped` reports as blocked for that account.

    Submissions whose payload is not valid JSON, or is not a JSON object, are
    skipped and not counted.

    Args:
        conn: DB connection.
        account_id: account to re-resolve; None -> default account.
        batch_id: optional import batch to scope to.

    Returns:
        {"requeued", "still_blocked", "needs_data", "review_manual"} counters.
    """
    acct = account_or_default(account_id)
    mapping = load_mapping(conn, acct)
    valid_codes = load_nomenclator_codes(conn) or None
    # Load the text rules ONCE, before looping over rows.
    text_rules = load_text_rules(conn, acct)

    # scope_sql is a fixed fragment with placeholders; all values stay bound params.
    scope_sql, scope_params = account_scope_clause(acct)
    if batch_id is not None:
        rows = conn.execute(
            "SELECT id, payload_json FROM submissions "
            f"WHERE status='needs_mapping' AND {scope_sql} AND batch_id=?",
            [*scope_params, batch_id],
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT id, payload_json FROM submissions "
            f"WHERE status='needs_mapping' AND {scope_sql} AND batch_id IS NULL",
            scope_params,
        ).fetchall()

    stats = {"requeued": 0, "still_blocked": 0, "needs_data": 0, "review_manual": 0}
    for r in rows:
        try:
            content = json.loads(r["payload_json"])
        except (ValueError, TypeError):
            continue
        if not isinstance(content, dict):
            continue  # valid JSON but not an object: nothing to resolve

        resolved, unmapped = resolve_prestatii(content.get("prestatii"), mapping, valid_codes, text_rules)
        content["prestatii"] = resolved
        payload_json = json.dumps(content, ensure_ascii=False)
        # Telemetry for the items resolved through a text rule.
        _emite_text_rule_hits(conn, acct, r["id"], resolved)

        if unmapped:
            conn.execute(
                "UPDATE submissions SET payload_json=?, rar_error=?, updated_at=datetime('now') WHERE id=?",
                (payload_json, json.dumps({"unmapped": unmapped}, ensure_ascii=False), r["id"]),
            )
            stats["still_blocked"] += 1
            continue

        errors = validate_prezentare(content)
        if errors:
            conn.execute(
                "UPDATE submissions SET status='needs_data', payload_json=?, rar_error=?, "
                "updated_at=datetime('now') WHERE id=?",
                (payload_json, json.dumps(errors, ensure_ascii=False), r["id"]),
            )
            stats["needs_data"] += 1
        else:
            conn.execute(
                "UPDATE submissions SET status='queued', payload_json=?, rar_error=NULL, "
                "retry_count=0, next_attempt_at=NULL, updated_at=datetime('now') WHERE id=?",
                (payload_json, r["id"]),
            )
            stats["requeued"] += 1
    return stats