run_phase1_prereq_audit_live.py
3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import importlib
import json
from pathlib import Path
import sys
from typing import Any
import psycopg
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Put the repository root on sys.path so the `workers` package imported just
# below resolves when this file is executed directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
from workers._job_common import validate_schema
# Default path of the JSON audit report; used as the default for --output.
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_prereq_audit_report.json'
# Packages that must be importable for each model's jobs to be considered
# ready, keyed by model name. Each model gets its own list object.
MODEL_REQUIREMENTS = dict(
    mert=['numpy', 'torch', 'torchaudio', 'transformers'],
    muq=['numpy', 'torch', 'torchaudio', 'transformers'],
    ecapa=['numpy', 'torch', 'torchaudio', 'speechbrain'],
    chromaprint=['numpy'],
)
def check_import(name: str) -> dict[str, Any]:
    """Report whether the module *name* can be imported in this interpreter.

    Args:
        name: Importable module name, e.g. ``'torch'``.

    Returns:
        ``{'package': name, 'available': True}`` on success. On failure the
        dict has ``'available': False`` plus ``'error_type'`` (the exception
        class name) and ``'error'`` (the first line of the exception message,
        or ``''`` when the exception carries no message).
    """
    try:
        importlib.import_module(name)
    except Exception as exc:  # noqa: BLE001
        # Deliberately broad: importing a package can fail with arbitrary
        # exception types, and any failure means "not available" for the audit.
        message_lines = str(exc).splitlines()
        return {
            'package': name,
            'available': False,
            'error_type': type(exc).__name__,
            # An exception raised without a message yields no lines at all;
            # indexing [0] unconditionally would raise IndexError here.
            'error': message_lines[0] if message_lines else '',
        }
    return {'package': name, 'available': True}
def load_jobs(conn: psycopg.Connection) -> list[dict[str, Any]]:
    """Read all feature-extraction jobs together with their model metadata.

    Args:
        conn: Open connection on which the query is executed. Table names are
            unqualified, so they resolve through the connection's search path.

    Returns:
        One dict per job, ordered by ``extraction_job_id``, with keys
        ``extraction_job_id``, ``model_name``, ``model_version``,
        ``embedding_dim`` and ``target_scope``. ``embedding_dim`` is ``None``
        when the database value is NULL.
    """
    query = """
SELECT fej.extraction_job_id, mr.model_name, mr.model_version, fs.embedding_dim, fej.target_scope
FROM feature_extraction_job fej
JOIN feature_set_registry fs ON fs.feature_set_id = fej.feature_set_id
JOIN model_registry mr ON mr.model_id = fs.model_id
ORDER BY fej.extraction_job_id;
"""
    jobs: list[dict[str, Any]] = []
    for record in conn.execute(query).fetchall():
        raw_dim = record[3]
        if raw_dim is None:
            embedding_dim = None
        else:
            embedding_dim = int(raw_dim)
        jobs.append(
            {
                'extraction_job_id': int(record[0]),
                'model_name': record[1],
                'model_version': record[2],
                'embedding_dim': embedding_dim,
                'target_scope': record[4],
            }
        )
    return jobs
def _redact_dsn(dsn: str) -> str:
    """Render *dsn* as a ``postgres://`` URL with the password masked.

    The URL is rebuilt from a whitelist of non-secret fields (user, host,
    port, dbname), so the password can never reach the output regardless of
    whether the DSN was given in URL or ``key=value`` form.
    """
    # Function-scope import: psycopg is already a dependency of this file and
    # the conninfo parser is only needed for report rendering.
    from psycopg.conninfo import conninfo_to_dict

    params = conninfo_to_dict(dsn)
    user = params.get('user') or ''
    secret = ':***' if params.get('password') else ''
    userinfo = f'{user}{secret}@' if (user or secret) else ''
    host = params.get('host') or ''
    port = params.get('port')
    location = f'{host}:{port}' if port else f'{host}'
    dbname = params.get('dbname') or ''
    return f'postgres://{userinfo}{location}/{dbname}'


def _job_report(
    job: dict[str, Any],
    package_checks: dict[str, dict[str, Any]],
    downloads_exists: bool,
) -> dict[str, Any]:
    """Extend one job row with its package requirements and readiness flag."""
    # Models without an explicit entry are treated as needing only numpy.
    required = MODEL_REQUIREMENTS.get(job['model_name'], ['numpy'])
    missing = [name for name in required if not package_checks[name]['available']]
    return {
        **job,
        'required_packages': required,
        'missing_packages': missing,
        'downloads_root_exists': downloads_exists,
        'ready_for_live_worker': downloads_exists and not missing,
    }


def main() -> None:
    """Audit worker prerequisites for every extraction job and write a report.

    Reads the jobs from the database given by ``--dsn`` (within ``--schema``),
    checks that each job's required Python packages import and that
    ``--downloads-root`` exists, then writes the JSON report to ``--output``
    and prints the same JSON to stdout.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', required=True)
    ap.add_argument('--schema', default='acr_test')
    ap.add_argument('--downloads-root', default='/workspace/downloads')
    ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
    args = ap.parse_args()

    # The schema name is interpolated into SQL below, so it goes through
    # validate_schema first (presumably an identifier whitelist check --
    # TODO confirm against workers._job_common).
    schema = validate_schema(args.schema)
    downloads_root = Path(args.downloads_root)
    downloads_exists = downloads_root.exists()

    with psycopg.connect(args.dsn, autocommit=True) as conn:
        conn.execute(f'SET search_path TO {schema}, public;')
        jobs = load_jobs(conn)

    # Import-check each distinct package once, however many jobs need it.
    package_names = sorted({pkg for job in jobs for pkg in MODEL_REQUIREMENTS.get(job['model_name'], ['numpy'])})
    package_checks = {item['package']: item for item in (check_import(name) for name in package_names)}
    job_reports = [_job_report(job, package_checks, downloads_exists) for job in jobs]

    payload = {
        'schema': schema,
        # Derived from the DSN actually used, so the report identifies the
        # database that was audited rather than a fixed placeholder.
        'dsn_redacted': _redact_dsn(args.dsn),
        'downloads_root': str(downloads_root),
        'downloads_root_exists': downloads_exists,
        'package_checks': package_checks,
        'jobs': job_reports,
        'summary': {
            'total_jobs': len(job_reports),
            'ready_jobs': sum(1 for job in job_reports if job['ready_for_live_worker']),
            'blocked_jobs': sum(1 for job in job_reports if not job['ready_for_live_worker']),
            'missing_packages_union': sorted({pkg for job in job_reports for pkg in job['missing_packages']}),
        },
    }

    # Serialize once so the file and stdout are guaranteed to be identical.
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    out = Path(args.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(rendered, encoding='utf-8')
    print(rendered)
# Run the audit only when executed as a script, not when imported.
if __name__ == '__main__':
    main()