- Remove legacy pool support (DSN, env vars fallback) - Use first registered server when server_id not specified - Show server dropdown even with single server in ORACLE_SERVERS - Email login only available for 2+ servers Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
193 lines
6.2 KiB
Python
193 lines
6.2 KiB
Python
"""
|
|
System routes for server monitoring and logs.
|
|
"""
|
|
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from collections import deque
|
|
|
|
from fastapi import APIRouter, Depends, Query, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from shared.auth.dependencies import get_current_user, CurrentUser
|
|
|
|
|
|
class AuthModeResponse(BaseModel):
    """
    Payload returned by the auth-mode endpoint.

    Attributes:
        mode: Either "single-server" or "multi-server".
        supports_email_login: True if email-based login is available.
    """

    mode: str
    supports_email_login: bool
|
|
|
|
|
|
class LogEntry(BaseModel):
    """
    One entry of a log file.

    Attributes:
        line: Text of the log line.
        level: Optional level tag for the line; defaults to None.
    """

    line: str
    level: Optional[str] = None
|
|
|
|
|
|
class LogsResponse(BaseModel):
    """
    Body of a log-listing response.

    Attributes:
        file: Name of the requested log (without the ".log" suffix).
        lines: Log lines included in this response.
        total_lines: Line count reported alongside the returned lines.
        showing: Number of entries in ``lines``.
        logs_path: Directory the log was looked up in, if reported.
        file_exists: False when the requested log file was not found.
        file_size_kb: Size of the log file in KiB, if reported.
    """

    file: str
    lines: list[str]
    total_lines: int
    showing: int
    logs_path: Optional[str] = None
    file_exists: bool = True
    file_size_kb: Optional[float] = None
