feat(data-entry): Bulk Receipt Upload cu Mobile UX Android Nativ

## Funcționalități Principale

### Bulk Upload & Processing
- Drag & drop pentru upload bonuri multiple oriunde pe pagină
- Batch processing cu job queue și worker pool
- Real-time updates via SSE (Server-Sent Events) cu fallback polling
- Duplicate detection via SHA-256 file hash
- Auto-retry pentru job-uri failed
- Cancel individual jobs sau batch complet

### Mobile UX - Android Native Style
- Top bar fixă cu hamburger, titlu centrat, acțiuni (search/filter)
- Bottom navigation cu 4 tab-uri (Bonuri, Upload, Rapoarte, Setări)
- FAB (Floating Action Button) cu hide/show on scroll
- Filter chips orizontal scrollabile
- Selecție multiplă prin long-press (500ms)
- Select All + Bulk Delete cu confirmare
- Layout Android pentru Create/Edit/View bon (Gmail compose style)

### Bug Fixes
- Refresh individual via SSE în loc de refresh total pagină
- Bonurile cu eroare OCR rămân vizibile pentru editare manuală
- Afișare nume fișier original pentru toate bonurile
- Upload stabil pe mobil (fix race condition File API)
- Păstrare ordine bonuri la refresh (nu se reordonează)

### Backend
- SSE endpoint pentru status updates real-time
- Bulk delete endpoint cu partial success
- Auto-cleanup bonuri failed după 7 zile
- Batch model cu tracking complet

### Testing
- E2E tests cu Playwright
- Unit tests pentru bulk upload, auto-create, cleanup

## Commits Squashed: 43 user stories (US-001 → US-043)
## Branch: ralph/bulk-receipt-upload
## Timp dezvoltare: ~3 zile (Ralph autonomous)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Claude Agent
2026-01-12 08:33:17 +00:00
parent b4a226409c
commit 7b3541403f
53 changed files with 15810 additions and 196 deletions

View File

@@ -58,6 +58,7 @@ logger = logging.getLogger(__name__)
# Global variables for background tasks
telegram_bot_task = None
ocr_job_worker_running = False
cleanup_task_running = False
# ============================================================================
@@ -160,6 +161,33 @@ async def init_ocr_job_worker():
ocr_job_worker_running = False
async def init_cleanup_task():
"""Initialize the cleanup background task for expired failed receipts (US-008).
Runs cleanup at startup and then every 24 hours:
- Finds receipts with processing_status='failed' older than 7 days
- Deletes the receipts and their attachment files from storage
"""
global cleanup_task_running
logger.info("[CLEANUP] Initializing cleanup background task...")
try:
from backend.modules.data_entry.services.cleanup_service import start_cleanup_task
from backend.modules.data_entry.db.database import get_session
success = await start_cleanup_task(get_session)
cleanup_task_running = success
if success:
logger.info("[CLEANUP] ✅ Cleanup task started (runs daily)")
else:
logger.warning("[CLEANUP] ⚠️ Cleanup task failed to start")
except Exception as e:
logger.warning(f"[CLEANUP] ⚠️ Cleanup task init failed: {e}")
cleanup_task_running = False
async def run_telegram_bot():
"""Run Telegram bot as background task."""
logger.info("[TELEGRAM] Starting bot...")
@@ -270,7 +298,10 @@ async def startup_event():
# Step 3: Initialize OCR job worker (with persistent PaddleOCR)
await init_ocr_job_worker()
# Step 4: Start Telegram bot as background task
# Step 4: Initialize cleanup task for expired failed receipts (US-008)
await init_cleanup_task()
# Step 5: Start Telegram bot as background task
if settings.telegram_bot_token:
telegram_bot_task = asyncio.create_task(run_telegram_bot())
logger.info("[STARTUP] ✅ Telegram bot task created")
@@ -290,13 +321,24 @@ async def startup_event():
@app.on_event("shutdown")
async def shutdown_event():
"""Application shutdown - Cleanup resources."""
global telegram_bot_task, ocr_job_worker_running
global telegram_bot_task, ocr_job_worker_running, cleanup_task_running
logger.info("=" * 80)
logger.info("[SHUTDOWN] Stopping ROA2WEB Unified Backend...")
logger.info("=" * 80)
try:
# Stop cleanup task (US-008)
if cleanup_task_running:
logger.info("[SHUTDOWN] Stopping cleanup task...")
try:
from backend.modules.data_entry.services.cleanup_service import stop_cleanup_task
await stop_cleanup_task()
cleanup_task_running = False
logger.info("[SHUTDOWN] Cleanup task stopped")
except Exception as e:
logger.error(f"[SHUTDOWN] Cleanup task error: {e}")
# Stop OCR job worker
if ocr_job_worker_running:
logger.info("[SHUTDOWN] Stopping OCR job worker...")

View File

