run_phase1_worker_contract_smoke_live.py 3.68 KB
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations

import argparse
import json
import subprocess
from pathlib import Path
from typing import Any

# Parent of the directory containing this file; child commands run with it as cwd.
ROOT: Path = Path(__file__).resolve().parents[1]
# Interpreter used to launch every child script.
PYTHON_BIN: str = '/usr/local/miniconda3/bin/python'
# Where the combined smoke report is written unless --output overrides it.
DEFAULT_OUTPUT: Path = ROOT.joinpath(
    'data',
    'pgvector_eval',
    'music20',
    'phase1_worker_contract_smoke_report.json',
)


def run_cmd(cmd: list[str]) -> subprocess.CompletedProcess[str]:
    """Run *cmd* from ROOT, capturing stdout and stderr as text.

    The exit status is not checked here; callers inspect ``returncode``.
    """
    completed = subprocess.run(
        cmd,
        cwd=ROOT,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        check=False,
    )
    return completed


def reset_jobs(dsn: str, schema: str) -> None:
    """Re-run the phase-1 extraction-job bootstrap script for *schema*.

    ``main`` calls this before each lane so every lane starts from the state
    the bootstrap script sets up.

    Args:
        dsn: Database connection string forwarded to the script.
        schema: Database schema forwarded to the script.

    Raises:
        SystemExit: If the script exits non-zero. The message is the script's
            stderr, else its stdout, else a line naming the script and status.
    """
    script = 'scripts/bootstrap_phase1_extraction_jobs_live.py'
    proc = run_cmd([PYTHON_BIN, script, '--dsn', dsn, '--schema', schema])
    if proc.returncode != 0:
        # Never exit with an empty message: a silent failure is undiagnosable.
        raise SystemExit(
            proc.stderr
            or proc.stdout
            or f'{script} exited with status {proc.returncode}'
        )


def run_exact_lane(dsn: str, schema: str, job_id: int = 1) -> dict[str, Any]:
    """Run the Chromaprint worker for one job and summarize its JSON artifact.

    Args:
        dsn: Database connection string forwarded to the worker.
        schema: Database schema forwarded to the worker.
        job_id: Job to run. Defaults to 1, the job this smoke test has always
            targeted.

    Returns:
        A dict with ``job_id``, the worker ``returncode``, the reported
        ``job_status``, ``failure_reason`` and ``missing_asset_count`` (read
        from the status's ``metadata_json``), and the ``artifact`` path
        relative to ROOT.

    Raises:
        SystemExit: If the worker exits non-zero or exits 0 without writing
            its output file.
    """
    script = 'workers/run_chromaprint_job.py'
    out = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_worker_contract_smoke_exact.json'
    # Remove any artifact left by a previous run so that a worker which exits
    # 0 without writing cannot make this report describe stale results.
    try:
        out.unlink()
    except FileNotFoundError:
        pass
    proc = run_cmd([
        PYTHON_BIN,
        script,
        '--dsn', dsn,
        '--schema', schema,
        '--job-id', str(job_id),
        '--output', str(out),
    ])
    if proc.returncode != 0:
        raise SystemExit(
            proc.stderr
            or proc.stdout
            or f'{script} exited with status {proc.returncode}'
        )
    if not out.is_file():
        raise SystemExit(f'{script} exited 0 but did not write {out}')
    payload = json.loads(out.read_text(encoding='utf-8'))
    # Prefer the post-failure status snapshot when present, else the
    # post-completion one; missing or empty values fall back to {}.
    status = payload.get('status_after_failed') or payload.get('status_after_complete') or {}
    metadata = status.get('metadata_json') or {}
    return {
        'job_id': job_id,
        'returncode': proc.returncode,
        'job_status': status.get('job_status'),
        'failure_reason': metadata.get('failure_reason'),
        'missing_asset_count': metadata.get('missing_asset_count'),
        'artifact': str(out.relative_to(ROOT)),
    }