|
|
|
|
|
|
def create_system_router() -> APIRouter:
    """
    Create system router for logs and monitoring.

    Routes registered on the returned router:
        GET /auth-mode       - no auth dependency; tells the frontend which
                               login flow to present.
        GET /logs            - requires an authenticated user; returns the
                               tail of one whitelisted log file.
        GET /logs/available  - requires an authenticated user; lists the
                               ``*.log`` files found in the logs directory.

    Returns:
        APIRouter with the routes above attached.
    """
    router = APIRouter()

    @router.get("/auth-mode", response_model=AuthModeResponse)
    async def get_auth_mode():
        """
        Get the authentication mode configuration.

        This is a PUBLIC endpoint (no auth required) that tells the frontend
        whether to use the multi-server login flow or the classic
        username/password single-server flow.

        Returns:
            AuthModeResponse where
            - mode is "multi-server" when at least one server is configured,
              otherwise "single-server";
            - supports_email_login is True only when two or more servers
              are configured.
        """
        # Imported lazily; presumably to avoid a circular import at module
        # load time - TODO confirm.
        from backend.config import settings

        servers = settings.get_oracle_servers()

        # Any configured server selects multi-server mode, so the server
        # dropdown is shown even for a single entry (explicit selection).
        if servers:
            return AuthModeResponse(
                mode="multi-server",
                # Email lookup is only offered when there are 2+ servers.
                supports_email_login=len(servers) > 1,
            )

        # No servers configured.
        return AuthModeResponse(
            mode="single-server",
            supports_email_login=False,
        )

    def get_logs_path() -> Path:
        """
        Get logs directory path based on environment.

        On Windows the production directory is preferred when it exists.
        Otherwise the first existing development candidate is returned,
        falling back to ``./logs`` (which may not exist).
        """
        # Windows production: C:\inetpub\wwwroot\roa2web\logs
        if os.name == 'nt':
            prod_path = Path(r"C:\inetpub\wwwroot\roa2web\logs")
            if prod_path.exists():
                return prod_path

        # Development fallback: backend/logs or ./logs
        base_dir = Path(__file__).parent.parent.parent
        dev_paths = [
            base_dir / "backend" / "logs",
            base_dir / "logs",
            Path("./logs"),
        ]
        for candidate in dev_paths:
            if candidate.exists():
                return candidate

        return Path("./logs")

    @router.get("/logs", response_model=LogsResponse)
    async def get_logs(
        file: str = Query(default="backend-stderr", description="Log file: backend-stderr or backend-stdout"),
        lines: int = Query(default=100, ge=10, le=1000, description="Number of lines to return"),
        # Exposed to clients as "filter"; renamed locally so the builtin
        # ``filter`` is not shadowed.
        filter_text: Optional[str] = Query(default=None, alias="filter", description="Filter text (case-insensitive)"),
        current_user: CurrentUser = Depends(get_current_user),
    ):
        """
        Get server log entries.

        Args:
            file: Log file name (backend-stderr or backend-stdout)
            lines: Number of lines to return (10-1000)
            filter_text: Optional case-insensitive substring, sent by clients
                as the ``filter`` query parameter. An empty value disables
                filtering.
            current_user: Authenticated user (only enforces authentication).

        Returns:
            LogsResponse holding the last ``lines`` lines of the file that
            match the filter. ``total_lines`` is the number of lines in the
            whole file and ``showing`` is the number of lines returned.

        Raises:
            HTTPException: 400 for a file name outside the whitelist,
                500 when the log file cannot be read.
        """
        # Validate file name to prevent path traversal
        allowed_files = ["backend-stderr", "backend-stdout"]
        if file not in allowed_files:
            raise HTTPException(status_code=400, detail=f"Invalid file. Allowed: {allowed_files}")

        logs_path = get_logs_path()
        log_file = logs_path / f"{file}.log"
        logs_path_str = str(logs_path.resolve())

        if not log_file.exists():
            return LogsResponse(
                file=file,
                lines=[f"Log file not found: {log_file}"],
                total_lines=0,
                showing=0,
                logs_path=logs_path_str,
                file_exists=False,
                file_size_kb=0,
            )

        needle = filter_text.lower() if filter_text else None

        try:
            file_size_kb = round(log_file.stat().st_size / 1024, 2)

            # Stream the file once. The filter is applied BEFORE the tail is
            # taken, so matches older than the last few hundred lines are
            # not lost; the bounded deque keeps memory at O(lines).
            tail = deque(maxlen=lines)
            total_lines = 0
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                for raw_line in f:
                    total_lines += 1
                    if needle is None or needle in raw_line.lower():
                        tail.append(raw_line.rstrip())
        except OSError as e:
            raise HTTPException(status_code=500, detail=f"Error reading logs: {e}") from e

        result_lines = list(tail)
        return LogsResponse(
            file=file,
            lines=result_lines,
            total_lines=total_lines,
            showing=len(result_lines),
            logs_path=logs_path_str,
            file_exists=True,
            file_size_kb=file_size_kb,
        )

    @router.get("/logs/available")
    async def get_available_logs(
        current_user: CurrentUser = Depends(get_current_user),
    ):
        """
        Get list of available log files.

        Returns:
            Dict with ``logs_path`` (directory searched), ``files`` (one
            entry per ``*.log`` file with ``name``, ``size_kb`` and
            ``modified`` as an mtime timestamp, sorted by name) and
            ``exists`` (whether the directory exists).
        """
        logs_path = get_logs_path()

        if not logs_path.exists():
            return {"logs_path": str(logs_path), "files": [], "exists": False}

        log_files = []
        for entry in logs_path.glob("*.log"):
            try:
                stat = entry.stat()
            except OSError:
                # File was removed or rotated between glob() and stat(), or
                # is unreadable; skip it rather than fail the whole listing.
                continue
            log_files.append({
                "name": entry.stem,
                "size_kb": round(stat.st_size / 1024, 1),
                "modified": stat.st_mtime,
            })

        return {
            "logs_path": str(logs_path),
            "files": sorted(log_files, key=lambda x: x["name"]),
            "exists": True,
        }

    return router
|