Replace single-select gestiune dropdown with multi-select checkboxes. Settings stores comma-separated IDs, Python builds IN clause with bind variables, Oracle PL/SQL splits CSV via REGEXP_SUBSTR for stock lookup. Empty selection = all warehouses (unchanged behavior). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
442 lines
17 KiB
Python
442 lines
17 KiB
Python
import html
|
|
import json
|
|
import logging
|
|
import oracledb
|
|
from datetime import datetime, timedelta
|
|
from .. import database
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Diacritics to ASCII mapping (Romanian)
|
|
_DIACRITICS = str.maketrans({
|
|
'\u0103': 'a', # ă
|
|
'\u00e2': 'a', # â
|
|
'\u00ee': 'i', # î
|
|
'\u0219': 's', # ș
|
|
'\u021b': 't', # ț
|
|
'\u0102': 'A', # Ă
|
|
'\u00c2': 'A', # Â
|
|
'\u00ce': 'I', # Î
|
|
'\u0218': 'S', # Ș
|
|
'\u021a': 'T', # Ț
|
|
# Older Unicode variants
|
|
'\u015f': 's', # ş (cedilla)
|
|
'\u0163': 't', # ţ (cedilla)
|
|
'\u015e': 'S', # Ş
|
|
'\u0162': 'T', # Ţ
|
|
})
|
|
|
|
|
|
def clean_web_text(text: str) -> str:
    """Normalise web-sourced text (port of VFP CleanWebText).

    HTML entities are unescaped, characters listed in ``_DIACRITICS`` are
    folded to ASCII, the literal tags ``<br>``, ``<br/>`` and ``<br />`` are
    replaced by a single space, and surrounding whitespace is trimmed.
    Falsy input (``None``, ``""``) yields ``""``.
    """
    if not text:
        return ""
    cleaned = html.unescape(text).translate(_DIACRITICS)
    # Entities were decoded above, so escaped line breaks are literal tags now.
    for line_break in ('<br>', '<br/>', '<br />'):
        cleaned = cleaned.replace(line_break, ' ')
    return cleaned.strip()
|
|
|
|
|
|
def convert_web_date(date_str: str) -> datetime:
    """Parse a web date string into a datetime (port of VFP ConvertWebDate).

    The stripped value is first parsed as ``YYYY-MM-DD HH:MM:SS``; if that
    fails, its first ten characters are parsed as ``YYYY-MM-DD``. An empty
    value, or one that matches neither format, yields the current local time.
    """
    if not date_str:
        return datetime.now()

    trimmed = date_str.strip()
    attempts = (
        (trimmed, '%Y-%m-%d %H:%M:%S'),
        (trimmed[:10], '%Y-%m-%d'),
    )
    for candidate, fmt in attempts:
        try:
            return datetime.strptime(candidate, fmt)
        except ValueError:
            continue
    # Nothing matched: fall back to "now" rather than failing the import.
    return datetime.now()
|
|
|
|
|
|
def format_address_for_oracle(address: str, city: str, region: str) -> str:
    """Compose the address string sent to Oracle (port of VFP FormatAddressForOracle).

    Each part is passed through ``clean_web_text`` and the result is laid out
    as ``JUD:<region>;<city>;<address>``.
    """
    region_part, city_part, address_part = (
        clean_web_text(part) for part in (region, city, address)
    )
    return f"JUD:{region_part};{city_part};{address_part}"
