# bootstrap_phase1_reference_members_live.py (3.93 KB)
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations

import argparse
import json
from pathlib import Path
import sys

import psycopg

# ROOT is the parent of the directory that contains this script
# (parents[0] is the script's own directory, parents[1] is one level above it).
ROOT = Path(__file__).resolve().parents[1]
# Put ROOT at the front of sys.path before the project import below; the
# import must stay after this statement.
# NOTE(review): presumably the `workers` package lives directly under ROOT — confirm.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from workers._job_common import validate_schema

# Default location of the JSON report written by main(); overridable with --output.
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_reference_member_bootstrap_report.json'


def redact_dsn(dsn: str) -> str:
    """Return *dsn* with every password value replaced by ``***``.

    Both PostgreSQL connection-string forms are handled:

    * URI form (``postgres://user:secret@host/db``), including a
      ``password=`` / ``sslpassword=`` query parameter;
    * keyword/value form (``host=... password=secret``), with or without
      single quotes around the value.

    A DSN that carries no password is returned unchanged.
    """
    import re  # local import so this block does not depend on the file header

    if '://' in dsn:
        scheme, _, rest = dsn.partition('://')
        # The authority component ends at the first '/', '?' or '#'.
        authority = re.match(r'[^/?#]*', rest).group(0)
        tail = rest[len(authority):]
        # rpartition: an unescaped '@' inside the password must not be
        # mistaken for the userinfo/host separator.
        userinfo, at, hosts = authority.rpartition('@')
        if at:
            user, colon, _ = userinfo.partition(':')
            if colon:
                authority = f'{user}:***@{hosts}'
        tail = re.sub(r'(?i)([?&](?:ssl)?password=)[^&#]*', r'\1***', tail)
        return f'{scheme}://{authority}{tail}'

    return re.sub(
        r"(?i)((?:ssl)?password\s*=\s*)(?:'(?:[^'\\]|\\.)*'|\S+)",
        r'\1***',
        dsn,
    )


def main() -> None:
    """Register every reference recording as a member of one reference set.

    Command-line options:
        --dsn          PostgreSQL connection string (required).
        --schema       Schema placed first on the search_path (default ``acr_test``).
        --set-name     ``reference_set_registry.set_name`` to populate.
        --member-role  ``member_role`` stored on newly inserted members.
        --output       Path of the JSON report (default ``DEFAULT_OUTPUT``).

    For each ``recording`` row with ``is_reference = TRUE`` a
    ``reference_set_member`` row is inserted unless one already exists for the
    same (reference_set_id, recording_id). A JSON summary listing inserted and
    reused recording ids plus aggregate counts is written to ``--output`` and
    printed to stdout.

    Raises:
        RuntimeError: if no ``reference_set_registry`` row matches ``--set-name``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', required=True)
    ap.add_argument('--schema', default='acr_test')
    ap.add_argument('--set-name', default='phase1_hot_reference_v1')
    ap.add_argument('--member-role', default='hot_reference')
    ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
    args = ap.parse_args()
    schema = validate_schema(args.schema)

    # Typed local lists; the summary dict holds references to the same objects.
    inserted_ids: list[int] = []
    reused_ids: list[int] = []
    summary: dict[str, object] = {
        'schema': schema,
        # Derived from the DSN actually used, never a fixed literal.
        'dsn_redacted': redact_dsn(args.dsn),
        'set_name': args.set_name,
        'member_role': args.member_role,
        'inserted_recording_ids': inserted_ids,
        'reused_recording_ids': reused_ids,
    }

    with psycopg.connect(args.dsn, autocommit=True) as conn:
        # The schema name is interpolated into SQL text.
        # NOTE(review): this relies on validate_schema rejecting anything that
        # is not a safe identifier — confirm against workers._job_common.
        conn.execute(f'SET search_path TO {schema}, public;')

        ref_row = conn.execute(
            'SELECT reference_set_id FROM reference_set_registry WHERE set_name = %s LIMIT 1;',
            (args.set_name,),
        ).fetchone()
        if not ref_row:
            raise RuntimeError(f'reference_set_registry.set_name not found: {args.set_name}')
        reference_set_id = int(ref_row[0])
        summary['reference_set_id'] = reference_set_id

        recordings = conn.execute(
            """
            SELECT recording_id
            FROM recording
            WHERE is_reference = TRUE
            ORDER BY recording_id;
            """
        ).fetchall()

        # One query for the current membership instead of one existence check
        # per recording.
        existing_ids = {
            int(row[0])
            for row in conn.execute(
                """
                SELECT recording_id
                FROM reference_set_member
                WHERE reference_set_id = %s
                  AND recording_id IS NOT NULL;
                """,
                (reference_set_id,),
            ).fetchall()
        }

        for row in recordings:
            recording_id = int(row[0])
            if recording_id in existing_ids:
                reused_ids.append(recording_id)
                continue
            conn.execute(
                """
                INSERT INTO reference_set_member (reference_set_id, recording_id, member_role)
                VALUES (%s, %s, %s);
                """,
                (reference_set_id, recording_id, args.member_role),
            )
            # Remember the insert so a repeated id in `recordings` is not
            # inserted a second time.
            existing_ids.add(recording_id)
            inserted_ids.append(recording_id)

        # Count members on the un-joined table: a count(*) taken over the
        # joins below would be multiplied by each member's asset and window rows.
        member_count = conn.execute(
            'SELECT count(*) FROM reference_set_member WHERE reference_set_id = %s;',
            (reference_set_id,),
        ).fetchone()[0]

        # DISTINCT keeps these two counts correct despite the row fan-out
        # produced by joining assets and windows in the same query.
        related = conn.execute(
            """
            SELECT
                count(DISTINCT ra.asset_id) FILTER (WHERE ra.ingest_status = 'ready') AS ready_asset_count,
                count(DISTINCT aw.window_id) FILTER (WHERE aw.active_for_index) AS active_window_count
            FROM reference_set_member rsm
            LEFT JOIN recording_asset ra ON ra.recording_id = rsm.recording_id
            LEFT JOIN audio_window aw ON aw.recording_id = rsm.recording_id
            WHERE rsm.reference_set_id = %s;
            """,
            (reference_set_id,),
        ).fetchone()
        summary['counts'] = {
            'reference_recordings_seen': len(recordings),
            'inserted_members': len(inserted_ids),
            'reused_members': len(reused_ids),
            'member_count': int(member_count),
            'ready_asset_count': int(related[0]),
            'active_window_count': int(related[1]),
        }

    # Serialise once; the same text goes to the report file and to stdout.
    report = json.dumps(summary, ensure_ascii=False, indent=2)
    output = Path(args.output)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(report, encoding='utf-8')
    print(report)


# Run the bootstrap only when executed as a script, not when imported.
if __name__ == '__main__':
    main()