run_phase1_embedding_preflight_matrix_live.py
4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
import subprocess
from pathlib import Path
from typing import Any
import psycopg
# Base directory: one level above the directory that contains this file.
# Subprocesses are launched with it as their working directory and the
# data paths below are built underneath it.
ROOT = Path(__file__).resolve().parents[1]
# Default destination of the aggregated JSON report (overridable with --output).
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_embedding_preflight_matrix_report.json'
# Interpreter used to launch the helper scripts as subprocesses.
PYTHON_BIN = '/usr/local/miniconda3/bin/python'
def load_semantic_jobs(conn: psycopg.Connection) -> list[dict[str, Any]]:
    """Return the window-level ``semantic_embedding`` extraction jobs.

    The query joins ``feature_extraction_job`` with its feature set and
    model registry rows and orders the result by ``extraction_job_id``.

    Args:
        conn: Open connection; table names are unqualified, so the caller
            is expected to have set the search path already.

    Returns:
        One dict per job with the keys ``extraction_job_id``,
        ``model_name``, ``model_version``, ``embedding_dim`` and
        ``vector_table``. ``vector_table`` is
        ``audio_embedding_vector_<dim>`` when the dimension is 192 or 768
        and ``None`` for any other (or missing) dimension.
    """
    query = """
SELECT
fej.extraction_job_id,
mr.model_name,
mr.model_version,
fs.embedding_dim
FROM feature_extraction_job fej
JOIN feature_set_registry fs ON fs.feature_set_id = fej.feature_set_id
JOIN model_registry mr ON mr.model_id = fs.model_id
WHERE fs.feature_name = 'semantic_embedding'
AND fs.feature_level = 'window'
ORDER BY fej.extraction_job_id;
"""
    fetched = conn.execute(query).fetchall()

    jobs: list[dict[str, Any]] = []
    for record in fetched:
        dim = record[3]

        # Only these two dimensions have a dedicated vector table name.
        if dim in (192, 768):
            table_name = f"audio_embedding_vector_{int(dim)}"
        else:
            table_name = None

        if dim is not None:
            embedding_dim = int(dim)
        else:
            embedding_dim = None

        jobs.append(
            {
                'extraction_job_id': int(record[0]),
                'model_name': record[1],
                'model_version': record[2],
                'embedding_dim': embedding_dim,
                'vector_table': table_name,
            }
        )
    return jobs
def reset_jobs(dsn: str, schema: str) -> None:
    """Run the extraction-job bootstrap script against ``schema``.

    Launches ``scripts/bootstrap_phase1_extraction_jobs_live.py`` with
    ``PYTHON_BIN`` from the ``ROOT`` directory, forwarding ``--dsn`` and
    ``--schema``. The script's output is captured and discarded.

    Raises:
        subprocess.CalledProcessError: If the script exits non-zero.
    """
    forwarded = ['--dsn', dsn, '--schema', schema]
    subprocess.run(
        [PYTHON_BIN, 'scripts/bootstrap_phase1_extraction_jobs_live.py', *forwarded],
        cwd=ROOT,
        check=True,
        capture_output=True,
        text=True,
    )
