feat: Add data-entry-app for fiscal receipts with approval workflow

New application for entering fiscal receipts (bonuri fiscale) with:

Backend (FastAPI + SQLModel + Alembic):
- Receipt, ReceiptAttachment, AccountingEntry models
- CRUD operations with async SQLite database
- Workflow: DRAFT → PENDING_REVIEW → APPROVED/REJECTED
- Auto-generation of accounting entries with VAT calculation
- File upload support (images, PDFs)
- Predefined expense types (Fuel, Materials, Office, etc.)
- Nomenclature service for partners, accounts, cash registers

Frontend (Vue.js 3 + PrimeVue + Pinia):
- ReceiptsListView with filters and stats
- ReceiptCreateView with image upload
- ReceiptDetailView with accounting entries
- ReceiptApprovalView for accountant approval

Documentation:
- REQUIREMENTS.md with functional specifications
- ARCHITECTURE.md with technical decisions
- CLAUDE.md for AI assistant guidance

Phase 1 MVP uses SQLite, prepared for Oracle integration in Phase 2.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-11 17:30:51 +02:00
parent 5823cedb94
commit 21c12ddb0f
45 changed files with 7524 additions and 0 deletions

View File

@@ -0,0 +1 @@
# Data Entry App - Backend

View File

@@ -0,0 +1,96 @@
"""Application configuration using pydantic-settings."""
import os
from pathlib import Path
from typing import List
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# App info
app_name: str = "Data Entry API"
app_version: str = "1.0.0"
debug: bool = False
# API
api_host: str = "0.0.0.0"
api_port: int = 8003
# SQLite Database
sqlite_database_path: str = "data/receipts.db"
# File uploads
upload_path: str = "data/uploads"
max_upload_size_mb: int = 10
allowed_mime_types: List[str] = [
"image/jpeg",
"image/png",
"image/gif",
"image/webp",
"application/pdf",
]
# Oracle Database (for nomenclatures)
oracle_user: str = ""
oracle_password: str = ""
oracle_host: str = "localhost"
oracle_port: int = 1526
oracle_sid: str = "ROA"
# JWT Authentication
jwt_secret_key: str = "change-me-in-production"
jwt_algorithm: str = "HS256"
jwt_expire_minutes: int = 480
# CORS
cors_origins: str = "http://localhost:3010,http://localhost:3000"
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
extra = "ignore"
@property
def database_url(self) -> str:
"""Get SQLite database URL for async."""
return f"sqlite+aiosqlite:///{self.sqlite_database_path}"
@property
def sync_database_url(self) -> str:
"""Get SQLite database URL for sync operations (Alembic)."""
return f"sqlite:///{self.sqlite_database_path}"
@property
def upload_path_resolved(self) -> Path:
"""Get resolved upload path."""
path = Path(self.upload_path)
path.mkdir(parents=True, exist_ok=True)
return path
@property
def max_upload_size_bytes(self) -> int:
"""Get max upload size in bytes."""
return self.max_upload_size_mb * 1024 * 1024
@property
def cors_origins_list(self) -> List[str]:
"""Get CORS origins as list."""
return [origin.strip() for origin in self.cors_origins.split(",")]
@property
def oracle_dsn(self) -> str:
"""Get Oracle DSN string."""
return f"{self.oracle_host}:{self.oracle_port}/{self.oracle_sid}"
@lru_cache()
def get_settings() -> Settings:
"""Get cached settings instance."""
return Settings()
# Convenience instance
settings = get_settings()

View File

@@ -0,0 +1,4 @@
# Database module
from .database import get_session, init_db, engine
__all__ = ["get_session", "init_db", "engine"]

View File

@@ -0,0 +1,10 @@
# CRUD operations
from .receipt import ReceiptCRUD
from .attachment import AttachmentCRUD
from .accounting_entry import AccountingEntryCRUD
__all__ = [
"ReceiptCRUD",
"AttachmentCRUD",
"AccountingEntryCRUD",
]

View File

@@ -0,0 +1,197 @@
"""CRUD operations for accounting entries."""
from datetime import datetime
from typing import Optional, List
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models.accounting_entry import AccountingEntry, EntryType
from app.schemas.receipt import AccountingEntryCreate, AccountingEntryUpdate
class AccountingEntryCRUD:
"""CRUD operations for AccountingEntry model."""
@staticmethod
async def create(
session: AsyncSession,
receipt_id: int,
data: AccountingEntryCreate,
sort_order: int = 0,
is_auto_generated: bool = True,
) -> AccountingEntry:
"""Create a new accounting entry."""
entry = AccountingEntry(
receipt_id=receipt_id,
entry_type=data.entry_type,
account_code=data.account_code,
account_name=data.account_name,
amount=data.amount,
partner_id=data.partner_id,
cost_center_id=data.cost_center_id,
is_auto_generated=is_auto_generated,
sort_order=sort_order,
)
session.add(entry)
await session.commit()
await session.refresh(entry)
return entry
@staticmethod
async def create_bulk(
session: AsyncSession,
receipt_id: int,
entries: List[AccountingEntryCreate],
is_auto_generated: bool = True,
) -> List[AccountingEntry]:
"""Create multiple accounting entries at once."""
created_entries = []
for idx, entry_data in enumerate(entries):
entry = AccountingEntry(
receipt_id=receipt_id,
entry_type=entry_data.entry_type,
account_code=entry_data.account_code,
account_name=entry_data.account_name,
amount=entry_data.amount,
partner_id=entry_data.partner_id,
cost_center_id=entry_data.cost_center_id,
is_auto_generated=is_auto_generated,
sort_order=idx,
)
session.add(entry)
created_entries.append(entry)
await session.commit()
for entry in created_entries:
await session.refresh(entry)
return created_entries
@staticmethod
async def get_by_id(
session: AsyncSession,
entry_id: int,
) -> Optional[AccountingEntry]:
"""Get accounting entry by ID."""
query = select(AccountingEntry).where(AccountingEntry.id == entry_id)
result = await session.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_by_receipt_id(
session: AsyncSession,
receipt_id: int,
) -> List[AccountingEntry]:
"""Get all accounting entries for a receipt."""
query = select(AccountingEntry).where(
AccountingEntry.receipt_id == receipt_id
).order_by(AccountingEntry.sort_order.asc())
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
async def update(
session: AsyncSession,
entry: AccountingEntry,
data: AccountingEntryUpdate,
modified_by: str,
) -> AccountingEntry:
"""Update an accounting entry."""
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(entry, field, value)
entry.is_auto_generated = False
entry.modified_by = modified_by
entry.modified_at = datetime.utcnow()
session.add(entry)
await session.commit()
await session.refresh(entry)
return entry
@staticmethod
async def delete(session: AsyncSession, entry: AccountingEntry) -> bool:
"""Delete an accounting entry."""
await session.delete(entry)
await session.commit()
return True
@staticmethod
async def delete_all_for_receipt(session: AsyncSession, receipt_id: int) -> int:
"""Delete all accounting entries for a receipt."""
query = delete(AccountingEntry).where(AccountingEntry.receipt_id == receipt_id)
result = await session.execute(query)
await session.commit()
return result.rowcount
@staticmethod
async def replace_all_for_receipt(
session: AsyncSession,
receipt_id: int,
entries: List[AccountingEntryCreate],
modified_by: str,
) -> List[AccountingEntry]:
"""Replace all entries for a receipt with new ones."""
# Delete existing entries
await AccountingEntryCRUD.delete_all_for_receipt(session, receipt_id)
# Create new entries (marked as manually modified)
created_entries = []
for idx, entry_data in enumerate(entries):
entry = AccountingEntry(
receipt_id=receipt_id,
entry_type=entry_data.entry_type,
account_code=entry_data.account_code,
account_name=entry_data.account_name,
amount=entry_data.amount,
partner_id=entry_data.partner_id,
cost_center_id=entry_data.cost_center_id,
is_auto_generated=False,
modified_by=modified_by,
modified_at=datetime.utcnow(),
sort_order=idx,
)
session.add(entry)
created_entries.append(entry)
await session.commit()
for entry in created_entries:
await session.refresh(entry)
return created_entries
@staticmethod
async def validate_entries(entries: List[AccountingEntryCreate]) -> tuple[bool, str]:
"""
Validate accounting entries.
Returns (is_valid, error_message).
"""
if not entries:
return False, "At least one entry is required"
total_debit = sum(
e.amount for e in entries if e.entry_type == EntryType.DEBIT
)
total_credit = sum(
e.amount for e in entries if e.entry_type == EntryType.CREDIT
)
# Check balance (debit should equal credit)
if abs(total_debit - total_credit) > 0.01:
return False, f"Entries not balanced: Debit={total_debit}, Credit={total_credit}"
# Check for valid account codes
for entry in entries:
if not entry.account_code or len(entry.account_code) < 3:
return False, f"Invalid account code: {entry.account_code}"
return True, ""

