Commit 399db601 399db60123b2615233d22af3c10957165c60ba4b by cnb.bofCdSsphPA

Make semantic extraction failures auditable before model runtimes land

Constraint: Current container lacks /workspace/downloads and torch/torchaudio/transformers, so Phase-1 semantic work must prove honest failure semantics instead of pretending inference succeeded.
Rejected: Stub semantic embeddings | Would blur the contract between real model outputs and repo-local placeholders.
Confidence: high
Scope-risk: narrow
Directive: Keep the preflight blockers explicit until real MERT/MuQ/ECAPA adapters and asset-level embedding tests exist.
Tested: /usr/local/miniconda3/bin/python -m py_compile workers/run_embedding_job.py workers/run_chromaprint_job.py workers/_job_common.py scripts/bootstrap_phase1_extraction_jobs_live.py scripts/plan_phase1_extraction_jobs_live.py scripts/bootstrap_phase1_reference_members_live.py scripts/live_pgvector_music20_eval.py; git diff --check; /usr/local/miniconda3/bin/python scripts/bootstrap_phase1_extraction_jobs_live.py --dsn 'postgres://d2:d2pass@127.0.0.1:5432/d2' --schema acr_test; /usr/local/miniconda3/bin/python workers/run_embedding_job.py --dsn 'postgres://d2:d2pass@127.0.0.1:5432/d2' --schema acr_test --job-id 2 --model-name mert --model-version v1-95m --vector-table audio_embedding_vector_768 --output data/pgvector_eval/music20/phase1_worker_embedding_write_attempt.json
Not-tested: Real encoder inference and asset-level embedding upsert path remain unavailable in this container.
1 parent 94d75e92
{
"audio_embedding_count": 20,
"audio_embedding_vector_768_count": 0,
"job_2": [
2,
"failed",
20,
0,
{
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1",
"worker": "run_embedding_job",
"dry_run": false,
"artifact_dir": "data/pgvector_eval/music20/phase1_embeddings",
"vector_table": "audio_embedding_vector_768",
"output_target": "audio_embedding",
"execution_mode": "preflight_failure",
"failure_reason": "preflight_failed",
"runtime_report": {
"ready": false,
"model_name": "mert",
"availability": {
"numpy": true,
"torch": false,
"torchaudio": false,
"transformers": false
},
"requirements": [
"numpy",
"torch",
"torchaudio",
"transformers"
],
"missing_dependencies": [
"torch",
"torchaudio",
"transformers"
]
},
"preflight_blockers": [
"unreadable_audio_assets",
"model_runtime_unavailable"
],
"scope_window_count": 20,
"write_target_table": "audio_embedding",
"vector_table_report": {
"reason": null,
"resolved": true,
"expected_dim": 768,
"table_exists": true,
"allowed_vector_tables": [
"audio_embedding_vector_192",
"audio_embedding_vector_768"
],
"requested_vector_table": "audio_embedding_vector_768"
},
"missing_window_count": 20,
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 20,
"reference_set_id": 2,
"ready_asset_count": 20,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 20
},
"missing_window_samples": [
{
"reason": "missing_audio",
"asset_id": 1,
"window_id": 1,
"storage_uri": "/workspace/downloads/100/type_11/93dfdeb0-7da5-42a8-9c71-cf12af57dd191650256918.wav"
},
{
"reason": "missing_audio",
"asset_id": 2,
"window_id": 2,
"storage_uri": "/workspace/downloads/101/type_11/83c0c07f-4f96-4ff4-998c-58db910f3cfa1650256915.wav"
},
{
"reason": "missing_audio",
"asset_id": 3,
"window_id": 3,
"storage_uri": "/workspace/downloads/102/type_11/43440ec5-70b4-4d50-8683-d3e41cad29411650256908.wav"
},
{
"reason": "missing_audio",
"asset_id": 4,
"window_id": 4,
"storage_uri": "/workspace/downloads/103/type_11/19876dbb-fffc-40f8-9530-9322c9ed77681650256912.wav"
},
{
"reason": "missing_audio",
"asset_id": 5,
"window_id": 5,
"storage_uri": "/workspace/downloads/104/type_11/4c1d3e22-045f-445b-ab87-ba1ae3ee09b31650256912.wav"
}
]
}
]
}
\ No newline at end of file
{
"worker": "run_embedding_job",
"schema": "acr_test",
"job": {
"extraction_job_id": 2,
"feature_set_id": 3,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
"shard_key": "phase1/reference/mert/v1-95m/5s_2.5s",
"job_metadata": {
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1"
},
"feature_name": "semantic_embedding",
"feature_level": "window",
"extraction_granularity": "sliding_window",
"window_sec": 5.0,
"hop_sec": 2.5,
"embedding_dim": 768,
"distance_metric": "cosine",
"feature_config": {
"role": "primary_semantic_baseline"
},
"model_id": 3,
"model_name": "mert",
"model_version": "v1-95m",
"model_family": "music_ssl",
"input_sample_rate": 24000,
"output_embedding_dim": 768,
"model_metadata": {
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1"
}
},
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"reference_set_id": 2,
"reference_set_name": "phase1_hot_reference_v1",
"recording_count": 20,
"ready_asset_count": 20,
"active_window_count": 20
},
"scope_window_count": 20,
"status_after_start": {
"extraction_job_id": 2,
"job_status": "running",
"input_count": 20,
"output_count": null,
"started_at": "2026-06-04T13:44:05.982252+08:00",
"finished_at": null,
"log_uri": null,
"metadata_json": {
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1",
"worker": "run_embedding_job",
"dry_run": false,
"vector_table": "audio_embedding_vector_768",
"output_target": "audio_embedding",
"execution_mode": "preflight",
"runtime_report": {
"ready": false,
"model_name": "mert",
"availability": {
"numpy": true,
"torch": false,
"torchaudio": false,
"transformers": false
},
"requirements": [
"numpy",
"torch",
"torchaudio",
"transformers"
],
"missing_dependencies": [
"torch",
"torchaudio",
"transformers"
]
},
"scope_window_count": 20,
"vector_table_report": {
"reason": null,
"resolved": true,
"expected_dim": 768,
"table_exists": true,
"allowed_vector_tables": [
"audio_embedding_vector_192",
"audio_embedding_vector_768"
],
"requested_vector_table": "audio_embedding_vector_768"
},
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 20,
"reference_set_id": 2,
"ready_asset_count": 20,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 20
}
}
},
"status_after_complete": null,
"status_after_failed": {
"extraction_job_id": 2,
"job_status": "failed",
"input_count": 20,
"output_count": 0,
"started_at": "2026-06-04T13:44:05.982252+08:00",
"finished_at": "2026-06-04T13:44:05.983441+08:00",
"log_uri": null,
"metadata_json": {
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1",
"worker": "run_embedding_job",
"dry_run": false,
"artifact_dir": "data/pgvector_eval/music20/phase1_embeddings",
"vector_table": "audio_embedding_vector_768",
"output_target": "audio_embedding",
"execution_mode": "preflight_failure",
"failure_reason": "preflight_failed",
"runtime_report": {
"ready": false,
"model_name": "mert",
"availability": {
"numpy": true,
"torch": false,
"torchaudio": false,
"transformers": false
},
"requirements": [
"numpy",
"torch",
"torchaudio",
"transformers"
],
"missing_dependencies": [
"torch",
"torchaudio",
"transformers"
]
},
"preflight_blockers": [
"unreadable_audio_assets",
"model_runtime_unavailable"
],
"scope_window_count": 20,
"write_target_table": "audio_embedding",
"vector_table_report": {
"reason": null,
"resolved": true,
"expected_dim": 768,
"table_exists": true,
"allowed_vector_tables": [
"audio_embedding_vector_192",
"audio_embedding_vector_768"
],
"requested_vector_table": "audio_embedding_vector_768"
},
"missing_window_count": 20,
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 20,
"reference_set_id": 2,
"ready_asset_count": 20,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 20
},
"missing_window_samples": [
{
"reason": "missing_audio",
"asset_id": 1,
"window_id": 1,
"storage_uri": "/workspace/downloads/100/type_11/93dfdeb0-7da5-42a8-9c71-cf12af57dd191650256918.wav"
},
{
"reason": "missing_audio",
"asset_id": 2,
"window_id": 2,
"storage_uri": "/workspace/downloads/101/type_11/83c0c07f-4f96-4ff4-998c-58db910f3cfa1650256915.wav"
},
{
"reason": "missing_audio",
"asset_id": 3,
"window_id": 3,
"storage_uri": "/workspace/downloads/102/type_11/43440ec5-70b4-4d50-8683-d3e41cad29411650256908.wav"
},
{
"reason": "missing_audio",
"asset_id": 4,
"window_id": 4,
"storage_uri": "/workspace/downloads/103/type_11/19876dbb-fffc-40f8-9530-9322c9ed77681650256912.wav"
},
{
"reason": "missing_audio",
"asset_id": 5,
"window_id": 5,
"storage_uri": "/workspace/downloads/104/type_11/4c1d3e22-045f-445b-ab87-ba1ae3ee09b31650256912.wav"
}
]
}
},
"resolved_vector_table": "audio_embedding_vector_768",
"vector_table_report": {
"requested_vector_table": "audio_embedding_vector_768",
"expected_dim": 768,
"allowed_vector_tables": [
"audio_embedding_vector_192",
"audio_embedding_vector_768"
],
"resolved": true,
"table_exists": true,
"reason": null
},
"runtime_report": {
"model_name": "mert",
"requirements": [
"numpy",
"torch",
"torchaudio",
"transformers"
],
"availability": {
"numpy": true,
"torch": false,
"torchaudio": false,
"transformers": false
},
"missing_dependencies": [
"torch",
"torchaudio",
"transformers"
],
"ready": false
},
"processed_windows": [],
"notes": [
"this worker now validates planner -> job -> scope windows -> PostgreSQL failure semantics",
"real model inference should replace the guarded failure path without changing the job contract or idempotent upsert keys"
]
}
\ No newline at end of file
{
"command": "/usr/local/miniconda3/bin/python workers/run_embedding_job.py --dsn postgres://d2:d2pass@127.0.0.1:5432/d2 --schema acr_test --job-id 2 --model-name mert --model-version v1-95m --vector-table audio_embedding_vector_768",
"returncode": 1,
"stdout": "",
"stderr": "failed to update feature_extraction_job=2 with expected_status=pending\n",
"expected_guard": "failed to update feature_extraction_job=2 with expected_status=pending",
"passed": true
}
\ No newline at end of file
......@@ -423,6 +423,14 @@ ON audio_window(canonical_song_id);
CREATE INDEX IF NOT EXISTS idx_audio_window_active_for_index
ON audio_window(active_for_index);
CREATE UNIQUE INDEX IF NOT EXISTS uq_audio_embedding_feature_window
ON audio_embedding(feature_set_id, window_id)
WHERE window_id IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS uq_audio_embedding_feature_asset
ON audio_embedding(feature_set_id, asset_id)
WHERE window_id IS NULL AND asset_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_audio_embedding_feature_set_id
ON audio_embedding(feature_set_id);
......
......@@ -2,7 +2,13 @@
from __future__ import annotations
import argparse
import json
import math
import os
from pathlib import Path
from typing import Any
from psycopg import sql
from _job_common import connect, emit_payload, fetch_job_context, resolve_scope_summary, update_job_status
......@@ -10,6 +16,229 @@ VECTOR_TABLE_BY_DIM = {
192: 'audio_embedding_vector_192',
768: 'audio_embedding_vector_768',
}
MODEL_RUNTIME_REQUIREMENTS = {
'mert': ('numpy', 'torch', 'torchaudio', 'transformers'),
'muq': ('numpy', 'torch', 'torchaudio', 'transformers'),
'ecapa': ('numpy', 'torch', 'torchaudio', 'speechbrain'),
}
ALLOWED_VECTOR_TABLES = set(VECTOR_TABLE_BY_DIM.values())
def fetch_scope_windows(conn, target_scope: str) -> list[dict[str, object]]:
if not target_scope.startswith('reference_set:'):
raise SystemExit(f'unsupported target_scope for embedding worker: {target_scope}')
set_name = target_scope.split(':', 1)[1]
rows = conn.execute(
"""
SELECT
aw.window_id,
aw.asset_id,
aw.window_index,
aw.start_sec,
aw.end_sec,
aw.duration_sec,
aw.recording_id,
aw.work_id,
aw.canonical_song_id,
ra.storage_uri,
ra.ingest_status,
aw.active_for_index
FROM reference_set_registry rs
JOIN reference_set_member rsm ON rsm.reference_set_id = rs.reference_set_id
JOIN audio_window aw ON aw.recording_id = rsm.recording_id
JOIN recording_asset ra ON ra.asset_id = aw.asset_id
WHERE rs.set_name = %s
AND aw.active_for_index = TRUE
AND ra.ingest_status = 'ready'
ORDER BY aw.window_id;
""",
(set_name,),
).fetchall()
return [
{
'window_id': int(row[0]),
'asset_id': int(row[1]),
'window_index': int(row[2]),
'start_sec': float(row[3]),
'end_sec': float(row[4]),
'duration_sec': float(row[5]),
'recording_id': int(row[6]),
'work_id': int(row[7]),
'canonical_song_id': int(row[8]),
'storage_uri': row[9],
'ingest_status': row[10],
'active_for_index': bool(row[11]),
}
for row in rows
]
def detect_runtime(model_name: str) -> dict[str, Any]:
checks: dict[str, Any] = {'model_name': model_name, 'requirements': list(MODEL_RUNTIME_REQUIREMENTS.get(model_name, ('numpy',)))}
availability: dict[str, bool] = {}
missing: list[str] = []
for package_name in checks['requirements']:
try:
__import__(package_name)
availability[package_name] = True
except Exception: # noqa: BLE001
availability[package_name] = False
missing.append(package_name)
checks['availability'] = availability
checks['missing_dependencies'] = missing
checks['ready'] = not missing
return checks
def validate_vector_table(conn, vector_table: str | None, expected_dim: int | None) -> dict[str, Any]:
payload = {
'requested_vector_table': vector_table,
'expected_dim': expected_dim,
'allowed_vector_tables': sorted(ALLOWED_VECTOR_TABLES),
'resolved': False,
'table_exists': False,
'reason': None,
}
if not vector_table:
payload['reason'] = 'missing_vector_table'
return payload
if vector_table not in ALLOWED_VECTOR_TABLES:
payload['reason'] = 'vector_table_not_allowlisted'
return payload
dim_from_table = 192 if vector_table.endswith('_192') else 768 if vector_table.endswith('_768') else None
if expected_dim is not None and dim_from_table is not None and dim_from_table != expected_dim:
payload['reason'] = 'vector_table_dim_mismatch'
return payload
row = conn.execute('SELECT to_regclass(%s);', (vector_table,)).fetchone()
payload['table_exists'] = bool(row and row[0])
if not payload['table_exists']:
payload['reason'] = 'vector_table_missing_in_schema'
return payload
payload['resolved'] = True
return payload
def build_artifact_path(artifact_dir: Path, *, extraction_job_id: int, window_id: int) -> Path:
artifact_dir.mkdir(parents=True, exist_ok=True)
return artifact_dir / f'job{extraction_job_id}_window{window_id}.json'
def vector_literal(values: list[float]) -> str:
return '[' + ','.join(f'{value:.10f}' for value in values) + ']'
def compute_vector_norm(values: list[float]) -> float:
return math.sqrt(sum(value * value for value in values))
def upsert_audio_embedding(
conn,
*,
feature_set_id: int,
extraction_job_id: int,
vector_table: str,
window: dict[str, object],
embedding_uri: str,
embedding: list[float],
checksum: str | None,
metadata_json: dict[str, object],
) -> tuple[int, str]:
row = conn.execute(
"""
INSERT INTO audio_embedding (
feature_set_id, extraction_job_id, asset_id, window_id, recording_id, work_id,
canonical_song_id, embedding_storage_mode, embedding_uri, vector_norm, checksum,
is_indexed, metadata_json
) VALUES (
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
TRUE, %s::jsonb
)
ON CONFLICT (feature_set_id, window_id) WHERE window_id IS NOT NULL
DO UPDATE SET
extraction_job_id = EXCLUDED.extraction_job_id,
asset_id = EXCLUDED.asset_id,
recording_id = EXCLUDED.recording_id,
work_id = EXCLUDED.work_id,
canonical_song_id = EXCLUDED.canonical_song_id,
embedding_storage_mode = EXCLUDED.embedding_storage_mode,
embedding_uri = EXCLUDED.embedding_uri,
vector_norm = EXCLUDED.vector_norm,
checksum = EXCLUDED.checksum,
is_indexed = EXCLUDED.is_indexed,
metadata_json = EXCLUDED.metadata_json
RETURNING embedding_id, xmax = 0 AS inserted;
""",
(
feature_set_id,
extraction_job_id,
window['asset_id'],
window['window_id'],
window['recording_id'],
window['work_id'],
window['canonical_song_id'],
'pgvector_inline',
embedding_uri,
compute_vector_norm(embedding),
checksum,
json.dumps(metadata_json, ensure_ascii=False),
),
).fetchone()
embedding_id = int(row[0])
inserted = bool(row[1])
conn.execute(
sql.SQL(
"""
INSERT INTO {vector_table} (embedding_id, embedding)
VALUES (%s, %s::vector)
ON CONFLICT (embedding_id)
DO UPDATE SET embedding = EXCLUDED.embedding;
"""
).format(vector_table=sql.Identifier(vector_table)),
(embedding_id, vector_literal(embedding)),
)
return embedding_id, 'inserted' if inserted else 'updated'
def fail_job(
conn,
*,
job,
blockers: list[str],
output_target: str,
resolved_vector_table: str | None,
artifact_dir: Path,
scope: dict[str, Any],
scope_windows: list[dict[str, object]],
missing_windows: list[dict[str, object]],
runtime_report: dict[str, Any],
vector_table_report: dict[str, Any],
) -> dict[str, Any]:
return update_job_status(
conn,
job.extraction_job_id,
status='failed',
expected_status='running',
output_count=0,
metadata_patch={
'worker': 'run_embedding_job',
'output_target': output_target,
'vector_table': resolved_vector_table,
'dry_run': False,
'write_target_table': output_target,
'artifact_dir': str(artifact_dir),
'execution_mode': 'preflight_failure',
'failure_reason': 'preflight_failed',
'preflight_blockers': blockers,
'scope_window_count': len(scope_windows),
'missing_window_count': len(missing_windows),
'missing_window_samples': missing_windows[:5],
'runtime_report': runtime_report,
'vector_table_report': vector_table_report,
'target_scope_summary': scope,
},
set_finished_at=True,
)
def main() -> None:
......@@ -22,6 +251,7 @@ def main() -> None:
ap.add_argument('--vector-table', default=os.environ.get('VECTOR_TABLE'))
ap.add_argument('--output-target', default=os.environ.get('OUTPUT_TARGET', 'audio_embedding'))
ap.add_argument('--complete-dry-run', action='store_true')
ap.add_argument('--artifact-dir', default=os.environ.get('ARTIFACT_DIR', 'data/pgvector_eval/music20/phase1_embeddings'))
ap.add_argument('--output')
args = ap.parse_args()
......@@ -30,6 +260,8 @@ def main() -> None:
if not args.job_id:
raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')
artifact_dir = Path(args.artifact_dir)
with connect(args.dsn, args.schema) as conn:
job = fetch_job_context(conn, args.job_id)
if job.model_name == 'chromaprint':
......@@ -43,25 +275,37 @@ def main() -> None:
raise SystemExit(f'model mismatch: job={job.model_name} cli={args.model_name}')
if args.model_version and job.model_version != args.model_version:
raise SystemExit(f'model version mismatch: job={job.model_version} cli={args.model_version}')
resolved_vector_table = args.vector_table or VECTOR_TABLE_BY_DIM.get(job.embedding_dim or job.output_embedding_dim or -1)
expected_dim = job.embedding_dim or job.output_embedding_dim
resolved_vector_table = args.vector_table or VECTOR_TABLE_BY_DIM.get(expected_dim or -1)
scope = resolve_scope_summary(conn, job.target_scope)
scope_windows = fetch_scope_windows(conn, job.target_scope)
runtime_report = detect_runtime(job.model_name)
vector_table_report = validate_vector_table(conn, resolved_vector_table, expected_dim)
running = update_job_status(
conn,
job.extraction_job_id,
status='running',
expected_status='pending',
input_count=scope['active_window_count'] or scope['ready_asset_count'],
input_count=len(scope_windows),
metadata_patch={
'worker': 'run_embedding_job',
'output_target': args.output_target,
'vector_table': resolved_vector_table,
'dry_run': True,
'dry_run': bool(args.complete_dry_run),
'target_scope_summary': scope,
'execution_mode': 'dry_run',
'execution_mode': 'dry_run' if args.complete_dry_run else 'preflight',
'runtime_report': runtime_report,
'vector_table_report': vector_table_report,
'scope_window_count': len(scope_windows),
},
set_started_at=True,
)
completed = None
failed = None
processed_windows: list[dict[str, object]] = []
if args.complete_dry_run:
completed = update_job_status(
conn,
......@@ -76,9 +320,69 @@ def main() -> None:
'dry_run': True,
'dry_run_result': 'completed_without_feature_write',
'write_target_table': args.output_target,
'scope_window_count': len(scope_windows),
'runtime_report': runtime_report,
'vector_table_report': vector_table_report,
},
set_finished_at=True,
)
else:
missing_windows: list[dict[str, object]] = []
for window in scope_windows:
asset_path = Path(str(window['storage_uri']))
if not asset_path.exists():
missing_windows.append({
'window_id': window['window_id'],
'asset_id': window['asset_id'],
'storage_uri': str(asset_path),
'reason': 'missing_audio',
})
blockers: list[str] = []
if missing_windows:
blockers.append('unreadable_audio_assets')
if not vector_table_report['resolved']:
blockers.append(str(vector_table_report['reason']))
if not runtime_report['ready']:
blockers.append('model_runtime_unavailable')
if blockers:
failed = fail_job(
conn,
job=job,
blockers=blockers,
output_target=args.output_target,
resolved_vector_table=resolved_vector_table,
artifact_dir=artifact_dir,
scope=scope,
scope_windows=scope_windows,
missing_windows=missing_windows,
runtime_report=runtime_report,
vector_table_report=vector_table_report,
)
else:
failed = update_job_status(
conn,
job.extraction_job_id,
status='failed',
expected_status='running',
output_count=0,
metadata_patch={
'worker': 'run_embedding_job',
'output_target': args.output_target,
'vector_table': resolved_vector_table,
'dry_run': False,
'write_target_table': args.output_target,
'artifact_dir': str(artifact_dir),
'execution_mode': 'write_attempt',
'failure_reason': 'encoder_inference_not_implemented',
'scope_window_count': len(scope_windows),
'runtime_report': runtime_report,
'vector_table_report': vector_table_report,
'next_expected_step': 'replace the guarded failure path with real model inference while keeping the same upsert contract',
},
set_finished_at=True,
)
emit_payload(
{
......@@ -86,12 +390,17 @@ def main() -> None:
'schema': args.schema,
'job': job.__dict__,
'target_scope_summary': scope,
'scope_window_count': len(scope_windows),
'status_after_start': running,
'status_after_complete': completed,
'status_after_failed': failed,
'resolved_vector_table': resolved_vector_table,
'vector_table_report': vector_table_report,
'runtime_report': runtime_report,
'processed_windows': processed_windows,
'notes': [
'this worker currently validates planner -> job -> PostgreSQL state flow',
'real encoder inference can replace dry_run while preserving the same job contract',
'this worker now validates planner -> job -> scope windows -> PostgreSQL failure semantics',
'real model inference should replace the guarded failure path without changing the job contract or idempotent upsert keys',
],
},
args.output,
......
## 2026-06-04
- 更新 `run_embedding_job.py`,把 semantic lane 从“只有 dry-run”推进到“真实 scope 读取 + vector table 校验 + runtime 依赖校验 + 缺音频校验 + PostgreSQL failed 落账”的 preflight write contract;当前 live `mert` job 会把 `unreadable_audio_assets``model_runtime_unavailable` 同时写入 `feature_extraction_job.metadata_json`,不再只停留在纸面设计。
-`audio_embedding` 补上 `UNIQUE(feature_set_id, window_id) WHERE window_id IS NOT NULL``UNIQUE(feature_set_id, asset_id) WHERE window_id IS NULL AND asset_id IS NOT NULL` 两条幂等唯一键,为后续真实 `MERT / MuQ / ECAPA` upsert 落库固定主键策略。
- 新增 `phase1_worker_embedding_write_attempt.json``phase1_worker_embedding_write_guard_report.json``phase1_worker_embedding_post_state.json`,在 live PostgreSQL `acr_test` 上验证 semantic lane 的非 dry-run 行为:当前 `scope_window_count=20`,但因 `/workspace/downloads/...` 未挂载且 `torch/torchaudio/transformers` 缺失,job 被诚实标记为 `failed`,同时 `audio_embedding_vector_768_count` 仍保持 `0`
- 更新 `run_chromaprint_job.py``src/engines/chromaprint_matcher.py`,把 exact lane 从“只有 dry-run”推进到“具备真实 `audio_fingerprint` 写入路径”;同时增加无 `librosa` 环境下的 `wave + numpy` 回退实现,避免 worker 被运行时依赖直接卡死。
-`audio_fingerprint` 补上 `(feature_set_id, asset_id)` 唯一索引,并把 exact lane 写入改成 `INSERT ... ON CONFLICT DO UPDATE`;同时把失败语义收紧为“全量成功 / 否则失败”,避免部分不可读资产被误标成 completed。
- 新增 `phase1_worker_chromaprint_write_attempt.json``phase1_worker_chromaprint_write_guard_report.json`,在 live PostgreSQL `acr_test` 上验证 exact lane 的非 dry-run 行为:当前因 `/workspace/downloads/...` 缺失导致 `scope_asset_count=20``processed_assets=0`,job 被明确标记为 `failed``failure_reason=unreadable_audio_assets`,证明写入路径已接上但受环境挂载阻塞。
......
......@@ -286,15 +286,75 @@ flowchart TD
### 7.2 Embedding worker
后续把下面逻辑塞进 `run_embedding_job.py`
`run_embedding_job.py` 现在已经不再只是简单 dry-run。当前它已经具备
1. 读取 `audio_window`
2. 加载 `MERT` / `MuQ` / `ECAPA`
3. 提取向量
4.`audio_embedding`
5.`audio_embedding_vector_<dim>`
6. 更新 `output_count`
7. 标记 `completed`
1. 真实读取 `reference_set -> audio_window -> recording_asset` scope
2. 真实检查目标向量表是否存在且与维度匹配
3. 真实检查模型 runtime 依赖是否齐全
4. 真实检查 source audio 是否存在
5. 把 blocker 明确写回 `feature_extraction_job.metadata_json`
6. 在 blocker 存在时把 job 诚实标记为 `failed`
### 当前失败语义
semantic lane 当前采用的是 **preflight all-or-nothing**
- 只要 scope 内音频路径不可达 / 文件不存在,记为:
- `unreadable_audio_assets`
- 只要模型 runtime 依赖导入不满足,记为:
- `model_runtime_unavailable`
- 只要目标向量表非法 / 缺失 / 维度不匹配,记为对应 blocker
worker 会把这些 blocker 聚合到:
- `failure_reason = preflight_failed`
- `preflight_blockers = [...]`
这样不会把“模型没法跑”误写成 completed,也不会只暴露第一个错误。
### 当前 live 证据
MERT 5s/2.5s job (`extraction_job_id=2`) 在 `acr_test` 上已经真实验证:
- `scope_window_count = 20`
- `job_status = failed`
- `output_count = 0`
- `preflight_blockers = ['unreadable_audio_assets', 'model_runtime_unavailable']`
- `runtime_report.missing_dependencies = ['torch', 'torchaudio', 'transformers']`
- `audio_embedding_vector_768` 已通过存在性与维度校验
对应产物:
- `acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_write_attempt.json`
- `acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_write_guard_report.json`
- `acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_post_state.json`
### 当前幂等保护
为了服务后续真正的 window embedding upsert,`audio_embedding` 现在补了两条唯一键:
- `UNIQUE(feature_set_id, window_id) WHERE window_id IS NOT NULL`
- `UNIQUE(feature_set_id, asset_id) WHERE window_id IS NULL AND asset_id IS NOT NULL`
这让后续真实 encoder 接入后可以直接做:
- window 级 embedding upsert
- asset 级 embedding upsert
而不需要先查再写。
### 下一步替换点
当 runtime 与音频挂载到位后,只需要把 guarded failure path 替换成真实 inference:
1. 加载 `MERT` / `MuQ` / `ECAPA`
2. 提取向量
3.`audio_embedding`
4.`audio_embedding_vector_<dim>`
5. 更新 `output_count`
6. 标记 `completed`
也就是说,**PostgreSQL worker contract 已经固定,下一步换的是 encoder adapter,不是 orchestration 结构。**
---
......@@ -313,11 +373,11 @@ flowchart TD
当前还没有完成的部分:
- 真实 chromaprint 特征写入
- 真实 MERT / MuQ / ECAPA embedding 写入
- exact lane 虽已有真实写入路径,但当前 live 环境仍被 `/workspace/downloads` 缺失阻塞
- semantic lane 已有真实 preflight failure contract,但还没有接上真正的 `MERT / MuQ / ECAPA` inference adapter
- `failed` 重试策略
- job 分片执行器
- 幂等去重写入策略
- 更完整的 embedding artifact / checksum 治理策略
但现在已经足够支撑下一阶段:
......
......@@ -552,6 +552,8 @@ flowchart TD
- 样例数据链可以按 `song -> work -> recording -> asset -> window -> embedding` 落盘
- live pgvector 检索和现有 stand-in 逻辑一致
- `retrieval_candidate` / `match_decision` 可以真实承载在线结果
- semantic worker 已真实验证 preflight failure 语义:既能识别 `/workspace/downloads` 缺失,也能识别 `torch/torchaudio/transformers` 缺失
- `audio_embedding` 已补上 window / asset 双路幂等唯一键,为后续 encoder 真实 upsert 预留稳定主键
### 未验证
......@@ -690,3 +692,50 @@ cd /workspace/acr-engine
> PostgreSQL 这条路已经可以真实落 schema、落样例、落 candidate、落 decision,也能真实跑 pgvector 检索。
> 当前最大的短板不再是“怎么存”,而是 **当前 baseline embedding 对混淆 query 的召回仍然明显不够**。
## 新增:Phase-1 semantic worker live 证据
本轮继续对 `run_embedding_job.py` 做 live PostgreSQL 验证,目标不是伪造 embedding,而是把 **失败语义先固定住**
### 结果摘要
`extraction_job_id=2``mert v1-95m`, `5s/2.5s`)执行非 dry-run worker 后:
| 项 | 结果 |
|---|---|
| `scope_window_count` | `20` |
| `job_status` | `failed` |
| `output_count` | `0` |
| `failure_reason` | `preflight_failed` |
| `preflight_blockers` | `['unreadable_audio_assets', 'model_runtime_unavailable']` |
| `vector_table_report.resolved` | `true` |
| `audio_embedding_vector_768_count` | `0` |
说明:
- 当前语义 lane 不是“没做事”,而是已经真实走到了 PostgreSQL job scope / runtime / vector table / asset 路径检查
- 只是当前容器同时被两个外部条件挡住:
1. `/workspace/downloads/...` 未挂载
2. `torch / torchaudio / transformers` 未安装
### 证据文件
- `acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_write_attempt.json`
- `acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_write_guard_report.json`
- `acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_post_state.json`
### 为什么要先补唯一键
当前 `audio_embedding` 已新增:
- `uq_audio_embedding_feature_window`
- `uq_audio_embedding_feature_asset`
设计意图是:
1. 同一 `feature_set_id + window_id` 的 embedding 重跑时可以稳定 upsert
2. 将来如果有 asset-level embedding,也能独立幂等
3. 不把幂等职责留给应用层“先查再写”
这一步对后续的 `MERT / MuQ / ECAPA` 都通用。
......
......@@ -189,6 +189,8 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
- extraction plan 报告里已包含 `command_suggestions / primary_command`,下次可直接从 plan 抄 worker 命令模板
- Phase-1 worker 入口已真实落地:`run_chromaprint_job.py / run_embedding_job.py / mark_job_status.py`
- 下一阶段已经不是“补 planner”,而是把 dry-run worker 替换为真实 extractor,并把 `audio_fingerprint / audio_embedding` 写入做成幂等执行
- semantic lane 也已完成 live failure contract:`run_embedding_job.py` 现在会同时暴露 `unreadable_audio_assets``model_runtime_unavailable`,而不是把失败伪装成 completed
- `audio_embedding` 已补上 window / asset 双路唯一键,后续真实 encoder 只需替换 inference adapter 即可复用同一 upsert 合同
- `phase1_hot_reference_v1``acr_test` 里已经真实补齐 `20` 个 reference members,因此 worker dry-run 当前看到的 scope 已是 `20 recordings / 20 assets / 20 windows`
- worker contract 现在已有基础前置状态保护;重复执行同一 chromaprint dry-run job 会被 `expected_status=pending` 明确拒绝,证据见 `phase1_worker_double_claim_guard_report.json`
- exact lane 的 `run_chromaprint_job.py` 已具备非 dry-run 写入路径;当前在 `acr_test` 的 live 结果是因为 `/workspace/downloads/...` 缺失而明确 `failed`,不是继续假装 `completed`
......