"""Empirical validation of Premise 1 — "90%+ of future traffic is repeats of the same denominations".

CRITICAL LIMITATION (documented explicitly):
    The CSV files in docs/operatii-service/ contain AGGREGATED frequencies
    (DENOP + NR) with no date/timestamp column. A strict temporal validation
    (corpus = months 1..N, test = months N+) is NOT possible with the current
    data.

PROXIES USED (honest; they do not claim to be a temporal validation):

1. COVERAGE PROXY (Zipf):
       hit_rate_at_K = sum(NR of the top-K denominations by frequency) / total_NR
   Measures: if the top-K denominations are labelled and future traffic follows
   the same Zipf distribution (stationarity assumption), which share of the
   traffic is covered. It does NOT measure vocabulary drift over time.

2. LEAVE-FIRST-OUT PROXY:
       leave_one_out_hit_rate = (total_volume - total_distinct) / total_volume
   Measures: if corpus = "every denomination seen at least once", which share
   of occurrences are "repeats" (the 2nd, 3rd, ... n-th occurrence of each
   denomination). Singletons (NR=1) contribute 0 hits (the first occurrence is
   an unavoidable miss). This is the upper bound of the hit rate under
   stationarity.

VERDICT for Premise 1 (based on the coverage proxy):
    SUSTINUTA    — <= 10% of the distinct denominations cover >= 90% of volume
    SLABA        — between 10% and 30% of the distinct ones needed for >= 90%
    NEVALIDABILA — > 30% of the distinct ones needed (weak/flat Zipf distribution)

Reuses normalize_for_match from app/mapping.py as the matching key.
"""
from __future__ import annotations

import csv
import os
import sys
from collections.abc import Iterable, Iterator

# Path to the project root (two levels above tools/mapare-llm/), added to
# sys.path so that the `app` package can be imported when run as a script.
_HERE = os.path.dirname(os.path.abspath(__file__))
_ROOT = os.path.abspath(os.path.join(_HERE, '..', '..'))
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)

from app.mapping import normalize_for_match  # noqa: E402  (needs the sys.path tweak above)


# Re-export normalize_for_match under a shorter alias for internal use + tests.
def normalize_key(text: object) -> str:
    """Return the matching key for *text*; thin alias of ``normalize_for_match``.

    According to the original note on this alias, the helper upper-cases,
    strips diacritics and collapses whitespace, e.g.
    'Reparație motor' -> 'REPARATIE MOTOR'. The helper itself lives in
    app/mapping.py and is not visible here — confirm there for exact rules.
    """
    return normalize_for_match(text)


# ---------------------------------------------------------------------------
# I/O
# ---------------------------------------------------------------------------

def _iter_valid_rows(path: str) -> Iterator[tuple[str, int]]:
    """Yield ``(denop, nr)`` for every usable row of the ';'-delimited CSV at *path*.

    Rows with an empty DENOP, an empty or non-integer NR, or NR <= 0 are skipped.
    """
    # newline='' lets the csv module do its own newline handling (per csv docs).
    with open(path, encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f, delimiter=';'):
            denop = (row.get('DENOP') or '').strip().strip('"')
            nr_raw = (row.get('NR') or '').strip().strip('"')
            if not denop or not nr_raw:
                continue
            try:
                nr = int(nr_raw)
            except ValueError:
                continue
            if nr > 0:
                yield denop, nr


def _merge_by_key(rows: Iterable[tuple[str, int]]) -> list[tuple[str, int]]:
    """Sum NR per ``normalize_key`` of the denomination, keeping the first-seen spelling.

    Output order is the order in which each key was first encountered.
    """
    merged: dict[str, list] = {}  # normalized_key -> [first_seen_name, total_nr]
    for denumire, nr in rows:
        key = normalize_key(denumire)
        entry = merged.get(key)
        if entry is None:
            merged[key] = [denumire, nr]
        else:
            entry[1] += nr
    return [(name, total) for name, total in merged.values()]