|
|
|
|
|
|
def compute_discount_split(order, settings: dict) -> dict | None:
|
|
"""Compute proportional discount split by VAT rate from order items.
|
|
|
|
Returns: {"11": 3.98, "21": 1.43} or None if split not applicable.
|
|
Only splits when split_discount_vat is enabled AND multiple VAT rates exist.
|
|
When single VAT rate: returns {actual_rate: total} (smarter than GoMag's fixed 21%).
|
|
"""
|
|
if not order or order.discount_total <= 0:
|
|
return None
|
|
|
|
split_enabled = settings.get("split_discount_vat") == "1"
|
|
|
|
# Calculate VAT distribution from order items (exclude zero-value)
|
|
vat_totals = {}
|
|
for item in order.items:
|
|
item_value = abs(item.price * item.quantity)
|
|
if item_value > 0:
|
|
vat_key = str(int(item.vat)) if item.vat == int(item.vat) else str(item.vat)
|
|
vat_totals[vat_key] = vat_totals.get(vat_key, 0) + item_value
|
|
|
|
if not vat_totals:
|
|
return None
|
|
|
|
grand_total = sum(vat_totals.values())
|
|
if grand_total <= 0:
|
|
return None
|
|
|
|
if len(vat_totals) == 1:
|
|
# Single VAT rate — use that rate (smarter than GoMag's fixed 21%)
|
|
actual_vat = list(vat_totals.keys())[0]
|
|
return {actual_vat: round(order.discount_total, 2)}
|
|
|
|
if not split_enabled:
|
|
return None
|
|
|
|
# Multiple VAT rates — split proportionally
|
|
result = {}
|
|
discount_remaining = order.discount_total
|
|
sorted_rates = sorted(vat_totals.keys(), key=lambda x: float(x))
|
|
|
|
for i, vat_rate in enumerate(sorted_rates):
|
|
if i == len(sorted_rates) - 1:
|
|
split_amount = round(discount_remaining, 2) # last gets remainder
|
|
else:
|
|
proportion = vat_totals[vat_rate] / grand_total
|
|
split_amount = round(order.discount_total * proportion, 2)
|
|
discount_remaining -= split_amount
|
|
|
|
if split_amount > 0:
|
|
result[vat_rate] = split_amount
|
|
|
|
return result if result else None
|
|
|
|
|
|
def _extra_article(sku, quantity: str, price: str, vat, name: str, id_pol=None) -> dict:
    """Build one synthetic order line (transport or discount).

    ``id_pol`` is attached only when it is truthy, so lines without a
    configured price policy carry no ``id_pol`` key at all.
    """
    line = {
        "sku": sku,
        "quantity": quantity,
        "price": price,
        "vat": vat,
        "name": name,
    }
    if id_pol:
        line["id_pol"] = id_pol
    return line


def build_articles_json(items, order=None, settings=None) -> str:
    """Build the JSON string for Oracle PACK_IMPORT_COMENZI.importa_comanda.

    Every item becomes an article with string-valued sku/quantity/price/vat
    and a cleaned name. An article gets its own ``id_pol`` when
    ``settings["_codmat_policy_map"]`` has a truthy entry for its sku that
    differs from the default ``settings["id_pol"]``.

    When both ``order`` and ``settings`` are given, extra lines are appended:

    * transport (quantity "1") if ``order.delivery_cost > 0`` and
      ``transport_codmat`` is configured;
    * discount (quantity "-1") if ``order.discount_total > 0`` and
      ``discount_codmat`` is configured. With a multi-rate result from
      ``compute_discount_split`` one line per VAT rate is emitted; with a
      single-rate result one line at that rate; otherwise one line using
      ``order.discount_vat`` or the ``discount_vat`` setting (default "21").

    Args:
        items: iterable of order items exposing sku, quantity, price, vat, name.
        order: optional order exposing delivery_cost and discount_total.
        settings: optional application settings dict.

    Returns:
        JSON array (as ``str``) of article dicts.
    """
    articles = []
    codmat_policy_map = settings.get("_codmat_policy_map", {}) if settings else {}
    default_id_pol = settings.get("id_pol", "") if settings else ""

    for item in items:
        article = {
            "sku": item.sku,
            "quantity": str(item.quantity),
            "price": str(item.price),
            "vat": str(item.vat),
            "name": clean_web_text(item.name),
        }
        # Per-article id_pol from dual-policy validation; omitted when it
        # matches the default policy.
        item_pol = codmat_policy_map.get(item.sku)
        if item_pol and str(item_pol) != str(default_id_pol):
            article["id_pol"] = str(item_pol)
        articles.append(article)

    if not (order and settings):
        return json.dumps(articles)

    transport_codmat = settings.get("transport_codmat", "")
    transport_vat = settings.get("transport_vat", "21")
    discount_codmat = settings.get("discount_codmat", "")

    # Transport as an article with quantity +1.
    if order.delivery_cost > 0 and transport_codmat:
        articles.append(_extra_article(
            transport_codmat, "1", str(order.delivery_cost), transport_vat,
            "Transport", settings.get("transport_id_pol"),
        ))

    # Discount as negative-quantity article(s), with smart VAT splitting.
    if order.discount_total > 0 and discount_codmat:
        discount_split = compute_discount_split(order, settings)
        discount_id_pol = settings.get("discount_id_pol")

        if discount_split and len(discount_split) > 1:
            # Several VAT rates: one discount line per rate, ascending.
            for vat_rate, split_amount in sorted(
                discount_split.items(), key=lambda pair: float(pair[0])
            ):
                articles.append(_extra_article(
                    discount_codmat, "-1", str(split_amount), vat_rate,
                    f"Discount (TVA {vat_rate}%)", discount_id_pol,
                ))
        else:
            if discount_split:
                # Exactly one rate detected from the items.
                discount_vat = next(iter(discount_split))
            else:
                # Fallback: VAT supplied with the order, else settings default.
                discount_vat = (
                    getattr(order, 'discount_vat', None)
                    or settings.get("discount_vat", "21")
                )
            articles.append(_extra_article(
                discount_codmat, "-1", str(order.discount_total), discount_vat,
                "Discount", discount_id_pol,
            ))

    return json.dumps(articles)
