pgvector_bulk_load_template.py
2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#!/usr/bin/env python3
"""Template bulk loader for pgvector-related metadata tables.
This script intentionally avoids requiring psycopg at runtime for now.
It produces the SQL statements and row payloads that a future live loader can
execute via COPY or execute_batch.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
# Parameterized statements (psycopg-style ``%(name)s`` placeholders) keyed by
# target table. Dict order is the intended load order: songs first, since the
# references and segments rows carry a song_id (presumably a foreign key into
# songs -- TODO confirm against the schema).
SQL_STATEMENTS = {
    # Upsert: re-running the load refreshes song metadata instead of failing on
    # the song_id conflict (requires a unique constraint on songs.song_id).
    "songs": """
INSERT INTO songs (song_id, title, artist, version_id, source_dataset, license)
VALUES (%(song_id)s, %(title)s, %(artist)s, %(version_id)s, %(source_dataset)s, %(license)s)
ON CONFLICT (song_id) DO UPDATE SET
title = EXCLUDED.title,
artist = EXCLUDED.artist,
version_id = EXCLUDED.version_id,
source_dataset = EXCLUDED.source_dataset,
license = EXCLUDED.license;
""".strip(),
    # REFERENCES is a reserved keyword in PostgreSQL, so the table name must be
    # double-quoted; unquoted, this statement is a syntax error.
    # NOTE(review): no ON CONFLICT clause here or in "segments", so re-executing
    # these inserts either duplicates rows or fails on a unique constraint,
    # depending on the schema; add an upsert once the natural key is confirmed.
    "references": """
INSERT INTO "references" (song_id, audio_uri, duration_sec, sample_rate)
VALUES (%(song_id)s, %(audio_uri)s, %(duration_sec)s, %(sample_rate)s);
""".strip(),
    "segments": """
INSERT INTO segments (song_id, audio_uri, offset_sec, duration_sec, split, type, segment_type, source_dataset)
VALUES (%(song_id)s, %(audio_uri)s, %(offset_sec)s, %(duration_sec)s, %(split)s, %(type)s, %(segment_type)s, %(source_dataset)s);
""".strip(),
}
def main(argv: list[str] | None = None) -> None:
    """Write a JSON load plan for the songs/references/segments tables.

    Reads the payload exported by ``export_manifest_to_pgvector_json.py``,
    pairs each table's rows with its statement from ``SQL_STATEMENTS`` and
    writes the plan to ``--output``. Nothing is sent to a database. A short
    status summary (resolved output path plus per-table row counts) is
    printed to stdout.

    Args:
        argv: Arguments to parse instead of ``sys.argv[1:]``. ``None`` (the
            default) keeps the normal command-line behaviour.

    Exits through ``argparse`` (status 2) when the input is not a JSON object,
    or when a table entry is present but is not a JSON array.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        required=True,
        help="JSON exported by export_manifest_to_pgvector_json.py",
    )
    parser.add_argument(
        "--output",
        required=True,
        help="Output JSON plan for later DB execution",
    )
    args = parser.parse_args(argv)

    # Decode explicitly as UTF-8: JSON is UTF-8 by spec, and the locale default
    # (e.g. cp1252 on Windows) would mis-decode non-ASCII titles/artists.
    payload = json.loads(Path(args.input).read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        parser.error(f"{args.input}: expected a JSON object at the top level")

    # Collect each table's rows once, in load order (songs first).
    rows = {}
    for table in ("songs", "references", "segments"):
        table_rows = payload.get(table)
        if table_rows is None:  # missing key or explicit null -> no rows
            table_rows = []
        elif not isinstance(table_rows, list):
            parser.error(f"{args.input}: {table!r} must be a JSON array")
        rows[table] = table_rows

    plan = {
        # Derived from the very lists stored under "rows", so they always agree.
        "counts": {table: len(entries) for table, entries in rows.items()},
        "sql": SQL_STATEMENTS,
        "rows": rows,
        "notes": [
            "Execute songs before references and segments.",
            "Embedding rows should be loaded only after reference_id/segment_id resolution.",
            "A live loader can replace row-wise inserts with COPY/execute_batch.",
        ],
    }

    out = Path(args.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must be pinned to UTF-8 instead of the platform locale.
    out.write_text(
        json.dumps(plan, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )

    print(json.dumps({
        "status": "ok",
        "output": str(out.resolve()),
        **plan["counts"],
    }, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()