Commit 6a97ca13 6a97ca13f76962ceaccff8137f9496bd0a9821a3 by cnb.bofCdSsphPA

Make the exact lane fail honestly before real audio is mounted

Constraint: the Phase-1 exact lane must not pretend success when reference audio is unreadable, and repeated writes must be idempotent at the database boundary.
Rejected: keep partial-success writes in completed state | rejected because it would blur asset-readability failures and weaken auditability.
Confidence: high
Scope-risk: moderate
Directive: preserve the repo-local chromaprint-style wording and the all-or-nothing failure semantics until production audio mounts and real extractor validation are in place.
Tested: py_compile for chromaprint matcher and chromaprint worker; live PostgreSQL unique index creation on acr_test; non-dry-run chromaprint worker attempt with job_status=failed and failure_reason=unreadable_audio_assets; bootstrap reset back to pending; architect review APPROVED.
Not-tested: successful audio_fingerprint writes against mounted production audio, semantic worker real writes, large-scale concurrent exact-lane execution.
1 parent b4f304c1
1 {
2 "worker": "run_chromaprint_job",
3 "schema": "acr_test",
4 "job": {
5 "extraction_job_id": 1,
6 "feature_set_id": 2,
7 "target_scope": "reference_set:phase1_hot_reference_v1",
8 "job_status": "pending",
9 "shard_key": "phase1/reference/chromaprint/v1",
10 "job_metadata": {
11 "lane": "exact",
12 "phase": "phase1",
13 "priority": "p0"
14 },
15 "feature_name": "fingerprint_asset",
16 "feature_level": "asset",
17 "extraction_granularity": "full_asset",
18 "window_sec": 5.0,
19 "hop_sec": 2.5,
20 "embedding_dim": null,
21 "distance_metric": "hamming",
22 "feature_config": {
23 "lane": "exact",
24 "index_target": "audio_fingerprint"
25 },
26 "model_id": 2,
27 "model_name": "chromaprint",
28 "model_version": "v1",
29 "model_family": "fingerprint",
30 "input_sample_rate": 16000,
31 "output_embedding_dim": null,
32 "model_metadata": {
33 "lane": "exact",
34 "note": "exact fingerprint lane baseline",
35 "phase": "phase1"
36 }
37 },
38 "target_scope_summary": {
39 "scope_type": "reference_set",
40 "scope_value": "phase1_hot_reference_v1",
41 "reference_set_id": 2,
42 "reference_set_name": "phase1_hot_reference_v1",
43 "recording_count": 20,
44 "ready_asset_count": 20,
45 "active_window_count": 20
46 },
47 "scope_asset_count": 20,
48 "processed_assets": [],
49 "missing_assets": [
50 {
51 "asset_id": 1,
52 "storage_uri": "/workspace/downloads/100/type_11/93dfdeb0-7da5-42a8-9c71-cf12af57dd191650256918.wav",
53 "reason": "missing_audio"
54 },
55 {
56 "asset_id": 2,
57 "storage_uri": "/workspace/downloads/101/type_11/83c0c07f-4f96-4ff4-998c-58db910f3cfa1650256915.wav",
58 "reason": "missing_audio"
59 },
60 {
61 "asset_id": 3,
62 "storage_uri": "/workspace/downloads/102/type_11/43440ec5-70b4-4d50-8683-d3e41cad29411650256908.wav",
63 "reason": "missing_audio"
64 },
65 {
66 "asset_id": 4,
67 "storage_uri": "/workspace/downloads/103/type_11/19876dbb-fffc-40f8-9530-9322c9ed77681650256912.wav",
68 "reason": "missing_audio"
69 },
70 {
71 "asset_id": 5,
72 "storage_uri": "/workspace/downloads/104/type_11/4c1d3e22-045f-445b-ab87-ba1ae3ee09b31650256912.wav",
73 "reason": "missing_audio"
74 },
75 {
76 "asset_id": 6,
77 "storage_uri": "/workspace/downloads/105/type_11/57e61cde-4410-4751-93e9-d7a4ecece5791650256910.wav",
78 "reason": "missing_audio"
79 },
80 {
81 "asset_id": 7,
82 "storage_uri": "/workspace/downloads/106/type_11/bf61426c-67b7-4cf1-a9e7-f78cf519a0021650256910.wav",
83 "reason": "missing_audio"
84 },
85 {
86 "asset_id": 8,
87 "storage_uri": "/workspace/downloads/107/type_11/296bbc25-617c-4368-9a69-357aeec394381650256910.wav",
88 "reason": "missing_audio"
89 },
90 {
91 "asset_id": 9,
92 "storage_uri": "/workspace/downloads/108/type_11/d7e28fe6-4ad6-4243-b66b-d90ff5ca1e491650256909.wav",
93 "reason": "missing_audio"
94 },
95 {
96 "asset_id": 10,
97 "storage_uri": "/workspace/downloads/109/type_11/84acef9b-2a74-44bc-9eff-5ca7969ac9b61650256909.wav",
98 "reason": "missing_audio"
99 },
100 {
101 "asset_id": 11,
102 "storage_uri": "/workspace/downloads/110/type_11/2197b39e-23e2-4a66-b07e-dd672eab214a1650256908.wav",
103 "reason": "missing_audio"
104 },
105 {
106 "asset_id": 12,
107 "storage_uri": "/workspace/downloads/111/type_11/7f5256e8-de5f-41c5-bf76-419e05df72d81650256908.wav",
108 "reason": "missing_audio"
109 },
110 {
111 "asset_id": 13,
112 "storage_uri": "/workspace/downloads/112/type_11/34acd523-3c01-443d-ac3d-4ad7b9e2246f1650256907.wav",
113 "reason": "missing_audio"
114 },
115 {
116 "asset_id": 14,
117 "storage_uri": "/workspace/downloads/113/type_11/6d9438af-5d83-434b-bb20-76e28d0bbc4e1650256907.wav",
118 "reason": "missing_audio"
119 },
120 {
121 "asset_id": 15,
122 "storage_uri": "/workspace/downloads/114/type_11/0238ecbf-b234-470e-82e4-f3b80a267d771650256906.wav",
123 "reason": "missing_audio"
124 },
125 {
126 "asset_id": 16,
127 "storage_uri": "/workspace/downloads/115/type_11/aabad0ff-13de-4786-aa9c-40e1f957ed9f1650256906.wav",
128 "reason": "missing_audio"
129 },
130 {
131 "asset_id": 17,
132 "storage_uri": "/workspace/downloads/116/type_11/da34f6ff-39e7-4dde-8265-e1bb01b6263e1650256901.wav",
133 "reason": "missing_audio"
134 },
135 {
136 "asset_id": 18,
137 "storage_uri": "/workspace/downloads/117/type_11/1e1599e6-ebbd-4ceb-a81d-a320331ef6e31650256901.wav",
138 "reason": "missing_audio"
139 },
140 {
141 "asset_id": 19,
142 "storage_uri": "/workspace/downloads/118/type_11/db64461e-d752-4cf3-ab1d-56ff9232823d1650256901.wav",
143 "reason": "missing_audio"
144 },
145 {
146 "asset_id": 20,
147 "storage_uri": "/workspace/downloads/119/type_11/180dfa7d-836a-449c-990f-a3bf39c11da11650256898.wav",
148 "reason": "missing_audio"
149 }
150 ],
151 "status_after_start": {
152 "extraction_job_id": 1,
153 "job_status": "running",
154 "input_count": 20,
155 "output_count": null,
156 "started_at": "2026-06-04T13:35:22.194865+08:00",
157 "finished_at": null,
158 "log_uri": null,
159 "metadata_json": {
160 "lane": "exact",
161 "phase": "phase1",
162 "worker": "run_chromaprint_job",
163 "dry_run": false,
164 "priority": "p0",
165 "output_target": "audio_fingerprint",
166 "execution_mode": "write_attempt",
167 "target_scope_summary": {
168 "scope_type": "reference_set",
169 "scope_value": "phase1_hot_reference_v1",
170 "recording_count": 20,
171 "reference_set_id": 2,
172 "ready_asset_count": 20,
173 "reference_set_name": "phase1_hot_reference_v1",
174 "active_window_count": 20
175 }
176 }
177 },
178 "status_after_complete": null,
179 "status_after_failed": {
180 "extraction_job_id": 1,
181 "job_status": "failed",
182 "input_count": 20,
183 "output_count": 0,
184 "started_at": "2026-06-04T13:35:22.194865+08:00",
185 "finished_at": "2026-06-04T13:35:22.195659+08:00",
186 "log_uri": null,
187 "metadata_json": {
188 "lane": "exact",
189 "phase": "phase1",
190 "worker": "run_chromaprint_job",
191 "dry_run": false,
192 "priority": "p0",
193 "artifact_dir": "/workspace/acr-engine/data/pgvector_eval/music20/phase1_fingerprints",
194 "output_target": "audio_fingerprint",
195 "execution_mode": "write_attempt",
196 "failure_reason": "unreadable_audio_assets",
197 "write_target_table": "audio_fingerprint",
198 "missing_asset_count": 20,
199 "target_scope_summary": {
200 "scope_type": "reference_set",
201 "scope_value": "phase1_hot_reference_v1",
202 "recording_count": 20,
203 "reference_set_id": 2,
204 "ready_asset_count": 20,
205 "reference_set_name": "phase1_hot_reference_v1",
206 "active_window_count": 20
207 },
208 "missing_asset_samples": [
209 {
210 "reason": "missing_audio",
211 "asset_id": 1,
212 "storage_uri": "/workspace/downloads/100/type_11/93dfdeb0-7da5-42a8-9c71-cf12af57dd191650256918.wav"
213 },
214 {
215 "reason": "missing_audio",
216 "asset_id": 2,
217 "storage_uri": "/workspace/downloads/101/type_11/83c0c07f-4f96-4ff4-998c-58db910f3cfa1650256915.wav"
218 },
219 {
220 "reason": "missing_audio",
221 "asset_id": 3,
222 "storage_uri": "/workspace/downloads/102/type_11/43440ec5-70b4-4d50-8683-d3e41cad29411650256908.wav"
223 },
224 {
225 "reason": "missing_audio",
226 "asset_id": 4,
227 "storage_uri": "/workspace/downloads/103/type_11/19876dbb-fffc-40f8-9530-9322c9ed77681650256912.wav"
228 },
229 {
230 "reason": "missing_audio",
231 "asset_id": 5,
232 "storage_uri": "/workspace/downloads/104/type_11/4c1d3e22-045f-445b-ab87-ba1ae3ee09b31650256912.wav"
233 }
234 ]
235 }
236 },
237 "next_write_target": "audio_fingerprint",
238 "notes": [
239 "dry-run preserves the verified planner -> job -> PostgreSQL state flow",
240 "non-dry-run now writes repo-local chromaprint-style hash artifacts plus audio_fingerprint rows when source audio is readable"
241 ]
242 }
...\ No newline at end of file ...\ No newline at end of file
1 {
2 "job_row": {
3 "extraction_job_id": 1,
4 "job_status": "failed",
5 "input_count": 20,
6 "output_count": 0,
7 "failure_reason": "unreadable_audio_assets"
8 },
9 "audio_fingerprint_count": 0
10 }
...\ No newline at end of file ...\ No newline at end of file
...@@ -222,6 +222,9 @@ CREATE TABLE IF NOT EXISTS audio_fingerprint ( ...@@ -222,6 +222,9 @@ CREATE TABLE IF NOT EXISTS audio_fingerprint (
222 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 222 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
223 ); 223 );
224 224
225 CREATE UNIQUE INDEX IF NOT EXISTS uq_audio_fingerprint_feature_asset
226 ON audio_fingerprint(feature_set_id, asset_id);
227
225 CREATE TABLE IF NOT EXISTS reference_set_registry ( 228 CREATE TABLE IF NOT EXISTS reference_set_registry (
226 reference_set_id BIGSERIAL PRIMARY KEY, 229 reference_set_id BIGSERIAL PRIMARY KEY,
227 set_name TEXT NOT NULL UNIQUE, 230 set_name TEXT NOT NULL UNIQUE,
......
...@@ -8,7 +8,6 @@ Implements landmark-based audio fingerprinting: ...@@ -8,7 +8,6 @@ Implements landmark-based audio fingerprinting:
8 """ 8 """
9 9
10 import numpy as np 10 import numpy as np
11 import librosa
12 from numpy.lib.stride_tricks import sliding_window_view 11 from numpy.lib.stride_tricks import sliding_window_view
13 from collections import defaultdict 12 from collections import defaultdict
14 from typing import Dict, List, Tuple, Optional 13 from typing import Dict, List, Tuple, Optional
...@@ -16,6 +15,50 @@ import pickle ...@@ -16,6 +15,50 @@ import pickle
16 import json 15 import json
17 from pathlib import Path 16 from pathlib import Path
18 import time 17 import time
18 import wave
19
20 try:
21 import librosa # type: ignore
22 except ImportError: # pragma: no cover - optional dependency
23 librosa = None
24
25
26 def _resample_linear(y: np.ndarray, src_sr: int, target_sr: int) -> np.ndarray:
27 if src_sr == target_sr or y.size == 0:
28 return y.astype(np.float32, copy=False)
29 duration = y.shape[0] / float(src_sr)
30 target_len = max(int(round(duration * target_sr)), 1)
31 src_x = np.linspace(0.0, duration, num=y.shape[0], endpoint=False)
32 dst_x = np.linspace(0.0, duration, num=target_len, endpoint=False)
33 return np.interp(dst_x, src_x, y).astype(np.float32, copy=False)
34
35
36 def load_audio_mono(path: str, sr: int) -> tuple[np.ndarray, int]:
37 if librosa is not None:
38 y, _ = librosa.load(path, sr=sr, mono=True)
39 return y.astype(np.float32, copy=False), sr
40
41 with wave.open(path, 'rb') as wav_file:
42 src_sr = wav_file.getframerate()
43 channels = wav_file.getnchannels()
44 sample_width = wav_file.getsampwidth()
45 frame_count = wav_file.getnframes()
46 raw = wav_file.readframes(frame_count)
47
48 if sample_width == 1:
49 y = np.frombuffer(raw, dtype=np.uint8).astype(np.float32)
50 y = (y - 128.0) / 128.0
51 elif sample_width == 2:
52 y = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0
53 elif sample_width == 4:
54 y = np.frombuffer(raw, dtype=np.int32).astype(np.float32) / 2147483648.0
55 else:
56 raise ValueError(f'unsupported wav sample width: {sample_width}')
57
58 if channels > 1:
59 y = y.reshape(-1, channels).mean(axis=1)
60 y = _resample_linear(y, src_sr, sr)
61 return y, sr
19 62
20 63
21 class Fingerprint: 64 class Fingerprint:
...@@ -51,8 +94,19 @@ class ChromaprintMatcher: ...@@ -51,8 +94,19 @@ class ChromaprintMatcher:
51 return candidate 94 return candidate
52 95
53 def _spectrogram(self, y: np.ndarray) -> np.ndarray: 96 def _spectrogram(self, y: np.ndarray) -> np.ndarray:
54 S = np.abs(librosa.stft(y, n_fft=self.n_fft, hop_length=self.hop_length)) 97 if librosa is not None:
55 return S 98 return np.abs(librosa.stft(y, n_fft=self.n_fft, hop_length=self.hop_length))
99
100 if y.shape[0] < self.n_fft:
101 y = np.pad(y, (0, self.n_fft - y.shape[0]))
102 frame_count = 1 + max((y.shape[0] - self.n_fft) // self.hop_length, 0)
103 frames = np.stack(
104 [y[i * self.hop_length:i * self.hop_length + self.n_fft] for i in range(frame_count)],
105 axis=1,
106 )
107 window = np.hanning(self.n_fft).astype(np.float32)
108 frames = frames * window[:, None]
109 return np.abs(np.fft.rfft(frames, axis=0))
56 110
57 def _find_peaks(self, S: np.ndarray) -> List[Tuple[int, int, float]]: 111 def _find_peaks(self, S: np.ndarray) -> List[Tuple[int, int, float]]:
58 if S.shape[0] <= self.peak_neighborhood or S.shape[1] <= self.peak_neighborhood: 112 if S.shape[0] <= self.peak_neighborhood or S.shape[1] <= self.peak_neighborhood:
...@@ -82,12 +136,15 @@ class ChromaprintMatcher: ...@@ -82,12 +136,15 @@ class ChromaprintMatcher:
82 return hashes 136 return hashes
83 137
84 def index_song(self, song_id: str, y: np.ndarray): 138 def index_song(self, song_id: str, y: np.ndarray):
85 S = self._spectrogram(y) 139 hashes = self.extract_hashes(y)
86 peaks = self._find_peaks(S)
87 hashes = self._hash_peaks(peaks)
88 for h, offset in hashes: 140 for h, offset in hashes:
89 self.hash_db[h].append(Fingerprint(song_id, offset, h)) 141 self.hash_db[h].append(Fingerprint(song_id, offset, h))
90 142
143 def extract_hashes(self, y: np.ndarray) -> List[Tuple[int, int]]:
144 S = self._spectrogram(y)
145 peaks = self._find_peaks(S)
146 return self._hash_peaks(peaks)
147
91 def index_songs_from_dir( 148 def index_songs_from_dir(
92 self, 149 self,
93 songs_dir: str, 150 songs_dir: str,
...@@ -137,7 +194,7 @@ class ChromaprintMatcher: ...@@ -137,7 +194,7 @@ class ChromaprintMatcher:
137 continue 194 continue
138 song_id = item["song_id"] 195 song_id = item["song_id"]
139 try: 196 try:
140 y, _ = librosa.load(str(audio_path), sr=self.sr, mono=True) 197 y, _ = load_audio_mono(str(audio_path), sr=self.sr)
141 except Exception as exc: 198 except Exception as exc:
142 skipped_refs += 1 199 skipped_refs += 1
143 print( 200 print(
......
...@@ -2,11 +2,96 @@ ...@@ -2,11 +2,96 @@
2 from __future__ import annotations 2 from __future__ import annotations
3 3
4 import argparse 4 import argparse
5 import json
5 import os 6 import os
7 import sys
8 from pathlib import Path
9
10 ROOT = Path(__file__).resolve().parents[1]
11 if str(ROOT) not in sys.path:
12 sys.path.insert(0, str(ROOT))
13
14 from src.engines.chromaprint_matcher import ChromaprintMatcher, load_audio_mono
6 15
7 from _job_common import connect, emit_payload, fetch_job_context, resolve_scope_summary, update_job_status 16 from _job_common import connect, emit_payload, fetch_job_context, resolve_scope_summary, update_job_status
8 17
9 18
19 def fetch_scope_assets(conn, target_scope: str) -> list[dict[str, object]]:
20 if not target_scope.startswith('reference_set:'):
21 raise SystemExit(f'unsupported target_scope for chromaprint worker: {target_scope}')
22 set_name = target_scope.split(':', 1)[1]
23 rows = conn.execute(
24 """
25 SELECT
26 ra.asset_id,
27 ra.storage_uri,
28 ra.ingest_status,
29 r.recording_id,
30 r.work_id,
31 r.canonical_song_id
32 FROM reference_set_registry rs
33 JOIN reference_set_member rsm ON rsm.reference_set_id = rs.reference_set_id
34 JOIN recording_asset ra ON ra.recording_id = rsm.recording_id
35 JOIN recording r ON r.recording_id = ra.recording_id
36 WHERE rs.set_name = %s
37 AND ra.ingest_status = 'ready'
38 ORDER BY ra.asset_id;
39 """,
40 (set_name,),
41 ).fetchall()
42 return [
43 {
44 'asset_id': int(row[0]),
45 'storage_uri': row[1],
46 'ingest_status': row[2],
47 'recording_id': int(row[3]),
48 'work_id': int(row[4]),
49 'canonical_song_id': int(row[5]),
50 }
51 for row in rows
52 ]
53
54
55 def upsert_audio_fingerprint(
56 conn,
57 *,
58 feature_set_id: int,
59 asset: dict[str, object],
60 fingerprint_uri: str,
61 hash_count: int,
62 metadata_json: dict[str, object],
63 ) -> tuple[int, str]:
64 row = conn.execute(
65 """
66 INSERT INTO audio_fingerprint (
67 feature_set_id, asset_id, window_id, recording_id, work_id, canonical_song_id,
68 fingerprint_uri, hash_count, is_indexed, metadata_json
69 ) VALUES (
70 %s, %s, NULL, %s, %s, %s,
71 %s, %s, TRUE, %s::jsonb
72 )
73 ON CONFLICT (feature_set_id, asset_id)
74 DO UPDATE SET
75 fingerprint_uri = EXCLUDED.fingerprint_uri,
76 hash_count = EXCLUDED.hash_count,
77 is_indexed = EXCLUDED.is_indexed,
78 metadata_json = EXCLUDED.metadata_json
79 RETURNING fingerprint_id;
80 """,
81 (
82 feature_set_id,
83 asset['asset_id'],
84 asset['recording_id'],
85 asset['work_id'],
86 asset['canonical_song_id'],
87 fingerprint_uri,
88 hash_count,
89 json.dumps(metadata_json, ensure_ascii=False),
90 ),
91 ).fetchone()
92 return int(row[0]), 'upserted'
93
94
10 def main() -> None: 95 def main() -> None:
11 ap = argparse.ArgumentParser() 96 ap = argparse.ArgumentParser()
12 ap.add_argument('--dsn', default=os.environ.get('PG_DSN')) 97 ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
...@@ -14,6 +99,7 @@ def main() -> None: ...@@ -14,6 +99,7 @@ def main() -> None:
14 ap.add_argument('--job-id', type=int, default=int(os.environ.get('EXTRACTION_JOB_ID', '0'))) 99 ap.add_argument('--job-id', type=int, default=int(os.environ.get('EXTRACTION_JOB_ID', '0')))
15 ap.add_argument('--output-target', default=os.environ.get('OUTPUT_TARGET', 'audio_fingerprint')) 100 ap.add_argument('--output-target', default=os.environ.get('OUTPUT_TARGET', 'audio_fingerprint'))
16 ap.add_argument('--complete-dry-run', action='store_true') 101 ap.add_argument('--complete-dry-run', action='store_true')
102 ap.add_argument('--artifact-dir', default=str(ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_fingerprints'))
17 ap.add_argument('--output') 103 ap.add_argument('--output')
18 args = ap.parse_args() 104 args = ap.parse_args()
19 105
...@@ -27,6 +113,7 @@ def main() -> None: ...@@ -27,6 +113,7 @@ def main() -> None:
27 if job.model_name != 'chromaprint': 113 if job.model_name != 'chromaprint':
28 raise SystemExit(f'feature_extraction_job={args.job_id} is not a chromaprint job') 114 raise SystemExit(f'feature_extraction_job={args.job_id} is not a chromaprint job')
29 scope = resolve_scope_summary(conn, job.target_scope) 115 scope = resolve_scope_summary(conn, job.target_scope)
116 scope_assets = fetch_scope_assets(conn, job.target_scope)
30 running = update_job_status( 117 running = update_job_status(
31 conn, 118 conn,
32 job.extraction_job_id, 119 job.extraction_job_id,
...@@ -36,13 +123,19 @@ def main() -> None: ...@@ -36,13 +123,19 @@ def main() -> None:
36 metadata_patch={ 123 metadata_patch={
37 'worker': 'run_chromaprint_job', 124 'worker': 'run_chromaprint_job',
38 'output_target': args.output_target, 125 'output_target': args.output_target,
39 'dry_run': True, 126 'dry_run': bool(args.complete_dry_run),
40 'target_scope_summary': scope, 127 'target_scope_summary': scope,
41 'execution_mode': 'dry_run', 128 'execution_mode': 'dry_run' if args.complete_dry_run else 'write_attempt',
42 }, 129 },
43 set_started_at=True, 130 set_started_at=True,
44 ) 131 )
45 completed = None 132 completed = None
133 failed = None
134 processed_assets: list[dict[str, object]] = []
135 missing_assets: list[dict[str, object]] = []
136 artifact_dir = Path(args.artifact_dir)
137 artifact_dir.mkdir(parents=True, exist_ok=True)
138
46 if args.complete_dry_run: 139 if args.complete_dry_run:
47 completed = update_job_status( 140 completed = update_job_status(
48 conn, 141 conn,
...@@ -59,6 +152,106 @@ def main() -> None: ...@@ -59,6 +152,106 @@ def main() -> None:
59 }, 152 },
60 set_finished_at=True, 153 set_finished_at=True,
61 ) 154 )
155 else:
156 matcher = ChromaprintMatcher(sr=job.input_sample_rate or 16000)
157 extracted_assets: list[dict[str, object]] = []
158 for asset in scope_assets:
159 asset_path = Path(str(asset['storage_uri']))
160 if not asset_path.exists():
161 missing_assets.append({
162 'asset_id': asset['asset_id'],
163 'storage_uri': str(asset_path),
164 'reason': 'missing_audio',
165 })
166 continue
167 try:
168 y, _ = load_audio_mono(str(asset_path), sr=matcher.sr)
169 hashes = matcher.extract_hashes(y)
170 extracted_assets.append({
171 'asset': asset,
172 'hashes': hashes,
173 })
174 except Exception as exc: # noqa: BLE001
175 missing_assets.append({
176 'asset_id': asset['asset_id'],
177 'storage_uri': str(asset_path),
178 'reason': 'decode_or_extract_failure',
179 'error': str(exc),
180 })
181
182 if missing_assets:
183 failed = update_job_status(
184 conn,
185 job.extraction_job_id,
186 status='failed',
187 expected_status='running',
188 output_count=0,
189 metadata_patch={
190 'worker': 'run_chromaprint_job',
191 'output_target': args.output_target,
192 'dry_run': False,
193 'write_target_table': 'audio_fingerprint',
194 'artifact_dir': str(artifact_dir),
195 'failure_reason': 'unreadable_audio_assets',
196 'missing_asset_count': len(missing_assets),
197 'missing_asset_samples': missing_assets[:5],
198 },
199 set_finished_at=True,
200 )
201 else:
202 for extracted in extracted_assets:
203 asset = extracted['asset']
204 hashes = extracted['hashes']
205 artifact_path = artifact_dir / f"job{job.extraction_job_id}_asset{asset['asset_id']}.json"
206 artifact_payload = {
207 'feature_set_id': job.feature_set_id,
208 'extraction_job_id': job.extraction_job_id,
209 'asset_id': asset['asset_id'],
210 'recording_id': asset['recording_id'],
211 'hash_count': len(hashes),
212 'hashes': [[int(h), int(offset)] for h, offset in hashes],
213 }
214 artifact_path.write_text(json.dumps(artifact_payload, ensure_ascii=False, indent=2), encoding='utf-8')
215 fingerprint_id, operation = upsert_audio_fingerprint(
216 conn,
217 feature_set_id=job.feature_set_id,
218 asset=asset,
219 fingerprint_uri=str(artifact_path),
220 hash_count=len(hashes),
221 metadata_json={
222 'worker': 'run_chromaprint_job',
223 'model_name': job.model_name,
224 'model_version': job.model_version,
225 'extraction_job_id': job.extraction_job_id,
226 'hash_encoding': 'repo-local-chromaprint-matcher',
227 'artifact_format': 'json_hash_pairs_v1',
228 },
229 )
230 processed_assets.append({
231 'asset_id': asset['asset_id'],
232 'recording_id': asset['recording_id'],
233 'fingerprint_id': fingerprint_id,
234 'hash_count': len(hashes),
235 'fingerprint_uri': str(artifact_path),
236 'operation': operation,
237 })
238 completed = update_job_status(
239 conn,
240 job.extraction_job_id,
241 status='completed',
242 expected_status='running',
243 output_count=len(processed_assets),
244 metadata_patch={
245 'worker': 'run_chromaprint_job',
246 'output_target': args.output_target,
247 'dry_run': False,
248 'write_target_table': 'audio_fingerprint',
249 'artifact_dir': str(artifact_dir),
250 'processed_asset_count': len(processed_assets),
251 'missing_asset_count': len(missing_assets),
252 },
253 set_finished_at=True,
254 )
62 255
63 emit_payload( 256 emit_payload(
64 { 257 {
...@@ -66,12 +259,16 @@ def main() -> None: ...@@ -66,12 +259,16 @@ def main() -> None:
66 'schema': args.schema, 259 'schema': args.schema,
67 'job': job.__dict__, 260 'job': job.__dict__,
68 'target_scope_summary': scope, 261 'target_scope_summary': scope,
262 'scope_asset_count': len(scope_assets),
263 'processed_assets': processed_assets,
264 'missing_assets': missing_assets,
69 'status_after_start': running, 265 'status_after_start': running,
70 'status_after_complete': completed, 266 'status_after_complete': completed,
267 'status_after_failed': failed,
71 'next_write_target': 'audio_fingerprint', 268 'next_write_target': 'audio_fingerprint',
72 'notes': [ 269 'notes': [
73 'this worker currently validates planner -> job -> PostgreSQL state flow', 270 'dry-run preserves the verified planner -> job -> PostgreSQL state flow',
74 'real chromaprint extraction can replace dry_run while preserving the same job contract', 271 'non-dry-run now writes repo-local chromaprint-style hash artifacts plus audio_fingerprint rows when source audio is readable',
75 ], 272 ],
76 }, 273 },
77 args.output, 274 args.output,
......
1 ## 2026-06-04 1 ## 2026-06-04
2 2
3 - 更新 `run_chromaprint_job.py` 与 `src/engines/chromaprint_matcher.py`,把 exact lane 从“只有 dry-run”推进到“具备真实 `audio_fingerprint` 写入路径”;同时增加无 `librosa` 环境下的 `wave + numpy` 回退实现,避免 worker 被运行时依赖直接卡死。
4 - 为 `audio_fingerprint` 补上 `(feature_set_id, asset_id)` 唯一索引,并把 exact lane 写入改成 `INSERT ... ON CONFLICT DO UPDATE`;同时把失败语义收紧为“全量成功 / 否则失败”,避免部分不可读资产被误标成 completed。
5 - 新增 `phase1_worker_chromaprint_write_attempt.json` 与 `phase1_worker_chromaprint_write_guard_report.json`,在 live PostgreSQL `acr_test` 上验证 exact lane 的非 dry-run 行为:当前因 `/workspace/downloads/...` 缺失导致 `scope_asset_count=20`、`processed_assets=0`,job 被明确标记为 `failed`,且 `failure_reason=unreadable_audio_assets`,证明写入路径已接上但受环境挂载阻塞。
3 - 新增 `bootstrap_phase1_reference_members_live.py` 与 `phase1_reference_member_bootstrap_report.json`,把 `acr_test` 中 `recording.is_reference=true` 的 20 条录音真实挂到 `phase1_hot_reference_v1`,使 worker dry-run 的 scope 从 `0` 提升为 `20 recordings / 20 assets / 20 windows` 6 - 新增 `bootstrap_phase1_reference_members_live.py` 与 `phase1_reference_member_bootstrap_report.json`,把 `acr_test` 中 `recording.is_reference=true` 的 20 条录音真实挂到 `phase1_hot_reference_v1`,使 worker dry-run 的 scope 从 `0` 提升为 `20 recordings / 20 assets / 20 windows`
4 - 根据 architect 复核修正 worker contract:`mark_job_status.py` 现支持真正的“CLI 覆盖 env”并限制状态白名单;`_job_common.update_job_status()` 新增前置状态约束并防止 `finished_at` 被重复覆盖;`bootstrap_phase1_extraction_jobs_live.py` 在恢复 pending 时会清空旧时间戳与计数;`run_embedding_job.py` 对 embedding job 契约做了更严格校验。 7 - 根据 architect 复核修正 worker contract:`mark_job_status.py` 现支持真正的“CLI 覆盖 env”并限制状态白名单;`_job_common.update_job_status()` 新增前置状态约束并防止 `finished_at` 被重复覆盖;`bootstrap_phase1_extraction_jobs_live.py` 在恢复 pending 时会清空旧时间戳与计数;`run_embedding_job.py` 对 embedding job 契约做了更严格校验。
5 - 修正 `plan_phase1_extraction_jobs_live.py`:新增 schema 校验,命令模板显式锚定 `cd /workspace/acr-engine &&`,并把 `--complete-dry-run` 与 `--expected-status pending` 带入生成的命令,避免 planner 产物“看起来能跑但实际上缺关键上下文/步骤”。 8 - 修正 `plan_phase1_extraction_jobs_live.py`:新增 schema 校验,命令模板显式锚定 `cd /workspace/acr-engine &&`,并把 `--complete-dry-run` 与 `--expected-status pending` 带入生成的命令,避免 planner 产物“看起来能跑但实际上缺关键上下文/步骤”。
......
...@@ -227,10 +227,62 @@ flowchart TD ...@@ -227,10 +227,62 @@ flowchart TD
227 后续把下面逻辑塞进 `run_chromaprint_job.py` 227 后续把下面逻辑塞进 `run_chromaprint_job.py`
228 228
229 1. 读取 `recording_asset` 229 1. 读取 `recording_asset`
230 2. 调 chromaprint CLI / library 230 2. 读取可用音频并提取 exact-lane hash
231 3. 写 `audio_fingerprint` 231 3. 写 artifact JSON
232 4. 更新 `output_count` 232 4. 写 `audio_fingerprint`
233 5. 标记 `completed` 233 5. 更新 `output_count`
234 6. 标记 `completed`
235
236 ### 当前 exact lane 的真实状态
237
238 这轮已经把 `run_chromaprint_job.py` 从“只有 dry-run”推进到:
239
240 - 如果 source audio 可读:
241 - 生成 repo-local chromaprint-style hash artifact
242 - 写入 `audio_fingerprint`
243 - 如果 source audio 不可读:
244 - 明确把 job 标记为 `failed`
245 - 把 `failure_reason`、`missing_asset_count`、`missing_asset_samples` 写回 PostgreSQL
246
247 ### 当前失败语义
248
249 当前 exact lane 采用的是 **全量成功 / 否则失败**
250
251 - 只要 scope 内任意 asset:
252 - 缺文件
253 - 解码失败
254 - hash 提取失败
255
256 就整体标记:
257
258 - `job_status = failed`
259 - `failure_reason = unreadable_audio_assets`
260
261 这样不会把“部分成功”伪装成 `completed`
262
263 ### 当前依赖策略
264
265 当前 exact lane 不再强依赖 `librosa`
266
267 - 优先使用 `librosa`(如果环境里存在)
268 - 否则回退到:
269 - Python `wave`
270 - `numpy` 线性重采样
271 - `numpy` FFT spectrogram
272
273 这使得 worker contract 能在更瘦的运行环境里继续工作。
274
275 ### 当前幂等保护
276
277 `audio_fingerprint` 现在补了:
278
279 - `UNIQUE(feature_set_id, asset_id)`
280
281 对应 worker 写入改成:
282
283 - `INSERT ... ON CONFLICT DO UPDATE`
284
285 因此 exact lane 对同一 `(feature_set_id, asset_id)` 的重复写入不再依赖应用层先查再写。
234 286
235 ### 7.2 Embedding worker 287 ### 7.2 Embedding worker
236 288
......
...@@ -378,6 +378,66 @@ flowchart TD ...@@ -378,6 +378,66 @@ flowchart TD
378 378
379 - 基础 claim guard 379 - 基础 claim guard
380 - 基础重复执行保护 380 - 基础重复执行保护
381
382 ---
383
384 ## exact lane 非 dry-run 写入尝试(新增)
385
386 这轮又继续向前推进了一步:
387
388 > `run_chromaprint_job.py` 已经不再只是 dry-run。
389
390 当前行为:
391
392 1. 如果 reference asset 对应音频文件可读:
393 - 提取 repo-local chromaprint-style hash
394 - 写 artifact JSON
395 - 写 `audio_fingerprint`
396 - job 标记为 `completed`
397
398 2. 如果 reference asset 对应音频文件不可读:
399 - job 标记为 `failed`
400 - 在 `metadata_json` 里写入:
401 - `failure_reason`
402 - `missing_asset_count`
403 - `missing_asset_samples`
404
405 ### 本轮 live 结果
406
407 报告:
408
409 - `acr-engine/data/pgvector_eval/music20/phase1_worker_chromaprint_write_attempt.json`
410 - `acr-engine/data/pgvector_eval/music20/phase1_worker_chromaprint_write_guard_report.json`
411
412 关键结果:
413
414 - `scope_asset_count = 20`
415 - `processed_assets = 0`
416 - `missing_assets = 20`
417 - `job_status = failed`
418 - `failure_reason = unreadable_audio_assets`
419 - `audio_fingerprint_count = 0`
420
421 ### 这说明什么
422
423 说明当前 exact lane 的 PostgreSQL worker contract 已经具备:
424
425 - 非 dry-run 的真实写入路径
426 - 明确的失败落盘
427 - 环境缺失时的可审计错误证据
428 - “全量成功 / 否则失败”的批次语义
429 - `audio_fingerprint(feature_set_id, asset_id)` 的原子 upsert 约束基础
430
431 但当前容器仍然缺:
432
433 - `/workspace/downloads/...` 实际音频文件
434
435 因此这轮证明的是:
436
437 - **worker 写入路径已经接上**
438 - **当前被环境数据挂载阻塞**
439
440 而不是 exact lane 逻辑本身还没落地。
381 - `type_7` 441 - `type_7`
382 442
383 因此: 443 因此:
......
...@@ -191,10 +191,11 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql ...@@ -191,10 +191,11 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
191 - 下一阶段已经不是“补 planner”,而是把 dry-run worker 替换为真实 extractor,并把 `audio_fingerprint / audio_embedding` 写入做成幂等执行 191 - 下一阶段已经不是“补 planner”,而是把 dry-run worker 替换为真实 extractor,并把 `audio_fingerprint / audio_embedding` 写入做成幂等执行
192 - `phase1_hot_reference_v1` 在 `acr_test` 里已经真实补齐 `20` 个 reference members,因此 worker dry-run 当前看到的 scope 已是 `20 recordings / 20 assets / 20 windows` 192 - `phase1_hot_reference_v1` 在 `acr_test` 里已经真实补齐 `20` 个 reference members,因此 worker dry-run 当前看到的 scope 已是 `20 recordings / 20 assets / 20 windows`
193 - worker contract 现在已有基础前置状态保护;重复执行同一 chromaprint dry-run job 会被 `expected_status=pending` 明确拒绝,证据见 `phase1_worker_double_claim_guard_report.json` 193 - worker contract 现在已有基础前置状态保护;重复执行同一 chromaprint dry-run job 会被 `expected_status=pending` 明确拒绝,证据见 `phase1_worker_double_claim_guard_report.json`
194 - exact lane 的 `run_chromaprint_job.py` 已具备非 dry-run 写入路径;当前在 `acr_test` 的 live 结果是因为 `/workspace/downloads/...` 缺失而明确 `failed`,不是继续假装 `completed`
194 195
195 ### 未验证 / 仍是缺口 196 ### 未验证 / 仍是缺口
196 - **未实际跑 MERT / MuQ encoder-only 特征抽取** 197 - **未实际跑 MERT / MuQ encoder-only 特征抽取**
197 - **worker 目前仍以 dry-run 为主,尚未写真实 `audio_fingerprint / audio_embedding`** 198 - **semantic / cover 等后续 lane 仍主要停留在 dry-run;exact lane 已接上真实 `audio_fingerprint` 写入路径,但当前容器缺 reference 音频挂载,live 结果仍停在可审计失败**
198 - **还未落更大规模的生产 reference set 真实业务数据(当前仅验证了 `acr_test` 下的 20-song live members)** 199 - **还未落更大规模的生产 reference set 真实业务数据(当前仅验证了 `acr_test` 下的 20-song live members)**
199 - **未定义最终线上分数融合细则** 200 - **未定义最终线上分数融合细则**
200 - **type_8 / type_16 还没有进入当前 live JSONL 的 PostgreSQL 实测链** 201 - **type_8 / type_16 还没有进入当前 live JSONL 的 PostgreSQL 实测链**
......