"""Tests for app/import_parse.py (T14 + T3 + T10 + T13).

The xlsx fixtures are generated in-memory with openpyxl (no binary files are
committed). Each section covers a distinct task.
"""
from __future__ import annotations

import csv
import io
from datetime import date, datetime

import openpyxl
import pytest

from app.import_parse import (
    FileTooLarge,
    HeaderError,
    MultipleSheets,
    ParsedFile,
    parse_csv,
    parse_file,
    parse_xlsx,
    _detect_date_formats,
    _infer_date_format_from_column,
    parse_date_value,
    _trim_footer,
)

# --------------------------------------------------------------------------- #
# Fixture helpers #
# --------------------------------------------------------------------------- #


def _workbook_bytes(workbook) -> bytes:
    """Serialize *workbook* through an in-memory buffer and return its bytes."""
    stream = io.BytesIO()
    workbook.save(stream)
    return stream.getvalue()


def _make_xlsx(rows: list[list], sheet_name: str = "Sheet1") -> bytes:
    """Build an in-memory xlsx file containing a single sheet.

    Args:
        rows: Row values, appended to the sheet in the given order.
        sheet_name: Title assigned to the workbook's active sheet.

    Returns:
        The saved workbook as raw bytes.
    """
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = sheet_name
    for values in rows:
        sheet.append(values)
    return _workbook_bytes(workbook)


def _make_xlsx_multisheet(sheets: dict[str, list[list]]) -> bytes:
    """Build an in-memory xlsx file containing several sheets.

    Args:
        sheets: Mapping of sheet title to its rows, in the desired sheet order.

    Returns:
        The saved workbook as raw bytes.
    """
    workbook = openpyxl.Workbook()
    for position, (title, rows) in enumerate(sheets.items()):
        if position == 0:
            # Reuse the sheet openpyxl creates by default instead of adding one.
            sheet = workbook.active
            sheet.title = title
        else:
            sheet = workbook.create_sheet(title)
        for values in rows:
            sheet.append(values)
    return _workbook_bytes(workbook)


def _make_csv(rows: list[list], delimiter: str = ",", encoding: str = "utf-8") -> bytes:
    """Build an in-memory CSV file.

    Args:
        rows: Row values, written in the given order.
        delimiter: Field separator passed to ``csv.writer``.
        encoding: Codec used to turn the CSV text into bytes.

    Returns:
        The encoded CSV content.
    """
    text = io.StringIO()
    csv.writer(text, delimiter=delimiter).writerows(rows)
    return text.getvalue().encode(encoding)


_HEADER = ["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"]
_ROW1 = ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", "123456", "Revizie"]
_ROW2 = ["WVWZZZ1KZAW000124", "CJ001AB", "2026-05-10", "98765", "Reparatie"]

# =========================================================================== #
# T14 — CSV delimiter sniff + encoding + cap #
# =========================================================================== #


