Shared Components: - Add CompanySelector.vue and PeriodSelector.vue components - Add AppHeader.vue and SlideMenu.vue layout components - Add shared stores factories (companies.js, accountingPeriod.js) - Add shared routes factories (companies.py, calendar.py) - Add shared models (company.py, calendar.py) - Add shared layout styles (header.css, navigation.css) Data Entry App: - Update CLAUDE.md with prod/test server documentation - Improve nomenclature sync service with better error handling - Update receipts router and CRUD operations - Add company/period stores using shared factories - Update App.vue layout with shared components - Fix OCRUploadZone file handling Reports App: - Refactor stores to use shared factories - Update App.vue to use shared layout components Infrastructure: - Replace start-data-entry.sh with separate dev/test scripts - Add .claude/rules for authentication, backend patterns, etc. - Add implementation plan for OCR receipt improvements - Clean up old documentation files 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
409 lines
15 KiB
Python
409 lines
15 KiB
Python
"""Service for syncing nomenclatures from Oracle to SQLite."""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional, List, Tuple
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
from sqlmodel import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
# Add shared modules path
|
|
project_root = Path(__file__).parent.parent.parent.parent.parent
|
|
sys.path.insert(0, str(project_root / "shared"))
|
|
|
|
from database.oracle_pool import oracle_pool
|
|
from app.db.models.nomenclature import SyncedSupplier, LocalSupplier, SyncedCashRegister
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Cache for schema lookups (populated dynamically from Oracle)
|
|
_schema_cache: dict[int, str] = {}
|
|
|
|
|
|
class SyncService:
|
|
"""Service for syncing nomenclatures from Oracle."""
|
|
|
|
@staticmethod
|
|
async def get_schema_for_company(company_id: int) -> Optional[str]:
|
|
"""
|
|
Get Oracle schema for company ID from V_NOM_FIRME view.
|
|
Results are cached in memory for performance.
|
|
"""
|
|
# Check cache first
|
|
if company_id in _schema_cache:
|
|
return _schema_cache[company_id]
|
|
|
|
try:
|
|
async with oracle_pool.get_connection() as connection:
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("""
|
|
SELECT SCHEMA
|
|
FROM CONTAFIN_ORACLE.V_NOM_FIRME
|
|
WHERE ID_FIRMA = :company_id
|
|
""", {'company_id': company_id})
|
|
result = cursor.fetchone()
|
|
|
|
if result:
|
|
schema = result[0]
|
|
_schema_cache[company_id] = schema
|
|
logger.info(f"Resolved schema for company {company_id}: {schema}")
|
|
return schema
|
|
else:
|
|
logger.warning(f"No schema found for company {company_id}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching schema for company {company_id}: {e}")
|
|
return None
|
|
|
|
@staticmethod
|
|
async def sync_suppliers(session: AsyncSession, company_id: int) -> Tuple[int, int]:
|
|
"""
|
|
Sync suppliers (furnizori, id_tip_part=17) from Oracle to SQLite.
|
|
Uses CORESP_TIP_PART joined with VNOM_PARTENERI view.
|
|
Returns (synced_count, error_count).
|
|
"""
|
|
schema = await SyncService.get_schema_for_company(company_id)
|
|
if not schema:
|
|
logger.warning(f"No schema mapping for company {company_id}")
|
|
return 0, 0
|
|
|
|
synced = 0
|
|
errors = 0
|
|
|
|
try:
|
|
async with oracle_pool.get_connection() as connection:
|
|
with connection.cursor() as cursor:
|
|
# Fetch active suppliers from Oracle
|
|
# id_tip_part = 17 means "furnizori" (suppliers)
|
|
# Using CORESP_TIP_PART to filter by partner type
|
|
cursor.execute(f"""
|
|
SELECT B.ID_PART, B.DENUMIRE, B.COD_FISCAL, B.ADRESA
|
|
FROM {schema}.CORESP_TIP_PART A
|
|
INNER JOIN {schema}.VNOM_PARTENERI B ON A.ID_PART = B.ID_PART
|
|
WHERE A.ID_TIP_PART = 17
|
|
AND (B.INACTIV = 0 OR B.INACTIV IS NULL)
|
|
AND B.ID_PART IS NOT NULL
|
|
ORDER BY B.DENUMIRE
|
|
""")
|
|
rows = cursor.fetchall()
|
|
|
|
for row in rows:
|
|
try:
|
|
oracle_id, name, fiscal_code, address = row
|
|
|
|
# Check if already exists
|
|
stmt = select(SyncedSupplier).where(
|
|
SyncedSupplier.oracle_id == oracle_id,
|
|
SyncedSupplier.company_id == company_id
|
|
)
|
|
result = await session.execute(stmt)
|
|
existing = result.scalar_one_or_none()
|
|
|
|
if existing:
|
|
# Update existing record
|
|
existing.name = name or ""
|
|
existing.fiscal_code = fiscal_code
|
|
existing.address = address
|
|
existing.synced_at = datetime.utcnow()
|
|
logger.debug(f"Updated supplier {oracle_id}: {name}")
|
|
else:
|
|
# Create new record
|
|
supplier = SyncedSupplier(
|
|
oracle_id=oracle_id,
|
|
company_id=company_id,
|
|
name=name or "",
|
|
fiscal_code=fiscal_code,
|
|
address=address,
|
|
)
|
|
session.add(supplier)
|
|
logger.debug(f"Created supplier {oracle_id}: {name}")
|
|
|
|
synced += 1
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing supplier row {row}: {e}")
|
|
errors += 1
|
|
|
|
# Commit all changes
|
|
await session.commit()
|
|
logger.info(f"Synced {synced} suppliers for company {company_id}, {errors} errors")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error syncing suppliers for company {company_id}: {e}")
|
|
errors += 1
|
|
await session.rollback()
|
|
|
|
return synced, errors
|
|
|
|
@staticmethod
|
|
async def sync_cash_registers(session: AsyncSession, company_id: int) -> Tuple[int, int]:
|
|
"""
|
|
Sync cash registers and bank accounts from Oracle to SQLite.
|
|
Returns (synced_count, error_count).
|
|
|
|
Uses CORESP_TIP_PART with:
|
|
- id_tip_part = 22: CASA LEI
|
|
- id_tip_part = 23: CASA VALUTA
|
|
- id_tip_part = 24: BANCA LEI
|
|
- id_tip_part = 25: BANCA VALUTA
|
|
"""
|
|
schema = await SyncService.get_schema_for_company(company_id)
|
|
if not schema:
|
|
logger.warning(f"No schema mapping for company {company_id}")
|
|
return 0, 0
|
|
|
|
synced = 0
|
|
errors = 0
|
|
|
|
# Partner types mapping
|
|
# 22=CASA LEI, 23=CASA VALUTA -> cash
|
|
# 24=BANCA LEI, 25=BANCA VALUTA -> bank
|
|
partner_types = [22, 23, 24, 25]
|
|
|
|
try:
|
|
async with oracle_pool.get_connection() as connection:
|
|
with connection.cursor() as cursor:
|
|
# Fetch cash/bank partners from CORESP_TIP_PART
|
|
cursor.execute(f"""
|
|
SELECT B.ID_PART, B.DENUMIRE, A.ID_TIP_PART
|
|
FROM {schema}.CORESP_TIP_PART A
|
|
INNER JOIN {schema}.VNOM_PARTENERI B ON A.ID_PART = B.ID_PART
|
|
WHERE A.ID_TIP_PART IN (22, 23, 24, 25)
|
|
AND (B.INACTIV = 0 OR B.INACTIV IS NULL)
|
|
AND B.ID_PART IS NOT NULL
|
|
ORDER BY A.ID_TIP_PART, B.DENUMIRE
|
|
""")
|
|
rows = cursor.fetchall()
|
|
|
|
# Type mapping: 22=CASA LEI, 23=CASA VALUTA -> cash; 24=BANCA LEI, 25=BANCA VALUTA -> bank
|
|
type_mapping = {
|
|
22: ("cash", "CASA_LEI"),
|
|
23: ("cash", "CASA_VALUTA"),
|
|
24: ("bank", "BANCA_LEI"),
|
|
25: ("bank", "BANCA_VALUTA"),
|
|
}
|
|
|
|
for row in rows:
|
|
try:
|
|
oracle_id, name, tip_part_id = row
|
|
|
|
# Determine type based on partner type
|
|
register_type, account_code = type_mapping.get(tip_part_id, ("cash", "UNKNOWN"))
|
|
|
|
# Check if already exists
|
|
stmt = select(SyncedCashRegister).where(
|
|
SyncedCashRegister.oracle_id == oracle_id,
|
|
SyncedCashRegister.company_id == company_id
|
|
)
|
|
result = await session.execute(stmt)
|
|
existing = result.scalar_one_or_none()
|
|
|
|
if existing:
|
|
# Update existing record
|
|
existing.name = name or ""
|
|
existing.account_code = account_code
|
|
existing.register_type = register_type
|
|
existing.synced_at = datetime.utcnow()
|
|
logger.debug(f"Updated cash register {oracle_id}: {name}")
|
|
else:
|
|
# Create new record
|
|
cash_register = SyncedCashRegister(
|
|
oracle_id=oracle_id,
|
|
company_id=company_id,
|
|
name=name or "",
|
|
account_code=account_code,
|
|
register_type=register_type,
|
|
)
|
|
session.add(cash_register)
|
|
logger.debug(f"Created cash register {oracle_id}: {name}")
|
|
|
|
synced += 1
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing cash register row {row}: {e}")
|
|
errors += 1
|
|
|
|
# Commit all changes
|
|
await session.commit()
|
|
logger.info(f"Synced {synced} cash registers for company {company_id}, {errors} errors")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error syncing cash registers for company {company_id}: {e}")
|
|
errors += 1
|
|
await session.rollback()
|
|
|
|
return synced, errors
|
|
|
|
@staticmethod
|
|
async def search_supplier(
|
|
session: AsyncSession,
|
|
company_id: int,
|
|
fiscal_code: Optional[str] = None,
|
|
name: Optional[str] = None
|
|
) -> Tuple[bool, Optional[dict], str]:
|
|
"""
|
|
Search for supplier in SQLite first, then Oracle if not found.
|
|
Returns (found, supplier_data, source).
|
|
Source can be: 'synced', 'local', 'not_found'
|
|
"""
|
|
# 1. Search in synced suppliers
|
|
if fiscal_code:
|
|
stmt = select(SyncedSupplier).where(
|
|
SyncedSupplier.company_id == company_id,
|
|
SyncedSupplier.fiscal_code == fiscal_code
|
|
)
|
|
elif name:
|
|
stmt = select(SyncedSupplier).where(
|
|
SyncedSupplier.company_id == company_id,
|
|
SyncedSupplier.name.ilike(f"%{name}%")
|
|
)
|
|
else:
|
|
return False, None, "no_query"
|
|
|
|
result = await session.execute(stmt)
|
|
supplier = result.scalar_one_or_none()
|
|
|
|
if supplier:
|
|
return True, {
|
|
"id": supplier.id,
|
|
"oracle_id": supplier.oracle_id,
|
|
"name": supplier.name,
|
|
"fiscal_code": supplier.fiscal_code,
|
|
"address": supplier.address,
|
|
}, "synced"
|
|
|
|
# 2. Search in local suppliers
|
|
if fiscal_code:
|
|
stmt = select(LocalSupplier).where(
|
|
LocalSupplier.company_id == company_id,
|
|
LocalSupplier.fiscal_code == fiscal_code
|
|
)
|
|
elif name:
|
|
stmt = select(LocalSupplier).where(
|
|
LocalSupplier.company_id == company_id,
|
|
LocalSupplier.name.ilike(f"%{name}%")
|
|
)
|
|
|
|
result = await session.execute(stmt)
|
|
local = result.scalar_one_or_none()
|
|
|
|
if local:
|
|
return True, {
|
|
"id": local.id,
|
|
"name": local.name,
|
|
"fiscal_code": local.fiscal_code,
|
|
"address": local.address,
|
|
"is_local": True,
|
|
}, "local"
|
|
|
|
# 3. Try live Oracle search (optional fallback for unsynced data)
|
|
# This is a fallback - ideally sync should be up to date
|
|
# TODO: Implement live Oracle search if needed
|
|
|
|
return False, None, "not_found"
|
|
|
|
@staticmethod
|
|
async def create_local_supplier(
|
|
session: AsyncSession,
|
|
company_id: int,
|
|
name: str,
|
|
fiscal_code: Optional[str],
|
|
address: Optional[str],
|
|
created_by: str
|
|
) -> LocalSupplier:
|
|
"""Create a local supplier entry from OCR data."""
|
|
supplier = LocalSupplier(
|
|
company_id=company_id,
|
|
name=name,
|
|
fiscal_code=fiscal_code,
|
|
address=address,
|
|
created_by=created_by,
|
|
)
|
|
session.add(supplier)
|
|
await session.commit()
|
|
await session.refresh(supplier)
|
|
logger.info(f"Created local supplier: {name} (CUI: {fiscal_code})")
|
|
return supplier
|
|
|
|
@staticmethod
|
|
async def get_all_suppliers(
|
|
session: AsyncSession,
|
|
company_id: int,
|
|
search: Optional[str] = None
|
|
) -> List[dict]:
|
|
"""
|
|
Get all suppliers (synced + local) for a company.
|
|
Used for dropdown/autocomplete in UI.
|
|
"""
|
|
suppliers = []
|
|
|
|
# Get synced suppliers
|
|
stmt = select(SyncedSupplier).where(SyncedSupplier.company_id == company_id)
|
|
if search:
|
|
stmt = stmt.where(
|
|
(SyncedSupplier.name.ilike(f"%{search}%")) |
|
|
(SyncedSupplier.fiscal_code.ilike(f"%{search}%"))
|
|
)
|
|
stmt = stmt.limit(50) # Limit results for performance
|
|
|
|
result = await session.execute(stmt)
|
|
synced = result.scalars().all()
|
|
|
|
for s in synced:
|
|
suppliers.append({
|
|
"id": s.id,
|
|
"oracle_id": s.oracle_id,
|
|
"name": s.name,
|
|
"fiscal_code": s.fiscal_code,
|
|
"source": "synced"
|
|
})
|
|
|
|
# Get local suppliers
|
|
stmt = select(LocalSupplier).where(LocalSupplier.company_id == company_id)
|
|
if search:
|
|
stmt = stmt.where(
|
|
(LocalSupplier.name.ilike(f"%{search}%")) |
|
|
(LocalSupplier.fiscal_code.ilike(f"%{search}%"))
|
|
)
|
|
stmt = stmt.limit(50)
|
|
|
|
result = await session.execute(stmt)
|
|
local = result.scalars().all()
|
|
|
|
for l in local:
|
|
suppliers.append({
|
|
"id": l.id,
|
|
"name": l.name,
|
|
"fiscal_code": l.fiscal_code,
|
|
"source": "local"
|
|
})
|
|
|
|
return suppliers
|
|
|
|
@staticmethod
|
|
async def get_all_cash_registers(
|
|
session: AsyncSession,
|
|
company_id: int
|
|
) -> List[dict]:
|
|
"""
|
|
Get all cash registers for a company.
|
|
Used for dropdown in UI.
|
|
"""
|
|
stmt = select(SyncedCashRegister).where(SyncedCashRegister.company_id == company_id)
|
|
result = await session.execute(stmt)
|
|
registers = result.scalars().all()
|
|
|
|
return [
|
|
{
|
|
"id": r.id,
|
|
"oracle_id": r.oracle_id,
|
|
"name": r.name,
|
|
"account_code": r.account_code,
|
|
"register_type": r.register_type
|
|
}
|
|
for r in registers
|
|
]
|