Files
roa2web-service-auto/data-entry-app/backend/app/services/sync_service.py
Marius Mutu c5fde510a8 feat: Add JWT auth and nomenclature sync to data-entry-app
Integrate shared JWT authentication into data-entry-app:
- Add Oracle pool initialization for auth service
- Add AuthenticationMiddleware to protect API routes
- Update all receipt endpoints to use CurrentUser from JWT
- Add shared auth router (/api/auth/login, /api/auth/refresh)

Add nomenclature synchronization feature:
- Create SQLite models for synced suppliers, local suppliers, and cash registers
- Add nomenclature router with sync triggers and CRUD endpoints
- Add sync service for Oracle → SQLite nomenclature data
- Update nomenclature_service to use synced SQLite data with fallbacks

Create shared frontend components:
- Add shared/frontend/ with LoginView.vue, auth store factory, login.css
- Integrate shared login and auth into data-entry-app frontend
- Add axios-based API service with token refresh interceptor

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-14 18:36:24 +02:00

357 lines
13 KiB
Python

"""Service for syncing nomenclatures from Oracle to SQLite."""
import sys
from pathlib import Path
from typing import Optional, List, Tuple
from datetime import datetime
import logging
from sqlmodel import select
from sqlalchemy.ext.asyncio import AsyncSession
# Make the repository-level "shared" package directory importable.
# The repository root is five directory levels above this file.
project_root = Path(__file__)
for _hop in range(5):
    project_root = project_root.parent
sys.path.insert(0, str(project_root / "shared"))
from database.oracle_pool import oracle_pool
from app.db.models.nomenclature import SyncedSupplier, LocalSupplier, SyncedCashRegister
logger = logging.getLogger(__name__)