class TestCsvDelimiter:
    """CSV parsing: delimiter detection, encoding and size limits."""

    def test_csv_semicolon_ro_export(self):
        """RO Excel exports use ';' — it must be detected correctly."""
        parsed = parse_csv(_make_csv([_HEADER, _ROW1, _ROW2], delimiter=";"))
        assert parsed.columns == _HEADER
        assert len(parsed.rows) == 2
        assert parsed.rows[0]["VIN"] == "WVWZZZ1KZAW000123"

    def test_csv_comma_standard(self):
        """A comma-separated file yields both data rows."""
        parsed = parse_csv(_make_csv([_HEADER, _ROW1, _ROW2], delimiter=","))
        assert len(parsed.rows) == 2

    def test_csv_tab_delimiter(self):
        """A tab-separated file yields both data rows."""
        parsed = parse_csv(_make_csv([_HEADER, _ROW1, _ROW2], delimiter="\t"))
        assert len(parsed.rows) == 2

    def test_csv_single_column_raises_header_error(self):
        """One column after sniffing -> a clear HeaderError, not blind mapping."""
        # A CSV without any real delimiter -> a single column.
        payload = "VIN\nWVWZZZ1KZAW000123\n".encode("utf-8")
        with pytest.raises(HeaderError) as excinfo:
            parse_csv(payload)
        assert "coloana" in str(excinfo.value).lower()

    def test_csv_encoding_cp1250(self):
        """RO export encoded as cp1250 (Romanian diacritics)."""
        table = [
            ["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"],
            ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", "123456", "Revizie periodică"],
        ]
        payload = _make_csv(table, delimiter=";", encoding="cp1250")
        parsed = parse_csv(payload)
        assert len(parsed.rows) == 1

    def test_csv_too_many_rows_raises(self):
        """More than 5000 rows -> FileTooLarge, without partial parsing."""
        header = ["VIN", "Data", "Odometru", "NrInm", "Op"]
        filler = ["WVWZZZ1KZAW000123", "2026-01-01", "1000", "B1TST", "R"]
        table = [header, *(list(filler) for _ in range(5001))]
        payload = _make_csv(table, delimiter=",")
        with pytest.raises(FileTooLarge):
            parse_csv(payload)

    def test_csv_too_large_bytes_raises(self):
        """More than 5 MB -> FileTooLarge."""
        oversized = b"X" * (5 * 1024 * 1024 + 1)
        with pytest.raises(FileTooLarge):
            parse_csv(oversized)


# =========================================================================== #
# T14 — XLSX read_only
# dim-check + cap #
# =========================================================================== #


def _xlsx_with_data_row(values: list) -> bytes:
    """Build an xlsx holding the standard header plus one cell-by-cell data row.

    Each value is assigned directly to its cell in row 2, so the Python type
    given here (int, float, datetime, str) is what gets stored in the workbook.

    Args:
        values: Cell values for row 2, one per column starting at column 1.

    Returns:
        The saved workbook as raw bytes.
    """
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.append(_HEADER)
    for column, value in enumerate(values, start=1):
        ws.cell(row=2, column=column).value = value
    buf = io.BytesIO()
    wb.save(buf)
    return buf.getvalue()


class TestXlsxDimcheck:
    """XLSX parsing basics, size cap and parse_file dispatch by extension."""

    def test_xlsx_parsat_corect(self):
        """A well-formed workbook yields the header and both data rows."""
        data = _make_xlsx([_HEADER, _ROW1, _ROW2])
        result = parse_xlsx(data)
        assert result.columns == _HEADER
        assert len(result.rows) == 2

    def test_xlsx_too_large_bytes_raises(self):
        """A payload over 5 MB -> FileTooLarge before any parsing."""
        # The bytes are deliberately not a valid workbook: the size cap is
        # expected to trip first, so FileTooLarge is the only accepted outcome.
        # (Accepting a bare Exception here would let any failure pass the test.)
        data = b"PK" + b"X" * (5 * 1024 * 1024 + 100)
        with pytest.raises(FileTooLarge):
            parse_xlsx(data)

    def test_xlsx_empty_raises_header_error(self):
        """A workbook without data -> HeaderError."""
        wb = openpyxl.Workbook()
        buf = io.BytesIO()
        wb.save(buf)
        with pytest.raises(HeaderError):
            parse_xlsx(buf.getvalue())

    def test_parse_file_dispatch_xlsx(self):
        """A ``.xlsx`` filename is parsed as a workbook."""
        data = _make_xlsx([_HEADER, _ROW1])
        result = parse_file(data, "test.xlsx")
        assert len(result.rows) == 1

    def test_parse_file_dispatch_csv(self):
        """A ``.csv`` filename is parsed as CSV."""
        data = _make_csv([_HEADER, _ROW1], delimiter=";")
        result = parse_file(data, "test.csv")
        assert len(result.rows) == 1

    def test_parse_file_unsupported_ext(self):
        """An unsupported extension -> HeaderError."""
        with pytest.raises(HeaderError):
            parse_file(b"data", "test.dbf")


# =========================================================================== #
# T3 — coercion guard + needs_review + formula-None message #
# =========================================================================== #


