run_embedding_job.py 4.17 KB
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations

import argparse
import os

from _job_common import connect, emit_payload, fetch_job_context, resolve_scope_summary, update_job_status

# Maps an embedding dimensionality to the vector table used for it; consulted
# when no explicit --vector-table / VECTOR_TABLE override is supplied.
VECTOR_TABLE_BY_DIM = {
    dim: f'audio_embedding_vector_{dim}'
    for dim in (192, 768)
}


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser; most options fall back to an environment variable."""
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
    ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
    # Hand argparse the raw string instead of calling int() here: argparse runs
    # type=int on string defaults only when --job-id is absent, so a malformed
    # EXTRACTION_JOB_ID yields a normal usage error (not a ValueError traceback)
    # and never blocks an explicit --job-id. An empty value counts as missing.
    ap.add_argument('--job-id', type=int, default=os.environ.get('EXTRACTION_JOB_ID') or '0')
    ap.add_argument('--model-name', default=os.environ.get('MODEL_NAME'))
    ap.add_argument('--model-version', default=os.environ.get('MODEL_VERSION'))
    ap.add_argument('--vector-table', default=os.environ.get('VECTOR_TABLE'))
    ap.add_argument('--output-target', default=os.environ.get('OUTPUT_TARGET', 'audio_embedding'))
    ap.add_argument('--complete-dry-run', action='store_true')
    ap.add_argument('--output')
    return ap


def _ensure_embedding_job(job, args: argparse.Namespace) -> None:
    """Exit with a message unless *job* matches the embedding contract and CLI pins."""
    if job.model_name == 'chromaprint':
        raise SystemExit(f'feature_extraction_job={args.job_id} is not an embedding job')
    if job.feature_name != 'semantic_embedding' or job.feature_level != 'window':
        raise SystemExit(
            f'feature_extraction_job={args.job_id} does not match embedding feature contract: '
            f'{job.feature_name}/{job.feature_level}'
        )
    # --model-name / --model-version are optional; they are only checked when given.
    if args.model_name and job.model_name != args.model_name:
        raise SystemExit(f'model mismatch: job={job.model_name} cli={args.model_name}')
    if args.model_version and job.model_version != args.model_version:
        raise SystemExit(f'model version mismatch: job={job.model_version} cli={args.model_version}')


def main(argv: list[str] | None = None) -> None:
    """Run the dry-run embedding worker for one feature extraction job.

    Parses the command line (with environment-variable fallbacks), loads the
    job, validates it against the embedding feature contract, moves it from
    ``pending`` to ``running`` and, when ``--complete-dry-run`` is set, on to
    ``completed`` with ``output_count=0``. A summary payload is then passed to
    ``emit_payload`` together with ``--output``.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Raises:
        SystemExit: On invalid arguments, a missing DSN or job id, or a job
            that does not satisfy the embedding contract / CLI model pins.
    """
    args = _build_arg_parser().parse_args(argv)

    if not args.dsn:
        raise SystemExit('missing --dsn or PG_DSN')
    if not args.job_id:
        raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')

    with connect(args.dsn, args.schema) as conn:
        job = fetch_job_context(conn, args.job_id)
        _ensure_embedding_job(job, args)

        # An explicit table wins; otherwise derive it from the job's embedding
        # dimension. -1 is a sentinel that maps to no table (result is None).
        resolved_vector_table = args.vector_table or VECTOR_TABLE_BY_DIM.get(
            job.embedding_dim or job.output_embedding_dim or -1
        )
        scope = resolve_scope_summary(conn, job.target_scope)

        # Metadata keys shared by both status transitions.
        base_metadata = {
            'worker': 'run_embedding_job',
            'output_target': args.output_target,
            'vector_table': resolved_vector_table,
            'dry_run': True,
        }
        running = update_job_status(
            conn,
            job.extraction_job_id,
            status='running',
            expected_status='pending',
            # Use the window count, falling back to the asset count when it is falsy.
            input_count=scope['active_window_count'] or scope['ready_asset_count'],
            metadata_patch={
                **base_metadata,
                'target_scope_summary': scope,
                'execution_mode': 'dry_run',
            },
            set_started_at=True,
        )
        completed = None
        if args.complete_dry_run:
            completed = update_job_status(
                conn,
                job.extraction_job_id,
                status='completed',
                expected_status='running',
                output_count=0,
                metadata_patch={
                    **base_metadata,
                    'dry_run_result': 'completed_without_feature_write',
                    'write_target_table': args.output_target,
                },
                set_finished_at=True,
            )

    emit_payload(
        {
            'worker': 'run_embedding_job',
            'schema': args.schema,
            'job': job.__dict__,
            'target_scope_summary': scope,
            'status_after_start': running,
            'status_after_complete': completed,
            'resolved_vector_table': resolved_vector_table,
            'notes': [
                'this worker currently validates planner -> job -> PostgreSQL state flow',
                'real encoder inference can replace dry_run while preserving the same job contract',
            ],
        },
        args.output,
    )


# Run the worker only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()