run_phase1_worker_contract_smoke_live.py
3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
import subprocess
from pathlib import Path
from typing import Any
# Parent of the directory that contains this script; child commands run with
# this as their working directory and artifact paths are built beneath it.
ROOT = Path(__file__).resolve().parents[1]

# Interpreter used to launch every child script.
PYTHON_BIN = '/usr/local/miniconda3/bin/python'

# Report location used when --output is not given on the command line.
DEFAULT_OUTPUT = ROOT.joinpath(
    'data',
    'pgvector_eval',
    'music20',
    'phase1_worker_contract_smoke_report.json',
)
def run_cmd(cmd: list[str]) -> subprocess.CompletedProcess[str]:
    """Run *cmd* with ROOT as the working directory and return the result.

    Both stdout and stderr are captured and decoded as text. A non-zero exit
    status is not raised here; callers inspect ``returncode`` themselves.
    """
    completed = subprocess.run(
        cmd,
        cwd=ROOT,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return completed
def reset_jobs(dsn: str, schema: str) -> None:
    """Run the Phase 1 extraction-job bootstrap script for *dsn* / *schema*.

    Raises:
        SystemExit: if the bootstrap script exits non-zero; the message is the
            child's stderr, or its stdout when stderr is empty.
    """
    bootstrap_cmd = [PYTHON_BIN, 'scripts/bootstrap_phase1_extraction_jobs_live.py']
    bootstrap_cmd += ['--dsn', dsn, '--schema', schema]

    result = run_cmd(bootstrap_cmd)
    if result.returncode == 0:
        return
    raise SystemExit(result.stderr or result.stdout)
def run_exact_lane(dsn: str, schema: str) -> dict[str, Any]:
    """Run the chromaprint worker for job 1 and summarise its JSON artifact.

    The worker is asked to write its result to a fixed file under ROOT; that
    file is then read back and reduced to the handful of fields the smoke
    report needs.

    Returns:
        A dict with ``job_id``, ``returncode``, ``job_status``,
        ``failure_reason``, ``missing_asset_count`` and ``artifact`` (the
        artifact path relative to ROOT).

    Raises:
        SystemExit: if the worker exits non-zero; the message is the child's
            stderr, or its stdout when stderr is empty.
    """
    artifact_path = ROOT.joinpath(
        'data',
        'pgvector_eval',
        'music20',
        'phase1_worker_contract_smoke_exact.json',
    )
    worker_cmd = [
        PYTHON_BIN,
        'workers/run_chromaprint_job.py',
        '--dsn', dsn,
        '--schema', schema,
        '--job-id', '1',
        '--output', str(artifact_path),
    ]

    result = run_cmd(worker_cmd)
    if result.returncode != 0:
        raise SystemExit(result.stderr or result.stdout)

    with artifact_path.open(encoding='utf-8') as handle:
        report = json.load(handle)

    # Prefer the post-failure snapshot, then the post-completion one; fall
    # back to an empty mapping so the lookups below simply yield None.
    job_state = report.get('status_after_failed')
    if not job_state:
        job_state = report.get('status_after_complete')
    if not job_state:
        job_state = {}
    job_meta = job_state.get('metadata_json') or {}

    return {
        'job_id': 1,
        'returncode': result.returncode,
        'job_status': job_state.get('job_status'),
        'failure_reason': job_meta.get('failure_reason'),
        'missing_asset_count': job_meta.get('missing_asset_count'),
        'artifact': str(artifact_path.relative_to(ROOT)),
    }
def run_semantic_matrix(dsn: str, schema: str) -> dict[str, Any]:
    """Run the embedding preflight matrix script and summarise its artifact.

    The script is asked to write its result to a fixed file under ROOT; that
    file is then read back and reduced to the fields the smoke report needs.

    Returns:
        A dict with ``returncode``, ``semantic_job_count``, ``failed_jobs``,
        ``unique_blockers`` and ``artifact`` (the artifact path relative to
        ROOT). Fields absent from the artifact are reported as ``None``.

    Raises:
        SystemExit: if the script exits non-zero; the message is the child's
            stderr, or its stdout when stderr is empty.
    """
    out = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_worker_contract_smoke_semantic_matrix.json'
    proc = run_cmd([
        PYTHON_BIN,
        'scripts/run_phase1_embedding_preflight_matrix_live.py',
        '--dsn', dsn,
        '--schema', schema,
        '--output', str(out),
    ])
    if proc.returncode != 0:
        raise SystemExit(proc.stderr or proc.stdout)

    payload = json.loads(out.read_text(encoding='utf-8'))
    # ``or {}`` (rather than a .get default) also covers an explicit
    # ``"summary": null`` in the artifact, matching how run_exact_lane
    # guards its nested lookups.
    summary = payload.get('summary') or {}
    return {
        'returncode': proc.returncode,
        'semantic_job_count': payload.get('semantic_job_count'),
        'failed_jobs': summary.get('failed_jobs'),
        'unique_blockers': summary.get('unique_blockers'),
        'artifact': str(out.relative_to(ROOT)),
    }
def redact_dsn(dsn: str) -> str:
    """Return *dsn* with any embedded password replaced by ``***``.

    Handles the URL form (``scheme://user:password@host/...``) as well as
    ``password=...`` pairs, whether they appear in a keyword/value DSN or in
    a URL query string. A DSN without a password is returned unchanged.
    """
    import re  # local import so this block does not depend on file-level imports

    # URL userinfo: keep scheme and user, mask everything up to the last '@'
    # that precedes the path/query/fragment.
    masked = re.sub(
        r'^([A-Za-z][A-Za-z0-9+.\-]*://[^:/?#@]*):[^/?#]*@',
        r'\1:***@',
        dsn,
    )
    # password=<value> pairs; the value may be single-quoted.
    return re.sub(
        r"(password\s*=\s*)('(?:[^'\\]|\\.)*'|[^\s&]+)",
        r'\1***',
        masked,
        flags=re.IGNORECASE,
    )


def main() -> None:
    """Run both worker lanes and write the combined smoke report.

    Command-line options: ``--dsn`` (required), ``--schema`` (default
    ``acr_test``) and ``--output`` (default ``DEFAULT_OUTPUT``). The jobs are
    bootstrapped before each lane so the two runs do not see each other's
    state. The report is written to ``--output`` as JSON and also printed to
    stdout.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--dsn', required=True)
    parser.add_argument('--schema', default='acr_test')
    parser.add_argument('--output', default=str(DEFAULT_OUTPUT))
    args = parser.parse_args()

    reset_jobs(args.dsn, args.schema)
    exact = run_exact_lane(args.dsn, args.schema)
    reset_jobs(args.dsn, args.schema)
    semantic = run_semantic_matrix(args.dsn, args.schema)

    payload = {
        'schema': args.schema,
        # Derived from the DSN actually used, so the report always names the
        # real target while never leaking the password.
        'dsn_redacted': redact_dsn(args.dsn),
        'exact_lane': exact,
        'semantic_lane': semantic,
        'summary': {
            'exact_status': exact['job_status'],
            'semantic_failed_jobs': semantic['failed_jobs'],
            # NOTE(review): this list is static and not derived from the lane
            # results above - confirm that is intended.
            'shared_environment_blockers': [
                'missing /workspace/downloads mount',
                'missing semantic model runtime dependencies',
            ],
        },
    }

    # Serialise once; the same text goes to the file and to stdout.
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    out = Path(args.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(rendered, encoding='utf-8')
    print(rendered)
# Run the smoke check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()