Files
gomag-vending/api/tests/qa/test_qa_sync_real.py
Claude Agent 419464a62c feat: add CI/CD testing infrastructure with test.sh orchestrator
Complete testing system: pyproject.toml (pytest markers), test.sh
orchestrator with auto app start/stop and colorful summary,
pre-push hook, Gitea Actions workflow.

New QA tests: API health (7 endpoints), responsive (3 viewports),
log monitoring (ERROR/ORA-/Traceback detection), real GoMag sync,
PL/SQL package validation, smoke prod (read-only).

Converted test_app_basic.py and test_integration.py to pytest.
Added pytestmark to all existing tests (unit/e2e/oracle).
E2E conftest upgraded: console error collector, screenshot on
failure, auto-detect live app on :5003.

Usage: ./test.sh ci (30s) | ./test.sh full (2-3min)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 10:40:25 +00:00

135 lines
4.4 KiB
Python

"""
Real sync test: GoMag API → validate → import into Oracle (MARIUSM_AUTO).
Requires:
- App running on localhost:5003
- GOMAG_API_KEY set in api/.env
- Oracle configured (MARIUSM_AUTO_AUTO)
"""
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import httpx
import pytest
from dotenv import load_dotenv
pytestmark = pytest.mark.sync
# Load .env once at module level for API key check
_env_path = Path(__file__).parents[2] / ".env"
load_dotenv(str(_env_path), override=True)
_GOMAG_API_KEY = os.environ.get("GOMAG_API_KEY", "")
_GOMAG_API_SHOP = os.environ.get("GOMAG_API_SHOP", "")
if not _GOMAG_API_KEY:
pytestmark = [pytest.mark.sync, pytest.mark.skip(reason="GOMAG_API_KEY not set")]
@pytest.fixture(scope="module")
def client(base_url):
with httpx.Client(base_url=base_url, timeout=30.0) as c:
yield c
@pytest.fixture(scope="module")
def gomag_api_key():
if not _GOMAG_API_KEY:
pytest.skip("GOMAG_API_KEY is empty or not set")
return _GOMAG_API_KEY
@pytest.fixture(scope="module")
def gomag_api_shop():
if not _GOMAG_API_SHOP:
pytest.skip("GOMAG_API_SHOP is empty or not set")
return _GOMAG_API_SHOP
def _wait_for_sync(client, timeout=60):
"""Poll sync status until it stops running. Returns final status dict."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
r = client.get("/api/sync/status")
assert r.status_code == 200, f"sync/status returned {r.status_code}"
data = r.json()
if data.get("status") != "running":
return data
time.sleep(2)
raise TimeoutError(f"Sync did not finish within {timeout}s")
def test_gomag_api_connection(gomag_api_key, gomag_api_shop):
"""Verify direct GoMag API connectivity and order presence."""
seven_days_ago = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
# GoMag API uses a central endpoint, not the shop URL
url = "https://api.gomag.ro/api/v1/order/read/json"
params = {"startDate": seven_days_ago, "page": 1, "limit": 5}
headers = {"X-Oc-Restadmin-Id": gomag_api_key}
with httpx.Client(timeout=30.0, follow_redirects=True) as c:
r = c.get(url, params=params, headers=headers)
assert r.status_code == 200, f"GoMag API returned {r.status_code}: {r.text[:200]}"
data = r.json()
# GoMag returns either a list or a dict with orders key
if isinstance(data, dict):
assert "orders" in data or len(data) > 0, "GoMag API returned empty response"
else:
assert isinstance(data, list), f"Unexpected GoMag response type: {type(data)}"
def test_app_sync_start(client, gomag_api_key):
"""Trigger a real sync via the app API and wait for completion."""
r = client.post("/api/sync/start")
assert r.status_code == 200, f"sync/start returned {r.status_code}: {r.text[:200]}"
final_status = _wait_for_sync(client, timeout=60)
assert final_status.get("status") != "running", (
f"Sync still running after timeout: {final_status}"
)
def test_sync_results(client):
"""Verify the latest sync run processed at least one order."""
r = client.get("/api/sync/history", params={"per_page": 1})
assert r.status_code == 200, f"sync/history returned {r.status_code}"
data = r.json()
runs = data.get("runs", [])
assert len(runs) > 0, "No sync runs found in history"
latest = runs[0]
assert latest.get("total_orders", 0) > 0, (
f"Latest sync run has 0 orders: {latest}"
)
def test_sync_idempotent(client, gomag_api_key):
"""Re-running sync should result in ALREADY_IMPORTED, not double imports."""
r = client.post("/api/sync/start")
assert r.status_code == 200, f"sync/start returned {r.status_code}"
_wait_for_sync(client, timeout=60)
r = client.get("/api/sync/history", params={"per_page": 1})
assert r.status_code == 200
data = r.json()
runs = data.get("runs", [])
assert len(runs) > 0, "No sync runs found after second sync"
latest = runs[0]
total = latest.get("total_orders", 0)
already_imported = latest.get("already_imported", 0)
imported = latest.get("imported", 0)
# Most orders should be ALREADY_IMPORTED on second run
if total > 0:
assert already_imported >= imported, (
f"Expected mostly ALREADY_IMPORTED on second run, "
f"got imported={imported}, already_imported={already_imported}, total={total}"
)