Files
gomag-vending/api/app/services/mapping_service.py
Claude Agent 452dc9b9f0 feat(mappings): strict validation + silent CSV skip for missing CODMAT
Add Pydantic validators and service-level checks that reject empty SKU/CODMAT
on create/edit (400). CSV import now silently skips rows without CODMAT and
counts them in skipped_no_codmat instead of treating them as errors.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 21:46:59 +00:00

365 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import oracledb
import csv
import io
import logging
from fastapi import HTTPException
from .. import database
logger = logging.getLogger(__name__)
def get_mappings(search: str = "", page: int = 1, per_page: int = 50,
sort_by: str = "sku", sort_dir: str = "asc",
show_deleted: bool = False, pct_filter: str = None):
"""Get paginated mappings with optional search, sorting, and pct_filter.
pct_filter values:
'complete' only SKU groups where sum(procent_pret for active rows) == 100
'incomplete' only SKU groups where sum < 100
None / 'all' no filter
"""
if database.pool is None:
raise HTTPException(status_code=503, detail="Oracle unavailable")
offset = (page - 1) * per_page
# Validate and resolve sort parameters
allowed_sort = {
"sku": "at.sku",
"codmat": "at.codmat",
"denumire": "na.denumire",
"um": "na.um",
"cantitate_roa": "at.cantitate_roa",
"procent_pret": "at.procent_pret",
"activ": "at.activ",
}
sort_col = allowed_sort.get(sort_by, "at.sku")
if sort_dir.lower() not in ("asc", "desc"):
sort_dir = "asc"
order_clause = f"{sort_col} {sort_dir}"
# Always add secondary sort to keep groups together
if sort_col not in ("at.sku",):
order_clause += ", at.sku"
order_clause += ", at.codmat"
with database.pool.acquire() as conn:
with conn.cursor() as cur:
# Build WHERE clause
where_clauses = []
params = {}
if not show_deleted:
where_clauses.append("at.sters = 0")
if search:
where_clauses.append("""(UPPER(at.sku) LIKE '%' || UPPER(:search) || '%'
OR UPPER(at.codmat) LIKE '%' || UPPER(:search) || '%'
OR UPPER(na.denumire) LIKE '%' || UPPER(:search) || '%')""")
params["search"] = search
where = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""
# Fetch ALL matching rows (no pagination yet — we need to group by SKU first)
data_sql = f"""
SELECT at.sku, at.codmat, na.denumire, na.um, at.cantitate_roa,
at.procent_pret, at.activ, at.sters,
TO_CHAR(at.data_creare, 'YYYY-MM-DD HH24:MI') as data_creare
FROM ARTICOLE_TERTI at
LEFT JOIN nom_articole na ON na.codmat = at.codmat
{where}
ORDER BY {order_clause}
"""
cur.execute(data_sql, params)
columns = [col[0].lower() for col in cur.description]
all_rows = [dict(zip(columns, row)) for row in cur.fetchall()]
# Group by SKU and compute pct_total for each group
from collections import OrderedDict
groups = OrderedDict()
for row in all_rows:
sku = row["sku"]
if sku not in groups:
groups[sku] = []
groups[sku].append(row)
# Compute counts across ALL groups (before pct_filter)
total_skus = len(groups)
complete_skus = 0
incomplete_skus = 0
for sku, rows in groups.items():
pct_total = sum(
(r["procent_pret"] or 0)
for r in rows
if r.get("activ") == 1
)
if abs(pct_total - 100) <= 0.01:
complete_skus += 1
else:
incomplete_skus += 1
counts = {
"total": total_skus,
"complete": complete_skus,
"incomplete": incomplete_skus,
}
# Apply pct_filter
if pct_filter in ("complete", "incomplete"):
filtered_groups = {}
for sku, rows in groups.items():
pct_total = sum(
(r["procent_pret"] or 0)
for r in rows
if r.get("activ") == 1
)
is_complete = abs(pct_total - 100) <= 0.01
if pct_filter == "complete" and is_complete:
filtered_groups[sku] = rows
elif pct_filter == "incomplete" and not is_complete:
filtered_groups[sku] = rows
groups = filtered_groups
# Flatten back to rows for pagination (paginate by raw row count)
filtered_rows = [row for rows in groups.values() for row in rows]
total = len(filtered_rows)
page_rows = filtered_rows[offset: offset + per_page]
# Attach pct_total and is_complete to each row for the renderer
# Re-compute per visible group
sku_pct = {}
for sku, rows in groups.items():
pct_total = sum(
(r["procent_pret"] or 0)
for r in rows
if r.get("activ") == 1
)
sku_pct[sku] = {"pct_total": pct_total, "is_complete": abs(pct_total - 100) <= 0.01}
for row in page_rows:
meta = sku_pct.get(row["sku"], {"pct_total": 0, "is_complete": False})
row["pct_total"] = meta["pct_total"]
row["is_complete"] = meta["is_complete"]
return {
"mappings": page_rows,
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page if total > 0 else 0,
"counts": counts,
}
def create_mapping(sku: str, codmat: str, cantitate_roa: float = 1, procent_pret: float = 100):
"""Create a new mapping. Returns dict or raises HTTPException on duplicate."""
if not sku or not sku.strip():
raise HTTPException(status_code=400, detail="SKU este obligatoriu")
if not codmat or not codmat.strip():
raise HTTPException(status_code=400, detail="CODMAT este obligatoriu")
if database.pool is None:
raise HTTPException(status_code=503, detail="Oracle unavailable")
with database.pool.acquire() as conn:
with conn.cursor() as cur:
# Check for active duplicate
cur.execute("""
SELECT COUNT(*) FROM ARTICOLE_TERTI
WHERE sku = :sku AND codmat = :codmat AND NVL(sters, 0) = 0
""", {"sku": sku, "codmat": codmat})
if cur.fetchone()[0] > 0:
raise HTTPException(status_code=409, detail="Maparea SKU-CODMAT există deja")
# Check for soft-deleted record that could be restored
cur.execute("""
SELECT COUNT(*) FROM ARTICOLE_TERTI
WHERE sku = :sku AND codmat = :codmat AND sters = 1
""", {"sku": sku, "codmat": codmat})
if cur.fetchone()[0] > 0:
raise HTTPException(
status_code=409,
detail="Maparea a fost ștearsă anterior",
headers={"X-Can-Restore": "true"}
)
cur.execute("""
INSERT INTO ARTICOLE_TERTI (sku, codmat, cantitate_roa, procent_pret, activ, sters, data_creare, id_util_creare)
VALUES (:sku, :codmat, :cantitate_roa, :procent_pret, 1, 0, SYSDATE, -3)
""", {"sku": sku, "codmat": codmat, "cantitate_roa": cantitate_roa, "procent_pret": procent_pret})
conn.commit()
return {"sku": sku, "codmat": codmat}
def update_mapping(sku: str, codmat: str, cantitate_roa: float = None, procent_pret: float = None, activ: int = None):
"""Update an existing mapping."""
if database.pool is None:
raise HTTPException(status_code=503, detail="Oracle unavailable")
sets = []
params = {"sku": sku, "codmat": codmat}
if cantitate_roa is not None:
sets.append("cantitate_roa = :cantitate_roa")
params["cantitate_roa"] = cantitate_roa
if procent_pret is not None:
sets.append("procent_pret = :procent_pret")
params["procent_pret"] = procent_pret
if activ is not None:
sets.append("activ = :activ")
params["activ"] = activ
if not sets:
return False
sets.append("data_modif = SYSDATE")
set_clause = ", ".join(sets)
with database.pool.acquire() as conn:
with conn.cursor() as cur:
cur.execute(f"""
UPDATE ARTICOLE_TERTI SET {set_clause}
WHERE sku = :sku AND codmat = :codmat
""", params)
conn.commit()
return cur.rowcount > 0
def delete_mapping(sku: str, codmat: str):
"""Soft delete (set sters=1)."""
if database.pool is None:
raise HTTPException(status_code=503, detail="Oracle unavailable")
with database.pool.acquire() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE ARTICOLE_TERTI SET sters = 1, data_modif = SYSDATE
WHERE sku = :sku AND codmat = :codmat
""", {"sku": sku, "codmat": codmat})
conn.commit()
return cur.rowcount > 0
def edit_mapping(old_sku: str, old_codmat: str, new_sku: str, new_codmat: str,
cantitate_roa: float = 1, procent_pret: float = 100):
"""Edit a mapping. If PK changed, soft-delete old and insert new."""
if not new_sku or not new_sku.strip():
raise HTTPException(status_code=400, detail="SKU este obligatoriu")
if not new_codmat or not new_codmat.strip():
raise HTTPException(status_code=400, detail="CODMAT este obligatoriu")
if database.pool is None:
raise HTTPException(status_code=503, detail="Oracle unavailable")
if old_sku == new_sku and old_codmat == new_codmat:
# Simple update - only cantitate/procent changed
return update_mapping(new_sku, new_codmat, cantitate_roa, procent_pret)
else:
# PK changed: soft-delete old, upsert new (MERGE handles existing soft-deleted target)
with database.pool.acquire() as conn:
with conn.cursor() as cur:
# Mark old record as deleted
cur.execute("""
UPDATE ARTICOLE_TERTI SET sters = 1, data_modif = SYSDATE
WHERE sku = :sku AND codmat = :codmat
""", {"sku": old_sku, "codmat": old_codmat})
# Upsert new record (MERGE in case target PK exists as soft-deleted)
cur.execute("""
MERGE INTO ARTICOLE_TERTI t
USING (SELECT :sku AS sku, :codmat AS codmat FROM DUAL) s
ON (t.sku = s.sku AND t.codmat = s.codmat)
WHEN MATCHED THEN UPDATE SET
cantitate_roa = :cantitate_roa,
procent_pret = :procent_pret,
activ = 1, sters = 0,
data_modif = SYSDATE
WHEN NOT MATCHED THEN INSERT
(sku, codmat, cantitate_roa, procent_pret, activ, sters, data_creare, id_util_creare)
VALUES (:sku, :codmat, :cantitate_roa, :procent_pret, 1, 0, SYSDATE, -3)
""", {"sku": new_sku, "codmat": new_codmat,
"cantitate_roa": cantitate_roa, "procent_pret": procent_pret})
conn.commit()
return True
def restore_mapping(sku: str, codmat: str):
"""Restore a soft-deleted mapping (set sters=0)."""
if database.pool is None:
raise HTTPException(status_code=503, detail="Oracle unavailable")
with database.pool.acquire() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE ARTICOLE_TERTI SET sters = 0, data_modif = SYSDATE
WHERE sku = :sku AND codmat = :codmat
""", {"sku": sku, "codmat": codmat})
conn.commit()
return cur.rowcount > 0
def import_csv(file_content: str):
"""Import mappings from CSV content. Returns summary."""
if database.pool is None:
raise HTTPException(status_code=503, detail="Oracle unavailable")
reader = csv.DictReader(io.StringIO(file_content))
created = 0
skipped_no_codmat = 0
errors = []
with database.pool.acquire() as conn:
with conn.cursor() as cur:
for i, row in enumerate(reader, 1):
sku = row.get("sku", "").strip()
codmat = row.get("codmat", "").strip()
if not sku:
errors.append(f"Rând {i}: SKU lipsă")
continue
if not codmat:
skipped_no_codmat += 1
continue
try:
cantitate = float(row.get("cantitate_roa", "1") or "1")
procent = float(row.get("procent_pret", "100") or "100")
cur.execute("""
MERGE INTO ARTICOLE_TERTI t
USING (SELECT :sku AS sku, :codmat AS codmat FROM DUAL) s
ON (t.sku = s.sku AND t.codmat = s.codmat)
WHEN MATCHED THEN UPDATE SET
cantitate_roa = :cantitate_roa,
procent_pret = :procent_pret,
activ = 1,
sters = 0,
data_modif = SYSDATE
WHEN NOT MATCHED THEN INSERT
(sku, codmat, cantitate_roa, procent_pret, activ, sters, data_creare, id_util_creare)
VALUES (:sku, :codmat, :cantitate_roa, :procent_pret, 1, 0, SYSDATE, -3)
""", {"sku": sku, "codmat": codmat, "cantitate_roa": cantitate, "procent_pret": procent})
created += 1
except Exception as e:
errors.append(f"Rând {i}: {str(e)}")
conn.commit()
return {"processed": created, "skipped_no_codmat": skipped_no_codmat, "errors": errors}
def export_csv():
"""Export all mappings as CSV string."""
if database.pool is None:
raise HTTPException(status_code=503, detail="Oracle unavailable")
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["sku", "codmat", "cantitate_roa", "procent_pret", "activ"])
with database.pool.acquire() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT sku, codmat, cantitate_roa, procent_pret, activ
FROM ARTICOLE_TERTI WHERE sters = 0 ORDER BY sku, codmat
""")
for row in cur:
writer.writerow(row)
return output.getvalue()
def get_csv_template():
"""Return empty CSV template."""
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["sku", "codmat", "cantitate_roa", "procent_pret"])
writer.writerow(["EXAMPLE_SKU", "EXAMPLE_CODMAT", "1", "100"])
return output.getvalue()