Commit 44d8268c 44d8268ccb015842859f040e802698bfa566c5c2 by cnb.bofCdSsphPA

Make the ACR prototype explainable and runnable

Add missing project documentation and a minimal executable demo flow so the repository can be understood and validated end to end.

Constraint: The existing repo had design fragments but no verified runnable path
Rejected: Delay documentation until after full productization | would keep scope opaque and slow iteration
Confidence: medium
Scope-risk: moderate
Directive: Keep future stages checkpointed with changelog entries and runnable verification commands
Tested: synthetic dataset generation; train.py --dry-run; 1 epoch CPU training; index build; recognition JSON output
Not-tested: production-scale retrieval; real copyrighted audio; API serving
1 parent e25a16be
# ACR Engine
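A runnable audio content recognition (ACR, "name that song") prototype that includes:
- Synthetic dataset generation
- Classic audio fingerprint matching (landmark hashes)
- Deep embedding retrieval (ECAPA-TDNN)
- A hybrid recognition entry point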
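The landmark-hash idea can be sketched in a few lines of plain Python. This is a simplified illustration, not the repository's `ChromaprintMatcher`: the peak lists are made-up `(frame, frequency_bin)` pairs, the helper names (`pair_hashes`, `build_index`, `best_match`) are hypothetical, and the score is only the height of the tallest time-offset bin.

```python
from collections import Counter, defaultdict


def pair_hashes(peaks, max_dt=50):
    """Pack (f1, f2, dt) for every peak pair closer than max_dt frames."""
    peaks = sorted(peaks)  # order by time so dt is always positive
    hashes = []
    for i, (t1, f1) in enumerate(peaks):
        for t2, f2 in peaks[i + 1:]:
            dt = t2 - t1
            if 0 < dt < max_dt:
                # 10 bits per field keeps the three values from overlapping.
                hashes.append(((f1 << 20) | (f2 << 10) | dt, t1))
    return hashes


def build_index(songs):
    """Map each hash to the (song_id, anchor_time) pairs that produced it."""
    index = defaultdict(list)
    for song_id, peaks in songs.items():
        for h, t in pair_hashes(peaks):
            index[h].append((song_id, t))
    return index


def best_match(index, query_peaks):
    """Vote on (reference_time - query_time); a true match piles up on one offset."""
    votes = defaultdict(Counter)
    for h, t_query in pair_hashes(query_peaks):
        for song_id, t_ref in index.get(h, []):
            votes[song_id][t_ref - t_query] += 1
    if not votes:
        return None
    scored = {song_id: max(deltas.values()) for song_id, deltas in votes.items()}
    best = max(scored, key=scored.get)
    return best, scored[best]


# Made-up peaks: (frame, frequency_bin)
songs = {
    "song_a": [(0, 10), (5, 40), (12, 25), (20, 60), (31, 15)],
    "song_b": [(0, 12), (7, 33), (15, 48), (22, 20), (30, 55)],
}
index = build_index(songs)
# A query cropped from song_a starting at frame 5.
query = [(t - 5, f) for t, f in songs["song_a"][1:]]
match = best_match(index, query)
print(match)
```

Because every matching hash from the cropped query agrees on the same offset (5 frames), the votes concentrate in a single histogram bin, which is what makes the scheme robust to where the query was cut.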
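## Quick start
```bash
cd acr-engine
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python run_demo.py generate-data --output data/synthetic --num-songs 24
python train.py --data data/synthetic --output data/models --device cpu --epochs 1 --batch-size 8
python run_demo.py full-demo
```
`full-demo` generates the synthetic dataset if it is missing, but it exits unless `data/models/best_model.pt` already exists, so train a model first.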
## Common commands
### 1. Generate synthetic data
```bash
python run_demo.py generate-data --output data/synthetic --num-songs 24
```
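The generator assigns splits per song (70% train, 15% val, the rest test) and writes one metadata entry per segment plus one per full song. The sketch below reproduces only that arithmetic; `split_for` and `entries_per_split` are hypothetical helper names, and the defaults mirror the command above (`--num-songs 24`, four segments per song).

```python
def split_for(i, num_songs):
    """Return the split of song index i, using the same thresholds as the generator."""
    if i < int(num_songs * 0.7):
        return "train"
    if i < int(num_songs * 0.85):
        return "val"
    return "test"


def entries_per_split(num_songs=24, num_segments=4):
    """Count metadata entries: num_segments segment entries + 1 song entry per song."""
    counts = {"train": 0, "val": 0, "test": 0}
    for i in range(num_songs):
        counts[split_for(i, num_songs)] += num_segments + 1
    return counts


counts = entries_per_split()
print(counts)
print(split_for(20, 24))
```

With 24 songs, `song_0020` through `song_0023` land in the test split. In the `run_demo.py` shown in this commit, both indexes are built from `train.json`, so a query cut from a held-out song has no exact reference in the index.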
### 2. Run a dry-run check before training
```bash
python train.py --data data/synthetic --dry-run --device cpu
```
### 3. Train a minimal model
```bash
python train.py --data data/synthetic --output data/models --device cpu --epochs 1 --batch-size 8
```
### 4. Build the fingerprint and embedding indexes
```bash
python run_demo.py build-index --data data/synthetic --model data/models/best_model.pt --output data/index
```
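Once the embedding index exists, retrieval is a cosine-similarity ranking of the query embedding against the reference embeddings. The repository does this with NumPy in `ECAPAEmbedder.search`; the version below is a dependency-free sketch with made-up three-dimensional vectors, and `cosine_top_k` is a hypothetical name.

```python
import math


def l2_normalize(vec):
    """Scale a vector to unit length (epsilon guards against a zero vector)."""
    norm = math.sqrt(sum(x * x for x in vec)) + 1e-12
    return [x / norm for x in vec]


def cosine_top_k(query, ref_embs, ref_ids, k=3):
    """Rank reference embeddings by cosine similarity to the query."""
    q = l2_normalize(query)
    scored = []
    for song_id, ref in zip(ref_ids, ref_embs):
        r = l2_normalize(ref)
        scored.append((song_id, sum(a * b for a, b in zip(q, r))))
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# Illustrative values only; real embeddings have 192 dimensions.
ref_ids = ["song_0000", "song_0001", "song_0002"]
ref_embs = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.6, 0.8, 0.0]]
top = cosine_top_k([0.5, 0.9, 0.1], ref_embs, ref_ids, k=2)
for song_id, score in top:
    print(song_id, round(score, 3))
```

A brute-force scan like this is fine for a few dozen synthetic songs; production-scale retrieval is listed as not tested in this commit.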
### 5. Run recognition
```bash
python run_demo.py recognize \
--query data/synthetic/segments/song_0020_seg_00.wav \
--data data/synthetic \
--model data/models/best_model.pt \
--index-prefix data/index/reference
```
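The command prints a JSON object with `candidates`, `processing_time_ms`, and `num_candidates`; each candidate carries `song_id`, `confidence`, `chromaprint_score`, `ecapa_score`, and `metadata`. The sketch below shows one way a caller might pick the top hit. The payload values are invented for illustration, and `top_candidate` with its `min_confidence` cut-off is a hypothetical helper, not part of the repository.

```python
import json

# Made-up payload in the shape that `recognize` prints.
SAMPLE = """
{
  "candidates": [
    {"song_id": "song_0003", "confidence": 2.617,
     "chromaprint_score": 6.6, "ecapa_score": 0.91, "metadata": {}},
    {"song_id": "song_0007", "confidence": 0.294,
     "chromaprint_score": 0.0, "ecapa_score": 0.42, "metadata": {}}
  ],
  "processing_time_ms": 41.3,
  "num_candidates": 2
}
"""


def top_candidate(raw_json, min_confidence=0.4):
    """Return the highest-confidence candidate, or None if it falls below the cut-off."""
    result = json.loads(raw_json)
    candidates = result.get("candidates", [])
    if not candidates:
        return None
    best = max(candidates, key=lambda c: c["confidence"])
    return best if best["confidence"] >= min_confidence else None


best = top_candidate(SAMPLE)
print(best["song_id"] if best else "no match")
```

Note that `confidence` is `0.3 * chromaprint_score + 0.7 * ecapa_score`, and the fingerprint score is a raw vote count, so the value is not bounded to the range 0 to 1. The cut-off of 0.4 here simply reuses the `reject_threshold` default from the config as an assumption.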
### 6. One-command minimal end-to-end run
```bash
python run_demo.py full-demo --device cpu
```
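## Layout
- `train.py`: training entry point
- `run_demo.py`: data generation / index building / recognition / one-command demo
- `src/data`: datasets and synthetic data generation
- `src/models`: ECAPA model and losses
- `src/engines`: fingerprint, embedding, and hybrid retrieval
- `configs/default.yaml`: default configuration
## Current scope
This is a **prototype repository**. Its goal is to verify that the main ACR pipeline runs end to end; it is not a production-grade service.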
model:
  name: ecapa_tdnn
  embed_dim: 192
  channels: 512
  se_channels: 128
  res2net_scale: 8
  num_blocks: 3
  n_mels: 80
  aam_m: 0.3
  aam_s: 30.0
data:
  sample_rate: 16000
  n_fft: 512
  hop_length: 160
  segment_dur: 5.0
  crop_per_song: 4
training:
  batch_size: 32
  epochs: 50
  lr: 0.001
  weight_decay: 0.0001
  warmup_epochs: 5
  temperature: 0.07
  supcon_weight: 1.0
  aam_weight: 0.3
  mixed_precision: true
  gradient_clip: 1.0
  save_every: 10
  log_every: 10
engine:
  chromaprint:
    enabled: true
    n_fft: 1024
    hop_length: 256
  hybrid:
    chroma_weight: 0.3
    ecapa_weight: 0.7
    reject_threshold: 0.4
numpy>=1.26
PyYAML>=6.0
soundfile>=0.12
librosa>=0.10
tqdm>=4.66
torch>=2.3
#!/usr/bin/env python3
import argparse
import json
import sys
from pathlib import Path
import numpy as np
ROOT = Path(__file__).parent
sys.path.insert(0, str(ROOT))
from src.data.synthetic import generate_dataset
from src.engines.chromaprint_matcher import ChromaprintMatcher
from src.engines.ecapa_embedder import ECAPAEmbedder
from src.engines.hybrid_engine import HybridEngine


def cmd_generate_data(args):
    generate_dataset(
        output_dir=args.output,
        num_songs=args.num_songs,
        song_duration=args.song_duration,
        num_segments_per_song=args.num_segments,
        segment_duration=args.segment_duration,
        seed=args.seed,
    )
    print(f"[done] dataset generated at {args.output}")


def build_chroma_index(data_dir: Path, output_dir: Path):
    matcher = ChromaprintMatcher()
    matcher.index_songs_from_dir(
        songs_dir=str(data_dir / 'songs'),
        metadata_path=str(data_dir / 'train.json'),
        cache_path=str(output_dir / 'chromaprint.pkl'),
    )
    print(f"[done] chromaprint index built: hashes={matcher.num_hashes}, postings={matcher.index_size}")
    return matcher


def build_embedding_index(data_dir: Path, model_path: Path, output_prefix: Path, device: str):
    embedder = ECAPAEmbedder(model_path=str(model_path), device=device)
    ref_embs, ref_ids = embedder.build_reference_index(
        songs_dir=str(data_dir / 'songs'),
        metadata_path=str(data_dir / 'train.json'),
        output_path=str(output_prefix),
    )
    print(f"[done] embedding index built: {len(ref_ids)} refs")
    return embedder, ref_embs, ref_ids


def cmd_build_index(args):
    data_dir = Path(args.data)
    out_dir = Path(args.output)
    out_dir.mkdir(parents=True, exist_ok=True)
    build_chroma_index(data_dir, out_dir)
    build_embedding_index(data_dir, Path(args.model), out_dir / 'reference', args.device)


def load_index(prefix: Path):
    ref_embs = np.load(f"{prefix}_embs.npy")
    ref_ids = np.load(f"{prefix}_ids.npy", allow_pickle=True).tolist()
    return ref_embs, ref_ids


def cmd_recognize(args):
    data_dir = Path(args.data)
    matcher = ChromaprintMatcher()
    # The fingerprint cache lives next to the embedding index files.
    matcher.load(str(Path(args.index_prefix).parent / 'chromaprint.pkl'))
    embedder = ECAPAEmbedder(model_path=args.model, device=args.device)
    ref_embs, ref_ids = load_index(Path(args.index_prefix))
    engine = HybridEngine(
        chroma_matcher=matcher,
        ecapa_embedder=embedder,
        ref_embs=ref_embs,
        ref_ids=ref_ids,
    )
    for split in ['train.json', 'val.json', 'test.json']:
        p = data_dir / split
        if p.exists():
            engine.load_metadata(str(p))
    result = engine.recognize(args.query, top_n=args.top_n)
    print(json.dumps(result, ensure_ascii=False, indent=2))


def cmd_full_demo(args):
    data_dir = Path(args.data)
    model_dir = Path(args.model_dir)
    index_dir = Path(args.index_dir)

    # Generate the synthetic dataset only when it is not already on disk.
    if not (data_dir / 'train.json').exists():
        generate_dataset(
            output_dir=str(data_dir),
            num_songs=args.num_songs,
            song_duration=args.song_duration,
            num_segments_per_song=args.num_segments,
            segment_duration=args.segment_duration,
            seed=args.seed,
        )
        print(f"[done] dataset generated at {data_dir}")

    model_path = model_dir / 'best_model.pt'
    if not model_path.exists():
        # Report the path that was actually checked, not a hard-coded default.
        raise SystemExit(
            f'full-demo requires a trained model at {model_path}. '
            'Run train.py first or provide one.'
        )

    index_dir.mkdir(parents=True, exist_ok=True)
    matcher = build_chroma_index(data_dir, index_dir)
    embedder, ref_embs, ref_ids = build_embedding_index(
        data_dir, model_path, index_dir / 'reference', args.device
    )

    with open(data_dir / 'test.json') as f:
        test_meta = json.load(f)
    # Prefer a segment as the query; fall back to the first test entry.
    query_item = next((x for x in test_meta if 'segments/' in x['audio_path']), test_meta[0])
    query_path = data_dir / query_item['audio_path']

    engine = HybridEngine(matcher, embedder, ref_embs, ref_ids)
    for split in ['train.json', 'val.json', 'test.json']:
        engine.load_metadata(str(data_dir / split))
    result = engine.recognize(str(query_path), top_n=5)
    print('[demo-query]', query_item['song_id'], query_item['audio_path'])
    print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='ACR demo utilities')
    sub = parser.add_subparsers(dest='cmd', required=True)

    p = sub.add_parser('generate-data')
    p.add_argument('--output', default='data/synthetic')
    p.add_argument('--num-songs', type=int, default=24)
    p.add_argument('--song-duration', type=float, default=20.0)
    p.add_argument('--num-segments', type=int, default=4)
    p.add_argument('--segment-duration', type=float, default=5.0)
    p.add_argument('--seed', type=int, default=42)
    p.set_defaults(func=cmd_generate_data)

    p = sub.add_parser('build-index')
    p.add_argument('--data', default='data/synthetic')
    p.add_argument('--model', required=True)
    p.add_argument('--output', default='data/index')
    p.add_argument('--device', default='cpu')
    p.set_defaults(func=cmd_build_index)

    p = sub.add_parser('recognize')
    p.add_argument('--query', required=True)
    p.add_argument('--data', default='data/synthetic')
    p.add_argument('--model', required=True)
    p.add_argument('--index-prefix', default='data/index/reference')
    p.add_argument('--top-n', type=int, default=5)
    p.add_argument('--device', default='cpu')
    p.set_defaults(func=cmd_recognize)

    p = sub.add_parser('full-demo')
    p.add_argument('--data', default='data/synthetic')
    p.add_argument('--model-dir', default='data/models')
    p.add_argument('--index-dir', default='data/index')
    p.add_argument('--num-songs', type=int, default=24)
    p.add_argument('--song-duration', type=float, default=20.0)
    p.add_argument('--num-segments', type=int, default=4)
    p.add_argument('--segment-duration', type=float, default=5.0)
    p.add_argument('--seed', type=int, default=42)
    p.add_argument('--device', default='cpu')
    p.set_defaults(func=cmd_full_demo)

    args = parser.parse_args()
    args.func(args)
import torch
from torch.utils.data import Dataset
import numpy as np
import librosa
import random
from pathlib import Path
from typing import Dict, List, Tuple
import json
import os


class ACRDataset(Dataset):
    def __init__(
        self,
        data_dir: str,
        split: str = "train",
        sr: int = 16000,
        n_mels: int = 80,
        n_fft: int = 512,
        hop_length: int = 160,
        segment_dur: float = 5.0,
        augment: bool = True,
        n_crops_per_song: int = 4,
    ):
        self.sr = sr
        self.n_mels = n_mels
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.segment_len = int(segment_dur * sr)
        self.augment = augment
        self.n_crops = n_crops_per_song
        self.data_dir = Path(data_dir)
        meta_path = Path(data_dir) / f"{split}.json"
        with open(meta_path) as f:
            self.metadata = json.load(f)
        # Keep only entries whose audio file is present on disk.
        self.samples = []
        for item in self.metadata:
            song_path = Path(data_dir) / item["audio_path"]
            if song_path.exists():
                self.samples.append(item)
        self.song_ids = sorted(set(s["song_id"] for s in self.samples))
        self.song_to_idx = {sid: i for i, sid in enumerate(self.song_ids)}

    def __len__(self):
        return len(self.samples) * self.n_crops

    def _load_segment(self, path: str, offset: float, duration: float) -> np.ndarray:
        y, _ = librosa.load(
            path, sr=self.sr, mono=True,
            offset=offset, duration=duration
        )
        # Pad or trim so every crop has exactly segment_len samples.
        if len(y) < self.segment_len:
            y = np.pad(y, (0, self.segment_len - len(y)))
        else:
            y = y[:self.segment_len]
        return y

    def _to_mel(self, y: np.ndarray) -> np.ndarray:
        mel = librosa.feature.melspectrogram(
            y=y, sr=self.sr, n_mels=self.n_mels,
            n_fft=self.n_fft, hop_length=self.hop_length
        )
        return librosa.power_to_db(mel, ref=np.max)

    def __getitem__(self, idx):
        sample = self.samples[idx // self.n_crops]
        duration = sample["duration"]
        # Use the configured segment length rather than a hard-coded 5 s.
        segment_dur = self.segment_len / self.sr
        max_offset = max(0.0, duration - segment_dur)
        offset = random.uniform(0, max_offset) if max_offset > 0 else 0.0
        audio_path = self.data_dir / sample["audio_path"]
        y = self._load_segment(str(audio_path), offset, segment_dur)
        if self.augment:
            from src.utils.augment import AugmentPipeline
            aug = AugmentPipeline(self.sr)
            y = aug(y)
        mel = self._to_mel(y)
        mel_tensor = torch.FloatTensor(mel)
        song_id = sample["song_id"]
        class_id = self.song_to_idx[song_id]
        return {
            "mel": mel_tensor,
            "song_id": torch.tensor(class_id, dtype=torch.long),
            "song_name": song_id,
        }


class ACRTestDataset(Dataset):
    def __init__(
        self,
        data_dir: str,
        split: str = "test",
        sr: int = 16000,
        n_mels: int = 80,
        n_fft: int = 512,
        hop_length: int = 160,
    ):
        self.sr = sr
        self.n_mels = n_mels
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.data_dir = Path(data_dir)
        meta_path = Path(data_dir) / f"{split}.json"
        with open(meta_path) as f:
            self.metadata = json.load(f)
        self.samples = []
        for item in self.metadata:
            p = Path(data_dir) / item["audio_path"]
            if p.exists():
                self.samples.append(item)
        self.song_ids = sorted(set(s["song_id"] for s in self.samples))
        self.song_to_idx = {sid: i for i, sid in enumerate(self.song_ids)}

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]
        audio_path = self.data_dir / sample["audio_path"]
        # Evaluation always uses the first 5 s of the file (no random crop).
        y, _ = librosa.load(
            str(audio_path), sr=self.sr, mono=True,
            offset=0, duration=min(sample["duration"], 5.0)
        )
        seg_len = 5 * self.sr
        if len(y) < seg_len:
            y = np.pad(y, (0, seg_len - len(y)))
        else:
            y = y[:seg_len]
        mel = librosa.power_to_db(
            librosa.feature.melspectrogram(y=y, sr=self.sr, n_mels=self.n_mels,
                                           n_fft=self.n_fft, hop_length=self.hop_length),
            ref=np.max
        )
        class_id = self.song_to_idx[sample["song_id"]]
        return {
            "mel": torch.FloatTensor(mel),
            "song_id": torch.tensor(class_id, dtype=torch.long),
            "song_name": sample["song_id"],
        }
"""
Synthetic audio dataset generator for ACR demo.
Generates melodies from fundamental frequencies, simulates:
- Different "songs" (unique note sequences at different base frequencies)
- Song fragments (random crops from songs)
- Humming variants (pitch shifted, time stretched versions)
This allows the full pipeline to be validated without external data.
"""
import numpy as np
import soundfile as sf
import json
import random
import os
from pathlib import Path
from typing import List, Tuple
from tqdm import tqdm
_SR = 16000


def sine_wave(freq: float, duration: float, sr: int = _SR, amp: float = 0.5) -> np.ndarray:
    t = np.linspace(0, duration, int(sr * duration), endpoint=False)
    return amp * np.sin(2 * np.pi * freq * t)


def harmonic_tone(freq: float, duration: float, sr: int = _SR, n_harmonics: int = 4) -> np.ndarray:
    t = np.linspace(0, duration, int(sr * duration), endpoint=False)
    y = np.zeros_like(t)
    # Sum the fundamental and its overtones with 1/h amplitude roll-off.
    for h in range(1, n_harmonics + 1):
        amp = 0.5 / h
        y += amp * np.sin(2 * np.pi * freq * h * t)
    return y / np.max(np.abs(y)) * 0.5


def generate_melody(
    base_freq: float,
    note_count: int = 16,
    note_dur: float = 0.5,
    sr: int = _SR,
    timbre: str = "harmonic",
) -> np.ndarray:
    notes = []
    for _ in range(note_count):
        # Semitone offsets of the major scale above the base note, plus the octave.
        interval = random.choice([0, 2, 4, 5, 7, 9, 11, 12])
        freq = base_freq * (2 ** (interval / 12))
        dur = note_dur * random.uniform(0.8, 1.2)
        if timbre == "sine":
            note = sine_wave(freq, dur, sr)
        else:
            note = harmonic_tone(freq, dur, sr)
        # Occasionally apply a 20 ms fade-in to soften the note onset.
        if random.random() < 0.15:
            fade = np.linspace(0, 1, min(int(sr * 0.02), len(note)))
            note[:len(fade)] *= fade
        notes.append(note)
    return np.concatenate(notes)


# Chord shapes as semitone offsets from the root (labels assume a root of C).
_CHORD_PROGRESSIONS = [
    [0, 3, 7],      # Cm
    [0, 4, 7],      # C
    [0, 3, 7, 10],  # Cm7
    [0, 4, 7, 11],  # Cmaj7
    [0, 4, 9],      # C-E-A (Am in first inversion)
    [0, 5, 7],      # Csus4
]


def generate_song(
    song_id: str,
    base_freq: float,
    duration: float = 30.0,
    sr: int = _SR,
    with_vocals: bool = True,
) -> Tuple[np.ndarray, float]:
    segments_per_sec = 2
    total_segments = int(duration * segments_per_sec)
    y = np.zeros(int(sr * duration))
    # Lay down one randomly chosen chord per half-second slot.
    for i in range(total_segments):
        t_start = i / segments_per_sec
        t_end = (i + 1) / segments_per_sec
        start_sample = int(t_start * sr)
        end_sample = int(t_end * sr)
        seg_len = end_sample - start_sample
        chord = random.choice(_CHORD_PROGRESSIONS)
        for interval in chord:
            freq = base_freq * (2 ** (interval / 12))
            env = np.exp(-np.linspace(0, 3, seg_len))  # exponential decay envelope
            note = harmonic_tone(freq, seg_len / sr, sr) * env * 0.3
            min_len = min(seg_len, len(note))
            y[start_sample:start_sample + min_len] += note[:min_len]
    # Overlay a melody one octave above the base frequency.
    if with_vocals:
        melody = generate_melody(base_freq * 2, note_count=int(duration * 2), note_dur=0.5, sr=sr)
        min_len = min(len(y), len(melody))
        y[:min_len] += melody[:min_len] * 0.2
    # Normalise to a peak amplitude of 0.5.
    peak = np.max(np.abs(y))
    if peak > 0:
        y = y / peak * 0.5
    return y, duration


def generate_dataset(
    output_dir: str,
    num_songs: int = 50,
    song_duration: float = 30.0,
    num_segments_per_song: int = 6,
    segment_duration: float = 5.0,
    sr: int = _SR,
    seed: int = 42,
):
    random.seed(seed)
    np.random.seed(seed)
    output_dir = Path(output_dir)
    songs_dir = output_dir / "songs"
    segs_dir = output_dir / "segments"
    songs_dir.mkdir(parents=True, exist_ok=True)
    segs_dir.mkdir(parents=True, exist_ok=True)
    base_freqs = [130.81, 146.83, 164.81, 174.61, 196.0, 220.0, 246.94,
                  261.63, 293.66, 329.63, 349.23, 392.0, 440.0, 493.88,
                  523.25, 587.33, 659.25, 698.46, 783.99, 880.0, 987.77]
    train_meta = []
    val_meta = []
    test_meta = []
    seg_samples = int(segment_duration * sr)
    print(f"Generating {num_songs} synthetic songs...")
    for i in tqdm(range(num_songs)):
        song_id = f"song_{i:04d}"
        base_freq = base_freqs[i % len(base_freqs)]
        # After one pass through base_freqs, shift the key up by two semitones.
        key_offset = (i // len(base_freqs)) * 2
        base_freq *= (2 ** (key_offset / 12))
        y, dur = generate_song(song_id, base_freq, duration=song_duration, sr=sr)
        song_path = songs_dir / f"{song_id}.wav"
        sf.write(str(song_path), y, sr)

        # Splits are assigned per song: 70% train, 15% val, the rest test.
        if i < int(num_songs * 0.7):
            split_meta = train_meta
        elif i < int(num_songs * 0.85):
            split_meta = val_meta
        else:
            split_meta = test_meta

        for j in range(num_segments_per_song):
            max_offset = max(0, dur - segment_duration)
            offset = random.uniform(0, max_offset)
            start_s = int(offset * sr)
            seg = y[start_s:start_s + seg_samples]
            if len(seg) < seg_samples:
                seg = np.pad(seg, (0, seg_samples - len(seg)))
            # The second half of each song's segments is written augmented.
            is_augmented = j >= num_segments_per_song // 2
            if is_augmented:
                from src.utils.augment import AugmentPipeline
                aug = AugmentPipeline(sr)
                seg_out = aug(seg.copy())
                seg_name = f"{song_id}_seg_{j:02d}_aug.wav"
            else:
                seg_out = seg
                seg_name = f"{song_id}_seg_{j:02d}.wav"
            sf.write(str(segs_dir / seg_name), seg_out, sr)
            if offset < dur * 0.2:
                seg_type = "intro"
            elif offset > dur * 0.7:
                seg_type = "outro"
            else:
                seg_type = "mid"
            split_meta.append({
                "song_id": song_id,
                "audio_path": f"segments/{seg_name}",
                "duration": segment_duration,
                "type": "augmented" if is_augmented else "clean",
                "offset": offset,
                "segment_type": seg_type,
            })

        # Each split file also lists the full song after its segments.
        split_meta.append({
            "song_id": song_id,
            "audio_path": f"songs/{song_id}.wav",
            "duration": dur,
            "base_freq": base_freq,
        })

    for name, data in [("train", train_meta), ("val", val_meta), ("test", test_meta)]:
        with open(output_dir / f"{name}.json", "w") as f:
            json.dump(data, f, indent=2)
        print(f"  {name}: {len(data)} entries")
    print(f"\nDataset generated at {output_dir}")
    print(f"  Songs: {num_songs}")
    # The total counts segment entries and full-song entries together.
    print(f"  Total metadata entries: {len(train_meta) + len(val_meta) + len(test_meta)}")
    return output_dir


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", type=str, default="data/synthetic")
    parser.add_argument("--num-songs", type=int, default=50)
    parser.add_argument("--song-duration", type=float, default=30.0)
    parser.add_argument("--segments-per-song", type=int, default=6)
    parser.add_argument("--segment-duration", type=float, default=5.0)
    args = parser.parse_args()
    generate_dataset(
        output_dir=args.output,
        num_songs=args.num_songs,
        song_duration=args.song_duration,
        num_segments_per_song=args.segments_per_song,
        segment_duration=args.segment_duration,
    )
"""
Simplified Chromaprint-style fingerprint matcher.
Implements landmark-based audio fingerprinting:
1. Extract spectral peaks from spectrogram
2. Build hash table from peak pairs
3. Match queries via hash lookup + time offset histogram voting
"""
import numpy as np
import librosa
from collections import defaultdict
from typing import Dict, List, Tuple, Optional
import pickle
import json
from pathlib import Path


class Fingerprint:
    def __init__(self, song_id: str, offset: int, hash_val: int):
        self.song_id = song_id
        self.offset = offset
        self.hash = hash_val


class ChromaprintMatcher:
    def __init__(
        self,
        sr: int = 16000,
        n_fft: int = 1024,
        hop_length: int = 256,
        peak_neighborhood: int = 20,
        target_zone_width: int = 50,
        min_peak_energy: float = 0.01,
    ):
        self.sr = sr
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.peak_neighborhood = peak_neighborhood
        self.target_zone_width = target_zone_width
        self.min_peak_energy = min_peak_energy
        # hash value -> list of fingerprints that produced it
        self.hash_db: Dict[int, List[Fingerprint]] = defaultdict(list)

    def _spectrogram(self, y: np.ndarray) -> np.ndarray:
        # Magnitude STFT with shape (n_fft // 2 + 1, frames).
        return np.abs(librosa.stft(y, n_fft=self.n_fft, hop_length=self.hop_length))

    def _find_peaks(self, S: np.ndarray) -> List[Tuple[int, int, float]]:
        # A bin (f, t) counts as a peak when it is the maximum of the
        # neighbourhood that starts at it and extends forward on both axes.
        peaks = []
        for t in range(0, S.shape[1] - self.peak_neighborhood):
            for f in range(0, S.shape[0] - self.peak_neighborhood):
                region = S[f:f + self.peak_neighborhood, t:t + self.peak_neighborhood]
                center = S[f, t]
                if center == np.max(region) and center > self.min_peak_energy:
                    peaks.append((t, f, center))
        # Keep the 200 strongest peaks.
        peaks.sort(key=lambda x: x[2], reverse=True)
        return peaks[:200]

    def _hash_peaks(self, peaks: List[Tuple[int, int, float]]) -> List[Tuple[int, int]]:
        # Returns (hash, anchor_time) pairs for peak pairs inside the target zone.
        hashes = []
        for i in range(len(peaks)):
            for j in range(i + 1, len(peaks)):
                t1, f1, _ = peaks[i]
                t2, f2, _ = peaks[j]
                if 0 < t2 - t1 < self.target_zone_width:
                    # 10 bits per field: with n_fft=1024 a frequency bin can reach
                    # 512, which would spill out of an 8-bit field and collide.
                    h = (f1 << 20) | (f2 << 10) | (t2 - t1)
                    hashes.append((h, t1))
        return hashes

    def index_song(self, song_id: str, y: np.ndarray):
        S = self._spectrogram(y)
        peaks = self._find_peaks(S)
        hashes = self._hash_peaks(peaks)
        for h, offset in hashes:
            self.hash_db[h].append(Fingerprint(song_id, offset, h))

    def index_songs_from_dir(
        self, songs_dir: str, metadata_path: str, cache_path: Optional[str] = None
    ):
        with open(metadata_path) as f:
            meta = json.load(f)
        songs_dir = Path(songs_dir)
        for item in meta:
            # Index full songs only; segment entries are skipped.
            if "songs" not in item.get("audio_path", ""):
                continue
            audio_path = songs_dir.parent / item["audio_path"]
            if not audio_path.exists():
                continue
            song_id = item["song_id"]
            y, _ = librosa.load(str(audio_path), sr=self.sr, mono=True)
            self.index_song(song_id, y)
        if cache_path:
            self.save(cache_path)

    def match(self, y: np.ndarray, top_k: int = 10) -> List[Tuple[str, float]]:
        S = self._spectrogram(y)
        peaks = self._find_peaks(S)
        hashes = self._hash_peaks(peaks)
        # song_id -> {time offset delta -> vote count}
        song_votes: Dict[str, Dict[int, int]] = defaultdict(lambda: defaultdict(int))
        for h, q_offset in hashes:
            for fp in self.hash_db.get(h, []):
                delta = fp.offset - q_offset
                song_votes[fp.song_id][delta] += 1
        results = []
        for song_id, deltas in song_votes.items():
            # The tallest histogram bin dominates; total votes act as a tie-breaker.
            peak_score = max(deltas.values())
            total_score = sum(deltas.values())
            combined = peak_score * 1.0 + total_score * 0.1
            results.append((song_id, combined))
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]

    def save(self, path: str):
        data = {}
        for h, fps in self.hash_db.items():
            data[h] = [(fp.song_id, fp.offset) for fp in fps]
        with open(path, "wb") as f:
            pickle.dump(data, f)

    def load(self, path: str):
        with open(path, "rb") as f:
            data = pickle.load(f)
        self.hash_db.clear()
        for h, items in data.items():
            self.hash_db[h] = [Fingerprint(sid, off, h) for sid, off in items]

    @property
    def index_size(self) -> int:
        return sum(len(v) for v in self.hash_db.values())

    @property
    def num_hashes(self) -> int:
        return len(self.hash_db)
import torch
import torch.nn.functional as F
import numpy as np
import librosa
from pathlib import Path
from typing import List, Optional, Tuple
import json


class ECAPAEmbedder:
    def __init__(
        self,
        model_path: str,
        device: str = "cpu",
        sr: int = 16000,
        n_mels: int = 80,
        n_fft: int = 512,
        hop_length: int = 160,
    ):
        self.device = torch.device(device)
        self.sr = sr
        self.n_mels = n_mels
        self.n_fft = n_fft
        self.hop_length = hop_length
        from src.models.ecapa_tdnn import ECAPA_ACR
        self.model = ECAPA_ACR(n_mels=n_mels, embed_dim=192)
        state = torch.load(model_path, map_location="cpu", weights_only=True)
        if "model_state_dict" in state:
            state = state["model_state_dict"]
        # strict=False: the inference model is built without the AAM head,
        # so any classifier weights in the checkpoint are skipped.
        self.model.load_state_dict(state, strict=False)
        self.model.to(self.device)
        self.model.eval()

    def _load_audio(self, path: str) -> np.ndarray:
        y, _ = librosa.load(path, sr=self.sr, mono=True)
        return y

    def _to_mel(self, y: np.ndarray) -> torch.Tensor:
        mel = librosa.feature.melspectrogram(
            y=y, sr=self.sr, n_mels=self.n_mels,
            n_fft=self.n_fft, hop_length=self.hop_length
        )
        mel = librosa.power_to_db(mel, ref=np.max)
        # Add a batch dimension: (1, n_mels, frames).
        return torch.FloatTensor(mel).unsqueeze(0)

    def extract_embedding(self, audio_path: str) -> np.ndarray:
        y = self._load_audio(audio_path)
        mel = self._to_mel(y).to(self.device)
        with torch.no_grad():
            emb, _ = self.model(mel)
        return emb.cpu().numpy().flatten()

    def extract_embedding_from_wave(self, y: np.ndarray) -> np.ndarray:
        # Pad to at least 1 s and use at most the first 5 s.
        if len(y) < self.sr:
            y = np.pad(y, (0, self.sr - len(y)))
        mel = self._to_mel(y[:self.sr * 5]).to(self.device)
        with torch.no_grad():
            emb, _ = self.model(mel)
        return emb.cpu().numpy().flatten()

    def build_reference_index(
        self,
        songs_dir: str,
        metadata_path: str,
        output_path: str,
        window_sec: float = 5.0,
        stride_sec: float = 2.5,
    ) -> Tuple[np.ndarray, List[str]]:
        with open(metadata_path) as f:
            meta = json.load(f)
        all_embs = []
        all_ids = []
        songs_dir = Path(songs_dir)
        win_len = int(window_sec * self.sr)
        stride = int(stride_sec * self.sr)
        for item in meta:
            # Index full songs only; segment entries are skipped.
            if "songs/" not in item.get("audio_path", ""):
                continue
            audio_path = songs_dir.parent / item["audio_path"]
            if not audio_path.exists():
                continue
            song_id = item["song_id"]
            y, _ = librosa.load(str(audio_path), sr=self.sr, mono=True)
            # Embed overlapping windows and average them into one vector per song.
            window_embs = []
            for start in range(0, len(y) - win_len + 1, stride):
                seg = y[start:start + win_len]
                mel = self._to_mel(seg).to(self.device)
                with torch.no_grad():
                    emb, _ = self.model(mel)
                window_embs.append(emb.cpu().numpy().flatten())
            if window_embs:
                song_emb = np.mean(window_embs, axis=0)
                all_embs.append(song_emb)
                all_ids.append(song_id)
        # np.vstack raises an opaque error on an empty list, so fail clearly.
        if not all_embs:
            raise ValueError(
                f"No reference embeddings were built from {metadata_path}; "
                "check that the song files exist and are at least window_sec long."
            )
        all_embs = np.vstack(all_embs)
        np.save(f"{output_path}_embs.npy", all_embs)
        np.save(f"{output_path}_ids.npy", np.array(all_ids))
        print(f"Built reference index: {len(all_ids)} songs, embeddings shape {all_embs.shape}")
        return all_embs, all_ids

    def search(
        self,
        query_emb: np.ndarray,
        ref_embs: np.ndarray,
        ref_ids: List[str],
        top_k: int = 10,
    ) -> List[Tuple[str, float]]:
        # Cosine similarity between the query and every reference embedding.
        query_norm = query_emb / (np.linalg.norm(query_emb) + 1e-12)
        ref_norm = ref_embs / (np.linalg.norm(ref_embs, axis=1, keepdims=True) + 1e-12)
        scores = query_norm @ ref_norm.T
        top_indices = np.argsort(-scores)[:top_k]
        return [(ref_ids[i], float(scores[i])) for i in top_indices]
"""
Hybrid ACR Engine: Chromaprint fast pre-filter + ECAPA-TDNN deep re-ranking.
"""
import numpy as np
import librosa
from typing import List, Tuple, Optional, Dict
from pathlib import Path
import json
import time


class Candidate:
    def __init__(self, song_id: str, chroma_score: float = 0.0, ecapa_score: float = 0.0):
        self.song_id = song_id
        self.chroma_score = chroma_score
        self.ecapa_score = ecapa_score
        self.metadata: Dict = {}

    @property
    def combined_score(self) -> float:
        return 0.3 * self.chroma_score + 0.7 * self.ecapa_score

    def __repr__(self):
        return f"Candidate({self.song_id}, chroma={self.chroma_score:.3f}, ecapa={self.ecapa_score:.3f})"


class HybridEngine:
    def __init__(
        self,
        chroma_matcher=None,
        ecapa_embedder=None,
        ref_embs: Optional[np.ndarray] = None,
        ref_ids: Optional[List[str]] = None,
        sr: int = 16000,
        chroma_weight: float = 0.3,
        ecapa_weight: float = 0.7,
        reject_threshold: float = 0.4,
    ):
        self.chroma = chroma_matcher
        self.ecapa = ecapa_embedder
        self.ref_embs = ref_embs
        self.ref_ids = ref_ids
        self.sr = sr
        self.chroma_weight = chroma_weight
        self.ecapa_weight = ecapa_weight
        self.reject_threshold = reject_threshold
        self.song_metadata: Dict[str, Dict] = {}

    def load_metadata(self, metadata_path: str):
        with open(metadata_path) as f:
            items = json.load(f)
        for item in items:
            sid = item["song_id"]
            # The first entry seen for a song wins.
            if sid not in self.song_metadata:
                base = item.get("base_freq", 0)
                self.song_metadata[sid] = {
                    "song_id": sid,
                    "base_freq": base,
                    "audio_path": item.get("audio_path", ""),
                }

    def recognize(
        self,
        audio_path: str,
        top_n: int = 5,
        mode: str = "auto",
    ) -> Dict:
        start = time.time()
        y, _ = librosa.load(audio_path, sr=self.sr, mono=True)

        # Stage 1: fingerprint candidates (scores are raw vote counts).
        chroma_candidates: List[Candidate] = []
        if self.chroma is not None:
            chroma_matches = self.chroma.match(y, top_k=50)
            seen = set()
            for song_id, score in chroma_matches:
                if song_id not in seen:
                    seen.add(song_id)
                    c = Candidate(song_id, chroma_score=score)
                    chroma_candidates.append(c)

        # Stage 2: embedding candidates ranked by cosine similarity.
        ecapa_candidates: List[Candidate] = []
        if self.ecapa is not None and self.ref_embs is not None:
            query_emb = self.ecapa.extract_embedding_from_wave(y)
            ref_norm = self.ref_embs / (
                np.linalg.norm(self.ref_embs, axis=1, keepdims=True) + 1e-12
            )
            query_norm = query_emb / (np.linalg.norm(query_emb) + 1e-12)
            scores = query_norm @ ref_norm.T
            top_indices = np.argsort(-scores)[:top_n]
            for idx in top_indices:
                c = Candidate(self.ref_ids[idx], ecapa_score=float(scores[idx]))
                ecapa_candidates.append(c)

        # Merge both candidate lists by song_id.
        combined: Dict[str, Candidate] = {}
        for c in chroma_candidates:
            combined[c.song_id] = c
        for c in ecapa_candidates:
            if c.song_id in combined:
                combined[c.song_id].ecapa_score = c.ecapa_score
            else:
                combined[c.song_id] = c
        for sid in list(combined.keys()):
            combined[sid].metadata = self.song_metadata.get(sid, {})

        # Fuse with the weights passed to the constructor (defaults 0.3 / 0.7).
        # reject_threshold is stored on the engine but is not applied here.
        def fused(c: Candidate) -> float:
            return self.chroma_weight * c.chroma_score + self.ecapa_weight * c.ecapa_score

        results = sorted(combined.values(), key=fused, reverse=True)[:top_n]
        elapsed = (time.time() - start) * 1000
        output = []
        for c in results:
            output.append({
                "song_id": c.song_id,
                "confidence": round(fused(c), 4),
                "chromaprint_score": round(c.chroma_score, 4),
                "ecapa_score": round(c.ecapa_score, 4),
                "metadata": c.metadata,
            })
        return {
            "candidates": output,
            "processing_time_ms": round(elapsed, 1),
            "num_candidates": len(results),
        }
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple


class SEModule(nn.Module):
    def __init__(self, channels, se_channels=128):
        super().__init__()
        self.se = nn.Sequential(
            nn.Conv1d(channels, se_channels, kernel_size=1),
            nn.ReLU(),
            nn.BatchNorm1d(se_channels),
            nn.Conv1d(se_channels, channels, kernel_size=1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        # Per-channel, per-frame gating of the input.
        return x * self.se(x)


class Res2Block(nn.Module):
    def __init__(self, channels, kernel_size=3, dilation=1, scale=8, se_channels=128):
        super().__init__()
        # channels is assumed to be divisible by scale.
        self.width = channels // scale
        self.num_split = scale
        self.convs = nn.ModuleList()
        for i in range(self.num_split):
            self.convs.append(
                nn.Sequential(
                    nn.Conv1d(
                        self.width,
                        self.width,
                        kernel_size,
                        padding=dilation * (kernel_size - 1) // 2,
                        dilation=dilation,
                    ),
                    nn.ReLU(),
                    nn.BatchNorm1d(self.width),
                )
            )
        self.conv1x1 = nn.Sequential(
            nn.Conv1d(channels, channels, kernel_size=1),
            nn.ReLU(),
            nn.BatchNorm1d(channels),
        )
        self.se = SEModule(channels, se_channels)

    def forward(self, x):
        residual = x
        split_x = torch.split(x, self.width, dim=1)
        out = []
        for i, (part, conv) in enumerate(zip(split_x, self.convs)):
            if i == 0:
                out.append(conv(part))
            else:
                # Res2Net hierarchy: each split also receives the previous
                # split's output. The original conditional expression dropped
                # `part` entirely because of operator precedence.
                out.append(conv(part + out[-1]))
        x = torch.cat(out, dim=1)
        x = self.conv1x1(x)
        x = self.se(x)
        return x + residual


class StatisticsPooling(nn.Module):
    def forward(self, x):
        # Concatenate the per-channel mean and standard deviation over time.
        mean = torch.mean(x, dim=2)
        std = torch.sqrt(torch.var(x, dim=2, unbiased=False) + 1e-12)
        return torch.cat([mean, std], dim=1)


class AAMSoftmax(nn.Module):
    def __init__(self, in_features, out_features, m=0.3, s=30.0):
        super().__init__()
        self.weight = nn.Parameter(torch.FloatTensor(out_features, in_features))
        nn.init.xavier_normal_(self.weight)
        self.m = m
        self.s = s
        self.cos_m = torch.cos(torch.tensor(m))
        self.sin_m = torch.sin(torch.tensor(m))
        self.th = torch.cos(torch.tensor(torch.pi - m))
        self.mm = torch.sin(torch.tensor(torch.pi - m)) * m

    def forward(self, x, labels):
        x = F.normalize(x, dim=1)
        w = F.normalize(self.weight, dim=1)
        cos_theta = F.linear(x, w)
        sin_theta = torch.sqrt(1.0 - torch.clamp(cos_theta ** 2, 0, 1))
        # cos(theta + m), with a linear fallback once theta + m would exceed pi.
        phi = cos_theta * self.cos_m - sin_theta * self.sin_m
        phi = torch.where(cos_theta > self.th, phi, cos_theta - self.mm)
        one_hot = F.one_hot(labels, num_classes=self.weight.size(0)).float()
        # Apply the margin to the target class only.
        output = (one_hot * phi) + ((1.0 - one_hot) * cos_theta)
        output *= self.s
        return output


class ECAPA_ACR(nn.Module):
    def __init__(
        self,
        n_mels: int = 80,
        embed_dim: int = 192,
        channels: int = 512,
        se_channels: int = 128,
        res2net_scale: int = 8,
        num_blocks: int = 3,
        num_classes: Optional[int] = None,
        aam_m: float = 0.3,
        aam_s: float = 30.0,
    ):
        super().__init__()
        self.embed_dim = embed_dim
        self.conv1 = nn.Sequential(
            nn.Conv1d(n_mels, channels, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.BatchNorm1d(channels),
        )
        dilations = [1, 2, 3] if num_blocks == 3 else [d * 1 for d in range(1, num_blocks + 1)]
        self.blocks = nn.ModuleList()
        for d in dilations[:num_blocks]:
            self.blocks.append(
                Res2Block(
                    channels=channels,
                    kernel_size=3,
                    dilation=d,
                    scale=res2net_scale,
                    se_channels=se_channels,
                )
            )
        # Multi-layer feature aggregation over the concatenated block outputs.
        in_channels = channels * num_blocks
        self.mfa = nn.Sequential(
            nn.Conv1d(in_channels, channels * 3, kernel_size=1),
            nn.ReLU(),
            nn.BatchNorm1d(channels * 3),
        )
        self.pooling = StatisticsPooling()
        self.fc = nn.Linear(channels * 3 * 2, embed_dim)
        self.bn = nn.BatchNorm1d(embed_dim, affine=False)
        # The AAM classification head exists only when training with class labels.
        if num_classes is not None:
            self.aam = AAMSoftmax(embed_dim, num_classes, m=aam_m, s=aam_s)
        else:
            self.aam = None

    def forward(
        self, mel: torch.Tensor, labels: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        # mel: (batch, n_mels, frames)
        x = self.conv1(mel)
        block_outputs = []
        for block in self.blocks:
            x = block(x)
            block_outputs.append(x)
        x = torch.cat(block_outputs, dim=1)
        x = self.mfa(x)
        x = self.pooling(x)
        x = self.fc(x)
        x = self.bn(x)
        embedding = F.normalize(x, p=2, dim=1)
        if labels is not None and self.aam is not None:
            logits = self.aam(embedding, labels)
            return embedding, logits
        return embedding, None
import torch
import torch.nn as nn
import torch.nn.functional as F


class SupConLoss(nn.Module):
    def __init__(self, temperature: float = 0.07):
        super().__init__()
        self.temperature = temperature

    def forward(self, features: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        batch_size = features.shape[0]
        labels = labels.contiguous().view(-1, 1)
        # Positive pairs share a label; the diagonal (self-pairs) is excluded.
        mask = torch.eq(labels, labels.T).float().to(features.device)
        mask = mask - torch.eye(batch_size, device=features.device)
        features = F.normalize(features, dim=1)
        sim = torch.matmul(features, features.T) / self.temperature
        # Subtract the row maximum for numerical stability.
        sim_max, _ = torch.max(sim, dim=1, keepdim=True)
        sim = sim - sim_max.detach()
        exp_sim = torch.exp(sim) * (1 - torch.eye(batch_size, device=features.device))
        # Clamp the denominator so a batch of size 1 yields 0 instead of NaN.
        log_prob = sim - torch.log(exp_sim.sum(dim=1, keepdim=True).clamp(min=1e-12))
        pos_mask = mask
        pos_count = pos_mask.sum(dim=1)
        loss = -(log_prob * pos_mask).sum(dim=1)
        loss = loss / pos_count.clamp(min=1)
        return loss.mean()
class CombinedLoss(nn.Module):
    def __init__(
        self,
        temperature: float = 0.07,
        supcon_weight: float = 1.0,
        aam_weight: float = 0.3,
    ):
        super().__init__()
        self.supcon = SupConLoss(temperature)
        self.ce = nn.CrossEntropyLoss()
        self.supcon_weight = supcon_weight
        self.aam_weight = aam_weight

    def forward(
        self,
        embedding: torch.Tensor,
        logits: torch.Tensor,
        labels: torch.Tensor,
        supcon_labels: torch.Tensor,
    ) -> dict:
        loss_supcon = self.supcon(embedding, supcon_labels)
        loss_ce = self.ce(logits, labels)
        total = self.supcon_weight * loss_supcon + self.aam_weight * loss_ce
        return {
            "loss": total,
            "supcon_loss": loss_supcon.item(),
            "ce_loss": loss_ce.item(),
        }
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import librosa
from typing import List, Optional, Tuple


class AudioProcessor:
    def __init__(self, sr: int = 16000, n_mels: int = 80, n_fft: int = 512, hop_length: int = 160):
        self.sr = sr
        self.n_mels = n_mels
        self.n_fft = n_fft
        self.hop_length = hop_length

    def load(self, path: str, sr: Optional[int] = None, duration: Optional[float] = None) -> np.ndarray:
        y, _ = librosa.load(path, sr=sr or self.sr, mono=True, duration=duration)
        return y

    def to_mel(self, y: np.ndarray) -> np.ndarray:
        mel = librosa.feature.melspectrogram(
            y=y, sr=self.sr, n_mels=self.n_mels,
            n_fft=self.n_fft, hop_length=self.hop_length
        )
        return librosa.power_to_db(mel, ref=np.max)

    def to_mel_tensor(self, y: np.ndarray) -> torch.Tensor:
        mel = self.to_mel(y)
        return torch.FloatTensor(mel).unsqueeze(0)

    def sliding_windows(self, y: np.ndarray, window_sec: float = 5.0, stride_sec: float = 2.5) -> List[np.ndarray]:
        win_len = int(window_sec * self.sr)
        stride = int(stride_sec * self.sr)
        # Zero-pad short clips so at least one full window exists.
        if len(y) < win_len:
            pad = win_len - len(y)
            y = np.pad(y, (0, pad))
        windows = []
        for start in range(0, len(y) - win_len + 1, stride):
            windows.append(y[start:start + win_len])
        if not windows:
            windows.append(y[:win_len])
        return windows

    def mel_from_path(self, path: str) -> Tuple[torch.Tensor, float]:
        y = self.load(path)
        duration = len(y) / self.sr
        return self.to_mel_tensor(y), duration

    def extract_chroma(self, y: np.ndarray) -> np.ndarray:
        chroma = librosa.feature.chroma_cqt(y=y, sr=self.sr)
        return chroma

    def extract_f0(self, y: np.ndarray, fmin=65, fmax=2093) -> np.ndarray:
        f0, _, _ = librosa.pyin(y, sr=self.sr, fmin=fmin, fmax=fmax)
        # pyin marks unvoiced frames as NaN; map them to 0 Hz.
        f0 = np.nan_to_num(f0, nan=0.0)
        return f0
import numpy as np
import random
from typing import Optional, Tuple


class AugmentPipeline:
    def __init__(self, sr: int = 16000):
        self.sr = sr
        self.noise_snr_range = (5, 30)
        self.pitch_shift_range = (-6, 6)
        self.time_stretch_range = (0.85, 1.15)
        self.mp3_bitrate_range = (32, 128)

    def add_noise(self, y: np.ndarray, snr_db: Optional[float] = None) -> np.ndarray:
        if snr_db is None:
            snr_db = random.uniform(*self.noise_snr_range)
        # Scale white noise so that signal power / noise power matches the target SNR.
        signal_power = np.mean(y ** 2)
        noise_power = signal_power / (10 ** (snr_db / 10))
        noise = np.random.randn(len(y)) * np.sqrt(noise_power)
        return y + noise

    def pitch_shift(self, y: np.ndarray, semitones: Optional[float] = None) -> np.ndarray:
        if semitones is None:
            semitones = random.uniform(*self.pitch_shift_range)
        return librosa_shift(y, sr=self.sr, n_steps=semitones)

    def time_stretch(self, y: np.ndarray, rate: Optional[float] = None) -> np.ndarray:
        if rate is None:
            rate = random.uniform(*self.time_stretch_range)
        return librosa_ts(y, sr=self.sr, rate=rate)

    def add_reverb(self, y: np.ndarray, decay: float = 0.3) -> np.ndarray:
        # Synthetic 100 ms impulse response: exponentially decaying noise, unit energy.
        ir_len = int(0.1 * self.sr)
        ir = np.exp(-np.arange(ir_len) * decay / ir_len) * np.random.randn(ir_len)
        ir /= np.sqrt(np.sum(ir ** 2))
        return np.convolve(y, ir, mode='same')[:len(y)]

    def apply_spec_augment(self, mel: np.ndarray, max_time_mask: int = 20, max_freq_mask: int = 8) -> np.ndarray:
        # Note: masked cells are set to 0, which is the maximum value on a
        # power_to_db(ref=np.max) scale unless the mel is normalized upstream.
        mel = mel.copy()
        t = mel.shape[1]
        f = mel.shape[0]
        for _ in range(2):
            t_mask = random.randint(0, max_time_mask)
            t_start = random.randint(0, max(0, t - t_mask))
            if t_start < t:
                mel[:, t_start:t_start + t_mask] = 0
        for _ in range(2):
            f_mask = random.randint(0, max_freq_mask)
            f_start = random.randint(0, max(0, f - f_mask))
            if f_start < f:
                mel[f_start:f_start + f_mask, :] = 0
        return mel

    def apply_to_mel(self, mel: np.ndarray) -> np.ndarray:
        if random.random() < 0.3:
            mel = self.apply_spec_augment(mel)
        return mel

    def __call__(self, y: np.ndarray) -> np.ndarray:
        # Each waveform augmentation is applied independently with its own probability.
        if random.random() < 0.5:
            y = self.add_noise(y)
        if random.random() < 0.3:
            y = self.time_stretch(y)
        if random.random() < 0.3:
            y = self.pitch_shift(y)
        if random.random() < 0.2:
            y = self.add_reverb(y)
        return y
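def librosa_shift(y, sr=16000, n_steps=0):
    # librosa is imported lazily so this module loads even if librosa is missing.
    return librosa_impl(y, lambda: __import__('librosa').effects.pitch_shift(y, sr=sr, n_steps=n_steps))


def librosa_ts(y, sr=16000, rate=1.0):
    # sr is accepted for a uniform signature; time_stretch itself does not use it.
    return librosa_impl(y, lambda: __import__('librosa').effects.time_stretch(y, rate=rate))


def librosa_impl(y, fn):
    # Best-effort wrapper: on any failure the unmodified signal is returned,
    # so a failed augmentation is silently skipped.
    try:
        return fn()
    except Exception:
        return y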
#!/usr/bin/env python3
"""
ACR Engine - Training script.
"""
import os
import sys
import json
import yaml
import time
import argparse
from pathlib import Path
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from src.models.ecapa_tdnn import ECAPA_ACR
from src.models.losses import CombinedLoss
from src.data.dataset import ACRDataset, ACRTestDataset
def collate_fn(batch):
    mels = [b["mel"] for b in batch]
    song_ids = [b["song_id"] for b in batch]
    song_names = [b["song_name"] for b in batch]
    # Right-pad every mel along the time axis to the longest item in the batch.
    max_t = max(m.shape[1] for m in mels)
    mels_padded = []
    for m in mels:
        pad = max_t - m.shape[1]
        if pad > 0:
            m = torch.nn.functional.pad(m, (0, pad))
        mels_padded.append(m.unsqueeze(0))
    return {
        "mel": torch.cat(mels_padded, dim=0),
        "song_id": torch.stack(song_ids),
        "song_name": song_names,
    }
def train_epoch(model, loader, optimizer, criterion, scaler, device, epoch, cfg):
    model.train()
    total_loss = 0
    total_supcon = 0
    total_ce = 0
    correct = 0
    total = 0
    steps = 0
    pbar = tqdm(loader, desc=f"Epoch {epoch}")
    for batch in pbar:
        mel = batch["mel"].to(device)
        labels = batch["song_id"].to(device)
        with torch.amp.autocast("cuda", enabled=cfg["training"]["mixed_precision"] and device.type == "cuda"):
            embedding, logits = model(mel, labels)
            loss_dict = criterion(embedding, logits, labels, labels)
        optimizer.zero_grad()
        if scaler is not None and scaler.is_enabled():
            scaler.scale(loss_dict["loss"]).backward()
            # Unscale before clipping so the threshold applies to the true gradients.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), cfg["training"]["gradient_clip"])
            scaler.step(optimizer)
            scaler.update()
        else:
            loss_dict["loss"].backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), cfg["training"]["gradient_clip"])
            optimizer.step()
        total_loss += loss_dict["loss"].item()
        total_supcon += loss_dict["supcon_loss"]
        total_ce += loss_dict["ce_loss"]
        if logits is not None:
            preds = logits.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
        steps += 1
        pbar.set_postfix({
            "loss": f"{loss_dict['loss'].item():.4f}",
            "acc": f"{correct / max(total, 1):.3f}",
        })
    # max(..., 1) avoids a ZeroDivisionError on an empty loader or when no logits exist.
    return {
        "loss": total_loss / max(steps, 1),
        "supcon_loss": total_supcon / max(steps, 1),
        "ce_loss": total_ce / max(steps, 1),
        "accuracy": correct / max(total, 1),
    }
def validate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    steps = 0
    with torch.no_grad():
        for batch in tqdm(loader, desc="Validating"):
            mel = batch["mel"].to(device)
            labels = batch["song_id"].to(device)
            embedding, logits = model(mel, labels)
            loss_dict = criterion(embedding, logits, labels, labels)
            total_loss += loss_dict["loss"].item()
            steps += 1
            if logits is not None:
                preds = logits.argmax(dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
    # Report the mean per-batch loss so it is comparable with train_epoch.
    avg_loss = total_loss / max(steps, 1)
    acc = correct / total if total > 0 else 0
    print(f"  Validation: loss={avg_loss:.4f}, accuracy={acc:.4f}")
    return {"loss": avg_loss, "accuracy": acc}
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=str, default="configs/default.yaml")
    parser.add_argument("--data", type=str, default="data/synthetic")
    parser.add_argument("--output", type=str, default="data/models")
    parser.add_argument("--resume", type=str, default=None)
    parser.add_argument("--device", type=str, default="auto")
    parser.add_argument("--epochs", type=int, default=None)
    parser.add_argument("--batch-size", type=int, default=None)
    parser.add_argument("--lr", type=float, default=None)
    parser.add_argument("--dry-run", action="store_true", help="Run one batch to verify pipeline")
    args = parser.parse_args()
    with open(args.config) as f:
        cfg = yaml.safe_load(f)
    # CLI flags override the config file; compare against None so explicit values always win.
    if args.epochs is not None:
        cfg["training"]["epochs"] = args.epochs
    if args.batch_size is not None:
        cfg["training"]["batch_size"] = args.batch_size
    if args.lr is not None:
        cfg["training"]["lr"] = args.lr
    device_name = args.device
    if device_name == "auto":
        device_name = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device_name)
    print(f"Device: {device}")
    print("Loading datasets...")
    train_dataset = ACRDataset(
        args.data, split="train",
        sr=cfg["data"]["sample_rate"],
        n_mels=cfg["model"]["n_mels"],
        n_fft=cfg["data"]["n_fft"],
        hop_length=cfg["data"]["hop_length"],
        segment_dur=cfg["data"]["segment_dur"],
        augment=True,
        n_crops_per_song=cfg["data"]["crop_per_song"],
    )
    val_dataset = ACRDataset(
        args.data, split="val",
        sr=cfg["data"]["sample_rate"],
        n_mels=cfg["model"]["n_mels"],
        n_fft=cfg["data"]["n_fft"],
        hop_length=cfg["data"]["hop_length"],
        segment_dur=cfg["data"]["segment_dur"],
        augment=False,
        n_crops_per_song=1,
    )
    train_loader = DataLoader(
        train_dataset,
        batch_size=cfg["training"]["batch_size"],
        shuffle=True,
        num_workers=2,
        collate_fn=collate_fn,
        drop_last=True,
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=cfg["training"]["batch_size"],
        shuffle=False,
        num_workers=2,
        collate_fn=collate_fn,
    )
    num_classes = len(train_dataset.song_ids)
    print(f"Classes: {num_classes}")
    print(f"Train samples: {len(train_dataset)}, Val samples: {len(val_dataset)}")
    model = ECAPA_ACR(
        n_mels=cfg["model"]["n_mels"],
        embed_dim=cfg["model"]["embed_dim"],
        channels=cfg["model"]["channels"],
        se_channels=cfg["model"]["se_channels"],
        res2net_scale=cfg["model"]["res2net_scale"],
        num_blocks=cfg["model"]["num_blocks"],
        num_classes=num_classes,
        aam_m=cfg["model"]["aam_m"],
        aam_s=cfg["model"]["aam_s"],
    ).to(device)
    criterion = CombinedLoss(
        temperature=cfg["training"]["temperature"],
        supcon_weight=cfg["training"]["supcon_weight"],
        aam_weight=cfg["training"]["aam_weight"],
    )
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=cfg["training"]["lr"],
        weight_decay=cfg["training"]["weight_decay"],
    )
    # Enable loss scaling only when autocast is active (mixed precision on CUDA).
    scaler = torch.cuda.amp.GradScaler(
        enabled=bool(cfg["training"]["mixed_precision"]) and device.type == "cuda"
    )
    start_epoch = 1
    best_acc = float("-inf")
    if args.resume:
        ckpt = torch.load(args.resume, map_location=device, weights_only=True)
        model.load_state_dict(ckpt["model_state_dict"])
        optimizer.load_state_dict(ckpt["optimizer_state_dict"])
        start_epoch = ckpt["epoch"] + 1
        # Restore the best score so a resumed run does not overwrite a better best_model.pt.
        best_acc = ckpt.get("best_acc", best_acc)
        print(f"Resumed from epoch {ckpt['epoch']}")
    if args.dry_run:
        print("Dry run: running one batch through forward/backward...")
        batch = next(iter(train_loader))
        mel = batch["mel"].to(device)
        labels = batch["song_id"].to(device)
        embedding, logits = model(mel, labels)
        loss_dict = criterion(embedding, logits, labels, labels)
        loss_dict["loss"].backward()
        print(f"  Forward/backward OK. Loss: {loss_dict['loss'].item():.4f}")
        print(f"  Embedding shape: {embedding.shape}")
        print("Dry run passed! Pipeline is working.")
        return
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=cfg["training"]["epochs"]
    )
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    print("Starting training...")
    for epoch in range(start_epoch, cfg["training"]["epochs"] + 1):
        train_metrics = train_epoch(model, train_loader, optimizer, criterion, scaler, device, epoch, cfg)
        val_metrics = validate(model, val_loader, criterion, device)
        scheduler.step()
        lr = optimizer.param_groups[0]["lr"]
        print(f"  LR: {lr:.2e}")
        if epoch % cfg["training"]["save_every"] == 0 or val_metrics["accuracy"] > best_acc:
            # A new best takes priority over the periodic checkpoint name.
            if val_metrics["accuracy"] > best_acc:
                best_acc = val_metrics["accuracy"]
                path = output_dir / "best_model.pt"
            else:
                path = output_dir / f"checkpoint_epoch_{epoch}.pt"
            torch.save({
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "best_acc": best_acc,
                "config": cfg,
            }, path)
            print(f"  Saved: {path}")
    print(f"\nTraining complete. Best validation accuracy: {best_acc:.4f}")
    print(f"Model saved to: {output_dir / 'best_model.pt'}")


if __name__ == "__main__":
    main()
# Changelog
## 2026-06-02
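### Stage: Documentation completion + minimal runnable ACR pipeline
Completed:
- Added the project responsibility map: `docs/project-responsibility-map.md`
- Added the system architecture diagrams: `docs/acr-architecture.md`
- Added the phased roadmap: `docs/roadmap.md`
- Added the runbook: `docs/runbook.md`
- Added the engine README: `acr-engine/README.md`
- Added the dependency list: `acr-engine/requirements.txt`
- Added the demo CLI: `acr-engine/run_demo.py`
- Fixed a dataset path-resolution issue: `acr-engine/src/data/dataset.py`
- Fixed the first training run not writing a best checkpoint: `acr-engine/train.py`

Verification results:
- Generated the synthetic dataset
- Passed `train.py --dry-run`
- Completed 1 epoch of CPU training and produced `best_model.pt`
- Built the fingerprint index and the embedding index
- Ran the recognition command and got JSON candidate results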
# ACR Project Architecture
> Updated: 2026-06-02
## 1. Overall architecture
```mermaid
flowchart LR
    Q[Query audio] --> P[Preprocessing]
    P --> F1[Traditional fingerprint features]
    P --> F2[Mel features]
    F1 --> M1[Chromaprint Matcher]
    F2 --> M2[ECAPA Embedder]
    R[Reference library] --> I1[Fingerprint index]
    R --> I2[Embedding index]
    M1 --> C[Candidate set]
    M2 --> C
    C --> H[Hybrid re-ranking]
    H --> O[Top-K recognition results]
```
## 2. Training architecture
```mermaid
flowchart TD
    A[Raw / synthetic audio] --> B[Random crop]
    B --> C[Augmentation: noise / time stretch / pitch shift / reverb]
    C --> D[Mel Spectrogram]
    D --> E[ECAPA-TDNN]
    E --> F[Embedding]
    F --> G[SupCon Loss]
    F --> H[AAM Softmax]
    G --> I[Joint optimization]
    H --> I
```
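
The AAM Softmax branch in the diagram can be illustrated numerically. The block below is a minimal NumPy sketch of the additive angular margin, mirroring the arithmetic in `AAMSoftmax.forward` with the default `aam_m=0.3` and `aam_s=30.0`; the function name `aam_logits` and the sample cosines are illustrative assumptions, not part of the repository.

```python
import numpy as np


def aam_logits(cos_theta: np.ndarray, labels: np.ndarray,
               m: float = 0.3, s: float = 30.0) -> np.ndarray:
    """Hypothetical helper: add an angular margin m to the target class, then scale by s."""
    sin_theta = np.sqrt(np.clip(1.0 - cos_theta ** 2, 0.0, 1.0))
    # cos(theta + m) through the angle-addition identity.
    phi = cos_theta * np.cos(m) - sin_theta * np.sin(m)
    # Linear fallback when theta + m would exceed pi (same rule as the PyTorch module).
    phi = np.where(cos_theta > np.cos(np.pi - m), phi,
                   cos_theta - np.sin(np.pi - m) * m)
    one_hot = np.eye(cos_theta.shape[1])[labels]
    return s * (one_hot * phi + (1.0 - one_hot) * cos_theta)


# One sample, three classes; class 0 is the ground truth (made-up cosines).
cos_theta = np.array([[0.8, 0.1, -0.2]])
labels = np.array([0])
logits = aam_logits(cos_theta, labels)
print(logits)
```

Only the target-class cosine is penalized, so the model has to separate classes by more than the margin before cross-entropy is satisfied.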
## 3. Inference architecture
```mermaid
sequenceDiagram
    participant U as User Query
    participant P as Preprocessor
    participant C as Chromaprint Matcher
    participant E as ECAPA Embedder
    participant H as Hybrid Engine
    U->>P: Input audio
    P->>C: Fingerprint features
    P->>E: Mel features
    C-->>H: Top-N fingerprint candidates
    E-->>H: Top-N embedding candidates
    H-->>U: Fused recognition results
```
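
The fusion step at the end of the sequence can be sketched as a weighted sum over the two candidate lists. This is only an illustration of the idea: the actual logic in `src/engines/hybrid_engine.py` is not shown here, and the function name `fuse_candidates`, the `fp_weight` parameter, and the assumption that both engines emit scores in `[0, 1]` are all hypothetical.

```python
from typing import Dict, List, Tuple

Candidate = Tuple[str, float]  # (song_id, score); scores assumed to lie in [0, 1]


def fuse_candidates(fingerprint: List[Candidate], embedding: List[Candidate],
                    fp_weight: float = 0.5, top_k: int = 5) -> List[Candidate]:
    """Hypothetical fusion: weighted sum of per-engine scores, sorted descending."""
    fused: Dict[str, float] = {}
    for song_id, score in fingerprint:
        fused[song_id] = fused.get(song_id, 0.0) + fp_weight * score
    for song_id, score in embedding:
        fused[song_id] = fused.get(song_id, 0.0) + (1.0 - fp_weight) * score
    ranked = sorted(fused.items(), key=lambda item: item[1], reverse=True)
    return ranked[:top_k]


# Made-up candidate lists for illustration only.
fp = [("song_0020", 0.9), ("song_0003", 0.4)]
emb = [("song_0020", 0.8), ("song_0007", 0.6)]
result = fuse_candidates(fp, emb)
print(result)
```

A song returned by both engines accumulates both contributions, so agreement between the fingerprint and embedding paths pushes it to the top.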
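## 4. Current runnable end-to-end loop
1. `synthetic.py` generates the synthetic song library
2. `train.py` trains the ECAPA prototype model
3. `run_demo.py build-index` builds:
   - the fingerprint index
   - the embedding index
4. `run_demo.py recognize` runs recognition on a segment
## 5. Suggested architecture for later productionization
- API Gateway
- Asynchronous audio ingestion pipeline
- Faiss/HNSW vector service
- Postgres/MySQL metadata service
- Object storage for the original audio
- Model serving decoupled from index serving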
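# ACR Project Responsibility Map
> Updated: 2026-06-02
## 1. Project positioning
This project is a **prototype song-recognition / audio content recognition (ACR) system**. The first goal is to get the following working end to end:
- Data generation
- Feature extraction
- Model training
- Fingerprint retrieval
- Embedding retrieval
- Hybrid recognition

It does not currently target production serving; the focus is **validating the algorithm pipeline**.
## 2. Repository layout and responsibilities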
```text
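/workspace
├── acr-engine/               # Core ACR algorithms and runnable demo
│   ├── configs/              # Training / inference configuration
│   ├── src/data/             # Dataset loading, synthetic data generation
│   ├── src/models/           # Acoustic model, loss functions
│   ├── src/engines/          # Fingerprint / embedding / hybrid retrieval engines
│   ├── train.py              # Model training entry point
│   ├── run_demo.py           # Data generation, index building, recognition entry point
│   └── requirements.txt      # Python dependencies
├── docs/                     # Design, architecture, roadmap, usage notes
├── scripts/                  # Environment setup and tool bootstrap
├── container/                # Container environment definition
└── .codex/.omx/              # Codex / OMX collaboration and runtime metadata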
```
## 3. Module responsibility diagram
```mermaid
flowchart TD
    A[Audio input] --> B[Data layer]
    B --> B1[Synthetic data generation synthetic.py]
    B --> B2[Train / validation dataset dataset.py]
    A --> C[Feature layer]
    C --> C1[Mel Spectrogram]
    C --> C2[Chroma / F0]
    C --> C3[Augmentation augment.py]
    C --> D[Model layer]
    D --> D1[ECAPA-TDNN]
    D --> D2[SupCon + AAM Loss]
    A --> E[Retrieval layer]
    E --> E1[ChromaprintMatcher]
    E --> E2[ECAPAEmbedder]
    E --> E3[HybridEngine]
    D --> F[Training entry point train.py]
    E --> G[Inference entry point run_demo.py]
```
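## 4. Roles and responsibilities
| Module | Responsibility | Current status |
|---|---|---|
| `src/data/synthetic.py` | Generates controllable synthetic songs and segments | Implemented |
| `src/data/dataset.py` | Loads training / validation data | Implemented |
| `src/utils/audio.py` | Mel, sliding windows, F0, chroma | Implemented |
| `src/utils/augment.py` | Noise, time-stretch, pitch-shift, reverb augmentation | Implemented |
| `src/models/ecapa_tdnn.py` | Embedding encoder | Implemented |
| `src/models/losses.py` | Contrastive + classification training objectives | Implemented |
| `src/engines/chromaprint_matcher.py` | Traditional hash-fingerprint retrieval | Implemented |
| `src/engines/ecapa_embedder.py` | Embedding extraction and vector retrieval | Implemented |
| `src/engines/hybrid_engine.py` | Fuses the matching results | Implemented |
| `train.py` | Training entry point | Implemented |
| `run_demo.py` | Demo entry point | Added in this change |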
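## 5. Current boundaries
The project currently **covers**:
- Prototype-level algorithm validation
- Recognition against a small song library
- Local training and a local recognition demo

The project **does not yet cover**:
- An online API service
- Online ANN deployment for very large libraries
- Permissions, accounts, or billing
- Governance of real copyrighted audio data
- Production monitoring and alerting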
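# ACR Project Roadmap
> Updated: 2026-06-02
## Phase 0: Prototype running end to end (current phase)
### Goal
Deliver a local demo that runs end to end.
### Scope
- [x] Synthetic data generation
- [x] Data augmentation
- [x] ECAPA embedding model
- [x] Traditional fingerprint matcher
- [x] HybridEngine
- [x] Minimal training entry point
- [x] Minimal recognition entry point
- [x] Documentation completion
### Acceptance criteria
- Data can be generated
- Training runs for at least 1 epoch
- A reference index can be built
- Top-K candidates are returned for a test segment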
---
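## Phase 1: Research validation
### Goal
Verify whether recognition quality is acceptable across different scenarios.
### Tasks
- [ ] Add a top-1 / top-5 / MRR evaluation script
- [ ] Evaluate clean / noisy / stretched / pitch-shifted queries separately
- [ ] Add a dedicated query-by-humming evaluation set
- [ ] Add more robust negative sampling
- [ ] Add checkpoint / config versioning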
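
As a starting point for the evaluation task above, the three metrics can be computed from ranked candidate lists as in the sketch below. The function name `retrieval_metrics` and the input layout (one ranked list of song IDs per query, plus the true song ID) are assumptions for illustration; no such script exists in the repository yet.

```python
from typing import Dict, List, Sequence


def retrieval_metrics(ranked: Sequence[List[str]], truth: Sequence[str],
                      ks: Sequence[int] = (1, 5)) -> Dict[str, float]:
    """Hypothetical helper: top-k accuracy and mean reciprocal rank (MRR)."""
    hits = {k: 0 for k in ks}
    reciprocal_sum = 0.0
    for candidates, target in zip(ranked, truth):
        if target not in candidates:
            continue  # a miss contributes 0 to every metric
        rank = candidates.index(target) + 1  # 1-based rank of the correct song
        reciprocal_sum += 1.0 / rank
        for k in ks:
            if rank <= k:
                hits[k] += 1
    n = max(len(truth), 1)
    metrics = {f"top{k}": hits[k] / n for k in ks}
    metrics["mrr"] = reciprocal_sum / n
    return metrics


# Three made-up queries whose correct answer is always "a".
ranked = [["a", "b"], ["c", "a", "b"], ["b", "c"]]
truth = ["a", "a", "a"]
metrics = retrieval_metrics(ranked, truth)
print(metrics)
```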
---
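## Phase 2: Engineering
### Goal
Upgrade the prototype into a reproducible experiment project.
### Tasks
- [ ] Add a `Makefile` or `justfile`
- [ ] Add basic `pytest` tests
- [ ] Add logging and metrics recording
- [ ] Define model export and loading conventions
- [ ] Add CLI argument validation
- [ ] Add a Docker-based way to run the project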
---
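## Phase 3: Product PoC
### Goal
Provide a service interface that business teams can call.
### Tasks
- [ ] FastAPI service
- [ ] Upload audio and return candidate songs
- [ ] Command for incremental ingestion into the song library
- [ ] Metadata management API
- [ ] Result caching and batch retrieval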
---
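## Phase 4: Large-scale retrieval
### Goal
Support libraries of a million tracks or more.
### Tasks
- [ ] Integrate Faiss / HNSW
- [ ] Embedding sharding and compression
- [ ] Two-stage recall + fine re-ranking
- [ ] Online index updates
- [ ] Hot / cold tiered storage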
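
For context on what an ANN index would replace, the sketch below shows exact brute-force cosine search with NumPy. Its cost grows linearly with library size, which is the motivation for Faiss / HNSW at this phase. The function name `cosine_top_k`, the random reference matrix, and the 192-dimensional vectors (matching the default `embed_dim`) are illustrative assumptions rather than the repository's retrieval code.

```python
import numpy as np


def cosine_top_k(query: np.ndarray, reference: np.ndarray, k: int = 5):
    """Hypothetical baseline: exact cosine search over every reference embedding."""
    q = query / np.linalg.norm(query)
    r = reference / np.linalg.norm(reference, axis=1, keepdims=True)
    scores = r @ q                      # one dot product per reference row, O(N * D)
    order = np.argsort(-scores)[:k]     # indices of the k highest scores
    return order, scores[order]


rng = np.random.default_rng(0)
reference = rng.normal(size=(100, 192))                    # stand-in for a reference library
query = reference[42] + 0.01 * rng.normal(size=192)        # slightly perturbed copy of row 42
indices, scores = cosine_top_k(query, reference, k=3)
print(indices, scores)
```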
---
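## Phase 5: Real-world business capability
### Goal
Approach the capabilities of a real song-recognition product.
### Tasks
- [ ] Ingest real copyrighted audio data
- [ ] Dedicated humming model / melody tower
- [ ] Multimodal fusion (melody + voiceprint + fingerprint)
- [ ] Online A/B evaluation
- [ ] Monitoring and quality feedback loop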
# ACR Project Runbook
## 1. Environment
```bash
cd acr-engine
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
## 2. Generate data
```bash
python run_demo.py generate-data --output data/synthetic --num-songs 24
```
## 3. Verify the training pipeline
```bash
python train.py --data data/synthetic --dry-run --device cpu
```
## 4. Minimal training run
```bash
python train.py --data data/synthetic --output data/models --device cpu --epochs 1 --batch-size 8
```
## 5. Build the index
```bash
python run_demo.py build-index --data data/synthetic --model data/models/best_model.pt --output data/index --device cpu
```
## 6. Run recognition
```bash
python run_demo.py recognize \
--query data/synthetic/segments/song_0020_seg_00.wav \
--data data/synthetic \
--model data/models/best_model.pt \
--index-prefix data/index/reference \
--device cpu
```
## 7. Success criteria
At minimum:
- The command outputs a JSON result
- The result contains `candidates`
- The result includes `song_id` and `confidence`
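
These criteria can be checked automatically with a small script. The sketch below assumes that `candidates` is a non-empty list of objects, each carrying `song_id` and `confidence`; the exact JSON schema emitted by `run_demo.py recognize` is not documented here, so the function name `check_recognition_output` and the sample payload are hypothetical.

```python
import json


def check_recognition_output(raw: str) -> bool:
    """Hypothetical check: True if the JSON text meets the three success criteria."""
    try:
        result = json.loads(raw)
    except json.JSONDecodeError:
        return False
    if not isinstance(result, dict):
        return False
    candidates = result.get("candidates")
    if not isinstance(candidates, list) or not candidates:
        return False
    return all(
        isinstance(c, dict) and "song_id" in c and "confidence" in c
        for c in candidates
    )


# Illustrative payload only; real output may carry additional fields.
sample = '{"candidates": [{"song_id": "song_0020", "confidence": 0.93}]}'
print(check_recognition_output(sample))
```

One way to use it would be to capture the standard output of the recognition command and pass that text to the function.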