mark_job_status.py 1.96 KB
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations

import argparse
import json
import os

from _job_common import ALLOWED_JOB_STATUSES, connect, emit_payload, update_job_status


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
    ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
    ap.add_argument('--job-id', type=int)
    ap.add_argument('--status', required=True, choices=sorted(ALLOWED_JOB_STATUSES))
    ap.add_argument('--expected-status', choices=sorted(ALLOWED_JOB_STATUSES))
    ap.add_argument('--input-count', type=int)
    ap.add_argument('--output-count', type=int)
    ap.add_argument('--log-uri')
    ap.add_argument('--metadata-json')
    ap.add_argument('--set-started-at', action='store_true')
    ap.add_argument('--set-finished-at', action='store_true')
    ap.add_argument('--output')
    args = ap.parse_args()

    if not args.dsn:
        raise SystemExit('missing --dsn or PG_DSN')
    job_id = args.job_id or int(os.environ.get('EXTRACTION_JOB_ID', '0'))
    if not job_id:
        raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')

    metadata_patch = json.loads(args.metadata_json) if args.metadata_json else {}

    with connect(args.dsn, args.schema) as conn:
        updated = update_job_status(
            conn,
            job_id,
            status=args.status,
            expected_status=args.expected_status,
            input_count=args.input_count,
            output_count=args.output_count,
            log_uri=args.log_uri,
            metadata_patch=metadata_patch,
            set_started_at=args.set_started_at,
            set_finished_at=args.set_finished_at,
        )

    emit_payload(
        {
            'worker': 'mark_job_status',
            'schema': args.schema,
            'job_id': job_id,
            'update': updated,
        },
        args.output,
    )


if __name__ == '__main__':
    main()