Commit 6ea7365b 6ea7365b235904d9b4fbfcd3b704d8d1cdec2259 by cnb.bofCdSsphPA

Prove asset-level embedding upserts against live PostgreSQL

Constraint: The schema already declared asset-level idempotency, but without live evidence future work could mistake it for an unverified design note.
Rejected: Rely on DDL inspection alone | It would not prove duplicate inserts are blocked and upserts reuse the same embedding row.
Confidence: high
Scope-risk: narrow
Directive: Keep asset-level writer implementations aligned with the verified ON CONFLICT (feature_set_id, asset_id) WHERE window_id IS NULL contract.
Tested: /usr/local/miniconda3/bin/python -m py_compile scripts/validate_audio_embedding_asset_upsert_live.py; git diff --check; /usr/local/miniconda3/bin/python scripts/validate_audio_embedding_asset_upsert_live.py --dsn 'postgres://d2:d2pass@127.0.0.1:5432/d2' --schema acr_asset_upsert_test --output data/pgvector_eval/music20/audio_embedding_asset_upsert_live_report.json
Not-tested: No production semantic writer uses the asset-level contract yet; this commit validates the DB contract, not an end-to-end extractor.
1 parent 015e3261
{
"schema": "acr_asset_upsert_test",
"dsn_redacted": "postgres://d2:***@127.0.0.1:5432/d2",
"seed_ids": {
"model_id": 1,
"feature_set_id": 1,
"canonical_song_id": 1,
"work_id": 1,
"recording_id": 1,
"asset_id": 1
},
"first_insert_embedding_id": 1,
"duplicate_insert_guard": {
"passed": true,
"error_type": "UniqueViolation",
"message": "duplicate key value violates unique constraint \"uq_audio_embedding_feature_asset\""
},
"upsert_embedding_id": 1,
"same_embedding_id_reused": true,
"counts": {
"audio_embedding": 1,
"audio_embedding_vector_192": 1
},
"final_state": {
"embedding_id": 1,
"asset_id": 1,
"window_id": null,
"checksum": "checksum-v2",
"embedding_uri": "inline://asset-probe-upsert",
"metadata_json": {
"probe": "asset_level_upsert_v2"
},
"vector_literal": "[0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2,0.2]"
},
"passed": true
}
\ No newline at end of file
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
from typing import Any
import psycopg
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from workers._job_common import validate_schema
DEFAULT_SCHEMA_SQL = ROOT / 'sql' / 'acr_pg_schema_v2.sql'
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'audio_embedding_asset_upsert_live_report.json'
def vec_literal(vec: list[float]) -> str:
return '[' + ','.join(f'{x:.10f}' for x in vec) + ']'
def reset_schema(conn: psycopg.Connection, schema: str) -> None:
schema = validate_schema(schema)
conn.execute(f'DROP SCHEMA IF EXISTS {schema} CASCADE;')
conn.execute(f'CREATE SCHEMA {schema};')
conn.execute(f'SET search_path TO {schema}, public;')
def apply_schema(conn: psycopg.Connection, schema_sql: Path) -> None:
conn.execute(schema_sql.read_text(encoding='utf-8'))
def seed_minimal_graph(conn: psycopg.Connection) -> dict[str, int]:
model_id = conn.execute(
"""
INSERT INTO model_registry (
model_name, model_family, model_version, model_source, model_uri,
license_name, input_sample_rate, default_window_sec, default_hop_sec,
output_embedding_dim, pooling_supported, metadata_json
) VALUES (
'asset_level_probe', 'probe', 'v1', 'live-test',
'scripts/validate_audio_embedding_asset_upsert_live.py', 'internal-eval',
16000, 5.0, 2.5, 192, ARRAY['none'], '{}'::jsonb
)
RETURNING model_id;
"""
).fetchone()[0]
feature_set_id = conn.execute(
"""
INSERT INTO feature_set_registry (
model_id, feature_name, feature_level, extraction_granularity,
window_sec, hop_sec, embedding_dim, pooling_strategy, layer_selection,
normalize_l2, distance_metric, quantization_type, feature_schema_version,
config_json, status
) VALUES (
%s, 'semantic_embedding', 'asset', 'whole_asset',
5.0, 2.5, 192, 'none', 'na', TRUE, 'cosine', NULL, 'v1',
'{"probe":"asset_level_upsert"}'::jsonb, 'active'
)
RETURNING feature_set_id;
""",
(model_id,),
).fetchone()[0]
canonical_song_id = conn.execute(
"""
INSERT INTO canonical_song (biz_song_code, title, rights_status, metadata_json)
VALUES ('asset-probe-song', 'Asset Probe Song', 'protected', '{}'::jsonb)
RETURNING canonical_song_id;
"""
).fetchone()[0]
work_id = conn.execute(
"""
INSERT INTO work (canonical_song_id, work_code, work_title, metadata_json)
VALUES (%s, 'asset-probe-work', 'Asset Probe Work', '{}'::jsonb)
RETURNING work_id;
""",
(canonical_song_id,),
).fetchone()[0]
recording_id = conn.execute(
"""
INSERT INTO recording (
work_id, canonical_song_id, recording_code, recording_title,
version_type, is_reference, duration_sec, metadata_json
) VALUES (%s, %s, 'asset-probe-rec', 'Asset Probe Recording', 'master_reference', TRUE, 5.0, '{}'::jsonb)
RETURNING recording_id;
""",
(work_id, canonical_song_id),
).fetchone()[0]
asset_id = conn.execute(
"""
INSERT INTO recording_asset (
recording_id, asset_role, storage_uri, storage_scheme, file_ext,
mime_type, sample_rate, channels, codec_name, duration_sec,
normalized_storage_uri, ingest_status, metadata_json
) VALUES (
%s, 'reference_audio', '/tmp/asset-probe.wav', 'file', 'wav',
'audio/wav', 16000, 1, 'pcm_s16le', 5.0,
'/tmp/asset-probe.wav', 'ready', '{}'::jsonb
)
RETURNING asset_id;
""",
(recording_id,),
).fetchone()[0]
return {
'model_id': int(model_id),
'feature_set_id': int(feature_set_id),
'canonical_song_id': int(canonical_song_id),
'work_id': int(work_id),
'recording_id': int(recording_id),
'asset_id': int(asset_id),
}
def insert_asset_embedding(conn: psycopg.Connection, ids: dict[str, int], *, checksum: str, metadata: dict[str, Any], vec: list[float]) -> int:
embedding_id = conn.execute(
"""
INSERT INTO audio_embedding (
feature_set_id, extraction_job_id, asset_id, window_id, recording_id, work_id,
canonical_song_id, embedding_storage_mode, embedding_uri, vector_norm, checksum,
is_indexed, metadata_json
) VALUES (
%s, NULL, %s, NULL, %s, %s,
%s, 'pgvector_inline_192', 'inline://asset-probe', 1.0, %s,
TRUE, %s::jsonb
)
RETURNING embedding_id;
""",
(
ids['feature_set_id'],
ids['asset_id'],
ids['recording_id'],
ids['work_id'],
ids['canonical_song_id'],
checksum,
json.dumps(metadata, ensure_ascii=False),
),
).fetchone()[0]
conn.execute(
'INSERT INTO audio_embedding_vector_192 (embedding_id, embedding) VALUES (%s, %s::vector);',
(embedding_id, vec_literal(vec)),
)
return int(embedding_id)
def expect_duplicate_insert_failure(conn: psycopg.Connection, ids: dict[str, int]) -> dict[str, Any]:
try:
with conn.transaction():
conn.execute(
"""
INSERT INTO audio_embedding (
feature_set_id, extraction_job_id, asset_id, window_id, recording_id, work_id,
canonical_song_id, embedding_storage_mode, embedding_uri, vector_norm, checksum,
is_indexed, metadata_json
) VALUES (
%s, NULL, %s, NULL, %s, %s,
%s, 'pgvector_inline_192', 'inline://asset-probe-duplicate', 1.0, 'dup-checksum',
TRUE, '{"probe":"duplicate_insert"}'::jsonb
);
""",
(
ids['feature_set_id'],
ids['asset_id'],
ids['recording_id'],
ids['work_id'],
ids['canonical_song_id'],
),
)
return {'passed': False, 'note': 'duplicate asset-level insert unexpectedly succeeded'}
except Exception as exc: # noqa: BLE001
return {
'passed': 'uq_audio_embedding_feature_asset' in str(exc),
'error_type': type(exc).__name__,
'message': str(exc).splitlines()[0],
}
def upsert_asset_embedding(conn: psycopg.Connection, ids: dict[str, int], *, checksum: str, metadata: dict[str, Any], vec: list[float]) -> int:
embedding_id = conn.execute(
"""
INSERT INTO audio_embedding (
feature_set_id, extraction_job_id, asset_id, window_id, recording_id, work_id,
canonical_song_id, embedding_storage_mode, embedding_uri, vector_norm, checksum,
is_indexed, metadata_json
) VALUES (
%s, NULL, %s, NULL, %s, %s,
%s, 'pgvector_inline_192', 'inline://asset-probe-upsert', 1.0, %s,
TRUE, %s::jsonb
)
ON CONFLICT (feature_set_id, asset_id)
WHERE window_id IS NULL AND asset_id IS NOT NULL
DO UPDATE SET
checksum = EXCLUDED.checksum,
embedding_uri = EXCLUDED.embedding_uri,
metadata_json = EXCLUDED.metadata_json,
is_indexed = EXCLUDED.is_indexed,
vector_norm = EXCLUDED.vector_norm
RETURNING embedding_id;
""",
(
ids['feature_set_id'],
ids['asset_id'],
ids['recording_id'],
ids['work_id'],
ids['canonical_song_id'],
checksum,
json.dumps(metadata, ensure_ascii=False),
),
).fetchone()[0]
conn.execute(
"""
INSERT INTO audio_embedding_vector_192 (embedding_id, embedding)
VALUES (%s, %s::vector)
ON CONFLICT (embedding_id)
DO UPDATE SET embedding = EXCLUDED.embedding;
""",
(embedding_id, vec_literal(vec)),
)
return int(embedding_id)
def fetch_final_state(conn: psycopg.Connection, embedding_id: int) -> dict[str, Any]:
row = conn.execute(
"""
SELECT ae.embedding_id, ae.asset_id, ae.window_id, ae.checksum, ae.embedding_uri, ae.metadata_json,
aev.embedding::text
FROM audio_embedding ae
JOIN audio_embedding_vector_192 aev ON aev.embedding_id = ae.embedding_id
WHERE ae.embedding_id = %s;
""",
(embedding_id,),
).fetchone()
return {
'embedding_id': int(row[0]),
'asset_id': int(row[1]),
'window_id': row[2],
'checksum': row[3],
'embedding_uri': row[4],
'metadata_json': row[5] or {},
'vector_literal': row[6],
}
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument('--dsn', required=True)
ap.add_argument('--schema', default='acr_asset_upsert_test')
ap.add_argument('--schema-sql', default=str(DEFAULT_SCHEMA_SQL))
ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
args = ap.parse_args()
initial_vec = [0.1] * 192
updated_vec = [0.2] * 192
payload: dict[str, Any] = {
'schema': args.schema,
'dsn_redacted': 'postgres://d2:***@127.0.0.1:5432/d2',
}
with psycopg.connect(args.dsn, autocommit=True) as conn:
reset_schema(conn, args.schema)
apply_schema(conn, Path(args.schema_sql))
ids = seed_minimal_graph(conn)
payload['seed_ids'] = ids
first_embedding_id = insert_asset_embedding(
conn,
ids,
checksum='checksum-v1',
metadata={'probe': 'asset_level_insert_v1'},
vec=initial_vec,
)
payload['first_insert_embedding_id'] = first_embedding_id
payload['duplicate_insert_guard'] = expect_duplicate_insert_failure(conn, ids)
upsert_embedding_id = upsert_asset_embedding(
conn,
ids,
checksum='checksum-v2',
metadata={'probe': 'asset_level_upsert_v2'},
vec=updated_vec,
)
payload['upsert_embedding_id'] = upsert_embedding_id
payload['same_embedding_id_reused'] = first_embedding_id == upsert_embedding_id
payload['counts'] = {
'audio_embedding': int(conn.execute('SELECT count(*) FROM audio_embedding;').fetchone()[0]),
'audio_embedding_vector_192': int(conn.execute('SELECT count(*) FROM audio_embedding_vector_192;').fetchone()[0]),
}
payload['final_state'] = fetch_final_state(conn, upsert_embedding_id)
payload['passed'] = (
payload['duplicate_insert_guard'].get('passed')
and payload['same_embedding_id_reused']
and payload['counts']['audio_embedding'] == 1
and payload['counts']['audio_embedding_vector_192'] == 1
and payload['final_state']['checksum'] == 'checksum-v2'
and payload['final_state']['metadata_json'].get('probe') == 'asset_level_upsert_v2'
)
out = Path(args.output)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8')
print(json.dumps(payload, ensure_ascii=False, indent=2))
if __name__ == '__main__':
main()
## 2026-06-04
- 新增 `scripts/validate_audio_embedding_asset_upsert_live.py``audio_embedding_asset_upsert_live_report.json`,在隔离 schema `acr_asset_upsert_test` 上真实验证 `uq_audio_embedding_feature_asset`:重复普通 insert 会触发 `UniqueViolation`,而 `ON CONFLICT ... DO UPDATE` 会复用同一 `embedding_id`,最终 `audio_embedding/audio_embedding_vector_192` 行数都保持为 `1`
- 新增 `scripts/run_phase1_embedding_preflight_matrix_live.py``phase1_embedding_preflight_matrix_report.json`,对 `mert / muq / ecapa` 四条 semantic jobs 做了统一 live preflight 矩阵验证;结果表明 4 条 job 全都稳定落到 `preflight_failed`,且 blocker 已收敛为 `/workspace/downloads` 未挂载与语义模型 runtime 缺失,而不是单条 job 的偶发异常。
- 更新 `run_embedding_job.py`,把 semantic lane 从“只有 dry-run”推进到“真实 scope 读取 + vector table 校验 + runtime 依赖校验 + 缺音频校验 + PostgreSQL failed 落账”的 preflight write contract;当前 live `mert` job 会把 `unreadable_audio_assets``model_runtime_unavailable` 同时写入 `feature_extraction_job.metadata_json`,不再只停留在纸面设计。
-`audio_embedding` 补上 `UNIQUE(feature_set_id, window_id) WHERE window_id IS NOT NULL``UNIQUE(feature_set_id, asset_id) WHERE window_id IS NULL AND asset_id IS NOT NULL` 两条幂等唯一键,为后续真实 `MERT / MuQ / ECAPA` upsert 落库固定主键策略。
......
......@@ -343,6 +343,17 @@ MERT 5s/2.5s job (`extraction_job_id=2`) 在 `acr_test` 上已经真实验证:
而不需要先查再写。
当前这两条唯一键里,asset-level 路径也已经有 live 证据:
- `scripts/validate_audio_embedding_asset_upsert_live.py`
- `audio_embedding_asset_upsert_live_report.json`
已验证:
- 重复 `INSERT` 会被 `uq_audio_embedding_feature_asset` 拒绝
- `ON CONFLICT ... DO UPDATE` 会复用同一个 `embedding_id`
- `audio_embedding` / `audio_embedding_vector_192` 行数都保持为 `1`
### 下一步替换点
当 runtime 与音频挂载到位后,只需要把 guarded failure path 替换成真实 inference:
......
......@@ -774,3 +774,40 @@ cd /workspace/acr-engine
- 当前真正阻塞 Phase-1 encoder-only 落地的是:
1. `/workspace/downloads` 音频挂载
2. 模型 runtime 依赖安装
## 新增:asset-level embedding upsert live 验证
为了把 `uq_audio_embedding_feature_asset` 从“DDL 声明”推进到“真实证据”,本轮新增:
- `acr-engine/scripts/validate_audio_embedding_asset_upsert_live.py`
- `acr-engine/data/pgvector_eval/music20/audio_embedding_asset_upsert_live_report.json`
### 验证动作
脚本会在隔离 schema `acr_asset_upsert_test` 中:
1. 落最小主数据图:`song -> work -> recording -> asset`
2. 插入第一条 `window_id IS NULL` 的 asset-level embedding
3. 再做一次普通重复 `INSERT`
4. 预期被 `uq_audio_embedding_feature_asset` 拒绝
5. 再做一次 `ON CONFLICT ... DO UPDATE`
6. 验证最终仍只有 `1``audio_embedding``1``audio_embedding_vector_192`
### 当前结果
| 项 | 结果 |
|---|---|
| 首次 `embedding_id` | `1` |
| 重复普通 `INSERT` | `UniqueViolation` |
| 唯一键名 | `uq_audio_embedding_feature_asset` |
| upsert 后 `embedding_id` | `1` |
| `same_embedding_id_reused` | `true` |
| `audio_embedding` 行数 | `1` |
| `audio_embedding_vector_192` 行数 | `1` |
| 最终 `checksum` | `checksum-v2` |
结论:
- asset-level 唯一键不是“纸面存在”,而是已经在 live PostgreSQL 上真实生效
- 后续如果补 asset-level semantic writer,可以直接沿用同一个 `ON CONFLICT (feature_set_id, asset_id) ...` 合同
......
......@@ -192,6 +192,7 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
- semantic lane 也已完成 live failure contract:`run_embedding_job.py` 现在会同时暴露 `unreadable_audio_assets``model_runtime_unavailable`,而不是把失败伪装成 completed
- `audio_embedding` 已补上 window / asset 双路唯一键,后续真实 encoder 只需替换 inference adapter 即可复用同一 upsert 合同
- `scripts/run_phase1_embedding_preflight_matrix_live.py` 已跑通,4 条 semantic jobs(mert/muq/ecapa)在 `acr_test` 上都被稳定标记为 `preflight_failed`;当前共性 blocker 已收敛为 `/workspace/downloads` 缺失 + 语义模型 runtime 缺失
- `scripts/validate_audio_embedding_asset_upsert_live.py` 已在隔离 schema `acr_asset_upsert_test` 上验证 `uq_audio_embedding_feature_asset`:重复 insert 会被唯一键拒绝,upsert 会复用同一 `embedding_id`,说明 asset-level 幂等键也已有真实证据
- `phase1_hot_reference_v1``acr_test` 里已经真实补齐 `20` 个 reference members,因此 worker dry-run 当前看到的 scope 已是 `20 recordings / 20 assets / 20 windows`
- worker contract 现在已有基础前置状态保护;重复执行同一 chromaprint dry-run job 会被 `expected_status=pending` 明确拒绝,证据见 `phase1_worker_double_claim_guard_report.json`
- exact lane 的 `run_chromaprint_job.py` 已具备非 dry-run 写入路径;当前在 `acr_test` 的 live 结果是因为 `/workspace/downloads/...` 缺失而明确 `failed`,不是继续假装 `completed`
......