"""Modul embedding in-proces pentru sugestie cod RAR -- L14-S4. Design (PRD 5.14, Decision #16/#16b): - Model multilingv via fastembed/ONNX (~230MB pe disc, quantizat, fara torch) - Lazy load la prima folosire, NU la import si NU pe /healthz - Worker NU incarca modelul (API-only) - Degradare gratioasa: daca modelul nu se incarca -> is_available()=False, suggest_nearest() -> [] fara exceptie, ingestia NU e blocata - Embeddings = DOAR sugestie (nu intra in lantul de enqueue/resolve_prestatii) - NU apelat din resolve_prestatii/load_mapping (wiring vine in L14-S6 DUPA 5.15) API public (nivel modul): index_corpus(items) -> None suggest_nearest(text, top_k) -> [{cod, similaritate}] is_available() -> bool Clase (pentru teste / injectare backend): EmbeddingEngine(backend) -- motor testabil cu backend injectabil FastEmbedBackend() -- backend real fastembed/ONNX """ from __future__ import annotations import logging import math from typing import Protocol, runtime_checkable log = logging.getLogger(__name__) # Modelul ales: paraphrase-multilingual-MiniLM-L12-v2 # ~230MB pe disc (ONNX quantizat), 384 dim, multilingv (ro/en/etc.), suportat de # fastembed, fara torch. (Estimarea initiala din PRD de ~50MB a fost gresita.) FASTEMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" # --------------------------------------------------------------------------- # # Protocol backend (mockabil in teste) # # --------------------------------------------------------------------------- # @runtime_checkable class EmbeddingBackend(Protocol): """Interfata minimala pentru un backend de embedding.""" def embed(self, texts: list[str]) -> list[list[float]]: """Intoarce o lista de vectori (cate unul per text).""" ... # --------------------------------------------------------------------------- # # Backend real: fastembed/ONNX # # --------------------------------------------------------------------------- # class FastEmbedBackend: """Backend fastembed/ONNX. Lazy-load la constructie. Arunca ImportError daca fastembed nu e instalat, sau orice exceptie de la TextEmbedding (download esuat, ONNX incompatibil etc.). Apelantul (_load_engine) prinde aceste exceptii. """ def __init__(self, model_name: str = FASTEMBED_MODEL): from fastembed import TextEmbedding # import tardiv -- nu blocheaza la import modul self._model = TextEmbedding(model_name=model_name) def embed(self, texts: list[str]) -> list[list[float]]: # fastembed.embed() intoarce un generator de numpy arrays return [vec.tolist() for vec in self._model.embed(texts)] # --------------------------------------------------------------------------- # # Motor de embedding (testabil, backend injectabil) # # --------------------------------------------------------------------------- # def _cosine_similarity(a: list[float], b: list[float]) -> float: """Similaritate cosine intre doi vectori. Returneaza 0.0 pe vectori nuli.""" dot = sum(x * y for x, y in zip(a, b)) na = math.sqrt(sum(x * x for x in a)) nb = math.sqrt(sum(x * x for x in b)) if na == 0.0 or nb == 0.0: return 0.0 return dot / (na * nb) class EmbeddingEngine: """Motor de embedding cu corpus indexat si cautare NN cosine. Parametri: backend: instanta EmbeddingBackend (real sau mock). None => degradare gratioasa (is_available=False). """ def __init__(self, backend: EmbeddingBackend | None = None): self._backend = backend self._corpus_vecs: list[list[float]] = [] self._corpus_items: list[dict] = [] self._corpus_sig: str | None = None def is_available(self) -> bool: """True daca backend-ul e disponibil si gata de folosire.""" return self._backend is not None def has_corpus(self) -> bool: """True daca un corpus a fost indexat (suggest_nearest poate produce ceva).""" return bool(self._corpus_items) def corpus_signature(self) -> str | None: """Semnatura corpusului indexat (None daca gol). Apelantul re-indexeaza doar cand semnatura nomenclatorului s-a schimbat (evita re-embed inutil).""" return self._corpus_sig def index_corpus(self, items: list[dict], signature: str | None = None) -> None: """Vectorizeaza corpus [{denumire, cod}] si il pastreaza in memorie. Ignora silentios daca backend-ul lipseste, corpus-ul e gol sau apare orice exceptie la vectorizare (degradare gratioasa). """ self._corpus_vecs = [] self._corpus_items = [] self._corpus_sig = None if not items or not self.is_available(): return try: texts = [str(item["denumire"]) for item in items] vecs = self._backend.embed(texts) self._corpus_vecs = vecs self._corpus_items = list(items) self._corpus_sig = signature except Exception as exc: log.warning("embeddings: index_corpus esuat: %s", exc) # corpus ramane gol -- suggest_nearest va returna [] def suggest_nearest( self, denumire: str, top_k: int = 3, ) -> list[dict]: """Returneaza top_k vecini cosine [{cod, similaritate}]. Returneaza [] daca backend-ul lipseste, corpus-ul e gol sau apare orice exceptie (degradare gratioasa -- nu blocheaza ingestia). """ if not self.is_available() or not self._corpus_items: return [] try: query_vecs = self._backend.embed([str(denumire)]) query_vec = query_vecs[0] scored = [ { "cod": item["cod"], "similaritate": _cosine_similarity(query_vec, vec), } for item, vec in zip(self._corpus_items, self._corpus_vecs) ] scored.sort(key=lambda r: r["similaritate"], reverse=True) return scored[:top_k] except Exception as exc: log.warning("embeddings: suggest_nearest esuat: %s", exc) return [] # --------------------------------------------------------------------------- # # Singleton global cu lazy load (API-only, NU worker) # # --------------------------------------------------------------------------- # _engine: EmbeddingEngine | None = None def _load_engine() -> EmbeddingEngine: """Lazy load: construieste engine-ul la prima folosire. Captureaza ORICE exceptie la incarcare (import, download, ONNX init) si returneaza un engine degradat (backend=None) -- ingestia continua pe exact+fuzzy, embedding = sugestie dezactivata. """ try: backend = FastEmbedBackend() log.info("embeddings: backend fastembed incarcat (%s)", FASTEMBED_MODEL) return EmbeddingEngine(backend=backend) except ImportError: log.warning( "embeddings: fastembed nu e instalat -- sugestii NN dezactivate" ) except Exception as exc: log.warning( "embeddings: incarcare backend esuata (%s) -- sugestii NN dezactivate", exc, ) return EmbeddingEngine(backend=None) def _get_engine() -> EmbeddingEngine: """Returneaza engine-ul global (lazy-init).""" global _engine if _engine is None: _engine = _load_engine() return _engine # --------------------------------------------------------------------------- # # API public la nivel de modul (wiring L14-S6) # # --------------------------------------------------------------------------- # def is_available() -> bool: """True daca modelul e incarcat si gata de folosire.""" return _get_engine().is_available() def has_corpus() -> bool: """True daca un corpus a fost indexat in motorul global. NU forteaza incarcarea modelului: daca engine-ul nu a fost initializat inca (`_engine is None`), corpus-ul e gol prin definitie -> False, fara cost. Apelantii (ex. enrich_suggestions) folosesc asta ca poarta ieftina inainte de a atinge calea scumpa (is_available/suggest_nearest, care lazy-load ~230MB). """ if _engine is None: return False return _engine.has_corpus() def corpus_signature() -> str | None: """Semnatura corpusului global indexat (None daca engine ne-initializat/gol). NU forteaza incarcarea modelului: `_engine is None` -> None fara cost. """ if _engine is None: return None return _engine.corpus_signature() def index_corpus(items: list[dict], signature: str | None = None) -> None: """Vectorizeaza corpus [{denumire, cod}] in motorul global. Silentios pe eroare (degradare gratioasa). """ _get_engine().index_corpus(items, signature=signature) def suggest_nearest(denumire: str, top_k: int = 3) -> list[dict]: """Returneaza top_k sugestii [{cod, similaritate}] sau [] la eroare. Sigur de apelat indiferent de starea backend-ului. """ return _get_engine().suggest_nearest(denumire, top_k=top_k)