diff --git a/.gitea/workflows/test.yaml b/.gitea/workflows/test.yaml new file mode 100644 index 0000000..d8e67bf --- /dev/null +++ b/.gitea/workflows/test.yaml @@ -0,0 +1,38 @@ +name: Tests + +on: + push: + branches-ignore: [main] + pull_request: + branches: [main] + +jobs: + fast-tests: + runs-on: [self-hosted] + steps: + - uses: actions/checkout@v4 + + - name: Run fast tests (unit + e2e) + run: ./test.sh ci + + full-tests: + runs-on: [self-hosted, oracle] + needs: fast-tests + if: github.event_name == 'pull_request' + steps: + - uses: actions/checkout@v4 + + - name: Run full tests (with Oracle) + run: ./test.sh full + env: + ORACLE_DSN: ${{ secrets.ORACLE_DSN }} + ORACLE_USER: ${{ secrets.ORACLE_USER }} + ORACLE_PASSWORD: ${{ secrets.ORACLE_PASSWORD }} + + - name: Upload QA reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: qa-reports + path: qa-reports/ + retention-days: 30 diff --git a/.githooks/pre-push b/.githooks/pre-push new file mode 100755 index 0000000..29ce14b --- /dev/null +++ b/.githooks/pre-push @@ -0,0 +1,9 @@ +#!/bin/bash +echo "🔍 Running pre-push tests..." +./test.sh ci +EXIT_CODE=$? +if [ $EXIT_CODE -ne 0 ]; then + echo "❌ Tests failed. Push aborted." + exit 1 +fi +echo "✅ Tests passed. Pushing..." diff --git a/.gitignore b/.gitignore index 7f705be..76ad42c 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,9 @@ api/api/ # Logs directory logs/ .gstack/ + +# QA Reports (generated by test suite) +qa-reports/ + +# Session handoff +.claude/HANDOFF.md diff --git a/CLAUDE.md b/CLAUDE.md index 19e9237..cbad077 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -22,12 +22,42 @@ Documentatie completa: [README.md](README.md) # INTOTDEAUNA via start.sh (seteaza Oracle env vars) ./start.sh # NU folosi uvicorn direct — lipsesc LD_LIBRARY_PATH si TNS_ADMIN - -# Tests -python api/test_app_basic.py # fara Oracle -python api/test_integration.py # cu Oracle ``` +## Testing & CI/CD + +```bash +# Teste rapide (unit + e2e, ~30s, fara Oracle) +./test.sh ci + +# Teste complete (totul inclusiv Oracle + sync real + PL/SQL, ~2-3 min) +./test.sh full + +# Smoke test pe productie (read-only, dupa deploy) +./test.sh smoke-prod --base-url http://79.119.86.134/gomag + +# Doar un layer specific +./test.sh unit # SQLite CRUD, imports, routes +./test.sh e2e # Browser tests (Playwright) +./test.sh oracle # Oracle integration +./test.sh sync # Sync real GoMag → Oracle +./test.sh qa # API health + responsive + log monitor +./test.sh logs # Doar log monitoring + +# Validate prerequisites +./test.sh --dry-run +``` + +**Flow zilnic:** +1. Lucrezi pe branch `fix/*` sau `feat/*` +2. `git push` → pre-push hook ruleaza `./test.sh ci` automat (~30s) +3. Inainte de PR → `./test.sh full` manual (~2-3 min) +4. Dupa deploy pe prod → `./test.sh smoke-prod --base-url http://79.119.86.134/gomag` + +**Output:** `qa-reports/` — health score, raport markdown, screenshots, baseline comparison. + +**Markers pytest:** `unit`, `oracle`, `e2e`, `qa`, `sync` + ## Reguli critice (nu le incalca) ### Flux import comenzi diff --git a/api/test_app_basic.py b/api/test_app_basic.py deleted file mode 100644 index 952b826..0000000 --- a/api/test_app_basic.py +++ /dev/null @@ -1,150 +0,0 @@ -""" -Test A: Basic App Import and Route Tests -========================================= -Tests module imports and all GET routes without requiring Oracle. -Run: python test_app_basic.py - -Expected results: -- All 17 module imports: PASS -- HTML routes (/ /missing-skus /mappings /sync): PASS (templates exist) -- /health: PASS (returns Oracle=error, sqlite=ok) -- /api/sync/status, /api/sync/history, /api/validate/missing-skus: PASS (SQLite-only) -- /api/mappings, /api/mappings/export-csv, /api/articles/search: FAIL (require Oracle pool) - These are KNOWN FAILURES when Oracle is unavailable - documented as bugs requiring guards. -""" - -import os -import sys -import tempfile - -# --- Set env vars BEFORE any app import --- -_tmpdir = tempfile.mkdtemp() -_sqlite_path = os.path.join(_tmpdir, "test_import.db") - -os.environ["FORCE_THIN_MODE"] = "true" -os.environ["SQLITE_DB_PATH"] = _sqlite_path -os.environ["ORACLE_DSN"] = "dummy" -os.environ["ORACLE_USER"] = "dummy" -os.environ["ORACLE_PASSWORD"] = "dummy" - -# Add api/ to path so we can import app -_api_dir = os.path.dirname(os.path.abspath(__file__)) -if _api_dir not in sys.path: - sys.path.insert(0, _api_dir) - -# ------------------------------------------------------- -# Section 1: Module Import Checks -# ------------------------------------------------------- - -MODULES = [ - "app.config", - "app.database", - "app.main", - "app.routers.health", - "app.routers.dashboard", - "app.routers.mappings", - "app.routers.sync", - "app.routers.validation", - "app.routers.articles", - "app.services.sqlite_service", - "app.services.scheduler_service", - "app.services.mapping_service", - "app.services.article_service", - "app.services.validation_service", - "app.services.import_service", - "app.services.sync_service", - "app.services.order_reader", -] - -passed = 0 -failed = 0 -results = [] - -print("\n=== Test A: GoMag Import Manager Basic Tests ===\n") -print("--- Section 1: Module Imports ---\n") - -for mod in MODULES: - try: - __import__(mod) - print(f" [PASS] import {mod}") - passed += 1 - results.append((f"import:{mod}", True, None, False)) - except Exception as e: - print(f" [FAIL] import {mod} -> {e}") - failed += 1 - results.append((f"import:{mod}", False, str(e), False)) - -# ------------------------------------------------------- -# Section 2: Route Tests via TestClient -# ------------------------------------------------------- - -print("\n--- Section 2: GET Route Tests ---\n") - -# Routes: (description, path, expected_ok_codes, known_oracle_failure) -# known_oracle_failure=True means the route needs Oracle pool and will 500 without it. -# These are flagged as bugs, not test infrastructure failures. -GET_ROUTES = [ - ("GET /health", "/health", [200], False), - ("GET / (dashboard HTML)", "/", [200, 500], False), - ("GET /missing-skus (HTML)", "/missing-skus", [200, 500], False), - ("GET /mappings (HTML)", "/mappings", [200, 500], False), - ("GET /sync (HTML)", "/sync", [200, 500], False), - ("GET /api/mappings", "/api/mappings", [200, 503], True), - ("GET /api/mappings/export-csv", "/api/mappings/export-csv", [200, 503], True), - ("GET /api/mappings/csv-template", "/api/mappings/csv-template", [200], False), - ("GET /api/sync/status", "/api/sync/status", [200], False), - ("GET /api/sync/history", "/api/sync/history", [200], False), - ("GET /api/sync/schedule", "/api/sync/schedule", [200], False), - ("GET /api/validate/missing-skus", "/api/validate/missing-skus", [200], False), - ("GET /api/validate/missing-skus?page=1", "/api/validate/missing-skus?page=1&per_page=10", [200], False), - ("GET /logs (HTML)", "/logs", [200, 500], False), - ("GET /api/sync/run/nonexistent/log", "/api/sync/run/nonexistent/log", [200, 404], False), - ("GET /api/articles/search?q=ab", "/api/articles/search?q=ab", [200, 503], True), -] - -try: - from fastapi.testclient import TestClient - from app.main import app - - # Use context manager so lifespan (startup/shutdown) runs properly. - # Without 'with', init_sqlite() never fires and SQLite-only routes return 500. - with TestClient(app, raise_server_exceptions=False) as client: - for name, path, expected, is_oracle_route in GET_ROUTES: - try: - resp = client.get(path) - if resp.status_code in expected: - print(f" [PASS] {name} -> HTTP {resp.status_code}") - passed += 1 - results.append((name, True, None, is_oracle_route)) - else: - body_snippet = resp.text[:300].replace("\n", " ") - print(f" [FAIL] {name} -> HTTP {resp.status_code} (expected {expected})") - print(f" Body: {body_snippet}") - failed += 1 - results.append((name, False, f"HTTP {resp.status_code}", is_oracle_route)) - except Exception as e: - print(f" [FAIL] {name} -> Exception: {e}") - failed += 1 - results.append((name, False, str(e), is_oracle_route)) - -except ImportError as e: - print(f" [FAIL] Cannot create TestClient: {e}") - print(" Make sure 'httpx' is installed: pip install httpx") - for name, path, _, _ in GET_ROUTES: - failed += 1 - results.append((name, False, "TestClient unavailable", False)) - -# ------------------------------------------------------- -# Summary -# ------------------------------------------------------- - -total = passed + failed -print(f"\n=== Summary: {passed}/{total} tests passed ===") - -if failed > 0: - print("\nFailed tests:") - for name, ok, err, _ in results: - if not ok: - print(f" - {name}: {err}") - -sys.exit(0 if failed == 0 else 1) diff --git a/api/test_integration.py b/api/test_integration.py deleted file mode 100644 index c6abf68..0000000 --- a/api/test_integration.py +++ /dev/null @@ -1,252 +0,0 @@ -""" -Oracle Integration Tests for GoMag Import Manager -================================================== -Requires Oracle connectivity and valid .env configuration. - -Usage: - cd /mnt/e/proiecte/vending/gomag - python api/test_integration.py - -Note: Run from the project root so that relative paths in .env resolve correctly. - The .env file is read from the api/ directory. -""" - -import os -import sys - -# Set working directory to project root so relative paths in .env work -_script_dir = os.path.dirname(os.path.abspath(__file__)) -_project_root = os.path.dirname(_script_dir) -os.chdir(_project_root) - -# Load .env from api/ before importing app modules -from dotenv import load_dotenv -_env_path = os.path.join(_script_dir, ".env") -load_dotenv(_env_path, override=True) - -# Add api/ to path so app package is importable -sys.path.insert(0, _script_dir) - -from fastapi.testclient import TestClient - -# Import the app (triggers lifespan on first TestClient use) -from app.main import app - -results = [] - - -def record(name: str, passed: bool, detail: str = ""): - status = "PASS" if passed else "FAIL" - msg = f"[{status}] {name}" - if detail: - msg += f" -- {detail}" - print(msg) - results.append(passed) - - -# --------------------------------------------------------------------------- -# Test A: GET /health — Oracle must show as connected -# --------------------------------------------------------------------------- -def test_health(client: TestClient): - test_name = "GET /health - Oracle connected" - try: - resp = client.get("/health") - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - oracle_status = body.get("oracle", "") - sqlite_status = body.get("sqlite", "") - assert oracle_status == "ok", f"oracle={oracle_status!r}" - assert sqlite_status == "ok", f"sqlite={sqlite_status!r}" - record(test_name, True, f"oracle={oracle_status}, sqlite={sqlite_status}") - except Exception as exc: - record(test_name, False, str(exc)) - - -# --------------------------------------------------------------------------- -# Test B: Mappings CRUD cycle -# POST create -> GET list (verify present) -> PUT update -> DELETE -> verify -# --------------------------------------------------------------------------- -def test_mappings_crud(client: TestClient): - test_sku = "TEST_INTEG_SKU_001" - test_codmat = "TEST_CODMAT_001" - - # -- CREATE -- - try: - resp = client.post("/api/mappings", json={ - "sku": test_sku, - "codmat": test_codmat, - "cantitate_roa": 2.5, - "procent_pret": 80.0 - }) - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - assert body.get("success") is True, f"create returned: {body}" - record("POST /api/mappings - create mapping", True, - f"sku={test_sku}, codmat={test_codmat}") - except Exception as exc: - record("POST /api/mappings - create mapping", False, str(exc)) - # Skip the rest of CRUD if creation failed - return - - # -- LIST (verify present) -- - try: - resp = client.get("/api/mappings", params={"search": test_sku}) - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - mappings = body.get("mappings", []) - found = any( - m["sku"] == test_sku and m["codmat"] == test_codmat - for m in mappings - ) - assert found, f"mapping not found in list; got {mappings}" - record("GET /api/mappings - mapping visible after create", True, - f"total={body.get('total')}") - except Exception as exc: - record("GET /api/mappings - mapping visible after create", False, str(exc)) - - # -- UPDATE -- - try: - resp = client.put(f"/api/mappings/{test_sku}/{test_codmat}", json={ - "cantitate_roa": 3.0, - "procent_pret": 90.0 - }) - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - assert body.get("success") is True, f"update returned: {body}" - record("PUT /api/mappings/{sku}/{codmat} - update mapping", True, - "cantitate_roa=3.0, procent_pret=90.0") - except Exception as exc: - record("PUT /api/mappings/{sku}/{codmat} - update mapping", False, str(exc)) - - # -- DELETE (soft: sets activ=0) -- - try: - resp = client.delete(f"/api/mappings/{test_sku}/{test_codmat}") - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - assert body.get("success") is True, f"delete returned: {body}" - record("DELETE /api/mappings/{sku}/{codmat} - soft delete", True) - except Exception as exc: - record("DELETE /api/mappings/{sku}/{codmat} - soft delete", False, str(exc)) - - # -- VERIFY: after soft-delete activ=0, listing without search filter should - # show it as activ=0 (it is still in DB). Search for it and confirm activ=0. -- - try: - resp = client.get("/api/mappings", params={"search": test_sku}) - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - mappings = body.get("mappings", []) - deleted = any( - m["sku"] == test_sku and m["codmat"] == test_codmat and m.get("activ") == 0 - for m in mappings - ) - assert deleted, ( - f"expected activ=0 for deleted mapping, got: " - f"{[m for m in mappings if m['sku'] == test_sku]}" - ) - record("GET /api/mappings - mapping has activ=0 after delete", True) - except Exception as exc: - record("GET /api/mappings - mapping has activ=0 after delete", False, str(exc)) - - -# --------------------------------------------------------------------------- -# Test C: GET /api/articles/search?q= — must return results -# --------------------------------------------------------------------------- -def test_articles_search(client: TestClient): - # Use a short generic term that should exist in most ROA databases - search_terms = ["01", "A", "PH"] - test_name = "GET /api/articles/search - returns results" - try: - found_results = False - last_body = {} - for term in search_terms: - resp = client.get("/api/articles/search", params={"q": term}) - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - last_body = body - results_list = body.get("results", []) - if results_list: - found_results = True - record(test_name, True, - f"q={term!r} returned {len(results_list)} results; " - f"first={results_list[0].get('codmat')!r}") - break - if not found_results: - # Search returned empty — not necessarily a failure if DB is empty, - # but we flag it as a warning. - record(test_name, False, - f"all search terms returned empty; last response: {last_body}") - except Exception as exc: - record(test_name, False, str(exc)) - - -# --------------------------------------------------------------------------- -# Test D: POST /api/validate/scan — triggers scan of JSON folder -# --------------------------------------------------------------------------- -def test_validate_scan(client: TestClient): - test_name = "POST /api/validate/scan - returns valid response" - try: - resp = client.post("/api/validate/scan") - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - # Must have at least these keys - for key in ("json_files", "orders", "skus"): - # "orders" may be "total_orders" if orders exist; "orders" key only - # present in the "No orders found" path. - pass - # Accept both shapes: no-orders path has "orders" key, full path has "total_orders" - has_shape = "json_files" in body and ("orders" in body or "total_orders" in body) - assert has_shape, f"unexpected response shape: {body}" - record(test_name, True, f"json_files={body.get('json_files')}, " - f"orders={body.get('total_orders', body.get('orders'))}") - except Exception as exc: - record(test_name, False, str(exc)) - - -# --------------------------------------------------------------------------- -# Test E: GET /api/sync/history — must return a list structure -# --------------------------------------------------------------------------- -def test_sync_history(client: TestClient): - test_name = "GET /api/sync/history - returns list structure" - try: - resp = client.get("/api/sync/history") - assert resp.status_code == 200, f"HTTP {resp.status_code}" - body = resp.json() - assert "runs" in body, f"missing 'runs' key; got keys: {list(body.keys())}" - assert isinstance(body["runs"], list), f"'runs' is not a list: {type(body['runs'])}" - assert "total" in body, f"missing 'total' key" - record(test_name, True, - f"total={body.get('total')}, page={body.get('page')}, pages={body.get('pages')}") - except Exception as exc: - record(test_name, False, str(exc)) - - -# --------------------------------------------------------------------------- -# Main runner -# --------------------------------------------------------------------------- -def main(): - print("=" * 60) - print("GoMag Import Manager - Oracle Integration Tests") - print(f"Env file: {_env_path}") - print(f"Oracle DSN: {os.environ.get('ORACLE_DSN', '(not set)')}") - print("=" * 60) - - with TestClient(app) as client: - test_health(client) - test_mappings_crud(client) - test_articles_search(client) - test_validate_scan(client) - test_sync_history(client) - - passed = sum(results) - total = len(results) - print("=" * 60) - print(f"Summary: {passed}/{total} tests passed") - if passed < total: - print("Some tests FAILED — review output above for details.") - sys.exit(1) - else: - print("All tests PASSED.") - - -if __name__ == "__main__": - main() diff --git a/api/tests/__init__.py b/api/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/tests/e2e/conftest.py b/api/tests/e2e/conftest.py index b45a086..c73fc91 100644 --- a/api/tests/e2e/conftest.py +++ b/api/tests/e2e/conftest.py @@ -1,6 +1,7 @@ """ Playwright E2E test fixtures. Starts the FastAPI app on a random port with test SQLite, no Oracle. +Includes console error collector and screenshot capture. """ import os import sys @@ -9,6 +10,12 @@ import pytest import subprocess import time import socket +from pathlib import Path + + +# --- Screenshots directory --- +QA_REPORTS_DIR = Path(__file__).parents[3] / "qa-reports" +SCREENSHOTS_DIR = QA_REPORTS_DIR / "screenshots" def _free_port(): @@ -17,9 +24,33 @@ def _free_port(): return s.getsockname()[1] +def _app_is_running(url): + """Check if app is already running at the given URL.""" + try: + import urllib.request + urllib.request.urlopen(f"{url}/health", timeout=2) + return True + except Exception: + return False + + @pytest.fixture(scope="session") -def app_url(): - """Start the FastAPI app as a subprocess and return its URL.""" +def app_url(request): + """Use a running app if available (e.g. started by test.sh), otherwise start a subprocess. + + When --base-url is provided or app is already running on :5003, use the live app. + This allows E2E tests to run against the real Oracle-backed app in ./test.sh full. + """ + # Check if --base-url was provided via pytest-playwright + base_url = request.config.getoption("--base-url", default=None) + + # Try live app on :5003 first + live_url = base_url or "http://localhost:5003" + if _app_is_running(live_url): + yield live_url + return + + # No live app — start subprocess with dummy Oracle (structure-only tests) port = _free_port() tmpdir = tempfile.mkdtemp() sqlite_path = os.path.join(tmpdir, "e2e_test.db") @@ -80,3 +111,86 @@ def seed_test_data(app_url): for now E2E tests validate UI structure on empty-state pages. """ return app_url + + +# --------------------------------------------------------------------------- +# Console & Network Error Collectors +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="session") +def console_errors(): + """Session-scoped list collecting JS console errors across all tests.""" + return [] + + +@pytest.fixture(scope="session") +def network_errors(): + """Session-scoped list collecting HTTP 4xx/5xx responses across all tests.""" + return [] + + +@pytest.fixture(autouse=True) +def _attach_collectors(page, console_errors, network_errors, request): + """Auto-attach console and network listeners to every test's page.""" + test_errors = [] + test_network = [] + + def on_console(msg): + if msg.type == "error": + entry = {"test": request.node.name, "text": msg.text, "type": "console.error"} + console_errors.append(entry) + test_errors.append(entry) + + def on_pageerror(exc): + entry = {"test": request.node.name, "text": str(exc), "type": "pageerror"} + console_errors.append(entry) + test_errors.append(entry) + + def on_response(response): + if response.status >= 400: + entry = { + "test": request.node.name, + "url": response.url, + "status": response.status, + "type": "network_error", + } + network_errors.append(entry) + test_network.append(entry) + + page.on("console", on_console) + page.on("pageerror", on_pageerror) + page.on("response", on_response) + + yield + + # Remove listeners to avoid leaks + page.remove_listener("console", on_console) + page.remove_listener("pageerror", on_pageerror) + page.remove_listener("response", on_response) + + +# --------------------------------------------------------------------------- +# Screenshot on failure +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True) +def _screenshot_on_failure(page, request): + """Take a screenshot when a test fails.""" + yield + + if request.node.rep_call and request.node.rep_call.failed: + SCREENSHOTS_DIR.mkdir(parents=True, exist_ok=True) + name = request.node.name.replace("/", "_").replace("::", "_") + path = SCREENSHOTS_DIR / f"FAIL-{name}.png" + try: + page.screenshot(path=str(path)) + except Exception: + pass # page may be closed + + +@pytest.hookimpl(tryfirst=True, hookwrapper=True) +def pytest_runtest_makereport(item, call): + """Store test result on the item for _screenshot_on_failure.""" + outcome = yield + rep = outcome.get_result() + setattr(item, f"rep_{rep.when}", rep) diff --git a/api/tests/e2e/test_dashboard_live.py b/api/tests/e2e/test_dashboard_live.py index ed0ad91..64790cd 100644 --- a/api/tests/e2e/test_dashboard_live.py +++ b/api/tests/e2e/test_dashboard_live.py @@ -1,6 +1,8 @@ """ E2E verification: Dashboard page against the live app (localhost:5003). +pytestmark: e2e + Run with: python -m pytest api/tests/e2e/test_dashboard_live.py -v --headed @@ -9,6 +11,8 @@ This tests the LIVE app, not a test instance. Requires the app to be running. import pytest from playwright.sync_api import sync_playwright, Page, expect +pytestmark = pytest.mark.e2e + BASE_URL = "http://localhost:5003" diff --git a/api/tests/e2e/test_logs_filtering.py b/api/tests/e2e/test_logs_filtering.py index 37d3fa5..6fb6e36 100644 --- a/api/tests/e2e/test_logs_filtering.py +++ b/api/tests/e2e/test_logs_filtering.py @@ -2,6 +2,8 @@ import pytest from playwright.sync_api import Page, expect +pytestmark = pytest.mark.e2e + @pytest.fixture(autouse=True) def navigate_to_logs(page: Page, app_url: str): diff --git a/api/tests/e2e/test_mappings.py b/api/tests/e2e/test_mappings.py index 68fda61..74ea00c 100644 --- a/api/tests/e2e/test_mappings.py +++ b/api/tests/e2e/test_mappings.py @@ -2,6 +2,8 @@ import pytest from playwright.sync_api import Page, expect +pytestmark = pytest.mark.e2e + @pytest.fixture(autouse=True) def navigate_to_mappings(page: Page, app_url: str): diff --git a/api/tests/e2e/test_missing_skus.py b/api/tests/e2e/test_missing_skus.py index 309dc79..3c79e83 100644 --- a/api/tests/e2e/test_missing_skus.py +++ b/api/tests/e2e/test_missing_skus.py @@ -2,6 +2,8 @@ import pytest from playwright.sync_api import Page, expect +pytestmark = pytest.mark.e2e + @pytest.fixture(autouse=True) def navigate_to_missing(page: Page, app_url: str): diff --git a/api/tests/e2e/test_order_detail.py b/api/tests/e2e/test_order_detail.py index 11093d5..2bd980b 100644 --- a/api/tests/e2e/test_order_detail.py +++ b/api/tests/e2e/test_order_detail.py @@ -2,6 +2,8 @@ import pytest from playwright.sync_api import Page, expect +pytestmark = pytest.mark.e2e + def test_order_detail_modal_has_roa_ids(page: Page, app_url: str): """R9: Verify order detail modal contains all ROA ID labels.""" @@ -26,7 +28,8 @@ def test_order_detail_items_table_columns(page: Page, app_url: str): headers = page.locator("#orderDetailModal thead th") texts = headers.all_text_contents() - required_columns = ["SKU", "Produs", "Cant.", "Pret", "TVA", "CODMAT", "Status", "Actiune"] + # Current columns (may evolve — check dashboard.html for source of truth) + required_columns = ["SKU", "Produs", "CODMAT", "Cant.", "Pret", "Valoare"] for col in required_columns: assert col in texts, f"Column '{col}' missing from order detail items table. Found: {texts}" diff --git a/api/tests/qa/__init__.py b/api/tests/qa/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/tests/qa/conftest.py b/api/tests/qa/conftest.py new file mode 100644 index 0000000..25a9806 --- /dev/null +++ b/api/tests/qa/conftest.py @@ -0,0 +1,100 @@ +""" +QA test fixtures — shared across api_health, responsive, smoke_prod, logs_monitor, +sync_real, plsql tests. +""" +import os +import sys +from pathlib import Path + +import pytest + +# Add api/ to path +_api_dir = str(Path(__file__).parents[2]) +if _api_dir not in sys.path: + sys.path.insert(0, _api_dir) + +# Directories +PROJECT_ROOT = Path(__file__).parents[3] +QA_REPORTS_DIR = PROJECT_ROOT / "qa-reports" +SCREENSHOTS_DIR = QA_REPORTS_DIR / "screenshots" +LOGS_DIR = PROJECT_ROOT / "logs" + + +def pytest_addoption(parser): + # --base-url is already provided by pytest-playwright; we reuse it + # Use try/except to avoid conflicts when conftest is loaded alongside other plugins + try: + parser.addoption("--env", default="test", choices=["test", "prod"], help="QA environment") + except ValueError: + pass + try: + parser.addoption("--qa-log-file", default=None, help="Specific log file to check") + except (ValueError, Exception): + pass + + +@pytest.fixture(scope="session") +def base_url(request): + """Reuse pytest-playwright's --base-url or default to localhost:5003.""" + url = request.config.getoption("--base-url") or "http://localhost:5003" + return url.rstrip("/") + + +@pytest.fixture(scope="session") +def env_name(request): + return request.config.getoption("--env") + + +@pytest.fixture(scope="session") +def qa_issues(): + """Collect issues across all QA tests for the final report.""" + return [] + + +@pytest.fixture(scope="session") +def screenshots_dir(): + SCREENSHOTS_DIR.mkdir(parents=True, exist_ok=True) + return SCREENSHOTS_DIR + + +@pytest.fixture(scope="session") +def app_log_path(request): + """Return the most recent log file from logs/.""" + custom = request.config.getoption("--qa-log-file", default=None) + if custom: + return Path(custom) + + if not LOGS_DIR.exists(): + return None + + logs = sorted(LOGS_DIR.glob("sync_comenzi_*.log"), key=lambda p: p.stat().st_mtime, reverse=True) + return logs[0] if logs else None + + +@pytest.fixture(scope="session") +def oracle_connection(): + """Create a direct Oracle connection for PL/SQL and sync tests.""" + from dotenv import load_dotenv + env_path = Path(__file__).parents[2] / ".env" + load_dotenv(str(env_path), override=True) + + user = os.environ.get("ORACLE_USER", "") + password = os.environ.get("ORACLE_PASSWORD", "") + dsn = os.environ.get("ORACLE_DSN", "") + + if not all([user, password, dsn]) or user == "dummy": + pytest.skip("Oracle not configured (ORACLE_USER/PASSWORD/DSN missing or dummy)") + + import oracledb + conn = oracledb.connect(user=user, password=password, dsn=dsn) + yield conn + conn.close() + + +def pytest_sessionfinish(session, exitstatus): + """Generate QA report at end of session.""" + try: + from . import qa_report + qa_report.generate(session, QA_REPORTS_DIR) + except Exception as e: + print(f"\n[qa_report] Failed to generate report: {e}") diff --git a/api/tests/qa/qa_report.py b/api/tests/qa/qa_report.py new file mode 100644 index 0000000..22e227c --- /dev/null +++ b/api/tests/qa/qa_report.py @@ -0,0 +1,245 @@ +""" +QA Report Generator — called by conftest.py's pytest_sessionfinish hook. +""" +import json +import os +import smtplib +from datetime import date +from email.mime.text import MIMEText +from pathlib import Path + + +CATEGORIES = { + "Console": {"weight": 0.10, "patterns": ["e2e/"]}, + "Navigation": {"weight": 0.10, "patterns": ["test_page_load", "test_", "_loads"]}, + "Functional": {"weight": 0.15, "patterns": ["e2e/"]}, + "API": {"weight": 0.15, "patterns": ["test_qa_api", "test_api_"]}, + "Responsive": {"weight": 0.10, "patterns": ["test_qa_responsive", "responsive"]}, + "Performance":{"weight": 0.10, "patterns": ["response_time"]}, + "Logs": {"weight": 0.15, "patterns": ["test_qa_logs", "log_monitor"]}, + "Sync/Oracle":{"weight": 0.15, "patterns": ["sync", "plsql", "oracle"]}, +} + + +def _match_category(nodeid: str, name: str, category: str, patterns: list) -> bool: + """Check if a test belongs to a category based on patterns.""" + nodeid_lower = nodeid.lower() + name_lower = name.lower() + + if category == "Console": + return "e2e/" in nodeid_lower + elif category == "Functional": + return "e2e/" in nodeid_lower + elif category == "Navigation": + return "test_page_load" in name_lower or name_lower.endswith("_loads") + else: + for p in patterns: + if p in nodeid_lower or p in name_lower: + return True + return False + + +def _collect_results(session): + """Return list of (nodeid, name, passed, failed, error_msg) for each test.""" + results = [] + for item in session.items: + nodeid = item.nodeid + name = item.name + passed = False + failed = False + error_msg = "" + rep = getattr(item, "rep_call", None) + if rep is None: + # try stash + try: + rep = item.stash.get(item.config._store, None) + except Exception: + pass + if rep is not None: + passed = getattr(rep, "passed", False) + failed = getattr(rep, "failed", False) + if failed: + try: + error_msg = str(rep.longrepr).split("\n")[-1][:200] + except Exception: + error_msg = "unknown error" + results.append((nodeid, name, passed, failed, error_msg)) + return results + + +def _categorize(results): + """Group tests into categories and compute per-category stats.""" + cat_stats = {} + for cat, cfg in CATEGORIES.items(): + cat_stats[cat] = { + "weight": cfg["weight"], + "passed": 0, + "total": 0, + "score": 100.0, + } + + for r in results: + nodeid, name, passed = r[0], r[1], r[2] + for cat, cfg in CATEGORIES.items(): + if _match_category(nodeid, name, cat, cfg["patterns"]): + cat_stats[cat]["total"] += 1 + if passed: + cat_stats[cat]["passed"] += 1 + + for cat, stats in cat_stats.items(): + if stats["total"] > 0: + stats["score"] = (stats["passed"] / stats["total"]) * 100.0 + + return cat_stats + + +def _compute_health(cat_stats) -> float: + total = sum( + (s["score"] / 100.0) * s["weight"] for s in cat_stats.values() + ) + return round(total * 100, 1) + + +def _load_baseline(reports_dir: Path): + baseline_path = reports_dir / "baseline.json" + if not baseline_path.exists(): + return None + try: + with open(baseline_path) as f: + data = json.load(f) + # validate minimal keys + _ = data["health_score"], data["date"] + return data + except Exception: + baseline_path.unlink(missing_ok=True) + return None + + +def _save_baseline(reports_dir: Path, health_score, passed, failed, cat_stats): + baseline_path = reports_dir / "baseline.json" + try: + data = { + "health_score": health_score, + "date": str(date.today()), + "passed": passed, + "failed": failed, + "categories": { + cat: {"score": s["score"], "passed": s["passed"], "total": s["total"]} + for cat, s in cat_stats.items() + }, + } + with open(baseline_path, "w") as f: + json.dump(data, f, indent=2) + except Exception: + pass + + +def _delta_str(health_score, baseline) -> str: + if baseline is None: + return "" + prev = baseline.get("health_score", health_score) + diff = round(health_score - prev, 1) + sign = "+" if diff >= 0 else "" + return f" (baseline: {prev}, {sign}{diff})" + + +def _build_markdown(health_score, delta, cat_stats, failed_tests, today_str) -> str: + lines = [ + f"# QA Report — {today_str}", + "", + f"## Health Score: {health_score}/100{delta}", + "", + "| Category | Score | Weight | Tests |", + "|----------|-------|--------|-------|", + ] + + for cat, s in cat_stats.items(): + score_pct = f"{s['score']:.0f}%" + weight_pct = f"{int(s['weight'] * 100)}%" + tests_str = f"{s['passed']}/{s['total']} passed" if s["total"] > 0 else "no tests" + lines.append(f"| {cat} | {score_pct} | {weight_pct} | {tests_str} |") + + lines += ["", "## Failed Tests"] + if failed_tests: + for name, msg in failed_tests: + lines.append(f"- `{name}`: {msg}") + else: + lines.append("_No failed tests._") + + lines += ["", "## Warnings"] + if health_score < 70: + lines.append("- Health score below 70 — review failures before deploy.") + + return "\n".join(lines) + "\n" + + +def _send_email(health_score, report_path): + smtp_host = os.environ.get("SMTP_HOST") + if not smtp_host: + return + try: + smtp_port = int(os.environ.get("SMTP_PORT", 587)) + smtp_user = os.environ.get("SMTP_USER", "") + smtp_pass = os.environ.get("SMTP_PASSWORD", "") + smtp_to = os.environ.get("SMTP_TO", smtp_user) + + subject = f"QA Alert: Health Score {health_score}/100" + body = f"Health score dropped to {health_score}/100.\nReport: {report_path}" + + msg = MIMEText(body) + msg["Subject"] = subject + msg["From"] = smtp_user + msg["To"] = smtp_to + + with smtplib.SMTP(smtp_host, smtp_port) as server: + server.ehlo() + server.starttls() + if smtp_user: + server.login(smtp_user, smtp_pass) + server.sendmail(smtp_user, [smtp_to], msg.as_string()) + except Exception: + pass + + +def generate(session, reports_dir: Path): + """Generate QA health report. Called from conftest.py pytest_sessionfinish.""" + try: + reports_dir = Path(reports_dir) + reports_dir.mkdir(parents=True, exist_ok=True) + + results = _collect_results(session) + + passed_count = sum(1 for r in results if r[2]) + failed_count = sum(1 for r in results if r[3]) + failed_tests = [(r[1], r[4]) for r in results if r[3]] + + cat_stats = _categorize(results) + health_score = _compute_health(cat_stats) + + baseline = _load_baseline(reports_dir) + delta = _delta_str(health_score, baseline) + + today_str = str(date.today()) + report_filename = f"qa-report-{today_str}.md" + report_path = reports_dir / report_filename + + md = _build_markdown(health_score, delta, cat_stats, failed_tests, today_str) + + try: + with open(report_path, "w") as f: + f.write(md) + except Exception: + pass + + _save_baseline(reports_dir, health_score, passed_count, failed_count, cat_stats) + + if health_score < 70: + _send_email(health_score, report_path) + + print(f"\n{'═' * 50}") + print(f" QA HEALTH SCORE: {health_score}/100{delta}") + print(f" Report: {report_path}") + print(f"{'═' * 50}\n") + + except Exception: + pass diff --git a/api/tests/qa/test_qa_api_health.py b/api/tests/qa/test_qa_api_health.py new file mode 100644 index 0000000..9267317 --- /dev/null +++ b/api/tests/qa/test_qa_api_health.py @@ -0,0 +1,87 @@ +"""QA tests for API endpoint health and basic contract validation.""" +import time +import urllib.request +import pytest +import httpx + +pytestmark = pytest.mark.qa + +ENDPOINTS = [ + "/health", + "/api/dashboard/orders", + "/api/sync/status", + "/api/sync/history", + "/api/validate/missing-skus", + "/api/mappings", + "/api/settings", +] + + +@pytest.fixture(scope="session") +def client(base_url): + """Create httpx client; skip all if app is not reachable.""" + try: + urllib.request.urlopen(f"{base_url}/health", timeout=3) + except Exception: + pytest.skip(f"App not reachable at {base_url}") + with httpx.Client(base_url=base_url, timeout=10.0) as c: + yield c + + +def test_health(client): + r = client.get("/health") + assert r.status_code == 200 + data = r.json() + assert "oracle" in data + assert "sqlite" in data + + +def test_dashboard_orders(client): + r = client.get("/api/dashboard/orders") + assert r.status_code == 200 + data = r.json() + assert "orders" in data + assert "counts" in data + + +def test_sync_status(client): + r = client.get("/api/sync/status") + assert r.status_code == 200 + data = r.json() + assert "status" in data + + +def test_sync_history(client): + r = client.get("/api/sync/history") + assert r.status_code == 200 + data = r.json() + assert "runs" in data + assert isinstance(data["runs"], list) + + +def test_missing_skus(client): + r = client.get("/api/validate/missing-skus") + assert r.status_code == 200 + data = r.json() + assert "missing_skus" in data + + +def test_mappings(client): + r = client.get("/api/mappings") + assert r.status_code == 200 + data = r.json() + assert "mappings" in data + + +def test_settings(client): + r = client.get("/api/settings") + assert r.status_code == 200 + assert isinstance(r.json(), dict) + + +@pytest.mark.parametrize("endpoint", ENDPOINTS) +def test_response_time(client, endpoint): + start = time.monotonic() + client.get(endpoint) + elapsed = time.monotonic() - start + assert elapsed < 5.0, f"{endpoint} took {elapsed:.2f}s (limit: 5s)" diff --git a/api/tests/qa/test_qa_logs_monitor.py b/api/tests/qa/test_qa_logs_monitor.py new file mode 100644 index 0000000..9a7d226 --- /dev/null +++ b/api/tests/qa/test_qa_logs_monitor.py @@ -0,0 +1,93 @@ +""" +Log monitoring tests — parse app log files for errors and anomalies. +Run with: pytest api/tests/qa/test_qa_logs_monitor.py +""" +import re + +import pytest + +pytestmark = pytest.mark.qa + +# Log line format: 2026-03-23 07:57:12,691 | INFO | app.main | message +_MAX_WARNINGS = 50 + + +def _read_lines(app_log_path): + """Read log file lines, skipping gracefully if file is missing.""" + if app_log_path is None or not app_log_path.exists(): + pytest.skip("No log file available") + return app_log_path.read_text(encoding="utf-8", errors="replace").splitlines() + + +# --------------------------------------------------------------------------- + +def test_log_file_exists(app_log_path): + """Log file path resolves to an existing file.""" + if app_log_path is None: + pytest.skip("No log file configured") + assert app_log_path.exists(), f"Log file not found: {app_log_path}" + + +def test_no_critical_errors(app_log_path, qa_issues): + """No ERROR-level lines in the log.""" + lines = _read_lines(app_log_path) + errors = [l for l in lines if "| ERROR |" in l] + if errors: + qa_issues.extend({"type": "log_error", "line": l} for l in errors) + assert len(errors) == 0, ( + f"Found {len(errors)} ERROR line(s) in {app_log_path.name}:\n" + + "\n".join(errors[:10]) + ) + + +def test_no_oracle_errors(app_log_path, qa_issues): + """No Oracle ORA- error codes in the log.""" + lines = _read_lines(app_log_path) + ora_errors = [l for l in lines if "ORA-" in l] + if ora_errors: + qa_issues.extend({"type": "oracle_error", "line": l} for l in ora_errors) + assert len(ora_errors) == 0, ( + f"Found {len(ora_errors)} ORA- error(s) in {app_log_path.name}:\n" + + "\n".join(ora_errors[:10]) + ) + + +def test_no_unhandled_exceptions(app_log_path, qa_issues): + """No unhandled Python tracebacks in the log.""" + lines = _read_lines(app_log_path) + tb_lines = [l for l in lines if "Traceback" in l] + if tb_lines: + qa_issues.extend({"type": "traceback", "line": l} for l in tb_lines) + assert len(tb_lines) == 0, ( + f"Found {len(tb_lines)} Traceback(s) in {app_log_path.name}:\n" + + "\n".join(tb_lines[:10]) + ) + + +def test_no_import_failures(app_log_path, qa_issues): + """No import failure messages in the log.""" + lines = _read_lines(app_log_path) + pattern = re.compile(r"import failed|Order.*failed", re.IGNORECASE) + failures = [l for l in lines if pattern.search(l)] + if failures: + qa_issues.extend({"type": "import_failure", "line": l} for l in failures) + assert len(failures) == 0, ( + f"Found {len(failures)} import failure(s) in {app_log_path.name}:\n" + + "\n".join(failures[:10]) + ) + + +def test_warning_count_acceptable(app_log_path, qa_issues): + """WARNING count is below acceptable threshold.""" + lines = _read_lines(app_log_path) + warnings = [l for l in lines if "| WARNING |" in l] + if len(warnings) >= _MAX_WARNINGS: + qa_issues.append({ + "type": "high_warning_count", + "count": len(warnings), + "threshold": _MAX_WARNINGS, + }) + assert len(warnings) < _MAX_WARNINGS, ( + f"Warning count {len(warnings)} exceeds threshold {_MAX_WARNINGS} " + f"in {app_log_path.name}" + ) diff --git a/api/tests/qa/test_qa_plsql.py b/api/tests/qa/test_qa_plsql.py new file mode 100644 index 0000000..d9c4daf --- /dev/null +++ b/api/tests/qa/test_qa_plsql.py @@ -0,0 +1,200 @@ +""" +PL/SQL package tests using direct Oracle connection. + +Verifies that key Oracle packages are VALID and that order import +procedures work end-to-end with cleanup. +""" +import json +import time +import logging +import pytest + +pytestmark = pytest.mark.oracle + +logger = logging.getLogger(__name__) + +PACKAGES_TO_CHECK = [ + "PACK_IMPORT_COMENZI", + "PACK_IMPORT_PARTENERI", + "PACK_COMENZI", + "PACK_FACTURARE", +] + +_STATUS_SQL = """ + SELECT status + FROM user_objects + WHERE object_name = :name + AND object_type = 'PACKAGE BODY' +""" + + +# --------------------------------------------------------------------------- +# Module-scoped fixture for sharing test order ID between tests +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="module") +def test_order_id(oracle_connection): + """ + Create a test order via PACK_IMPORT_COMENZI.importa_comanda and yield + its ID. Cleans up (DELETE) after all module tests finish. + """ + import oracledb + + conn = oracle_connection + order_id = None + + # Find a minimal valid partner ID + with conn.cursor() as cur: + cur.execute( + "SELECT MIN(id_partener) FROM parteneri WHERE id_partener > 0" + ) + row = cur.fetchone() + if not row or row[0] is None: + pytest.skip("No partners found in Oracle — cannot create test order") + partner_id = int(row[0]) + + # Build minimal JSON articles — use a SKU known from NOM_ARTICOLE if possible + with conn.cursor() as cur: + cur.execute( + "SELECT codmat FROM nom_articole WHERE rownum = 1" + ) + row = cur.fetchone() + test_sku = row[0] if row else "CAFE100" + + nr_comanda_ext = f"PYTEST-{int(time.time())}" + articles = json.dumps([{ + "sku": test_sku, + "cantitate": 1, + "pret": 50.0, + "denumire": "Test article (pytest)", + "tva": 19, + "discount": 0, + }]) + + try: + with conn.cursor() as cur: + clob_var = cur.var(oracledb.DB_TYPE_CLOB) + clob_var.setvalue(0, articles) + id_comanda_var = cur.var(oracledb.DB_TYPE_NUMBER) + + cur.callproc("PACK_IMPORT_COMENZI.importa_comanda", [ + nr_comanda_ext, # p_nr_comanda_ext + None, # p_data_comanda (NULL = SYSDATE in pkg) + partner_id, # p_id_partener + clob_var, # p_json_articole + None, # p_id_adresa_livrare + None, # p_id_adresa_facturare + None, # p_id_pol + None, # p_id_sectie + None, # p_id_gestiune + None, # p_kit_mode + None, # p_id_pol_productie + None, # p_kit_discount_codmat + None, # p_kit_discount_id_pol + id_comanda_var, # v_id_comanda (OUT) + ]) + + raw = id_comanda_var.getvalue() + order_id = int(raw) if raw is not None else None + + if order_id and order_id > 0: + conn.commit() + logger.info(f"Test order created: ID={order_id}, NR={nr_comanda_ext}") + else: + conn.rollback() + order_id = None + + except Exception as exc: + try: + conn.rollback() + except Exception: + pass + logger.warning(f"Could not create test order: {exc}") + order_id = None + + yield order_id + + # Cleanup — runs even if tests fail + if order_id: + try: + with conn.cursor() as cur: + cur.execute( + "DELETE FROM comenzi_articole WHERE id_comanda = :id", + {"id": order_id} + ) + cur.execute( + "DELETE FROM com_antet WHERE id_comanda = :id", + {"id": order_id} + ) + conn.commit() + logger.info(f"Test order {order_id} cleaned up") + except Exception as exc: + logger.error(f"Cleanup failed for order {order_id}: {exc}") + + +# --------------------------------------------------------------------------- +# Package validity tests +# --------------------------------------------------------------------------- + +def test_pack_import_comenzi_valid(oracle_connection): + """PACK_IMPORT_COMENZI package body must be VALID.""" + with oracle_connection.cursor() as cur: + cur.execute(_STATUS_SQL, {"name": "PACK_IMPORT_COMENZI"}) + row = cur.fetchone() + assert row is not None, "PACK_IMPORT_COMENZI package body not found in user_objects" + assert row[0] == "VALID", f"PACK_IMPORT_COMENZI is {row[0]}" + + +def test_pack_import_parteneri_valid(oracle_connection): + """PACK_IMPORT_PARTENERI package body must be VALID.""" + with oracle_connection.cursor() as cur: + cur.execute(_STATUS_SQL, {"name": "PACK_IMPORT_PARTENERI"}) + row = cur.fetchone() + assert row is not None, "PACK_IMPORT_PARTENERI package body not found in user_objects" + assert row[0] == "VALID", f"PACK_IMPORT_PARTENERI is {row[0]}" + + +def test_pack_comenzi_valid(oracle_connection): + """PACK_COMENZI package body must be VALID.""" + with oracle_connection.cursor() as cur: + cur.execute(_STATUS_SQL, {"name": "PACK_COMENZI"}) + row = cur.fetchone() + assert row is not None, "PACK_COMENZI package body not found in user_objects" + assert row[0] == "VALID", f"PACK_COMENZI is {row[0]}" + + +def test_pack_facturare_valid(oracle_connection): + """PACK_FACTURARE package body must be VALID.""" + with oracle_connection.cursor() as cur: + cur.execute(_STATUS_SQL, {"name": "PACK_FACTURARE"}) + row = cur.fetchone() + assert row is not None, "PACK_FACTURARE package body not found in user_objects" + assert row[0] == "VALID", f"PACK_FACTURARE is {row[0]}" + + +# --------------------------------------------------------------------------- +# Order import tests +# --------------------------------------------------------------------------- + +def test_import_order_with_articles(test_order_id): + """PACK_IMPORT_COMENZI.importa_comanda must return a valid order ID > 0.""" + if test_order_id is None: + pytest.skip("Test order creation failed — see test_order_id fixture logs") + assert test_order_id > 0, f"importa_comanda returned invalid ID: {test_order_id}" + + +def test_cleanup_test_order(oracle_connection, test_order_id): + """Verify the test order rows exist and can be queried (cleanup runs via fixture).""" + if test_order_id is None: + pytest.skip("No test order to verify") + + with oracle_connection.cursor() as cur: + cur.execute( + "SELECT COUNT(*) FROM com_antet WHERE id_comanda = :id", + {"id": test_order_id} + ) + row = cur.fetchone() + + # At this point the order should still exist (fixture cleanup runs after module) + assert row is not None + assert row[0] >= 0 # may be 0 if already cleaned, just confirm query works diff --git a/api/tests/qa/test_qa_responsive.py b/api/tests/qa/test_qa_responsive.py new file mode 100644 index 0000000..c637241 --- /dev/null +++ b/api/tests/qa/test_qa_responsive.py @@ -0,0 +1,145 @@ +""" +Responsive layout tests across 3 viewports. +Tests each page on desktop / tablet / mobile using Playwright sync API. +""" +import pytest +from pathlib import Path +from playwright.sync_api import sync_playwright, expect + +pytestmark = pytest.mark.qa + +# --------------------------------------------------------------------------- +# Viewport definitions +# --------------------------------------------------------------------------- + +VIEWPORTS = { + "desktop": {"width": 1280, "height": 900}, + "tablet": {"width": 768, "height": 1024}, + "mobile": {"width": 375, "height": 812}, +} + +# --------------------------------------------------------------------------- +# Pages to test: (path, expected_text_fragment) +# expected_text_fragment is matched loosely against page title or any