|
|
|
|
|
|
def import_single_order(order, id_pol: int = None, id_sectie: int = None, app_settings: dict = None, id_gestiuni: list[int] = None) -> dict:
    """Import a single web order into Oracle ROA.

    Steps, all on one pooled connection:
      1. find or create the partner (company data for companies, otherwise
         the shipping person's name, falling back to the billing person);
      2. find or create the shipping address;
      3. find or create the billing address, or reuse the shipping address
         when billing and shipping names differ;
      4. call PACK_IMPORT_COMENZI.importa_comanda with the articles JSON.

    The transaction is committed only when importa_comanda returns a positive
    id. Exceptions are not propagated: they trigger a best-effort rollback and
    are reported through ``error``.

    Args:
        order: parsed web order (number, date, billing, shipping, items, ...).
        id_pol: forwarded as p_id_pol.
        id_sectie: forwarded as p_id_sectie.
        app_settings: settings dict forwarded to build_articles_json.
        id_gestiuni: ids joined into a CSV string for p_id_gestiune; None or
            an empty list is sent as NULL.

    Returns dict with:
        success: bool
        id_comanda: int or None
        id_partener: int or None
        id_adresa_facturare: int or None
        id_adresa_livrare: int or None
        error: str or None
    """
    result = {
        "success": False, "id_comanda": None, "id_partener": None,
        "id_adresa_facturare": None, "id_adresa_livrare": None, "error": None,
    }

    conn = None
    try:
        order_number = clean_web_text(order.number)
        order_date = convert_web_date(order.date)
        logger.info(
            f"Order {order.number}: raw date={order.date!r} → "
            f"parsed={order_date.strftime('%Y-%m-%d %H:%M:%S')}"
        )

        if database.pool is None:
            raise RuntimeError("Oracle pool not initialized")
        conn = database.pool.acquire()

        with conn.cursor() as cur:
            # --- Step 1: partner -------------------------------------------
            partner_out = cur.var(oracledb.DB_TYPE_NUMBER)
            billing = order.billing

            if billing.is_company:
                denumire = clean_web_text(billing.company_name).upper()
                cod_fiscal = clean_web_text(billing.company_code) or None
                registru = clean_web_text(billing.company_reg) or None
                is_pj = 1
            else:
                # Individuals are named after the person on the shipping
                # label when one is present, otherwise the billing person.
                recipient = order.shipping
                if recipient and (recipient.lastname or recipient.firstname):
                    person = recipient
                else:
                    person = billing
                denumire = clean_web_text(
                    f"{person.lastname} {person.firstname}"
                ).upper()
                cod_fiscal = registru = None
                is_pj = 0

            cur.callproc(
                "PACK_IMPORT_PARTENERI.cauta_sau_creeaza_partener",
                [cod_fiscal, denumire, registru, is_pj, partner_out],
            )
            partner_id = partner_out.getvalue()
            if not partner_id or partner_id <= 0:
                result["error"] = f"Partner creation failed for {denumire}"
                return result
            result["id_partener"] = int(partner_id)

            # --- Are billing and shipping different persons? ---------------
            shipping = order.shipping
            billing_name = clean_web_text(
                f"{billing.lastname} {billing.firstname}"
            ).strip().upper()
            if shipping:
                shipping_name = clean_web_text(
                    f"{shipping.lastname} {shipping.firstname}"
                ).strip().upper()
            else:
                shipping_name = ""
            different_person = bool(
                shipping_name and billing_name and shipping_name != billing_name
            )

            # --- Step 2: shipping address (primary) ------------------------
            # Contact data comes from the shipping person, falling back to
            # the billing contact when a value is missing.
            shipping_phone = (shipping.phone if shipping else "") or billing.phone or ""
            shipping_email = (shipping.email if shipping else "") or billing.email or ""

            addr_livr_id = None
            if shipping:
                livr_out = cur.var(oracledb.DB_TYPE_NUMBER)
                shipping_addr = format_address_for_oracle(
                    shipping.address, shipping.city, shipping.region
                )
                cur.callproc(
                    "PACK_IMPORT_PARTENERI.cauta_sau_creeaza_adresa",
                    [partner_id, shipping_addr, shipping_phone, shipping_email, livr_out],
                )
                addr_livr_id = livr_out.getvalue()

            # --- Step 3: billing address -----------------------------------
            if different_person:
                # Different person: the shipping address serves as both
                # billing and shipping address in ROA.
                addr_fact_id = addr_livr_id
            else:
                fact_out = cur.var(oracledb.DB_TYPE_NUMBER)
                billing_addr = format_address_for_oracle(
                    billing.address, billing.city, billing.region
                )
                cur.callproc(
                    "PACK_IMPORT_PARTENERI.cauta_sau_creeaza_adresa",
                    [partner_id, billing_addr, billing.phone or "", billing.email or "", fact_out],
                )
                addr_fact_id = fact_out.getvalue()

            for key, addr_id in (
                ("id_adresa_facturare", addr_fact_id),
                ("id_adresa_livrare", addr_livr_id),
            ):
                if addr_id is not None:
                    result[key] = int(addr_id)

            # --- Step 4: articles JSON + order import ----------------------
            articles_json = build_articles_json(order.items, order, app_settings)

            # The JSON is bound as a CLOB.
            articles_clob = cur.var(oracledb.DB_TYPE_CLOB)
            articles_clob.setvalue(0, articles_json)

            order_out = cur.var(oracledb.DB_TYPE_NUMBER)

            # list[int] -> CSV string for the Oracle VARCHAR2 parameter.
            id_gestiune_csv = ",".join(map(str, id_gestiuni)) if id_gestiuni else None

            cur.callproc("PACK_IMPORT_COMENZI.importa_comanda", [
                order_number,      # p_nr_comanda_ext
                order_date,        # p_data_comanda
                partner_id,        # p_id_partener
                articles_clob,     # p_json_articole (CLOB)
                addr_livr_id,      # p_id_adresa_livrare
                addr_fact_id,      # p_id_adresa_facturare
                id_pol,            # p_id_pol
                id_sectie,         # p_id_sectie
                id_gestiune_csv,   # p_id_gestiune (CSV string)
                order_out,         # v_id_comanda (OUT)
            ])

            comanda_id = order_out.getvalue()
            if not (comanda_id and comanda_id > 0):
                conn.rollback()
                result["error"] = "importa_comanda returned invalid ID"
            else:
                conn.commit()
                result["success"] = True
                result["id_comanda"] = int(comanda_id)
                logger.info(f"Order {order_number} imported: ID={comanda_id}")

    except Exception as e:
        # Oracle errors and everything else share the same recovery; only
        # the log prefix differs.
        if isinstance(e, oracledb.DatabaseError):
            error_msg = str(e)
            result["error"] = error_msg
            logger.error(f"Oracle error importing order {order.number}: {error_msg}")
        else:
            result["error"] = str(e)
            logger.error(f"Error importing order {order.number}: {e}")
        if conn:
            try:
                conn.rollback()
            except Exception:
                # Best effort: the connection may already be unusable.
                pass
    finally:
        if conn:
            try:
                database.pool.release(conn)
            except Exception:
                # Best effort: never let cleanup mask the result.
                pass

    return result