View File

@@ -0,0 +1,140 @@
"""CRUD operations for receipt attachments."""
import os
import uuid
import aiofiles
from datetime import datetime
from pathlib import Path
from typing import Optional, List
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import UploadFile
from app.db.models.receipt import ReceiptAttachment
from app.config import settings
class AttachmentCRUD:
"""CRUD operations for ReceiptAttachment model."""
@staticmethod
def _generate_stored_filename(original_filename: str) -> str:
"""Generate unique filename for storage."""
ext = Path(original_filename).suffix.lower()
return f"{uuid.uuid4()}{ext}"
@staticmethod
def _get_upload_path(stored_filename: str) -> Path:
"""Get full path for storing file, organized by year/month."""
now = datetime.utcnow()
relative_path = Path(str(now.year)) / f"{now.month:02d}"
full_path = settings.upload_path_resolved / relative_path
# Ensure directory exists
full_path.mkdir(parents=True, exist_ok=True)
return relative_path / stored_filename
@staticmethod
async def create(
session: AsyncSession,
receipt_id: int,
file: UploadFile,
) -> ReceiptAttachment:
"""Create attachment by saving file and creating DB record."""
# Generate stored filename
stored_filename = AttachmentCRUD._generate_stored_filename(file.filename or "upload")
# Get relative path
relative_path = AttachmentCRUD._get_upload_path(stored_filename)
# Full path for saving
full_path = settings.upload_path_resolved / relative_path
# Read file content
content = await file.read()
file_size = len(content)
# Validate file size
if file_size > settings.max_upload_size_bytes:
raise ValueError(f"File too large. Maximum size is {settings.max_upload_size_mb}MB")
# Validate MIME type
mime_type = file.content_type or "application/octet-stream"
if mime_type not in settings.allowed_mime_types:
raise ValueError(f"File type not allowed: {mime_type}")
# Save file
async with aiofiles.open(full_path, "wb") as f:
await f.write(content)
# Create DB record
attachment = ReceiptAttachment(
receipt_id=receipt_id,
filename=file.filename or "upload",
stored_filename=stored_filename,
file_path=str(relative_path),
file_size=file_size,
mime_type=mime_type,
)
session.add(attachment)
await session.commit()
await session.refresh(attachment)
return attachment
@staticmethod
async def get_by_id(
session: AsyncSession,
attachment_id: int,
) -> Optional[ReceiptAttachment]:
"""Get attachment by ID."""
query = select(ReceiptAttachment).where(ReceiptAttachment.id == attachment_id)
result = await session.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_by_receipt_id(
session: AsyncSession,
receipt_id: int,
) -> List[ReceiptAttachment]:
"""Get all attachments for a receipt."""
query = select(ReceiptAttachment).where(
ReceiptAttachment.receipt_id == receipt_id
).order_by(ReceiptAttachment.uploaded_at.asc())
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
def get_file_path(attachment: ReceiptAttachment) -> Path:
"""Get full file path for an attachment."""
return settings.upload_path_resolved / attachment.file_path
@staticmethod
async def delete(session: AsyncSession, attachment: ReceiptAttachment) -> bool:
"""Delete attachment (file and DB record)."""
# Delete file
file_path = AttachmentCRUD.get_file_path(attachment)
if file_path.exists():
os.remove(file_path)
# Delete DB record
await session.delete(attachment)
await session.commit()
return True
@staticmethod
async def delete_all_for_receipt(session: AsyncSession, receipt_id: int) -> int:
"""Delete all attachments for a receipt."""
attachments = await AttachmentCRUD.get_by_receipt_id(session, receipt_id)
count = 0
for attachment in attachments:
await AttachmentCRUD.delete(session, attachment)
count += 1
return count

View File

@@ -0,0 +1,253 @@
"""CRUD operations for receipts."""
from datetime import datetime, date
from typing import Optional, List, Tuple
from sqlalchemy import select, func, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.db.models.receipt import Receipt, ReceiptStatus
from app.schemas.receipt import ReceiptCreate, ReceiptUpdate, ReceiptFilter
class ReceiptCRUD:
"""CRUD operations for Receipt model."""
@staticmethod
async def create(
session: AsyncSession,
data: ReceiptCreate,
created_by: str,
) -> Receipt:
"""Create a new receipt."""
receipt = Receipt(
**data.model_dump(),
created_by=created_by,
status=ReceiptStatus.DRAFT,
)
session.add(receipt)
await session.commit()
await session.refresh(receipt)
return receipt
@staticmethod
async def get_by_id(
session: AsyncSession,
receipt_id: int,
include_relations: bool = True,
) -> Optional[Receipt]:
"""Get receipt by ID, optionally with relationships."""
query = select(Receipt).where(Receipt.id == receipt_id)
if include_relations:
query = query.options(
selectinload(Receipt.attachments),
selectinload(Receipt.entries),
)
result = await session.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_list(
session: AsyncSession,
filters: ReceiptFilter,
) -> Tuple[List[Receipt], int]:
"""Get paginated list of receipts with filters."""
# Base query
query = select(Receipt).options(
selectinload(Receipt.attachments),
selectinload(Receipt.entries),
)
# Apply filters
if filters.status:
query = query.where(Receipt.status == filters.status)
if filters.company_id:
query = query.where(Receipt.company_id == filters.company_id)
if filters.created_by:
query = query.where(Receipt.created_by == filters.created_by)
if filters.date_from:
query = query.where(Receipt.receipt_date >= filters.date_from)
if filters.date_to:
query = query.where(Receipt.receipt_date <= filters.date_to)
if filters.search:
search_term = f"%{filters.search}%"
query = query.where(
or_(
Receipt.description.ilike(search_term),
Receipt.partner_name.ilike(search_term),
Receipt.receipt_number.ilike(search_term),
)
)
# Count total
count_query = select(func.count()).select_from(query.subquery())
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# Apply pagination and ordering
query = query.order_by(Receipt.created_at.desc())
offset = (filters.page - 1) * filters.page_size
query = query.offset(offset).limit(filters.page_size)
# Execute
result = await session.execute(query)
receipts = result.scalars().all()
return list(receipts), total
@staticmethod
async def get_pending_review(
session: AsyncSession,
company_id: Optional[int] = None,
) -> List[Receipt]:
"""Get all receipts pending review."""
query = select(Receipt).where(
Receipt.status == ReceiptStatus.PENDING_REVIEW
).options(
selectinload(Receipt.attachments),
selectinload(Receipt.entries),
)
if company_id:
query = query.where(Receipt.company_id == company_id)
query = query.order_by(Receipt.submitted_at.asc())
result = await session.execute(query)
return list(result.scalars().all())
@staticmethod
async def update(
session: AsyncSession,
receipt: Receipt,
data: ReceiptUpdate,
) -> Receipt:
"""Update receipt fields."""
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(receipt, field, value)
receipt.updated_at = datetime.utcnow()
session.add(receipt)
await session.commit()
await session.refresh(receipt)
return receipt
@staticmethod
async def update_status(
session: AsyncSession,
receipt: Receipt,
new_status: ReceiptStatus,
reviewed_by: Optional[str] = None,
rejection_reason: Optional[str] = None,
) -> Receipt:
"""Update receipt workflow status."""
receipt.status = new_status
receipt.updated_at = datetime.utcnow()
if new_status == ReceiptStatus.PENDING_REVIEW:
receipt.submitted_at = datetime.utcnow()
if new_status in [ReceiptStatus.APPROVED, ReceiptStatus.REJECTED]:
receipt.reviewed_by = reviewed_by
receipt.reviewed_at = datetime.utcnow()
if new_status == ReceiptStatus.REJECTED:
receipt.rejection_reason = rejection_reason
if new_status == ReceiptStatus.DRAFT:
# Reset review fields when moving back to draft
receipt.rejection_reason = None
session.add(receipt)
await session.commit()
await session.refresh(receipt)
return receipt
@staticmethod
async def delete(session: AsyncSession, receipt: Receipt) -> bool:
"""Delete a receipt (cascade deletes attachments and entries)."""
await session.delete(receipt)
await session.commit()
return True
@staticmethod
async def can_edit(receipt: Receipt, username: str) -> bool:
"""Check if user can edit receipt."""
# Only DRAFT receipts can be edited
if receipt.status != ReceiptStatus.DRAFT:
return False
# Only creator can edit their own drafts
return receipt.created_by == username
@staticmethod
async def can_delete(receipt: Receipt, username: str) -> bool:
"""Check if user can delete receipt."""
# Only DRAFT receipts can be deleted
if receipt.status != ReceiptStatus.DRAFT:
return False
# Only creator can delete their own drafts
return receipt.created_by == username
@staticmethod
async def can_submit(receipt: Receipt, username: str) -> bool:
"""Check if user can submit receipt for review."""
# Only DRAFT or REJECTED receipts can be submitted
if receipt.status not in [ReceiptStatus.DRAFT, ReceiptStatus.REJECTED]:
return False
# Only creator can submit their own receipts
return receipt.created_by == username
@staticmethod
async def get_stats(
session: AsyncSession,
company_id: int,
created_by: Optional[str] = None,
) -> dict:
"""Get receipt statistics."""
base_query = select(
Receipt.status,
func.count(Receipt.id).label("count"),
func.sum(Receipt.amount).label("total_amount"),
).where(
Receipt.company_id == company_id
)
if created_by:
base_query = base_query.where(Receipt.created_by == created_by)
query = base_query.group_by(Receipt.status)
result = await session.execute(query)
rows = result.all()
stats = {
"draft": {"count": 0, "amount": 0},
"pending_review": {"count": 0, "amount": 0},
"approved": {"count": 0, "amount": 0},
"rejected": {"count": 0, "amount": 0},
"synced": {"count": 0, "amount": 0},
"total": {"count": 0, "amount": 0},
}
for row in rows:
status_key = row.status.value
stats[status_key] = {
"count": row.count,
"amount": float(row.total_amount or 0),
}
stats["total"]["count"] += row.count
stats["total"]["amount"] += float(row.total_amount or 0)
return stats

