""" Log monitoring tests — parse app log files for errors and anomalies. Run with: pytest api/tests/qa/test_qa_logs_monitor.py Tests only check log lines from the current session (last 1 hour) to avoid failing on pre-existing historical errors. """ import re from datetime import datetime, timedelta import pytest pytestmark = pytest.mark.qa # Log line format: 2026-03-23 07:57:12,691 | INFO | app.main | message _MAX_WARNINGS = 50 _SESSION_WINDOW_HOURS = 1 # Known issues that are tracked separately and should not fail the QA suite. # These are real bugs that need fixing but should not block test runs. _KNOWN_ISSUES = [ "soft-deleting order ID=533: ORA-00942", # Pre-existing: missing table/view ] def _read_recent_lines(app_log_path): """Read log file lines from the last session window only.""" if app_log_path is None or not app_log_path.exists(): pytest.skip("No log file available") all_lines = app_log_path.read_text(encoding="utf-8", errors="replace").splitlines() # Filter to recent lines only (within session window) cutoff = datetime.now() - timedelta(hours=_SESSION_WINDOW_HOURS) recent = [] for line in all_lines: # Parse timestamp from log line: "2026-03-24 09:43:46,174 | ..." match = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", line) if match: try: ts = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S") if ts >= cutoff: recent.append(line) except ValueError: recent.append(line) # Include unparseable lines else: # Non-timestamped lines (continuations) — include if we're in recent window if recent: recent.append(line) return recent # --------------------------------------------------------------------------- def test_log_file_exists(app_log_path): """Log file path resolves to an existing file.""" if app_log_path is None: pytest.skip("No log file configured") assert app_log_path.exists(), f"Log file not found: {app_log_path}" def _is_known_issue(line): """Check if a log line matches a known tracked issue.""" return any(ki in line for ki in _KNOWN_ISSUES) def test_no_critical_errors(app_log_path, qa_issues): """No unexpected ERROR-level lines in recent log entries.""" lines = _read_recent_lines(app_log_path) errors = [l for l in lines if "| ERROR |" in l and not _is_known_issue(l)] known = [l for l in lines if "| ERROR |" in l and _is_known_issue(l)] if errors: qa_issues.extend({"type": "log_error", "line": l} for l in errors) if known: qa_issues.extend({"type": "known_issue", "line": l} for l in known) assert len(errors) == 0, ( f"Found {len(errors)} unexpected ERROR line(s) in recent {_SESSION_WINDOW_HOURS}h window:\n" + "\n".join(errors[:10]) ) def test_no_oracle_errors(app_log_path, qa_issues): """No unexpected Oracle ORA- error codes in recent log entries.""" lines = _read_recent_lines(app_log_path) ora_errors = [l for l in lines if "ORA-" in l and not _is_known_issue(l)] known = [l for l in lines if "ORA-" in l and _is_known_issue(l)] if ora_errors: qa_issues.extend({"type": "oracle_error", "line": l} for l in ora_errors) if known: qa_issues.extend({"type": "known_issue", "line": l} for l in known) assert len(ora_errors) == 0, ( f"Found {len(ora_errors)} unexpected ORA- error(s) in recent {_SESSION_WINDOW_HOURS}h window:\n" + "\n".join(ora_errors[:10]) ) def test_no_unhandled_exceptions(app_log_path, qa_issues): """No unhandled Python tracebacks in recent log entries.""" lines = _read_recent_lines(app_log_path) tb_lines = [l for l in lines if "Traceback" in l] if tb_lines: qa_issues.extend({"type": "traceback", "line": l} for l in tb_lines) assert len(tb_lines) == 0, ( f"Found {len(tb_lines)} Traceback(s) in recent {_SESSION_WINDOW_HOURS}h window:\n" + "\n".join(tb_lines[:10]) ) def test_no_import_failures(app_log_path, qa_issues): """No import failure messages in recent log entries.""" lines = _read_recent_lines(app_log_path) pattern = re.compile(r"import failed|Order.*failed", re.IGNORECASE) failures = [l for l in lines if pattern.search(l)] if failures: qa_issues.extend({"type": "import_failure", "line": l} for l in failures) assert len(failures) == 0, ( f"Found {len(failures)} import failure(s) in recent {_SESSION_WINDOW_HOURS}h window:\n" + "\n".join(failures[:10]) ) def test_warning_count_acceptable(app_log_path, qa_issues): """WARNING count in recent window is below acceptable threshold.""" lines = _read_recent_lines(app_log_path) warnings = [l for l in lines if "| WARNING |" in l] if len(warnings) >= _MAX_WARNINGS: qa_issues.append({ "type": "high_warning_count", "count": len(warnings), "threshold": _MAX_WARNINGS, }) assert len(warnings) < _MAX_WARNINGS, ( f"Warning count {len(warnings)} exceeds threshold {_MAX_WARNINGS} " f"in recent {_SESSION_WINDOW_HOURS}h window" )