pgvector_bulk_load_template.py 2.58 KB
#!/usr/bin/env python3
"""Template bulk loader for pgvector-related metadata tables.

This script intentionally avoids requiring psycopg at runtime for now.
It produces the SQL statements and row payloads that a future live loader can
execute via COPY or execute_batch.
"""

from __future__ import annotations

import argparse
import json
from pathlib import Path


SQL_STATEMENTS = {
    "songs": """
INSERT INTO songs (song_id, title, artist, version_id, source_dataset, license)
VALUES (%(song_id)s, %(title)s, %(artist)s, %(version_id)s, %(source_dataset)s, %(license)s)
ON CONFLICT (song_id) DO UPDATE SET
    title = EXCLUDED.title,
    artist = EXCLUDED.artist,
    version_id = EXCLUDED.version_id,
    source_dataset = EXCLUDED.source_dataset,
    license = EXCLUDED.license;
""".strip(),
    "references": """
INSERT INTO references (song_id, audio_uri, duration_sec, sample_rate)
VALUES (%(song_id)s, %(audio_uri)s, %(duration_sec)s, %(sample_rate)s);
""".strip(),
    "segments": """
INSERT INTO segments (song_id, audio_uri, offset_sec, duration_sec, split, type, segment_type, source_dataset)
VALUES (%(song_id)s, %(audio_uri)s, %(offset_sec)s, %(duration_sec)s, %(split)s, %(type)s, %(segment_type)s, %(source_dataset)s);
""".strip(),
}


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="JSON exported by export_manifest_to_pgvector_json.py")
    parser.add_argument("--output", required=True, help="Output JSON plan for later DB execution")
    args = parser.parse_args()

    payload = json.loads(Path(args.input).read_text())
    plan = {
        "counts": {
            "songs": len(payload.get("songs", [])),
            "references": len(payload.get("references", [])),
            "segments": len(payload.get("segments", [])),
        },
        "sql": SQL_STATEMENTS,
        "rows": {
            "songs": payload.get("songs", []),
            "references": payload.get("references", []),
            "segments": payload.get("segments", []),
        },
        "notes": [
            "Execute songs before references and segments.",
            "Embedding rows should be loaded only after reference_id/segment_id resolution.",
            "A live loader can replace row-wise inserts with COPY/execute_batch.",
        ],
    }

    out = Path(args.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(plan, indent=2, ensure_ascii=False))
    print(json.dumps({
        "status": "ok",
        "output": str(out.resolve()),
        **plan["counts"],
    }, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()