View File

@@ -0,0 +1,49 @@
"""Database configuration and session management using SQLModel."""
from pathlib import Path
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
from app.config import settings
# Create async engine
engine = create_async_engine(
settings.database_url,
echo=settings.debug,
future=True,
)
# Create async session factory
async_session_maker = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def init_db() -> None:
"""Initialize database - create tables if they don't exist."""
# Ensure data directory exists
db_path = Path(settings.sqlite_database_path)
db_path.parent.mkdir(parents=True, exist_ok=True)
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""Get async database session for dependency injection."""
async with async_session_maker() as session:
try:
yield session
finally:
await session.close()
# Convenience function for manual session usage
async def get_db_session() -> AsyncSession:
"""Get a new database session (manual management)."""
return async_session_maker()

View File

@@ -0,0 +1,13 @@
# Database models
from .receipt import Receipt, ReceiptAttachment, ReceiptStatus, ReceiptType, ReceiptDirection
from .accounting_entry import AccountingEntry, EntryType
__all__ = [
"Receipt",
"ReceiptAttachment",
"ReceiptStatus",
"ReceiptType",
"ReceiptDirection",
"AccountingEntry",
"EntryType",
]

View File

@@ -0,0 +1,49 @@
"""AccountingEntry SQLModel model for proposed accounting entries."""
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Optional, TYPE_CHECKING
from sqlmodel import SQLModel, Field, Relationship
if TYPE_CHECKING:
from .receipt import Receipt
class EntryType(str, Enum):
"""Type of accounting entry."""
DEBIT = "debit"
CREDIT = "credit"
class AccountingEntry(SQLModel, table=True):
"""Proposed accounting entry for a receipt."""
__tablename__ = "accounting_entries"
id: Optional[int] = Field(default=None, primary_key=True)
receipt_id: int = Field(foreign_key="receipts.id", index=True)
# Account
entry_type: EntryType
account_code: str = Field(max_length=20) # e.g., 6022, 5311, 4426
account_name: Optional[str] = Field(default=None, max_length=200) # Cache: "Cheltuieli combustibil"
# Amount
amount: Decimal = Field(decimal_places=2, max_digits=15)
# Analytics (optional)
partner_id: Optional[int] = Field(default=None)
cost_center_id: Optional[int] = Field(default=None)
# Entry metadata
is_auto_generated: bool = Field(default=True) # True if system-generated
modified_by: Optional[str] = Field(default=None, max_length=100) # Username if modified
modified_at: Optional[datetime] = Field(default=None)
# Order for display
sort_order: int = Field(default=0)
# Relationship
receipt: Optional["Receipt"] = Relationship(back_populates="entries")

View File

@@ -0,0 +1,110 @@
"""Receipt and ReceiptAttachment SQLModel models."""
from datetime import datetime, date
from decimal import Decimal
from enum import Enum
from typing import Optional, List, TYPE_CHECKING
from sqlmodel import SQLModel, Field, Relationship
class ReceiptType(str, Enum):
"""Type of receipt document."""
BON_FISCAL = "bon_fiscal"
CHITANTA = "chitanta"
class ReceiptDirection(str, Enum):
"""Direction of receipt - expense or income."""
CHELTUIALA = "cheltuiala" # Expense (receipt from supplier)
INCASARE = "incasare" # Income (receipt issued to client)
class ReceiptStatus(str, Enum):
"""Workflow status of receipt."""
DRAFT = "draft" # User is filling in data
PENDING_REVIEW = "pending_review" # Awaiting accountant approval
APPROVED = "approved" # Approved by accountant
REJECTED = "rejected" # Rejected by accountant
SYNCED = "synced" # Synced to Oracle (Phase 2)
if TYPE_CHECKING:
from .accounting_entry import AccountingEntry
class Receipt(SQLModel, table=True):
"""Receipt (Bon Fiscal / Chitanta) with approval workflow."""
__tablename__ = "receipts"
id: Optional[int] = Field(default=None, primary_key=True)
# Document identification
receipt_type: ReceiptType = Field(default=ReceiptType.BON_FISCAL)
direction: ReceiptDirection = Field(default=ReceiptDirection.CHELTUIALA)
receipt_number: Optional[str] = Field(default=None, max_length=50)
receipt_series: Optional[str] = Field(default=None, max_length=20)
# Main data
receipt_date: date
amount: Decimal = Field(decimal_places=2, max_digits=15)
description: Optional[str] = Field(default=None, max_length=500)
# Expense type (for auto-generating accounting entries)
expense_type_code: Optional[str] = Field(default=None, max_length=20)
# Oracle references (nomenclatures)
company_id: int
partner_id: Optional[int] = Field(default=None)
partner_name: Optional[str] = Field(default=None, max_length=200) # Cache for display
cash_register_id: Optional[int] = Field(default=None) # Cash/Bank ID from Oracle
cash_register_name: Optional[str] = Field(default=None, max_length=100) # Cache for display
cash_register_account: Optional[str] = Field(default=None, max_length=20) # Account code (5311, 5121)
# Workflow
status: ReceiptStatus = Field(default=ReceiptStatus.DRAFT)
created_by: str = Field(max_length=100) # Username of creator
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
submitted_at: Optional[datetime] = Field(default=None) # When submitted for approval
# Approval
reviewed_by: Optional[str] = Field(default=None, max_length=100) # Accountant username
reviewed_at: Optional[datetime] = Field(default=None)
rejection_reason: Optional[str] = Field(default=None, max_length=500) # Reason for rejection
# Phase 2 - Oracle sync
oracle_synced_at: Optional[datetime] = Field(default=None)
oracle_act_id: Optional[int] = Field(default=None)
oracle_error: Optional[str] = Field(default=None, max_length=500)
# Relationships
attachments: List["ReceiptAttachment"] = Relationship(
back_populates="receipt",
sa_relationship_kwargs={"cascade": "all, delete-orphan"}
)
entries: List["AccountingEntry"] = Relationship(
back_populates="receipt",
sa_relationship_kwargs={"cascade": "all, delete-orphan"}
)
class ReceiptAttachment(SQLModel, table=True):
"""Attachment (photo or PDF) for a receipt."""
__tablename__ = "receipt_attachments"
id: Optional[int] = Field(default=None, primary_key=True)
receipt_id: int = Field(foreign_key="receipts.id", index=True)
# File info
filename: str = Field(max_length=255) # Original filename
stored_filename: str = Field(max_length=255) # Filename on disk (UUID)
file_path: str = Field(max_length=500) # Relative path
file_size: int # Size in bytes
mime_type: str = Field(max_length=100) # MIME type (image/jpeg, application/pdf)
uploaded_at: datetime = Field(default_factory=datetime.utcnow)
# Relationship
receipt: Optional[Receipt] = Relationship(back_populates="attachments")

View File

@@ -0,0 +1,88 @@
"""FastAPI application entry point for Data Entry App."""
import sys
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
# Add shared modules to path
project_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(project_root / "shared"))
from app.config import settings
from app.db.database import init_db
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan - startup and shutdown events."""
# Startup
print(f"Starting {settings.app_name} v{settings.app_version}")
# Initialize database
await init_db()
print("Database initialized")
# Ensure upload directory exists
settings.upload_path_resolved
print(f"Upload path: {settings.upload_path_resolved}")
yield
# Shutdown
print("Shutting down...")
# Create FastAPI app
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
description="API pentru introducere bonuri fiscale cu workflow de aprobare",
lifespan=lifespan,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount static files for uploads (optional - can serve through nginx in prod)
uploads_path = Path(settings.upload_path)
if uploads_path.exists():
app.mount("/uploads", StaticFiles(directory=str(uploads_path)), name="uploads")
# Health check endpoint
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"app": settings.app_name,
"version": settings.app_version,
}
# Import and include routers
from app.routers import receipts
app.include_router(receipts.router, prefix="/api/receipts", tags=["receipts"])
# Root endpoint
@app.get("/")
async def root():
"""Root endpoint - API information."""
return {
"name": settings.app_name,
"version": settings.app_version,
"docs": "/docs",
"health": "/health",
}

