run_embedding_job.py
3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import os
from _job_common import connect, emit_payload, fetch_job_context, resolve_scope_summary, update_job_status
# Default vector table for each supported embedding dimensionality; main()
# consults this when no --vector-table / VECTOR_TABLE override is supplied.
VECTOR_TABLE_BY_DIM = {
    dim: f'audio_embedding_vector_{dim}'
    for dim in (192, 768)
}
def _parse_args() -> argparse.Namespace:
    """Parse command-line flags, falling back to environment variables.

    Returns:
        The populated argparse namespace. ``job_id`` is an ``int`` (``0``
        when neither ``--job-id`` nor ``EXTRACTION_JOB_ID`` is provided).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
    ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
    # The env fallback is passed as a *string* on purpose: argparse applies
    # type=int to string defaults only when the flag is absent. A malformed
    # EXTRACTION_JOB_ID therefore yields a normal usage error instead of a
    # ValueError traceback, and cannot break a run that passes --job-id
    # explicitly. An empty value is treated the same as an unset one.
    ap.add_argument('--job-id', type=int, default=os.environ.get('EXTRACTION_JOB_ID') or '0')
    ap.add_argument('--model-name', default=os.environ.get('MODEL_NAME'))
    ap.add_argument('--model-version', default=os.environ.get('MODEL_VERSION'))
    ap.add_argument('--vector-table', default=os.environ.get('VECTOR_TABLE'))
    ap.add_argument('--output-target', default=os.environ.get('OUTPUT_TARGET', 'audio_embedding'))
    ap.add_argument('--complete-dry-run', action='store_true')
    ap.add_argument('--output')
    return ap.parse_args()


def main() -> None:
    """Run the embedding job worker in dry-run mode.

    Loads the job context for the requested extraction job, checks it
    against the optional model name/version given on the command line,
    marks the job as ``running`` and, when ``--complete-dry-run`` is set,
    as ``completed`` with ``output_count=0``. A summary payload is then
    handed to ``emit_payload`` together with the ``--output`` value.

    Raises:
        SystemExit: if the DSN or job id is missing, if the job's model is
            ``chromaprint`` (not an embedding job), or if the job's model
            name/version differs from the one supplied on the command line.
    """
    args = _parse_args()
    if not args.dsn:
        raise SystemExit('missing --dsn or PG_DSN')
    if not args.job_id:
        raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')

    with connect(args.dsn, args.schema) as conn:
        job = fetch_job_context(conn, args.job_id)
        if job.model_name == 'chromaprint':
            raise SystemExit(f'feature_extraction_job={args.job_id} is not an embedding job')
        if args.model_name and job.model_name != args.model_name:
            raise SystemExit(f'model mismatch: job={job.model_name} cli={args.model_name}')
        if args.model_version and job.model_version != args.model_version:
            raise SystemExit(f'model version mismatch: job={job.model_version} cli={args.model_version}')

        # An explicit override wins; otherwise look the table up by the job's
        # embedding dimension. -1 is never a key, so an unknown or missing
        # dimension resolves to None rather than raising.
        resolved_vector_table = args.vector_table or VECTOR_TABLE_BY_DIM.get(
            job.embedding_dim or job.output_embedding_dim or -1
        )
        scope = resolve_scope_summary(conn, job.target_scope)

        # Keys common to both status updates, in the order they are sent.
        shared_metadata = {
            'worker': 'run_embedding_job',
            'output_target': args.output_target,
            'vector_table': resolved_vector_table,
            'dry_run': True,
        }

        running = update_job_status(
            conn,
            job.extraction_job_id,
            status='running',
            # Fall back to the ready-asset count when the window count is falsy.
            input_count=scope['active_window_count'] or scope['ready_asset_count'],
            metadata_patch={
                **shared_metadata,
                'target_scope_summary': scope,
                'execution_mode': 'dry_run',
            },
            set_started_at=True,
        )

        completed = None
        if args.complete_dry_run:
            completed = update_job_status(
                conn,
                job.extraction_job_id,
                status='completed',
                output_count=0,
                metadata_patch={
                    **shared_metadata,
                    'dry_run_result': 'completed_without_feature_write',
                    'write_target_table': args.output_target,
                },
                set_finished_at=True,
            )

    # NOTE(review): the pasted source lost its indentation; emitting after the
    # connection block is assumed (the payload needs no open connection) -
    # confirm against the original file.
    emit_payload(
        {
            'worker': 'run_embedding_job',
            'schema': args.schema,
            'job': job.__dict__,
            'target_scope_summary': scope,
            'status_after_start': running,
            'status_after_complete': completed,
            'resolved_vector_table': resolved_vector_table,
            'notes': [
                'this worker currently validates planner -> job -> PostgreSQL state flow',
                'real encoder inference can replace dry_run while preserving the same job contract',
            ],
        },
        args.output,
    )
# Script entry point: run the worker only when this file is executed
# directly, never as a side effect of importing it.
if __name__ == '__main__':
    main()