def load_csv(path: str) -> list[tuple[str, int]]:
    """Load a CSV with the columns DENOP (denomination) and NR (frequency).

    Returns a list of ``(original_denomination, total_nr)`` after aggregating on
    ``normalize_key``, so spelling variants that normalize to the same key are
    merged under the first spelling seen. Rows with an empty DENOP or a
    non-positive / non-integer NR are ignored.

    Raises:
        OSError: if *path* cannot be opened (e.g. FileNotFoundError).
    """
    return _merge_by_key(_iter_valid_rows(path))


# ---------------------------------------------------------------------------
# Pure functions (testable without I/O)
# ---------------------------------------------------------------------------

def compute_volume_coverage(rows: list[tuple[str, int]]) -> list[dict]:
    """Sort by NR descending and compute the cumulative volume coverage.

    Returns ``[{denumire, nr, cumulative_volume_frac, cumulative_count}, ...]``
    where ``cumulative_volume_frac`` is the fraction of the total NR covered by
    the first ``cumulative_count`` denominations. Returns ``[]`` when the total
    volume is 0 (including empty input).
    """
    # sorted() is stable, so ties keep their input order.
    sorted_rows = sorted(rows, key=lambda x: -x[1])
    total_volume = sum(nr for _, nr in sorted_rows)
    if total_volume == 0:
        return []

    cumul = 0
    result = []
    for i, (denumire, nr) in enumerate(sorted_rows, 1):
        cumul += nr
        result.append({
            'denumire': denumire,
            'nr': nr,
            'cumulative_volume_frac': cumul / total_volume,
            'cumulative_count': i,
        })
    return result


def corpus_size_for_threshold(rows: list[tuple[str, int]], threshold: float = 0.90) -> int:
    """Minimum number of top-frequency labels needed for >= *threshold* volume coverage.

    Walks the descending-frequency coverage table and returns the count at which
    the cumulative fraction first reaches *threshold*. Returns ``len(rows)`` when
    the threshold is never reached (which includes the zero-volume case).
    """
    for entry in compute_volume_coverage(rows):
        if entry['cumulative_volume_frac'] >= threshold:
            return entry['cumulative_count']
    return len(rows)


def compute_hit_rate_at_k(rows: list[tuple[str, int]], k: int) -> float:
    """Fraction of the total volume covered by the top-K denominations (coverage proxy).

    Interpretation: if the K most frequent denominations are labelled and future
    traffic follows the same distribution, this is the probability that a future
    transaction is covered by the corpus.

    Returns 0.0 for empty input, zero total volume, or ``k <= 0``.
    """
    # Guard k <= 0 explicitly: a negative k would otherwise slice "all but the
    # last |k|" rows and report a bogus, large coverage.
    if not rows or k <= 0:
        return 0.0
    sorted_rows = sorted(rows, key=lambda x: -x[1])
    total_volume = sum(nr for _, nr in sorted_rows)
    if total_volume == 0:
        return 0.0
    top_k_volume = sum(nr for _, nr in sorted_rows[:k])
    return top_k_volume / total_volume


def leave_one_out_hit_rate(rows: list[tuple[str, int]]) -> float:
    """Leave-first-out proxy: ``(total_volume - total_distinct) / total_volume``.

    Interpretation: if corpus = every denomination seen at least once, this is
    the fraction of occurrences that are "repeats" (not the first occurrence),
    i.e. hits. Singletons (NR=1) contribute 0 hits. It is the UPPER BOUND of the
    real hit rate under the stationarity assumption, and NOT a temporal
    validation (it does not measure when new denominations appear).

    Returns 0.0 for empty input or zero total volume.
    """
    if not rows:
        return 0.0
    total_volume = sum(nr for _, nr in rows)
    total_distinct = len(rows)
    if total_volume == 0:
        return 0.0
    return (total_volume - total_distinct) / total_volume


