mark_job_status.py
1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
import os
from _job_common import connect, emit_payload, require_env, update_job_status
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
ap.add_argument('--job-id', type=int, default=int(require_env('EXTRACTION_JOB_ID', '0')))
ap.add_argument('--status', required=True)
ap.add_argument('--input-count', type=int)
ap.add_argument('--output-count', type=int)
ap.add_argument('--log-uri')
ap.add_argument('--metadata-json')
ap.add_argument('--set-started-at', action='store_true')
ap.add_argument('--set-finished-at', action='store_true')
ap.add_argument('--output')
args = ap.parse_args()
if not args.dsn:
raise SystemExit('missing --dsn or PG_DSN')
if not args.job_id:
raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')
metadata_patch = json.loads(args.metadata_json) if args.metadata_json else {}
with connect(args.dsn, args.schema) as conn:
updated = update_job_status(
conn,
args.job_id,
status=args.status,
input_count=args.input_count,
output_count=args.output_count,
log_uri=args.log_uri,
metadata_patch=metadata_patch,
set_started_at=args.set_started_at,
set_finished_at=args.set_finished_at,
)
emit_payload(
{
'worker': 'mark_job_status',
'schema': args.schema,
'update': updated,
},
args.output,
)
if __name__ == '__main__':
main()