def run_semantic_matrix(dsn: str, schema: str) -> dict[str, Any]:
    """Run the embedding preflight matrix script and summarize its JSON report.

    Args:
        dsn: Database connection string forwarded to the script.
        schema: Database schema forwarded to the script.

    Returns:
        A dict with the script ``returncode``, ``semantic_job_count``, the
        report summary's ``failed_jobs`` and ``unique_blockers``, and the
        ``artifact`` path relative to ROOT.

    Raises:
        SystemExit: If the script exits non-zero or exits 0 without writing
            its output file.
    """
    script = 'scripts/run_phase1_embedding_preflight_matrix_live.py'
    out = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_worker_contract_smoke_semantic_matrix.json'
    # Remove any artifact left by a previous run so stale results are never
    # mistaken for this run's output.
    try:
        out.unlink()
    except FileNotFoundError:
        pass
    proc = run_cmd([
        PYTHON_BIN,
        script,
        '--dsn', dsn,
        '--schema', schema,
        '--output', str(out),
    ])
    if proc.returncode != 0:
        raise SystemExit(
            proc.stderr
            or proc.stdout
            or f'{script} exited with status {proc.returncode}'
        )
    if not out.is_file():
        raise SystemExit(f'{script} exited 0 but did not write {out}')
    payload = json.loads(out.read_text(encoding='utf-8'))
    # Treat a missing *or* null 'summary' as empty. ``.get('summary', {})``
    # alone would crash on an explicit null.
    summary = payload.get('summary') or {}
    return {
        'returncode': proc.returncode,
        'semantic_job_count': payload.get('semantic_job_count'),
        'failed_jobs': summary.get('failed_jobs'),
        'unique_blockers': summary.get('unique_blockers'),
        'artifact': str(out.relative_to(ROOT)),
    }


def _redact_dsn(dsn: str) -> str:
    """Return *dsn* with any password replaced by ``***``.

    Handles URI DSNs (``postgres://user:pass@host:port/db``, including a
    ``password=`` query parameter) and libpq keyword/value DSNs
    (``host=... password=...``, bare or single-quoted values).
    """
    import re
    from urllib.parse import urlsplit

    if '://' not in dsn:
        return re.sub(
            r"(\bpassword\s*=\s*)('(?:[^'\\]|\\.)*'|\S+)",
            r'\1***',
            dsn,
            flags=re.IGNORECASE,
        )
    try:
        netloc = urlsplit(dsn).netloc
    except ValueError:
        # Unparseable URI: mask everything rather than risk leaking a secret.
        return '***'
    userinfo, at, hostinfo = netloc.rpartition('@')
    user, colon, _password = userinfo.partition(':')
    if at and colon:
        # Edit the original string in place instead of round-tripping it
        # through urlunsplit, which can rewrite URIs that have an empty host.
        dsn = dsn.replace(netloc, f'{user}:***@{hostinfo}', 1)
    return re.sub(r'([?&]password=)[^&#]*', r'\1***', dsn, flags=re.IGNORECASE)


def main() -> None:
    """Run both phase-1 worker lanes and write a combined JSON report.

    CLI options: ``--dsn`` (required), ``--schema`` (default ``acr_test``),
    ``--output`` (default DEFAULT_OUTPUT). Jobs are reset before each lane.
    The exact lane runs first, then the semantic preflight matrix. The report
    is written to ``--output`` (parent directories are created) and echoed to
    stdout.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', required=True)
    ap.add_argument('--schema', default='acr_test')
    ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
    args = ap.parse_args()

    # Record the DSN actually used, with its password masked.
    dsn_redacted = _redact_dsn(args.dsn)

    reset_jobs(args.dsn, args.schema)
    exact = run_exact_lane(args.dsn, args.schema)
    reset_jobs(args.dsn, args.schema)
    semantic = run_semantic_matrix(args.dsn, args.schema)

    payload = {
        'schema': args.schema,
        'dsn_redacted': dsn_redacted,
        'exact_lane': exact,
        'semantic_lane': semantic,
        'summary': {
            'exact_status': exact['job_status'],
            'semantic_failed_jobs': semantic['failed_jobs'],
            # NOTE(review): this list is fixed text, not derived from the lane
            # results above. Confirm it still matches the current environment.
            'shared_environment_blockers': [
                'missing /workspace/downloads mount',
                'missing semantic model runtime dependencies',
            ],
        },
    }
    report = json.dumps(payload, ensure_ascii=False, indent=2)
    out = Path(args.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(report, encoding='utf-8')
    print(report)


if __name__ == '__main__':
    main()