"""Mapare operatie ROAAUTO -> cod prestatie RAR + fuzzy lookup pentru editor. Contract (varianta hibrida): un item de prestatie poate veni fie cu `cod_prestatie` (cod RAR direct), fie cu `cod_op_service` (cod intern ROAAUTO) + `denumire`. La ingestie incercam sa rezolvam codul intern prin `operations_mapping`; daca nu exista mapare -> submission `needs_mapping` (nu se trimite la RAR), iar operatia apare in editorul web unde userul o mapeaza cu ajutorul unei sugestii fuzzy pe nomenclatorul RAR. La salvarea maparii, submission-urile blocate pe acel cod se re-rezolva automat. Functiile de la inceput (normalize/suggest/resolve) sunt PURE (fara DB/HTTP), unit-testabile direct. Cele cu `conn` sunt helpere de persistenta. """ from __future__ import annotations import hashlib import json import re import unicodedata from typing import Any from rapidfuzz import fuzz, process from . import errors as err_mod from .nomenclator_seed import FALLBACK_NOMENCLATOR from .validation import validate_prezentare # Cont implicit cat timp auth API-key (CORE) nu e implementat: ingestiile vin cu # account_id NULL si le atribuim contului seed-at in schema (id=1). DEFAULT_ACCOUNT_ID = 1 # Sub acest scor (0..100) nu preselectam nicio sugestie — userul alege manual. SUGGEST_MIN_SCORE = 60 # --------------------------------------------------------------------------- # # Pur: normalizare + fuzzy + rezolvare # # --------------------------------------------------------------------------- # def normalize_for_match(value: object) -> str: """Upper + fara diacritice + spatii colapsate, pentru potrivire robusta. 'Reparație motor' -> 'REPARATIE MOTOR'. Diacriticele romanesti (ă/â/î/ș/ț) si artefactele de encoding nu trebuie sa strice scorul fuzzy. """ s = str(value or "") s = unicodedata.normalize("NFKD", s) s = "".join(ch for ch in s if not unicodedata.combining(ch)) return " ".join(s.upper().split()) # --------------------------------------------------------------------------- # # Pre-filtru determinist non-operatii (NUL) — US-001 PRD 5.18 # # --------------------------------------------------------------------------- # # # Masuratoarea k-NN (memorie test-precizie-knn-embeddings) arata recall NUL doar # 64%: gunoiul evident (ITP, plata, discount, nr. inmatriculare, tractare) scapa # semantic ca OE-1. Un pre-filtru text/regex il marcheaza NUL INAINTE de k-NN. # # Garantie: ZERO fals-pozitiv pe operatii reale. Regulile au fost calibrate pe # `docs/operatii-service/*.csv` (toate aparitiile distincte). Triggerele NEambigue # (ITP, ACHITAT/PLATA, DISCOUNT/REDUCERE, TAXA) sunt neconditionate (0 FP masurat). # Triggerele AMBIGUE (TRACTARE, NR INMATRICULARE + pattern placuta) apar si in # operatii reale ("D/R CARLIG TRACTARE", "D/R ELECTROMOTOR CT 44 MKY") -> sunt # ECRANATE de un context de piesa/operatie (`_NUL_CTX_PIESA`). # Trigger-uri neambigue (substring/regex pe text normalizat). _NUL_ITP = re.compile(r"(?:\bITP\b|\d\s*X\s*ITP|X\s*ITP\b|\bITP[.,])") _NUL_PLATA = re.compile(r"\b(ACHITAT|ACHITARE|PLATA|PLATIT|PLATIRE)\b") _NUL_DISCOUNT = re.compile(r"\b(DISCOUNT|REDUCERE)\b") _NUL_TAXA = re.compile(r"\bTAXA\b") # Trigger-uri ambigue — valide ca NUL DOAR in absenta unui context de piesa. _NUL_TRACTARE = re.compile(r"\b(TRACTARE|TRACTARI)\b") _NUL_NR_PLACUTA = re.compile( r"(\bNR\s+INMATRICULARE\b|\bNUMAR\s+INMATRICULARE\b|\b[A-Z]{1,2}\s?\d{2,3}\s?[A-Z]{3}\b)" ) # Daca apare oricare cuvant de aici, TRACTARE/placuta e nume de piesa sau operatie # reala (carlig/capac de tractare, suport placuta, placuta lipita la o reparatie). _NUL_CTX_PIESA = re.compile( r"\b(D/R|D-R|CARLIG|CAPAC|BARA|PROTECTIE|MONTAT|MONTAJ|DEMONTAT|INLOCUIT|" r"INLOCUIRE|REPARAT|REPARATIE|VOPSIT|SCHIMBAT|SUPORT)\b" ) def prefiltru_nul(denumire: object) -> bool: """True daca operatia e gunoi evident (non-operatie de service) -> NUL determinist. Ruleaza INAINTE de k-NN/embeddings in `enrich_suggestions` (US-006). Pur, fara DB. Zero fals-pozitiv pe operatii reale (vezi comentariul de mai sus + tests). """ text = normalize_for_match(denumire) if not text: return False # Neambigue: 0 FP masurat, fara ecranare. if _NUL_ITP.search(text) or _NUL_PLATA.search(text) or _NUL_DISCOUNT.search(text) or _NUL_TAXA.search(text): return True # Ambigue: doar daca NU e context de piesa. if _NUL_CTX_PIESA.search(text): return False if _NUL_TRACTARE.search(text) or _NUL_NR_PLACUTA.search(text): return True return False def suggest_codes( denumire: object, nomenclator: list[dict], *, limit: int = 5, ) -> list[dict]: """Clasament fuzzy al codurilor RAR pentru o denumire de operatie ROAAUTO. `nomenclator` = randuri {cod_prestatie, nume_prestatie}. Intoarce [{cod_prestatie, nume_prestatie, score}] sortat descrescator dupa scor. Daca denumirea e goala, intoarce nomenclatorul in ordinea data, scor 0. """ query = normalize_for_match(denumire) rows = [r for r in nomenclator if (r.get("cod_prestatie") or "")] if not query: return [{**r, "score": 0.0} for r in rows[:limit]] choices = {r["cod_prestatie"]: normalize_for_match(r.get("nume_prestatie")) for r in rows} by_cod = {r["cod_prestatie"]: r for r in rows} # token_sort_ratio (nu token_set_ratio): recompenseaza acoperirea cat mai multor # cuvinte din denumire, in loc sa dea 100 la orice subset (ex. "REPARATIE" si # "REPARATIE ODOMETRU" ar fi egale la set_ratio). ranked = process.extract( query, choices, scorer=fuzz.token_sort_ratio, limit=limit, ) # process.extract -> [(value, score, key)]; key = cod_prestatie. return [ { "cod_prestatie": cod, "nume_prestatie": by_cod[cod].get("nume_prestatie"), "score": float(score), } for _val, score, cod in ranked ] # Prefixul pus pe `cod_sursa` cand un item e rezolvat printr-o regula text. # Forma: "text_rule:". Payload-harmless — # RAR citeste doar `cod_prestatie`; `cod_sursa` ramane in payload_json fara efect. COD_SURSA_TEXT_RULE_PREFIX = "text_rule:" def _rezolva_din_reguli_text( item: dict, text_rules: list[dict] | None, valid_codes: set[str] | None, ) -> tuple[str | None, str | None, bool | None]: """Cauta prima regula text (in ordinea data) al carei pattern e substring al textului operatiei. Intoarce (cod uppercase, pattern original, auto_send) daca e valid, altfel (None, None, None). Textul operatiei = `denumire` daca exista, altfel `cod_op_service`. Ambele parti (text si pattern) se normalizeaza cu `normalize_for_match` (fara diacritice, uppercase, spatii colapsate) -> match insensibil la caz/diacritice. `text_rules` e deja ordonata (priority ASC, id ASC) de `load_text_rules`, deci prima regula care da match castiga. Daca regula castigatoare are un cod absent din `valid_codes` (cand `valid_codes` e setat), nu intoarcem un cod invalid -> (None, None, None) (operatia ramane nemapata), coerent cu garda din `resolve_prestatii`. Pattern-ul intors e cel ORIGINAL al regulii (pentru telemetrie), nu cel normalizat folosit la match. `auto_send` = flagul regulii castigatoare: cand e falsy (DEFAULT 0, de siguranta) randul trebuie TINUT pentru verificare umana, nu trimis automat la RAR (blast radius substring + FINALIZATA ireversibil). """ if not text_rules: return None, None, None text = normalize_for_match(item.get("denumire") or item.get("cod_op_service")) if not text: return None, None, None for rule in text_rules: pat = normalize_for_match(rule.get("pattern")) if not pat or pat not in text: continue # Prima regula care da match castiga. cod = (rule.get("cod_prestatie") or "").strip().upper() if not cod: return None, None, None if valid_codes is not None and cod not in valid_codes: return None, None, None # cod invalid in nomenclator -> nu il punem; ramane nemapat return cod, rule.get("pattern"), bool(rule.get("auto_send")) return None, None, None def text_rule_hits(resolved: list[dict] | None) -> list[dict]: """Extrage din itemii rezolvati cei care au primit cod dintr-o regula text. Intoarce [{pattern, cod_prestatie}] pentru fiecare item al carui `cod_sursa` incepe cu `COD_SURSA_TEXT_RULE_PREFIX`. Pur (fara DB); apelantii cu `conn` il folosesc ca sa emita `log_event("text_rule_hit", ...)`. """ hits: list[dict] = [] for item in resolved or []: sursa = item.get("cod_sursa") if isinstance(sursa, str) and sursa.startswith(COD_SURSA_TEXT_RULE_PREFIX): hits.append({ "pattern": sursa[len(COD_SURSA_TEXT_RULE_PREFIX):], "cod_prestatie": item.get("cod_prestatie"), }) return hits def text_rules_overlap(pattern: str, existing_rules: list[dict] | None) -> list[dict]: """Reguli text existente care se SUPRAPUN cu `pattern` (avertisment neblocant). Overlap = pattern-ul nou normalizat (`normalize_for_match`) e substring al unei reguli existente SAU invers (oricare directie). Pur, determinist, fara DB. Un pattern IDENTIC dupa normalizare NU e overlap: e un upsert (update al codului), nu o suprapunere care merita avertisment. Intoarce dict-urile originale din `existing_rules` care se suprapun (in ordinea data). """ pat = normalize_for_match(pattern) if not pat: return [] hits: list[dict] = [] for rule in existing_rules or []: other = normalize_for_match(rule.get("pattern")) if not other or other == pat: continue # gol sau identic -> nu e overlap if pat in other or other in pat: hits.append(rule) return hits def resolve_prestatii( prestatii: list[dict] | None, mapping: dict[str, str], valid_codes: set[str] | None = None, text_rules: list[dict] | None = None, ) -> tuple[list[dict], list[dict]]: """Rezolva fiecare item: umple `cod_prestatie` din maparea op->cod unde lipseste. Reguli (hibrid): - item cu `cod_prestatie` valid (in nomenclator) -> pastrat ca atare. - item fara cod, cu `cod_op_service` in `mapping` -> umplem cod_prestatie. - item fara cod, nemapat exact, dar al carui text da match pe o regula text (substring) -> umplem cod_prestatie din prima regula care potriveste. - item fara cod, fara mapare si fara regula text -> ramane nemapat. - item cu `cod_prestatie` NECUNOSCUT in nomenclator -> tratat ca operatie de mapat: il promovam la `cod_op_service` (daca nu exista deja) ca sa intre in fluxul needs_mapping. RAR accepta NUMAI coduri din nomenclator (coloana COD_PRESTATIE max 5 car.); un cod necunoscut da HTTP 500 si RECORD PARTIAL la RAR (terminal) -> nu-l trimitem niciodata raw. Precedenta (stricta): `cod_prestatie` direct valid > mapare exacta `cod_op_service` in `mapping` > reguli text > nemapat. Regulile text se incearca DOAR cand nu exista cod valid SI op nu e in `mapping`. `valid_codes` = setul de coduri RAR valide (uppercase) din nomenclator. Cand e None, validarea e dezactivata (compat: comportamentul vechi „cod_prestatie trece neatins"); rutele API il paseaza intotdeauna. `text_rules` = lista de dict-uri ca cea intoarsa de `load_text_rules` ([{pattern, cod_prestatie, auto_send, priority}], ordonata priority ASC, id ASC). Default None = comportament actual neschimbat (fara reguli text). Intoarce (prestatii_rezolvate, nemapate). `prestatii_rezolvate` pastreaza si campurile originale (cod_op_service/denumire) ca re-rezolvarea sa aiba contextul; payload-ul RAR citeste doar cod_prestatie. `nemapate` = [{cod_op_service, denumire}] pentru editor. """ resolved: list[dict] = [] unmapped: list[dict] = [] for item in prestatii or []: it = dict(item) # Curata adnotarile aditive ale rezolvarii (cod_sursa + flagul de hold pe # regula auto_send=0): se recalculeaza de la zero la fiecare rezolvare. # Altfel, un item re-rezolvat acum prin alta cale (ex. mapare exacta) ar pastra # un cod_sursa/flag stale din payload -> telemetrie falsa + hold gresit. it.pop("cod_sursa", None) it.pop("regula_fara_autosend", None) cod = (it.get("cod_prestatie") or "").strip().upper() op = (it.get("cod_op_service") or "").strip() cod_valid = bool(cod) and (valid_codes is None or cod in valid_codes) if cod_valid: it["cod_prestatie"] = cod else: # cod lipsa SAU necunoscut in nomenclator -> ruta de mapare. if cod and not op: # Promovam codul direct necunoscut la cod_op_service ca sa-l poti mapa # in editor (cu denumire = codul, pentru sugestia fuzzy) si sa se retina. op = cod it["cod_op_service"] = op if not it.get("denumire"): it["denumire"] = cod if op and op in mapping: it["cod_prestatie"] = mapping[op] elif op: # Mapare exacta absenta -> incearca regulile text (substring). cod_regula, pattern_regula, auto_send_regula = _rezolva_din_reguli_text( it, text_rules, valid_codes ) if cod_regula is not None: it["cod_prestatie"] = cod_regula # Adnotare aditiva: marcheaza ca rezolvat-prin-regula cu pattern-ul # sursa. Payload-harmless (RAR citeste doar cod_prestatie). it["cod_sursa"] = f"{COD_SURSA_TEXT_RULE_PREFIX}{pattern_regula or ''}" # US-001 (PRD 5.11): regula_fara_autosend nu se mai seteaza; # auto_send nu mai tine randul (has_no_auto_send neutralizat). else: it["cod_prestatie"] = None unmapped.append({"cod_op_service": op, "denumire": it.get("denumire")}) # item fara cod si fara op: il lasam asa; validarea de continut prinde # "prestatii goale"/cod lipsa. resolved.append(it) return resolved, unmapped # --------------------------------------------------------------------------- # # Persistenta (conn) # # --------------------------------------------------------------------------- # def account_or_default(account_id: int | None) -> int: return account_id if account_id is not None else DEFAULT_ACCOUNT_ID def account_scope_clause(account_id: int) -> tuple[str, list]: """Fragment SQL + params pentru filtrarea pe cont in tabele cu account_id nullable. Aplica regula: NULL apartine contului 1 (legacy). Foloseste DOAR pe submissions (account_id NULLABLE). NU folosi pe operations_mapping (account_id NOT NULL) — acolo WHERE account_id=? simplu. """ return ( "(account_id = ? OR (account_id IS NULL AND ? = 1))", [account_id, account_id], ) def seed_nomenclator_if_empty(conn) -> int: """Seed fallback (18 coduri din contract) DOAR daca nomenclator_rar e gol. Worker-ul suprascrie live; aici doar garantam ca editorul fuzzy merge offline. Intoarce nr. de randuri inserate. """ n = conn.execute("SELECT COUNT(*) AS n FROM nomenclator_rar").fetchone()["n"] if n: return 0 conn.executemany( "INSERT OR IGNORE INTO nomenclator_rar (cod_prestatie, nume_prestatie) VALUES (?, ?)", FALLBACK_NOMENCLATOR, ) return len(FALLBACK_NOMENCLATOR) def upsert_nomenclator(conn, items: list[dict]) -> int: """Upsert nomenclator live din RAR. `items` = forma API (codPrestatie/numePrestatie). Tolerant la chei: codPrestatie/cod_prestatie/cod, numePrestatie/nume_prestatie/nume. Intoarce nr. de coduri upsert-ate. """ rows: list[tuple[str, str]] = [] for it in items or []: if not isinstance(it, dict): continue cod = it.get("codPrestatie") or it.get("cod_prestatie") or it.get("cod") nume = it.get("numePrestatie") or it.get("nume_prestatie") or it.get("nume") if cod: rows.append((str(cod).strip().upper(), str(nume or "").strip())) if not rows: return 0 conn.executemany( "INSERT INTO nomenclator_rar (cod_prestatie, nume_prestatie, updated_at) " "VALUES (?, ?, datetime('now')) " "ON CONFLICT(cod_prestatie) DO UPDATE SET nume_prestatie=excluded.nume_prestatie, " "updated_at=datetime('now')", rows, ) return len(rows) def load_nomenclator(conn) -> list[dict]: rows = conn.execute( "SELECT cod_prestatie, nume_prestatie FROM nomenclator_rar ORDER BY cod_prestatie" ).fetchall() return [dict(r) for r in rows] def load_nomenclator_codes(conn) -> set[str]: """Setul de coduri RAR valide (uppercase) pentru validarea cod_prestatie la ingestie. Intoarce set() daca nomenclatorul e gol -> apelantul trebuie sa NU valideze in acel caz (altfel ar bloca totul). In practica nomenclatorul e mereu populat: seed fallback (18 coduri) la boot + upsert live de la worker la fiecare login. """ rows = conn.execute("SELECT cod_prestatie FROM nomenclator_rar").fetchall() return {(r["cod_prestatie"] or "").strip().upper() for r in rows if (r["cod_prestatie"] or "").strip()} def load_mapping(conn, account_id: int | None) -> dict[str, str]: """{cod_op_service -> cod_prestatie} pentru un cont.""" acct = account_or_default(account_id) rows = conn.execute( "SELECT cod_op_service, cod_prestatie FROM operations_mapping WHERE account_id=?", (acct,), ).fetchall() return {r["cod_op_service"]: r["cod_prestatie"] for r in rows} def load_mapping_meta(conn, account_id: int | None) -> dict[str, dict]: """{cod_op_service -> {cod_prestatie, auto_send}} pentru un cont. Varianta extinsa care include si flagul auto_send per operatie. """ acct = account_or_default(account_id) rows = conn.execute( "SELECT cod_op_service, cod_prestatie, auto_send FROM operations_mapping WHERE account_id=?", (acct,), ).fetchall() return { r["cod_op_service"]: {"cod_prestatie": r["cod_prestatie"], "auto_send": bool(r["auto_send"])} for r in rows } def classify_prezentare( content: dict, mapping: dict[str, str], mapping_meta: dict[str, dict], valid_codes: set[str] | None = None, text_rules: list[dict] | None = None, ) -> dict: """Helper pur de clasificare: reproduce EXACT logica create_prezentari fara DB/efecte. Apelat de AMBELE rute (POST /v1/prezentari si POST /v1/prezentari/valideaza) pentru a garanta acelasi verdict — invariantul de corectitudine dry-run. Intoarce {"status", "rar_error", "resolved", "unmapped", "errors", "content"}. "content" = copia actualizata (VIN/nr canonicalizat + prestatii rezolvate). """ from .idempotency import canonicalize_row # import local: evita circular (mapping <- idempotency) c = dict(content) canon = canonicalize_row(c) c.update({ "vin": canon["vin"], "nr_inmatriculare": canon["nr_inmatriculare"], "odometru_final": canon["odometru_final"], }) resolved, unmapped = resolve_prestatii(c.get("prestatii"), mapping, valid_codes, text_rules) c["prestatii"] = resolved if unmapped: status = "needs_mapping" coduri = ", ".join((u.get("cod_op_service") or "") for u in unmapped) rar_error = json.dumps( {"unmapped": unmapped, **err_mod.eroare("COD_NEMAPAT", cauza=f"Coduri fara mapare RAR: {coduri}")}, ensure_ascii=False, ) errors: list[dict] = [] else: errors = validate_prezentare(c) if errors: status = "needs_data" rar_error = json.dumps(errors, ensure_ascii=False) else: # US-001 (PRD 5.11): ramura AUTO_SEND_OPRIT eliminata. # Un cod rezolvat (mapare exacta sau regula text) -> queued direct. status = "queued" rar_error = None return { "status": status, "rar_error": rar_error, "resolved": resolved, "unmapped": unmapped, "errors": errors, "content": c, } def has_no_auto_send(resolved: list[dict], mapping_meta: dict[str, dict]) -> bool: """Neutralizat dupa US-001 (PRD 5.11): auto_send nu mai tine randuri in needs_mapping. Simbolul este PASTRAT (importat in routes.py si import_router.py); stergerea ar produce ImportError la boot. Functia intoarce mereu False — codul rezolvat intra direct in queued, indiferent de valoarea auto_send din mapping_meta. Coloanele DB raman cu default=1 (migrare non-distructiva). """ return False def pending_unmapped(conn, account_id=None) -> list[dict]: """Operatii distincte nemapate, agregate din submission-urile `needs_mapping`. account_id=None (default): global — intentionat pentru web/routes.py (back-compat). Apelantii noi din API TREBUIE sa paseze account_id explicit; None global e footgun (scurge cross-account) si e rezervat exclusiv pentru dashboard-ul intern. account_id=int: filtreaza in SQL pe cont inclusiv randuri legacy (account_id IS NULL apartine contului 1). Filtrarea in SQL, nu post-hoc in Python. """ nomenclator = load_nomenclator(conn) if account_id is not None: scope_sql, scope_params = account_scope_clause(account_id) rows = conn.execute( f"SELECT id, account_id, payload_json FROM submissions " f"WHERE status='needs_mapping' AND {scope_sql}", scope_params, ).fetchall() else: rows = conn.execute( "SELECT id, account_id, payload_json FROM submissions WHERE status='needs_mapping'" ).fetchall() agg: dict[tuple[int, str], dict[str, Any]] = {} for r in rows: acct = r["account_id"] if r["account_id"] is not None else DEFAULT_ACCOUNT_ID try: content = json.loads(r["payload_json"]) except (ValueError, TypeError): continue for item in content.get("prestatii") or []: if not isinstance(item, dict): continue if (item.get("cod_prestatie") or ""): continue op = (item.get("cod_op_service") or "").strip() if not op: continue key = (acct, op) entry = agg.setdefault( key, {"account_id": acct, "cod_op_service": op, "denumire": item.get("denumire"), "blocked": 0, "_ids": set()}, ) if not entry["denumire"] and item.get("denumire"): entry["denumire"] = item.get("denumire") entry["_ids"].add(r["id"]) # Indexeaza corpusul embeddings o data inainte de bucla (no-op cand flagul e off). ensure_embeddings_corpus(conn, nomenclator) out: list[dict] = [] for entry in agg.values(): entry["blocked"] = len(entry.pop("_ids")) entry["suggestions"] = suggest_codes(entry["denumire"], nomenclator, limit=5) # L14-S6: imbogatire sugestii cu GOLD partajat > SILVER > embeddings (Eng-F2). # SUGGESTION-ONLY: nu intra in resolve_prestatii/load_mapping (#13). enriched = enrich_suggestions(conn, entry["denumire"]) entry["sugestie_principala"] = enriched["sugestie_principala"] entry["surse_sugestie"] = enriched["surse"] out.append(entry) out.sort(key=lambda e: (-e["blocked"], e["cod_op_service"])) return out def save_mapping(conn, account_id: int | None, cod_op_service: str, cod_prestatie: str, auto_send: bool) -> None: """Upsert o mapare op->cod (UNIQUE pe account_id+cod_op_service).""" acct = account_or_default(account_id) op = (cod_op_service or "").strip() cod = (cod_prestatie or "").strip().upper() if not op or not cod: raise ValueError("cod_op_service si cod_prestatie sunt obligatorii") conn.execute( "INSERT INTO operations_mapping (account_id, cod_op_service, cod_prestatie, auto_send) " "VALUES (?, ?, ?, ?) " "ON CONFLICT(account_id, cod_op_service) DO UPDATE SET " "cod_prestatie=excluded.cod_prestatie, auto_send=excluded.auto_send", (acct, op, cod, 1 if auto_send else 0), ) def load_text_rules(conn, account_id: int | None) -> list[dict]: """Returneaza regulile text ale unui cont, ordonate priority ASC, id ASC. Fiecare element: {pattern, cod_prestatie, auto_send, priority}. Aplica account_or_default (None == 1). """ acct = account_or_default(account_id) rows = conn.execute( "SELECT pattern, cod_prestatie, auto_send, priority " "FROM operation_text_rules " "WHERE account_id=? " "ORDER BY priority ASC, id ASC", (acct,), ).fetchall() return [dict(r) for r in rows] def save_text_rule( conn, account_id: int | None, pattern: str, cod_prestatie: str, auto_send: bool, ) -> None: """Upsert o regula text pe (account_id, pattern). auto_send boolean -> 0/1. Daca regula exista deja (acelasi cont + pattern), actualizeaza cod_prestatie si auto_send. """ acct = account_or_default(account_id) pat = (pattern or "").strip() cod = (cod_prestatie or "").strip().upper() if not pat or not cod: raise ValueError("pattern si cod_prestatie sunt obligatorii") conn.execute( "INSERT INTO operation_text_rules (account_id, pattern, cod_prestatie, auto_send) " "VALUES (?, ?, ?, ?) " "ON CONFLICT(account_id, pattern) DO UPDATE SET " "cod_prestatie=excluded.cod_prestatie, auto_send=excluded.auto_send", (acct, pat, cod, 1 if auto_send else 0), ) def delete_text_rule(conn, account_id: int | None, pattern: str) -> None: """Sterge regula cu (account_id, pattern) daca exista.""" acct = account_or_default(account_id) pat = (pattern or "").strip() conn.execute( "DELETE FROM operation_text_rules WHERE account_id=? AND pattern=?", (acct, pat), ) # Prag minim de similaritate cosine pentru sugestia din embeddings NN. # Sub acest scor, sugestia NN e prea incerta si nu o afisam (previne recomandari # irelevante cand corpus-ul e mic sau neindexat corect). EMB_MIN_SIMILARITATE = 0.5 def _corpus_signature_silver(rows: list) -> str: """Semnatura stabila a corpusului SILVER (mapping_suggestions) pentru cache. Hash pe (denumire_normalizata, cod, is_nul) sortat -> se schimba la orice add/remove/redenumire/relabel, ramane stabila altfel (evita re-embed inutil). """ triples = sorted( (str(r["denumire_normalizata"] or ""), str(r["cod_prestatie"] or ""), int(r["is_nul"] or 0)) for r in rows ) blob = "".join(f"{d}|{c}|{n}" for d, c, n in triples) return hashlib.sha256(blob.encode("utf-8")).hexdigest() def ensure_embeddings_corpus(conn, nomenclator: list[dict] | None = None) -> None: """Construieste/actualizeaza corpusul embeddings din corpusul ETICHETAT (PRD 5.18 US-005). Sursa corpusului = `mapping_suggestions` (SILVER): exemple reale etichetate {denumire_normalizata -> cod, is_nul}, NU cele 18 categorii generice din `nomenclator_rar`. k-NN peste exemple reale e net mai precis (94.3% acord LLM). Parametrul `nomenclator` e pastrat pentru compatibilitatea apelantilor, dar nu mai e folosit ca sursa. Gated pe `AUTOPASS_EMBEDDINGS_ENABLED` (default ON; OFF in teste): cand e dezactivat, e un no-op total -> /mapari instant + suita de teste rapida. Cand e activat: indexeaza corpusul o singura data (lazy-load modelul ~230MB la prima chemare), re-indexeaza doar cand semnatura corpusului SILVER s-a schimbat. Itemii NUL (is_nul=1, cod NULL) raman in corpus: un vecin NUL e semnal de supresie (US-006). Degradare gratioasa: orice eroare lasa corpusul gol -> enrich cade pe restul. """ from .config import get_settings if not get_settings().embeddings_enabled: return try: from . import embeddings as _emb rows = conn.execute( "SELECT denumire_normalizata, cod_prestatie, is_nul FROM mapping_suggestions" ).fetchall() if not rows: return sig = _corpus_signature_silver(rows) if _emb.corpus_signature() == sig and _emb.has_corpus(): return # deja indexat pe acelasi corpus SILVER -> nimic de facut items = [ { "denumire": str(r["denumire_normalizata"]), "cod": (str(r["cod_prestatie"]) if r["cod_prestatie"] is not None else None), "is_nul": bool(r["is_nul"]), } for r in rows if r["denumire_normalizata"] ] _emb.index_corpus(items, signature=sig) except Exception: pass # degradare gratioasa (#16b): esecul indexarii nu blocheaza editorul def enrich_suggestions( conn, denumire: str | None, *, include_embeddings: bool = True, ) -> dict: """Imbogateste sugestiile cu GOLD partajat, SILVER LLM si embeddings NN. Precedenta Eng-F2 (pentru sugestie-only, nu auto-send): shared GOLD > SILVER > embeddings (Account GOLD = operations_mapping propriu = deja rezolvat inainte de needs_mapping; nu apare in needs_mapping, deci nu e in precedenta de sugestie.) Ordine completa (PRD 5.18 US-006): pre-filtru NUL determinist -> (daca NUL: fara cod, `surse['nul']=True`) altfel GOLD partajat > exact (SILVER) > k-NN embeddings. Returneaza: { 'sugestie_principala': {'cod_prestatie': str, 'sursa': str} | None, 'surse': {'gold_partajat': str|None, 'silver': str|None, 'embedding': str|None, 'nul': bool} } INVARIANTE: - Toate sursele = SUGGESTION-ONLY. NU intra in resolve_prestatii/load_mapping (#13). - Pre-filtru NUL (US-001) ruleaza PRIMUL: gunoiul evident (ITP/plata/discount...) e marcat non-operatie INAINTE de k-NN, fara sugestie de cod. - SILVER cu is_nul=1 (non-operatie/gunoi) NU produce sugestie (#4); vecin k-NN NUL idem. - Degradare gratioasa pe embeddings (#16b): daca motorul nu e disponibil sau arunca, returneaza sugestia disponibila din celelalte surse, fara exceptie. - Import local shared_store/embeddings: evita ciclu la import-time (shared_store importa normalize_for_match din mapping). """ sugestie_principala: dict | None = None surse: dict = {"gold_partajat": None, "silver": None, "embedding": None, "nul": False} if not denumire: return {"sugestie_principala": sugestie_principala, "surse": surse} # 0. Pre-filtru NUL determinist (US-001) INAINTE de orice k-NN/lookup: non-operatie # evidenta -> fara cod, scurtcircuit (nu interogheaza embeddings/SILVER pe gunoi). if prefiltru_nul(denumire): surse["nul"] = True return {"sugestie_principala": None, "surse": surse} # Colecteaza TOATE sursele (fara short-circuit) in `surse`: editorul le poate afisa # toate, independent de care castiga ca sugestie principala. # Precedenta Eng-F2 se aplica DOAR la alegerea sugestiei_principale. # 1. GOLD partajat cross-account (validat de om, cel mai de incredere) try: from .shared_store import lookup_shared_gold row_gold = lookup_shared_gold(conn, denumire) if row_gold and row_gold["cod_prestatie"]: surse["gold_partajat"] = str(row_gold["cod_prestatie"]) except Exception: pass # degradare gratioasa # 2. SILVER LLM (bootstrap, nevalidat de om; is_nul = supresie) try: from .shared_store import lookup_suggestion row_silver = lookup_suggestion(conn, denumire) if row_silver and not row_silver["is_nul"] and row_silver["cod_prestatie"]: surse["silver"] = str(row_silver["cod_prestatie"]) except Exception: pass # degradare gratioasa # 3. Embeddings NN (similaritate semantica, degradare gratioasa #16b) if include_embeddings: try: from . import embeddings as _emb # Poarta IEFTINA: nu atinge is_available()/suggest_nearest cand corpus-ul # e gol — `is_available()` lazy-load-eaza modelul de ~230MB (30-120s in # thread-ul de cerere). Corpusul se construieste de apelant prin # ensure_embeddings_corpus (gated pe AUTOPASS_EMBEDDINGS_ENABLED); cand # flagul e off, has_corpus() ramane False si calea e un no-op real. if _emb.has_corpus(): # F1 (US-005): corpusul k-NN e text NORMALIZAT (denumire_normalizata), # deci query-ul TREBUIE normalizat la fel — altfel cosine degradeaza si # nu mai e configul sub care s-a masurat 94.3%. nn = _emb.suggest_nearest(normalize_for_match(denumire), top_k=1) # Prag minim: similaritate prea mica = sugestie inutila. # Evita recomandari irelevante cand corpus-ul e mic/partial. if nn and nn[0].get("similaritate", 0) >= EMB_MIN_SIMILARITATE: if nn[0].get("is_nul"): # Vecin NUL (non-operatie) = semnal de SUPRESIE, nu cod (US-006). surse["nul"] = True elif nn[0].get("cod"): surse["embedding"] = str(nn[0]["cod"]) except Exception: pass # degradare gratioasa (#16b): motorul absent nu blocheaza # Alege sugestia principala in ordinea de precedenta: GOLD > SILVER > embeddings if surse["gold_partajat"]: sugestie_principala = {"cod_prestatie": surse["gold_partajat"], "sursa": "gold_partajat"} elif surse["silver"]: sugestie_principala = {"cod_prestatie": surse["silver"], "sursa": "silver"} elif surse["embedding"]: sugestie_principala = {"cod_prestatie": surse["embedding"], "sursa": "embedding"} return {"sugestie_principala": sugestie_principala, "surse": surse} def _emite_text_rule_hits(conn, account_id: int, submission_id: int, resolved: list[dict] | None) -> None: """Emite `text_rule_hit` in app_events pentru fiecare item rezolvat prin regula text. Telemetrie „ce regula a rezolvat ce submission". Best-effort (log_event inghite exceptiile). Context = {submission_id, account_id, pattern, cod_prestatie} — fara PII (pattern + cod nu sunt PII). Import local: evita orice risc de ciclu la import. """ hits = text_rule_hits(resolved) if not hits: return from .observ import log_event # import local: best-effort, fara ciclu la import-time for hit in hits: log_event( "text_rule_hit", account_id=account_id, cod=hit.get("cod_prestatie"), conn=conn, context={ "submission_id": submission_id, "account_id": account_id, "pattern": hit.get("pattern"), "cod_prestatie": hit.get("cod_prestatie"), }, ) def reresolve_account(conn, account_id: int | None, batch_id: int | None = None) -> dict[str, int]: """Re-rezolva submission-urile `needs_mapping` ale unui cont dupa o noua mapare. Pentru fiecare: aplica maparea curenta; daca nu mai raman op-uri nemapate -> ruleaza validarea de continut si trece pe `queued` (sau `needs_data` cu motiv), resetand backoff-ul. Daca raman nemapate, ramane `needs_mapping` cu motivul actualizat. Intoarce {requeued, still_blocked, needs_data, review_manual}. auto_send=0 pe un cod nou-mapat -> nu trece pe 'queued' (ramane 'needs_mapping' cu motiv "review manual"); previne FINALIZATA eronat permanent. batch_id != None -> scope la seria comitata (NU cross-batch). batch_id is None -> re-rezolva toti (canal API, batch_id IS NULL inclus). """ acct = account_or_default(account_id) mapping_meta = load_mapping_meta(conn, acct) mapping = {op: meta["cod_prestatie"] for op, meta in mapping_meta.items()} valid_codes = load_nomenclator_codes(conn) or None # Incarca regulile text O DATA, inainte de bucla pe randuri. text_rules = load_text_rules(conn, acct) if batch_id is not None: # Scope la batch-ul specificat (import commit explicit). # NU atinge randuri din alte batches sau din feed API. rows = conn.execute( "SELECT id, payload_json FROM submissions " "WHERE status='needs_mapping' AND account_id=? AND batch_id=?", (acct, batch_id), ).fetchall() else: # POST /v1/mapari (save manual): re-rezolva EXCLUSIV canalul API (batch_id IS NULL). # Salvarea unei mapari NU re-queues randuri din batches de import (cross-batch / # cross-feed). Batches de import sunt re-rezolvate doar la commit explicit. rows = conn.execute( "SELECT id, payload_json FROM submissions " "WHERE status='needs_mapping' AND account_id=? AND batch_id IS NULL", (acct,), ).fetchall() stats = {"requeued": 0, "still_blocked": 0, "needs_data": 0, "review_manual": 0} for r in rows: try: content = json.loads(r["payload_json"]) except (ValueError, TypeError): continue resolved, unmapped = resolve_prestatii(content.get("prestatii"), mapping, valid_codes, text_rules) content["prestatii"] = resolved payload_json = json.dumps(content, ensure_ascii=False) # Telemetrie pentru itemii rezolvati prin regula text. _emite_text_rule_hits(conn, acct, r["id"], resolved) if unmapped: conn.execute( "UPDATE submissions SET payload_json=?, rar_error=?, updated_at=datetime('now') WHERE id=?", (payload_json, json.dumps({"unmapped": unmapped}, ensure_ascii=False), r["id"]), ) stats["still_blocked"] += 1 continue # US-001 (PRD 5.11): ramura auto_send eliminata din reresolve. # Un cod rezolvat -> queued direct (review_manual ramane 0). errors = validate_prezentare(content) if errors: conn.execute( "UPDATE submissions SET status='needs_data', payload_json=?, rar_error=?, " "updated_at=datetime('now') WHERE id=?", (payload_json, json.dumps(errors, ensure_ascii=False), r["id"]), ) stats["needs_data"] += 1 else: conn.execute( "UPDATE submissions SET status='queued', payload_json=?, rar_error=NULL, " "retry_count=0, next_attempt_at=NULL, updated_at=datetime('now') WHERE id=?", (payload_json, r["id"]), ) stats["requeued"] += 1 return stats