class TestCoercionNeedsReview:
    """Type-coercion guards for natively typed xlsx cells."""

    def test_vin_numeric_xlsx_flagged(self):
        """VIN '0123...' read as a number from xlsx -> needs_review (coercion_flags)."""
        # The VIN is written as an int, not a string, to simulate a cell that
        # the spreadsheet stores as numeric.
        data = _xlsx_with_data_row(
            [123456789012345, "B999TST", "2026-06-15", 123456, "Revizie"]
        )
        result = parse_xlsx(data)
        assert 0 in result.coercion_flags
        flags = result.coercion_flags[0]
        assert any("VIN" in f for f in flags)

    def test_odometru_float_coerced(self):
        """Odometer 123456.0 (Excel float) -> converted to '123456'."""
        data = _xlsx_with_data_row(
            ["WVWZZZ1KZAW000123", "B999TST", "2026-06-15", 123456.0, "Revizie"]
        )
        result = parse_xlsx(data)
        odo_val = result.rows[0]["Odometru final"]
        assert odo_val == "123456", f"Asteptat '123456', primit '{odo_val}'"
        # A trailing .0 on the odometer is a standard coercion and must not
        # raise a needs_review flag.
        assert 0 not in result.coercion_flags or not any(
            "Odometru" in f for f in result.coercion_flags.get(0, [])
        )

    def test_formula_column_detected(self):
        """A column with >60% None (uncomputed formulas) -> formula_columns."""
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.append(["VIN", "Nr inmatriculare", "Data prestatie", "Odometru final", "Operatie"])
        for _ in range(10):
            ws.append([
                "WVWZZZ1KZAW000123",
                "B999TST",
                "2026-06-15",
                None,  # uncomputed formula -> None
                "Revizie",
            ])
        buf = io.BytesIO()
        wb.save(buf)
        result = parse_xlsx(buf.getvalue())
        assert "Odometru final" in result.formula_columns

    def test_datetime_native_converted_to_iso(self):
        """A native datetime cell -> converted to YYYY-MM-DD (unambiguous)."""
        data = _xlsx_with_data_row(
            ["WVWZZZ1KZAW000123", "B999TST", datetime(2026, 6, 15, 10, 30), 123456, "Revizie"]
        )
        result = parse_xlsx(data)
        assert result.rows[0]["Data prestatie"] == "2026-06-15"

    def test_odometru_via_canonicalize_row(self):
        """Check that odometru_final='123456.0' is normalized by canonicalize_row."""
        from app.idempotency import canonicalize_row

        raw = {
            "vin": "WVWZZZ1KZAW000123",
            "nr_inmatriculare": "B1TST",
            "data_prestatie": "2026-06-15",
            "odometru_final": "123456.0",
            "prestatii": [],
        }
        canon = canonicalize_row(raw)
        assert canon["odometru_final"] == "123456"

    def test_vin_numeric_not_sent(self):
        """Numeric VIN flagged -> coercion_flags present = auto-send blocked."""
        data = _xlsx_with_data_row(
            [1234567890, "B999TST", "2026-06-15", 123456, "Revizie"]
        )
        result = parse_xlsx(data)
        # Row 0 must carry flags (needs_review).
        assert 0 in result.coercion_flags


# =========================================================================== #
# T10 — COLUMN-level date disambiguation (OV-8) #
# =========================================================================== #


