"""Parser fisiere xlsx/csv pentru import prezentari (Treapta 2, U1). Arhitectura 2-treceri (Issue 2, consens cross-model): Trecerea 1 — read_only=True: dim-check (FileTooLarge) + detectie multi-sheet. Trecerea 2 — normal-mode: header + merged cells + body. Aceasta separare e necesara deoarece openpyxl read_only=True nu vede celule imbinate. Modulul este PUR in sensul ca nu face I/O DB, nu trimite nimic la RAR si nu are efecte laterale — intoarce structuri Python testabile direct. Stari per-rand (resolved_status): ok — date complete, gata de trimis dupa mapare + validare needs_review — coercion suspectat (VIN numeric, odometru float) sau data ambigua needs_data — camp obligatoriu lipsa (dupa coercion) (needs_mapping, already_sent, duplicate_in_file — calculate in preview, nu aici) """ from __future__ import annotations import csv import io from datetime import date, datetime from typing import Any, NamedTuple # --------------------------------------------------------------------------- # # Constante # # --------------------------------------------------------------------------- # MAX_ROWS = 5_000 MAX_BYTES = 5 * 1024 * 1024 # 5 MB # Prag rata None pe o coloana obligatorie -> mesaj formule necalculate (Issue 3) FORMULA_NONE_RATE = 0.6 # Coloane cheie pentru detectia footer-ului (trim structural) KEY_COLS = {"vin", "data_prestatie"} # Delimitatori incercati la sniff CSV (ordinea conteaza: ; primul, export RO) CSV_DELIMITERS = [";", ",", "\t"] # Encodinguri incercate in ordine (BOM-aware + RO) CSV_ENCODINGS = ["utf-8-sig", "utf-8", "cp1250", "latin2"] # --------------------------------------------------------------------------- # # Exceptii custom # # --------------------------------------------------------------------------- # class FileTooLarge(Exception): """Fisier depaseste limita de randuri sau dimensiune.""" def __init__(self, *, rows: int | None = None, bytes_: int | None = None): self.rows = rows self.bytes_ = bytes_ parts = [] if rows is not None: 
parts.append(f"{rows} randuri (max {MAX_ROWS})") if bytes_ is not None: parts.append(f"{bytes_ // 1024} KB (max {MAX_BYTES // 1024} KB)") super().__init__(f"Fisier prea mare: {', '.join(parts)}") class HeaderError(Exception): """Header lipsa, duplicat sau un singur camp detectat.""" def __init__(self, message: str, found: list[str] | None = None): self.found = found or [] super().__init__(message) class MultipleSheets(Exception): """Workbook cu mai mult de un sheet non-gol — utilizatorul trebuie sa aleaga.""" def __init__(self, sheet_names: list[str]): self.sheet_names = sheet_names super().__init__(f"Mai multe sheet-uri non-goale: {sheet_names}. Alege sheet-ul de importat.") # --------------------------------------------------------------------------- # # Structura interna de rezultat # # --------------------------------------------------------------------------- # class ParsedFile(NamedTuple): """Rezultatul parsarii unui fisier.""" columns: list[str] # Numele coloanelor detectate (din header) rows: list[dict[str, Any]] # Fiecare rand: {coloana: valoare_bruta} coercion_flags: dict[int, list[str]] # {row_index: [motive needs_review]} formula_columns: list[str] # Coloane cu rata None ridicata (Issue 3) date_col_format: dict[str, str] # {coloana: "DD.MM.YYYY" | "YYYY-MM-DD" | "native" | "ambiguous"} # --------------------------------------------------------------------------- # # XLSX — trecerea 1: dim-check (read_only) # # --------------------------------------------------------------------------- # def _xlsx_dimcheck(data: bytes) -> list[str]: """Trecerea 1 read_only: verifica dimensiunile si intoarce lista de sheet-uri non-goale. Ridica FileTooLarge daca depaseste limita. Ridica MultipleSheets daca sunt >1 sheet-uri non-goale. Intoarce lista (cu un singur element daca totul e ok). 
""" import openpyxl if len(data) > MAX_BYTES: raise FileTooLarge(bytes_=len(data)) wb = openpyxl.load_workbook(io.BytesIO(data), read_only=True, data_only=True) try: non_empty: list[str] = [] for name in wb.sheetnames: ws = wb[name] # In read_only, max_row poate fi None daca sheet-ul e gol max_row = ws.max_row or 0 if max_row > 0: non_empty.append(name) if max_row > MAX_ROWS: raise FileTooLarge(rows=max_row) finally: wb.close() if len(non_empty) > 1: raise MultipleSheets(non_empty) return non_empty # 0 sau 1 element # --------------------------------------------------------------------------- # # XLSX — trecerea 2: header + merged + body (normal-mode) # # --------------------------------------------------------------------------- # def _unmerge_header(ws) -> dict[int, str]: """Rezolva celulele imbinate din primul rand non-gol. Intoarce {col_index_1based: valoare_str}. Merge range-urile din header propaga valoarea topleft la toate coloanele din grup. """ # Gaseste primul rand non-gol header_row = None for row in ws.iter_rows(max_row=20): vals = [c.value for c in row if c.value is not None] if vals: header_row = row[0].row break if header_row is None: return {} # Mapa col_index -> valoare din celule normale col_vals: dict[int, str] = {} for cell in ws[header_row]: if cell.value is not None: col_vals[cell.column] = str(cell.value).strip() # Propaga valoarea topleft pentru merge range-uri din randul header for merged_range in ws.merged_cells.ranges: if merged_range.min_row <= header_row <= merged_range.max_row: # Valoarea e in celula topleft topleft = ws.cell(row=merged_range.min_row, column=merged_range.min_col) val = str(topleft.value or "").strip() for col in range(merged_range.min_col, merged_range.max_col + 1): col_vals[col] = val return col_vals def _deduplicate_columns(names: list[str]) -> list[str]: """Adauga sufix _2/_3 la coloane cu acelasi nume (din merged cells).""" seen: dict[str, int] = {} result = [] for n in names: if n not in seen: seen[n] = 1 
result.append(n) else: seen[n] += 1 result.append(f"{n}_{seen[n]}") return result def _xlsx_parse_sheet(ws, sheet_name: str) -> ParsedFile: """Parseaza un sheet in normal-mode (trecerea 2). Presupune ca dim-check a trecut deja (FileTooLarge nu se verifica din nou). """ # Header cu merged cells col_map = _unmerge_header(ws) if not col_map: raise HeaderError(f"Sheet '{sheet_name}': niciun header detectat.", found=[]) # Ordoneaza coloanele dupa index sorted_cols = sorted(col_map.items()) # [(col_idx, name), ...] col_indices = [idx for idx, _ in sorted_cols] col_names = [name for _, name in sorted_cols] # Dezambiguizeaza duplicate (provin din merge care se propaga la mai multe coloane) col_names = _deduplicate_columns(col_names) if len(col_names) < 2: raise HeaderError(f"Doar {len(col_names)} coloana detectata — verifica fisierul.", found=col_names) # Gaseste randul header ca sa sarim peste el header_row_num = ws.cell(row=1, column=col_indices[0]).row # Re-detect: prima celula din col_map # Obtinem randul headerului din prima celula valida for row in ws.iter_rows(max_row=20): for c in row: if c.column in col_map and c.value is not None: header_row_num = c.row break else: continue break # Citeste randurile de date raw_rows: list[dict[str, Any]] = [] # Colecteaza valorile pe coloane pentru detectia datei si a formulelor col_values: dict[str, list[Any]] = {name: [] for name in col_names} for row in ws.iter_rows(min_row=header_row_num + 1): row_dict: dict[str, Any] = {} for col_idx, col_name in zip(col_indices, col_names): # Cauta celula cu col_idx in rand (unele randuri pot fi mai scurte) found_cell = None for c in row: if c.column == col_idx: found_cell = c break val = found_cell.value if found_cell is not None else None row_dict[col_name] = val col_values[col_name].append(val) raw_rows.append(row_dict) # Trim footer: elimina randuri trailing unde coloanele cheie sunt goale raw_rows = _trim_footer(raw_rows, col_names) # Detectie coloane cu formule (rata None, Issue 3) 
formula_columns = _detect_formula_columns(col_values, len(raw_rows)) # Detectie format data la nivel de coloana (T10/OV-8) date_col_format = _detect_date_formats(col_values, col_names) # Coercion + flags needs_review (T3) coercion_flags: dict[int, list[str]] = {} processed_rows: list[dict[str, Any]] = [] for i, row_dict in enumerate(raw_rows): processed, flags = _coerce_row(row_dict, col_names) processed_rows.append(processed) if flags: coercion_flags[i] = flags return ParsedFile( columns=col_names, rows=processed_rows, coercion_flags=coercion_flags, formula_columns=formula_columns, date_col_format=date_col_format, ) # --------------------------------------------------------------------------- # # Trim footer structural # # --------------------------------------------------------------------------- # def _is_key_empty(row_dict: dict[str, Any], col_names: list[str]) -> bool: """Randul e structural gol daca coloanele cheie (VIN + data) sunt ambele None/gol.""" # Detecta coloanele cheie prin nume normalized from .mapping import normalize_for_match norm_names = {normalize_for_match(n): n for n in col_names} vin_col = None date_col_key = None for norm, orig in norm_names.items(): if "VIN" in norm or "SERIE" in norm or "SASIU" in norm: vin_col = orig if "DATA" in norm or "DATE" in norm or "PRESTATIE" in norm: date_col_key = orig def _empty(v: Any) -> bool: return v is None or str(v).strip() == "" vin_empty = _empty(row_dict.get(vin_col)) if vin_col else True date_empty = _empty(row_dict.get(date_col_key)) if date_col_key else True return vin_empty and date_empty def _trim_footer(rows: list[dict[str, Any]], col_names: list[str]) -> list[dict[str, Any]]: """Elimina randuri trailing unde VIN + data sunt goale (footer TOTAL/Intocmit de:).""" i = len(rows) - 1 while i >= 0 and _is_key_empty(rows[i], col_names): i -= 1 return rows[: i + 1] # --------------------------------------------------------------------------- # # Detectie coloane formule (Issue 3) # # 
# --------------------------------------------------------------------------- #


def _detect_formula_columns(
    col_values: dict[str, list[Any]],
    n_rows: int,
    threshold: float | None = None,
) -> list[str]:
    """Return the columns whose None rate reaches the threshold.

    A high None rate on a column usually means uncalculated formulas.

    Args:
        col_values: per-column value lists, in row order.
        n_rows: number of data rows that were kept. Only the first ``n_rows``
            values of each column are inspected, so values collected from
            trailing rows that were trimmed afterwards do not distort the rate.
        threshold: minimum None rate (0..1); defaults to FORMULA_NONE_RATE.

    Returns:
        Column names, in the iteration order of ``col_values``.
    """
    if n_rows <= 0:
        return []
    limit = FORMULA_NONE_RATE if threshold is None else threshold

    flagged: list[str] = []
    for col_name, vals in col_values.items():
        considered = vals[:n_rows]
        if not considered:
            continue
        none_rate = sum(1 for v in considered if v is None) / len(considered)
        if none_rate >= limit:
            flagged.append(col_name)
    return flagged


# --------------------------------------------------------------------------- #
# Column-level date disambiguation (T10 / OV-8)
# --------------------------------------------------------------------------- #


def _detect_date_formats(col_values: dict[str, list[Any]], col_names: list[str]) -> dict[str, str]:
    """Detect the date format of every date-like column.

    Possible results per column:
        "native"     — every non-None value is a native date/datetime (unambiguous)
        "DD.MM.YYYY" — day-first column
        "YYYY-MM-DD" — ISO format
        "ambiguous"  — strings whose day/month order cannot be determined
        "mixed"      — mix of native date/datetime values and strings

    Columns that do not look like dates (by name) or hold no values are omitted.
    """
    from .mapping import normalize_for_match

    # NOTE(review): "ZI" and "AN" are matched as plain substrings, so unrelated
    # column names containing those letters are also treated as date columns —
    # confirm whether a whole-word match was intended.
    date_keywords = ("DATA", "DATE", "PRESTATIE", "ZI", "AN")

    result: dict[str, str] = {}
    for col_name in col_names:
        norm = normalize_for_match(col_name)
        if not any(kw in norm for kw in date_keywords):
            continue

        vals = [v for v in col_values.get(col_name, []) if v is not None]
        if not vals:
            continue

        # datetime is a subclass of date, so one isinstance check covers both.
        str_vals = [str(v).strip() for v in vals if not isinstance(v, date)]
        native_count = len(vals) - len(str_vals)

        if native_count == len(vals):
            result[col_name] = "native"
        elif native_count > 0:
            result[col_name] = "mixed"
        else:
            # All strings — infer the format at column level (OV-8)
            result[col_name] = _infer_date_format_from_column(str_vals)

    return result
def _infer_date_format_from_column(str_vals: list[str]) -> str:
    """Infer the date format of a column from its string values.

    OV-8 logic, applied to the whole column:
      - a first token > 12 can only be a day  -> evidence for DD.MM.YYYY;
      - a second token > 12 cannot be a month -> evidence AGAINST DD.MM.YYYY
        (the value looks month-first); such a column is reported as
        "ambiguous" so its rows go to review instead of being parsed day-first;
      - only ISO-looking values -> "YYYY-MM-DD";
      - nothing decisive (all tokens <= 12, or nothing parseable) -> "ambiguous".

    Returns:
        One of "DD.MM.YYYY", "YYYY-MM-DD", "ambiguous".
    """
    dd_first_evidence = False
    mm_first_evidence = False
    iso_evidence = False
    parseable = 0

    for s in str_vals:
        if not s:
            continue

        # ISO (YYYY-MM-DD or YYYY/MM/DD)
        if _looks_iso(s):
            iso_evidence = True
            parseable += 1
            continue

        # DD.MM.YYYY, DD/MM/YYYY or DD-MM-YYYY
        parts = _split_date(s)
        if not parts:
            continue
        try:
            first = int(parts[0])
            second = int(parts[1])
        except ValueError:
            continue
        if first > 12:
            dd_first_evidence = True
        if second > 12:
            mm_first_evidence = True
        parseable += 1

    if not parseable:
        return "ambiguous"
    if mm_first_evidence:
        # Either month-first or internally contradictory: never claim DD.MM.
        return "ambiguous"
    if dd_first_evidence:
        return "DD.MM.YYYY"
    if iso_evidence:
        return "YYYY-MM-DD"
    # Every day token <= 12: DD.MM cannot be told apart from MM.DD
    return "ambiguous"


def _looks_iso(s: str) -> bool:
    """Quick check: three '-' or '/' separated parts with a leading year > 1900."""
    parts = s.replace("/", "-").split("-")
    if len(parts) != 3:
        return False
    try:
        return int(parts[0]) > 1900
    except ValueError:
        return False


def _split_date(s: str) -> list[str] | None:
    """Split a date string on the first of '.', '/', '-' that yields three parts.

    Returns None when no separator produces exactly three parts.
    """
    for sep in (".", "/", "-"):
        parts = s.split(sep)
        if len(parts) == 3:
            return parts
    return None


# --------------------------------------------------------------------------- #
# Per-row coercion (T3)
# --------------------------------------------------------------------------- #


def _number_to_text(val: int | float) -> str:
    """Render a number without a spurious fractional part ("123.0" -> "123")."""
    if isinstance(val, int):
        return str(int(val))
    # is_integer() is False for NaN/inf, so int() is never called on them.
    if val.is_integer():
        return str(int(val))
    return str(val)


def _coerce_row(row_dict: dict[str, Any], col_names: list[str]) -> tuple[dict[str, Any], list[str]]:
    """Coerce the values of one row and collect needs_review flags.

    Rules:
      - VIN read as int/float (a spreadsheet turns "0123..." into 123.0)
        -> text + needs_review flag
      - VIN read as anything else -> stripped, upper-cased text
      - odometer float with an integral value -> integer text; a fractional
        float is kept as text and flagged
      - native datetime/date -> "YYYY-MM-DD" string
      - None stays None; other strings are stripped; other values pass through

    Returns:
        ``(coerced_row, flags)`` where ``flags`` lists the review reasons.
    """
    from .mapping import normalize_for_match

    norm_names = {normalize_for_match(n): n for n in col_names}

    # Identify the semantic columns
    vin_col = _find_col(norm_names, ("VIN", "SERIE SASIU", "SASIU", "SERIE"))
    odo_col = _find_col(norm_names, ("ODOMETRU", "KM", "KILOMETRI", "ODO"))

    flags: list[str] = []
    out: dict[str, Any] = {}

    for col_name, val in row_dict.items():
        if val is None:
            out[col_name] = None
        elif isinstance(val, datetime):
            # datetime must be tested before date (it is a subclass of date)
            out[col_name] = val.date().isoformat()
        elif isinstance(val, date):
            out[col_name] = val.isoformat()
        elif col_name == vin_col:
            if isinstance(val, (int, float)):
                # A numeric VIN means the spreadsheet mangled it (leading zeros lost)
                flags.append(f"VIN numeric ({val}) — verificati seria sasiului")
                out[col_name] = _number_to_text(val)
            else:
                out[col_name] = str(val).strip().upper()
        elif col_name == odo_col:
            if isinstance(val, float):
                if val.is_integer():
                    out[col_name] = _number_to_text(val)  # 123456.0 -> "123456"
                else:
                    # Fractional reading -> keep it and let validation decide
                    flags.append(f"Odometru float nestandard ({val})")
                    out[col_name] = str(val)
            elif isinstance(val, int):
                out[col_name] = str(val)
            else:
                out[col_name] = str(val).strip()
        else:
            # Default: strip strings, pass everything else through unchanged
            out[col_name] = val.strip() if isinstance(val, str) else val

    return out, flags


def _find_col(norm_names: dict[str, str], keywords: tuple[str, ...]) -> str | None:
    """Find a column by keyword in its normalized name.

    Keywords are tried in order, so earlier keywords take priority; for one
    keyword the first matching column (dict order) wins.

    Args:
        norm_names: ``{normalized name: original name}``.
        keywords: substrings to look for in the normalized names.

    Returns:
        The original column name, or None when nothing matches.
    """
    for kw in keywords:
        for norm, orig in norm_names.items():
            if kw in norm:
                return orig
    return None


# --------------------------------------------------------------------------- #
# Per-row date parsing (used by preview resolve)
# --------------------------------------------------------------------------- #

# Years below this are treated as unparseable (typically two-digit years),
# instead of producing dates such as 0024-01-15.
_MIN_PLAUSIBLE_YEAR = 1900


def _safe_date(year_s: str, month_s: str, day_s: str) -> date | None:
    """Build a date from text tokens; None if invalid or the year is implausible."""
    try:
        year, month, day = int(year_s), int(month_s), int(day_s)
        if year < _MIN_PLAUSIBLE_YEAR:
            return None
        return date(year, month, day)
    except ValueError:
        return None


def _parse_day_first(s: str) -> date | None:
    """Parse DD.MM.YYYY / DD/MM/YYYY / DD-MM-YYYY; None when it does not fit."""
    parts = _split_date(s)
    if not parts or len(parts) != 3:
        return None
    return _safe_date(parts[2], parts[1], parts[0])


def parse_date_value(
    val: Any,
    col_format: str,
) -> tuple[str | None, bool]:
    """Parse one date value and return ``(iso_string, is_ambiguous)``.

    Args:
        val: the cell value; native datetimes were already turned into ISO
            strings by the coercion step.
        col_format: "native", "DD.MM.YYYY", "YYYY-MM-DD", "ambiguous" or "mixed".

    Returns:
        - ``(None, False)`` for an empty value;
        - ``(iso, False)`` for a value that is already ISO, or that parses
          under an unambiguous column format;
        - ``(iso, True)`` when the value was read day-first in an "ambiguous",
          "mixed" or unknown column (needs_review);
        - ``(None, True)`` for an unparseable value in an "ambiguous" column,
          ``(None, False)`` for an unparseable value otherwise.
        Values whose year is below 1900 (e.g. two-digit years) are unparseable.
    """
    if val is None:
        return None, False
    s = str(val).strip()
    if not s:
        return None, False

    # Already ISO (e.g. produced by the coercion of a native datetime)
    try:
        return date.fromisoformat(s).isoformat(), False
    except ValueError:
        pass

    if col_format in ("native", "YYYY-MM-DD"):
        parts = s.replace("/", "-").split("-")
        parsed = _safe_date(parts[0], parts[1], parts[2]) if len(parts) == 3 else None
        return (parsed.isoformat() if parsed is not None else None), False

    parsed = _parse_day_first(s)
    iso = parsed.isoformat() if parsed is not None else None

    if col_format == "DD.MM.YYYY":
        return iso, False
    if col_format == "ambiguous":
        return iso, True  # ambiguous -> needs_review, even when unparseable

    # mixed or unknown: day-first reading, flagged as ambiguous when it parses
    return iso, iso is not None


# --------------------------------------------------------------------------- #
# CSV
# --------------------------------------------------------------------------- #


def _decode_csv(data: bytes) -> str:
    """Decode CSV bytes, trying CSV_ENCODINGS in order (RO fallbacks).

    Raises:
        UnicodeDecodeError: no encoding in CSV_ENCODINGS could decode the data.
    """
    for enc in CSV_ENCODINGS:
        try:
            return data.decode(enc)
        except (UnicodeDecodeError, LookupError):
            continue
    raise UnicodeDecodeError("csv", data, 0, len(data), "Encoding nesuportat (incercat utf-8, cp1250, latin2)")


def _sniff_delimiter(sample: str) -> str:
    """Detect the CSV delimiter (RO Excel exports use ';').

    The standard Sniffer is tried first; if it gives up, the delimiter whose
    most frequent per-line column count is highest wins (ties go to the
    earliest entry of CSV_DELIMITERS, "," when nothing can be counted).
    """
    from collections import Counter

    try:
        return csv.Sniffer().sniff(sample, delimiters=";,\t").delimiter
    except csv.Error:
        pass

    # Explicit probe over the first ten lines, ignoring blank ones
    lines = [line for line in sample.splitlines()[:10] if line.strip()]
    best_delim = ","
    best_cols = 0
    if not lines:
        return best_delim
    for delim in CSV_DELIMITERS:
        counts = Counter(len(line.split(delim)) for line in lines)
        common = counts.most_common(1)[0][0]
        if common > best_cols:
            best_cols = common
            best_delim = delim
    return best_delim


def parse_csv(data: bytes) -> ParsedFile:
    """Parse a CSV file, detecting the delimiter and an RO-friendly encoding.

    The first non-blank line is the header. Header names are stripped, blank
    ones are dropped together with their values, and repeated names get a
    _2/_3 suffix (same policy as the xlsx path) so no column overwrites
    another. Rows shorter than the header are padded with None; cells beyond
    the header are ignored.

    Raises:
        FileTooLarge: more than MAX_BYTES bytes or more than MAX_ROWS data rows.
        HeaderError: no data rows, or fewer than two usable columns.
        UnicodeDecodeError: the bytes cannot be decoded.
    """
    if len(data) > MAX_BYTES:
        raise FileTooLarge(bytes_=len(data))

    text = _decode_csv(data)
    delimiter = _sniff_delimiter(text[:8192])

    # Read everything (bounded by MAX_ROWS); completely blank lines are skipped
    header_cells: list[str] | None = None
    records: list[list[str]] = []
    for cells in csv.reader(io.StringIO(text), delimiter=delimiter):
        if not cells:
            continue
        if header_cells is None:
            header_cells = cells
            continue
        if len(records) >= MAX_ROWS:
            raise FileTooLarge(rows=len(records) + 1)
        records.append(cells)

    if header_cells is None or not records:
        raise HeaderError("CSV gol sau fara randuri de date.", found=[])

    # Keep only named columns, remembering their position in the line
    kept = [(pos, name.strip()) for pos, name in enumerate(header_cells) if name.strip()]
    if len(kept) < 2:
        raise HeaderError(
            f"Doar {len(kept)} coloana detectata cu delimiter '{delimiter}' — verifica separatorul.",
            found=[name for _, name in kept],
        )
    positions = [pos for pos, _ in kept]
    col_names = _deduplicate_columns([name for _, name in kept])

    # Build row dicts with stripped values
    cleaned: list[dict[str, Any]] = [
        {
            name: (cells[pos].strip() if pos < len(cells) else None)
            for pos, name in zip(positions, col_names)
        }
        for cells in records
    ]

    # Trim footer
    cleaned = _trim_footer(cleaned, col_names)

    # Per-column values for the detections
    col_values: dict[str, list[Any]] = {c: [row.get(c) for row in cleaned] for c in col_names}

    formula_columns: list[str] = []  # CSV has no formulas
    date_col_format = _detect_date_formats(col_values, col_names)

    coercion_flags: dict[int, list[str]] = {}
    processed: list[dict[str, Any]] = []
    for i, row in enumerate(cleaned):
        coerced, flags = _coerce_row(row, col_names)
        processed.append(coerced)
        if flags:
            coercion_flags[i] = flags

    return ParsedFile(
        columns=col_names,
        rows=processed,
        coercion_flags=coercion_flags,
        formula_columns=formula_columns,
        date_col_format=date_col_format,
    )


# --------------------------------------------------------------------------- #
# XLSX — entry point
# --------------------------------------------------------------------------- #


def parse_xlsx(data: bytes, *, sheet_name: str | None = None) -> ParsedFile:
    """Parse an XLSX file.

    Two-pass architecture (Issue 2):
      1. read_only=True: size check + multi-sheet detection
      2. normal mode: header + merged cells + body

    Args:
        data: raw workbook bytes.
        sheet_name: the sheet to import. When the workbook has several
            non-empty sheets the user must choose one and pass it back here.

    Raises:
        FileTooLarge: raised by the size check.
        MultipleSheets: several non-empty sheets and ``sheet_name`` is None.
        HeaderError: no sheet with data, unknown ``sheet_name``, or bad header.
    """
    import openpyxl

    # Pass 1: size check
    try:
        non_empty = _xlsx_dimcheck(data)
    except MultipleSheets as ms:
        if sheet_name is None:
            raise
        # The user already picked a sheet — carry on with that choice
        non_empty = ms.sheet_names

    if not non_empty:
        raise HeaderError("Workbook fara sheet-uri cu date.", found=[])

    # Sheet selection
    if sheet_name is not None:
        target = sheet_name
    elif len(non_empty) == 1:
        target = non_empty[0]
    else:
        raise MultipleSheets(non_empty)

    # Pass 2: normal mode
    wb = openpyxl.load_workbook(io.BytesIO(data), read_only=False, data_only=True)
    try:
        if target not in wb.sheetnames:
            raise HeaderError(f"Sheet '{target}' nu exista in workbook.", found=wb.sheetnames)
        return _xlsx_parse_sheet(wb[target], target)
    finally:
        wb.close()


# --------------------------------------------------------------------------- #
# Universal entry point
# --------------------------------------------------------------------------- #


def parse_file(
    data: bytes,
    filename: str,
    *,
    sheet_name: str | None = None,
) -> ParsedFile:
    """Single entry point: pick the parser from the file extension.

    ``.csv`` goes to parse_csv; ``.xlsx`` and ``.xls`` go to parse_xlsx.

    Raises:
        FileTooLarge, HeaderError, MultipleSheets, UnicodeDecodeError,
        openpyxl.utils.exceptions.InvalidFileException (corrupt file).
        HeaderError is also raised for an unsupported extension.

    NOTE(review): the workbook is loaded from an in-memory buffer, so a payload
    that is not a zip archive (e.g. a legacy binary .xls) presumably surfaces as
    zipfile.BadZipFile rather than InvalidFileException — confirm and make sure
    the caller handles it.
    """
    name_lower = filename.lower()
    if name_lower.endswith(".csv"):
        return parse_csv(data)
    if name_lower.endswith((".xlsx", ".xls")):
        return parse_xlsx(data, sheet_name=sheet_name)
    raise HeaderError(f"Tip fisier nesuportat: '{filename}'. Acceptat: xlsx, xls, csv.")