- Add deployment/linux/ with deploy.sh for deploying from Claude-Agent LXC to Windows server - Add ServerLogsView.vue for viewing server logs from frontend - Add shared/routes/system.py for system health endpoints - Update CLAUDE.md with quick deploy instructions - Improve Windows deployment scripts (ROA2WEB-Console.ps1) - Fix OCR service validation and worker pool improvements - Update environment config examples - Various script permission and startup fixes 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
141 lines
4.5 KiB
Python
141 lines
4.5 KiB
Python
"""CRUD operations for receipt attachments."""
|
|
|
|
import os
|
|
import uuid
|
|
import aiofiles
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from fastapi import UploadFile
|
|
|
|
from backend.modules.data_entry.db.models.receipt import ReceiptAttachment
|
|
from backend.config import settings
|
|
|
|
|
|
class AttachmentCRUD:
    """CRUD operations for ReceiptAttachment model.

    Uploaded files are written below ``settings.data_entry_upload_path_resolved``
    in ``<year>/<month>`` sub-directories. Each DB record stores the file's
    path relative to that root in ``file_path``.
    """

    @staticmethod
    def _generate_stored_filename(original_filename: str) -> str:
        """Generate unique filename for storage.

        Only the lower-cased extension of ``original_filename`` is kept; the
        stem is replaced by a random UUID4 so stored names never collide.
        """
        ext = Path(original_filename).suffix.lower()
        return f"{uuid.uuid4()}{ext}"

    @staticmethod
    def _get_upload_path(stored_filename: str) -> Path:
        """Get the storage path for a file, organized by year/month.

        Side effect: creates the ``<root>/<year>/<month>`` directory if it
        does not exist yet.

        Returns:
            The path of ``stored_filename`` *relative* to the upload root
            (``<year>/<month>/<stored_filename>``), not the absolute path.
        """
        # Imported locally because the module only imports `datetime` itself.
        from datetime import timezone

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # year and month are identical.
        now = datetime.now(timezone.utc)
        relative_dir = Path(str(now.year)) / f"{now.month:02d}"

        # Ensure directory exists
        target_dir = settings.data_entry_upload_path_resolved / relative_dir
        target_dir.mkdir(parents=True, exist_ok=True)

        return relative_dir / stored_filename

    @staticmethod
    async def create(
        session: AsyncSession,
        receipt_id: int,
        file: UploadFile,
    ) -> ReceiptAttachment:
        """Create attachment by saving file and creating DB record.

        Args:
            session: Async DB session; the new record is committed on it.
            receipt_id: ID of the receipt the attachment belongs to.
            file: The uploaded file. Its whole content is read into memory.

        Returns:
            The persisted and refreshed ``ReceiptAttachment``.

        Raises:
            ValueError: If the file exceeds
                ``settings.data_entry_max_upload_size_bytes`` or its MIME
                type is not in ``settings.data_entry_allowed_mime_types``.
        """
        original_filename = file.filename or "upload"

        # Read file content
        content = await file.read()
        file_size = len(content)

        # Validate *before* touching the filesystem so rejected uploads do
        # not create directories or files.
        if file_size > settings.data_entry_max_upload_size_bytes:
            raise ValueError(f"File too large. Maximum size is {settings.data_entry_max_upload_size_mb}MB")

        # NOTE(review): this checks the type declared on the upload, not the
        # actual bytes -- confirm that is sufficient for this endpoint.
        mime_type = file.content_type or "application/octet-stream"
        if mime_type not in settings.data_entry_allowed_mime_types:
            raise ValueError(f"File type not allowed: {mime_type}")

        stored_filename = AttachmentCRUD._generate_stored_filename(original_filename)
        relative_path = AttachmentCRUD._get_upload_path(stored_filename)
        full_path = settings.data_entry_upload_path_resolved / relative_path

        committed = False
        try:
            # Save file
            async with aiofiles.open(full_path, "wb") as f:
                await f.write(content)

            # Create DB record
            attachment = ReceiptAttachment(
                receipt_id=receipt_id,
                filename=original_filename,
                stored_filename=stored_filename,
                file_path=str(relative_path),
                file_size=file_size,
                mime_type=mime_type,
            )
            session.add(attachment)
            await session.commit()
            committed = True
        finally:
            if not committed:
                # The write or the commit failed (or the task was cancelled):
                # do not leave an orphaned file without a DB record behind.
                try:
                    os.remove(full_path)
                except OSError:
                    # Best-effort cleanup; the original error must propagate.
                    pass

        await session.refresh(attachment)
        return attachment

    @staticmethod
    async def get_by_id(
        session: AsyncSession,
        attachment_id: int,
    ) -> Optional[ReceiptAttachment]:
        """Get attachment by ID, or ``None`` if no such record exists."""
        query = select(ReceiptAttachment).where(ReceiptAttachment.id == attachment_id)
        result = await session.execute(query)
        return result.scalar_one_or_none()

    @staticmethod
    async def get_by_receipt_id(
        session: AsyncSession,
        receipt_id: int,
    ) -> List[ReceiptAttachment]:
        """Get all attachments for a receipt, ordered by ``uploaded_at`` ascending."""
        query = (
            select(ReceiptAttachment)
            .where(ReceiptAttachment.receipt_id == receipt_id)
            .order_by(ReceiptAttachment.uploaded_at.asc())
        )
        result = await session.execute(query)
        return list(result.scalars().all())

    @staticmethod
    def get_file_path(attachment: ReceiptAttachment) -> Path:
        """Get full file path for an attachment.

        Joins the upload root with the relative ``file_path`` stored on the
        record.
        """
        return settings.data_entry_upload_path_resolved / attachment.file_path

    @staticmethod
    async def delete(session: AsyncSession, attachment: ReceiptAttachment) -> bool:
        """Delete attachment (file and DB record).

        A file that is already missing on disk is not an error; the DB record
        is still removed.

        Returns:
            Always ``True``.
        """
        # Delete file. EAFP instead of exists()+remove(): the file may vanish
        # between the check and the removal.
        # NOTE(review): the file is removed before the DB commit; if the
        # commit fails the record will point at a missing file.
        file_path = AttachmentCRUD.get_file_path(attachment)
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass

        # Delete DB record
        await session.delete(attachment)
        await session.commit()

        return True

    @staticmethod
    async def delete_all_for_receipt(session: AsyncSession, receipt_id: int) -> int:
        """Delete all attachments for a receipt.

        Each attachment is removed (and committed) individually through
        :meth:`delete`.

        Returns:
            The number of attachments deleted.
        """
        attachments = await AttachmentCRUD.get_by_receipt_id(session, receipt_id)

        for attachment in attachments:
            await AttachmentCRUD.delete(session, attachment)

        return len(attachments)
|