class TestDateColumnDisambiguation:
    """Date-format inference per column and per-value date parsing."""

    def test_string_ambiguous_flagged(self):
        """'03.04.2026' string with day<=12 -> ambiguous."""
        fmt = _infer_date_format_from_column(["03.04.2026", "05.06.2026", "01.02.2026"])
        assert fmt == "ambiguous"

    def test_dd_first_detected_from_column(self):
        """A column with at least one day>12 row -> DD.MM.YYYY detected."""
        # 15.04.2026: day=15 > 12 -> certainly DD-first.
        fmt = _infer_date_format_from_column(["03.04.2026", "15.04.2026", "01.02.2026"])
        assert fmt == "DD.MM.YYYY"

    def test_month_gt12_also_dd_first(self):
        """Month >12 is impossible -> reported as DD.MM.

        Example: 04.13.2026 -> day=4, month=13 is impossible ❌ /
        day=13, month=4 ✓.
        """
        # If position 1 (the month) > 12 -> DD-first.
        # NOTE(review): the example above reads as month-first (MM.DD), yet the
        # expected result is "DD.MM.YYYY" — confirm this is the intended
        # contract of _infer_date_format_from_column.
        fmt = _infer_date_format_from_column(["04.13.2026"])
        assert fmt == "DD.MM.YYYY"

    def test_iso_format_detected(self):
        """ISO-formatted strings -> 'YYYY-MM-DD'."""
        fmt = _infer_date_format_from_column(["2026-06-15", "2026-05-10"])
        assert fmt == "YYYY-MM-DD"

    def test_native_datetime_column_format(self):
        """A column whose values are all native datetimes -> format 'native'."""
        col_values = {"Data prestatie": [datetime(2026, 6, 15), datetime(2026, 5, 10)]}
        fmt_map = _detect_date_formats(col_values, ["Data prestatie"])
        assert fmt_map.get("Data prestatie") == "native"

    def test_parse_date_value_ambiguous_needs_review(self):
        """'03.04.2026' with format 'ambiguous' -> (iso, True) = needs_review."""
        iso, ambiguous = parse_date_value("03.04.2026", "ambiguous")
        assert ambiguous is True
        assert iso == "2026-04-03"  # parsed as DD.MM.YYYY

    def test_parse_date_value_native_already_iso(self):
        """Value already converted to ISO (from a native datetime) -> (iso, False)."""
        iso, ambiguous = parse_date_value("2026-06-15", "native")
        assert iso == "2026-06-15"
        assert ambiguous is False

    def test_parse_date_value_dd_mm_yyyy(self):
        """'15.06.2026' with format DD.MM.YYYY -> ISO date, not ambiguous."""
        iso, ambiguous = parse_date_value("15.06.2026", "DD.MM.YYYY")
        assert iso == "2026-06-15"
        assert ambiguous is False

    def test_parse_date_value_empty(self):
        """None input -> (None, False)."""
        iso, ambiguous = parse_date_value(None, "DD.MM.YYYY")
        assert iso is None
        assert ambiguous is False

    def test_column_uniform_mm_dd_with_day_gt12(self):
        """Uniform column containing day>12 rows -> DD-first detected column-wide."""
        # Example: 03.04.2026 (ambiguous) + 15.04.2026 (day=15>12 -> certainly
        # DD-first) -> the whole column is DD.MM.YYYY.
        fmt = _infer_date_format_from_column(["03.04.2026", "15.04.2026"])
        assert fmt == "DD.MM.YYYY"
        # A row that would otherwise look ambiguous (03) is handled correctly
        # by the format detected at column level.
        iso, ambig = parse_date_value("03.04.2026", fmt)
        assert iso == "2026-04-03"
        assert ambig is False


# =========================================================================== #
# T13 — RO export robustness (multi-sheet + merged header + footer trim) #
# =========================================================================== #


