Commit d04a6e65 d04a6e6538f3619e8a778fc6f518135d6d245ae8 by cnb.bofCdSsphPA

Close the song-centric import loop with feature_fact ingestion

Constraint: Complete the current manifest-to-PostgreSQL onboarding path on the 4-table fused schema without reintroducing any split-table storage path.
Rejected: Keep feature generation outside the manifest import workflow for Phase-1 | It leaves the current onboarding path incomplete and harder to validate end-to-end.
Confidence: high
Scope-risk: narrow
Directive: Treat windows[].features[] in song-centric manifests as the default batch path for writing fingerprint and embedding rows into feature_fact.
Tested: /usr/local/miniconda3/bin/python acr-engine/scripts/import_songcentric_manifest_live.py --dsn postgres://d2:d2pass@127.0.0.1:5432/d2 --schema acr_songcentric_test --manifest acr-engine/data/pgvector_eval/music20/songcentric_feature_manifest_sample.jsonl; repeated the import and verified counts remained media_entity=7, audio_object=15, feature_fact=9, set_membership=7; git diff --check; /usr/local/miniconda3/bin/python scripts/check_markdown_links.py --root docs returned OK for 11 active markdown files
Not-tested: automatic feature extraction from raw audio during import; large-scale concurrent manifest ingest
1 parent ba387bf0
{
"schema": "acr_songcentric_test",
"manifest": "acr-engine/data/pgvector_eval/music20/songcentric_feature_manifest_sample.jsonl",
"imported": [
{
"song_id": 6,
"asset_id": 12,
"window_ids": [
13
],
"feature_ids": [
7,
8
],
"membership_ids": [
6
]
},
{
"song_id": 7,
"asset_id": 14,
"window_ids": [
15
],
"feature_ids": [
9
],
"membership_ids": [
7
]
}
],
"counts": {
"media_entity": 7,
"audio_object": 15,
"feature_fact": 9,
"set_membership": 7
},
"window_lineage_sample": {
"window_id": 15,
"asset_id": 14,
"song_id": 7,
"title": "Feature Manifest Song 2",
"start_ms": 5000,
"end_ms": 10000
},
"feature_lineage_sample": {
"feature_type": "embedding",
"model_name": "muq",
"model_version": "large-msd-iter",
"feature_set_name": "muq_5s_hop2.5_meanpool",
"window_id": 15,
"song_id": 7,
"title": "Feature Manifest Song 2"
}
}
\ No newline at end of file
{
"schema": "acr_songcentric_test",
"manifest": "acr-engine/data/pgvector_eval/music20/songcentric_feature_manifest_sample.jsonl",
"imported": [
{
"song_id": 6,
"asset_id": 12,
"window_ids": [
13
],
"feature_ids": [
7,
8
],
"membership_ids": [
6
]
},
{
"song_id": 7,
"asset_id": 14,
"window_ids": [
15
],
"feature_ids": [
9
],
"membership_ids": [
7
]
}
],
"counts": {
"media_entity": 7,
"audio_object": 15,
"feature_fact": 9,
"set_membership": 7
},
"window_lineage_sample": {
"window_id": 15,
"asset_id": 14,
"song_id": 7,
"title": "Feature Manifest Song 2",
"start_ms": 5000,
"end_ms": 10000
},
"feature_lineage_sample": {
"feature_type": "embedding",
"model_name": "muq",
"model_version": "large-msd-iter",
"feature_set_name": "muq_5s_hop2.5_meanpool",
"window_id": 15,
"song_id": 7,
"title": "Feature Manifest Song 2"
}
}
\ No newline at end of file
{"song":{"biz_key":"song-30001","title":"Feature Manifest Song 1","artist_name":"Feature Artist 1"},"asset":{"source_type":"official","storage_uri":"/workspace/downloads/song-30001/master.wav","storage_scheme":"file","checksum":"sha256:feature-song-30001","codec":"wav","sample_rate":16000,"channels":1,"duration_ms":205000},"windows":[{"start_ms":10000,"end_ms":15000,"features":[{"feature_type":"fingerprint","model_name":"chromaprint","model_version":"phase1","feature_set_name":"chromaprint_5s","fingerprint_value":"fp-song-30001-w1","checksum":"fpchk-song-30001-w1"},{"feature_type":"embedding","model_name":"mert","model_version":"v1-95m","feature_set_name":"mert_5s_hop2.5_meanpool","embedding_dim":768,"embedding_uri":"s3://bucket/song-30001/win1.npy","vector_table_name":"audio_embedding_vector_768","checksum":"embchk-song-30001-w1"}]}],"memberships":[{"set_type":"reference_set","set_name":"phase1_hot_reference_v1","member_type":"asset","priority":100}]}
{"song":{"biz_key":"song-30002","title":"Feature Manifest Song 2","artist_name":"Feature Artist 2"},"asset":{"source_type":"ugc","storage_uri":"/workspace/downloads/song-30002/clip.wav","storage_scheme":"file","checksum":"sha256:feature-song-30002","codec":"wav","sample_rate":16000,"channels":1,"duration_ms":98000},"windows":[{"start_ms":5000,"end_ms":10000,"features":[{"feature_type":"embedding","model_name":"muq","model_version":"large-msd-iter","feature_set_name":"muq_5s_hop2.5_meanpool","embedding_dim":768,"embedding_uri":"s3://bucket/song-30002/win1.npy","vector_table_name":"audio_embedding_vector_768","checksum":"embchk-song-30002-w1"}]}],"memberships":[{"set_type":"eval_set","set_name":"phase1_eval_v1","member_type":"asset","priority":50}]}
......@@ -47,8 +47,15 @@ def ensure_asset(cur, song_id: int, asset: dict) -> int:
) values ('asset',%s,%s,%s,%s,%s,%s,%s,%s,%s) returning object_id
""",
(
song_id, asset.get('source_type'), asset.get('storage_uri'), asset.get('storage_scheme'),
asset.get('checksum'), asset.get('codec'), asset.get('sample_rate'), asset.get('channels'), asset.get('duration_ms'),
song_id,
asset.get('source_type'),
asset.get('storage_uri'),
asset.get('storage_scheme'),
asset.get('checksum'),
asset.get('codec'),
asset.get('sample_rate'),
asset.get('channels'),
asset.get('duration_ms'),
),
).fetchone()['object_id']
......@@ -66,16 +73,71 @@ def ensure_window(cur, song_id: int, asset_id: int, win: dict) -> int:
).fetchone()['object_id']
def ensure_membership(cur, m: dict, member_id: int, song_id: int) -> int:
def ensure_feature(cur, feature: dict, object_id: int, song_id: int) -> int:
    """Return the ``feature_id`` for a manifest feature, inserting it if absent.

    A ``feature_fact`` row is looked up by ``(object_id, model_name,
    model_version, feature_set_name, feature_type)``.  When a row already
    exists its id is returned and nothing is written, which is what lets the
    same manifest be imported repeatedly without growing the table.

    Args:
        cur: Database cursor.  ``cur.execute(sql, params)`` must return an
            object whose ``fetchone()`` yields a mapping keyed by column name
            (or a falsy value when there is no row).
        feature: One entry of a manifest ``windows[].features[]`` list.
            ``feature_type``, ``model_name``, ``model_version`` and
            ``feature_set_name`` are required; ``fingerprint_value`` is
            additionally required when ``feature_type`` is ``'fingerprint'``.
        object_id: ``audio_object`` id the feature is attached to (the caller
            in this script passes the id of the enclosing window).
        song_id: Id of the owning song, stored on the row.

    Returns:
        The existing or newly generated ``feature_id``.

    Raises:
        KeyError: If a required key is missing from ``feature``.
    """
    feature_type = feature['feature_type']
    model_name = feature['model_name']
    model_version = feature['model_version']
    feature_set_name = feature['feature_set_name']

    existing = cur.execute(
        "select feature_id from feature_fact where object_id=%s and model_name=%s and model_version=%s and feature_set_name=%s and feature_type=%s",
        (object_id, model_name, model_version, feature_set_name, feature_type),
    ).fetchone()
    if existing:
        return existing['feature_id']

    # An explicit ``"metadata_json": null`` in the manifest is treated like a
    # missing key; otherwise the JSON literal ``null`` would be stored instead
    # of the intended empty object.
    metadata = feature.get('metadata_json')
    if metadata is None:
        metadata = {}
    metadata_text = json.dumps(metadata)

    # Leading columns shared by both row shapes, in INSERT column order.
    shared = (
        feature_type,
        object_id,
        song_id,
        model_name,
        model_version,
        feature_set_name,
    )

    if feature_type == 'fingerprint':
        statement = """
            insert into feature_fact (
            feature_type, object_id, song_id, model_name, model_version,
            feature_set_name, fingerprint_value, checksum, metadata_json
            ) values (%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb)
            returning feature_id
            """
        params = shared + (
            feature['fingerprint_value'],
            feature.get('checksum'),
            metadata_text,
        )
    else:
        # NOTE(review): every non-fingerprint feature_type takes this
        # embedding-shaped insert, exactly as before; confirm whether unknown
        # types should be rejected instead.
        statement = """
            insert into feature_fact (
            feature_type, object_id, song_id, model_name, model_version,
            feature_set_name, feature_schema_ver, embedding_dim, embedding_uri, vector_table_name, checksum, metadata_json
            ) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb)
            returning feature_id
            """
        params = shared + (
            feature.get('feature_schema_ver', 'v1'),
            feature.get('embedding_dim'),
            feature.get('embedding_uri'),
            feature.get('vector_table_name'),
            feature.get('checksum'),
            metadata_text,
        )

    return cur.execute(statement, params).fetchone()['feature_id']
def ensure_membership(cur, membership: dict, member_id: int, song_id: int) -> int:
row = cur.execute(
"select membership_id from set_membership where set_type=%s and set_name=%s and member_type=%s and member_id=%s",
(m['set_type'], m['set_name'], m['member_type'], member_id),
(membership['set_type'], membership['set_name'], membership['member_type'], member_id),
).fetchone()
if row:
return row['membership_id']
return cur.execute(
"insert into set_membership (set_type,set_name,member_type,member_id,song_id,priority) values (%s,%s,%s,%s,%s,%s) returning membership_id",
(m['set_type'], m['set_name'], m['member_type'], member_id, song_id, m.get('priority', 100)),
(membership['set_type'], membership['set_name'], membership['member_type'], member_id, song_id, membership.get('priority', 100)),
).fetchone()['membership_id']
......@@ -92,11 +154,7 @@ def main() -> int:
output_path.parent.mkdir(parents=True, exist_ok=True)
qschema = quote_ident(args.schema)
report = {
'schema': args.schema,
'manifest': str(manifest_path),
'imported': [],
}
report = {'schema': args.schema, 'manifest': str(manifest_path), 'imported': []}
with psycopg.connect(args.dsn, row_factory=dict_row) as conn:
with conn.cursor() as cur:
......@@ -104,17 +162,26 @@ def main() -> int:
for row in load_jsonl(manifest_path):
song_id = ensure_song(cur, row['song'])
asset_id = ensure_asset(cur, song_id, row['asset'])
window_ids = [ensure_window(cur, song_id, asset_id, w) for w in row.get('windows', [])]
window_ids = []
feature_ids = []
for w in row.get('windows', []):
window_id = ensure_window(cur, song_id, asset_id, w)
window_ids.append(window_id)
for feature in w.get('features', []):
feature_ids.append(ensure_feature(cur, feature, window_id, song_id))
membership_ids = []
for m in row.get('memberships', []):
member_id = asset_id if m['member_type'] == 'asset' else song_id
membership_ids.append(ensure_membership(cur, m, member_id, song_id))
report['imported'].append({
report['imported'].append(
{
'song_id': song_id,
'asset_id': asset_id,
'window_ids': window_ids,
'feature_ids': feature_ids,
'membership_ids': membership_ids,
})
}
)
counts = {}
for table in ['media_entity', 'audio_object', 'feature_fact', 'set_membership']:
......@@ -136,6 +203,22 @@ def main() -> int:
limit 1
"""
).fetchone()
report['feature_lineage_sample'] = cur.execute(
"""
select ff.feature_type,
ff.model_name,
ff.model_version,
ff.feature_set_name,
win.object_id as window_id,
song.entity_id as song_id,
song.title
from feature_fact ff
join audio_object win on win.object_id = ff.object_id and win.object_type='window'
join media_entity song on song.entity_id = ff.song_id and song.entity_type='song'
order by ff.feature_id desc
limit 1
"""
).fetchone()
conn.commit()
output_path.write_text(json.dumps(report, ensure_ascii=False, indent=2))
......
## 2026-06-04
- 扩展 `import_songcentric_manifest_live.py` 支持从 manifest 的 `windows[].features[]` 直接落 `feature_fact`,并用 `songcentric_feature_manifest_sample.jsonl` 在 live PostgreSQL 上验证 `song -> asset -> window -> feature -> membership` 的完整导入闭环与幂等性。
- 扩展 `import_songcentric_manifest_live.py` 支持从 manifest 的 `windows[].features[]` 直接落 `feature_fact`,并新增 `songcentric_feature_manifest_sample.jsonl` 用于验证 `song -> asset -> window -> feature -> membership` 的完整导入闭环。
- 新增 `acr-engine/scripts/import_songcentric_manifest_live.py` 与样例 manifest `acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl`,把当前 4 表 schema 推进到“可从 JSONL manifest 批量导入 song/asset/window/set_membership”的阶段。
- 新增 `acr-engine/scripts/bootstrap_songcentric_phase1_live.py`,把当前 4 表 schema 从“单条 smoke 写入”推进到“可重复 Phase-1 bootstrap”;并准备对 `acr_songcentric_test` 做 fresh live 初始化验证。
......
......@@ -261,12 +261,15 @@ flowchart TD
B --> C[media_entity song]
B --> D[audio_object asset]
B --> E[audio_object window x N]
B --> F[set_membership]
B --> F[feature_fact]
B --> G[set_membership]
```
当前样例 manifest:[`acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl`](../acr-engine/data/pgvector_eval/music20/songcentric_manifest_sample.jsonl)
当前导入脚本:[`acr-engine/scripts/import_songcentric_manifest_live.py`](../acr-engine/scripts/import_songcentric_manifest_live.py)
当前带 feature 样例 manifest:[`acr-engine/data/pgvector_eval/music20/songcentric_feature_manifest_sample.jsonl`](../acr-engine/data/pgvector_eval/music20/songcentric_feature_manifest_sample.jsonl)
---
## 5. 最常用 SQL 样例
......