run_chromaprint_job.py
2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import os
from _job_common import connect, emit_payload, fetch_job_context, resolve_scope_summary, update_job_status
def main() -> None:
    """Drive one chromaprint extraction job through its dry-run state flow.

    Command-line flags (most fall back to an environment variable):

    * ``--dsn`` / ``PG_DSN``: passed to ``connect``; required.
    * ``--schema`` / ``PG_SCHEMA``: passed to ``connect`` (default
      ``acr_test``).
    * ``--job-id`` / ``EXTRACTION_JOB_ID``: integer job id; required and
      must be non-zero.
    * ``--output-target`` / ``OUTPUT_TARGET``: recorded in the job metadata
      (default ``audio_fingerprint``).
    * ``--complete-dry-run``: additionally move the job from ``running`` to
      ``completed`` with ``output_count=0``.
    * ``--output``: forwarded to ``emit_payload`` (no environment fallback).

    The job is moved from ``pending`` to ``running`` and tagged as a dry
    run; this function itself performs no fingerprint extraction. A summary
    payload is emitted at the end.

    Raises:
        SystemExit: if the DSN or job id is missing, if the job id is not an
            integer (argparse usage error), or if the job's ``model_name``
            is not ``'chromaprint'``.
    """
    worker_name = 'run_chromaprint_job'

    parser = argparse.ArgumentParser()
    parser.add_argument('--dsn', default=os.environ.get('PG_DSN'))
    parser.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
    # Hand argparse the raw environment string instead of calling int() here:
    # argparse applies ``type`` to string defaults only when the flag is not
    # given, so a malformed EXTRACTION_JOB_ID no longer crashes with a
    # ValueError while the parser is being built, and an explicit --job-id
    # always wins. An empty value is treated the same as an unset one.
    parser.add_argument(
        '--job-id',
        type=int,
        default=os.environ.get('EXTRACTION_JOB_ID') or None,
    )
    parser.add_argument(
        '--output-target',
        default=os.environ.get('OUTPUT_TARGET', 'audio_fingerprint'),
    )
    parser.add_argument('--complete-dry-run', action='store_true')
    parser.add_argument('--output')
    args = parser.parse_args()

    if not args.dsn:
        raise SystemExit('missing --dsn or PG_DSN')
    # Both "not supplied" (None) and an explicit 0 are rejected here.
    if not args.job_id:
        raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')

    with connect(args.dsn, args.schema) as conn:
        job = fetch_job_context(conn, args.job_id)
        if job.model_name != 'chromaprint':
            raise SystemExit(f'feature_extraction_job={args.job_id} is not a chromaprint job')

        scope = resolve_scope_summary(conn, job.target_scope)

        # pending -> running. ``expected_status`` is presumably a guard on the
        # job's current status -- confirm in _job_common.update_job_status.
        running = update_job_status(
            conn,
            job.extraction_job_id,
            status='running',
            expected_status='pending',
            input_count=scope['ready_asset_count'],
            metadata_patch={
                'worker': worker_name,
                'output_target': args.output_target,
                'dry_run': True,
                'target_scope_summary': scope,
                'execution_mode': 'dry_run',
            },
            set_started_at=True,
        )

        # running -> completed happens only on request; otherwise the job is
        # left in ``running`` and ``completed`` stays None in the payload.
        completed = None
        if args.complete_dry_run:
            completed = update_job_status(
                conn,
                job.extraction_job_id,
                status='completed',
                expected_status='running',
                output_count=0,
                metadata_patch={
                    'worker': worker_name,
                    'output_target': args.output_target,
                    'dry_run': True,
                    'dry_run_result': 'completed_without_feature_write',
                    'write_target_table': 'audio_fingerprint',
                },
                set_finished_at=True,
            )

    # The payload needs only plain values gathered above, not ``conn``.
    emit_payload(
        {
            'worker': worker_name,
            'schema': args.schema,
            'job': job.__dict__,
            'target_scope_summary': scope,
            'status_after_start': running,
            'status_after_complete': completed,
            'next_write_target': 'audio_fingerprint',
            'notes': [
                'this worker currently validates planner -> job -> PostgreSQL state flow',
                'real chromaprint extraction can replace dry_run while preserving the same job contract',
            ],
        },
        args.output,
    )
# Run the worker only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()