class TestRobustetExportRO:
    """Multi-sheet workbooks, merged header cells and trailing footer rows."""

    def test_multisheet_raises_multiple_sheets(self):
        """Workbook with 2 non-empty sheets -> MultipleSheets carrying the list."""
        data = _make_xlsx_multisheet({
            "Iunie": [_HEADER, _ROW1],
            "Iulie": [_HEADER, _ROW2],
        })
        with pytest.raises(MultipleSheets) as exc:
            parse_xlsx(data)
        assert "Iunie" in exc.value.sheet_names
        assert "Iulie" in exc.value.sheet_names

    def test_multisheet_with_sheet_name_selected(self):
        """After a sheet is chosen -> parsed correctly."""
        data = _make_xlsx_multisheet({
            "Iunie": [_HEADER, _ROW1],
            "Iulie": [_HEADER, _ROW2],
        })
        result = parse_xlsx(data, sheet_name="Iulie")
        assert len(result.rows) == 1
        assert result.rows[0]["VIN"] == "WVWZZZ1KZAW000124"

    def test_merged_header_resolved(self):
        """Header with merged cells -> logical un-merge, no empty columns."""
        wb = openpyxl.Workbook()
        ws = wb.active
        # Write the header with a merge across the first 2 columns.
        ws.cell(row=1, column=1).value = "Vehicul"
        ws.cell(row=1, column=3).value = "Data prestatie"
        ws.cell(row=1, column=4).value = "Odometru final"
        ws.cell(row=1, column=5).value = "Operatie"
        ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=2)
        ws.cell(row=2, column=1).value = "WVWZZZ1KZAW000123"
        ws.cell(row=2, column=2).value = "B999TST"
        ws.cell(row=2, column=3).value = "2026-06-15"
        ws.cell(row=2, column=4).value = 123456
        ws.cell(row=2, column=5).value = "Revizie"
        buf = io.BytesIO()
        wb.save(buf)
        result = parse_xlsx(buf.getvalue())
        # The merge propagates "Vehicul" to both columns; dedup adds suffix _2.
        assert "Vehicul" in result.columns
        assert "Vehicul_2" in result.columns  # column 2 of the merge, disambiguated
        # Neither of them is empty/None.
        assert len([c for c in result.columns if "Vehicul" in c]) == 2

    def test_footer_rows_skipped(self):
        """Trailing rows without VIN + date -> structural skip, not needs_data."""
        rows = [
            _HEADER,
            _ROW1,
            _ROW2,
            ["TOTAL", "", "", "222221", ""],  # footer whose VIN is "TOTAL"
            ["", "", "", "", ""],  # completely empty row
        ]
        data = _make_xlsx(rows)
        result = parse_xlsx(data)
        # The "TOTAL" row has a non-empty VIN ("TOTAL") and an empty date ->
        # not a structural trim. The completely empty row (VIN="" + date="")
        # -> trimmed. So 3 rows are expected to remain (ROW1 + ROW2 + TOTAL).
        vins = [r.get("VIN") for r in result.rows]
        assert "" not in vins
        # The completely empty row must not be present.
        empty_rows = [
            r for r in result.rows
            if all(v is None or str(v).strip() == "" for v in r.values())
        ]
        assert len(empty_rows) == 0

    def test_footer_vin_and_date_both_empty_skipped(self):
        """Row where VIN and date are both empty -> skipped (footer TOTAL/Intocmit de:)."""
        rows_list = [
            {"VIN": "WVWZZZ1KZAW000123", "Data prestatie": "2026-06-15", "Odometru final": "123456"},
            {"VIN": "", "Data prestatie": "", "Odometru final": "9999"},  # fake footer
        ]
        col_names = ["VIN", "Data prestatie", "Odometru final"]
        trimmed = _trim_footer(rows_list, col_names)
        assert len(trimmed) == 1
        assert trimmed[0]["VIN"] == "WVWZZZ1KZAW000123"

    def test_single_sheet_no_error(self):
        """Workbook with a single sheet -> parsed without MultipleSheets."""
        data = _make_xlsx([_HEADER, _ROW1])
        result = parse_xlsx(data)
        assert len(result.rows) == 1


# =========================================================================== #
# parse_file integration #
# =========================================================================== #
class TestParseFileIntegration:
    """End-to-end checks of parse_file on xlsx and CSV payloads."""

    def test_xlsx_full_flow(self):
        """An xlsx upload yields the header and both rows, values as strings."""
        payload = _make_xlsx([_HEADER, _ROW1, _ROW2])
        parsed = parse_file(payload, "prezentari_iunie.xlsx")
        assert parsed.columns == _HEADER
        assert len(parsed.rows) == 2
        first_row, second_row = parsed.rows[0], parsed.rows[1]
        assert first_row["VIN"] == "WVWZZZ1KZAW000123"
        assert second_row["Odometru final"] == "98765"

    def test_csv_semicolon_full_flow(self):
        """A ';'-delimited CSV upload yields the header and the data row."""
        payload = _make_csv([_HEADER, _ROW1], delimiter=";")
        parsed = parse_file(payload, "export_ro.csv")
        assert parsed.columns == _HEADER
        assert parsed.rows[0]["VIN"] == "WVWZZZ1KZAW000123"