def run_job(dsn: str, schema: str, job: dict[str, Any]) -> dict[str, Any]:
    """Run one embedding job through the worker and summarise its attempt.

    The worker (``workers/run_embedding_job.py``) is launched from ``ROOT``
    and asked to write a JSON attempt artifact, which is then condensed
    into a flat result record.

    Args:
        dsn: Connection string forwarded to the worker via ``--dsn``.
        schema: Schema name forwarded to the worker via ``--schema``.
        job: Job record with the keys ``extraction_job_id``,
            ``model_name``, ``model_version`` and ``vector_table``.

    Returns:
        A dict with the job identity, the worker's return code, the job
        status, the failure metadata read from the artifact, and the
        artifact path relative to ``ROOT``. If the worker produced no
        readable artifact, the metadata fields are ``None`` and
        ``failure_reason`` explains why the artifact could not be read.
    """
    attempt_path = ROOT / 'data' / 'pgvector_eval' / 'music20' / f"job{job['extraction_job_id']}_{job['model_name']}_preflight_attempt.json"

    # The worker is told to write here, so the directory must exist.
    attempt_path.parent.mkdir(parents=True, exist_ok=True)
    # Remove any artifact left by an earlier run: otherwise a worker that
    # dies before writing would be reported using stale data.
    attempt_path.unlink(missing_ok=True)

    cmd = [
        PYTHON_BIN,
        'workers/run_embedding_job.py',
        '--dsn', dsn,
        '--schema', schema,
        '--job-id', str(job['extraction_job_id']),
        '--model-name', job['model_name'],
        '--model-version', job['model_version'],
    ]
    if job['vector_table']:
        cmd.extend(['--vector-table', job['vector_table']])
    cmd.extend(['--output', str(attempt_path)])

    # No check=True: a non-zero exit is recorded in the result, not raised.
    proc = subprocess.run(cmd, cwd=ROOT, capture_output=True, text=True)

    # A missing or malformed artifact must not abort the whole matrix.
    artifact_error = None
    try:
        payload = json.loads(attempt_path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as exc:
        payload = {}
        artifact_error = f'attempt artifact unreadable: {exc}'
    else:
        if not isinstance(payload, dict):
            payload = {}
            artifact_error = 'attempt artifact unreadable: top-level JSON value is not an object'

    # ``or {}`` also covers keys that are present but null.
    status_after_failed = payload.get('status_after_failed') or {}
    status_after_complete = payload.get('status_after_complete') or {}
    metadata = status_after_failed.get('metadata_json') or {}

    failure_reason = metadata.get('failure_reason')
    if failure_reason is None and artifact_error is not None:
        failure_reason = artifact_error

    return {
        'extraction_job_id': job['extraction_job_id'],
        'model_name': job['model_name'],
        'model_version': job['model_version'],
        'vector_table': job['vector_table'],
        'returncode': proc.returncode,
        'job_status': status_after_failed.get('job_status') or status_after_complete.get('job_status'),
        'failure_reason': failure_reason,
        'preflight_blockers': metadata.get('preflight_blockers'),
        'missing_window_count': metadata.get('missing_window_count'),
        'runtime_missing_dependencies': ((metadata.get('runtime_report') or {}).get('missing_dependencies')),
        'vector_table_report': metadata.get('vector_table_report'),
        'attempt_artifact': str(attempt_path.relative_to(ROOT)),
    }
def _redact_dsn(dsn: str) -> str:
    """Return ``dsn`` with any password value replaced by ``***``."""
    import re

    # URI form: scheme://user:password@host/...  The greedy match runs to
    # the last '@' of the authority so an unescaped '@' in the password
    # cannot leak its tail.
    redacted = re.sub(r'(://[^:@/\s]*):[^/\s]*@', r'\1:***@', dsn)
    # Keyword form and URI query parameters: password=secret,
    # password='se cret', sslpassword=...
    redacted = re.sub(
        r"(\b(?:ssl)?password\s*=\s*)('(?:[^'\\]|\\.)*'|[^\s&]+)",
        r'\1***',
        redacted,
    )
    return redacted


def main() -> None:
    """Run every semantic embedding job once and write the preflight report.

    Steps: parse ``--dsn`` / ``--schema`` / ``--output``, call
    ``reset_jobs``, load the semantic jobs from the database, run each one
    through ``run_job``, then write the aggregated JSON payload to
    ``--output`` and echo it to stdout.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', required=True)
    ap.add_argument('--schema', default='acr_test')
    ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
    args = ap.parse_args()

    reset_jobs(args.dsn, args.schema)

    with psycopg.connect(args.dsn, autocommit=True) as conn:
        # Same effect as ``SET search_path TO <schema>, public`` but the
        # schema travels as a bound parameter, not as SQL text.
        conn.execute(
            "SELECT set_config('search_path', %s, false)",
            (f'{args.schema}, public',),
        )
        jobs = load_semantic_jobs(conn)

    # The workers open their own connections, so ours is closed first.
    results = [run_job(args.dsn, args.schema, job) for job in jobs]

    payload = {
        'schema': args.schema,
        # Derived from the DSN actually used, with the password masked.
        'dsn_redacted': _redact_dsn(args.dsn),
        'semantic_job_count': len(results),
        'results': results,
        'summary': {
            'failed_jobs': sum(1 for item in results if item['job_status'] == 'failed'),
            'models': [item['model_name'] for item in results],
            'unique_blockers': sorted({blocker for item in results for blocker in (item.get('preflight_blockers') or [])}),
        },
    }

    out = Path(args.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    out.write_text(rendered, encoding='utf-8')
    print(rendered)


if __name__ == '__main__':
    main()