Commit 8002dfb0 8002dfb03ecaffefd2ee691b9fd275131a159cf4 by cnb.bofCdSsphPA

Import song-centric manifests into live PostgreSQL with idempotent upserts

Constraint: Extend the current 4-table song-centric schema with a practical manifest ingestion path without introducing the older split-table model or hidden side metadata tables.
Rejected: Leave ingestion as handwritten SQL or one-off bootstrap logic | It slows real asset onboarding and makes repeatability hard to verify.
Confidence: high
Scope-risk: narrow
Directive: Use import_songcentric_manifest_live.py plus a manifest JSONL as the default path for batch asset/window onboarding into the fused schema.
Tested: /usr/local/miniconda3/bin/python acr-engine/scripts/import_songcentric_manifest_live.py --dsn postgres://d2:d2pass@127.0.0.1:5432/d2 --schema acr_songcentric_test --manifest acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl; repeated the import and verified counts remained media_entity=5, audio_object=11, feature_fact=6, set_membership=5; git diff --check; /usr/local/miniconda3/bin/python scripts/check_markdown_links.py --root docs returned OK for 11 active markdown files
Not-tested: feature_fact generation during manifest import and large-scale manifest throughput
1 parent 5e43f28b
1 {
2 "schema": "acr_songcentric_test",
3 "manifest": "acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl",
4 "imported": [
5 {
6 "song_id": 4,
7 "asset_id": 7,
8 "window_ids": [
9 8,
10 9
11 ],
12 "membership_ids": [
13 4
14 ]
15 },
16 {
17 "song_id": 5,
18 "asset_id": 10,
19 "window_ids": [
20 11
21 ],
22 "membership_ids": [
23 5
24 ]
25 }
26 ],
27 "counts": {
28 "media_entity": 5,
29 "audio_object": 11,
30 "feature_fact": 6,
31 "set_membership": 5
32 },
33 "window_lineage_sample": {
34 "window_id": 11,
35 "asset_id": 10,
36 "song_id": 5,
37 "title": "Manifest Song 2",
38 "start_ms": 5000,
39 "end_ms": 10000
40 }
41 }
...\ No newline at end of file ...\ No newline at end of file
1 {
2 "schema": "acr_songcentric_test",
3 "manifest": "acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl",
4 "imported": [
5 {
6 "song_id": 4,
7 "asset_id": 7,
8 "window_ids": [
9 8,
10 9
11 ],
12 "membership_ids": [
13 4
14 ]
15 },
16 {
17 "song_id": 5,
18 "asset_id": 10,
19 "window_ids": [
20 11
21 ],
22 "membership_ids": [
23 5
24 ]
25 }
26 ],
27 "counts": {
28 "media_entity": 5,
29 "audio_object": 11,
30 "feature_fact": 6,
31 "set_membership": 5
32 },
33 "window_lineage_sample": {
34 "window_id": 11,
35 "asset_id": 10,
36 "song_id": 5,
37 "title": "Manifest Song 2",
38 "start_ms": 5000,
39 "end_ms": 10000
40 }
41 }
...\ No newline at end of file ...\ No newline at end of file
1 {"song":{"biz_key":"song-20001","title":"Manifest Song 1","artist_name":"Manifest Artist 1"},"asset":{"source_type":"official","storage_uri":"/workspace/downloads/song-20001/master.wav","storage_scheme":"file","checksum":"sha256:manifest-song-20001","codec":"wav","sample_rate":16000,"channels":1,"duration_ms":210000},"windows":[{"start_ms":10000,"end_ms":15000},{"start_ms":60000,"end_ms":65000}],"memberships":[{"set_type":"reference_set","set_name":"phase1_hot_reference_v1","member_type":"asset","priority":100}]}
2 {"song":{"biz_key":"song-20002","title":"Manifest Song 2","artist_name":"Manifest Artist 2"},"asset":{"source_type":"ugc","storage_uri":"/workspace/downloads/song-20002/clip.wav","storage_scheme":"file","checksum":"sha256:manifest-song-20002","codec":"wav","sample_rate":16000,"channels":1,"duration_ms":95000},"windows":[{"start_ms":5000,"end_ms":10000}],"memberships":[{"set_type":"eval_set","set_name":"phase1_eval_v1","member_type":"asset","priority":50}]}
1 #!/usr/bin/env /usr/local/miniconda3/bin/python
2 from __future__ import annotations
3
4 import argparse
5 import json
6 from pathlib import Path
7
8 import psycopg
9 from psycopg.rows import dict_row
10
11
12 def quote_ident(name: str) -> str:
13 return '"' + name.replace('"', '""') + '"'
14
15
16 def load_jsonl(path: Path):
17 for line in path.read_text().splitlines():
18 line = line.strip()
19 if line:
20 yield json.loads(line)
21
22
23 def ensure_song(cur, song: dict) -> int:
24 row = cur.execute(
25 "select entity_id from media_entity where entity_type='song' and biz_key=%s",
26 (song['biz_key'],),
27 ).fetchone()
28 if row:
29 return row['entity_id']
30 return cur.execute(
31 "insert into media_entity (entity_type,biz_key,title,artist_name) values ('song',%s,%s,%s) returning entity_id",
32 (song['biz_key'], song['title'], song.get('artist_name')),
33 ).fetchone()['entity_id']
34
35
36 def ensure_asset(cur, song_id: int, asset: dict) -> int:
37 row = cur.execute(
38 "select object_id from audio_object where object_type='asset' and song_id=%s and checksum=%s",
39 (song_id, asset['checksum']),
40 ).fetchone()
41 if row:
42 return row['object_id']
43 return cur.execute(
44 """
45 insert into audio_object (
46 object_type,song_id,source_type,storage_uri,storage_scheme,checksum,codec,sample_rate,channels,duration_ms
47 ) values ('asset',%s,%s,%s,%s,%s,%s,%s,%s,%s) returning object_id
48 """,
49 (
50 song_id, asset.get('source_type'), asset.get('storage_uri'), asset.get('storage_scheme'),
51 asset.get('checksum'), asset.get('codec'), asset.get('sample_rate'), asset.get('channels'), asset.get('duration_ms'),
52 ),
53 ).fetchone()['object_id']
54
55
56 def ensure_window(cur, song_id: int, asset_id: int, win: dict) -> int:
57 row = cur.execute(
58 "select object_id from audio_object where object_type='window' and parent_object_id=%s and start_ms=%s and end_ms=%s",
59 (asset_id, win['start_ms'], win['end_ms']),
60 ).fetchone()
61 if row:
62 return row['object_id']
63 return cur.execute(
64 "insert into audio_object (object_type,song_id,parent_object_id,start_ms,end_ms,duration_ms) values ('window',%s,%s,%s,%s,%s) returning object_id",
65 (song_id, asset_id, win['start_ms'], win['end_ms'], win['end_ms'] - win['start_ms']),
66 ).fetchone()['object_id']
67
68
69 def ensure_membership(cur, m: dict, member_id: int, song_id: int) -> int:
70 row = cur.execute(
71 "select membership_id from set_membership where set_type=%s and set_name=%s and member_type=%s and member_id=%s",
72 (m['set_type'], m['set_name'], m['member_type'], member_id),
73 ).fetchone()
74 if row:
75 return row['membership_id']
76 return cur.execute(
77 "insert into set_membership (set_type,set_name,member_type,member_id,song_id,priority) values (%s,%s,%s,%s,%s,%s) returning membership_id",
78 (m['set_type'], m['set_name'], m['member_type'], member_id, song_id, m.get('priority', 100)),
79 ).fetchone()['membership_id']
80
81
82 def main() -> int:
83 parser = argparse.ArgumentParser()
84 parser.add_argument('--dsn', required=True)
85 parser.add_argument('--schema', default='acr_songcentric_test')
86 parser.add_argument('--manifest', required=True)
87 parser.add_argument('--output', required=True)
88 args = parser.parse_args()
89
90 manifest_path = Path(args.manifest)
91 output_path = Path(args.output)
92 output_path.parent.mkdir(parents=True, exist_ok=True)
93 qschema = quote_ident(args.schema)
94
95 report = {
96 'schema': args.schema,
97 'manifest': str(manifest_path),
98 'imported': [],
99 }
100
101 with psycopg.connect(args.dsn, row_factory=dict_row) as conn:
102 with conn.cursor() as cur:
103 cur.execute(f'set search_path to {qschema}, public')
104 for row in load_jsonl(manifest_path):
105 song_id = ensure_song(cur, row['song'])
106 asset_id = ensure_asset(cur, song_id, row['asset'])
107 window_ids = [ensure_window(cur, song_id, asset_id, w) for w in row.get('windows', [])]
108 membership_ids = []
109 for m in row.get('memberships', []):
110 member_id = asset_id if m['member_type'] == 'asset' else song_id
111 membership_ids.append(ensure_membership(cur, m, member_id, song_id))
112 report['imported'].append({
113 'song_id': song_id,
114 'asset_id': asset_id,
115 'window_ids': window_ids,
116 'membership_ids': membership_ids,
117 })
118
119 counts = {}
120 for table in ['media_entity', 'audio_object', 'feature_fact', 'set_membership']:
121 counts[table] = cur.execute(f'select count(*) as c from {table}').fetchone()['c']
122 report['counts'] = counts
123 report['window_lineage_sample'] = cur.execute(
124 """
125 select win.object_id as window_id,
126 ast.object_id as asset_id,
127 song.entity_id as song_id,
128 song.title,
129 win.start_ms,
130 win.end_ms
131 from audio_object win
132 join audio_object ast on ast.object_id = win.parent_object_id and ast.object_type='asset'
133 join media_entity song on song.entity_id = win.song_id and song.entity_type='song'
134 where win.object_type='window'
135 order by win.object_id desc
136 limit 1
137 """
138 ).fetchone()
139 conn.commit()
140
141 output_path.write_text(json.dumps(report, ensure_ascii=False, indent=2))
142 print(json.dumps(report, ensure_ascii=False, indent=2))
143 return 0
144
145
146 if __name__ == '__main__':
147 raise SystemExit(main())
1 ## 2026-06-04 1 ## 2026-06-04
2 2
3 - 新增 `acr-engine/scripts/import_songcentric_manifest_live.py` 与样例 manifest `acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl`,把当前 4 表 schema 推进到“可从 JSONL manifest 批量导入 song/asset/window/set_membership”的阶段。
4
3 - 新增 `acr-engine/scripts/bootstrap_songcentric_phase1_live.py`,把当前 4 表 schema 从“单条 smoke 写入”推进到“可重复 Phase-1 bootstrap”;并准备对 `acr_songcentric_test` 做 fresh live 初始化验证。 5 - 新增 `acr-engine/scripts/bootstrap_songcentric_phase1_live.py`,把当前 4 表 schema 从“单条 smoke 写入”推进到“可重复 Phase-1 bootstrap”;并准备对 `acr_songcentric_test` 做 fresh live 初始化验证。
4 6
5 - 新增正式 SQL 文件 `acr-engine/sql/acr_pg_schema_songcentric_v1.sql` 与 live smoke 脚本 `acr-engine/scripts/smoke_songcentric_schema_live.py`,把 4 张融合优先表从文档草案推进到可执行 schema,并准备在用户 PostgreSQL 上做 fresh 验证。 7 - 新增正式 SQL 文件 `acr-engine/sql/acr_pg_schema_songcentric_v1.sql` 与 live smoke 脚本 `acr-engine/scripts/smoke_songcentric_schema_live.py`,把 4 张融合优先表从文档草案推进到可执行 schema,并准备在用户 PostgreSQL 上做 fresh 验证。
......
...@@ -252,6 +252,21 @@ flowchart TD ...@@ -252,6 +252,21 @@ flowchart TD
252 252
253 当前 live bootstrap 脚本:[`acr-engine/scripts/bootstrap_songcentric_phase1_live.py`](../acr-engine/scripts/bootstrap_songcentric_phase1_live.py) 253 当前 live bootstrap 脚本:[`acr-engine/scripts/bootstrap_songcentric_phase1_live.py`](../acr-engine/scripts/bootstrap_songcentric_phase1_live.py)
254 254
255
256 ### 4.5 Manifest 导入流程
257
258 ```mermaid
259 flowchart TD
260 A[songcentric_manifest_sample.jsonl] --> B[import_songcentric_manifest_live.py]
261 B --> C[media_entity song]
262 B --> D[audio_object asset]
263 B --> E[audio_object window x N]
264 B --> F[set_membership]
265 ```
266
267 当前样例 manifest:[`acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl`](../acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl)
268 当前导入脚本:[`acr-engine/scripts/import_songcentric_manifest_live.py`](../acr-engine/scripts/import_songcentric_manifest_live.py)
269
255 --- 270 ---
256 271
257 ## 5. 最常用 SQL 样例 272 ## 5. 最常用 SQL 样例
......