Commit e45896b7 e45896b757062f9b8d7c1b55efa06e99a2d21236 by cnb.bofCdSsphPA

Make long CPU index builds resumable and root-path tolerant

Constraint: Real FMA smoke indexing can run for a long time on CPU and synthetic/root-layout datasets must still use the same build-index entrypoint
Rejected: Treat build-index as all-or-nothing and require full reruns after interruption | wastes hours on CPU and obscures whether work was already completed
Confidence: high
Scope-risk: moderate
Directive: Preserve checkpoint file compatibility; future smoke-local automation should prefer resume before rebuilding from scratch
Tested: /usr/local/miniconda3/bin/python -m py_compile acr-engine/src/engines/ecapa_embedder.py acr-engine/src/engines/chromaprint_matcher.py acr-engine/run_demo.py; synthetic_v2 partial-checkpoint resume vs fresh rebuild equality check (shape/ids/embeddings/progress)
Not-tested: In-place resumption of the currently running real FMA process after an actual external kill/restart
1 parent 90e252b8
......@@ -39,13 +39,22 @@ def build_chroma_index(data_dir: Path, output_dir: Path):
return matcher
def build_embedding_index(data_dir: Path, model_path: Path, output_prefix: Path, device: str):
def build_embedding_index(
data_dir: Path,
model_path: Path,
output_prefix: Path,
device: str,
checkpoint_every_refs: int = 250,
resume: bool = False,
):
embedder = ECAPAEmbedder(model_path=str(model_path), device=device)
metadata_path = data_dir / 'catalog.json' if (data_dir / 'catalog.json').exists() else data_dir / 'train.json'
ref_embs, ref_ids = embedder.build_reference_index(
songs_dir=str(data_dir),
metadata_path=str(metadata_path),
output_path=str(output_prefix),
checkpoint_every_refs=checkpoint_every_refs,
resume=resume,
)
print(f"[done] embedding index built: {len(ref_ids)} refs")
return embedder, ref_embs, ref_ids
......@@ -58,8 +67,18 @@ def cmd_build_index(args):
print(f"[build-index] starting chromaprint index: data={data_dir} output={out_dir}")
build_chroma_index(data_dir, out_dir)
print(f"[build-index] starting embedding index: model={args.model} device={args.device}")
build_embedding_index(data_dir, Path(args.model), out_dir / 'reference', args.device)
print(
f"[build-index] starting embedding index: model={args.model} device={args.device} "
f"resume={args.resume} checkpoint_every_refs={args.checkpoint_every_refs}"
)
build_embedding_index(
data_dir,
Path(args.model),
out_dir / 'reference',
args.device,
checkpoint_every_refs=args.checkpoint_every_refs,
resume=args.resume,
)
def load_index(prefix: Path):
......@@ -153,6 +172,8 @@ if __name__ == '__main__':
p.add_argument('--model', required=True)
p.add_argument('--output', default='data/index')
p.add_argument('--device', default='cpu')
p.add_argument('--checkpoint-every-refs', type=int, default=250)
p.add_argument('--resume', action='store_true')
p.set_defaults(func=cmd_build_index)
p = sub.add_parser('recognize')
......
......@@ -41,6 +41,13 @@ class ChromaprintMatcher:
self.min_peak_energy = min_peak_energy
self.hash_db: Dict[int, List[Fingerprint]] = defaultdict(list)
def _resolve_audio_path(self, songs_dir: Path, rel_path: str) -> Path:
candidate = songs_dir / rel_path
if candidate.exists():
return candidate
candidate = songs_dir.parent / rel_path
return candidate
def _spectrogram(self, y: np.ndarray) -> np.ndarray:
S = np.abs(librosa.stft(y, n_fft=self.n_fft, hop_length=self.hop_length))
return S
......@@ -84,7 +91,7 @@ class ChromaprintMatcher:
for item in meta:
if item.get("type") != "reference":
continue
audio_path = songs_dir.parent / item["audio_path"]
audio_path = self._resolve_audio_path(songs_dir, item["audio_path"])
if not audio_path.exists():
continue
song_id = item["song_id"]
......
......@@ -54,6 +54,13 @@ class ECAPAEmbedder:
y, _ = librosa.load(path, sr=self.sr, mono=True)
return y
def _resolve_audio_path(self, songs_dir: Path, rel_path: str) -> Path:
candidate = songs_dir / rel_path
if candidate.exists():
return candidate
candidate = songs_dir.parent / rel_path
return candidate
def _to_mel(self, y: np.ndarray) -> torch.Tensor:
mel = librosa.feature.melspectrogram(
y=y,
......@@ -95,6 +102,8 @@ class ECAPAEmbedder:
output_path: str,
window_sec: float = 5.0,
stride_sec: float = 2.5,
checkpoint_every_refs: int = 250,
resume: bool = False,
) -> Tuple[np.ndarray, List[str]]:
with open(metadata_path) as f:
meta = json.load(f)
......@@ -105,13 +114,90 @@ class ECAPAEmbedder:
refs = [item for item in meta if item.get("type") == "reference"]
total_refs = len(refs)
start_time = time.time()
output_prefix = Path(output_path)
progress_path = output_prefix.parent / f"{output_prefix.name}_progress.json"
partial_embs_path = Path(f"{output_path}_embs.partial.npy")
partial_ids_path = Path(f"{output_path}_ids.partial.npy")
final_embs_path = Path(f"{output_path}_embs.npy")
final_ids_path = Path(f"{output_path}_ids.npy")
refs_done = 0
if resume and final_embs_path.exists() and final_ids_path.exists():
print(f"[build-reference-index] resume hit complete index: {final_embs_path} / {final_ids_path}")
final_embs = np.load(final_embs_path)
final_ids = np.load(final_ids_path, allow_pickle=True).tolist()
return final_embs, final_ids
if resume and progress_path.exists() and partial_embs_path.exists() and partial_ids_path.exists():
try:
progress = json.loads(progress_path.read_text())
refs_done = int(progress.get("refs_done", 0) or 0)
partial_embs = np.load(partial_embs_path)
partial_ids = np.load(partial_ids_path, allow_pickle=True).tolist()
all_embs = [row for row in partial_embs]
all_ids = partial_ids
print(
f"[build-reference-index] resuming from checkpoint: refs_done={refs_done}/{total_refs} "
f"windows_done={len(all_ids)}"
)
except Exception as exc:
print(f"[build-reference-index] resume checkpoint ignored due to load failure: {exc}")
refs_done = 0
all_embs = []
all_ids = []
print(
f"[build-reference-index] start: refs={total_refs} device={self.device.type} "
f"window_sec={window_sec} stride_sec={stride_sec}"
f"window_sec={window_sec} stride_sec={stride_sec} resume={resume} refs_done={refs_done}"
)
for ref_idx, item in enumerate(refs, start=1):
audio_path = songs_dir.parent / item["audio_path"]
def write_checkpoint(ref_idx: int):
if not all_embs:
return
elapsed = max(time.time() - start_time, 1e-6)
refs_per_sec = ref_idx / elapsed
eta_sec = (total_refs - ref_idx) / refs_per_sec if refs_per_sec > 0 else 0.0
emb_array = np.vstack(all_embs)
np.save(partial_embs_path, emb_array)
np.save(partial_ids_path, np.array(all_ids))
progress_path.write_text(json.dumps({
"status": "building",
"refs_done": ref_idx,
"refs_total": total_refs,
"windows_done": len(all_ids),
"elapsed_sec": round(elapsed, 3),
"eta_sec": round(eta_sec, 3),
"device": self.device.type,
"window_sec": window_sec,
"stride_sec": stride_sec,
"partial_embs_path": str(partial_embs_path),
"partial_ids_path": str(partial_ids_path),
}, indent=2))
def write_complete(total_windows: int, emb_shape: tuple[int, ...]):
elapsed = max(time.time() - start_time, 1e-6)
progress_path.write_text(json.dumps({
"status": "complete",
"refs_done": total_refs,
"refs_total": total_refs,
"windows_done": total_windows,
"elapsed_sec": round(elapsed, 3),
"device": self.device.type,
"window_sec": window_sec,
"stride_sec": stride_sec,
"final_embs_path": str(final_embs_path),
"final_ids_path": str(final_ids_path),
"embedding_shape": list(emb_shape),
}, indent=2))
if refs_done > total_refs:
print(f"[build-reference-index] resume refs_done={refs_done} exceeds refs_total={total_refs}; restarting")
refs_done = 0
all_embs = []
all_ids = []
for ref_idx, item in enumerate(refs[refs_done:], start=refs_done + 1):
audio_path = self._resolve_audio_path(songs_dir, item["audio_path"])
if not audio_path.exists():
continue
song_id = item["song_id"]
......@@ -131,10 +217,18 @@ class ECAPAEmbedder:
f"[build-reference-index] progress: refs={ref_idx}/{total_refs} "
f"windows={len(all_ids)} elapsed_sec={elapsed:.1f} eta_sec={eta_sec:.1f}"
)
if checkpoint_every_refs > 0 and (ref_idx % checkpoint_every_refs == 0 or ref_idx == total_refs):
write_checkpoint(ref_idx)
if not all_embs:
raise ValueError(
f"No reference embeddings were produced from metadata={metadata_path} songs_dir={songs_dir}"
)
all_embs = np.vstack(all_embs)
np.save(f"{output_path}_embs.npy", all_embs)
np.save(f"{output_path}_ids.npy", np.array(all_ids))
np.save(final_embs_path, all_embs)
np.save(final_ids_path, np.array(all_ids))
write_complete(len(all_ids), all_embs.shape)
print(f"Built reference index: {len(all_ids)} windows, embeddings shape {all_embs.shape}")
return all_embs, all_ids
......
......@@ -5441,3 +5441,44 @@
- **建库侧**:固定滑窗
- **开源集 query 生成侧**`random / sliding / silence_aware / hybrid`
- 下一阶段可继续叠加 beat/onset/chorus-aware 切片,而无需推翻现有流程
### Stage: build-index checkpoint resume + path compatibility hardening
完成项:
-`acr-engine/src/engines/ecapa_embedder.py` 完成 embedding index 的 checkpoint / resume 逻辑
- 支持读取 `reference_progress.json`
- 支持复用 `reference_embs.partial.npy` / `reference_ids.partial.npy`
- 若 final index 已存在,`--resume` 直接命中完成态
-`acr-engine/run_demo.py build-index` 暴露:
- `--resume`
- `--checkpoint-every-refs`
- 修复 `synthetic` / 根目录型数据集的音频路径解析兼容问题:
- `acr-engine/src/engines/ecapa_embedder.py`
- `acr-engine/src/engines/chromaprint_matcher.py`
- 为 “没有任何 reference 被成功解析” 的场景补充显式报错,避免 `np.vstack([])` 这类低可读错误
-[docs/open-dataset-workflow.md](./open-dataset-workflow.md) 补充 `build-index --resume` 用法
验证结果:
- 代码编译验证:
- `/usr/local/miniconda3/bin/python -m py_compile src/engines/ecapa_embedder.py src/engines/chromaprint_matcher.py run_demo.py`
- 兼容性验证:
- `run_demo.py build-index --data data/synthetic_v2 --model data/models_v6/best_model.pt --output /tmp/index_resume_fresh --device cpu`
- synthetic 根目录型 `audio_path=songs/...` 已可正常建索引
- resume 一致性验证:
1.`data/synthetic_v2/catalog.json` 的前 2 首 reference 生成 partial checkpoint
2. 人工保留 `reference_embs.partial.npy / reference_ids.partial.npy + reference_progress.json`
3. 执行:
- `run_demo.py build-index ... --resume --checkpoint-every-refs 1`
4. 与 fresh full rebuild 对比结果:
- `resume_shape == fresh_shape == (120, 192)`
- `ids_equal == True`
- `embs_allclose == True`
- `progress_status == complete`
- `progress_refs_done == progress_refs_total == 24`
- resume 日志证据:
- `[build-reference-index] resuming from checkpoint: refs_done=2/24 windows_done=10`
结论:
- 现在 CPU 长时间 `build-index` 任务即使中断,也可以从 partial checkpoint 续跑
- 该恢复逻辑已经拿到“恢复结果与 fresh rebuild 完全一致”的新鲜证据
- 下一步可以把这套 resume 能力进一步接到 `smoke-local` 的自动恢复策略里
......
......@@ -72,6 +72,15 @@ flowchart LR
/usr/local/miniconda3/bin/python src/data/external_adapters.py validate-local fma data/external_ingested/fma/manifests
/usr/local/miniconda3/bin/python train.py --data data/external_ingested/fma/manifests --output data/models_fma_smoke --device cpu --epochs 1 --batch-size 2 --dry-run
/usr/local/miniconda3/bin/python run_demo.py build-index --data data/external_ingested/fma/manifests --model data/models_fma_smoke/best_model.pt --output data/index_fma_smoke --device cpu
# 如果长时间 CPU 建索引被中断,可从 partial checkpoint 续跑
/usr/local/miniconda3/bin/python run_demo.py build-index \
--data data/external_ingested/fma/manifests \
--model data/models_fma_smoke/best_model.pt \
--output data/index_fma_smoke \
--device cpu \
--resume \
--checkpoint-every-refs 100
/usr/local/miniconda3/bin/python evaluate.py --data data/external_ingested/fma/manifests --model data/models_fma_smoke/best_model.pt --index-prefix data/index_fma_smoke/reference --split test --device cpu --fast-eval --output-json reports/fma-smoke/eval.json
/usr/local/miniconda3/bin/python scripts/generate_artifacts.py --eval-json reports/fma-smoke/eval.json --config-json reports/fma-smoke/config.json --output-dir reports/fma-smoke --model-version fma-smoke --data-version fma_local
```
......