bootstrap_phase1_reference_members_live.py
3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
import psycopg
# Parent of the directory containing this script (parents[1] of its resolved
# path); used as the project root for imports and default output paths.
ROOT = Path(__file__).resolve().parents[1]
# Prepend the project root to the import path (once) so the local `workers`
# package below can be imported when this file is executed directly.
if str(ROOT) not in sys.path:
    sys.path[:0] = [str(ROOT)]
from workers._job_common import validate_schema  # noqa: E402  (needs the sys.path tweak above)
# JSON report location used when --output is not supplied.
DEFAULT_OUTPUT = ROOT.joinpath(
    'data',
    'pgvector_eval',
    'music20',
    'phase1_reference_member_bootstrap_report.json',
)
def _redact_dsn(dsn: str) -> str:
    """Return *dsn* with any password replaced by ``***``.

    Both libpq connection-string syntaxes are handled: the URI form
    (``postgres://user:secret@host:5432/db``, including a ``?password=``
    query parameter) and the key/value form (``host=... password=secret``,
    where the value may be single-quoted).
    """
    import re

    if re.match(r'[A-Za-z][A-Za-z0-9+.-]*://', dsn):
        # URI form: mask the userinfo password but keep the user name.
        dsn = re.sub(r'^([^:/]+://[^:/@]*):[^@/]*@', r'\1:***@', dsn)
        return re.sub(r'([?&]password=)[^&#]*', r'\1***', dsn, flags=re.IGNORECASE)
    # Key/value form: a value is a single-quoted string or runs to whitespace.
    return re.sub(
        r"(\bpassword\s*=\s*)('(?:[^'\\]|\\.)*'|\S+)",
        r'\1***',
        dsn,
        flags=re.IGNORECASE,
    )


def _sync_members(
    conn: 'psycopg.Connection',
    reference_set_id: int,
    member_role: str,
) -> tuple[list[int], list[int], int]:
    """Add each ``is_reference`` recording to the set unless already a member.

    A recording counts as already present if any ``reference_set_member`` row
    exists for (``reference_set_id``, ``recording_id``), whatever its role.

    Returns:
        ``(inserted_ids, reused_ids, reference_recordings_seen)``; both ID
        lists follow ascending ``recording_id`` order.
    """
    inserted: list[int] = []
    reused: list[int] = []
    # The connection runs in autocommit mode; an explicit transaction makes
    # the whole sync atomic so a failure part-way leaves the set unchanged.
    with conn.transaction():
        recording_ids = [
            int(row[0])
            for row in conn.execute(
                """
                SELECT recording_id
                FROM recording
                WHERE is_reference = TRUE
                ORDER BY recording_id;
                """
            ).fetchall()
        ]
        # Fetch current membership once instead of issuing one query per recording.
        members = {
            row[0]
            for row in conn.execute(
                """
                SELECT recording_id
                FROM reference_set_member
                WHERE reference_set_id = %s;
                """,
                (reference_set_id,),
            ).fetchall()
        }
        for recording_id in recording_ids:
            if recording_id in members:
                reused.append(recording_id)
                continue
            conn.execute(
                """
                INSERT INTO reference_set_member (reference_set_id, recording_id, member_role)
                VALUES (%s, %s, %s);
                """,
                (reference_set_id, recording_id, member_role),
            )
            # Track it so a repeated id in the input is reported as reused, not re-inserted.
            members.add(recording_id)
            inserted.append(recording_id)
    return inserted, reused, len(recording_ids)


def _collect_counts(conn: 'psycopg.Connection', reference_set_id: int) -> dict[str, int]:
    """Return member, ready-asset and active-window counts for the set.

    Each figure comes from its own scalar subquery. Joining ``recording_asset``
    and ``audio_window`` in one FROM clause would produce one row per
    (asset, window) pair for every member, so ``count(*)`` there would
    overcount members (and the fan-out grows multiplicatively).
    """
    row = conn.execute(
        """
        SELECT
            (SELECT count(*)
               FROM reference_set_member
              WHERE reference_set_id = %(rsid)s) AS member_count,
            (SELECT count(DISTINCT ra.asset_id)
               FROM reference_set_member rsm
               JOIN recording_asset ra ON ra.recording_id = rsm.recording_id
              WHERE rsm.reference_set_id = %(rsid)s
                AND ra.ingest_status = 'ready') AS ready_asset_count,
            (SELECT count(DISTINCT aw.window_id)
               FROM reference_set_member rsm
               JOIN audio_window aw ON aw.recording_id = rsm.recording_id
              WHERE rsm.reference_set_id = %(rsid)s
                AND aw.active_for_index) AS active_window_count;
        """,
        {'rsid': reference_set_id},
    ).fetchone()
    return {
        'member_count': int(row[0]),
        'ready_asset_count': int(row[1]),
        'active_window_count': int(row[2]),
    }


def main() -> None:
    """Register every reference recording as a member of a reference set.

    Looks up ``--set-name`` in ``reference_set_registry`` (with ``--schema``
    first on the search path). Each ``recording`` row with
    ``is_reference = TRUE`` that is not yet a member of that set is inserted
    into ``reference_set_member`` with ``--member-role``. A JSON summary
    (inserted vs. reused IDs plus member/asset/window counts) is then written
    to ``--output`` and printed to stdout.

    Raises:
        RuntimeError: if ``--set-name`` is not present in the registry.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', required=True, help='PostgreSQL connection string')
    ap.add_argument('--schema', default='acr_test', help='schema placed first on search_path')
    ap.add_argument('--set-name', default='phase1_hot_reference_v1',
                    help='reference_set_registry.set_name to populate')
    ap.add_argument('--member-role', default='hot_reference',
                    help='member_role given to newly inserted members')
    ap.add_argument('--output', default=str(DEFAULT_OUTPUT), help='path of the JSON report')
    args = ap.parse_args()
    schema = validate_schema(args.schema)

    with psycopg.connect(args.dsn, autocommit=True) as conn:
        # NOTE(review): schema is interpolated directly; this relies on
        # validate_schema restricting it to a safe identifier — confirm.
        conn.execute(f'SET search_path TO {schema}, public;')
        ref_row = conn.execute(
            'SELECT reference_set_id FROM reference_set_registry WHERE set_name = %s LIMIT 1;',
            (args.set_name,),
        ).fetchone()
        if ref_row is None:
            raise RuntimeError(f'reference_set_registry.set_name not found: {args.set_name}')
        reference_set_id = int(ref_row[0])
        inserted, reused, seen = _sync_members(conn, reference_set_id, args.member_role)
        counts = _collect_counts(conn, reference_set_id)

    summary: dict[str, object] = {
        'schema': schema,
        # Derived from the DSN actually used, with the password masked.
        'dsn_redacted': _redact_dsn(args.dsn),
        'set_name': args.set_name,
        'member_role': args.member_role,
        'inserted_recording_ids': inserted,
        'reused_recording_ids': reused,
        'reference_set_id': reference_set_id,
        'counts': {
            'reference_recordings_seen': seen,
            'inserted_members': len(inserted),
            'reused_members': len(reused),
            **counts,
        },
    }
    rendered = json.dumps(summary, ensure_ascii=False, indent=2)
    output = Path(args.output)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(rendered, encoding='utf-8')
    print(rendered)
# Script entry point; importing this module does not run the bootstrap.
# main() returns None, so SystemExit(None) ends the process with status 0.
if __name__ == '__main__':
    raise SystemExit(main())