# Lookup table: company ID -> name of the Oracle schema holding that
# company's nomenclature tables.
# TODO: This should come from a config table or environment variable
COMPANY_SCHEMAS = {
    1: "CONTAFIN",  # Example mapping - update with real schema names
    2: "CONTAFIN2",
}
class SyncService:
    """Sync nomenclatures from Oracle into SQLite and query the local copies.

    Every method is a ``@staticmethod``; the class is used purely as a
    namespace. Methods that touch SQLite take the ``AsyncSession`` to use
    as their first argument.
    """

    @staticmethod
    def get_schema_for_company(company_id: int) -> Optional[str]:
        """Return the Oracle schema mapped to ``company_id``.

        Returns ``None`` when the company has no entry in ``COMPANY_SCHEMAS``.
        """
        return COMPANY_SCHEMAS.get(company_id)

    @staticmethod
    async def _upsert_supplier(session: AsyncSession, company_id: int, row: Tuple) -> None:
        """Insert or update one ``SyncedSupplier`` from a NOM_PARTENERI row."""
        oracle_id, name, fiscal_code, address = row

        # A synced supplier is identified by (oracle_id, company_id).
        stmt = select(SyncedSupplier).where(
            SyncedSupplier.oracle_id == oracle_id,
            SyncedSupplier.company_id == company_id,
        )
        result = await session.execute(stmt)
        existing = result.scalar_one_or_none()

        if existing:
            existing.name = name or ""
            existing.fiscal_code = fiscal_code
            existing.address = address
            existing.synced_at = datetime.utcnow()
            logger.debug(f"Updated supplier {oracle_id}: {name}")
            return

        session.add(
            SyncedSupplier(
                oracle_id=oracle_id,
                company_id=company_id,
                name=name or "",
                fiscal_code=fiscal_code,
                address=address,
            )
        )
        logger.debug(f"Created supplier {oracle_id}: {name}")

    @staticmethod
    async def sync_suppliers(session: AsyncSession, company_id: int) -> Tuple[int, int]:
        """Sync suppliers from Oracle NOM_PARTENERI to SQLite.

        Reads the rows with ``ACTIV = 1`` from ``<schema>.NOM_PARTENERI`` and
        upserts each one as a ``SyncedSupplier``. A row that fails is logged
        and counted as an error without aborting the remaining rows.

        Returns:
            ``(synced_count, error_count)``. ``(0, 0)`` when the company has
            no schema mapping. When the sync as a whole fails (Oracle access
            or the final commit), the session is rolled back, one extra error
            is counted and ``synced_count`` is reported as 0 because nothing
            was persisted.
        """
        schema = SyncService.get_schema_for_company(company_id)
        if not schema:
            logger.warning(f"No schema mapping for company {company_id}")
            return 0, 0

        synced = 0
        errors = 0
        try:
            async with oracle_pool.get_connection() as connection:
                with connection.cursor() as cursor:
                    # `schema` is taken from the COMPANY_SCHEMAS constant, not
                    # from caller-supplied text, so interpolating it is safe.
                    cursor.execute(f"""
                        SELECT ID_PART, DEN_PART, COD_FISCAL, ADRESA
                        FROM {schema}.NOM_PARTENERI
                        WHERE ACTIV = 1
                    """)
                    rows = cursor.fetchall()

                    for row in rows:
                        try:
                            await SyncService._upsert_supplier(session, company_id, row)
                            synced += 1
                        except Exception as e:
                            logger.error(f"Error processing supplier row {row}: {e}")
                            errors += 1

            # Commit all changes
            await session.commit()
            logger.info(f"Synced {synced} suppliers for company {company_id}, {errors} errors")
        except Exception as e:
            logger.error(f"Error syncing suppliers for company {company_id}: {e}")
            errors += 1
            await session.rollback()
            # Everything staged so far was just rolled back; do not report
            # those rows as synced.
            synced = 0

        return synced, errors

    @staticmethod
    async def _upsert_cash_register(session: AsyncSession, company_id: int, row: Tuple) -> None:
        """Insert or update one ``SyncedCashRegister`` from a NOM_CASE row."""
        oracle_id, name, account_code = row

        # Normalise a NULL account before inspecting it; calling
        # `.startswith` on None would fail the whole row.
        account_code = account_code or ""
        # Accounts starting with "531" are treated as cash, the rest as bank.
        register_type = "cash" if account_code.startswith("531") else "bank"

        # A synced register is identified by (oracle_id, company_id).
        stmt = select(SyncedCashRegister).where(
            SyncedCashRegister.oracle_id == oracle_id,
            SyncedCashRegister.company_id == company_id,
        )
        result = await session.execute(stmt)
        existing = result.scalar_one_or_none()

        if existing:
            existing.name = name or ""
            existing.account_code = account_code
            existing.register_type = register_type
            existing.synced_at = datetime.utcnow()
            logger.debug(f"Updated cash register {oracle_id}: {name}")
            return

        session.add(
            SyncedCashRegister(
                oracle_id=oracle_id,
                company_id=company_id,
                name=name or "",
                account_code=account_code,
                register_type=register_type,
            )
        )
        logger.debug(f"Created cash register {oracle_id}: {name}")

    @staticmethod
    async def sync_cash_registers(session: AsyncSession, company_id: int) -> Tuple[int, int]:
        """Sync cash registers from Oracle to SQLite.

        Reads the rows with ``ACTIV = 1`` from ``<schema>.NOM_CASE`` and
        upserts each one as a ``SyncedCashRegister``. A row that fails is
        logged and counted as an error without aborting the remaining rows.

        Returns:
            ``(synced_count, error_count)``. ``(0, 0)`` when the company has
            no schema mapping. When the sync as a whole fails (Oracle access
            or the final commit), the session is rolled back, one extra error
            is counted and ``synced_count`` is reported as 0 because nothing
            was persisted.
        """
        schema = SyncService.get_schema_for_company(company_id)
        if not schema:
            logger.warning(f"No schema mapping for company {company_id}")
            return 0, 0

        synced = 0
        errors = 0
        try:
            async with oracle_pool.get_connection() as connection:
                with connection.cursor() as cursor:
                    # Fetch cash registers (both cash and bank)
                    # Assuming similar structure to NOM_PARTENERI
                    # TODO: Verify actual table name and structure in Oracle
                    cursor.execute(f"""
                        SELECT ID_CASA, DEN_CASA, CONT
                        FROM {schema}.NOM_CASE
                        WHERE ACTIV = 1
                    """)
                    rows = cursor.fetchall()

                    for row in rows:
                        try:
                            await SyncService._upsert_cash_register(session, company_id, row)
                            synced += 1
                        except Exception as e:
                            logger.error(f"Error processing cash register row {row}: {e}")
                            errors += 1

            # Commit all changes
            await session.commit()
            logger.info(f"Synced {synced} cash registers for company {company_id}, {errors} errors")
        except Exception as e:
            logger.error(f"Error syncing cash registers for company {company_id}: {e}")
            errors += 1
            await session.rollback()
            # Everything staged so far was just rolled back; do not report
            # those rows as synced.
            synced = 0

        return synced, errors

    @staticmethod
    async def search_supplier(
        session: AsyncSession,
        company_id: int,
        fiscal_code: Optional[str] = None,
        name: Optional[str] = None
    ) -> Tuple[bool, Optional[dict], str]:
        """Search for a supplier in the synced table, then in the local table.

        ``fiscal_code`` (exact match) takes precedence over ``name``
        (case-insensitive substring match). When several rows match, the one
        with the lowest ``id`` is returned instead of raising.

        Returns:
            ``(found, supplier_data, source)`` where ``source`` is one of
            ``'synced'``, ``'local'``, ``'not_found'`` or ``'no_query'``
            (neither ``fiscal_code`` nor ``name`` was given).
        """
        if fiscal_code:
            synced_match = SyncedSupplier.fiscal_code == fiscal_code
            local_match = LocalSupplier.fiscal_code == fiscal_code
        elif name:
            pattern = f"%{name}%"
            synced_match = SyncedSupplier.name.ilike(pattern)
            local_match = LocalSupplier.name.ilike(pattern)
        else:
            return False, None, "no_query"

        # 1. Search in synced suppliers
        stmt = (
            select(SyncedSupplier)
            .where(SyncedSupplier.company_id == company_id, synced_match)
            .order_by(SyncedSupplier.id)
            .limit(1)
        )
        result = await session.execute(stmt)
        # first() rather than scalar_one_or_none(): a substring search can
        # legitimately match more than one supplier.
        supplier = result.scalars().first()
        if supplier:
            return True, {
                "id": supplier.id,
                "oracle_id": supplier.oracle_id,
                "name": supplier.name,
                "fiscal_code": supplier.fiscal_code,
                "address": supplier.address,
            }, "synced"

        # 2. Search in local suppliers
        stmt = (
            select(LocalSupplier)
            .where(LocalSupplier.company_id == company_id, local_match)
            .order_by(LocalSupplier.id)
            .limit(1)
        )
        result = await session.execute(stmt)
        local = result.scalars().first()
        if local:
            return True, {
                "id": local.id,
                "name": local.name,
                "fiscal_code": local.fiscal_code,
                "address": local.address,
                "is_local": True,
            }, "local"

        # 3. Try live Oracle search (optional fallback for unsynced data)
        # This is a fallback - ideally sync should be up to date
        # TODO: Implement live Oracle search if needed
        return False, None, "not_found"

    @staticmethod
    async def create_local_supplier(
        session: AsyncSession,
        company_id: int,
        name: str,
        fiscal_code: Optional[str],
        address: Optional[str],
        created_by: str
    ) -> LocalSupplier:
        """Create a local supplier entry from OCR data.

        The new row is committed immediately and refreshed before it is
        returned.
        """
        supplier = LocalSupplier(
            company_id=company_id,
            name=name,
            fiscal_code=fiscal_code,
            address=address,
            created_by=created_by,
        )
        session.add(supplier)
        await session.commit()
        await session.refresh(supplier)
        logger.info(f"Created local supplier: {name} (CUI: {fiscal_code})")
        return supplier

    @staticmethod
    async def get_all_suppliers(
        session: AsyncSession,
        company_id: int,
        search: Optional[str] = None
    ) -> List[dict]:
        """Get suppliers (synced + local) for a company.

        Used for dropdown/autocomplete in UI. When ``search`` is given, only
        suppliers whose name or fiscal code contains it (case-insensitive)
        are returned. Each of the two tables contributes at most 50 rows;
        synced suppliers are listed first.
        """
        # Get synced suppliers
        stmt = select(SyncedSupplier).where(SyncedSupplier.company_id == company_id)
        if search:
            stmt = stmt.where(
                (SyncedSupplier.name.ilike(f"%{search}%")) |
                (SyncedSupplier.fiscal_code.ilike(f"%{search}%"))
            )
        stmt = stmt.limit(50)  # Limit results for performance
        result = await session.execute(stmt)
        suppliers = [
            {
                "id": synced_row.id,
                "oracle_id": synced_row.oracle_id,
                "name": synced_row.name,
                "fiscal_code": synced_row.fiscal_code,
                "source": "synced",
            }
            for synced_row in result.scalars().all()
        ]

        # Get local suppliers
        stmt = select(LocalSupplier).where(LocalSupplier.company_id == company_id)
        if search:
            stmt = stmt.where(
                (LocalSupplier.name.ilike(f"%{search}%")) |
                (LocalSupplier.fiscal_code.ilike(f"%{search}%"))
            )
        stmt = stmt.limit(50)
        result = await session.execute(stmt)
        suppliers.extend(
            {
                "id": local_row.id,
                "name": local_row.name,
                "fiscal_code": local_row.fiscal_code,
                "source": "local",
            }
            for local_row in result.scalars().all()
        )

        return suppliers

    @staticmethod
    async def get_all_cash_registers(
        session: AsyncSession,
        company_id: int
    ) -> List[dict]:
        """Get all synced cash registers for a company.

        Used for dropdown in UI.
        """
        stmt = select(SyncedCashRegister).where(SyncedCashRegister.company_id == company_id)
        result = await session.execute(stmt)
        registers = result.scalars().all()
        return [
            {
                "id": r.id,
                "oracle_id": r.oracle_id,
                "name": r.name,
                "account_code": r.account_code,
                "register_type": r.register_type,
            }
            for r in registers
        ]