def singleton_stats(rows: list[tuple[str, int]]) -> dict:
    """Statistics for denominations with NR == 1 (seen exactly once).

    Singletons matter because they are always misses on their first occurrence
    and, if they never recur, remain misses permanently.

    Returns a dict with ``singleton_count``, ``total_distinct``,
    ``singleton_volume_frac`` and ``singleton_distinct_frac`` (fractions are 0.0
    when their denominator is 0).
    """
    singleton_count = sum(1 for _, n in rows if n == 1)
    total_distinct = len(rows)
    total_volume = sum(nr for _, nr in rows)
    # Each singleton contributes NR=1, so singleton volume == singleton count.
    return {
        'singleton_count': singleton_count,
        'total_distinct': total_distinct,
        'singleton_volume_frac': singleton_count / total_volume if total_volume else 0.0,
        'singleton_distinct_frac': singleton_count / total_distinct if total_distinct else 0.0,
    }


def run_holdout(rows: list[tuple[str, int]], client_name: str = 'unknown') -> dict:
    """Run the full holdout-proxy analysis for a set of ``(denomination, nr)`` rows.

    Combines the coverage proxy (Zipf) with the leave-first-out proxy and
    returns a dict of statistics (percentages rounded to 2 decimals) plus the
    Premise 1 verdict: 'SUSTINUTA' when <= 10% of the distinct denominations
    reach 90% of the volume, 'SLABA' when <= 30%, otherwise 'NEVALIDABILA'.
    With no rows the required fraction is taken as 1.0 (-> 'NEVALIDABILA').
    """
    total_distinct = len(rows)
    total_volume = sum(nr for _, nr in rows)

    coverage_at_100 = compute_hit_rate_at_k(rows, k=100)
    coverage_at_500 = compute_hit_rate_at_k(rows, k=500)
    coverage_at_1000 = compute_hit_rate_at_k(rows, k=1000)

    labels_for_90pct = corpus_size_for_threshold(rows, threshold=0.90)
    frac_for_90pct = labels_for_90pct / total_distinct if total_distinct else 1.0

    loh = leave_one_out_hit_rate(rows)
    s = singleton_stats(rows)

    # Verdict from the coverage proxy: share of distinct names needed for 90% volume.
    if frac_for_90pct <= 0.10:
        verdict = 'SUSTINUTA'
    elif frac_for_90pct <= 0.30:
        verdict = 'SLABA'
    else:
        verdict = 'NEVALIDABILA'

    return {
        'client': client_name,
        'total_distinct': total_distinct,
        'total_volume': total_volume,
        'coverage_at_100': round(coverage_at_100 * 100, 2),
        'coverage_at_500': round(coverage_at_500 * 100, 2),
        'coverage_at_1000': round(coverage_at_1000 * 100, 2),
        'labels_for_90pct': labels_for_90pct,
        'frac_for_90pct': round(frac_for_90pct * 100, 2),
        'leave_one_out_hit_rate': round(loh * 100, 2),
        'singleton_count': s['singleton_count'],
        'singleton_distinct_frac': round(s['singleton_distinct_frac'] * 100, 2),
        'singleton_volume_frac': round(s['singleton_volume_frac'] * 100, 2),
        'verdict': verdict,
        'nota': (
            'PROXY FRECVENTA (fara timestamp temporal): validare temporala stricta '
            'imposibila cu datele curente. hit_rate_at_K = % volum acoperit de top-K '
            'etichete; valida NUMAI sub ipoteza distributie stabila in timp.'
        ),
    }


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def _format_row(label: str, value: str, width: int = 45) -> str:
    """Format one report line: *label* left-padded to *width*, followed by *value*."""
    return f" {label:<{width}}{value}"


