mark_job_status.py 1.74 KB
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations

import argparse
import json
import os

from _job_common import connect, emit_payload, require_env, update_job_status


def main() -> None:
    """Update one job's status via ``update_job_status`` and emit the result.

    Command-line flags take precedence over their environment fallbacks
    (``PG_DSN``, ``PG_SCHEMA``, ``EXTRACTION_JOB_ID``).  The parsed values are
    forwarded to ``update_job_status`` on a connection opened with
    ``connect(dsn, schema)``; whatever it returns is passed to
    ``emit_payload`` under the ``'update'`` key, along with ``--output``.

    Raises:
        SystemExit: if the DSN or job id is missing (a job id of 0 counts as
            missing), if ``--metadata-json`` is not valid JSON, or if it
            does not decode to a JSON object.  argparse itself also exits on
            malformed arguments, including a non-integer job id.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
    ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
    # The env fallback is handed to argparse unconverted: argparse applies
    # ``type=int`` to a string default only when --job-id is absent, so a
    # malformed EXTRACTION_JOB_ID becomes a normal usage error and can no
    # longer crash an invocation that passes --job-id explicitly.
    # NOTE(review): assumes require_env returns a string here - confirm.
    ap.add_argument('--job-id', type=int, default=require_env('EXTRACTION_JOB_ID', '0'))
    ap.add_argument('--status', required=True)
    ap.add_argument('--input-count', type=int)
    ap.add_argument('--output-count', type=int)
    ap.add_argument('--log-uri')
    ap.add_argument('--metadata-json')
    ap.add_argument('--set-started-at', action='store_true')
    ap.add_argument('--set-finished-at', action='store_true')
    ap.add_argument('--output')
    args = ap.parse_args()

    if not args.dsn:
        raise SystemExit('missing --dsn or PG_DSN')
    if not args.job_id:
        raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')

    # Validate the metadata patch before opening a database connection so a
    # bad flag fails fast with a readable message instead of a traceback.
    metadata_patch = {}
    if args.metadata_json:
        try:
            metadata_patch = json.loads(args.metadata_json)
        except json.JSONDecodeError as exc:
            raise SystemExit(f'invalid --metadata-json: {exc}') from None
        # The no-flag fallback is an empty dict, so the patch is expected to
        # be a JSON object; lists, scalars and null are rejected up front.
        if not isinstance(metadata_patch, dict):
            raise SystemExit('--metadata-json must decode to a JSON object')

    with connect(args.dsn, args.schema) as conn:
        updated = update_job_status(
            conn,
            args.job_id,
            status=args.status,
            input_count=args.input_count,
            output_count=args.output_count,
            log_uri=args.log_uri,
            metadata_patch=metadata_patch,
            set_started_at=args.set_started_at,
            set_finished_at=args.set_finished_at,
        )

    emit_payload(
        {
            'worker': 'mark_job_status',
            'schema': args.schema,
            'update': updated,
        },
        args.output,
    )


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()