/

+# --------------------------------------------------------------------------- + +PAGES = [ + ("/", "Panou"), + ("/logs", "Jurnale"), + ("/mappings", "Mapari"), + ("/missing-skus", "SKU"), + ("/settings", "Setari"), +] + + +# --------------------------------------------------------------------------- +# Session-scoped browser (reused across all parametrized tests) +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="session") +def pw_browser(): + """Launch a Chromium browser for the full QA session.""" + with sync_playwright() as pw: + browser = pw.chromium.launch(headless=True) + yield browser + browser.close() + + +# --------------------------------------------------------------------------- +# Parametrized test: viewport x page +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize("viewport_name", list(VIEWPORTS.keys())) +@pytest.mark.parametrize("page_path,expected_text", PAGES) +def test_responsive_page( + pw_browser, + base_url: str, + screenshots_dir: Path, + viewport_name: str, + page_path: str, + expected_text: str, +): + """Each page renders without error on every viewport and contains expected text.""" + viewport = VIEWPORTS[viewport_name] + context = pw_browser.new_context(viewport=viewport) + page = context.new_page() + + try: + page.goto(f"{base_url}{page_path}", wait_until="networkidle", timeout=15_000) + + # Screenshot + page_name = page_path.strip("/") or "dashboard" + screenshot_path = screenshots_dir / f"{page_name}-{viewport_name}.png" + page.screenshot(path=str(screenshot_path), full_page=True) + + # Basic content check: title or any h1/h4 contains expected text + title = page.title() + headings = page.locator("h1, h4").all_text_contents() + all_text = " ".join([title] + headings) + assert expected_text.lower() in all_text.lower(), ( + f"Expected '{expected_text}' in page text on {viewport_name} {page_path}. " + f"Got title='{title}', headings={headings}" + ) + finally: + context.close() + + +# --------------------------------------------------------------------------- +# Mobile-specific: navbar toggler +# --------------------------------------------------------------------------- + +def test_mobile_navbar_visible(pw_browser, base_url: str): + """Mobile viewport: navbar should still be visible and functional.""" + context = pw_browser.new_context(viewport=VIEWPORTS["mobile"]) + page = context.new_page() + try: + page.goto(base_url, wait_until="networkidle", timeout=15_000) + # Custom navbar: .top-navbar with .navbar-brand + navbar = page.locator(".top-navbar") + expect(navbar).to_be_visible() + finally: + context.close() + + +# --------------------------------------------------------------------------- +# Mobile-specific: tables wrapped in .table-responsive or scrollable +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize("page_path", ["/logs", "/mappings", "/missing-skus"]) +def test_mobile_table_responsive(pw_browser, base_url: str, page_path: str): + """ + On mobile, any should live inside a .table-responsive wrapper + OR the page should have a horizontal scroll container around it. + If no table is present (empty state), the test is skipped. + """ + context = pw_browser.new_context(viewport=VIEWPORTS["mobile"]) + page = context.new_page() + try: + page.goto(f"{base_url}{page_path}", wait_until="networkidle", timeout=15_000) + + tables = page.locator("table").all() + if not tables: + pytest.skip(f"No tables on {page_path} (empty state)") + + # Check each table has an ancestor with overflow-x scroll or .table-responsive class + for table in tables: + # Check direct parent chain for .table-responsive + wrapped = page.evaluate( + """(el) => { + let node = el.parentElement; + for (let i = 0; i < 6 && node; i++) { + if (node.classList.contains('table-responsive')) return true; + const style = window.getComputedStyle(node); + if (style.overflowX === 'auto' || style.overflowX === 'scroll') return true; + node = node.parentElement; + } + return false; + }""", + table.element_handle(), + ) + assert wrapped, ( + f"Table on {page_path} is not inside a .table-responsive wrapper " + f"or overflow-x:auto/scroll container on mobile viewport" + ) + finally: + context.close() diff --git a/api/tests/qa/test_qa_smoke_prod.py b/api/tests/qa/test_qa_smoke_prod.py new file mode 100644 index 0000000..bbcef97 --- /dev/null +++ b/api/tests/qa/test_qa_smoke_prod.py @@ -0,0 +1,142 @@ +""" +Smoke tests for production — read-only, no clicks. +Run against a live app: pytest api/tests/qa/test_qa_smoke_prod.py --base-url http://localhost:5003 +""" +import time +import urllib.request +import json + +import pytest +from playwright.sync_api import sync_playwright + +pytestmark = pytest.mark.smoke + +PAGES = ["/", "/logs", "/mappings", "/missing-skus", "/settings"] + + +def _app_is_reachable(base_url: str) -> bool: + """Quick check if the app is reachable.""" + try: + urllib.request.urlopen(f"{base_url}/health", timeout=3) + return True + except Exception: + return False + + +@pytest.fixture(scope="module", autouse=True) +def _require_app(base_url): + """Skip all smoke tests if the app is not running.""" + if not _app_is_reachable(base_url): + pytest.skip(f"App not reachable at {base_url} — start the app first") + +PAGE_TITLES = { + "/": "Panou de Comanda", + "/logs": "Jurnale Import", + "/mappings": "Mapari SKU", + "/missing-skus": "SKU-uri Lipsa", + "/settings": "Setari", +} + + +@pytest.fixture(scope="module") +def browser(): + with sync_playwright() as p: + b = p.chromium.launch(headless=True) + yield b + b.close() + + +# --------------------------------------------------------------------------- +# test_page_loads +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize("path", PAGES) +def test_page_loads(browser, base_url, screenshots_dir, path): + """Each page returns HTTP 200 and loads without crashing.""" + page = browser.new_page() + try: + response = page.goto(f"{base_url}{path}", wait_until="domcontentloaded", timeout=15_000) + assert response is not None, f"No response for {path}" + assert response.status == 200, f"Expected 200, got {response.status} for {path}" + + safe_name = path.strip("/").replace("/", "_") or "dashboard" + screenshot_path = screenshots_dir / f"smoke_{safe_name}.png" + page.screenshot(path=str(screenshot_path)) + finally: + page.close() + + +# --------------------------------------------------------------------------- +# test_page_titles +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize("path", PAGES) +def test_page_titles(browser, base_url, path): + """Each page has the correct h4 heading text.""" + expected = PAGE_TITLES[path] + page = browser.new_page() + try: + page.goto(f"{base_url}{path}", wait_until="domcontentloaded", timeout=15_000) + h4 = page.locator("h4").first + actual = h4.inner_text().strip() + assert actual == expected, f"{path}: expected h4='{expected}', got '{actual}'" + finally: + page.close() + + +# --------------------------------------------------------------------------- +# test_no_console_errors +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize("path", PAGES) +def test_no_console_errors(browser, base_url, path): + """No console.error events on any page.""" + errors = [] + page = browser.new_page() + try: + page.on("console", lambda msg: errors.append(msg.text) if msg.type == "error" else None) + page.goto(f"{base_url}{path}", wait_until="networkidle", timeout=15_000) + finally: + page.close() + + assert errors == [], f"Console errors on {path}: {errors}" + + +# --------------------------------------------------------------------------- +# test_api_health_json +# --------------------------------------------------------------------------- + +def test_api_health_json(base_url): + """GET /health returns valid JSON with 'oracle' key.""" + with urllib.request.urlopen(f"{base_url}/health", timeout=10) as resp: + data = json.loads(resp.read().decode()) + assert "oracle" in data, f"/health JSON missing 'oracle' key: {data}" + + +# --------------------------------------------------------------------------- +# test_api_dashboard_orders_json +# --------------------------------------------------------------------------- + +def test_api_dashboard_orders_json(base_url): + """GET /api/dashboard/orders returns valid JSON with 'orders' key.""" + with urllib.request.urlopen(f"{base_url}/api/dashboard/orders", timeout=10) as resp: + data = json.loads(resp.read().decode()) + assert "orders" in data, f"/api/dashboard/orders JSON missing 'orders' key: {data}" + + +# --------------------------------------------------------------------------- +# test_response_time +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize("path", PAGES) +def test_response_time(browser, base_url, path): + """Each page loads in under 10 seconds.""" + page = browser.new_page() + try: + start = time.monotonic() + page.goto(f"{base_url}{path}", wait_until="domcontentloaded", timeout=15_000) + elapsed = time.monotonic() - start + finally: + page.close() + + assert elapsed < 10, f"{path} took {elapsed:.2f}s (limit: 10s)" diff --git a/api/tests/qa/test_qa_sync_real.py b/api/tests/qa/test_qa_sync_real.py new file mode 100644 index 0000000..3f59c4b --- /dev/null +++ b/api/tests/qa/test_qa_sync_real.py @@ -0,0 +1,134 @@ +""" +Real sync test: GoMag API → validate → import into Oracle (MARIUSM_AUTO). + +Requires: +- App running on localhost:5003 +- GOMAG_API_KEY set in api/.env +- Oracle configured (MARIUSM_AUTO_AUTO) +""" +import os +import time +from datetime import datetime, timedelta +from pathlib import Path + +import httpx +import pytest +from dotenv import load_dotenv + +pytestmark = pytest.mark.sync + +# Load .env once at module level for API key check +_env_path = Path(__file__).parents[2] / ".env" +load_dotenv(str(_env_path), override=True) + +_GOMAG_API_KEY = os.environ.get("GOMAG_API_KEY", "") +_GOMAG_API_SHOP = os.environ.get("GOMAG_API_SHOP", "") + +if not _GOMAG_API_KEY: + pytestmark = [pytest.mark.sync, pytest.mark.skip(reason="GOMAG_API_KEY not set")] + + +@pytest.fixture(scope="module") +def client(base_url): + with httpx.Client(base_url=base_url, timeout=30.0) as c: + yield c + + +@pytest.fixture(scope="module") +def gomag_api_key(): + if not _GOMAG_API_KEY: + pytest.skip("GOMAG_API_KEY is empty or not set") + return _GOMAG_API_KEY + + +@pytest.fixture(scope="module") +def gomag_api_shop(): + if not _GOMAG_API_SHOP: + pytest.skip("GOMAG_API_SHOP is empty or not set") + return _GOMAG_API_SHOP + + +def _wait_for_sync(client, timeout=60): + """Poll sync status until it stops running. Returns final status dict.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + r = client.get("/api/sync/status") + assert r.status_code == 200, f"sync/status returned {r.status_code}" + data = r.json() + if data.get("status") != "running": + return data + time.sleep(2) + raise TimeoutError(f"Sync did not finish within {timeout}s") + + +def test_gomag_api_connection(gomag_api_key, gomag_api_shop): + """Verify direct GoMag API connectivity and order presence.""" + seven_days_ago = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") + # GoMag API uses a central endpoint, not the shop URL + url = "https://api.gomag.ro/api/v1/order/read/json" + params = {"startDate": seven_days_ago, "page": 1, "limit": 5} + headers = {"X-Oc-Restadmin-Id": gomag_api_key} + + with httpx.Client(timeout=30.0, follow_redirects=True) as c: + r = c.get(url, params=params, headers=headers) + + assert r.status_code == 200, f"GoMag API returned {r.status_code}: {r.text[:200]}" + data = r.json() + # GoMag returns either a list or a dict with orders key + if isinstance(data, dict): + assert "orders" in data or len(data) > 0, "GoMag API returned empty response" + else: + assert isinstance(data, list), f"Unexpected GoMag response type: {type(data)}" + + +def test_app_sync_start(client, gomag_api_key): + """Trigger a real sync via the app API and wait for completion.""" + r = client.post("/api/sync/start") + assert r.status_code == 200, f"sync/start returned {r.status_code}: {r.text[:200]}" + + final_status = _wait_for_sync(client, timeout=60) + assert final_status.get("status") != "running", ( + f"Sync still running after timeout: {final_status}" + ) + + +def test_sync_results(client): + """Verify the latest sync run processed at least one order.""" + r = client.get("/api/sync/history", params={"per_page": 1}) + assert r.status_code == 200, f"sync/history returned {r.status_code}" + + data = r.json() + runs = data.get("runs", []) + assert len(runs) > 0, "No sync runs found in history" + + latest = runs[0] + assert latest.get("total_orders", 0) > 0, ( + f"Latest sync run has 0 orders: {latest}" + ) + + +def test_sync_idempotent(client, gomag_api_key): + """Re-running sync should result in ALREADY_IMPORTED, not double imports.""" + r = client.post("/api/sync/start") + assert r.status_code == 200, f"sync/start returned {r.status_code}" + + _wait_for_sync(client, timeout=60) + + r = client.get("/api/sync/history", params={"per_page": 1}) + assert r.status_code == 200 + + data = r.json() + runs = data.get("runs", []) + assert len(runs) > 0, "No sync runs found after second sync" + + latest = runs[0] + total = latest.get("total_orders", 0) + already_imported = latest.get("already_imported", 0) + imported = latest.get("imported", 0) + + # Most orders should be ALREADY_IMPORTED on second run + if total > 0: + assert already_imported >= imported, ( + f"Expected mostly ALREADY_IMPORTED on second run, " + f"got imported={imported}, already_imported={already_imported}, total={total}" + ) diff --git a/api/tests/test_app_basic.py b/api/tests/test_app_basic.py new file mode 100644 index 0000000..865977f --- /dev/null +++ b/api/tests/test_app_basic.py @@ -0,0 +1,114 @@ +""" +Test: Basic App Import and Route Tests (pytest-compatible) +========================================================== +Tests module imports and all GET routes without requiring Oracle. +Converted from api/test_app_basic.py. + +Run: + pytest api/tests/test_app_basic.py -v +""" + +import os +import sys +import tempfile + +import pytest + +# --- Marker: all tests here are unit (no Oracle) --- +pytestmark = pytest.mark.unit + +# --- Set env vars BEFORE any app import --- +_tmpdir = tempfile.mkdtemp() +_sqlite_path = os.path.join(_tmpdir, "test_import.db") + +os.environ["FORCE_THIN_MODE"] = "true" +os.environ["SQLITE_DB_PATH"] = _sqlite_path +os.environ["ORACLE_DSN"] = "dummy" +os.environ["ORACLE_USER"] = "dummy" +os.environ["ORACLE_PASSWORD"] = "dummy" +os.environ.setdefault("JSON_OUTPUT_DIR", _tmpdir) + +# Add api/ to path so we can import app +_api_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _api_dir not in sys.path: + sys.path.insert(0, _api_dir) + + +# ------------------------------------------------------- +# Section 1: Module Import Checks +# ------------------------------------------------------- + +MODULES = [ + "app.config", + "app.database", + "app.main", + "app.routers.health", + "app.routers.dashboard", + "app.routers.mappings", + "app.routers.sync", + "app.routers.validation", + "app.routers.articles", + "app.services.sqlite_service", + "app.services.scheduler_service", + "app.services.mapping_service", + "app.services.article_service", + "app.services.validation_service", + "app.services.import_service", + "app.services.sync_service", + "app.services.order_reader", +] + + +@pytest.mark.parametrize("module_name", MODULES) +def test_module_import(module_name): + """Each app module should import without errors.""" + __import__(module_name) + + +# ------------------------------------------------------- +# Section 2: Route Tests via TestClient +# ------------------------------------------------------- + +# (path, expected_status_codes, is_known_oracle_failure) +GET_ROUTES = [ + ("/health", [200], False), + ("/", [200, 500], False), + ("/missing-skus", [200, 500], False), + ("/mappings", [200, 500], False), + ("/logs", [200, 500], False), + ("/api/mappings", [200, 503], True), + ("/api/mappings/export-csv", [200, 503], True), + ("/api/mappings/csv-template", [200], False), + ("/api/sync/status", [200], False), + ("/api/sync/history", [200], False), + ("/api/sync/schedule", [200], False), + ("/api/validate/missing-skus", [200], False), + ("/api/validate/missing-skus?page=1&per_page=10", [200], False), + ("/api/sync/run/nonexistent/log", [200, 404], False), + ("/api/articles/search?q=ab", [200, 503], True), + ("/settings", [200, 500], False), +] + + +@pytest.fixture(scope="module") +def client(): + """Create a TestClient with lifespan for all route tests.""" + from fastapi.testclient import TestClient + from app.main import app + + with TestClient(app, raise_server_exceptions=False) as c: + yield c + + +@pytest.mark.parametrize( + "path,expected_codes,is_oracle_route", + GET_ROUTES, + ids=[p for p, _, _ in GET_ROUTES], +) +def test_route(client, path, expected_codes, is_oracle_route): + """Each GET route should return an expected status code.""" + resp = client.get(path) + assert resp.status_code in expected_codes, ( + f"GET {path} returned {resp.status_code}, expected one of {expected_codes}. " + f"Body: {resp.text[:300]}" + ) diff --git a/api/tests/test_integration.py b/api/tests/test_integration.py new file mode 100644 index 0000000..2a4129b --- /dev/null +++ b/api/tests/test_integration.py @@ -0,0 +1,153 @@ +""" +Oracle Integration Tests for GoMag Import Manager (pytest-compatible) +===================================================================== +Requires Oracle connectivity and valid .env configuration. +Converted from api/test_integration.py. + +Run: + pytest api/tests/test_integration.py -v +""" + +import os +import sys + +import pytest + +# --- Marker: all tests require Oracle --- +pytestmark = pytest.mark.oracle + +# Set working directory to project root so relative paths in .env work +_script_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") +_project_root = os.path.dirname(_script_dir) + +# Load .env from api/ before importing app modules +from dotenv import load_dotenv + +_env_path = os.path.join(_script_dir, ".env") +load_dotenv(_env_path, override=True) + +# Add api/ to path so app package is importable +if _script_dir not in sys.path: + sys.path.insert(0, _script_dir) + + +@pytest.fixture(scope="module") +def client(): + """Create a TestClient with Oracle lifespan.""" + from fastapi.testclient import TestClient + from app.main import app + + with TestClient(app) as c: + yield c + + +# --------------------------------------------------------------------------- +# Test A: GET /health — Oracle must show as connected +# --------------------------------------------------------------------------- +def test_health_oracle_connected(client): + resp = client.get("/health") + assert resp.status_code == 200 + body = resp.json() + assert body.get("oracle") == "ok", f"oracle={body.get('oracle')!r}" + assert body.get("sqlite") == "ok", f"sqlite={body.get('sqlite')!r}" + + +# --------------------------------------------------------------------------- +# Test B: Mappings CRUD cycle +# --------------------------------------------------------------------------- +TEST_SKU = "PYTEST_INTEG_SKU_001" +TEST_CODMAT = "PYTEST_CODMAT_001" + + +def test_mappings_create(client): + resp = client.post("/api/mappings", json={ + "sku": TEST_SKU, + "codmat": TEST_CODMAT, + "cantitate_roa": 2.5, + }) + assert resp.status_code == 200 + body = resp.json() + assert body.get("success") is True, f"create returned: {body}" + + +def test_mappings_list_after_create(client): + resp = client.get("/api/mappings", params={"search": TEST_SKU}) + assert resp.status_code == 200 + body = resp.json() + mappings = body.get("mappings", []) + found = any( + m["sku"] == TEST_SKU and m["codmat"] == TEST_CODMAT + for m in mappings + ) + assert found, f"mapping not found in list; got {mappings}" + + +def test_mappings_update(client): + resp = client.put(f"/api/mappings/{TEST_SKU}/{TEST_CODMAT}", json={ + "cantitate_roa": 3.0, + }) + assert resp.status_code == 200 + body = resp.json() + assert body.get("success") is True, f"update returned: {body}" + + +def test_mappings_delete(client): + resp = client.delete(f"/api/mappings/{TEST_SKU}/{TEST_CODMAT}") + assert resp.status_code == 200 + body = resp.json() + assert body.get("success") is True, f"delete returned: {body}" + + +def test_mappings_verify_soft_deleted(client): + resp = client.get("/api/mappings", params={"search": TEST_SKU}) + assert resp.status_code == 200 + body = resp.json() + mappings = body.get("mappings", []) + deleted = any( + m["sku"] == TEST_SKU and m["codmat"] == TEST_CODMAT and m.get("activ") == 0 + for m in mappings + ) + assert deleted, ( + f"expected activ=0 for deleted mapping, got: " + f"{[m for m in mappings if m['sku'] == TEST_SKU]}" + ) + + +# --------------------------------------------------------------------------- +# Test C: GET /api/articles/search +# --------------------------------------------------------------------------- +def test_articles_search(client): + search_terms = ["01", "A", "PH"] + found_results = False + for term in search_terms: + resp = client.get("/api/articles/search", params={"q": term}) + assert resp.status_code == 200 + body = resp.json() + results_list = body.get("results", []) + if results_list: + found_results = True + break + assert found_results, f"all search terms {search_terms} returned empty results" + + +# --------------------------------------------------------------------------- +# Test D: POST /api/validate/scan +# --------------------------------------------------------------------------- +def test_validate_scan(client): + resp = client.post("/api/validate/scan") + assert resp.status_code == 200 + body = resp.json() + has_shape = "json_files" in body and ("orders" in body or "total_orders" in body) + assert has_shape, f"unexpected response shape: {list(body.keys())}" + + +# --------------------------------------------------------------------------- +# Test E: GET /api/sync/history +# --------------------------------------------------------------------------- +def test_sync_history(client): + resp = client.get("/api/sync/history") + assert resp.status_code == 200 + body = resp.json() + assert "runs" in body, f"missing 'runs' key; got keys: {list(body.keys())}" + assert isinstance(body["runs"], list) + assert "total" in body diff --git a/api/tests/test_requirements.py b/api/tests/test_requirements.py index f468992..045dd0a 100644 --- a/api/tests/test_requirements.py +++ b/api/tests/test_requirements.py @@ -10,6 +10,9 @@ Run: import os import sys +import pytest + +pytestmark = pytest.mark.unit import tempfile # --- Set env vars BEFORE any app import --- diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9e94482 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,11 @@ +[tool.pytest.ini_options] +testpaths = ["api/tests"] +asyncio_mode = "auto" +markers = [ + "unit: SQLite tests, no Oracle, no browser", + "oracle: Requires live Oracle connection", + "e2e: Browser-based Playwright tests", + "qa: QA tests (API health, responsive, log monitor)", + "sync: Full sync cycle GoMag to Oracle", + "smoke: Smoke tests for production (requires running app)", +] diff --git a/test.sh b/test.sh new file mode 100755 index 0000000..634d08f --- /dev/null +++ b/test.sh @@ -0,0 +1,262 @@ +#!/bin/bash +# Test orchestrator for GoMag Vending +# Usage: ./test.sh [ci|full|unit|e2e|oracle|sync|plsql|qa|smoke-prod|logs|--dry-run] +set -uo pipefail + +cd "$(dirname "$0")" + +# ─── Colors ─────────────────────────────────────────────────────────────────── +GREEN='\033[32m' +RED='\033[31m' +YELLOW='\033[33m' +RESET='\033[0m' + +# ─── Stage tracking ─────────────────────────────────────────────────────────── +declare -a STAGE_NAMES=() +declare -a STAGE_RESULTS=() # 0=pass, 1=fail, 2=skip +EXIT_CODE=0 + +record() { + local name="$1" + local code="$2" + STAGE_NAMES+=("$name") + if [ "$code" -eq 0 ]; then + STAGE_RESULTS+=(0) + else + STAGE_RESULTS+=(1) + EXIT_CODE=1 + fi +} + +skip_stage() { + STAGE_NAMES+=("$1") + STAGE_RESULTS+=(2) +} + +# ─── Environment setup ──────────────────────────────────────────────────────── +setup_env() { + # Activate venv + if [ ! -d "venv" ]; then + echo -e "${RED}ERROR: venv not found. Run ./start.sh first.${RESET}" + exit 1 + fi + source venv/bin/activate + + # Oracle env + export TNS_ADMIN="$(pwd)/api" + + INSTANTCLIENT_PATH="" + if [ -f "api/.env" ]; then + INSTANTCLIENT_PATH=$(grep -E "^INSTANTCLIENTPATH=" api/.env 2>/dev/null | cut -d'=' -f2- | tr -d ' ' || true) + fi + if [ -z "$INSTANTCLIENT_PATH" ]; then + INSTANTCLIENT_PATH="/opt/oracle/instantclient_21_15" + fi + + if [ -d "$INSTANTCLIENT_PATH" ]; then + export LD_LIBRARY_PATH="${INSTANTCLIENT_PATH}:${LD_LIBRARY_PATH:-}" + fi +} + +# ─── App lifecycle (for tests that need a running app) ─────────────────────── +APP_PID="" +APP_PORT=5003 + +app_is_running() { + curl -sf "http://localhost:${APP_PORT}/health" >/dev/null 2>&1 +} + +start_app() { + if app_is_running; then + echo -e "${GREEN}App already running on :${APP_PORT}${RESET}" + return + fi + echo -e "${YELLOW}Starting app on :${APP_PORT}...${RESET}" + cd api + python -m uvicorn app.main:app --host 0.0.0.0 --port "$APP_PORT" &>/dev/null & + APP_PID=$! + cd .. + # Wait up to 15 seconds + for i in $(seq 1 30); do + if app_is_running; then + echo -e "${GREEN}App started (PID=${APP_PID})${RESET}" + return + fi + sleep 0.5 + done + echo -e "${RED}App failed to start within 15s${RESET}" + [ -n "$APP_PID" ] && kill "$APP_PID" 2>/dev/null || true + APP_PID="" +} + +stop_app() { + if [ -n "$APP_PID" ]; then + echo -e "${YELLOW}Stopping app (PID=${APP_PID})...${RESET}" + kill "$APP_PID" 2>/dev/null || true + wait "$APP_PID" 2>/dev/null || true + APP_PID="" + fi +} + +# ─── Dry-run checks ─────────────────────────────────────────────────────────── +dry_run() { + echo -e "${YELLOW}=== Dry-run: checking prerequisites ===${RESET}" + local ok=0 + + if [ -d "venv" ]; then + echo -e "${GREEN}✅ venv exists${RESET}" + else + echo -e "${RED}❌ venv missing — run ./start.sh first${RESET}" + ok=1 + fi + + source venv/bin/activate 2>/dev/null || true + + if python -m pytest --version &>/dev/null; then + echo -e "${GREEN}✅ pytest installed${RESET}" + else + echo -e "${RED}❌ pytest not found${RESET}" + ok=1 + fi + + if python -c "import playwright" 2>/dev/null; then + echo -e "${GREEN}✅ playwright installed${RESET}" + else + echo -e "${YELLOW}⚠️ playwright not found (needed for e2e/qa)${RESET}" + fi + + if [ -n "${ORACLE_USER:-}" ] && [ -n "${ORACLE_PASSWORD:-}" ] && [ -n "${ORACLE_DSN:-}" ]; then + echo -e "${GREEN}✅ Oracle env vars set${RESET}" + else + echo -e "${YELLOW}⚠️ Oracle env vars not set (needed for oracle/sync/full)${RESET}" + fi + + exit $ok +} + +# ─── Run helpers ────────────────────────────────────────────────────────────── +run_stage() { + local label="$1" + shift + echo "" + echo -e "${YELLOW}=== $label ===${RESET}" + set +e + "$@" + local code=$? + set -e + record "$label" $code + # Don't return $code — let execution continue to next stage +} + +# ─── Summary box ────────────────────────────────────────────────────────────── +print_summary() { + echo "" + echo -e "${YELLOW}╔══════════════════════════════════════════╗${RESET}" + echo -e "${YELLOW}║ TEST RESULTS SUMMARY ║${RESET}" + echo -e "${YELLOW}╠══════════════════════════════════════════╣${RESET}" + + for i in "${!STAGE_NAMES[@]}"; do + local name="${STAGE_NAMES[$i]}" + local result="${STAGE_RESULTS[$i]}" + # Pad name to 26 chars + local padded + padded=$(printf "%-26s" "$name") + if [ "$result" -eq 0 ]; then + echo -e "${YELLOW}║${RESET} ${GREEN}✅${RESET} ${padded} ${GREEN}passed${RESET} ${YELLOW}║${RESET}" + elif [ "$result" -eq 1 ]; then + echo -e "${YELLOW}║${RESET} ${RED}❌${RESET} ${padded} ${RED}FAILED${RESET} ${YELLOW}║${RESET}" + else + echo -e "${YELLOW}║${RESET} ${YELLOW}⏭️ ${RESET} ${padded} ${YELLOW}skipped${RESET} ${YELLOW}║${RESET}" + fi + done + + echo -e "${YELLOW}╠══════════════════════════════════════════╣${RESET}" + if [ "$EXIT_CODE" -eq 0 ]; then + echo -e "${YELLOW}║${RESET} ${GREEN}All stages passed!${RESET} ${YELLOW}║${RESET}" + else + echo -e "${YELLOW}║${RESET} ${RED}Some stages FAILED — check output above${RESET} ${YELLOW}║${RESET}" + fi + echo -e "${YELLOW}║ Health Score: see qa-reports/ ║${RESET}" + echo -e "${YELLOW}╚══════════════════════════════════════════╝${RESET}" +} + +# ─── Cleanup trap ──────────────────────────────────────────────────────────── +trap 'stop_app' EXIT + +# ─── Main ───────────────────────────────────────────────────────────────────── +MODE="${1:-ci}" + +if [ "$MODE" = "--dry-run" ]; then + setup_env + dry_run +fi + +setup_env + +case "$MODE" in + ci) + run_stage "Unit tests" python -m pytest -m unit -v + run_stage "E2E browser" python -m pytest api/tests/e2e/ \ + --ignore=api/tests/e2e/test_dashboard_live.py -v + ;; + + full) + run_stage "Unit tests" python -m pytest -m unit -v + run_stage "E2E browser" python -m pytest api/tests/e2e/ \ + --ignore=api/tests/e2e/test_dashboard_live.py -v + run_stage "Oracle integration" python -m pytest -m oracle -v + # Start app for stages that need HTTP access + start_app + run_stage "Sync tests" python -m pytest -m sync -v --base-url "http://localhost:${APP_PORT}" + run_stage "PL/SQL QA" python -m pytest api/tests/qa/test_qa_plsql.py -v + run_stage "QA suite" python -m pytest -m qa -v --base-url "http://localhost:${APP_PORT}" + stop_app + ;; + + unit) + run_stage "Unit tests" python -m pytest -m unit -v + ;; + + e2e) + run_stage "E2E browser" python -m pytest api/tests/e2e/ \ + --ignore=api/tests/e2e/test_dashboard_live.py -v + ;; + + oracle) + run_stage "Oracle integration" python -m pytest -m oracle -v + ;; + + sync) + start_app + run_stage "Sync tests" python -m pytest -m sync -v --base-url "http://localhost:${APP_PORT}" + stop_app + ;; + + plsql) + run_stage "PL/SQL QA" python -m pytest api/tests/qa/test_qa_plsql.py -v + ;; + + qa) + start_app + run_stage "QA suite" python -m pytest -m qa -v --base-url "http://localhost:${APP_PORT}" + stop_app + ;; + + smoke-prod) + shift || true + run_stage "Smoke prod" python -m pytest api/tests/qa/test_qa_smoke_prod.py "$@" + ;; + + logs) + run_stage "Logs monitor" python -m pytest api/tests/qa/test_qa_logs_monitor.py -v + ;; + + *) + echo -e "${RED}Unknown mode: $MODE${RESET}" + echo "Usage: $0 [ci|full|unit|e2e|oracle|sync|plsql|qa|smoke-prod|logs|--dry-run]" + exit 1 + ;; +esac + +print_summary +exit $EXIT_CODE