# service_smoke.py
#!/usr/bin/env python3
"""Minimal local smoke test for the FastAPI ACR service."""

from __future__ import annotations

import json
import subprocess
import time
from urllib.request import urlopen
from urllib.error import URLError, HTTPError

# Base URL that fetch_json() prefixes to every path; it has to agree with the
# --host/--port arguments main() passes to uvicorn.
BASE = "http://127.0.0.1:8000"


def fetch_json(path: str):
    """Fetch ``BASE + path`` and return the response body parsed as JSON.

    The request is made with a 10-second timeout and the body is decoded as
    UTF-8 before parsing. Errors raised by ``urlopen``, the decode step, or
    ``json.loads`` propagate to the caller.
    """
    url = BASE + path
    with urlopen(url, timeout=10) as response:
        raw_body = response.read()
    text = raw_body.decode("utf-8")
    return json.loads(text)


def _stop_server(proc):
    """Terminate *proc*, drain its pipes, and return its captured stderr text."""
    proc.terminate()
    try:
        # communicate() reads both pipes to EOF while waiting, so the captured
        # output is not lost and the child cannot stay blocked on a full pipe.
        _, captured_err = proc.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        # Graceful shutdown took too long; force it and collect the output.
        proc.kill()
        _, captured_err = proc.communicate(timeout=5)
    return captured_err or ""


def main():
    """Start the service with uvicorn, poll its endpoints, and report the result.

    The server is launched as a child process and polled up to 20 times, with
    a 0.5 s pause before each attempt. An attempt succeeds when ``/health``,
    ``/ready``, ``/config`` and ``/cache`` all return JSON; the combined
    payload is then printed to stdout with ``"status": "ok"``.

    The child process is always stopped before the function returns or raises.

    Raises:
        SystemExit: with a JSON message (``"status": "failed"``) when the
            server process exits on its own (``"service_exited_early"``) or
            no attempt succeeds (``"service_not_ready_in_time"``). The message
            includes the last request error and the tail of the server's
            stderr.
    """
    cmd = [
        "/usr/local/miniconda3/bin/python",
        "-m",
        "uvicorn",
        "src.service.app:app",
        "--host",
        "127.0.0.1",
        "--port",
        "8000",
    ]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    report = None
    reason = "service_not_ready_in_time"
    last_error = None
    try:
        for _ in range(20):
            time.sleep(0.5)
            snapshot = None
            try:
                snapshot = {
                    "status": "ok",
                    "health": fetch_json("/health"),
                    "ready": fetch_json("/ready"),
                    "config": fetch_json("/config"),
                    "cache": fetch_json("/cache"),
                }
            except (OSError, ValueError) as exc:
                # OSError covers URLError/HTTPError, connection errors and
                # socket timeouts; ValueError covers undecodable or non-JSON
                # bodies. All of them mean "not ready yet": retry.
                last_error = str(exc)

            if proc.poll() is not None:
                # Our server is gone. Any answer we got came from some other
                # process on the port, so it must not count as a pass.
                reason = "service_exited_early"
                last_error = f"server process exited with code {proc.returncode}"
                break
            if snapshot is not None:
                report = snapshot
                break
    finally:
        server_stderr = _stop_server(proc)

    if report is None:
        raise SystemExit(json.dumps({
            "status": "failed",
            "reason": reason,
            "last_error": last_error,
            # Keep only the tail so the failure message stays readable.
            "server_stderr": server_stderr[-2000:],
        }, indent=2, ensure_ascii=False))

    print(json.dumps(report, indent=2, ensure_ascii=False))


# Run the smoke test only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()