Files
Claude Agent 2e1ead69e1 refactor: simplify Oracle pool to use only ORACLE_SERVERS format
- Remove legacy pool support (DSN, env vars fallback)
- Use first registered server when server_id not specified
- Show server dropdown even with single server in ORACLE_SERVERS
- Email login only available for 2+ servers

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 15:15:57 +00:00

193 lines
6.2 KiB
Python

"""
System routes for server monitoring and logs.
"""
import os
from pathlib import Path
from typing import Optional
from collections import deque
from fastapi import APIRouter, Depends, Query, HTTPException
from pydantic import BaseModel
from shared.auth.dependencies import get_current_user, CurrentUser
class AuthModeResponse(BaseModel):
    """Payload returned by the auth-mode endpoint."""

    # Either "single-server" or "multi-server".
    mode: str
    # True if email-based login is available.
    supports_email_login: bool
class LogEntry(BaseModel):
    """One log line, optionally tagged with a level."""

    # Text of the log line.
    line: str
    # Level for the line; stays None when none is supplied.
    level: Optional[str] = None
class LogsResponse(BaseModel):
    """Response body carrying log lines plus metadata about the log file."""

    # Name of the requested log.
    file: str
    # Log lines included in this response.
    lines: list[str]
    # Line total reported by the endpoint.
    total_lines: int
    # Number of entries actually present in `lines`.
    showing: int
    # Directory the log was looked up in, when reported.
    logs_path: Optional[str] = None
    # False when the requested log file was not found on disk.
    file_exists: bool = True
    # Size of the log file in KiB, when reported.
    file_size_kb: Optional[float] = None
def create_system_router() -> APIRouter:
    """
    Create system router for logs and monitoring.

    Returns:
        An APIRouter with three GET routes registered:
        - /auth-mode: public, reports the login flow the frontend should use
        - /logs: authenticated, returns the tail of a whitelisted log file
        - /logs/available: authenticated, lists the *.log files in the logs directory
    """
    router = APIRouter()

    @router.get("/auth-mode", response_model=AuthModeResponse)
    async def get_auth_mode():
        """
        Get the authentication mode configuration.

        This is a PUBLIC endpoint (no auth dependency is attached) that tells
        the frontend which login flow to present.

        Returns:
            - mode: "multi-server" when settings.get_oracle_servers() yields at
              least one server, otherwise "single-server"
            - supports_email_login: True only when more than one server is
              configured
        """
        # Imported at call time rather than at module import, as the original did.
        from backend.config import settings

        servers = settings.get_oracle_servers()
        server_count = len(servers) if servers else 0

        if server_count >= 1:
            # The server dropdown is shown even for a single server; the
            # email lookup is only offered when there are 2+ servers.
            return AuthModeResponse(
                mode="multi-server",
                supports_email_login=server_count > 1,
            )

        # Nothing configured: report the single-server flow.
        return AuthModeResponse(
            mode="single-server",
            supports_email_login=False,
        )

    def get_logs_path() -> Path:
        """
        Get logs directory path based on environment.

        On Windows the production directory is preferred when it exists.
        Otherwise the first existing development candidate is returned, and
        ./logs is returned (possibly non-existent) when none exists.
        """
        if os.name == 'nt':  # Windows
            prod_path = Path(r"C:\inetpub\wwwroot\roa2web\logs")
            if prod_path.exists():
                return prod_path

        # Development fallback, probed in priority order.
        three_levels_up = Path(__file__).parent.parent.parent
        candidates = (
            three_levels_up / "backend" / "logs",
            three_levels_up / "logs",
            Path("./logs"),
        )
        return next((p for p in candidates if p.exists()), Path("./logs"))

    @router.get("/logs", response_model=LogsResponse)
    async def get_logs(
        file: str = Query(default="backend-stderr", description="Log file: backend-stderr or backend-stdout"),
        lines: int = Query(default=100, ge=10, le=1000, description="Number of lines to return"),
        filter: Optional[str] = Query(default=None, description="Filter text (case-insensitive)"),
        current_user: CurrentUser = Depends(get_current_user)
    ):
        """
        Get server log entries.

        Args:
            file: Log file name (backend-stderr or backend-stdout)
            lines: Number of lines to return (10-1000)
            filter: Optional filter text, matched case-insensitively as a
                substring; an empty string is treated as "no filter"

        Returns:
            LogsResponse whose `lines` holds the last `lines` entries that
            satisfy the filter, `showing` is the number of entries returned and
            `total_lines` is the number of lines in the file that satisfy the
            filter (every line when no filter is given).

        Raises:
            HTTPException: 400 for a file name outside the whitelist, 500 when
                the log file cannot be stat'ed or read.
        """
        # Validate file name to prevent path traversal
        allowed_files = ["backend-stderr", "backend-stdout"]
        if file not in allowed_files:
            raise HTTPException(status_code=400, detail=f"Invalid file. Allowed: {allowed_files}")

        logs_path = get_logs_path()
        log_file = logs_path / f"{file}.log"
        logs_path_str = str(logs_path.resolve())

        if not log_file.exists():
            return LogsResponse(
                file=file,
                lines=[f"Log file not found: {log_file}"],
                total_lines=0,
                showing=0,
                logs_path=logs_path_str,
                file_exists=False,
                file_size_kb=0
            )

        needle = filter.lower() if filter else None
        matched_count = 0
        # A bounded deque keeps only the newest `lines` matches, so memory stays
        # flat however large the log is.
        tail = deque(maxlen=lines)

        # NOTE(review): this is blocking file I/O inside an async handler;
        # acceptable for modest log sizes, revisit if logs grow large.
        try:
            file_size_kb = round(log_file.stat().st_size / 1024, 2)
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                # Filter while streaming: filtering only a fixed-size tail
                # window would silently drop older matches when hits are sparse.
                for raw_line in f:
                    if needle is not None and needle not in raw_line.lower():
                        continue
                    matched_count += 1
                    tail.append(raw_line.rstrip())
        except OSError as e:
            raise HTTPException(status_code=500, detail=f"Error reading logs: {str(e)}") from e

        result_lines = list(tail)
        return LogsResponse(
            file=file,
            lines=result_lines,
            total_lines=matched_count,
            showing=len(result_lines),
            logs_path=logs_path_str,
            file_exists=True,
            file_size_kb=file_size_kb
        )

    @router.get("/logs/available")
    async def get_available_logs(
        current_user: CurrentUser = Depends(get_current_user)
    ):
        """
        Get list of available log files.

        Returns:
            A dict with `logs_path` (the directory as a string), `exists`
            (whether that directory exists) and `files`: one entry per *.log
            file, sorted by name, each with `name` (file stem), `size_kb`
            (rounded to one decimal) and `modified` (st_mtime).
        """
        logs_path = get_logs_path()

        if not logs_path.exists():
            return {"logs_path": str(logs_path), "files": [], "exists": False}

        def describe(entry: Path) -> dict:
            # Single stat() call per file feeds both size and mtime.
            info = entry.stat()
            return {
                "name": entry.stem,
                "size_kb": round(info.st_size / 1024, 1),
                "modified": info.st_mtime
            }

        log_files = [describe(entry) for entry in logs_path.glob("*.log")]
        log_files.sort(key=lambda item: item["name"])

        return {
            "logs_path": str(logs_path),
            "files": log_files,
            "exists": True
        }

    return router