View File

@@ -0,0 +1,4 @@
# API routers
from . import receipts
__all__ = ["receipts"]

View File

@@ -0,0 +1,450 @@
"""API endpoints for receipts."""
from typing import List, Optional
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
from fastapi.responses import FileResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.database import get_session
from app.db.crud.receipt import ReceiptCRUD
from app.db.crud.attachment import AttachmentCRUD
from app.db.crud.accounting_entry import AccountingEntryCRUD
from app.services.receipt_service import ReceiptService
from app.services.nomenclature_service import NomenclatureService
from app.schemas.receipt import (
ReceiptCreate,
ReceiptUpdate,
ReceiptResponse,
ReceiptListResponse,
ReceiptFilter,
AttachmentResponse,
AccountingEntryResponse,
WorkflowAction,
RejectRequest,
EntriesUpdateRequest,
PartnerOption,
AccountOption,
CashRegisterOption,
ExpenseTypeOption,
)
from app.db.models.receipt import ReceiptStatus
router = APIRouter()
# ============ Helper for current user (simplified for Phase 1) ============
async def get_current_user() -> str:
"""
Get current authenticated user.
Phase 1: Returns hardcoded user for testing.
Phase 2: Will integrate with shared JWT auth.
"""
# TODO: Integrate with shared/auth middleware
return "test_user"
async def get_current_user_company() -> int:
"""
Get current user's active company.
Phase 1: Returns hardcoded company for testing.
Phase 2: Will get from JWT token or session.
"""
# TODO: Integrate with shared/auth
return 1
# ============ Receipt CRUD Endpoints ============
@router.post("/", response_model=ReceiptResponse)
async def create_receipt(
data: ReceiptCreate,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Create a new receipt in DRAFT status."""
receipt = await ReceiptService.create_receipt(session, data, current_user)
return ReceiptResponse.model_validate(receipt)
@router.get("/", response_model=ReceiptListResponse)
async def list_receipts(
status: Optional[ReceiptStatus] = None,
company_id: Optional[int] = None,
created_by: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
search: Optional[str] = None,
page: int = Query(default=1, ge=1),
page_size: int = Query(default=20, ge=1, le=100),
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
current_company: int = Depends(get_current_user_company),
):
"""Get paginated list of receipts with filters."""
from datetime import date as date_type
filters = ReceiptFilter(
status=status,
company_id=company_id or current_company,
created_by=created_by,
date_from=date_type.fromisoformat(date_from) if date_from else None,
date_to=date_type.fromisoformat(date_to) if date_to else None,
search=search,
page=page,
page_size=page_size,
)
return await ReceiptService.get_receipts(session, filters)
@router.get("/pending", response_model=List[ReceiptResponse])
async def list_pending_receipts(
company_id: Optional[int] = None,
session: AsyncSession = Depends(get_session),
current_company: int = Depends(get_current_user_company),
):
"""Get all receipts pending review (for accountant view)."""
receipts = await ReceiptCRUD.get_pending_review(
session, company_id or current_company
)
return [ReceiptResponse.model_validate(r) for r in receipts]
@router.get("/stats")
async def get_receipt_stats(
company_id: Optional[int] = None,
my_receipts: bool = False,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
current_company: int = Depends(get_current_user_company),
):
"""Get receipt statistics."""
return await ReceiptCRUD.get_stats(
session,
company_id or current_company,
created_by=current_user if my_receipts else None,
)
@router.get("/{receipt_id}", response_model=ReceiptResponse)
async def get_receipt(
receipt_id: int,
session: AsyncSession = Depends(get_session),
):
"""Get receipt details with attachments and accounting entries."""
receipt = await ReceiptService.get_receipt(session, receipt_id)
if not receipt:
raise HTTPException(status_code=404, detail="Receipt not found")
return ReceiptResponse.model_validate(receipt)
@router.put("/{receipt_id}", response_model=ReceiptResponse)
async def update_receipt(
receipt_id: int,
data: ReceiptUpdate,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Update receipt (only DRAFT status, only by creator)."""
success, message, receipt = await ReceiptService.update_receipt(
session, receipt_id, data, current_user
)
if not success:
raise HTTPException(status_code=400, detail=message)
return ReceiptResponse.model_validate(receipt)
@router.delete("/{receipt_id}")
async def delete_receipt(
receipt_id: int,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Delete receipt (only DRAFT status, only by creator)."""
success, message = await ReceiptService.delete_receipt(
session, receipt_id, current_user
)
if not success:
raise HTTPException(status_code=400, detail=message)
return {"success": True, "message": message}
# ============ Workflow Endpoints ============
@router.post("/{receipt_id}/submit", response_model=WorkflowAction)
async def submit_receipt(
receipt_id: int,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Submit receipt for review (DRAFT → PENDING_REVIEW)."""
success, message, receipt = await ReceiptService.submit_for_review(
session, receipt_id, current_user
)
return WorkflowAction(
success=success,
message=message,
receipt=ReceiptResponse.model_validate(receipt) if receipt else None,
)
@router.post("/{receipt_id}/approve", response_model=WorkflowAction)
async def approve_receipt(
receipt_id: int,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Approve receipt (PENDING_REVIEW → APPROVED). Accountant action."""
success, message, receipt = await ReceiptService.approve_receipt(
session, receipt_id, current_user
)
return WorkflowAction(
success=success,
message=message,
receipt=ReceiptResponse.model_validate(receipt) if receipt else None,
)
@router.post("/{receipt_id}/reject", response_model=WorkflowAction)
async def reject_receipt(
receipt_id: int,
data: RejectRequest,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Reject receipt (PENDING_REVIEW → REJECTED). Accountant action."""
success, message, receipt = await ReceiptService.reject_receipt(
session, receipt_id, current_user, data.reason
)
return WorkflowAction(
success=success,
message=message,
receipt=ReceiptResponse.model_validate(receipt) if receipt else None,
)
@router.post("/{receipt_id}/resubmit", response_model=WorkflowAction)
async def resubmit_receipt(
receipt_id: int,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Resubmit rejected receipt after corrections (REJECTED → PENDING_REVIEW)."""
success, message, receipt = await ReceiptService.resubmit_receipt(
session, receipt_id, current_user
)
return WorkflowAction(
success=success,
message=message,
receipt=ReceiptResponse.model_validate(receipt) if receipt else None,
)
# ============ Accounting Entries Endpoints ============
@router.get("/{receipt_id}/entries", response_model=List[AccountingEntryResponse])
async def get_receipt_entries(
receipt_id: int,
session: AsyncSession = Depends(get_session),
):
"""Get accounting entries for a receipt."""
entries = await AccountingEntryCRUD.get_by_receipt_id(session, receipt_id)
return [AccountingEntryResponse.model_validate(e) for e in entries]
@router.put("/{receipt_id}/entries", response_model=List[AccountingEntryResponse])
async def update_receipt_entries(
receipt_id: int,
data: EntriesUpdateRequest,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Update accounting entries for a receipt (accountant action)."""
success, message, entries = await ReceiptService.update_entries(
session, receipt_id, data.entries, current_user
)
if not success:
raise HTTPException(status_code=400, detail=message)
return [AccountingEntryResponse.model_validate(e) for e in entries]
@router.post("/{receipt_id}/entries/regenerate", response_model=List[AccountingEntryResponse])
async def regenerate_entries(
receipt_id: int,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Regenerate accounting entries based on receipt data."""
success, message, _ = await ReceiptService.regenerate_entries(
session, receipt_id, current_user
)
if not success:
raise HTTPException(status_code=400, detail=message)
entries = await AccountingEntryCRUD.get_by_receipt_id(session, receipt_id)
return [AccountingEntryResponse.model_validate(e) for e in entries]
# ============ Attachment Endpoints ============
@router.post("/{receipt_id}/attachments", response_model=AttachmentResponse)
async def upload_attachment(
receipt_id: int,
file: UploadFile = File(...),
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Upload attachment for a receipt."""
# Check receipt exists and user can modify it
receipt = await ReceiptCRUD.get_by_id(session, receipt_id, include_relations=False)
if not receipt:
raise HTTPException(status_code=404, detail="Receipt not found")
# Only allow uploads for DRAFT and REJECTED receipts
if receipt.status not in [ReceiptStatus.DRAFT, ReceiptStatus.REJECTED]:
raise HTTPException(
status_code=400,
detail="Cannot upload attachments for this receipt status"
)
# Only creator can upload
if receipt.created_by != current_user:
raise HTTPException(
status_code=403,
detail="Only the creator can upload attachments"
)
try:
attachment = await AttachmentCRUD.create(session, receipt_id, file)
return AttachmentResponse.model_validate(attachment)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/{receipt_id}/attachments", response_model=List[AttachmentResponse])
async def list_attachments(
receipt_id: int,
session: AsyncSession = Depends(get_session),
):
"""Get all attachments for a receipt."""
attachments = await AttachmentCRUD.get_by_receipt_id(session, receipt_id)
return [AttachmentResponse.model_validate(a) for a in attachments]
@router.get("/attachments/{attachment_id}/download")
async def download_attachment(
attachment_id: int,
session: AsyncSession = Depends(get_session),
):
"""Download an attachment file."""
attachment = await AttachmentCRUD.get_by_id(session, attachment_id)
if not attachment:
raise HTTPException(status_code=404, detail="Attachment not found")
file_path = AttachmentCRUD.get_file_path(attachment)
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
return FileResponse(
path=str(file_path),
filename=attachment.filename,
media_type=attachment.mime_type,
)
@router.delete("/attachments/{attachment_id}")
async def delete_attachment(
attachment_id: int,
session: AsyncSession = Depends(get_session),
current_user: str = Depends(get_current_user),
):
"""Delete an attachment."""
attachment = await AttachmentCRUD.get_by_id(session, attachment_id)
if not attachment:
raise HTTPException(status_code=404, detail="Attachment not found")
# Get receipt to check permissions
receipt = await ReceiptCRUD.get_by_id(session, attachment.receipt_id, include_relations=False)
if not receipt:
raise HTTPException(status_code=404, detail="Receipt not found")
# Only allow deletion for DRAFT receipts by creator
if receipt.status != ReceiptStatus.DRAFT:
raise HTTPException(
status_code=400,
detail="Cannot delete attachments for this receipt status"
)
if receipt.created_by != current_user:
raise HTTPException(
status_code=403,
detail="Only the creator can delete attachments"
)
await AttachmentCRUD.delete(session, attachment)
return {"success": True, "message": "Attachment deleted"}
# ============ Nomenclature Endpoints ============
@router.get("/nomenclature/partners", response_model=List[PartnerOption])
async def get_partners(
search: Optional[str] = None,
company_id: Optional[int] = None,
current_company: int = Depends(get_current_user_company),
):
"""Get partners (suppliers/customers) for dropdown."""
return await NomenclatureService.get_partners(
company_id or current_company, search
)
@router.get("/nomenclature/accounts", response_model=List[AccountOption])
async def get_accounts(
prefix: Optional[str] = None,
company_id: Optional[int] = None,
current_company: int = Depends(get_current_user_company),
):
"""Get chart of accounts for dropdown."""
return await NomenclatureService.get_accounts(
company_id or current_company, prefix
)
@router.get("/nomenclature/cash-registers", response_model=List[CashRegisterOption])
async def get_cash_registers(
company_id: Optional[int] = None,
current_company: int = Depends(get_current_user_company),
):
"""Get cash registers and bank accounts for dropdown."""
return await NomenclatureService.get_cash_registers(company_id or current_company)
@router.get("/nomenclature/expense-types", response_model=List[ExpenseTypeOption])
async def get_expense_types():
"""Get predefined expense types for dropdown."""
return await NomenclatureService.get_expense_types()

View File

@@ -0,0 +1,28 @@
# Pydantic schemas
from .receipt import (
ReceiptCreate,
ReceiptUpdate,
ReceiptResponse,
ReceiptListResponse,
ReceiptFilter,
AttachmentResponse,
AccountingEntryCreate,
AccountingEntryUpdate,
AccountingEntryResponse,
WorkflowAction,
RejectRequest,
)
__all__ = [
"ReceiptCreate",
"ReceiptUpdate",
"ReceiptResponse",
"ReceiptListResponse",
"ReceiptFilter",
"AttachmentResponse",
"AccountingEntryCreate",
"AccountingEntryUpdate",
"AccountingEntryResponse",
"WorkflowAction",
"RejectRequest",
]

View File

@@ -0,0 +1,199 @@
"""Pydantic schemas for receipts API."""
from datetime import datetime, date
from decimal import Decimal
from typing import Optional, List
from pydantic import BaseModel, Field, ConfigDict
from app.db.models.receipt import ReceiptType, ReceiptDirection, ReceiptStatus
from app.db.models.accounting_entry import EntryType
# ============ Accounting Entry Schemas ============
class AccountingEntryBase(BaseModel):
"""Base schema for accounting entry."""
entry_type: EntryType
account_code: str = Field(max_length=20)
account_name: Optional[str] = Field(default=None, max_length=200)
amount: Decimal
partner_id: Optional[int] = None
cost_center_id: Optional[int] = None
class AccountingEntryCreate(AccountingEntryBase):
"""Schema for creating an accounting entry."""
pass
class AccountingEntryUpdate(BaseModel):
"""Schema for updating an accounting entry."""
entry_type: Optional[EntryType] = None
account_code: Optional[str] = Field(default=None, max_length=20)
account_name: Optional[str] = Field(default=None, max_length=200)
amount: Optional[Decimal] = None
partner_id: Optional[int] = None
cost_center_id: Optional[int] = None
class AccountingEntryResponse(AccountingEntryBase):
"""Schema for accounting entry response."""
model_config = ConfigDict(from_attributes=True)
id: int
receipt_id: int
is_auto_generated: bool
modified_by: Optional[str] = None
modified_at: Optional[datetime] = None
sort_order: int
# ============ Attachment Schemas ============
class AttachmentResponse(BaseModel):
"""Schema for attachment response."""
model_config = ConfigDict(from_attributes=True)
id: int
receipt_id: int
filename: str
stored_filename: str
file_path: str
file_size: int
mime_type: str
uploaded_at: datetime
# ============ Receipt Schemas ============
class ReceiptBase(BaseModel):
"""Base schema for receipt."""
receipt_type: ReceiptType = ReceiptType.BON_FISCAL
direction: ReceiptDirection = ReceiptDirection.CHELTUIALA
receipt_number: Optional[str] = Field(default=None, max_length=50)
receipt_series: Optional[str] = Field(default=None, max_length=20)
receipt_date: date
amount: Decimal = Field(gt=0)
description: Optional[str] = Field(default=None, max_length=500)
expense_type_code: Optional[str] = Field(default=None, max_length=20)
company_id: int
partner_id: Optional[int] = None
partner_name: Optional[str] = Field(default=None, max_length=200)
cash_register_id: Optional[int] = None
cash_register_name: Optional[str] = Field(default=None, max_length=100)
cash_register_account: Optional[str] = Field(default=None, max_length=20)
class ReceiptCreate(ReceiptBase):
"""Schema for creating a receipt."""
pass
class ReceiptUpdate(BaseModel):
"""Schema for updating a receipt (DRAFT only)."""
receipt_type: Optional[ReceiptType] = None
direction: Optional[ReceiptDirection] = None
receipt_number: Optional[str] = Field(default=None, max_length=50)
receipt_series: Optional[str] = Field(default=None, max_length=20)
receipt_date: Optional[date] = None
amount: Optional[Decimal] = Field(default=None, gt=0)
description: Optional[str] = Field(default=None, max_length=500)
expense_type_code: Optional[str] = Field(default=None, max_length=20)
partner_id: Optional[int] = None
partner_name: Optional[str] = Field(default=None, max_length=200)
cash_register_id: Optional[int] = None
cash_register_name: Optional[str] = Field(default=None, max_length=100)
cash_register_account: Optional[str] = Field(default=None, max_length=20)
class ReceiptResponse(ReceiptBase):
"""Schema for receipt response with all fields."""
model_config = ConfigDict(from_attributes=True)
id: int
status: ReceiptStatus
created_by: str
created_at: datetime
updated_at: datetime
submitted_at: Optional[datetime] = None
reviewed_by: Optional[str] = None
reviewed_at: Optional[datetime] = None
rejection_reason: Optional[str] = None
oracle_synced_at: Optional[datetime] = None
oracle_act_id: Optional[int] = None
oracle_error: Optional[str] = None
# Relationships (optional, loaded when needed)
attachments: List[AttachmentResponse] = []
entries: List[AccountingEntryResponse] = []
class ReceiptListResponse(BaseModel):
"""Schema for paginated receipt list response."""
items: List[ReceiptResponse]
total: int
page: int
page_size: int
pages: int
class ReceiptFilter(BaseModel):
"""Schema for filtering receipts."""
status: Optional[ReceiptStatus] = None
company_id: Optional[int] = None
created_by: Optional[str] = None
date_from: Optional[date] = None
date_to: Optional[date] = None
search: Optional[str] = None # Search in description, partner_name
page: int = Field(default=1, ge=1)
page_size: int = Field(default=20, ge=1, le=100)
# ============ Workflow Schemas ============
class WorkflowAction(BaseModel):
"""Schema for workflow action response."""
success: bool
message: str
receipt: Optional[ReceiptResponse] = None
class RejectRequest(BaseModel):
"""Schema for rejection request."""
reason: str = Field(min_length=5, max_length=500)
class EntriesUpdateRequest(BaseModel):
"""Schema for bulk updating accounting entries."""
entries: List[AccountingEntryCreate]
# ============ Nomenclature Schemas ============
class PartnerOption(BaseModel):
"""Schema for partner dropdown option."""
id: int
name: str
code: Optional[str] = None
class AccountOption(BaseModel):
"""Schema for account dropdown option."""
code: str
name: str
class CashRegisterOption(BaseModel):
"""Schema for cash register dropdown option."""
id: int
name: str
account_code: str # 5311, 5121, etc.
class ExpenseTypeOption(BaseModel):
"""Schema for expense type dropdown option."""
code: str
name: str
account_code: str
has_vat: bool
vat_percent: Decimal = Decimal("19")

View File

@@ -0,0 +1,11 @@
# Business logic services
from .receipt_service import ReceiptService
from .nomenclature_service import NomenclatureService
from .expense_types import EXPENSE_TYPES, ExpenseType
__all__ = [
"ReceiptService",
"NomenclatureService",
"EXPENSE_TYPES",
"ExpenseType",
]

View File

@@ -0,0 +1,101 @@
"""Predefined expense types for automatic accounting entry generation."""
from decimal import Decimal
from dataclasses import dataclass
from typing import Dict, Optional
@dataclass
class ExpenseType:
"""Expense type definition with accounting configuration."""
code: str
name: str
account_code: str
account_name: str
has_vat: bool
vat_percent: Decimal = Decimal("19")
vat_account: str = "4426"
# Predefined expense types
EXPENSE_TYPES: Dict[str, ExpenseType] = {
"FUEL": ExpenseType(
code="FUEL",
name="Combustibil",
account_code="6022",
account_name="Cheltuieli cu combustibilii",
has_vat=True,
),
"MATERIALS": ExpenseType(
code="MATERIALS",
name="Materiale consumabile",
account_code="6028",
account_name="Alte cheltuieli cu materiale consumabile",
has_vat=True,
),
"OFFICE": ExpenseType(
code="OFFICE",
name="Rechizite birou",
account_code="6024",
account_name="Cheltuieli privind materialele pentru ambalat",
has_vat=True,
),
"PHONE": ExpenseType(
code="PHONE",
name="Telefonie / Internet",
account_code="626",
account_name="Cheltuieli postale si taxe de telecomunicatii",
has_vat=True,
),
"PARKING": ExpenseType(
code="PARKING",
name="Parcare",
account_code="6022",
account_name="Cheltuieli cu combustibilii",
has_vat=True,
),
"FOOD": ExpenseType(
code="FOOD",
name="Alimentatie",
account_code="6028",
account_name="Alte cheltuieli cu materiale consumabile",
has_vat=False, # No deductible VAT for food
),
"TRANSPORT": ExpenseType(
code="TRANSPORT",
name="Transport",
account_code="624",
account_name="Cheltuieli cu transportul de bunuri si personal",
has_vat=True,
),
"OTHER": ExpenseType(
code="OTHER",
name="Altele",
account_code="628",
account_name="Alte cheltuieli cu serviciile executate de terti",
has_vat=True,
),
}
def get_expense_type(code: str) -> Optional[ExpenseType]:
"""Get expense type by code."""
return EXPENSE_TYPES.get(code)
def get_all_expense_types() -> Dict[str, ExpenseType]:
"""Get all expense types."""
return EXPENSE_TYPES.copy()
# Default cash register accounts
CASH_REGISTER_ACCOUNTS = {
"CASA": {
"code": "5311",
"name": "Casa in lei",
},
"BANCA": {
"code": "5121",
"name": "Conturi la banci in lei",
},
}

View File

@@ -0,0 +1,164 @@
"""Service for fetching nomenclatures from Oracle (read-only)."""
from typing import List, Optional
from decimal import Decimal
from app.schemas.receipt import (
PartnerOption,
AccountOption,
CashRegisterOption,
ExpenseTypeOption,
)
from app.services.expense_types import EXPENSE_TYPES
class NomenclatureService:
"""
Service for fetching nomenclatures.
In Phase 1 (MVP), some nomenclatures are hardcoded.
In Phase 2, these will be fetched from Oracle.
"""
@staticmethod
async def get_partners(company_id: int, search: Optional[str] = None) -> List[PartnerOption]:
"""
Get partners (suppliers/customers) for a company.
Phase 1: Returns empty list or mock data.
Phase 2: Will fetch from Oracle NOM_PARTENERI.
"""
# TODO: Implement Oracle fetch in Phase 2
# For now, return some mock data for testing
mock_partners = [
PartnerOption(id=1, name="OMV Petrom", code="RO123456"),
PartnerOption(id=2, name="Dedeman", code="RO789012"),
PartnerOption(id=3, name="Kaufland", code="RO345678"),
PartnerOption(id=4, name="Emag", code="RO901234"),
PartnerOption(id=5, name="Altex", code="RO567890"),
]
if search:
search_lower = search.lower()
mock_partners = [
p for p in mock_partners
if search_lower in p.name.lower() or (p.code and search_lower in p.code.lower())
]
return mock_partners
@staticmethod
async def get_accounts(company_id: int, prefix: Optional[str] = None) -> List[AccountOption]:
"""
Get chart of accounts for a company.
Phase 1: Returns common expense/income accounts.
Phase 2: Will fetch from Oracle PLAN_CONTURI.
"""
# Common accounts for expenses and receipts
accounts = [
# Expense accounts (Class 6)
AccountOption(code="6022", name="Cheltuieli cu combustibilii"),
AccountOption(code="6024", name="Cheltuieli materiale pentru ambalat"),
AccountOption(code="6028", name="Alte cheltuieli cu materiale consumabile"),
AccountOption(code="624", name="Cheltuieli cu transportul de bunuri si personal"),
AccountOption(code="626", name="Cheltuieli postale si taxe telecomunicatii"),
AccountOption(code="628", name="Alte cheltuieli cu serviciile executate de terti"),
# VAT
AccountOption(code="4426", name="TVA deductibila"),
AccountOption(code="4427", name="TVA colectata"),
# Cash and Bank (Class 5)
AccountOption(code="5311", name="Casa in lei"),
AccountOption(code="5121", name="Conturi la banci in lei"),
# Income accounts (Class 7)
AccountOption(code="7588", name="Alte venituri din exploatare"),
]
if prefix:
accounts = [a for a in accounts if a.code.startswith(prefix)]
return accounts
@staticmethod
async def get_cash_registers(company_id: int) -> List[CashRegisterOption]:
"""
Get cash registers and bank accounts for a company.
Phase 1: Returns default options.
Phase 2: Will fetch from Oracle NOM_CASE / NOM_BANCI.
"""
# Default cash registers
return [
CashRegisterOption(id=1, name="Casa principala", account_code="5311"),
CashRegisterOption(id=2, name="Cont BCR", account_code="5121"),
CashRegisterOption(id=3, name="Cont BRD", account_code="5121"),
]
@staticmethod
async def get_expense_types() -> List[ExpenseTypeOption]:
"""
Get predefined expense types with their accounting configuration.
"""
return [
ExpenseTypeOption(
code=et.code,
name=et.name,
account_code=et.account_code,
has_vat=et.has_vat,
vat_percent=et.vat_percent,
)
for et in EXPENSE_TYPES.values()
]
@staticmethod
async def get_companies(username: str) -> List[dict]:
"""
Get companies accessible by user.
Phase 1: Returns mock data.
Phase 2: Will fetch from shared auth based on user permissions.
"""
# TODO: Integrate with shared auth to get user's companies
return [
{"id": 1, "name": "SC Test SRL", "cui": "RO12345678"},
{"id": 2, "name": "SC Demo SA", "cui": "RO87654321"},
]
# ============ Phase 2 Oracle Integration Methods ============
@staticmethod
async def _fetch_partners_oracle(company_id: int, search: Optional[str] = None) -> List[PartnerOption]:
"""
Fetch partners from Oracle NOM_PARTENERI.
Will be implemented in Phase 2.
"""
# TODO: Implement using shared oracle_pool
# Example query:
# SELECT ID_PART, DEN_PART, COD_FISCAL
# FROM {schema}.NOM_PARTENERI
# WHERE DEN_PART LIKE :search
raise NotImplementedError("Oracle integration pending - Phase 2")
@staticmethod
async def _fetch_accounts_oracle(company_id: int, prefix: Optional[str] = None) -> List[AccountOption]:
"""
Fetch chart of accounts from Oracle PLAN_CONTURI.
Will be implemented in Phase 2.
"""
# TODO: Implement using shared oracle_pool
raise NotImplementedError("Oracle integration pending - Phase 2")
@staticmethod
async def _fetch_cash_registers_oracle(company_id: int) -> List[CashRegisterOption]:
"""
Fetch cash registers from Oracle NOM_CASE / NOM_BANCI.
Will be implemented in Phase 2.
"""
# TODO: Implement using shared oracle_pool
raise NotImplementedError("Oracle integration pending - Phase 2")

View File

@@ -0,0 +1,389 @@
"""Business logic service for receipts workflow."""
from decimal import Decimal, ROUND_HALF_UP
from typing import List, Optional, Tuple
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models.receipt import Receipt, ReceiptStatus, ReceiptDirection
from app.db.models.accounting_entry import EntryType
from app.db.crud.receipt import ReceiptCRUD
from app.db.crud.accounting_entry import AccountingEntryCRUD
from app.schemas.receipt import (
ReceiptCreate,
ReceiptUpdate,
ReceiptFilter,
ReceiptResponse,
ReceiptListResponse,
AccountingEntryCreate,
)
from app.services.expense_types import EXPENSE_TYPES, get_expense_type
class ReceiptService:
"""Service for receipt business logic and workflow."""
@staticmethod
async def create_receipt(
session: AsyncSession,
data: ReceiptCreate,
created_by: str,
) -> Receipt:
"""Create a new receipt in DRAFT status."""
return await ReceiptCRUD.create(session, data, created_by)
@staticmethod
async def get_receipt(
session: AsyncSession,
receipt_id: int,
) -> Optional[Receipt]:
"""Get receipt by ID with all relationships."""
return await ReceiptCRUD.get_by_id(session, receipt_id, include_relations=True)
@staticmethod
async def get_receipts(
session: AsyncSession,
filters: ReceiptFilter,
) -> ReceiptListResponse:
"""Get paginated list of receipts."""
receipts, total = await ReceiptCRUD.get_list(session, filters)
pages = (total + filters.page_size - 1) // filters.page_size if total > 0 else 1
return ReceiptListResponse(
items=[ReceiptResponse.model_validate(r) for r in receipts],
total=total,
page=filters.page,
page_size=filters.page_size,
pages=pages,
)
@staticmethod
async def update_receipt(
session: AsyncSession,
receipt_id: int,
data: ReceiptUpdate,
username: str,
) -> Tuple[bool, str, Optional[Receipt]]:
"""
Update receipt (only DRAFT status).
Returns (success, message, receipt).
"""
receipt = await ReceiptCRUD.get_by_id(session, receipt_id)
if not receipt:
return False, "Receipt not found", None
if not await ReceiptCRUD.can_edit(receipt, username):
return False, "Cannot edit this receipt", None
updated = await ReceiptCRUD.update(session, receipt, data)
return True, "Receipt updated", updated
@staticmethod
async def delete_receipt(
session: AsyncSession,
receipt_id: int,
username: str,
) -> Tuple[bool, str]:
"""
Delete receipt (only DRAFT status).
Returns (success, message).
"""
receipt = await ReceiptCRUD.get_by_id(session, receipt_id)
if not receipt:
return False, "Receipt not found"
if not await ReceiptCRUD.can_delete(receipt, username):
return False, "Cannot delete this receipt"
await ReceiptCRUD.delete(session, receipt)
return True, "Receipt deleted"
@staticmethod
def generate_accounting_entries(receipt: Receipt) -> List[AccountingEntryCreate]:
"""
Generate accounting entries based on receipt data and expense type.
"""
entries: List[AccountingEntryCreate] = []
# Get expense type configuration
expense_type = get_expense_type(receipt.expense_type_code or "OTHER")
if not expense_type:
expense_type = EXPENSE_TYPES["OTHER"]
amount = Decimal(str(receipt.amount))
if receipt.direction == ReceiptDirection.CHELTUIALA:
# Expense: Debit expense account, Credit cash/bank
if expense_type.has_vat:
# Calculate net and VAT
vat_rate = expense_type.vat_percent / Decimal("100")
net_amount = (amount / (1 + vat_rate)).quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
vat_amount = amount - net_amount
# Debit: Expense account (net)
entries.append(AccountingEntryCreate(
entry_type=EntryType.DEBIT,
account_code=expense_type.account_code,
account_name=expense_type.account_name,
amount=net_amount,
partner_id=receipt.partner_id,
))
# Debit: VAT deductible
entries.append(AccountingEntryCreate(
entry_type=EntryType.DEBIT,
account_code=expense_type.vat_account,
account_name="TVA deductibila",
amount=vat_amount,
))
else:
# No VAT - full amount to expense
entries.append(AccountingEntryCreate(
entry_type=EntryType.DEBIT,
account_code=expense_type.account_code,
account_name=expense_type.account_name,
amount=amount,
partner_id=receipt.partner_id,
))
# Credit: Cash/Bank
cash_account = receipt.cash_register_account or "5311"
cash_name = receipt.cash_register_name or "Casa in lei"
entries.append(AccountingEntryCreate(
entry_type=EntryType.CREDIT,
account_code=cash_account,
account_name=cash_name,
amount=amount,
))
else:
# Income: Debit cash/bank, Credit income account
# For now, simple income posting
cash_account = receipt.cash_register_account or "5311"
cash_name = receipt.cash_register_name or "Casa in lei"
# Debit: Cash/Bank
entries.append(AccountingEntryCreate(
entry_type=EntryType.DEBIT,
account_code=cash_account,
account_name=cash_name,
amount=amount,
))
# Credit: Income account (7xx - to be configured)
entries.append(AccountingEntryCreate(
entry_type=EntryType.CREDIT,
account_code="7588",
account_name="Alte venituri din exploatare",
amount=amount,
))
return entries
@staticmethod
async def submit_for_review(
session: AsyncSession,
receipt_id: int,
username: str,
) -> Tuple[bool, str, Optional[Receipt]]:
"""
Submit receipt for review (DRAFT/REJECTED → PENDING_REVIEW).
Generates accounting entries automatically.
"""
receipt = await ReceiptCRUD.get_by_id(session, receipt_id)
if not receipt:
return False, "Receipt not found", None
if not await ReceiptCRUD.can_submit(receipt, username):
return False, "Cannot submit this receipt", None
# Check if receipt has at least one attachment
if not receipt.attachments:
return False, "Receipt must have at least one attachment", None
# Check required fields
if not receipt.expense_type_code:
return False, "Expense type is required", None
if not receipt.cash_register_account:
return False, "Cash register is required", None
# Generate accounting entries
entries = ReceiptService.generate_accounting_entries(receipt)
# Delete existing entries and create new ones
await AccountingEntryCRUD.delete_all_for_receipt(session, receipt_id)
await AccountingEntryCRUD.create_bulk(session, receipt_id, entries, is_auto_generated=True)
# Update status
updated = await ReceiptCRUD.update_status(
session, receipt, ReceiptStatus.PENDING_REVIEW
)
# Reload with entries
updated = await ReceiptCRUD.get_by_id(session, receipt_id)
return True, "Receipt submitted for review", updated
@staticmethod
async def approve_receipt(
session: AsyncSession,
receipt_id: int,
username: str,
) -> Tuple[bool, str, Optional[Receipt]]:
"""
Approve receipt (PENDING_REVIEW → APPROVED).
"""
receipt = await ReceiptCRUD.get_by_id(session, receipt_id)
if not receipt:
return False, "Receipt not found", None
if receipt.status != ReceiptStatus.PENDING_REVIEW:
return False, "Receipt is not pending review", None
# Validate accounting entries
if not receipt.entries:
return False, "Receipt has no accounting entries", None
# Update status
updated = await ReceiptCRUD.update_status(
session, receipt, ReceiptStatus.APPROVED, reviewed_by=username
)
return True, "Receipt approved", updated
@staticmethod
async def reject_receipt(
session: AsyncSession,
receipt_id: int,
username: str,
reason: str,
) -> Tuple[bool, str, Optional[Receipt]]:
"""
Reject receipt (PENDING_REVIEW → REJECTED).
"""
receipt = await ReceiptCRUD.get_by_id(session, receipt_id)
if not receipt:
return False, "Receipt not found", None
if receipt.status != ReceiptStatus.PENDING_REVIEW:
return False, "Receipt is not pending review", None
# Update status
updated = await ReceiptCRUD.update_status(
session,
receipt,
ReceiptStatus.REJECTED,
reviewed_by=username,
rejection_reason=reason,
)
return True, "Receipt rejected", updated
@staticmethod
async def resubmit_receipt(
session: AsyncSession,
receipt_id: int,
username: str,
) -> Tuple[bool, str, Optional[Receipt]]:
"""
Resubmit rejected receipt after corrections (REJECTED → PENDING_REVIEW).
"""
receipt = await ReceiptCRUD.get_by_id(session, receipt_id)
if not receipt:
return False, "Receipt not found", None
if receipt.status != ReceiptStatus.REJECTED:
return False, "Receipt is not rejected", None
if receipt.created_by != username:
return False, "Only the creator can resubmit", None
# Re-generate accounting entries
entries = ReceiptService.generate_accounting_entries(receipt)
await AccountingEntryCRUD.delete_all_for_receipt(session, receipt_id)
await AccountingEntryCRUD.create_bulk(session, receipt_id, entries, is_auto_generated=True)
# Update status
updated = await ReceiptCRUD.update_status(
session, receipt, ReceiptStatus.PENDING_REVIEW
)
# Reload with entries
updated = await ReceiptCRUD.get_by_id(session, receipt_id)
return True, "Receipt resubmitted for review", updated
@staticmethod
async def regenerate_entries(
session: AsyncSession,
receipt_id: int,
username: str,
) -> Tuple[bool, str, List[AccountingEntryCreate]]:
"""
Regenerate accounting entries for a receipt.
"""
receipt = await ReceiptCRUD.get_by_id(session, receipt_id)
if not receipt:
return False, "Receipt not found", []
if receipt.status not in [ReceiptStatus.DRAFT, ReceiptStatus.PENDING_REVIEW]:
return False, "Cannot regenerate entries for this receipt status", []
# Generate new entries
entries = ReceiptService.generate_accounting_entries(receipt)
# Replace existing entries
await AccountingEntryCRUD.delete_all_for_receipt(session, receipt_id)
await AccountingEntryCRUD.create_bulk(session, receipt_id, entries, is_auto_generated=True)
return True, "Entries regenerated", entries
@staticmethod
async def update_entries(
session: AsyncSession,
receipt_id: int,
entries: List[AccountingEntryCreate],
username: str,
) -> Tuple[bool, str, List]:
"""
Update accounting entries for a receipt (accountant action).
"""
receipt = await ReceiptCRUD.get_by_id(session, receipt_id)
if not receipt:
return False, "Receipt not found", []
if receipt.status != ReceiptStatus.PENDING_REVIEW:
return False, "Can only modify entries for receipts pending review", []
# Validate entries
is_valid, error = await AccountingEntryCRUD.validate_entries(entries)
if not is_valid:
return False, error, []
# Replace entries
updated_entries = await AccountingEntryCRUD.replace_all_for_receipt(
session, receipt_id, entries, username
)
return True, "Entries updated", updated_entries
@staticmethod
async def get_pending_count(
session: AsyncSession,
company_id: Optional[int] = None,
) -> int:
"""Get count of receipts pending review."""
receipts = await ReceiptCRUD.get_pending_review(session, company_id)
return len(receipts)