|
|
|
|
|
|
def soft_delete_order_in_roa(id_comanda: int) -> dict:
    """Soft-delete an order in Oracle ROA.

    Sets ``sters = 1`` on the not-yet-deleted rows of ``comenzi_detalii`` and
    then ``comenzi`` for the given order id, and commits. On any exception the
    transaction is rolled back (best effort) and the message is returned in
    ``error``; nothing is raised.

    Returns {"success": bool, "error": str|None, "details_deleted": int},
    where ``details_deleted`` is the row count of the detail update.
    """
    result = {"success": False, "error": None, "details_deleted": 0}

    if database.pool is None:
        result["error"] = "Oracle pool not initialized"
        return result

    details_sql = "UPDATE comenzi_detalii SET sters = 1 WHERE id_comanda = :1 AND sters = 0"
    order_sql = "UPDATE comenzi SET sters = 1 WHERE id_comanda = :1 AND sters = 0"

    conn = None
    try:
        conn = database.pool.acquire()
        with conn.cursor() as cursor:
            # Detail rows first; their count is what gets reported.
            cursor.execute(details_sql, [id_comanda])
            result["details_deleted"] = cursor.rowcount

            # Then the order header itself.
            cursor.execute(order_sql, [id_comanda])

            conn.commit()
            result["success"] = True
            logger.info(f"Soft-deleted order ID={id_comanda} in Oracle ROA ({result['details_deleted']} details)")
    except Exception as e:
        result["error"] = str(e)
        logger.error(f"Error soft-deleting order ID={id_comanda}: {e}")
        if conn:
            try:
                conn.rollback()
            except Exception:
                # Best effort: the connection may already be unusable.
                pass
    finally:
        if conn:
            try:
                database.pool.release(conn)
            except Exception:
                # Best effort: never let cleanup mask the result.
                pass

    return result
|