@@ -147,13 +147,33 @@ class ReceiptCRUD:
)
)
# Bulk upload filters (US-012)
# US-005: Support comma-separated values for processing_status filter (e.g., "pending,processing")
if filters.processing_status:
statuses = [s.strip() for s in filters.processing_status.split(",")]
if len(statuses) == 1:
query = query.where(Receipt.processing_status == statuses[0])
else:
query = query.where(Receipt.processing_status.in_(statuses))
if filters.batch_id:
query = query.where(Receipt.batch_id == filters.batch_id)
# Count total
count_query = select(func.count()).select_from(query.subquery())
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# Apply pagination and ordering
query = query.order_by(Receipt.created_at.desc())
# Apply ordering based on sort_by parameter (US-012)
if filters.sort_by == "processing_started_at":
query = query.order_by(Receipt.processing_started_at.desc())
elif filters.sort_by == "processing_started_at_asc":
query = query.order_by(Receipt.processing_started_at.asc())
else:
# Default ordering
query = query.order_by(Receipt.created_at.desc())
# Apply pagination
offset = (filters.page - 1) * filters.page_size
query = query.offset(offset).limit(filters.page_size)
@@ -163,6 +183,61 @@ class ReceiptCRUD:
return list(receipts), total
@staticmethod
async def get_processing_stats(
session: AsyncSession,
company_id: Optional[int] = None,
batch_id: Optional[str] = None,
) -> dict:
"""Get processing status counts for bulk uploaded receipts (US-012)."""
# Build base query for counting by processing_status
base_conditions = []
if company_id:
base_conditions.append(Receipt.company_id == company_id)
if batch_id:
base_conditions.append(Receipt.batch_id == batch_id)
# Only count receipts that have a processing_status (bulk uploads)
base_conditions.append(Receipt.processing_status.isnot(None))
query = select(
Receipt.processing_status,
func.count(Receipt.id).label("count")
)
for condition in base_conditions:
query = query.where(condition)
query = query.group_by(Receipt.processing_status)
result = await session.execute(query)
rows = result.all()
# Initialize stats
stats = {
"pending_count": 0,
"processing_count": 0,
"completed_count": 0,
"failed_count": 0,
}
# Map results
for row in rows:
status = row.processing_status
count = row.count
if status == "pending":
stats["pending_count"] = count
elif status == "processing":
stats["processing_count"] = count
elif status == "completed":
stats["completed_count"] = count
elif status == "failed":
stats["failed_count"] = count
return stats
@staticmethod
async def get_pending_review(
session: AsyncSession,

View File

@@ -1,8 +1,9 @@
# Database models
from .receipt import Receipt, ReceiptAttachment, ReceiptStatus, ReceiptType, ReceiptDirection
from .receipt import Receipt, ReceiptAttachment, ReceiptStatus, ReceiptType, ReceiptDirection, ProcessingStatus
from .accounting_entry import AccountingEntry, EntryType
from .nomenclature import SyncedSupplier, LocalSupplier, SyncedCashRegister
from .ocr_settings import UserOCRPreference, OCRJobMetrics, OCRMetricsSummary, OCREngine
from .batch import BatchUpload, BatchJob, BatchStatus
__all__ = [
"Receipt",
@@ -10,6 +11,7 @@ __all__ = [
"ReceiptStatus",
"ReceiptType",
"ReceiptDirection",
"ProcessingStatus",
"AccountingEntry",
"EntryType",
"SyncedSupplier",
@@ -20,4 +22,8 @@ __all__ = [
"OCRJobMetrics",
"OCRMetricsSummary",
"OCREngine",
# Batch Upload
"BatchUpload",
"BatchJob",
"BatchStatus",
]

View File

@@ -0,0 +1,64 @@
"""BatchUpload and BatchJob SQLModel models for bulk receipt processing."""
from datetime import datetime
from enum import Enum
from typing import Optional
from sqlmodel import SQLModel, Field
class BatchStatus(str, Enum):
"""Status of a batch upload."""
PENDING = "pending" # Batch created, jobs queued
PROCESSING = "processing" # At least one job is processing
COMPLETED = "completed" # All jobs completed (success or failed)
FAILED = "failed" # Batch-level failure (e.g., all jobs failed)
class BatchUpload(SQLModel, table=True):
"""
Batch upload record for grouping multiple OCR jobs.
Tracks overall progress and status of a bulk upload operation.
"""
__tablename__ = "batch_uploads"
id: Optional[int] = Field(default=None, primary_key=True)
# User info
user_id: str = Field(max_length=100, index=True) # Username who created the batch
company_id: int = Field(index=True) # Company ID for receipt creation
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
# Status tracking
status: BatchStatus = Field(default=BatchStatus.PENDING)
total_files: int = Field(default=0)
class BatchJob(SQLModel, table=True):
"""
Junction table linking batch_uploads to ocr_jobs.
Each record represents one file in a batch, linking to its OCR job.
Also stores the receipt_id once the job completes and auto-creates a receipt.
"""
__tablename__ = "batch_jobs"
id: Optional[int] = Field(default=None, primary_key=True)
# Foreign keys
batch_id: int = Field(foreign_key="batch_uploads.id", index=True)
job_id: str = Field(max_length=36, index=True) # UUID from ocr_jobs table
# Original filename for display
filename: str = Field(max_length=255)
# Receipt reference (set after auto-create)
receipt_id: Optional[int] = Field(default=None, foreign_key="receipts.id")
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)

View File

@@ -36,6 +36,14 @@ class PaymentMode(str, Enum):
AVANS_DECONTARE = "avans_decontare" # Decont angajat (542)
class ProcessingStatus(str, Enum):
"""Processing status for bulk uploaded receipts."""
PENDING = "pending" # Waiting in queue
PROCESSING = "processing" # Currently being processed by OCR
COMPLETED = "completed" # Successfully processed
FAILED = "failed" # Processing failed with error
if TYPE_CHECKING:
from .accounting_entry import AccountingEntry
@@ -96,6 +104,14 @@ class Receipt(SQLModel, table=True):
oracle_act_id: Optional[int] = Field(default=None)
oracle_error: Optional[str] = Field(default=None, max_length=500)
# Bulk upload batch tracking
batch_id: Optional[str] = Field(default=None, max_length=50, index=True)
processing_status: Optional[str] = Field(default=None, max_length=20, index=True) # ProcessingStatus enum value
processing_error: Optional[str] = Field(default=None) # Full error message text
file_hash: Optional[str] = Field(default=None, max_length=64, index=True) # SHA-256 hash for duplicate detection
processing_started_at: Optional[datetime] = Field(default=None)
processing_completed_at: Optional[datetime] = Field(default=None)
# Relationships
attachments: List["ReceiptAttachment"] = Relationship(
back_populates="receipt",

View File

@@ -0,0 +1,54 @@
"""Add company_id to batch_uploads table.
Revision ID: 20260109_batch_company
Revises: 20251231_add_original_filename_to_metrics
Create Date: 2026-01-09
This migration adds the company_id column to batch_uploads to support
automatic receipt creation during bulk upload processing.
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '20260109_batch_company'
down_revision = None # Will be auto-detected
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Add company_id column to batch_uploads table."""
# Check if column already exists (SQLModel may have created it)
conn = op.get_bind()
inspector = sa.inspect(conn)
# Check if batch_uploads table exists
if 'batch_uploads' in inspector.get_table_names():
columns = [col['name'] for col in inspector.get_columns('batch_uploads')]
if 'company_id' not in columns:
op.add_column(
'batch_uploads',
sa.Column('company_id', sa.Integer(), nullable=True)
)
# Create index for company_id
op.create_index(
'ix_batch_uploads_company_id',
'batch_uploads',
['company_id'],
unique=False
)
def downgrade() -> None:
"""Remove company_id column from batch_uploads table."""
conn = op.get_bind()
inspector = sa.inspect(conn)
if 'batch_uploads' in inspector.get_table_names():
columns = [col['name'] for col in inspector.get_columns('batch_uploads')]
if 'company_id' in columns:
op.drop_index('ix_batch_uploads_company_id', table_name='batch_uploads')
op.drop_column('batch_uploads', 'company_id')

View File

@@ -0,0 +1,125 @@
"""Add batch processing fields to receipts table.
Revision ID: add_batch_processing_fields
Revises: add_original_filename_to_metrics
Create Date: 2026-01-11
Adds fields for bulk upload batch tracking:
- batch_id: UUID string for grouping receipts from same upload
- processing_status: enum (pending/processing/completed/failed)
- processing_error: full error message text
- file_hash: SHA-256 hash for duplicate detection
- processing_started_at: when OCR processing started
- processing_completed_at: when OCR processing completed
Also creates indexes for efficient querying.
"""
from alembic import op
import sqlalchemy as sa
# Revision identifiers
revision = 'add_batch_processing_fields'
down_revision = 'add_original_filename_to_metrics'
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Add batch processing columns to receipts table."""
conn = op.get_bind()
inspector = sa.inspect(conn)
# Get existing columns
columns = [col['name'] for col in inspector.get_columns('receipts')]
# Add batch_id column with index
if 'batch_id' not in columns:
op.add_column(
'receipts',
sa.Column('batch_id', sa.String(length=50), nullable=True)
)
op.create_index(
'ix_receipts_batch_id',
'receipts',
['batch_id'],
unique=False
)
# Add processing_status column with index
if 'processing_status' not in columns:
op.add_column(
'receipts',
sa.Column('processing_status', sa.String(length=20), nullable=True)
)
op.create_index(
'ix_receipts_processing_status',
'receipts',
['processing_status'],
unique=False
)
# Add processing_error column (TEXT for full error messages)
if 'processing_error' not in columns:
op.add_column(
'receipts',
sa.Column('processing_error', sa.Text(), nullable=True)
)
# Add file_hash column with index for duplicate detection
if 'file_hash' not in columns:
op.add_column(
'receipts',
sa.Column('file_hash', sa.String(length=64), nullable=True)
)
op.create_index(
'ix_receipts_file_hash',
'receipts',
['file_hash'],
unique=False
)
# Add processing_started_at column
if 'processing_started_at' not in columns:
op.add_column(
'receipts',
sa.Column('processing_started_at', sa.DateTime(), nullable=True)
)
# Add processing_completed_at column
if 'processing_completed_at' not in columns:
op.add_column(
'receipts',
sa.Column('processing_completed_at', sa.DateTime(), nullable=True)
)
def downgrade() -> None:
"""Remove batch processing columns from receipts table."""
conn = op.get_bind()
inspector = sa.inspect(conn)
columns = [col['name'] for col in inspector.get_columns('receipts')]
indexes = [idx['name'] for idx in inspector.get_indexes('receipts')]
# Remove indexes first (SQLite batch mode)
if 'ix_receipts_batch_id' in indexes:
op.drop_index('ix_receipts_batch_id', table_name='receipts')
if 'ix_receipts_processing_status' in indexes:
op.drop_index('ix_receipts_processing_status', table_name='receipts')
if 'ix_receipts_file_hash' in indexes:
op.drop_index('ix_receipts_file_hash', table_name='receipts')
# Remove columns (in reverse order of addition)
if 'processing_completed_at' in columns:
op.drop_column('receipts', 'processing_completed_at')
if 'processing_started_at' in columns:
op.drop_column('receipts', 'processing_started_at')
if 'file_hash' in columns:
op.drop_column('receipts', 'file_hash')
if 'processing_error' in columns:
op.drop_column('receipts', 'processing_error')
if 'processing_status' in columns:
op.drop_column('receipts', 'processing_status')
if 'batch_id' in columns:
op.drop_column('receipts', 'batch_id')

View File

@@ -13,6 +13,7 @@ def create_data_entry_router() -> APIRouter:
- /nomenclature - Nomenclature syncing from Oracle
- /settings - User settings (OCR preferences)
- /metrics - OCR analytics and metrics
- /bulk - Bulk upload for batch processing
Returns:
APIRouter: Configured router for data entry module
@@ -24,6 +25,7 @@ def create_data_entry_router() -> APIRouter:
from .ocr import router as ocr_router
from .nomenclature import router as nomenclature_router
from .ocr_settings import router as ocr_settings_router
from .bulk import router as bulk_router
# Include all sub-routers (no prefix - already prefixed in main.py with /api/data-entry)
router.include_router(receipts_router, prefix="/receipts", tags=["data-entry-receipts"])
@@ -31,5 +33,7 @@ def create_data_entry_router() -> APIRouter:
router.include_router(nomenclature_router, prefix="/nomenclature", tags=["data-entry-nomenclature"])
# OCR settings and metrics (endpoints at /settings/* and /metrics/*)
router.include_router(ocr_settings_router, tags=["data-entry-settings"])
# Bulk upload for batch processing
router.include_router(bulk_router, prefix="/bulk", tags=["data-entry-bulk"])
return router

View File

@@ -0,0 +1,997 @@
"""
Bulk upload API endpoints for batch receipt processing.
Endpoints:
- POST /upload - Submit multiple files for OCR processing in a single batch
- GET /batches/{batch_id}/status - Get batch status with optional long-polling
Validation:
- Max 100 files per batch
- Max 10MB per file
- Allowed types: PDF, PNG, JPG
Duplicate Detection (US-007):
- SHA-256 hash calculated for each file
- Duplicate files (same hash + company_id) are rejected with 409 Conflict info
- Duplicates reported in error list, non-duplicates processed normally
"""
import asyncio
import hashlib
import logging
from datetime import datetime
from decimal import Decimal
from pathlib import Path
from typing import Annotated, List, Optional, Union
from fastapi import APIRouter, HTTPException, UploadFile, File, Depends, Query, Header
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from backend.modules.data_entry.db.database import get_session
from backend.modules.data_entry.db.models import BatchUpload, BatchJob, BatchStatus, Receipt, ReceiptAttachment
from backend.modules.data_entry.schemas.bulk import (
BulkUploadResponse,
BulkUploadResponseWithDuplicates,
BatchStatusResponse,
BatchJobInfo,
DuplicateFileInfo,
RetryResponse,
BatchRetryResponse,
CancelJobResponse,
CancelBatchResponse
)
from backend.modules.data_entry.services.ocr.job_queue import job_queue, OCRJobStatus
from backend.config import settings
# Auth integration
from shared.auth.dependencies import get_current_user
from shared.auth.models import CurrentUser
logger = logging.getLogger(__name__)
router = APIRouter()
# ============ Helper for selected company from header ============
async def get_selected_company(
current_user: CurrentUser = Depends(get_current_user),
x_selected_company: Annotated[Optional[str], Header()] = None
) -> int:
"""
Get selected company from X-Selected-Company header.
Validates that the user has access to the specified company.
Falls back to user's first company if no header is provided.
"""
if x_selected_company:
try:
company_id = int(x_selected_company)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid company ID format: {x_selected_company}"
)
if str(company_id) in current_user.companies:
return company_id
raise HTTPException(
status_code=403,
detail=f"Nu aveți acces la firma {company_id}"
)
# No header - use first company from user's list
if current_user.companies:
try:
return int(current_user.companies[0])
except (ValueError, IndexError):
pass
raise HTTPException(
status_code=400,
detail="Nu aveți nicio firmă asignată"
)
# Validation constants
MAX_FILES_PER_BATCH = 100
MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024 # 10MB
ALLOWED_MIME_TYPES = {"image/jpeg", "image/png", "application/pdf"}
def compute_file_hash(content: bytes) -> str:
"""
Compute SHA-256 hash of file content.
Used for duplicate detection - same file content = same hash.
Args:
content: Raw file bytes
Returns:
Hexadecimal string of SHA-256 hash (64 characters)
"""
return hashlib.sha256(content).hexdigest()
async def check_duplicate_hashes(
session: AsyncSession,
file_hashes: List[str],
company_id: int
) -> dict[str, int]:
"""
Check which file hashes already exist in the database for this company.
Args:
session: Database session
file_hashes: List of SHA-256 hashes to check
company_id: Company ID to scope the duplicate check
Returns:
Dict mapping hash -> existing receipt_id for duplicates found
"""
if not file_hashes:
return {}
# Query for existing receipts with these hashes for this company
result = await session.execute(
select(Receipt.file_hash, Receipt.id).where(
and_(
Receipt.file_hash.in_(file_hashes),
Receipt.company_id == company_id
)
)
)
# Build hash -> receipt_id mapping
# Note: result.all() is synchronous in SQLAlchemy async, returns list of tuples
duplicates = {}
rows = result.all()
for row in rows:
duplicates[row[0]] = row[1]
return duplicates
@router.post("/upload", response_model=Union[BulkUploadResponse, BulkUploadResponseWithDuplicates])
async def bulk_upload(
files: List[UploadFile] = File(..., description="Multiple files to upload"),
session: AsyncSession = Depends(get_session),
current_user: CurrentUser = Depends(get_current_user),
selected_company: int = Depends(get_selected_company)
):
"""
Upload multiple files for batch OCR processing.
Creates a batch record and queues all files as OCR jobs.
Invalid files cause entire batch rejection (validation errors).
Duplicate files are reported separately and skipped - non-duplicates are processed.
Duplicate Detection (US-007):
- SHA-256 hash calculated for each file before processing
- Files with existing hash for same company are rejected with 409 info
- Response includes duplicate details with existing_receipt_id
Args:
files: List of image/PDF files (max 100 files, max 10MB each)
Returns:
BulkUploadResponse with batch_id and list of job_ids
BulkUploadResponseWithDuplicates if some files were duplicates
Raises:
400: If validation fails (too many files, file too large, invalid type)
409: If ALL files are duplicates
500: If job creation fails
"""
# Validate file count
if len(files) == 0:
raise HTTPException(
status_code=400,
detail="No files provided"
)
if len(files) > MAX_FILES_PER_BATCH:
raise HTTPException(
status_code=400,
detail=f"Too many files. Maximum {MAX_FILES_PER_BATCH} files per batch."
)
# Pre-validate all files before creating any jobs (atomic check)
invalid_files = []
file_contents = []
for file in files:
# Check MIME type
if file.content_type not in ALLOWED_MIME_TYPES:
invalid_files.append(f"{file.filename}: Invalid type ({file.content_type})")
continue
# Read content and check size
content = await file.read()
if len(content) > MAX_FILE_SIZE_BYTES:
invalid_files.append(f"{file.filename}: File too large ({len(content) // (1024*1024)}MB > 10MB)")
continue
# Compute SHA-256 hash for duplicate detection (US-007)
file_hash = compute_file_hash(content)
# Store for later processing
file_contents.append({
"filename": file.filename,
"content": content,
"mime_type": file.content_type,
"file_hash": file_hash
})
# If any files are invalid, reject the entire batch
if invalid_files:
raise HTTPException(
status_code=400,
detail={
"message": f"Validation failed for {len(invalid_files)} file(s)",
"invalid_files": invalid_files
}
)
# Check for duplicates BEFORE creating batch (US-007)
all_hashes = [f["file_hash"] for f in file_contents]
existing_duplicates = await check_duplicate_hashes(session, all_hashes, selected_company)
# Separate duplicate files from processable files
duplicate_files: List[DuplicateFileInfo] = []
processable_files = []
for file_data in file_contents:
if file_data["file_hash"] in existing_duplicates:
existing_receipt_id = existing_duplicates[file_data["file_hash"]]
duplicate_files.append(DuplicateFileInfo(
filename=file_data["filename"],
error="duplicate",
existing_receipt_id=existing_receipt_id,
message=f"Fișier duplicat - există deja ca bon #{existing_receipt_id}"
))
logger.info(
f"[BulkUpload] Duplicate detected: {file_data['filename']} "
f"(hash={file_data['file_hash'][:16]}...) matches receipt #{existing_receipt_id}"
)
else:
processable_files.append(file_data)
# If ALL files are duplicates, return 409 Conflict
if len(duplicate_files) == len(file_contents):
raise HTTPException(
status_code=409,
detail={
"error": "all_duplicates",
"message": f"Toate cele {len(duplicate_files)} fișiere sunt duplicate",
"duplicates": [d.model_dump() for d in duplicate_files]
}
)
# If no processable files remain after filtering (shouldn't happen but be safe)
if not processable_files:
raise HTTPException(
status_code=409,
detail={
"error": "no_files_to_process",
"message": "Nu există fișiere de procesat",
"duplicates": [d.model_dump() for d in duplicate_files]
}
)
# Create batch record with company_id for auto-save
batch = BatchUpload(
user_id=current_user.username,
company_id=selected_company,
status=BatchStatus.PENDING,
total_files=len(processable_files) # Only count processable files
)
session.add(batch)
await session.flush() # Get batch.id before creating jobs
# Create OCR jobs for processable files only
job_ids = []
batch_jobs = []
try:
for file_data in processable_files:
# Create OCR job using existing job_queue
# Pass batch_id and file_hash for tracking
job = await job_queue.create_job(
file_bytes=file_data["content"],
mime_type=file_data["mime_type"],
engine="doctr_plus", # Default engine for bulk
username=current_user.username,
original_filename=file_data["filename"],
batch_id=batch.id, # Link job to batch for auto-save integration
file_hash=file_data["file_hash"] # Pass hash for storage in receipt
)
job_ids.append(job.id)
# Create batch_job link
batch_job = BatchJob(
batch_id=batch.id,
job_id=job.id,
filename=file_data["filename"]
)
batch_jobs.append(batch_job)
# Add all batch_job records
for bj in batch_jobs:
session.add(bj)
# Commit everything atomically
await session.commit()
logger.info(
f"[BulkUpload] Created batch {batch.id} with {len(job_ids)} jobs "
f"for user {current_user.username}"
f"{f', {len(duplicate_files)} duplicates skipped' if duplicate_files else ''}"
)
# Return response with duplicate info if any duplicates were found
if duplicate_files:
return BulkUploadResponseWithDuplicates(
batch_id=batch.id,
job_ids=job_ids,
total_files=len(file_contents),
processed_files=len(job_ids),
duplicate_files=len(duplicate_files),
duplicates=duplicate_files,
message=f"{len(job_ids)} fișier(e) în procesare, {len(duplicate_files)} duplicate ignorate"
)
return BulkUploadResponse(
batch_id=batch.id,
job_ids=job_ids,
total_files=len(job_ids),
message=f"{len(job_ids)} files queued for processing"
)
except Exception as e:
# Rollback on any error
await session.rollback()
logger.error(f"[BulkUpload] Failed to create batch: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to create batch: {str(e)}"
)
# Long-polling constants
MAX_WAIT_SECONDS = 30
POLL_INTERVAL_SECONDS = 0.5
async def _get_batch_status_snapshot(
batch_id: int,
session: AsyncSession
) -> Optional[dict]:
"""
Get current batch status snapshot.
Returns dict with status counts and jobs list, or None if batch not found.
"""
# Get batch record
batch_result = await session.execute(
select(BatchUpload).where(BatchUpload.id == batch_id)
)
batch = batch_result.scalar_one_or_none()
if not batch:
return None
# Get all batch_jobs for this batch
batch_jobs_result = await session.execute(
select(BatchJob).where(BatchJob.batch_id == batch_id)
)
batch_jobs = batch_jobs_result.scalars().all()
if not batch_jobs:
return {
"batch": batch,
"pending_count": 0,
"processing_count": 0,
"completed_count": 0,
"failed_count": 0,
"jobs": [],
"total_amount": None
}
# Get job statuses and error_messages from OCR job queue (SQLite)
job_statuses = {}
job_errors = {}
for bj in batch_jobs:
job = await job_queue.get_job(bj.job_id)
if job:
job_statuses[bj.job_id] = job.status.value
job_errors[bj.job_id] = job.error_message
else:
# Job not found in queue - treat as failed
job_statuses[bj.job_id] = "failed"
job_errors[bj.job_id] = "Job not found in queue"
# Count by status
pending_count = sum(1 for s in job_statuses.values() if s == "pending")
processing_count = sum(1 for s in job_statuses.values() if s == "processing")
completed_count = sum(1 for s in job_statuses.values() if s == "completed")
failed_count = sum(1 for s in job_statuses.values() if s == "failed")
# Build jobs list with status info
jobs_info = []
for bj in batch_jobs:
jobs_info.append({
"job_id": bj.job_id,
"filename": bj.filename,
"status": job_statuses.get(bj.job_id, "failed"),
"receipt_id": bj.receipt_id,
"error_message": job_errors.get(bj.job_id)
})
# Calculate total_amount from completed receipts
total_amount = None
receipt_ids = [bj.receipt_id for bj in batch_jobs if bj.receipt_id is not None]
if receipt_ids:
amount_result = await session.execute(
select(func.sum(Receipt.amount)).where(Receipt.id.in_(receipt_ids))
)
total_sum = amount_result.scalar()
if total_sum is not None:
total_amount = float(total_sum)
return {
"batch": batch,
"pending_count": pending_count,
"processing_count": processing_count,
"completed_count": completed_count,
"failed_count": failed_count,
"jobs": jobs_info,
"total_amount": total_amount
}
def _compute_batch_overall_status(pending: int, processing: int, completed: int, failed: int, total: int) -> str:
"""Compute overall batch status from job counts."""
if pending + processing == 0:
# All jobs finished
if failed == total:
return BatchStatus.FAILED.value
return BatchStatus.COMPLETED.value
elif processing > 0 or completed > 0 or failed > 0:
return BatchStatus.PROCESSING.value
else:
return BatchStatus.PENDING.value
@router.get("/batches/{batch_id}/status", response_model=BatchStatusResponse)
async def get_batch_status(
batch_id: int,
wait: Optional[int] = Query(
default=None,
ge=0,
le=MAX_WAIT_SECONDS,
description="Long-polling wait time in seconds (max 30)"
),
session: AsyncSession = Depends(get_session),
current_user: CurrentUser = Depends(get_current_user)
):
"""
Get batch processing status with optional long-polling.
Returns aggregated status counts and individual job statuses.
When `wait` parameter is provided, the endpoint will poll until:
- Status changes from initial snapshot
- All jobs complete (pending + processing = 0)
- Timeout is reached
Args:
batch_id: Batch ID to query
wait: Optional wait time in seconds for long-polling (0-30)
Returns:
BatchStatusResponse with status counts and job details
Raises:
404: If batch not found
"""
# Get initial snapshot
snapshot = await _get_batch_status_snapshot(batch_id, session)
if snapshot is None:
raise HTTPException(
status_code=404,
detail=f"Batch {batch_id} not found"
)
# If long-polling requested and jobs still in progress
if wait and wait > 0:
initial_pending = snapshot["pending_count"]
initial_processing = snapshot["processing_count"]
initial_completed = snapshot["completed_count"]
initial_failed = snapshot["failed_count"]
# Only wait if there are still jobs in progress
if initial_pending + initial_processing > 0:
elapsed = 0.0
while elapsed < wait:
await asyncio.sleep(POLL_INTERVAL_SECONDS)
elapsed += POLL_INTERVAL_SECONDS
# Refresh snapshot
snapshot = await _get_batch_status_snapshot(batch_id, session)
if snapshot is None:
# Batch deleted during polling (edge case)
raise HTTPException(status_code=404, detail=f"Batch {batch_id} not found")
# Check if status changed
current_pending = snapshot["pending_count"]
current_processing = snapshot["processing_count"]
current_completed = snapshot["completed_count"]
current_failed = snapshot["failed_count"]
if (current_pending != initial_pending or
current_processing != initial_processing or
current_completed != initial_completed or
current_failed != initial_failed):
# Status changed, return immediately
break
# Check if all jobs finished
if current_pending + current_processing == 0:
break
# Build response
batch = snapshot["batch"]
total_files = batch.total_files
overall_status = _compute_batch_overall_status(
snapshot["pending_count"],
snapshot["processing_count"],
snapshot["completed_count"],
snapshot["failed_count"],
total_files
)
jobs = [
BatchJobInfo(
job_id=j["job_id"],
filename=j["filename"],
status=j["status"],
receipt_id=j["receipt_id"],
error_message=j.get("error_message")
)
for j in snapshot["jobs"]
]
return BatchStatusResponse(
batch_id=batch.id,
status=overall_status,
total_files=total_files,
pending_count=snapshot["pending_count"],
processing_count=snapshot["processing_count"],
completed_count=snapshot["completed_count"],
failed_count=snapshot["failed_count"],
jobs=jobs,
total_amount=snapshot["total_amount"],
created_at=batch.created_at
)
# ============ Retry Endpoints (US-006) ============
async def _retry_single_receipt(
session: AsyncSession,
receipt: Receipt,
username: str
) -> tuple[bool, Optional[str], Optional[str]]:
"""
Retry processing for a single receipt.
Finds the original file from attachments, resets processing status,
and creates a new OCR job.
Args:
session: Database session
receipt: Receipt to retry
username: Username for the new OCR job
Returns:
Tuple of (success, job_id, error_message)
"""
# Get the first attachment to find the source file
attachments_result = await session.execute(
select(ReceiptAttachment)
.where(ReceiptAttachment.receipt_id == receipt.id)
.limit(1)
)
attachment = attachments_result.scalar_one_or_none()
if not attachment:
return False, None, "Bonul nu are fișier atașat"
# Construct full path to attachment file
file_path = settings.data_entry_upload_path_resolved / attachment.file_path
if not file_path.exists():
return False, None, "Fișierul original nu mai este disponibil"
# Read file content
try:
with open(file_path, 'rb') as f:
file_bytes = f.read()
except Exception as e:
logger.error(f"[Retry] Failed to read file {file_path}: {e}")
return False, None, f"Eroare la citirea fișierului: {str(e)}"
# Create new OCR job
try:
job = await job_queue.create_job(
file_bytes=file_bytes,
mime_type=attachment.mime_type,
engine="doctr_plus",
username=username,
original_filename=attachment.filename,
batch_id=None, # No batch for retry - direct processing
file_hash=receipt.file_hash
)
# Reset receipt processing status
receipt.processing_status = "pending"
receipt.processing_error = None
receipt.processing_started_at = datetime.utcnow()
receipt.processing_completed_at = None
await session.flush()
logger.info(f"[Retry] Receipt {receipt.id} requeued as job {job.id}")
return True, job.id, None
except Exception as e:
logger.error(f"[Retry] Failed to create job for receipt {receipt.id}: {e}")
return False, None, f"Eroare la crearea job-ului OCR: {str(e)}"
@router.post("/retry/{receipt_id}", response_model=RetryResponse)
async def retry_receipt(
receipt_id: int,
session: AsyncSession = Depends(get_session),
current_user: CurrentUser = Depends(get_current_user),
selected_company: int = Depends(get_selected_company)
):
"""
Retry OCR processing for a single failed receipt.
Resets the receipt's processing_status to 'pending' and creates
a new OCR job using the original attachment file.
Args:
receipt_id: ID of the receipt to retry
Returns:
RetryResponse with success status and new job ID
Raises:
404: If receipt not found
400: If receipt is not in 'failed' status
400: If original file is not available
"""
# Get the receipt
result = await session.execute(
select(Receipt).where(
and_(
Receipt.id == receipt_id,
Receipt.company_id == selected_company
)
)
)
receipt = result.scalar_one_or_none()
if not receipt:
raise HTTPException(
status_code=404,
detail=f"Bonul #{receipt_id} nu a fost găsit"
)
# Verify receipt is in failed status
if receipt.processing_status != "failed":
raise HTTPException(
status_code=400,
detail=f"Bonul nu este în stare de eroare (status actual: {receipt.processing_status})"
)
# Attempt retry
success, job_id, error = await _retry_single_receipt(
session, receipt, current_user.username
)
if not success:
raise HTTPException(
status_code=400,
detail=error or "Eroare necunoscută la reîncărcare"
)
await session.commit()
return RetryResponse(
success=True,
receipt_id=receipt_id,
job_id=job_id,
message="Bon reîncarcat în procesare"
)
@router.post("/retry-batch/{batch_id}", response_model=BatchRetryResponse)
async def retry_batch_failed(
batch_id: str,
session: AsyncSession = Depends(get_session),
current_user: CurrentUser = Depends(get_current_user),
selected_company: int = Depends(get_selected_company)
):
"""
Retry all failed receipts in a batch.
Finds all receipts with batch_id matching and processing_status='failed',
then attempts to retry each one.
Args:
batch_id: Batch ID (UUID string from receipt.batch_id)
Returns:
BatchRetryResponse with counts of successful and failed retries
Raises:
404: If no failed receipts found for batch
"""
# Find all failed receipts in this batch
result = await session.execute(
select(Receipt).where(
and_(
Receipt.batch_id == batch_id,
Receipt.company_id == selected_company,
Receipt.processing_status == "failed"
)
)
)
failed_receipts = result.scalars().all()
if not failed_receipts:
raise HTTPException(
status_code=404,
detail=f"Nu există bonuri cu erori în batch-ul {batch_id}"
)
# Retry each receipt
retried_count = 0
failed_count = 0
errors = []
for receipt in failed_receipts:
success, job_id, error = await _retry_single_receipt(
session, receipt, current_user.username
)
if success:
retried_count += 1
else:
failed_count += 1
errors.append(f"Bon #{receipt.id}: {error}")
await session.commit()
return BatchRetryResponse(
success=retried_count > 0,
batch_id=batch_id,
retried_count=retried_count,
failed_count=failed_count,
errors=errors,
message=f"{retried_count} bonuri reîncarcate în procesare"
+ (f", {failed_count} erori" if failed_count > 0 else "")
)
# ============ Cancel Endpoints (US-014) ============
@router.post("/cancel/{job_id}", response_model=CancelJobResponse)
async def cancel_job(
job_id: str,
session: AsyncSession = Depends(get_session),
current_user: CurrentUser = Depends(get_current_user)
):
"""
Cancel a single OCR processing job.
Only jobs with status 'pending' or 'processing' can be cancelled.
Jobs with status 'completed' or 'failed' cannot be cancelled.
Important: If a receipt has already been created from this job,
it will NOT be deleted - receipts are preserved for audit purposes.
Args:
job_id: The UUID of the OCR job to cancel
Returns:
CancelJobResponse with cancellation details
Raises:
404: If job not found in batch_jobs table
400: If job has already completed or failed
"""
# Find the job in batch_jobs table
batch_job_result = await session.execute(
select(BatchJob).where(BatchJob.job_id == job_id)
)
batch_job = batch_job_result.scalar_one_or_none()
if not batch_job:
raise HTTPException(
status_code=404,
detail=f"Job {job_id} nu a fost găsit"
)
# Get the OCR job from job_queue to check current status
ocr_job = await job_queue.get_job(job_id)
if not ocr_job:
raise HTTPException(
status_code=404,
detail=f"Job {job_id} nu există în coada de procesare"
)
# Check if job can be cancelled
current_status = ocr_job.status.value
if current_status == OCRJobStatus.completed.value:
raise HTTPException(
status_code=400,
detail=f"Job-ul a fost deja procesat cu succes. Nu poate fi anulat."
)
if current_status == OCRJobStatus.failed.value:
raise HTTPException(
status_code=400,
detail=f"Job-ul a eșuat deja. Folosiți opțiunea de reîncercare în loc de anulare."
)
if current_status == OCRJobStatus.cancelled.value:
raise HTTPException(
status_code=400,
detail=f"Job-ul a fost deja anulat."
)
# Update job status to cancelled in job_queue (SQLite)
cancelled_at = datetime.utcnow()
success = await job_queue.update_status(
job_id=job_id,
status=OCRJobStatus.cancelled,
error="Cancelled by user"
)
if not success:
raise HTTPException(
status_code=500,
detail=f"Eroare la anularea job-ului"
)
logger.info(
f"[CancelJob] Job {job_id} cancelled by {current_user.username} "
f"(previous status: {current_status})"
)
return CancelJobResponse(
success=True,
job_id=job_id,
cancelled_at=cancelled_at,
message=f"Job anulat cu succes"
)
@router.post("/cancel-batch/{batch_id}", response_model=CancelBatchResponse)
async def cancel_batch(
batch_id: int,
session: AsyncSession = Depends(get_session),
current_user: CurrentUser = Depends(get_current_user)
):
"""
Cancel all pending/processing jobs in a batch.
Finds all jobs with status 'pending' or 'processing' in the specified batch
and marks them as 'cancelled'. Jobs with status 'completed' or 'failed'
are not affected.
Important: Receipts that have already been created from completed jobs
will NOT be deleted - they are preserved for audit purposes.
Args:
batch_id: The batch ID to cancel
Returns:
CancelBatchResponse with counts of cancelled and skipped jobs
Raises:
404: If batch not found or no jobs exist for batch
"""
# Verify batch exists
batch_result = await session.execute(
select(BatchUpload).where(BatchUpload.id == batch_id)
)
batch = batch_result.scalar_one_or_none()
if not batch:
raise HTTPException(
status_code=404,
detail=f"Batch {batch_id} nu a fost găsit"
)
# Get all batch_jobs for this batch
batch_jobs_result = await session.execute(
select(BatchJob).where(BatchJob.batch_id == batch_id)
)
batch_jobs = batch_jobs_result.scalars().all()
if not batch_jobs:
raise HTTPException(
status_code=404,
detail=f"Nu există job-uri în batch-ul {batch_id}"
)
# Process each job - cancel pending/processing, skip completed/failed
cancelled_count = 0
skipped_count = 0
for batch_job in batch_jobs:
# Get current job status from OCR job queue
ocr_job = await job_queue.get_job(batch_job.job_id)
if not ocr_job:
# Job not found in queue - treat as skipped
skipped_count += 1
continue
current_status = ocr_job.status.value
# Only cancel pending or processing jobs
if current_status in (OCRJobStatus.pending.value, OCRJobStatus.processing.value):
success = await job_queue.update_status(
job_id=batch_job.job_id,
status=OCRJobStatus.cancelled,
error="Cancelled by user (batch cancel)"
)
if success:
cancelled_count += 1
logger.debug(f"[CancelBatch] Cancelled job {batch_job.job_id}")
else:
# Failed to cancel - count as skipped
skipped_count += 1
logger.warning(
f"[CancelBatch] Failed to cancel job {batch_job.job_id}"
)
else:
# Job is completed, failed, or already cancelled - skip it
skipped_count += 1
logger.info(
f"[CancelBatch] Batch {batch_id} cancelled by {current_user.username}: "
f"{cancelled_count} cancelled, {skipped_count} skipped"
)
# Build message
if cancelled_count == 0:
message = f"Nu există job-uri de anulat în batch-ul {batch_id}"
elif skipped_count == 0:
message = f"{cancelled_count} job-uri anulate"
else:
message = f"{cancelled_count} job-uri anulate, {skipped_count} ignorate (deja procesate)"
return CancelBatchResponse(
success=cancelled_count > 0,
batch_id=batch_id,
cancelled_count=cancelled_count,
skipped_count=skipped_count,
message=message
)

View File

@@ -4,7 +4,7 @@ from typing import List, Optional, Annotated
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query, Header, Response
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from backend.modules.data_entry.db.database import get_session
@@ -19,6 +19,7 @@ from backend.modules.data_entry.schemas.receipt import (
ReceiptResponse,
ReceiptListResponse,
ReceiptFilter,
ProcessingStats,
AttachmentResponse,
AccountingEntryResponse,
WorkflowAction,
@@ -28,8 +29,12 @@ from backend.modules.data_entry.schemas.receipt import (
AccountOption,
CashRegisterOption,
ExpenseTypeOption,
BulkDeleteRequest,
BulkDeleteResponse,
BulkDeleteFailure,
)
from backend.modules.data_entry.db.models.receipt import ReceiptStatus, ReceiptDirection
from backend.modules.data_entry.services import sse_service
# Auth integration
from shared.auth.dependencies import get_current_user
@@ -105,6 +110,51 @@ def get_current_user_company(current_user: CurrentUser) -> int:
return 1
# ============ SSE Endpoint for Real-time Status Updates ============
@router.get("/sse/status")
async def sse_status_stream(
batch_id: Optional[str] = Query(
default=None,
description="Optional batch_id to filter events for a specific batch"
),
):
"""
Server-Sent Events endpoint for real-time receipt status updates.
This endpoint provides a persistent connection that streams status change
events as they occur. Clients receive updates for CRUD operations on receipts
without needing to poll.
Query Parameters:
batch_id: Optional filter to only receive events for a specific batch upload.
Event Format:
data: {"receipt_id": 123, "status": "DRAFT", "processing_status": "completed", ...}
Headers:
- Content-Type: text/event-stream
- Cache-Control: no-cache
- Connection: keep-alive
Reconnection:
The retry: 3000 header hints clients to reconnect after 3 seconds if disconnected.
Example:
curl -N http://localhost:8000/api/data-entry/receipts/sse/status
curl -N http://localhost:8000/api/data-entry/receipts/sse/status?batch_id=abc-123
"""
return StreamingResponse(
sse_service.subscribe(batch_id=batch_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)
# ============ Receipt CRUD Endpoints ============
@router.post("/", response_model=ReceiptResponse)
@@ -128,12 +178,20 @@ async def list_receipts(
date_from: Optional[str] = None,
date_to: Optional[str] = None,
search: Optional[str] = None,
# Bulk upload filters (US-012)
processing_status: Optional[str] = Query(default=None, description="Filter by processing status: pending, processing, completed, failed"),
batch_id: Optional[str] = Query(default=None, description="Filter by batch_id UUID"),
sort_by: Optional[str] = Query(default=None, description="Sort field: processing_started_at, processing_started_at_asc"),
# Pagination
page: int = Query(default=1, ge=1),
page_size: int = Query(default=20, ge=1, le=100),
session: AsyncSession = Depends(get_session),
selected_company: SelectedCompany = None,
):
"""Get paginated list of receipts with filters."""
"""Get paginated list of receipts with filters.
US-012: Extended with batch_id, processing_status filters and processing_stats.
"""
# Disable browser caching to always get fresh data
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
response.headers["Pragma"] = "no-cache"
@@ -148,6 +206,9 @@ async def list_receipts(
date_from=date_type.fromisoformat(date_from) if date_from else None,
date_to=date_type.fromisoformat(date_to) if date_to else None,
search=search,
processing_status=processing_status,
batch_id=batch_id,
sort_by=sort_by,
page=page,
page_size=page_size,
)
@@ -231,6 +292,68 @@ async def update_receipt(
return ReceiptResponse.model_validate(receipt)
@router.delete("/bulk", response_model=BulkDeleteResponse)
async def bulk_delete_receipts(
data: BulkDeleteRequest,
session: AsyncSession = Depends(get_session),
current_user: CurrentUser = Depends(get_current_user),
):
"""
Bulk delete receipts (US-024).
Deletes multiple receipts in a single request with partial success support.
Validation rules:
- Each receipt must be in DRAFT status
- Each receipt must be created by the current user
- Receipts with processing_status 'pending' or 'processing' cannot be deleted
Returns:
BulkDeleteResponse with deleted IDs and failed items with error messages
"""
deleted: List[int] = []
failed: List[BulkDeleteFailure] = []
for receipt_id in data.ids:
# Get receipt with relationships for deletion
receipt = await ReceiptCRUD.get_by_id(session, receipt_id, include_relations=True)
if not receipt:
failed.append(BulkDeleteFailure(id=receipt_id, error="Bonul nu a fost găsit"))
continue
# Check if receipt is being processed (bulk upload in progress)
if receipt.processing_status in ["pending", "processing"]:
failed.append(BulkDeleteFailure(
id=receipt_id,
error="Bonul este în curs de procesare și nu poate fi șters"
))
continue
# Check status - only DRAFT can be deleted
if receipt.status != ReceiptStatus.DRAFT:
failed.append(BulkDeleteFailure(
id=receipt_id,
error=f"Doar bonurile în status DRAFT pot fi șterse (status curent: {receipt.status.value})"
))
continue
# Check ownership
if receipt.created_by != current_user.username:
failed.append(BulkDeleteFailure(
id=receipt_id,
error="Doar creatorul bonului poate să-l șteargă"
))
continue
# All validations passed - delete the receipt
# Note: Cascade delete handles attachments and accounting entries
await ReceiptCRUD.delete(session, receipt)
deleted.append(receipt_id)
return BulkDeleteResponse(deleted=deleted, failed=failed)
@router.delete("/{receipt_id}")
async def delete_receipt(
receipt_id: int,
@@ -261,6 +384,15 @@ async def submit_receipt(
session, receipt_id, current_user.username
)
# Broadcast SSE event on success (US-030)
if success and receipt:
await sse_service.broadcast_status_change(
receipt_id=receipt.id,
status=receipt.status.value,
processing_status=receipt.processing_status,
batch_id=receipt.batch_id,
)
return WorkflowAction(
success=success,
message=message,
@@ -279,6 +411,15 @@ async def approve_receipt(
session, receipt_id, current_user.username
)
# Broadcast SSE event on success (US-030)
if success and receipt:
await sse_service.broadcast_status_change(
receipt_id=receipt.id,
status=receipt.status.value,
processing_status=receipt.processing_status,
batch_id=receipt.batch_id,
)
return WorkflowAction(
success=success,
message=message,
@@ -298,6 +439,15 @@ async def reject_receipt(
session, receipt_id, current_user.username, data.reason
)
# Broadcast SSE event on success (US-030)
if success and receipt:
await sse_service.broadcast_status_change(
receipt_id=receipt.id,
status=receipt.status.value,
processing_status=receipt.processing_status,
batch_id=receipt.batch_id,
)
return WorkflowAction(
success=success,
message=message,
@@ -316,6 +466,15 @@ async def resubmit_receipt(
session, receipt_id, current_user.username
)
# Broadcast SSE event on success (US-030)
if success and receipt:
await sse_service.broadcast_status_change(
receipt_id=receipt.id,
status=receipt.status.value,
processing_status=receipt.processing_status,
batch_id=receipt.batch_id,
)
return WorkflowAction(
success=success,
message=message,
@@ -334,6 +493,15 @@ async def unapprove_receipt(
session, receipt_id, current_user.username
)
# Broadcast SSE event on success (US-030)
if success and receipt:
await sse_service.broadcast_status_change(
receipt_id=receipt.id,
status=receipt.status.value,
processing_status=receipt.processing_status,
batch_id=receipt.batch_id,
)
return WorkflowAction(
success=success,
message=message,

View File

@@ -12,6 +12,12 @@ from .receipt import (
WorkflowAction,
RejectRequest,
)
from .bulk import (
BulkUploadResponse,
BatchJobInfo,
BatchStatusResponse,
BulkUploadError,
)
__all__ = [
"ReceiptCreate",
@@ -25,4 +31,9 @@ __all__ = [
"AccountingEntryResponse",
"WorkflowAction",
"RejectRequest",
# Bulk upload schemas
"BulkUploadResponse",
"BatchJobInfo",
"BatchStatusResponse",
"BulkUploadError",
]

View File

@@ -0,0 +1,212 @@
"""Pydantic schemas for bulk upload endpoints."""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field
class BulkUploadResponse(BaseModel):
"""Response schema for bulk upload endpoint."""
batch_id: int = Field(..., description="Unique batch identifier for tracking")
job_ids: List[str] = Field(..., description="List of OCR job UUIDs created")
total_files: int = Field(..., description="Number of files in the batch")
message: str = Field(..., description="Status message")
class Config:
json_schema_extra = {
"example": {
"batch_id": 1,
"job_ids": [
"550e8400-e29b-41d4-a716-446655440001",
"550e8400-e29b-41d4-a716-446655440002",
],
"total_files": 2,
"message": "2 files queued for processing"
}
}
class BatchJobInfo(BaseModel):
"""Information about a single job in a batch."""
job_id: str = Field(..., description="OCR job UUID")
filename: str = Field(..., description="Original filename")
status: str = Field(..., description="Job status: pending, processing, completed, failed")
receipt_id: Optional[int] = Field(None, description="Created receipt ID (if completed)")
error_message: Optional[str] = Field(None, description="Error message (if failed)")
class BatchStatusResponse(BaseModel):
"""Response schema for batch status endpoint."""
batch_id: int = Field(..., description="Batch identifier")
status: str = Field(..., description="Overall batch status")
total_files: int = Field(..., description="Total number of files in batch")
pending_count: int = Field(..., description="Number of pending jobs")
processing_count: int = Field(..., description="Number of processing jobs")
completed_count: int = Field(..., description="Number of completed jobs")
failed_count: int = Field(..., description="Number of failed jobs")
jobs: List[BatchJobInfo] = Field(..., description="List of jobs with their status")
total_amount: Optional[float] = Field(None, description="Sum of all receipt amounts")
created_at: datetime = Field(..., description="Batch creation timestamp")
class Config:
json_schema_extra = {
"example": {
"batch_id": 1,
"status": "processing",
"total_files": 5,
"pending_count": 2,
"processing_count": 1,
"completed_count": 2,
"failed_count": 0,
"jobs": [
{"job_id": "abc-123", "filename": "bon1.pdf", "status": "completed", "receipt_id": 15},
{"job_id": "def-456", "filename": "bon2.jpg", "status": "processing", "receipt_id": None},
],
"total_amount": 150.50,
"created_at": "2025-01-09T10:30:00"
}
}
class DuplicateFileInfo(BaseModel):
"""Information about a duplicate file detected during upload."""
filename: str = Field(..., description="Name of the duplicate file")
error: str = Field(default="duplicate", description="Error type (always 'duplicate')")
existing_receipt_id: int = Field(..., description="ID of the existing receipt with same file hash")
message: str = Field(..., description="Human-readable error message")
class Config:
json_schema_extra = {
"example": {
"filename": "bon_lidl.pdf",
"error": "duplicate",
"existing_receipt_id": 123,
"message": "Fișier duplicat - există deja ca bon #123"
}
}
class BulkUploadResponseWithDuplicates(BaseModel):
"""Response schema for bulk upload with partial success (some duplicates)."""
batch_id: Optional[int] = Field(None, description="Batch ID (None if all files were duplicates)")
job_ids: List[str] = Field(default_factory=list, description="List of OCR job UUIDs created")
total_files: int = Field(..., description="Total number of files submitted")
processed_files: int = Field(..., description="Number of files successfully queued")
duplicate_files: int = Field(..., description="Number of duplicate files rejected")
duplicates: List[DuplicateFileInfo] = Field(default_factory=list, description="List of duplicate file details")
message: str = Field(..., description="Status message")
class Config:
json_schema_extra = {
"example": {
"batch_id": 1,
"job_ids": ["550e8400-e29b-41d4-a716-446655440001"],
"total_files": 3,
"processed_files": 1,
"duplicate_files": 2,
"duplicates": [
{
"filename": "bon_lidl.pdf",
"error": "duplicate",
"existing_receipt_id": 123,
"message": "Fișier duplicat - există deja ca bon #123"
}
],
"message": "1 fișier în procesare, 2 duplicate ignorate"
}
}
class BulkUploadError(BaseModel):
"""Error response for bulk upload validation failures."""
detail: str = Field(..., description="Error message")
invalid_files: Optional[List[str]] = Field(None, description="List of invalid filenames")
class RetryResponse(BaseModel):
"""Response schema for retry endpoints."""
success: bool = Field(..., description="Whether the retry was successful")
receipt_id: int = Field(..., description="Receipt ID that was retried")
job_id: Optional[str] = Field(None, description="New OCR job ID created")
message: str = Field(..., description="Status message")
class Config:
json_schema_extra = {
"example": {
"success": True,
"receipt_id": 123,
"job_id": "550e8400-e29b-41d4-a716-446655440001",
"message": "Bon reîncarcat în procesare"
}
}
class BatchRetryResponse(BaseModel):
"""Response schema for batch retry endpoint."""
success: bool = Field(..., description="Whether any retries were successful")
batch_id: str = Field(..., description="Batch ID that was retried")
retried_count: int = Field(..., description="Number of receipts successfully retried")
failed_count: int = Field(..., description="Number of receipts that couldn't be retried")
errors: List[str] = Field(default_factory=list, description="List of error messages")
message: str = Field(..., description="Status message")
class Config:
json_schema_extra = {
"example": {
"success": True,
"batch_id": "abc-123",
"retried_count": 3,
"failed_count": 0,
"errors": [],
"message": "3 bonuri reîncarcate în procesare"
}
}
class CancelJobResponse(BaseModel):
"""Response schema for cancel job endpoint."""
success: bool = Field(..., description="Whether the cancellation was successful")
job_id: str = Field(..., description="Job ID that was cancelled")
cancelled_at: datetime = Field(..., description="Timestamp when the job was cancelled")
message: str = Field(..., description="Status message")
class Config:
json_schema_extra = {
"example": {
"success": True,
"job_id": "550e8400-e29b-41d4-a716-446655440001",
"cancelled_at": "2025-01-11T15:30:00",
"message": "Job anulat cu succes"
}
}
class CancelBatchResponse(BaseModel):
"""Response schema for cancel batch endpoint."""
success: bool = Field(..., description="Whether any jobs were cancelled")
batch_id: int = Field(..., description="Batch ID that was cancelled")
cancelled_count: int = Field(..., description="Number of jobs successfully cancelled")
skipped_count: int = Field(..., description="Number of jobs skipped (completed/failed)")
message: str = Field(..., description="Status message")
class Config:
json_schema_extra = {
"example": {
"success": True,
"batch_id": 1,
"cancelled_count": 3,
"skipped_count": 2,
"message": "3 job-uri anulate, 2 ignorate (deja procesate)"
}
}

View File

@@ -6,7 +6,7 @@ from decimal import Decimal
from typing import Optional, List, Any, Union
from pydantic import BaseModel, Field, ConfigDict, field_validator
from backend.modules.data_entry.db.models.receipt import ReceiptType, ReceiptDirection, ReceiptStatus
from backend.modules.data_entry.db.models.receipt import ReceiptType, ReceiptDirection, ReceiptStatus, ProcessingStatus
from backend.modules.data_entry.db.models.accounting_entry import EntryType
@@ -161,6 +161,14 @@ class ReceiptResponse(ReceiptBase):
oracle_act_id: Optional[int] = None
oracle_error: Optional[str] = None
# Bulk upload batch tracking (US-012)
batch_id: Optional[str] = None
processing_status: Optional[str] = None
processing_error: Optional[str] = None
file_hash: Optional[str] = None
processing_started_at: Optional[datetime] = None
processing_completed_at: Optional[datetime] = None
# Relationships (optional, loaded when needed)
attachments: List[AttachmentResponse] = []
entries: List[AccountingEntryResponse] = []
@@ -196,6 +204,14 @@ class ReceiptResponse(ReceiptBase):
return None
class ProcessingStats(BaseModel):
"""Statistics for bulk upload processing status (US-012)."""
pending_count: int = 0
processing_count: int = 0
completed_count: int = 0
failed_count: int = 0
class ReceiptListResponse(BaseModel):
"""Schema for paginated receipt list response."""
items: List[ReceiptResponse]
@@ -203,6 +219,8 @@ class ReceiptListResponse(BaseModel):
page: int
page_size: int
pages: int
# Processing stats for bulk upload filtering (US-012)
processing_stats: Optional[ProcessingStats] = None
class ReceiptFilter(BaseModel):
@@ -214,6 +232,11 @@ class ReceiptFilter(BaseModel):
date_from: Optional[date] = None
date_to: Optional[date] = None
search: Optional[str] = None # Search in description, partner_name
# Bulk upload filters (US-012)
processing_status: Optional[str] = None # ProcessingStatus enum value
batch_id: Optional[str] = None # Filter by batch_id
sort_by: Optional[str] = None # Sort field (e.g., "processing_started_at")
# Pagination
page: int = Field(default=1, ge=1)
page_size: int = Field(default=20, ge=1, le=100)
@@ -267,3 +290,22 @@ class ExpenseTypeOption(BaseModel):
account_code: str
has_vat: bool
vat_percent: Decimal = Decimal("19")
# ============ Bulk Delete Schemas (US-024) ============
class BulkDeleteRequest(BaseModel):
"""Request schema for bulk delete endpoint."""
ids: List[int] = Field(..., min_length=1, description="List of receipt IDs to delete")
class BulkDeleteFailure(BaseModel):
"""Schema for a single failed deletion."""
id: int
error: str
class BulkDeleteResponse(BaseModel):
"""Response schema for bulk delete with partial success support."""
deleted: List[int] = Field(default_factory=list, description="IDs of successfully deleted receipts")
failed: List[BulkDeleteFailure] = Field(default_factory=list, description="IDs that failed with error messages")

View File

@@ -2,10 +2,15 @@
from .receipt_service import ReceiptService
from .nomenclature_service import NomenclatureService
from .expense_types import EXPENSE_TYPES, ExpenseType
from .receipt_auto_create import ReceiptAutoCreateService, ReceiptCreateResult
from . import sse_service
__all__ = [
"ReceiptService",
"NomenclatureService",
"EXPENSE_TYPES",
"ExpenseType",
"ReceiptAutoCreateService",
"ReceiptCreateResult",
"sse_service",
]

View File

@@ -0,0 +1,215 @@
"""
Cleanup service for auto-deleting expired failed receipts.
US-008: Backend - Auto-Cleanup Erori După 7 Zile
- Finds receipts with processing_status='failed' and processing_completed_at < now() - 7 days
- Deletes the receipts and their attached files from storage
- Runs at startup and then daily as a background task
"""
import asyncio
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from backend.modules.data_entry.db.models.receipt import Receipt, ReceiptAttachment
from backend.modules.data_entry.config import settings
logger = logging.getLogger(__name__)
# Cleanup configuration
CLEANUP_RETENTION_DAYS = 7
CLEANUP_INTERVAL_HOURS = 24
# In-memory storage for last cleanup stats (optional - for login notification)
_last_cleanup_stats: dict = {
"count": 0,
"timestamp": None
}
def get_last_cleanup_stats() -> dict:
"""Get stats from the last cleanup run for notification purposes."""
return _last_cleanup_stats.copy()
async def cleanup_expired_failed_receipts(session: AsyncSession) -> int:
"""
Find and delete receipts with processing_status='failed' older than 7 days.
This function:
1. Queries for failed receipts where processing_completed_at < now() - 7 days
2. Deletes attachment files from disk
3. Deletes the receipt records (cascade deletes attachment records)
Args:
session: AsyncSession for database operations
Returns:
Number of receipts deleted
"""
global _last_cleanup_stats
cutoff_date = datetime.utcnow() - timedelta(days=CLEANUP_RETENTION_DAYS)
# Find expired failed receipts with their attachments
query = select(Receipt).options(
selectinload(Receipt.attachments)
).where(
and_(
Receipt.processing_status == "failed",
Receipt.processing_completed_at.isnot(None),
Receipt.processing_completed_at < cutoff_date
)
)
result = await session.execute(query)
expired_receipts = result.scalars().all()
if not expired_receipts:
logger.debug("[Cleanup] No expired failed receipts found")
return 0
deleted_count = 0
deleted_files = 0
upload_base_path = settings.upload_path_resolved
for receipt in expired_receipts:
try:
# Delete attachment files from disk
for attachment in receipt.attachments:
file_path = upload_base_path / attachment.file_path
if file_path.exists():
try:
file_path.unlink()
deleted_files += 1
logger.debug(f"[Cleanup] Deleted file: {file_path}")
except OSError as e:
logger.warning(f"[Cleanup] Failed to delete file {file_path}: {e}")
# Also try to clean up empty parent directories
parent_dir = file_path.parent
if parent_dir.exists() and parent_dir != upload_base_path:
try:
# Only remove if directory is empty
if not any(parent_dir.iterdir()):
parent_dir.rmdir()
logger.debug(f"[Cleanup] Removed empty directory: {parent_dir}")
except OSError:
pass # Directory not empty or permission issue, skip
# Delete receipt (cascade deletes attachment records in DB)
await session.delete(receipt)
deleted_count += 1
except Exception as e:
logger.error(f"[Cleanup] Error deleting receipt {receipt.id}: {e}")
continue
# Commit all deletions
if deleted_count > 0:
await session.commit()
# Update stats for notification
_last_cleanup_stats = {
"count": deleted_count,
"files_deleted": deleted_files,
"timestamp": datetime.utcnow().isoformat()
}
logger.info(f"[Cleanup] Cleaned up {deleted_count} expired failed receipts ({deleted_files} files)")
return deleted_count
async def run_cleanup_task(get_session_func) -> None:
"""
Background task that runs cleanup at startup and then every 24 hours.
Args:
get_session_func: Async generator function that yields database sessions
"""
logger.info("[Cleanup] Starting cleanup background task")
# Run immediately at startup
try:
async for session in get_session_func():
count = await cleanup_expired_failed_receipts(session)
if count > 0:
logger.info(f"[Cleanup] Initial cleanup: {count} receipts removed")
break
except Exception as e:
logger.error(f"[Cleanup] Initial cleanup failed: {e}")
# Then run every 24 hours
while True:
try:
await asyncio.sleep(CLEANUP_INTERVAL_HOURS * 3600)
async for session in get_session_func():
count = await cleanup_expired_failed_receipts(session)
if count > 0:
logger.info(f"[Cleanup] Daily cleanup: {count} receipts removed")
break
except asyncio.CancelledError:
logger.info("[Cleanup] Cleanup task cancelled")
raise
except Exception as e:
logger.error(f"[Cleanup] Daily cleanup failed: {e}")
# Continue running even if one cleanup fails
# Global reference to cleanup task for graceful shutdown
_cleanup_task: Optional[asyncio.Task] = None
async def start_cleanup_task(get_session_func) -> bool:
"""
Start the cleanup background task.
Args:
get_session_func: Async generator function that yields database sessions
Returns:
True if task started successfully, False otherwise
"""
global _cleanup_task
if _cleanup_task is not None and not _cleanup_task.done():
logger.warning("[Cleanup] Cleanup task already running")
return False
try:
_cleanup_task = asyncio.create_task(run_cleanup_task(get_session_func))
logger.info("[Cleanup] ✅ Cleanup background task started")
return True
except Exception as e:
logger.error(f"[Cleanup] Failed to start cleanup task: {e}")
return False
async def stop_cleanup_task() -> None:
"""Stop the cleanup background task gracefully."""
global _cleanup_task
if _cleanup_task is not None and not _cleanup_task.done():
_cleanup_task.cancel()
try:
await _cleanup_task
except asyncio.CancelledError:
pass
logger.info("[Cleanup] Cleanup task stopped")
_cleanup_task = None
def is_cleanup_task_running() -> bool:
"""Check if the cleanup task is currently running."""
return _cleanup_task is not None and not _cleanup_task.done()

View File

@@ -23,7 +23,9 @@ Schema:
ocr_time_ms INTEGER, -- Actual OCR engine processing time
created_by TEXT, -- Username
original_filename TEXT,
expires_at TIMESTAMP
expires_at TIMESTAMP,
batch_id INTEGER, -- Foreign key to batch_uploads (for bulk processing)
file_hash TEXT -- SHA-256 hash for duplicate detection (US-007)
)
"""
@@ -66,6 +68,7 @@ class OCRJobStatus(str, Enum):
processing = "processing"
completed = "completed"
failed = "failed"
cancelled = "cancelled"
@dataclass
@@ -86,6 +89,8 @@ class OCRJob:
created_by: Optional[str] = None
original_filename: Optional[str] = None
expires_at: Optional[datetime] = None
batch_id: Optional[int] = None # Links to batch_uploads table for bulk processing
file_hash: Optional[str] = None # SHA-256 hash for duplicate detection (US-007)
@property
def queue_wait_ms(self) -> Optional[int]:
@@ -163,7 +168,8 @@ class OCRJobQueue:
ocr_time_ms INTEGER,
created_by TEXT,
original_filename TEXT,
expires_at TIMESTAMP
expires_at TIMESTAMP,
batch_id INTEGER
)
''')
@@ -174,6 +180,20 @@ class OCRJobQueue:
except Exception:
pass # Column already exists
# Migration: add batch_id column if it doesn't exist
try:
await db.execute('ALTER TABLE ocr_jobs ADD COLUMN batch_id INTEGER')
logger.info("[OCRJobQueue] Added batch_id column to existing table")
except Exception:
pass # Column already exists
# Migration: add file_hash column if it doesn't exist (US-007)
try:
await db.execute('ALTER TABLE ocr_jobs ADD COLUMN file_hash TEXT')
logger.info("[OCRJobQueue] Added file_hash column to existing table")
except Exception:
pass # Column already exists
# Index for efficient queue queries
await db.execute('''
CREATE INDEX IF NOT EXISTS idx_ocr_jobs_status
@@ -197,7 +217,9 @@ class OCRJobQueue:
mime_type: str,
engine: str = "doctr_plus",
username: Optional[str] = None,
original_filename: Optional[str] = None
original_filename: Optional[str] = None,
batch_id: Optional[int] = None,
file_hash: Optional[str] = None
) -> OCRJob:
"""
Create a new OCR job.
@@ -210,6 +232,8 @@ class OCRJobQueue:
engine: OCR engine ('tesseract', 'doctr', 'doctr_plus', 'paddleocr')
username: Username of requester
original_filename: Original filename from upload
batch_id: Optional batch ID for bulk upload processing
file_hash: Optional SHA-256 hash for duplicate detection (US-007)
Returns:
Created OCRJob instance
@@ -241,15 +265,15 @@ class OCRJobQueue:
await db.execute('''
INSERT INTO ocr_jobs (
id, status, file_path, mime_type, engine,
created_at, created_by, original_filename, expires_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
created_at, created_by, original_filename, expires_at, batch_id, file_hash
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
job_id, OCRJobStatus.pending.value, str(file_path), mime_type, engine,
now.isoformat(), username, original_filename, expires_at.isoformat()
now.isoformat(), username, original_filename, expires_at.isoformat(), batch_id, file_hash
))
await db.commit()
logger.info(f"[OCRJobQueue] Created job {job_id}: engine={engine}, file={file_path.name}")
logger.info(f"[OCRJobQueue] Created job {job_id}: engine={engine}, file={file_path.name}, batch_id={batch_id}")
return OCRJob(
id=job_id,
@@ -260,7 +284,9 @@ class OCRJobQueue:
created_at=now,
created_by=username,
original_filename=original_filename,
expires_at=expires_at
expires_at=expires_at,
batch_id=batch_id,
file_hash=file_hash
)
async def get_job(self, job_id: str) -> Optional[OCRJob]:
@@ -601,6 +627,8 @@ class OCRJobQueue:
created_by=row['created_by'],
original_filename=row['original_filename'],
expires_at=parse_datetime(row['expires_at']),
batch_id=row['batch_id'] if 'batch_id' in row.keys() else None,
file_hash=row['file_hash'] if 'file_hash' in row.keys() else None,
)

View File

@@ -28,6 +28,7 @@ from typing import Optional, Set
from .job_queue import job_queue, OCRJobStatus, OCRJob
from .ocr_worker_pool import ocr_worker_pool
from backend.modules.data_entry.schemas.ocr import ExtractionData
logger = logging.getLogger(__name__)
@@ -223,6 +224,21 @@ async def _process_job(job: OCRJob) -> None:
validation_errors_count=len(extraction.get('validation_errors', [])),
)
# Auto-save receipt for batch jobs
if job.batch_id:
auto_save_result = await _auto_save_batch_receipt(
job=job,
extraction=extraction,
file_path=str(file_path)
)
if not auto_save_result:
# Auto-save failed - mark job as failed
# Note: job_queue status already updated to 'completed' above
# We need to update it back to failed with the auto-save error
logger.warning(
f"[JobWorker] Job {job.id} OCR succeeded but auto-save failed"
)
else:
# Job failed
error_msg = result.get("error", "Unknown error")
@@ -543,3 +559,107 @@ def _count_extracted_fields(extraction: dict) -> int:
count += 1
return count
# ============================================================================
# Auto-Save Batch Receipt Helper
# ============================================================================
async def _auto_save_batch_receipt(
job: OCRJob,
extraction: dict,
file_path: str
) -> bool:
"""
Automatically create a receipt from OCR result for batch jobs.
Called when a batch job completes successfully. Creates the receipt,
attachment, and accounting entries using ReceiptAutoCreateService.
Args:
job: Completed OCRJob with batch_id set
extraction: OCR extraction result dict
file_path: Path to the original uploaded file
Returns:
True if receipt created successfully, False otherwise
"""
if not job.batch_id:
return True # Not a batch job, nothing to do
logger.info(f"[JobWorker] Auto-saving receipt for batch job {job.id} (batch_id={job.batch_id})")
try:
# Import here to avoid circular imports
from backend.modules.data_entry.db.database import get_db_session
from backend.modules.data_entry.db.models import BatchUpload
from backend.modules.data_entry.services.receipt_auto_create import ReceiptAutoCreateService
from sqlalchemy import select
# Convert extraction dict to ExtractionData schema
ocr_result = ExtractionData(**extraction)
async with await get_db_session() as session:
# Get batch info to retrieve company_id and user_id
batch_result = await session.execute(
select(BatchUpload).where(BatchUpload.id == job.batch_id)
)
batch = batch_result.scalar_one_or_none()
if not batch:
error_msg = f"Batch {job.batch_id} not found"
logger.error(f"[JobWorker] Auto-save failed for job {job.id}: {error_msg}")
await job_queue.update_status(
job_id=job.id,
status=OCRJobStatus.failed,
error=f"Auto-save error: {error_msg}"
)
return False
# Call ReceiptAutoCreateService
result = await ReceiptAutoCreateService.create_from_ocr_result(
session=session,
job_id=job.id,
ocr_result=ocr_result,
username=job.created_by or batch.user_id,
batch_id=job.batch_id,
company_id=batch.company_id,
file_path=file_path,
original_filename=job.original_filename,
file_hash=job.file_hash # Pass file_hash for duplicate detection (US-007)
)
if result.success:
logger.info(
f"[JobWorker] Auto-save successful for job {job.id}: "
f"receipt_id={result.receipt_id}"
)
return True
else:
error_msg = result.error_message or "Unknown error"
logger.warning(
f"[JobWorker] Auto-save validation failed for job {job.id}: {error_msg}"
)
# Update job status to failed with the auto-save error
await job_queue.update_status(
job_id=job.id,
status=OCRJobStatus.failed,
error=f"Auto-save error: {error_msg}"
)
return False
except Exception as e:
error_msg = str(e)
logger.error(f"[JobWorker] Auto-save exception for job {job.id}: {error_msg}")
# Update job status to failed
try:
await job_queue.update_status(
job_id=job.id,
status=OCRJobStatus.failed,
error=f"Auto-save error: {error_msg}"
)
except Exception as update_err:
logger.error(f"[JobWorker] Failed to update job status after auto-save error: {update_err}")
return False

View File

@@ -0,0 +1,385 @@
"""
Auto-create Receipt from OCR results for bulk upload flow.
This service handles automatic creation of Receipt records from OCR extraction
results, enabling end-to-end processing without manual UI intervention.
The service:
1. Maps OCR ExtractionData fields to Receipt fields
2. Creates attachment from the original uploaded file
3. Generates accounting entries
4. Links the receipt back to the batch job for tracking
"""
import logging
import shutil
import uuid
from dataclasses import dataclass
from datetime import date, datetime
from decimal import Decimal
from pathlib import Path
from typing import Optional, List
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from backend.modules.data_entry.db.models.receipt import (
Receipt,
ReceiptAttachment,
ReceiptStatus,
ReceiptType,
ReceiptDirection,
)
from backend.modules.data_entry.db.models.batch import BatchJob
from backend.modules.data_entry.db.crud.receipt import ReceiptCRUD
from backend.modules.data_entry.db.crud.accounting_entry import AccountingEntryCRUD
from backend.modules.data_entry.schemas.receipt import ReceiptCreate, TvaEntrySchema, PaymentMethodSchema
from backend.modules.data_entry.schemas.ocr import ExtractionData
from backend.modules.data_entry.services.receipt_service import ReceiptService
from backend.modules.data_entry.services import sse_service
from backend.config import settings
logger = logging.getLogger(__name__)
@dataclass
class ReceiptCreateResult:
"""Result of auto-create operation."""
success: bool
receipt_id: Optional[int] = None
error_message: Optional[str] = None
class ReceiptAutoCreateService:
"""
Service for automatically creating receipts from OCR results.
Used by the bulk upload flow to create receipts without user intervention.
Created receipts are in DRAFT status and require review before approval.
"""
@staticmethod
def _validate_ocr_result(ocr_result: ExtractionData) -> tuple[bool, str]:
"""
Perform minimal validation on OCR result.
Validates:
- amount > 0 (required for receipt)
- date is valid and not in future
Args:
ocr_result: Extracted data from OCR
Returns:
Tuple of (is_valid, error_message)
"""
# Validate amount exists and is positive
if ocr_result.amount is None:
return False, "Amount not extracted from receipt"
if ocr_result.amount <= 0:
return False, f"Invalid amount: {ocr_result.amount} (must be > 0)"
# Validate date exists and is not in the future
if ocr_result.receipt_date is None:
return False, "Receipt date not extracted"
today = date.today()
if ocr_result.receipt_date > today:
return False, f"Receipt date {ocr_result.receipt_date} is in the future"
return True, ""
@staticmethod
def _map_ocr_to_receipt(
ocr_result: ExtractionData,
company_id: int,
) -> ReceiptCreate:
"""
Map OCR ExtractionData fields to ReceiptCreate schema.
Args:
ocr_result: Extracted data from OCR
company_id: Company ID for the receipt
Returns:
ReceiptCreate schema ready for database insertion
"""
# Map receipt type
receipt_type = ReceiptType.BON_FISCAL
if ocr_result.receipt_type == "chitanta":
receipt_type = ReceiptType.CHITANTA
# Map TVA breakdown from OCR TvaEntry to schema TvaEntrySchema
tva_breakdown: Optional[List[TvaEntrySchema]] = None
if ocr_result.tva_entries:
tva_breakdown = [
TvaEntrySchema(
code=entry.code,
percent=entry.percent,
amount=entry.amount
)
for entry in ocr_result.tva_entries
]
# Map payment methods
payment_methods: Optional[List[PaymentMethodSchema]] = None
if ocr_result.payment_methods:
payment_methods = [
PaymentMethodSchema(
method=pm.method,
amount=pm.amount
)
for pm in ocr_result.payment_methods
]
# Create receipt data
return ReceiptCreate(
receipt_type=receipt_type,
direction=ReceiptDirection.CHELTUIALA, # Default to expense
receipt_number=ocr_result.receipt_number,
receipt_series=ocr_result.receipt_series,
receipt_date=ocr_result.receipt_date,
amount=ocr_result.amount,
description=ocr_result.description,
tva_breakdown=tva_breakdown,
tva_total=ocr_result.tva_total,
items_count=ocr_result.items_count,
vendor_address=ocr_result.address,
company_id=company_id,
partner_name=ocr_result.partner_name,
cui=ocr_result.cui,
ocr_raw_text=ocr_result.raw_text[:5000] if ocr_result.raw_text else None, # Limit size
payment_methods=payment_methods,
payment_mode=ocr_result.suggested_payment_mode,
)
@staticmethod
async def _create_attachment_from_file(
session: AsyncSession,
receipt_id: int,
source_file_path: str,
original_filename: Optional[str] = None,
) -> Optional[ReceiptAttachment]:
"""
Create attachment by copying file from OCR job location.
Args:
session: Database session
receipt_id: Receipt ID to attach to
source_file_path: Path to the original file from OCR job
original_filename: Original filename from upload (optional)
Returns:
Created ReceiptAttachment or None if failed
"""
source_path = Path(source_file_path)
if not source_path.exists():
logger.warning(f"[ReceiptAutoCreate] Source file not found: {source_path}")
return None
# Generate stored filename
ext = source_path.suffix.lower()
stored_filename = f"{uuid.uuid4()}{ext}"
# Determine relative path (organized by year/month)
now = datetime.utcnow()
relative_path = Path(str(now.year)) / f"{now.month:02d}"
# Full destination path
dest_dir = settings.data_entry_upload_path_resolved / relative_path
dest_dir.mkdir(parents=True, exist_ok=True)
dest_path = dest_dir / stored_filename
# Copy file to attachments directory
try:
shutil.copy2(source_path, dest_path)
except Exception as e:
logger.error(f"[ReceiptAutoCreate] Failed to copy file: {e}")
return None
# Get file size
file_size = dest_path.stat().st_size
# Determine MIME type
mime_map = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.pdf': 'application/pdf',
}
mime_type = mime_map.get(ext, 'application/octet-stream')
# Use original filename if provided, otherwise use source filename
display_filename = original_filename or source_path.name
# Create attachment record
attachment = ReceiptAttachment(
receipt_id=receipt_id,
filename=display_filename,
stored_filename=stored_filename,
file_path=str(relative_path / stored_filename),
file_size=file_size,
mime_type=mime_type,
)
session.add(attachment)
await session.flush()
return attachment
@staticmethod
async def _update_batch_job_receipt_id(
session: AsyncSession,
job_id: str,
receipt_id: int,
) -> None:
"""
Update batch_jobs table with the created receipt_id.
Args:
session: Database session
job_id: OCR job UUID
receipt_id: Created receipt ID
"""
await session.execute(
update(BatchJob)
.where(BatchJob.job_id == job_id)
.values(receipt_id=receipt_id)
)
@staticmethod
async def create_from_ocr_result(
session: AsyncSession,
job_id: str,
ocr_result: ExtractionData,
username: str,
batch_id: int,
company_id: int,
file_path: Optional[str] = None,
original_filename: Optional[str] = None,
file_hash: Optional[str] = None,
) -> ReceiptCreateResult:
"""
Create a receipt from OCR extraction result.
This method:
1. Validates the OCR result (amount > 0, date valid)
2. Maps OCR fields to Receipt fields
3. Creates the Receipt in DRAFT status
4. Creates attachment from original file
5. Generates accounting entries
6. Updates batch_jobs with receipt_id
Args:
session: Database session
job_id: OCR job UUID for tracking
ocr_result: Extracted data from OCR processing
username: User who initiated the upload
batch_id: Batch ID for grouping
company_id: Company ID for the receipt
file_path: Path to the original uploaded file
original_filename: Original filename from upload
file_hash: SHA-256 hash of the file for duplicate detection (US-007)
Returns:
ReceiptCreateResult with success status and receipt_id or error
"""
try:
# Step 1: Validate OCR result
is_valid, error_msg = ReceiptAutoCreateService._validate_ocr_result(ocr_result)
if not is_valid:
logger.warning(f"[ReceiptAutoCreate] Validation failed for job {job_id}: {error_msg}")
return ReceiptCreateResult(
success=False,
error_message=error_msg
)
# Step 2: Map OCR to Receipt schema
receipt_data = ReceiptAutoCreateService._map_ocr_to_receipt(
ocr_result=ocr_result,
company_id=company_id,
)
# Step 3: Create receipt in DRAFT status
receipt = await ReceiptCRUD.create(session, receipt_data, created_by=username)
# Set batch tracking fields (US-007, US-011)
receipt.batch_id = str(batch_id)
receipt.file_hash = file_hash
receipt.processing_status = "completed"
session.add(receipt)
await session.flush()
logger.info(
f"[ReceiptAutoCreate] Created receipt {receipt.id} for job {job_id}: "
f"amount={receipt.amount}, vendor={receipt.partner_name}, file_hash={file_hash[:16] if file_hash else None}..."
)
# Step 4: Create attachment from original file (if path provided)
if file_path:
attachment = await ReceiptAutoCreateService._create_attachment_from_file(
session=session,
receipt_id=receipt.id,
source_file_path=file_path,
original_filename=original_filename,
)
if attachment:
logger.info(f"[ReceiptAutoCreate] Created attachment for receipt {receipt.id}")
else:
logger.warning(f"[ReceiptAutoCreate] Failed to create attachment for receipt {receipt.id}")
# Step 5: Generate accounting entries
# Note: For DRAFT status, entries are generated but not required for validation
try:
entries = ReceiptService.generate_accounting_entries(receipt)
if entries:
await AccountingEntryCRUD.create_bulk(
session, receipt.id, entries, is_auto_generated=True
)
logger.info(
f"[ReceiptAutoCreate] Generated {len(entries)} accounting entries "
f"for receipt {receipt.id}"
)
except Exception as e:
# Don't fail the receipt creation if entry generation fails
logger.warning(
f"[ReceiptAutoCreate] Failed to generate entries for receipt {receipt.id}: {e}"
)
# Step 6: Update batch_jobs with receipt_id
await ReceiptAutoCreateService._update_batch_job_receipt_id(
session=session,
job_id=job_id,
receipt_id=receipt.id,
)
# Commit all changes
await session.commit()
# Broadcast SSE event for real-time updates (US-030)
try:
await sse_service.broadcast_status_change(
receipt_id=receipt.id,
status=receipt.status.value,
processing_status=receipt.processing_status,
batch_id=receipt.batch_id,
)
except Exception as e:
# Don't fail the receipt creation if SSE broadcast fails
logger.warning(f"[ReceiptAutoCreate] SSE broadcast failed for receipt {receipt.id}: {e}")
return ReceiptCreateResult(
success=True,
receipt_id=receipt.id
)
except Exception as e:
logger.error(f"[ReceiptAutoCreate] Failed to create receipt for job {job_id}: {e}")
await session.rollback()
return ReceiptCreateResult(
success=False,
error_message=str(e)
)

View File

@@ -15,6 +15,7 @@ from backend.modules.data_entry.schemas.receipt import (
ReceiptFilter,
ReceiptResponse,
ReceiptListResponse,
ProcessingStats,
AccountingEntryCreate,
)
from backend.modules.data_entry.services.expense_types import EXPENSE_TYPES, get_expense_type
@@ -53,17 +54,26 @@ class ReceiptService:
session: AsyncSession,
filters: ReceiptFilter,
) -> ReceiptListResponse:
"""Get paginated list of receipts."""
"""Get paginated list of receipts with processing_stats (US-012)."""
receipts, total = await ReceiptCRUD.get_list(session, filters)
pages = (total + filters.page_size - 1) // filters.page_size if total > 0 else 1
# Get processing stats for bulk uploaded receipts (US-012)
stats_dict = await ReceiptCRUD.get_processing_stats(
session,
company_id=filters.company_id,
batch_id=filters.batch_id,
)
processing_stats = ProcessingStats(**stats_dict)
return ReceiptListResponse(
items=[ReceiptResponse.model_validate(r) for r in receipts],
total=total,
page=filters.page,
page_size=filters.page_size,
pages=pages,
processing_stats=processing_stats,
)
@staticmethod

View File

@@ -0,0 +1,197 @@
"""
Server-Sent Events (SSE) service for real-time status updates.
This module implements an event broadcaster pattern using asyncio.Queue per client.
When receipt status changes occur (CRUD operations), events are pushed to all
connected clients who are listening for that specific batch or all receipts.
Usage:
# In router endpoint (SSE stream):
async for event in sse_service.subscribe(batch_id=None):
yield event
# When status changes (from CRUD operations):
await sse_service.broadcast_status_change(receipt_id, status, processing_status, batch_id)
"""
import asyncio
import json
import logging
from dataclasses import dataclass, asdict
from typing import AsyncGenerator, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class StatusChangeEvent:
"""Event data for receipt status changes."""
receipt_id: int
status: str
processing_status: Optional[str] = None
batch_id: Optional[str] = None
timestamp: Optional[str] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.utcnow().isoformat()
def to_sse_data(self) -> str:
"""Format as SSE data line."""
data = asdict(self)
return f"data: {json.dumps(data)}\n\n"
class SSEEventBroadcaster:
"""
Manages SSE client connections and broadcasts events.
Each client gets its own asyncio.Queue. When an event occurs,
it's pushed to all relevant queues based on batch_id filtering.
"""
def __init__(self):
# Dict of {client_id: (queue, batch_id_filter)}
# batch_id_filter is None for clients that want all events
self._clients: dict[str, tuple[asyncio.Queue, Optional[str]]] = {}
self._client_counter = 0
self._lock = asyncio.Lock()
async def _generate_client_id(self) -> str:
"""Generate unique client ID."""
async with self._lock:
self._client_counter += 1
return f"client_{self._client_counter}_{datetime.utcnow().timestamp()}"
async def subscribe(
self,
batch_id: Optional[str] = None,
) -> AsyncGenerator[str, None]:
"""
Subscribe to SSE events.
Args:
batch_id: Optional filter - only receive events for this batch.
If None, receives all events.
Yields:
SSE-formatted event strings (ready to send to client).
"""
client_id = await self._generate_client_id()
queue: asyncio.Queue = asyncio.Queue()
# Register client
async with self._lock:
self._clients[client_id] = (queue, batch_id)
logger.info(
f"SSE client {client_id} connected (batch_id filter: {batch_id}). "
f"Total clients: {len(self._clients)}"
)
try:
# Send initial retry hint for reconnection
yield "retry: 3000\n\n"
# Keep connection alive and yield events
while True:
try:
# Wait for events with timeout for keep-alive
event = await asyncio.wait_for(queue.get(), timeout=30.0)
yield event
except asyncio.TimeoutError:
# Send keep-alive comment to prevent connection timeout
yield ": keep-alive\n\n"
except asyncio.CancelledError:
logger.info(f"SSE client {client_id} subscription cancelled")
raise
finally:
# Cleanup: remove client from registry
async with self._lock:
self._clients.pop(client_id, None)
logger.info(
f"SSE client {client_id} disconnected. "
f"Remaining clients: {len(self._clients)}"
)
async def broadcast_status_change(
self,
receipt_id: int,
status: str,
processing_status: Optional[str] = None,
batch_id: Optional[str] = None,
) -> int:
"""
Broadcast a status change event to all relevant clients.
Args:
receipt_id: The receipt ID that changed.
status: New workflow status (DRAFT, PENDING_REVIEW, etc.).
processing_status: New processing status (pending, processing, completed, failed).
batch_id: The batch ID this receipt belongs to (for filtering).
Returns:
Number of clients notified.
"""
event = StatusChangeEvent(
receipt_id=receipt_id,
status=status,
processing_status=processing_status,
batch_id=batch_id,
)
sse_data = event.to_sse_data()
notified = 0
async with self._lock:
for client_id, (queue, client_batch_filter) in self._clients.items():
# Send event if:
# 1. Client has no filter (wants all events), OR
# 2. Client's filter matches the event's batch_id
if client_batch_filter is None or client_batch_filter == batch_id:
try:
queue.put_nowait(sse_data)
notified += 1
except asyncio.QueueFull:
logger.warning(
f"SSE queue full for client {client_id}, dropping event"
)
if notified > 0:
logger.debug(
f"SSE broadcast: receipt_id={receipt_id}, status={status}, "
f"processing_status={processing_status}, notified={notified} clients"
)
return notified
@property
def client_count(self) -> int:
"""Get current number of connected clients."""
return len(self._clients)
# Singleton instance for the application
sse_broadcaster = SSEEventBroadcaster()
# Convenience functions for external use
async def subscribe(batch_id: Optional[str] = None) -> AsyncGenerator[str, None]:
"""Subscribe to SSE status change events."""
async for event in sse_broadcaster.subscribe(batch_id):
yield event
async def broadcast_status_change(
receipt_id: int,
status: str,
processing_status: Optional[str] = None,
batch_id: Optional[str] = None,
) -> int:
"""Broadcast a status change event."""
return await sse_broadcaster.broadcast_status_change(
receipt_id=receipt_id,
status=status,
processing_status=processing_status,
batch_id=batch_id,
)