diff --git a/app/import_parse.py b/app/import_parse.py new file mode 100644 index 0000000..58389c9 --- /dev/null +++ b/app/import_parse.py @@ -0,0 +1,748 @@ +"""Parser fisiere xlsx/csv pentru import prezentari (Treapta 2, U1). + +Arhitectura 2-treceri (Issue 2, consens cross-model): + Trecerea 1 — read_only=True: dim-check (FileTooLarge) + detectie multi-sheet. + Trecerea 2 — normal-mode: header + merged cells + body. +Aceasta separare e necesara deoarece openpyxl read_only=True nu vede celule imbinate. + +Modulul este PUR in sensul ca nu face I/O DB, nu trimite nimic la RAR si nu are +efecte laterale — intoarce structuri Python testabile direct. + +Stari per-rand (resolved_status): + ok — date complete, gata de trimis dupa mapare + validare + needs_review — coercion suspectat (VIN numeric, odometru float) sau data ambigua + needs_data — camp obligatoriu lipsa (dupa coercion) + (needs_mapping, already_sent, duplicate_in_file — calculate in preview, nu aici) +""" + +from __future__ import annotations + +import csv +import io +from datetime import date, datetime +from typing import Any, NamedTuple + +# --------------------------------------------------------------------------- # +# Constante # +# --------------------------------------------------------------------------- # + +MAX_ROWS = 5_000 +MAX_BYTES = 5 * 1024 * 1024 # 5 MB + +# Prag rata None pe o coloana obligatorie -> mesaj formule necalculate (Issue 3) +FORMULA_NONE_RATE = 0.6 + +# Coloane cheie pentru detectia footer-ului (trim structural) +KEY_COLS = {"vin", "data_prestatie"} + +# Delimitatori incercati la sniff CSV (ordinea conteaza: ; primul, export RO) +CSV_DELIMITERS = [";", ",", "\t"] + +# Encodinguri incercate in ordine (BOM-aware + RO) +CSV_ENCODINGS = ["utf-8-sig", "utf-8", "cp1250", "latin2"] + + +# --------------------------------------------------------------------------- # +# Exceptii custom # +# --------------------------------------------------------------------------- # + +class 
FileTooLarge(Exception): + """Fisier depaseste limita de randuri sau dimensiune.""" + def __init__(self, *, rows: int | None = None, bytes_: int | None = None): + self.rows = rows + self.bytes_ = bytes_ + parts = [] + if rows is not None: + parts.append(f"{rows} randuri (max {MAX_ROWS})") + if bytes_ is not None: + parts.append(f"{bytes_ // 1024} KB (max {MAX_BYTES // 1024} KB)") + super().__init__(f"Fisier prea mare: {', '.join(parts)}") + + +class HeaderError(Exception): + """Header lipsa, duplicat sau un singur camp detectat.""" + def __init__(self, message: str, found: list[str] | None = None): + self.found = found or [] + super().__init__(message) + + +class MultipleSheets(Exception): + """Workbook cu mai mult de un sheet non-gol — utilizatorul trebuie sa aleaga.""" + def __init__(self, sheet_names: list[str]): + self.sheet_names = sheet_names + super().__init__(f"Mai multe sheet-uri non-goale: {sheet_names}. Alege sheet-ul de importat.") + + +# --------------------------------------------------------------------------- # +# Structura interna de rezultat # +# --------------------------------------------------------------------------- # + +class ParsedFile(NamedTuple): + """Rezultatul parsarii unui fisier.""" + columns: list[str] # Numele coloanelor detectate (din header) + rows: list[dict[str, Any]] # Fiecare rand: {coloana: valoare_bruta} + coercion_flags: dict[int, list[str]] # {row_index: [motive needs_review]} + formula_columns: list[str] # Coloane cu rata None ridicata (Issue 3) + date_col_format: dict[str, str] # {coloana: "DD.MM.YYYY" | "YYYY-MM-DD" | "native" | "ambiguous"} + + +# --------------------------------------------------------------------------- # +# XLSX — trecerea 1: dim-check (read_only) # +# --------------------------------------------------------------------------- # + +def _xlsx_dimcheck(data: bytes) -> list[str]: + """Trecerea 1 read_only: verifica dimensiunile si intoarce lista de sheet-uri non-goale. 
+ + Ridica FileTooLarge daca depaseste limita. + Ridica MultipleSheets daca sunt >1 sheet-uri non-goale. + Intoarce lista (cu un singur element daca totul e ok). + """ + import openpyxl + + if len(data) > MAX_BYTES: + raise FileTooLarge(bytes_=len(data)) + + wb = openpyxl.load_workbook(io.BytesIO(data), read_only=True, data_only=True) + try: + non_empty: list[str] = [] + for name in wb.sheetnames: + ws = wb[name] + # In read_only, max_row poate fi None daca sheet-ul e gol + max_row = ws.max_row or 0 + if max_row > 0: + non_empty.append(name) + if max_row > MAX_ROWS: + raise FileTooLarge(rows=max_row) + finally: + wb.close() + + if len(non_empty) > 1: + raise MultipleSheets(non_empty) + + return non_empty # 0 sau 1 element + + +# --------------------------------------------------------------------------- # +# XLSX — trecerea 2: header + merged + body (normal-mode) # +# --------------------------------------------------------------------------- # + +def _unmerge_header(ws) -> dict[int, str]: + """Rezolva celulele imbinate din primul rand non-gol. + + Intoarce {col_index_1based: valoare_str}. + Merge range-urile din header propaga valoarea topleft la toate coloanele din grup. 
+ """ + # Gaseste primul rand non-gol + header_row = None + for row in ws.iter_rows(max_row=20): + vals = [c.value for c in row if c.value is not None] + if vals: + header_row = row[0].row + break + if header_row is None: + return {} + + # Mapa col_index -> valoare din celule normale + col_vals: dict[int, str] = {} + for cell in ws[header_row]: + if cell.value is not None: + col_vals[cell.column] = str(cell.value).strip() + + # Propaga valoarea topleft pentru merge range-uri din randul header + for merged_range in ws.merged_cells.ranges: + if merged_range.min_row <= header_row <= merged_range.max_row: + # Valoarea e in celula topleft + topleft = ws.cell(row=merged_range.min_row, column=merged_range.min_col) + val = str(topleft.value or "").strip() + for col in range(merged_range.min_col, merged_range.max_col + 1): + col_vals[col] = val + + return col_vals + + +def _deduplicate_columns(names: list[str]) -> list[str]: + """Adauga sufix _2/_3 la coloane cu acelasi nume (din merged cells).""" + seen: dict[str, int] = {} + result = [] + for n in names: + if n not in seen: + seen[n] = 1 + result.append(n) + else: + seen[n] += 1 + result.append(f"{n}_{seen[n]}") + return result + + +def _xlsx_parse_sheet(ws, sheet_name: str) -> ParsedFile: + """Parseaza un sheet in normal-mode (trecerea 2). + + Presupune ca dim-check a trecut deja (FileTooLarge nu se verifica din nou). + """ + # Header cu merged cells + col_map = _unmerge_header(ws) + if not col_map: + raise HeaderError(f"Sheet '{sheet_name}': niciun header detectat.", found=[]) + + # Ordoneaza coloanele dupa index + sorted_cols = sorted(col_map.items()) # [(col_idx, name), ...] 
+ col_indices = [idx for idx, _ in sorted_cols] + col_names = [name for _, name in sorted_cols] + + # Dezambiguizeaza duplicate (provin din merge care se propaga la mai multe coloane) + col_names = _deduplicate_columns(col_names) + + if len(col_names) < 2: + raise HeaderError(f"Doar {len(col_names)} coloana detectata — verifica fisierul.", found=col_names) + + # Gaseste randul header ca sa sarim peste el + header_row_num = ws.cell(row=1, column=col_indices[0]).row + # Re-detect: prima celula din col_map + # Obtinem randul headerului din prima celula valida + for row in ws.iter_rows(max_row=20): + for c in row: + if c.column in col_map and c.value is not None: + header_row_num = c.row + break + else: + continue + break + + # Citeste randurile de date + raw_rows: list[dict[str, Any]] = [] + # Colecteaza valorile pe coloane pentru detectia datei si a formulelor + col_values: dict[str, list[Any]] = {name: [] for name in col_names} + + for row in ws.iter_rows(min_row=header_row_num + 1): + row_dict: dict[str, Any] = {} + for col_idx, col_name in zip(col_indices, col_names): + # Cauta celula cu col_idx in rand (unele randuri pot fi mai scurte) + found_cell = None + for c in row: + if c.column == col_idx: + found_cell = c + break + val = found_cell.value if found_cell is not None else None + row_dict[col_name] = val + col_values[col_name].append(val) + raw_rows.append(row_dict) + + # Trim footer: elimina randuri trailing unde coloanele cheie sunt goale + raw_rows = _trim_footer(raw_rows, col_names) + + # Detectie coloane cu formule (rata None, Issue 3) + formula_columns = _detect_formula_columns(col_values, len(raw_rows)) + + # Detectie format data la nivel de coloana (T10/OV-8) + date_col_format = _detect_date_formats(col_values, col_names) + + # Coercion + flags needs_review (T3) + coercion_flags: dict[int, list[str]] = {} + processed_rows: list[dict[str, Any]] = [] + for i, row_dict in enumerate(raw_rows): + processed, flags = _coerce_row(row_dict, col_names) + 
processed_rows.append(processed) + if flags: + coercion_flags[i] = flags + + return ParsedFile( + columns=col_names, + rows=processed_rows, + coercion_flags=coercion_flags, + formula_columns=formula_columns, + date_col_format=date_col_format, + ) + + +# --------------------------------------------------------------------------- # +# Trim footer structural # +# --------------------------------------------------------------------------- # + +def _is_key_empty(row_dict: dict[str, Any], col_names: list[str]) -> bool: + """Randul e structural gol daca coloanele cheie (VIN + data) sunt ambele None/gol.""" + # Detecta coloanele cheie prin nume normalized + from .mapping import normalize_for_match + norm_names = {normalize_for_match(n): n for n in col_names} + + vin_col = None + date_col_key = None + for norm, orig in norm_names.items(): + if "VIN" in norm or "SERIE" in norm or "SASIU" in norm: + vin_col = orig + if "DATA" in norm or "DATE" in norm or "PRESTATIE" in norm: + date_col_key = orig + + def _empty(v: Any) -> bool: + return v is None or str(v).strip() == "" + + vin_empty = _empty(row_dict.get(vin_col)) if vin_col else True + date_empty = _empty(row_dict.get(date_col_key)) if date_col_key else True + return vin_empty and date_empty + + +def _trim_footer(rows: list[dict[str, Any]], col_names: list[str]) -> list[dict[str, Any]]: + """Elimina randuri trailing unde VIN + data sunt goale (footer TOTAL/Intocmit de:).""" + i = len(rows) - 1 + while i >= 0 and _is_key_empty(rows[i], col_names): + i -= 1 + return rows[: i + 1] + + +# --------------------------------------------------------------------------- # +# Detectie coloane formule (Issue 3) # +# --------------------------------------------------------------------------- # + +def _detect_formula_columns(col_values: dict[str, list[Any]], n_rows: int) -> list[str]: + """Coloane unde rata de None depaseste pragul -> probabil formule necalculate.""" + if n_rows == 0: + return [] + result = [] + for col_name, vals in 
col_values.items(): + none_count = sum(1 for v in vals if v is None) + rate = none_count / n_rows + if rate >= FORMULA_NONE_RATE: + result.append(col_name) + return result + + +# --------------------------------------------------------------------------- # +# Dezambiguizare data la nivel de coloana (T10 / OV-8) # +# --------------------------------------------------------------------------- # + +def _detect_date_formats(col_values: dict[str, list[Any]], col_names: list[str]) -> dict[str, str]: + """Detecteaza formatul datei pentru fiecare coloana de tip data. + + Rezultate posibile per coloana: + "native" — toate valorile non-None sunt datetime nativ openpyxl (neambigue) + "DD.MM.YYYY" — coloana e DD-first (cel putin un rand are token[1] > 12) + "YYYY-MM-DD" — format ISO + "ambiguous" — string, toti zi <= 12 (si DD si MM ar fi valide) + "mixed" — amestec datetime nativ + string + (Nu e inclusa daca coloana nu pare a fi de tip data) + """ + from .mapping import normalize_for_match + + result: dict[str, str] = {} + for col_name in col_names: + norm = normalize_for_match(col_name) + # Filtra coloanele de data dupa nume + if not any(kw in norm for kw in ("DATA", "DATE", "PRESTATIE", "ZI", "AN")): + continue + + vals = [v for v in col_values.get(col_name, []) if v is not None] + if not vals: + continue + + native_count = sum(1 for v in vals if isinstance(v, (datetime, date))) + str_vals = [str(v).strip() for v in vals if not isinstance(v, (datetime, date))] + + if native_count == len(vals): + result[col_name] = "native" + continue + + if native_count > 0 and str_vals: + result[col_name] = "mixed" + continue + + # Toate string — detectie format la nivel de coloana (OV-8) + fmt = _infer_date_format_from_column(str_vals) + result[col_name] = fmt + + return result + + +def _infer_date_format_from_column(str_vals: list[str]) -> str: + """Detecteaza formatul datei dintr-o lista de valori string. 
+ + Logica OV-8: daca ORICARE rand are token pozitia-1 > 12 -> coloana e DD-first. + Daca toti zi <= 12 -> ambiguu. + """ + dd_first_evidence = False + iso_evidence = False + parseable = 0 + + for s in str_vals: + if not s: + continue + + # Incearca ISO (YYYY-MM-DD sau YYYY/MM/DD) + if _looks_iso(s): + iso_evidence = True + parseable += 1 + continue + + # Incearca DD.MM.YYYY sau DD/MM/YYYY sau DD-MM-YYYY + parts = _split_date(s) + if parts and len(parts) == 3: + try: + day_candidate = int(parts[0]) + month_candidate = int(parts[1]) + if day_candidate > 12: + dd_first_evidence = True + # Daca month_candidate > 12 -> cu siguranta DD.MM (luna e la pozitia 1) + if month_candidate > 12: + dd_first_evidence = True + parseable += 1 + except ValueError: + pass + + if not parseable: + return "ambiguous" + + if iso_evidence and not dd_first_evidence: + return "YYYY-MM-DD" + + if dd_first_evidence: + return "DD.MM.YYYY" + + # Toti zi <= 12: nu putem distinge DD.MM de MM.DD + return "ambiguous" + + +def _looks_iso(s: str) -> bool: + """Verifica rapid daca string-ul arata ca YYYY-MM-DD.""" + parts = s.replace("/", "-").split("-") + if len(parts) == 3: + try: + y = int(parts[0]) + return y > 1900 + except ValueError: + pass + return False + + +def _split_date(s: str) -> list[str] | None: + """Imparte un string data dupa separatorul comun (., /, -).""" + for sep in (".", "/", "-"): + parts = s.split(sep) + if len(parts) == 3: + return parts + return None + + +# --------------------------------------------------------------------------- # +# Coercion per rand (T3) # +# --------------------------------------------------------------------------- # + +def _coerce_row(row_dict: dict[str, Any], col_names: list[str]) -> tuple[dict[str, Any], list[str]]: + """Coerce valorile unui rand si colecteaza flags needs_review. + + Reguli: + - VIN citit ca int/float (openpyxl: "0123..." 
-> 123.0) -> str + flag needs_review + - Odometru float cu .0 -> tunde ".0" (via canonicalize_row logic) + - Datetime nativ -> convertit la YYYY-MM-DD string + - Valori goale/None raman None + """ + from .mapping import normalize_for_match + + flags: list[str] = [] + out: dict[str, Any] = {} + + norm_names = {normalize_for_match(n): n for n in col_names} + + # Identifica coloanele semantice + vin_col = _find_col(norm_names, ("VIN", "SERIE SASIU", "SASIU", "SERIE")) + odo_col = _find_col(norm_names, ("ODOMETRU", "KM", "KILOMETRI", "ODO")) + + for col_name, val in row_dict.items(): + if val is None: + out[col_name] = None + continue + + # Datetime nativ -> string YYYY-MM-DD + if isinstance(val, datetime): + out[col_name] = val.date().isoformat() + continue + if isinstance(val, date): + out[col_name] = val.isoformat() + continue + + # VIN: daca e numeric (float sau int) -> coercion suspectat + if col_name == vin_col: + if isinstance(val, (int, float)): + flags.append(f"VIN numeric ({val}) — verificati seria sasiului") + out[col_name] = str(int(val)) if val == int(val) else str(val) + else: + out[col_name] = str(val).strip().upper() + continue + + # Odometru: float cu .0 -> int string + if col_name == odo_col: + if isinstance(val, float): + s = str(val) + if s.endswith(".0"): + out[col_name] = s[:-2] # "123456.0" -> "123456" + else: + # Float non-integer -> pastreaza si lasa validarea sa decida + flags.append(f"Odometru float nestandard ({val})") + out[col_name] = str(val) + elif isinstance(val, int): + out[col_name] = str(val) + else: + out[col_name] = str(val).strip() + continue + + # Default: string + out[col_name] = str(val).strip() if isinstance(val, str) else val + + return out, flags + + +def _find_col(norm_names: dict[str, str], keywords: tuple[str, ...]) -> str | None: + """Gaseste o coloana dupa cuvinte cheie in numele normalizat.""" + for kw in keywords: + for norm, orig in norm_names.items(): + if kw in norm: + return orig + return None + + +# 
# --------------------------------------------------------------------------- #
# Per-row date parsing (used by the preview resolve step)                     #
# --------------------------------------------------------------------------- #

# Years below this are treated as unparseable: a two-digit year such as
# "15.06.26" would otherwise become the valid but meaningless date 0026-06-15.
_MIN_PLAUSIBLE_YEAR = 1900


def _date_int_tokens(s: str) -> tuple[int, int, int] | None:
    """Split a date string on '.', '/' or '-' into three integers.

    The first separator that yields exactly three parts is used; None is
    returned when no separator does or when a part is not an integer.
    """
    for sep in (".", "/", "-"):
        parts = s.split(sep)
        if len(parts) == 3:
            try:
                return int(parts[0]), int(parts[1]), int(parts[2])
            except ValueError:
                return None
    return None


def _plausible_date(year: int, month: int, day: int) -> date | None:
    """Build a date, or return None for impossible or implausibly old dates."""
    if year < _MIN_PLAUSIBLE_YEAR:
        return None
    try:
        return date(year, month, day)
    except (ValueError, OverflowError):
        return None


def parse_date_value(
    val: Any,
    col_format: str,
) -> tuple[str | None, bool]:
    """Parse one date value and return ``(iso_string, is_ambiguous)``.

    Args:
        val: normally a string (coercion already converted native datetimes);
            anything else is parsed through its ``str()`` form.
        col_format: column-level format - "native", "DD.MM.YYYY",
            "YYYY-MM-DD", "ambiguous" or "mixed"; unknown values are handled
            like "mixed".

    Returns:
        - ``(None, False)`` for an empty value.
        - ``(iso, False)`` for an unambiguous date.
        - ``(iso, True)`` when the date needs review: the column is
          ambiguous/mixed, or the value only makes sense with day and month
          swapped relative to the column format (e.g. "04.13.2026").
        - ``(None, flag)`` when nothing parses; ``flag`` is True only for
          "ambiguous" columns.

    A trailing time component ("15.06.2026 00:00", "2026-06-15T10:30") is
    ignored. Years before 1900, including two-digit years, are unparseable.
    """
    if val is None:
        return None, False
    s = str(val).strip()
    if not s:
        return None, False

    # Keep only the date part of "date time" / "dateTtime" strings
    s = s.split()[0]
    if "T" in s:
        s = s.split("T", 1)[0]

    # Already ISO (typically produced by coercion from a native datetime)
    try:
        iso_date = date.fromisoformat(s)
    except ValueError:
        iso_date = None
    if iso_date is not None and iso_date.year >= _MIN_PLAUSIBLE_YEAR:
        return iso_date.isoformat(), False

    tokens = _date_int_tokens(s)

    if col_format in ("native", "YYYY-MM-DD"):
        # Year-first with any supported separator
        parsed = _plausible_date(*tokens) if tokens else None
        return (parsed.isoformat(), False) if parsed else (None, False)

    day_first = _plausible_date(tokens[2], tokens[1], tokens[0]) if tokens else None
    month_first = _plausible_date(tokens[2], tokens[0], tokens[1]) if tokens else None

    if col_format == "DD.MM.YYYY":
        if day_first:
            return day_first.isoformat(), False
        if month_first:
            # Impossible as DD.MM but valid as MM.DD: keep the only possible
            # reading and flag it for review instead of dropping the date.
            return month_first.isoformat(), True
        return None, False

    picked = day_first or month_first
    if col_format == "ambiguous":
        # Day-first is the preferred guess; always needs review
        return (picked.isoformat() if picked else None), True

    # mixed or unknown format: best effort, flagged for review when it parses
    if picked:
        return picked.isoformat(), True
    return None, False


--------------------------------------------------------------------------- # +# CSV # +# --------------------------------------------------------------------------- # + +def _decode_csv(data: bytes) -> str: + """Decodifica bytes CSV cu fallback encoding RO.""" + for enc in CSV_ENCODINGS: + try: + return data.decode(enc) + except (UnicodeDecodeError, LookupError): + continue + raise UnicodeDecodeError("csv", data, 0, len(data), "Encoding nesuportat (incercat utf-8, cp1250, latin2)") + + +def _sniff_delimiter(sample: str) -> str: + """Detecteaza delimiter-ul CSV. Export Excel RO foloseste ';'.""" + # Incearca Sniffer standard + try: + dialect = csv.Sniffer().sniff(sample, delimiters=";,\t") + return dialect.delimiter + except csv.Error: + pass + + # Proba explicita: alege delimiter-ul care da cel mai mare numar consistent de coloane + best_delim = "," + best_cols = 0 + for delim in CSV_DELIMITERS: + lines = sample.splitlines()[:10] + counts = [] + for line in lines: + if line.strip(): + counts.append(len(line.split(delim))) + if counts: + # Cel mai frecvent count + from collections import Counter + common = Counter(counts).most_common(1)[0][0] + if common > best_cols: + best_cols = common + best_delim = delim + + return best_delim + + +def parse_csv(data: bytes) -> ParsedFile: + """Parseaza un fisier CSV. 
Detecteaza delimiter + encoding RO.""" + if len(data) > MAX_BYTES: + raise FileTooLarge(bytes_=len(data)) + + text = _decode_csv(data) + sample = text[:8192] + delimiter = _sniff_delimiter(sample) + + reader = csv.DictReader(io.StringIO(text), delimiter=delimiter) + + # Citeste toate randurile (limitat la MAX_ROWS) + raw_rows: list[dict[str, Any]] = [] + for i, row in enumerate(reader): + if i >= MAX_ROWS: + raise FileTooLarge(rows=i + 1) + raw_rows.append(dict(row)) + + if not raw_rows: + raise HeaderError("CSV gol sau fara randuri de date.", found=[]) + + col_names = list(raw_rows[0].keys()) + if not col_names or len(col_names) < 2: + raise HeaderError( + f"Doar {len(col_names)} coloana detectata cu delimiter '{delimiter}' — verifica separatorul.", + found=col_names, + ) + + # Curata cheile None (DictReader poate produce None pt coloane extra) + col_names = [c for c in col_names if c is not None and str(c).strip()] + + # Strip whitespace din valori + cleaned: list[dict[str, Any]] = [] + for row in raw_rows: + cleaned.append({k: (v.strip() if isinstance(v, str) else v) for k, v in row.items() if k in col_names}) + + # Trim footer + cleaned = _trim_footer(cleaned, col_names) + + # Colecteaza valori per coloana pentru detectii + col_values: dict[str, list[Any]] = {c: [] for c in col_names} + for row in cleaned: + for c in col_names: + col_values[c].append(row.get(c)) + + formula_columns: list[str] = [] # CSV nu are formule + date_col_format = _detect_date_formats(col_values, col_names) + + coercion_flags: dict[int, list[str]] = {} + processed: list[dict[str, Any]] = [] + for i, row in enumerate(cleaned): + p, flags = _coerce_row(row, col_names) + processed.append(p) + if flags: + coercion_flags[i] = flags + + return ParsedFile( + columns=col_names, + rows=processed, + coercion_flags=coercion_flags, + formula_columns=formula_columns, + date_col_format=date_col_format, + ) + + +# --------------------------------------------------------------------------- # +# XLSX — 
entry point # +# --------------------------------------------------------------------------- # + +def parse_xlsx(data: bytes, *, sheet_name: str | None = None) -> ParsedFile: + """Parseaza un fisier XLSX. + + Arhitectura 2-treceri (Issue 2): + 1. read_only=True: dim-check + detectie multi-sheet + 2. normal-mode: header + merged cells + body + + Parametru sheet_name: daca workbook-ul are mai multe sheet-uri, utilizatorul + trebuie sa aleaga; trimite-l inapoi la acest apel. Daca None si >1 sheet -> + ridica MultipleSheets. + """ + import openpyxl + + # Trecerea 1: dim-check + try: + non_empty = _xlsx_dimcheck(data) + except MultipleSheets as ms: + if sheet_name is not None: + # Utilizatorul a ales deja un sheet — continuam cu cel ales + non_empty = ms.sheet_names + else: + raise + + if not non_empty: + raise HeaderError("Workbook fara sheet-uri cu date.", found=[]) + + # Alegere sheet + if sheet_name is not None: + target = sheet_name + elif len(non_empty) == 1: + target = non_empty[0] + else: + raise MultipleSheets(non_empty) + + # Trecerea 2: normal-mode + wb = openpyxl.load_workbook(io.BytesIO(data), read_only=False, data_only=True) + try: + if target not in wb.sheetnames: + raise HeaderError(f"Sheet '{target}' nu exista in workbook.", found=wb.sheetnames) + ws = wb[target] + return _xlsx_parse_sheet(ws, target) + finally: + wb.close() + + +# --------------------------------------------------------------------------- # +# Entry point universal # +# --------------------------------------------------------------------------- # + +def parse_file( + data: bytes, + filename: str, + *, + sheet_name: str | None = None, +) -> ParsedFile: + """Entry point unic: detecteaza tipul dupa extensie si parseaza. + + Ridica: FileTooLarge, HeaderError, MultipleSheets, UnicodeDecodeError, + openpyxl.utils.exceptions.InvalidFileException (fisier corupt). 
+ """ + name_lower = filename.lower() + if name_lower.endswith(".csv"): + return parse_csv(data) + elif name_lower.endswith((".xlsx", ".xls")): + return parse_xlsx(data, sheet_name=sheet_name) + else: + raise HeaderError(f"Tip fisier nesuportat: '{filename}'. Acceptat: xlsx, xls, csv.") diff --git a/tests/test_import_parse.py b/tests/test_import_parse.py new file mode 100644 index 0000000..790cdb1 --- /dev/null +++ b/tests/test_import_parse.py @@ -0,0 +1,451 @@ +"""Teste pentru app/import_parse.py (T14 + T3 + T10 + T13). + +Fixture-urile xlsx sunt generate in-memory cu openpyxl (nu fisiere binare commituite). +Fiecare sectiune acopera un task distinct. +""" + +from __future__ import annotations + +import csv +import io +from datetime import date, datetime + +import openpyxl +import pytest + +from app.import_parse import ( + FileTooLarge, + HeaderError, + MultipleSheets, + ParsedFile, + parse_csv, + parse_file, + parse_xlsx, + _detect_date_formats, + _infer_date_format_from_column, + parse_date_value, + _trim_footer, +) + + +# --------------------------------------------------------------------------- # +# Helpere fixture # +# --------------------------------------------------------------------------- # + +def _make_xlsx(rows: list[list], sheet_name: str = "Sheet1") -> bytes: + """Creeaza un fisier xlsx in-memory cu un singur sheet.""" + wb = openpyxl.Workbook() + ws = wb.active + ws.title = sheet_name + for row in rows: + ws.append(row) + buf = io.BytesIO() + wb.save(buf) + return buf.getvalue() + + +def _make_xlsx_multisheet(sheets: dict[str, list[list]]) -> bytes: + """Creeaza un xlsx cu mai multe sheet-uri.""" + wb = openpyxl.Workbook() + first = True + for name, rows in sheets.items(): + if first: + ws = wb.active + ws.title = name + first = False + else: + ws = wb.create_sheet(name) + for row in rows: + ws.append(row) + buf = io.BytesIO() + wb.save(buf) + return buf.getvalue() + + +def _make_csv(rows: list[list], delimiter: str = ",", encoding: str = "utf-8") 
-> bytes: + """Creeaza un CSV in-memory.""" + buf = io.StringIO() + writer = csv.writer(buf, delimiter=delimiter) + for row in rows: + writer.writerow(row) + return buf.getvalue().encode(encoding) + + +_HEADER = ["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"] +_ROW1 = ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", "123456", "Revizie"] +_ROW2 = ["WVWZZZ1KZAW000124", "CJ001AB", "2026-05-10", "98765", "Reparatie"] + + +# =========================================================================== # +# T14 — CSV delimiter sniff + encoding + cap # +# =========================================================================== # + +class TestCsvDelimiter: + def test_csv_semicolon_ro_export(self): + """Export Excel RO foloseste ';' — trebuie detectat corect.""" + data = _make_csv([_HEADER, _ROW1, _ROW2], delimiter=";") + result = parse_csv(data) + assert result.columns == _HEADER + assert len(result.rows) == 2 + assert result.rows[0]["VIN"] == "WVWZZZ1KZAW000123" + + def test_csv_comma_standard(self): + data = _make_csv([_HEADER, _ROW1, _ROW2], delimiter=",") + result = parse_csv(data) + assert len(result.rows) == 2 + + def test_csv_tab_delimiter(self): + data = _make_csv([_HEADER, _ROW1, _ROW2], delimiter="\t") + result = parse_csv(data) + assert len(result.rows) == 2 + + def test_csv_single_column_raises_header_error(self): + """1 coloana dupa sniff -> HeaderError clar, nu mapare oarba.""" + # CSV fara delimitator real -> o singura coloana + text = "VIN\nWVWZZZ1KZAW000123\n" + data = text.encode("utf-8") + with pytest.raises(HeaderError) as exc: + parse_csv(data) + assert "coloana" in str(exc.value).lower() + + def test_csv_encoding_cp1250(self): + """Export RO cu encoding cp1250 (diacritice romanesti).""" + rows = [ + ["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"], + ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", "123456", "Revizie periodică"], + ] + data = _make_csv(rows, delimiter=";", encoding="cp1250") + result 
= parse_csv(data) + assert len(result.rows) == 1 + + def test_csv_too_many_rows_raises(self): + """>5000 randuri -> FileTooLarge fara parsare partiala.""" + header = ["VIN", "Data", "Odometru", "NrInm", "Op"] + rows = [header] + [["WVWZZZ1KZAW000123", "2026-01-01", "1000", "B1TST", "R"] for _ in range(5001)] + data = _make_csv(rows, delimiter=",") + with pytest.raises(FileTooLarge): + parse_csv(data) + + def test_csv_too_large_bytes_raises(self): + """>5MB -> FileTooLarge.""" + data = b"X" * (5 * 1024 * 1024 + 1) + with pytest.raises(FileTooLarge): + parse_csv(data) + + +# =========================================================================== # +# T14 — XLSX read_only dim-check + cap # +# =========================================================================== # + +class TestXlsxDimcheck: + def test_xlsx_parsat_corect(self): + data = _make_xlsx([_HEADER, _ROW1, _ROW2]) + result = parse_xlsx(data) + assert result.columns == _HEADER + assert len(result.rows) == 2 + + def test_xlsx_too_large_bytes_raises(self): + """Fisier >5MB -> FileTooLarge inainte de parsare.""" + # Cream un xlsx real dar verificam dimensiunea bytes separat + data = b"PK" + b"X" * (5 * 1024 * 1024 + 100) + with pytest.raises((FileTooLarge, Exception)): + # Poate ridica si InvalidFileException daca nu e xlsx valid + parse_xlsx(data) + + def test_xlsx_empty_raises_header_error(self): + """Workbook fara date -> HeaderError.""" + wb = openpyxl.Workbook() + buf = io.BytesIO() + wb.save(buf) + with pytest.raises(HeaderError): + parse_xlsx(buf.getvalue()) + + def test_parse_file_dispatch_xlsx(self): + data = _make_xlsx([_HEADER, _ROW1]) + result = parse_file(data, "test.xlsx") + assert len(result.rows) == 1 + + def test_parse_file_dispatch_csv(self): + data = _make_csv([_HEADER, _ROW1], delimiter=";") + result = parse_file(data, "test.csv") + assert len(result.rows) == 1 + + def test_parse_file_unsupported_ext(self): + with pytest.raises(HeaderError): + parse_file(b"data", "test.dbf") + + +# 
# =========================================================================== #
# T3 — coercion guard + needs_review + uncomputed-formula (None) message      #
# =========================================================================== #

# Header row shared by the single-sheet workbooks built in the tests below.
_STD_HEADER = ["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"]


def _workbook_bytes(workbook):
    """Serialize an openpyxl workbook into in-memory xlsx bytes."""
    stream = io.BytesIO()
    workbook.save(stream)
    return stream.getvalue()


def _fill_row(sheet, row, values):
    """Assign ``values`` cell by cell to ``row``, starting at column 1."""
    for column, value in enumerate(values, start=1):
        sheet.cell(row=row, column=column).value = value


def _header_plus_one_row(values):
    """Return xlsx bytes holding the standard header and ``values`` in row 2."""
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.append(list(_STD_HEADER))
    _fill_row(sheet, 2, values)
    return _workbook_bytes(workbook)


class TestCoercionNeedsReview:
    def test_vin_numeric_xlsx_flagged(self):
        """A VIN stored as a number in xlsx is reported in coercion_flags for row 0."""
        # The VIN is written as an int (not a string) to mimic what openpyxl
        # hands back for a numerically formatted cell.
        payload = _header_plus_one_row(
            [123456789012345, "B999TST", "2026-06-15", 123456, "Revizie"]
        )
        parsed = parse_xlsx(payload)
        assert 0 in parsed.coercion_flags
        assert any("VIN" in reason for reason in parsed.coercion_flags[0])

    def test_odometru_float_coerced(self):
        """An odometer of 123456.0 (Excel float) comes back as the string '123456'."""
        payload = _header_plus_one_row(
            ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", 123456.0, "Revizie"]
        )
        parsed = parse_xlsx(payload)
        odometer = parsed.rows[0]["Odometru final"]
        assert odometer == "123456", f"Asteptat '123456', primit '{odometer}'"
        # A trailing ".0" is treated as a standard coercion: no odometer-related
        # review flag may be attached to the row.
        assert not any(
            "Odometru" in reason for reason in parsed.coercion_flags.get(0, [])
        )

    def test_formula_column_detected(self):
        """A column that is None in every data row is listed in formula_columns."""
        workbook = openpyxl.Workbook()
        sheet = workbook.active
        sheet.append(list(_STD_HEADER))
        # None in the odometer slot stands in for a formula that was never computed.
        blank_odometer_row = ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", None, "Revizie"]
        for _ in range(10):
            sheet.append(list(blank_odometer_row))
        parsed = parse_xlsx(_workbook_bytes(workbook))
        assert "Odometru final" in parsed.formula_columns

    def test_datetime_native_converted_to_iso(self):
        """A native datetime cell is returned as an unambiguous YYYY-MM-DD string."""
        payload = _header_plus_one_row(
            ["WVWZZZ1KZAW000123", "B999TST", datetime(2026, 6, 15, 10, 30), 123456, "Revizie"]
        )
        parsed = parse_xlsx(payload)
        assert parsed.rows[0]["Data prestatie"] == "2026-06-15"

    def test_odometru_via_canonicalize_row(self):
        """canonicalize_row normalizes odometru_final='123456.0' to '123456'."""
        from app.idempotency import canonicalize_row

        raw_row = {
            "vin": "WVWZZZ1KZAW000123",
            "nr_inmatriculare": "B1TST",
            "data_prestatie": "2026-06-15",
            "odometru_final": "123456.0",
            "prestatii": [],
        }
        canonical = canonicalize_row(raw_row)
        assert canonical["odometru_final"] == "123456"

    def test_vin_numeric_not_sent(self):
        """A numeric VIN leaves an entry in coercion_flags (row is held for review)."""
        payload = _header_plus_one_row(
            [1234567890, "B999TST", "2026-06-15", 123456, "Revizie"]
        )
        parsed = parse_xlsx(payload)
        # Row 0 must carry flags (needs_review).
        assert 0 in parsed.coercion_flags


# =========================================================================== #
# T10 — date disambiguation at COLUMN level (OV-8)                            #
# =========================================================================== #

class TestDateColumnDisambiguation:
    def test_string_ambiguous_flagged(self):
        """String dates whose first field is always <= 12 are reported as 'ambiguous'."""
        detected = _infer_date_format_from_column(
            ["03.04.2026", "05.06.2026", "01.02.2026"]
        )
        assert detected == "ambiguous"

    def test_dd_first_detected_from_column(self):
        """One value with a first field > 12 settles the column as DD.MM.YYYY."""
        # 15.04.2026: first field is 15 > 12, so it can only be a day.
        detected = _infer_date_format_from_column(
            ["03.04.2026", "15.04.2026", "01.02.2026"]
        )
        assert detected == "DD.MM.YYYY"

    def test_month_gt12_also_dd_first(self):
        """A second field > 12 ('04.13.2026') is expected to yield 'DD.MM.YYYY'."""
        # NOTE(review): read as DD.MM this value would have month 13; the
        # original docstring says the valid reading is day=13, month=4.
        # Confirm that "DD.MM.YYYY" is really the intended result here.
        detected = _infer_date_format_from_column(["04.13.2026"])
        assert detected == "DD.MM.YYYY"

    def test_iso_format_detected(self):
        """ISO-style strings are recognised as YYYY-MM-DD."""
        detected = _infer_date_format_from_column(["2026-06-15", "2026-05-10"])
        assert detected == "YYYY-MM-DD"

    def test_native_datetime_column_format(self):
        """A column made only of native datetime values gets the 'native' format."""
        values_by_column = {
            "Data prestatie": [datetime(2026, 6, 15), datetime(2026, 5, 10)],
        }
        formats = _detect_date_formats(values_by_column, ["Data prestatie"])
        assert formats.get("Data prestatie") == "native"

    def test_parse_date_value_ambiguous_needs_review(self):
        """'03.04.2026' under the 'ambiguous' format -> (iso, True), i.e. needs_review."""
        iso_text, is_ambiguous = parse_date_value("03.04.2026", "ambiguous")
        assert is_ambiguous is True
        assert iso_text == "2026-04-03"  # parsed as DD.MM.YYYY

    def test_parse_date_value_native_already_iso(self):
        """A value already in ISO form (from a native datetime) -> (iso, False)."""
        iso_text, is_ambiguous = parse_date_value("2026-06-15", "native")
        assert iso_text == "2026-06-15"
        assert is_ambiguous is False

    def test_parse_date_value_dd_mm_yyyy(self):
        """'15.06.2026' under DD.MM.YYYY -> ('2026-06-15', False)."""
        iso_text, is_ambiguous = parse_date_value("15.06.2026", "DD.MM.YYYY")
        assert iso_text == "2026-06-15"
        assert is_ambiguous is False

    def test_parse_date_value_empty(self):
        """None input -> (None, False)."""
        iso_text, is_ambiguous = parse_date_value(None, "DD.MM.YYYY")
        assert iso_text is None
        assert is_ambiguous is False

    def test_column_uniform_mm_dd_with_day_gt12(self):
        """A row with first field > 12 fixes the whole column as DD-first."""
        # 03.04.2026 alone is ambiguous; 15.04.2026 (15 > 12) is certainly
        # DD-first, so the entire column resolves to DD.MM.YYYY.
        detected = _infer_date_format_from_column(["03.04.2026", "15.04.2026"])
        assert detected == "DD.MM.YYYY"
        # The otherwise ambiguous row is then parsed with the column-level format.
        iso_text, is_ambiguous = parse_date_value("03.04.2026", detected)
        assert iso_text == "2026-04-03"
        assert is_ambiguous is False


# =========================================================================== #
# T13 — RO export robustness (multi-sheet + merged header + footer trim)      #
# =========================================================================== #

class TestRobustetExportRO:
    def test_multisheet_raises_multiple_sheets(self):
        """A workbook with 2 non-empty sheets raises MultipleSheets listing both names."""
        payload = _make_xlsx_multisheet({
            "Iunie": [_HEADER, _ROW1],
            "Iulie": [_HEADER, _ROW2],
        })
        with pytest.raises(MultipleSheets) as caught:
            parse_xlsx(payload)
        for expected_name in ("Iunie", "Iulie"):
            assert expected_name in caught.value.sheet_names

    def test_multisheet_with_sheet_name_selected(self):
        """Once a sheet is chosen via sheet_name, that sheet is parsed."""
        payload = _make_xlsx_multisheet({
            "Iunie": [_HEADER, _ROW1],
            "Iulie": [_HEADER, _ROW2],
        })
        parsed = parse_xlsx(payload, sheet_name="Iulie")
        assert len(parsed.rows) == 1
        assert parsed.rows[0]["VIN"] == "WVWZZZ1KZAW000124"

    def test_merged_header_resolved(self):
        """A header with merged cells is logically un-merged; no blank column names."""
        workbook = openpyxl.Workbook()
        sheet = workbook.active
        # Header row: "Vehicul" is merged across columns 1-2; column 2 has no
        # value of its own.
        header_cells = (
            (1, "Vehicul"),
            (3, "Data prestatie"),
            (4, "Odometru final"),
            (5, "Operatie"),
        )
        for column, title in header_cells:
            sheet.cell(row=1, column=column).value = title
        sheet.merge_cells(start_row=1, start_column=1, end_row=1, end_column=2)
        _fill_row(
            sheet, 2, ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", 123456, "Revizie"]
        )
        parsed = parse_xlsx(_workbook_bytes(workbook))
        # The merge propagates "Vehicul" to both columns; de-duplication adds
        # the "_2" suffix to the second one.
        assert "Vehicul" in parsed.columns
        assert "Vehicul_2" in parsed.columns
        # Exactly two column names derive from the merged cell.
        assert sum("Vehicul" in name for name in parsed.columns) == 2

    def test_footer_rows_skipped(self):
        """Trailing rows lacking both VIN and date are dropped, not marked needs_data."""
        sheet_rows = [
            _HEADER,
            _ROW1,
            _ROW2,
            ["TOTAL", "", "", "222221", ""],  # footer whose VIN cell reads "TOTAL"
            ["", "", "", "", ""],             # completely empty row
        ]
        parsed = parse_xlsx(_make_xlsx(sheet_rows))
        # Per the original author's note: the "TOTAL" row has a non-empty VIN
        # and an empty date, so it is not trimmed structurally, while the fully
        # empty row is; 3 rows (ROW1 + ROW2 + TOTAL) would remain.
        # NOTE(review): that row count is not asserted here.
        vins = [row.get("VIN") for row in parsed.rows]
        assert "" not in vins
        # No completely empty row may survive.
        blank_rows = [
            row
            for row in parsed.rows
            if all(cell is None or str(cell).strip() == "" for cell in row.values())
        ]
        assert not blank_rows

    def test_footer_vin_and_date_both_empty_skipped(self):
        """A row with both VIN and date empty is skipped (TOTAL / 'Intocmit de:' footer)."""
        data_row = {
            "VIN": "WVWZZZ1KZAW000123",
            "Data prestatie": "2026-06-15",
            "Odometru final": "123456",
        }
        # Footer-like row: only the odometer cell is populated.
        footer_row = {"VIN": "", "Data prestatie": "", "Odometru final": "9999"}
        column_names = ["VIN", "Data prestatie", "Odometru final"]
        kept = _trim_footer([data_row, footer_row], column_names)
        assert len(kept) == 1
        assert kept[0]["VIN"] == "WVWZZZ1KZAW000123"

    def test_single_sheet_no_error(self):
        """A single-sheet workbook parses without raising MultipleSheets."""
        parsed = parse_xlsx(_make_xlsx([_HEADER, _ROW1]))
        assert len(parsed.rows) == 1


# =========================================================================== #
# parse_file integration                                                      #
# =========================================================================== #

class TestParseFileIntegration:
    def test_xlsx_full_flow(self):
        """parse_file on an .xlsx name returns the header and both data rows."""
        payload = _make_xlsx([_HEADER, _ROW1, _ROW2])
        parsed = parse_file(payload, "prezentari_iunie.xlsx")
        assert parsed.columns == _HEADER
        assert len(parsed.rows) == 2
        first_row, second_row = parsed.rows[0], parsed.rows[1]
        assert first_row["VIN"] == "WVWZZZ1KZAW000123"
        assert second_row["Odometru final"] == "98765"

    def test_csv_semicolon_full_flow(self):
        """parse_file on a ';'-delimited .csv returns the header and first row."""
        payload = _make_csv([_HEADER, _ROW1], delimiter=";")
        parsed = parse_file(payload, "export_ro.csv")
        assert parsed.columns == _HEADER
        assert parsed.rows[0]["VIN"] == "WVWZZZ1KZAW000123"