def _print_report(title: str, r: dict) -> None:
    """Print the statistics block for one ``run_holdout`` result under ``CLIENT: <title>``."""
    print(f"CLIENT: {title}")
    print(_format_row("Denumiri distincte:", f"{r['total_distinct']:,}"))
    print(_format_row("Volum total operatii:", f"{r['total_volume']:,}"))
    print(_format_row("Coverage top-100:", f"{r['coverage_at_100']:.1f}%"))
    print(_format_row("Coverage top-500:", f"{r['coverage_at_500']:.1f}%"))
    print(_format_row("Coverage top-1000:", f"{r['coverage_at_1000']:.1f}%"))
    print(_format_row(
        "Etichete pt 90% vol:",
        f"{r['labels_for_90pct']} ({r['frac_for_90pct']:.1f}% din distinct)"
    ))
    print(_format_row(
        "Leave-first-out hit-rate:",
        f"{r['leave_one_out_hit_rate']:.1f}%"
    ))
    print(_format_row(
        "Singletons (NR=1):",
        f"{r['singleton_count']} ({r['singleton_distinct_frac']:.1f}% din distinct,"
        f" {r['singleton_volume_frac']:.1f}% din vol)"
    ))
    print(f" VERDICT PREMISA 1: {r['verdict']}")
    print()


def main() -> None:
    """Run the holdout proxy on every client CSV in docs/operatii-service/ and print a report.

    Prints one block per client, one block for the cross-client aggregate
    (re-merged on the normalized key, since clients can share denominations),
    and an overall conclusion.

    Raises:
        OSError: if one of the expected CSV files cannot be opened.
    """
    root = os.path.join(_ROOT, 'docs', 'operatii-service')
    clients = ['clever', 'sigma', 'automotive', 'south']

    sep = "=" * 72
    print(sep)
    print("HOLDOUT PREMISA 1 — PROXY FRECVENTA (fara date temporale)")
    print(sep)
    print("LIMITARE: CSV-urile contin frecvente AGREGATE (DENOP + NR), fara")
    print("coloana de data/timestamp. Validarea temporala stricta NU e posibila.")
    print()
    print("PROXY 1 (Coverage Zipf): hit_rate_at_K = % volum acoperit de top-K")
    print(" -> valida sub ipoteza distributie stabila (nemasurabila cu date curente)")
    print("PROXY 2 (Leave-first-out): (total_vol - total_distinct) / total_vol")
    print(" -> limita superioara a hit-rate-ului daca am eticheta tot ce vedem odata")
    print(sep)
    print()

    all_rows_combined: list[tuple[str, int]] = []
    results = []

    for client in clients:
        path = os.path.join(root, f'operatii-service-{client}.csv')
        rows = load_csv(path)
        all_rows_combined.extend(rows)
        r = run_holdout(rows, client_name=client)
        results.append(r)
        _print_report(client.upper(), r)

    # Aggregate: re-merge the already-loaded per-client rows on the normalized
    # key (clients can share denominations). Reusing the rows avoids reading
    # and parsing every CSV a second time.
    all_rows_agg = _merge_by_key(all_rows_combined)
    agg = run_holdout(all_rows_agg, client_name=f'AGREGAT_{len(clients)}_CLIENTI')
    _print_report(f"AGREGAT ({len(clients)} clienti, distinct cross-client)", agg)

    print(sep)
    print("CONCLUZIE PREMISA 1:")
    verdicts = [r['verdict'] for r in results]
    if all(v == 'SUSTINUTA' for v in verdicts):
        print(" SUSTINUTA la toti clientii individual.")
    elif any(v == 'SUSTINUTA' for v in verdicts):
        print(" PARTIALA: sustinuta la unii clienti, slaba/nevalidabila la altii.")
    else:
        print(" SLABA sau NEVALIDABILA la toti clientii.")
    print(f" Agregat: {agg['verdict']}")
    print()
    print("NOTA METODOLOGICA:")
    print(" Concluzia e valida NUMAI sub ipoteza ca distributia de frecvente e stabila")
    print(" in timp (vocabularul service-ului nu se schimba semnificativ de la luna la luna).")
    print(" Pentru validare temporala stricta, sunt necesare date cu coloana de data.")
    print(sep)


if __name__ == '__main__':
    main()