Commit 62688d3b 62688d3bccd07a3a03a98d6ed698f1980e1e298d by cnb.bofCdSsphPA

period upload

1 parent 44d8268c
......@@ -7,3 +7,4 @@
.codex/skills/.system/**
!.codex/prompts/
!.codex/prompts/**
.venv
......
......@@ -65,3 +65,16 @@ python run_demo.py full-demo --device cpu
## 当前定位
这是一个**原型仓库**,目标是验证 ACR 主链路能否跑通,不是生产级服务。
## 评测
```bash
python evaluate.py --data data/synthetic --model data/models/best_model.pt --index-prefix data/index/reference --split test --device cpu
```
## 当前提升方向
- 更强合成混淆样本(confused / humming_like)
- Hybrid 分数归一化后再融合
- full-demo 自动训练
- 后续可接入开源数据集
......
#!/usr/bin/env python3
import argparse
import json
from pathlib import Path
import numpy as np
from src.engines.chromaprint_matcher import ChromaprintMatcher
from src.engines.ecapa_embedder import ECAPAEmbedder
from src.engines.hybrid_engine import HybridEngine
def load_items(meta_path: Path):
    """Load a split metadata file and return its parsed JSON content.

    Args:
        meta_path: Path to a JSON metadata file (e.g. ``test.json``).

    Returns:
        Whatever ``json.load`` produces for the file; callers in this script
        iterate over it as a list of item dicts.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale default encoding.
    with open(meta_path, encoding="utf-8") as f:
        return json.load(f)
def main():
    """Evaluate top-1 / top-k recognition accuracy over the segment queries of a split.

    Loads the chromaprint index, the ECAPA model and the reference embedding
    index, runs every ``segments/`` item of the chosen split through the
    hybrid engine and prints a JSON report (overall rates, per-type rates and
    up to ten failed queries) to stdout.

    Raises:
        SystemExit: If the split contains no ``segments/`` queries.
    """
    cli = argparse.ArgumentParser(description="Evaluate ACR recognition quality")
    cli.add_argument("--data", default="data/synthetic")
    cli.add_argument("--model", required=True)
    cli.add_argument("--index-prefix", default="data/index/reference")
    cli.add_argument("--split", default="test")
    cli.add_argument("--top-k", type=int, default=5)
    cli.add_argument("--device", default="cpu")
    args = cli.parse_args()

    data_dir = Path(args.data)
    index_prefix = args.index_prefix

    # The chromaprint cache sits next to the embedding index files.
    matcher = ChromaprintMatcher()
    matcher.load(str(Path(index_prefix).parent / "chromaprint.pkl"))

    embedder = ECAPAEmbedder(model_path=args.model, device=args.device)
    ref_embs = np.load(f"{index_prefix}_embs.npy")
    ref_ids = np.load(f"{index_prefix}_ids.npy", allow_pickle=True).tolist()
    engine = HybridEngine(matcher, embedder, ref_embs, ref_ids)

    # Feed every available split's metadata to the engine.
    for meta_name in ("train.json", "val.json", "test.json"):
        meta_file = data_dir / meta_name
        if meta_file.exists():
            engine.load_metadata(str(meta_file))

    # Only segment clips are queries; other entries are filtered out.
    queries = [
        entry
        for entry in load_items(data_dir / f"{args.split}.json")
        if str(entry.get("audio_path", "")).startswith("segments/")
    ]
    if not queries:
        raise SystemExit("No segment queries found for evaluation")

    def rate(hits, count):
        # Fraction rounded to four decimals.
        return round(hits / count, 4)

    hits_top1 = 0
    hits_topk = 0
    per_type = {}
    misses = []
    for entry in queries:
        outcome = engine.recognize(str(data_dir / entry["audio_path"]), top_n=args.top_k)
        ranked = [cand["song_id"] for cand in outcome["candidates"]]
        expected = entry["song_id"]
        kind = entry.get("type", "unknown")

        if kind not in per_type:
            per_type[kind] = {"n": 0, "top1": 0, "topk": 0}
        bucket = per_type[kind]
        bucket["n"] += 1

        if ranked and ranked[0] == expected:
            hits_top1 += 1
            bucket["top1"] += 1

        if expected not in ranked:
            misses.append(
                {
                    "truth": expected,
                    "query": entry["audio_path"],
                    "type": kind,
                    "preds": ranked,
                }
            )
            continue
        hits_topk += 1
        bucket["topk"] += 1

    total = len(queries)
    by_type_report = {}
    for kind, bucket in per_type.items():
        count = bucket["n"]
        by_type_report[kind] = {
            "n": count,
            "top1": rate(bucket["top1"], count) if count else 0.0,
            "topk": rate(bucket["topk"], count) if count else 0.0,
        }

    report = {
        "split": args.split,
        "num_queries": total,
        "top1": rate(hits_top1, total),
        "topk": rate(hits_topk, total),
        "by_type": by_type_report,
        "sample_failures": misses[:10],
    }
    print(json.dumps(report, ensure_ascii=False, indent=2))
# Run the evaluation only when this file is executed as a script.
if __name__ == "__main__":
    main()
......@@ -31,7 +31,7 @@ def build_chroma_index(data_dir: Path, output_dir: Path):
matcher = ChromaprintMatcher()
matcher.index_songs_from_dir(
songs_dir=str(data_dir / 'songs'),
metadata_path=str(data_dir / 'train.json'),
metadata_path=str(data_dir / 'catalog.json' if (data_dir / 'catalog.json').exists() else data_dir / 'train.json'),
cache_path=str(output_dir / 'chromaprint.pkl'),
)
print(f"[done] chromaprint index built: hashes={matcher.num_hashes}, postings={matcher.index_size}")
......@@ -42,7 +42,7 @@ def build_embedding_index(data_dir: Path, model_path: Path, output_prefix: Path,
embedder = ECAPAEmbedder(model_path=str(model_path), device=device)
ref_embs, ref_ids = embedder.build_reference_index(
songs_dir=str(data_dir / 'songs'),
metadata_path=str(data_dir / 'train.json'),
metadata_path=str(data_dir / 'catalog.json' if (data_dir / 'catalog.json').exists() else data_dir / 'train.json'),
output_path=str(output_prefix),
)
print(f"[done] embedding index built: {len(ref_ids)} refs")
......@@ -104,16 +104,20 @@ def cmd_full_demo(args):
model_path = model_dir / 'best_model.pt'
if not model_path.exists():
raise SystemExit(
'full-demo requires a trained model at data/models/best_model.pt. '\
'Run train.py first or provide one.'
)
import subprocess
model_dir.mkdir(parents=True, exist_ok=True)
cmd = [
'/usr/local/miniconda3/bin/python', 'train.py',
'--data', str(data_dir), '--output', str(model_dir),
'--device', args.device, '--epochs', '3', '--batch-size', '8'
]
print('[full-demo] training model:', ' '.join(cmd))
subprocess.run(cmd, check=True)
index_dir.mkdir(parents=True, exist_ok=True)
matcher = build_chroma_index(data_dir, index_dir)
embedder, ref_embs, ref_ids = build_embedding_index(data_dir, model_path, index_dir / 'reference', args.device)
query = sorted((data_dir / 'test.json').read_text() and [] )
with open(data_dir / 'test.json') as f:
test_meta = json.load(f)
query_item = next((x for x in test_meta if 'segments/' in x['audio_path']), test_meta[0])
......
#!/usr/bin/env python3
"""Helpers for optional open music dataset integration."""
import argparse
import json
from pathlib import Path
# Known optional open datasets, keyed by the name accepted on the command
# line. Each entry carries the upstream project URL and a short integration
# note; both are written verbatim into the generated JSON file.
DATASETS = {
    "fma_small": {
        "url": "https://github.com/mdeff/fma",
        "notes": "Use FMA small subset first; convert clips into catalog/query JSON for local experiments.",
    },
    "mtg_jamendo": {
        "url": "https://github.com/MTG/mtg-jamendo-dataset",
        "notes": "Use upstream download scripts; sample a small subset into catalog/query structure.",
    },
}
def main(argv=None):
    """Write the integration note for one known dataset to a JSON file.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``, which is the previous
            behavior; passing a list allows calling this from other code.

    Side effects:
        Creates the parent directory of ``--output`` if needed, overwrites the
        output file with a one-entry JSON object, and prints the output path.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("dataset", choices=sorted(DATASETS))
    # NOTE(review): the default is relative to the current working directory,
    # not to this file - confirm that is the intended location.
    parser.add_argument("--output", default="../docs/open-datasets.json")
    args = parser.parse_args(argv)

    out = Path(args.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8 keeps the written file independent of the locale default.
    with open(out, "w", encoding="utf-8") as f:
        json.dump({args.dataset: DATASETS[args.dataset]}, f, indent=2)
    print(f"Wrote dataset integration note to {out}")
# Write the dataset note only when this file is executed as a script.
if __name__ == "__main__":
    main()
import torch
from torch.utils.data import Dataset
import numpy as np
import librosa
import json
import random
from pathlib import Path
from typing import Dict, List, Tuple
import json
import os
from typing import Dict, List, Optional
import librosa
import numpy as np
import torch
from torch.utils.data import Dataset
class ACRDataset(Dataset):
......@@ -21,6 +21,8 @@ class ACRDataset(Dataset):
segment_dur: float = 5.0,
augment: bool = True,
n_crops_per_song: int = 4,
song_to_idx: Optional[Dict[str, int]] = None,
references_only: bool = False,
):
self.sr = sr
self.n_mels = n_mels
......@@ -31,36 +33,39 @@ class ACRDataset(Dataset):
self.n_crops = n_crops_per_song
self.data_dir = Path(data_dir)
meta_path = Path(data_dir) / f"{split}.json"
meta_path = self.data_dir / f"{split}.json"
with open(meta_path) as f:
self.metadata = json.load(f)
self.samples = []
for item in self.metadata:
song_path = Path(data_dir) / item["audio_path"]
if references_only and item.get("type") != "reference":
continue
song_path = self.data_dir / item["audio_path"]
if song_path.exists():
self.samples.append(item)
self.song_ids = sorted(set(s["song_id"] for s in self.samples))
self.song_to_idx = {sid: i for i, sid in enumerate(self.song_ids)}
self.song_to_idx = song_to_idx or {sid: i for i, sid in enumerate(self.song_ids)}
def __len__(self):
return len(self.samples) * self.n_crops
def _load_segment(self, path: str, offset: float, duration: float) -> np.ndarray:
y, _ = librosa.load(
path, sr=self.sr, mono=True,
offset=offset, duration=duration
)
y, _ = librosa.load(path, sr=self.sr, mono=True, offset=offset, duration=duration)
if len(y) < self.segment_len:
y = np.pad(y, (0, self.segment_len - len(y)))
else:
y = y[:self.segment_len]
y = y[: self.segment_len]
return y
def _to_mel(self, y: np.ndarray) -> np.ndarray:
mel = librosa.feature.melspectrogram(
y=y, sr=self.sr, n_mels=self.n_mels,
n_fft=self.n_fft, hop_length=self.hop_length
y=y,
sr=self.sr,
n_mels=self.n_mels,
n_fft=self.n_fft,
hop_length=self.hop_length,
)
return librosa.power_to_db(mel, ref=np.max)
......@@ -73,7 +78,7 @@ class ACRDataset(Dataset):
audio_path = self.data_dir / sample["audio_path"]
y = self._load_segment(str(audio_path), offset, 5.0)
if self.augment:
if self.augment and sample.get("type") != "reference":
from src.utils.augment import AugmentPipeline
aug = AugmentPipeline(self.sr)
y = aug(y)
......@@ -88,6 +93,7 @@ class ACRDataset(Dataset):
"mel": mel_tensor,
"song_id": torch.tensor(class_id, dtype=torch.long),
"song_name": song_id,
"type": sample.get("type", "unknown"),
}
......@@ -100,6 +106,7 @@ class ACRTestDataset(Dataset):
n_mels: int = 80,
n_fft: int = 512,
hop_length: int = 160,
song_to_idx: Optional[Dict[str, int]] = None,
):
self.sr = sr
self.n_mels = n_mels
......@@ -107,18 +114,18 @@ class ACRTestDataset(Dataset):
self.hop_length = hop_length
self.data_dir = Path(data_dir)
meta_path = Path(data_dir) / f"{split}.json"
meta_path = self.data_dir / f"{split}.json"
with open(meta_path) as f:
self.metadata = json.load(f)
self.samples = []
for item in self.metadata:
p = Path(data_dir) / item["audio_path"]
p = self.data_dir / item["audio_path"]
if p.exists():
self.samples.append(item)
self.song_ids = sorted(set(s["song_id"] for s in self.samples))
self.song_to_idx = {sid: i for i, sid in enumerate(self.song_ids)}
self.song_to_idx = song_to_idx or {sid: i for i, sid in enumerate(self.song_ids)}
def __len__(self):
return len(self.samples)
......@@ -126,10 +133,7 @@ class ACRTestDataset(Dataset):
def __getitem__(self, idx):
sample = self.samples[idx]
audio_path = self.data_dir / sample["audio_path"]
y, _ = librosa.load(
str(audio_path), sr=self.sr, mono=True,
offset=0, duration=min(sample["duration"], 5.0)
)
y, _ = librosa.load(str(audio_path), sr=self.sr, mono=True, offset=0, duration=min(sample["duration"], 5.0))
seg_len = 5 * self.sr
if len(y) < seg_len:
y = np.pad(y, (0, seg_len - len(y)))
......@@ -137,13 +141,100 @@ class ACRTestDataset(Dataset):
y = y[:seg_len]
mel = librosa.power_to_db(
librosa.feature.melspectrogram(y=y, sr=self.sr, n_mels=self.n_mels,
n_fft=self.n_fft, hop_length=self.hop_length),
ref=np.max
librosa.feature.melspectrogram(
y=y,
sr=self.sr,
n_mels=self.n_mels,
n_fft=self.n_fft,
hop_length=self.hop_length,
),
ref=np.max,
)
class_id = self.song_to_idx[sample["song_id"]]
return {
"mel": torch.FloatTensor(mel),
"song_id": torch.tensor(class_id, dtype=torch.long),
"song_name": sample["song_id"],
"type": sample.get("type", "unknown"),
}
class SongPairDataset(Dataset):
    """Dataset that yields two clips of the same song per item.

    Items of ``<data_dir>/<split>.json`` are grouped by ``song_id``; entries
    whose ``type`` is ``"reference"`` and entries whose audio file does not
    exist are skipped. One dataset index corresponds to one song.

    Args:
        data_dir: Directory holding the split JSON file and the audio files
            referenced by each item's ``audio_path``.
        split: Name of the split file (without ``.json``).
        sr: Sample rate audio is loaded at.
        n_mels: Number of mel bands.
        n_fft: FFT size for the mel spectrogram.
        hop_length: Hop length for the mel spectrogram.
        segment_dur: Clip length in seconds; clips are padded or cut to it.
        augment: Whether each clip is passed through ``AugmentPipeline``.
    """

    def __init__(
        self,
        data_dir: str,
        split: str = "train",
        sr: int = 16000,
        n_mels: int = 80,
        n_fft: int = 512,
        hop_length: int = 160,
        segment_dur: float = 5.0,
        augment: bool = True,
    ):
        self.sr = sr
        self.n_mels = n_mels
        self.n_fft = n_fft
        self.hop_length = hop_length
        # Kept so clip loading honors the configured duration instead of a
        # hard-coded 5 seconds.
        self.segment_dur = segment_dur
        self.segment_len = int(segment_dur * sr)
        self.augment = augment
        self.data_dir = Path(data_dir)

        with open(self.data_dir / f"{split}.json", encoding="utf-8") as f:
            metadata = json.load(f)

        self.by_song: Dict[str, List[Dict]] = {}
        for item in metadata:
            # Reference entries are not used as pair members.
            if item.get("type") == "reference":
                continue
            p = self.data_dir / item["audio_path"]
            if p.exists():
                self.by_song.setdefault(item["song_id"], []).append(item)

        self.song_ids = sorted(self.by_song)
        self.song_to_idx = {sid: i for i, sid in enumerate(self.song_ids)}

    def __len__(self):
        """Return the number of songs that have at least one usable clip."""
        return len(self.song_ids)

    def _load_clip(self, sample: Dict) -> np.ndarray:
        """Load one clip and zero-pad or cut it to exactly ``segment_len`` samples."""
        path = self.data_dir / sample["audio_path"]
        y, _ = librosa.load(str(path), sr=self.sr, mono=True, duration=self.segment_dur)
        if len(y) < self.segment_len:
            y = np.pad(y, (0, self.segment_len - len(y)))
        else:
            y = y[: self.segment_len]
        return y

    def _to_mel(self, y: np.ndarray) -> torch.Tensor:
        """Convert a waveform into a dB-scaled mel spectrogram tensor."""
        mel = librosa.feature.melspectrogram(
            y=y,
            sr=self.sr,
            n_mels=self.n_mels,
            n_fft=self.n_fft,
            hop_length=self.hop_length,
        )
        mel = librosa.power_to_db(mel, ref=np.max)
        return torch.FloatTensor(mel)

    def __getitem__(self, idx):
        """Return a pair of mel spectrograms drawn from the song at ``idx``.

        Returns:
            Dict with ``"mel"`` (the two spectrograms stacked on a new leading
            axis), ``"song_id"`` (the class index repeated twice) and
            ``"song_name"`` (the song id string).
        """
        # Imported here so the method does not rely on the module-level
        # import block providing ``random``.
        import random

        song_id = self.song_ids[idx]
        choices = self.by_song[song_id]
        if len(choices) == 1:
            # Only one clip available: pair it with itself.
            a = b = choices[0]
        else:
            a, b = random.sample(choices, 2)

        if self.augment:
            from src.utils.augment import AugmentPipeline

        mels = []
        for sample in (a, b):
            y = self._load_clip(sample)
            if self.augment:
                y = AugmentPipeline(self.sr)(y)
            mels.append(self._to_mel(y))

        label = self.song_to_idx[song_id]
        return {
            "mel": torch.stack(mels, dim=0),
            "song_id": torch.tensor([label, label], dtype=torch.long),
            "song_name": song_id,
        }
......
......@@ -5,6 +5,7 @@ Generates melodies from fundamental frequencies, simulates:
- Different "songs" (unique note sequences at different base frequencies)
- Song fragments (random crops from songs)
- Humming variants (pitch shifted, time stretched versions)
- Hard negatives / confusing variants for robustness testing
This allows the full pipeline to be validated without external data.
"""
......@@ -13,9 +14,8 @@ import numpy as np
import soundfile as sf
import json
import random
import os
from pathlib import Path
from typing import List, Tuple
from typing import Tuple
from tqdm import tqdm
......@@ -33,7 +33,10 @@ def harmonic_tone(freq: float, duration: float, sr: int = _SR, n_harmonics: int
for h in range(1, n_harmonics + 1):
amp = 0.5 / h
y += amp * np.sin(2 * np.pi * freq * h * t)
return y / np.max(np.abs(y)) * 0.5
peak = np.max(np.abs(y))
if peak > 0:
y = y / peak * 0.5
return y
def generate_melody(
......@@ -44,9 +47,8 @@ def generate_melody(
timbre: str = "harmonic",
) -> np.ndarray:
notes = []
freq = base_freq
for i in range(note_count):
interval = random.choice([0, 2, 4, 5, 7, 9, 11, 12]) # diatonic intervals
for _ in range(note_count):
interval = random.choice([0, 2, 4, 5, 7, 9, 11, 12])
freq = base_freq * (2 ** (interval / 12))
dur = note_dur * random.uniform(0.8, 1.2)
......@@ -57,7 +59,7 @@ def generate_melody(
if random.random() < 0.15:
fade = np.linspace(0, 1, min(int(sr * 0.02), len(note)))
note[:len(fade)] *= fade
note[: len(fade)] *= fade
notes.append(note)
......@@ -65,15 +67,35 @@ def generate_melody(
_CHORD_PROGRESSIONS = [
[0, 3, 7], # Cm
[0, 4, 7], # C
[0, 3, 7, 10], # Cm7
[0, 4, 7, 11], # Cmaj7
[0, 4, 9], # Csus4 → C
[0, 5, 7], # Csus2
[0, 3, 7],
[0, 4, 7],
[0, 3, 7, 10],
[0, 4, 7, 11],
[0, 4, 9],
[0, 5, 7],
]
def apply_confusion_mix(
    y: np.ndarray,
    sr: int,
    strength: float = 0.22,
    distractor_freqs: Tuple[float, ...] = (220.0, 330.0, 440.0),
) -> np.ndarray:
    """Mix a sum of sine tones into ``y`` and peak-normalize the result to 0.5.

    Each tone gets a random phase drawn with ``random.uniform(0, pi)``, so the
    output depends on the state of the global ``random`` module.

    Args:
        y: Input waveform (1-D).
        sr: Sample rate of ``y`` in Hz.
        strength: Gain applied to the unit-peak distractor before mixing.
        distractor_freqs: Frequencies (Hz) of the sine tones to add. The
            default reproduces the original fixed 220/330/440 Hz set.

    Returns:
        The mixed waveform scaled to a peak of 0.5, or the unscaled mix when
        it is all zeros. An empty input yields an empty array.
    """
    if len(y) == 0:
        # np.max() raises on zero-size arrays; nothing to mix anyway.
        return np.asarray(y, dtype=float).copy()

    t = np.linspace(0, len(y) / sr, len(y), endpoint=False)
    distractor = np.zeros_like(t)
    for f in distractor_freqs:
        distractor += np.sin(2 * np.pi * f * t + random.uniform(0, np.pi))
    # Guard the division in case the tones cancel out (or none were given).
    distractor /= max(np.max(np.abs(distractor)), 1e-8)

    mixed = y + strength * distractor
    peak = np.max(np.abs(mixed))
    return mixed / peak * 0.5 if peak > 0 else mixed
def apply_humming_style(y: np.ndarray, sr: int) -> np.ndarray:
    """Apply a rising gain ramp and moving-average smoothing, then normalize.

    The waveform is multiplied by a linear envelope from 0.7 to 1.0, smoothed
    with a box filter of ``max(5, sr // 400)`` samples and scaled to a peak of
    0.5.

    Args:
        y: Input waveform (1-D).
        sr: Sample rate of ``y`` in Hz; determines the smoothing width.

    Returns:
        A waveform with the same number of samples as ``y``. An all-zero
        result is returned unscaled; an empty input yields an empty array.
    """
    if len(y) == 0:
        # np.convolve() rejects empty inputs.
        return np.asarray(y, dtype=float).copy()

    env = np.linspace(0.7, 1.0, len(y))
    # mode="same" returns max(len(signal), len(kernel)) samples, so the kernel
    # is clamped to the signal length to keep the output length equal to
    # len(y) for very short inputs.
    width = min(max(5, sr // 400), len(y))
    kernel = np.ones(width) / width
    hum = np.convolve(y * env, kernel, mode="same")

    peak = np.max(np.abs(hum))
    return hum / peak * 0.5 if peak > 0 else hum
def generate_song(
song_id: str,
base_freq: float,
......@@ -98,7 +120,7 @@ def generate_song(
env = np.exp(-np.linspace(0, 3, seg_len))
note = harmonic_tone(freq, seg_len / sr, sr) * env * 0.3
min_len = min(seg_len, len(note))
y[start_sample:start_sample + min_len] += note[:min_len]
y[start_sample : start_sample + min_len] += note[:min_len]
if with_vocals:
melody = generate_melody(base_freq * 2, note_count=int(duration * 2), note_dur=0.5, sr=sr)
......@@ -130,9 +152,11 @@ def generate_dataset(
songs_dir.mkdir(parents=True, exist_ok=True)
segs_dir.mkdir(parents=True, exist_ok=True)
base_freqs = [130.81, 146.83, 164.81, 174.61, 196.0, 220.0, 246.94,
261.63, 293.66, 329.63, 349.23, 392.0, 440.0, 493.88,
523.25, 587.33, 659.25, 698.46, 783.99, 880.0, 987.77]
base_freqs = [
130.81, 146.83, 164.81, 174.61, 196.0, 220.0, 246.94,
261.63, 293.66, 329.63, 349.23, 392.0, 440.0, 493.88,
523.25, 587.33, 659.25, 698.46, 783.99, 880.0, 987.77,
]
train_meta = []
val_meta = []
......@@ -143,7 +167,7 @@ def generate_dataset(
song_id = f"song_{i:04d}"
base_freq = base_freqs[i % len(base_freqs)]
key_offset = (i // len(base_freqs)) * 2
base_freq *= (2 ** (key_offset / 12))
base_freq *= 2 ** (key_offset / 12)
y, dur = generate_song(song_id, base_freq, duration=song_duration, sr=sr)
song_path = songs_dir / f"{song_id}.wav"
......@@ -155,42 +179,41 @@ def generate_dataset(
start_s = int(offset * sr)
end_s = start_s + int(segment_duration * sr)
seg = y[start_s:end_s]
target_len = int(segment_duration * sr)
if len(seg) < int(segment_duration * sr):
seg = np.pad(seg, (0, int(segment_duration * sr) - len(seg)))
is_augmented = (j >= num_segments_per_song // 2)
if len(seg) < target_len:
seg = np.pad(seg, (0, target_len - len(seg)))
if is_augmented:
variant_type = "clean"
out_seg = seg.copy()
if j >= num_segments_per_song // 2:
from src.utils.augment import AugmentPipeline
aug = AugmentPipeline(sr)
seg_aug = aug(seg.copy())
seg_name = f"{song_id}_seg_{j:02d}_aug.wav"
seg_path = segs_dir / seg_name
sf.write(str(seg_path), seg_aug, sr)
meta_entry = {
"song_id": song_id,
"audio_path": f"segments/{seg_name}",
"duration": segment_duration,
"type": "augmented",
"offset": offset,
}
else:
seg_name = f"{song_id}_seg_{j:02d}.wav"
seg_path = segs_dir / seg_name
sf.write(str(seg_path), seg, sr)
meta_entry = {
"song_id": song_id,
"audio_path": f"segments/{seg_name}",
"duration": segment_duration,
"type": "clean",
"offset": offset,
}
offset_sec = offset
if offset_sec < dur * 0.2:
out_seg = aug(out_seg)
variant_type = "augmented"
if j == num_segments_per_song - 1:
out_seg = apply_confusion_mix(out_seg, sr)
variant_type = "confused"
elif j == num_segments_per_song - 2 and num_segments_per_song >= 4:
out_seg = apply_humming_style(out_seg, sr)
variant_type = "humming_like"
seg_name = f"{song_id}_seg_{j:02d}_{variant_type}.wav" if variant_type != "clean" else f"{song_id}_seg_{j:02d}.wav"
seg_path = segs_dir / seg_name
sf.write(str(seg_path), out_seg, sr)
meta_entry = {
"song_id": song_id,
"audio_path": f"segments/{seg_name}",
"duration": segment_duration,
"type": variant_type,
"offset": offset,
}
if offset < dur * 0.2:
seg_type = "intro"
elif offset_sec > dur * 0.7:
elif offset > dur * 0.7:
seg_type = "outro"
else:
seg_type = "mid"
......@@ -208,6 +231,7 @@ def generate_dataset(
"audio_path": f"songs/{song_id}.wav",
"duration": dur,
"base_freq": base_freq,
"type": "reference",
}
if i < int(num_songs * 0.7):
train_meta.append(song_meta)
......@@ -216,6 +240,10 @@ def generate_dataset(
else:
test_meta.append(song_meta)
catalog_meta = [item for item in train_meta + val_meta + test_meta if item.get("type") == "reference"]
with open(output_dir / "catalog.json", "w") as f:
json.dump(catalog_meta, f, indent=2)
for name, data in [("train", train_meta), ("val", val_meta), ("test", test_meta)]:
with open(output_dir / f"{name}.json", "w") as f:
json.dump(data, f, indent=2)
......@@ -229,6 +257,7 @@ def generate_dataset(
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--output", type=str, default="data/synthetic")
parser.add_argument("--num-songs", type=int, default=50)
......
import torch
import torch.nn.functional as F
import numpy as np
import librosa
import json
from pathlib import Path
from typing import List, Optional, Tuple
import json
import librosa
import numpy as np
import torch
class ECAPAEmbedder:
......@@ -24,11 +24,22 @@ class ECAPAEmbedder:
self.hop_length = hop_length
from src.models.ecapa_tdnn import ECAPA_ACR
self.model = ECAPA_ACR(n_mels=n_mels, embed_dim=192)
state = torch.load(model_path, map_location="cpu", weights_only=True)
if "model_state_dict" in state:
state = state["model_state_dict"]
self.model.load_state_dict(state, strict=False)
cfg = state.get("config", {})
model_cfg = cfg.get("model", {})
self.model = ECAPA_ACR(
n_mels=model_cfg.get("n_mels", n_mels),
embed_dim=model_cfg.get("embed_dim", 192),
channels=model_cfg.get("channels", 512),
se_channels=model_cfg.get("se_channels", 128),
res2net_scale=model_cfg.get("res2net_scale", 8),
num_blocks=model_cfg.get("num_blocks", 3),
num_classes=None,
)
missing = self.model.load_state_dict(state["model_state_dict"], strict=False)
if missing.unexpected_keys:
print(f"[warn] unexpected keys while loading model: {missing.unexpected_keys}")
self.model.to(self.device)
self.model.eval()
......@@ -38,26 +49,37 @@ class ECAPAEmbedder:
def _to_mel(self, y: np.ndarray) -> torch.Tensor:
mel = librosa.feature.melspectrogram(
y=y, sr=self.sr, n_mels=self.n_mels,
n_fft=self.n_fft, hop_length=self.hop_length
y=y,
sr=self.sr,
n_mels=self.n_mels,
n_fft=self.n_fft,
hop_length=self.hop_length,
)
mel = librosa.power_to_db(mel, ref=np.max)
return torch.FloatTensor(mel).unsqueeze(0)
def _windows(self, y: np.ndarray, window_sec: float = 5.0, stride_sec: float = 2.5) -> List[np.ndarray]:
    """Cut ``y`` into fixed-length, possibly overlapping windows.

    Args:
        y: Waveform sampled at ``self.sr``.
        window_sec: Window length in seconds.
        stride_sec: Step between window starts in seconds.

    Returns:
        A list of slices of ``window_sec * self.sr`` samples each. A waveform
        shorter than one window is zero-padded at the end to exactly one
        window. Trailing samples that do not fill a whole window are dropped.
    """
    frame = int(window_sec * self.sr)
    hop = int(stride_sec * self.sr)

    shortfall = frame - len(y)
    if shortfall > 0:
        y = np.pad(y, (0, shortfall))

    # Always allow at least the start offset 0.
    stop = max(len(y) - frame + 1, 1)
    chunks = [y[begin : begin + frame] for begin in range(0, stop, hop)]
    if not chunks:
        # Fallback when the range above produced no start offsets.
        chunks = [y[:frame]]
    return chunks
def extract_embedding(self, audio_path: str) -> np.ndarray:
y = self._load_audio(audio_path)
mel = self._to_mel(y).to(self.device)
with torch.no_grad():
emb, _ = self.model(mel)
return emb.cpu().numpy().flatten()
return self.extract_embedding_from_wave(y)
def extract_embedding_from_wave(self, y: np.ndarray) -> np.ndarray:
if len(y) < self.sr:
y = np.pad(y, (0, self.sr - len(y)))
mel = self._to_mel(y[:self.sr * 5]).to(self.device)
with torch.no_grad():
emb, _ = self.model(mel)
return emb.cpu().numpy().flatten()
window_embs = []
for seg in self._windows(y):
mel = self._to_mel(seg).to(self.device)
with torch.no_grad():
emb, _ = self.model(mel)
window_embs.append(emb.cpu().numpy().flatten())
return np.mean(window_embs, axis=0)
def build_reference_index(
self,
......@@ -75,7 +97,7 @@ class ECAPAEmbedder:
songs_dir = Path(songs_dir)
for item in meta:
if "songs/" not in item.get("audio_path", ""):
if item.get("type") != "reference" and "songs/" not in item.get("audio_path", ""):
continue
audio_path = songs_dir.parent / item["audio_path"]
if not audio_path.exists():
......@@ -83,35 +105,20 @@ class ECAPAEmbedder:
song_id = item["song_id"]
y, _ = librosa.load(str(audio_path), sr=self.sr, mono=True)
win_len = int(window_sec * self.sr)
stride = int(stride_sec * self.sr)
window_embs = []
for start in range(0, len(y) - win_len + 1, stride):
seg = y[start:start + win_len]
for seg in self._windows(y, window_sec=window_sec, stride_sec=stride_sec):
mel = self._to_mel(seg).to(self.device)
with torch.no_grad():
emb, _ = self.model(mel)
window_embs.append(emb.cpu().numpy().flatten())
if window_embs:
song_emb = np.mean(window_embs, axis=0)
all_embs.append(song_emb)
all_embs.append(emb.cpu().numpy().flatten())
all_ids.append(song_id)
all_embs = np.vstack(all_embs)
np.save(f"{output_path}_embs.npy", all_embs)
np.save(f"{output_path}_ids.npy", np.array(all_ids))
print(f"Built reference index: {len(all_ids)} songs, embeddings shape {all_embs.shape}")
print(f"Built reference index: {len(all_ids)} windows, embeddings shape {all_embs.shape}")
return all_embs, all_ids
def search(
self,
query_emb: np.ndarray,
ref_embs: np.ndarray,
ref_ids: List[str],
top_k: int = 10,
) -> List[Tuple[str, float]]:
def search(self, query_emb: np.ndarray, ref_embs: np.ndarray, ref_ids: List[str], top_k: int = 10):
query_norm = query_emb / (np.linalg.norm(query_emb) + 1e-12)
ref_norm = ref_embs / (np.linalg.norm(ref_embs, axis=1, keepdims=True) + 1e-12)
scores = query_norm @ ref_norm.T
......
......@@ -2,12 +2,12 @@
Hybrid ACR Engine: Chromaprint fast pre-filter + ECAPA-TDNN deep re-ranking.
"""
import numpy as np
import librosa
from typing import List, Tuple, Optional, Dict
from pathlib import Path
import json
import time
from typing import Dict, List, Optional
import librosa
import numpy as np
class Candidate:
......@@ -17,9 +17,8 @@ class Candidate:
self.ecapa_score = ecapa_score
self.metadata: Dict = {}
@property
def combined_score(self) -> float:
return 0.3 * self.chroma_score + 0.7 * self.ecapa_score
def combined_score(self, chroma_weight: float, ecapa_weight: float) -> float:
return chroma_weight * self.chroma_score + ecapa_weight * self.ecapa_score
def __repr__(self):
return f"Candidate({self.song_id}, chroma={self.chroma_score:.3f}, ecapa={self.ecapa_score:.3f})"
......@@ -33,9 +32,9 @@ class HybridEngine:
ref_embs: Optional[np.ndarray] = None,
ref_ids: Optional[List[str]] = None,
sr: int = 16000,
chroma_weight: float = 0.3,
ecapa_weight: float = 0.7,
reject_threshold: float = 0.4,
chroma_weight: float = 0.35,
ecapa_weight: float = 0.65,
reject_threshold: float = 0.35,
):
self.chroma = chroma_matcher
self.ecapa = ecapa_embedder
......@@ -45,7 +44,6 @@ class HybridEngine:
self.chroma_weight = chroma_weight
self.ecapa_weight = ecapa_weight
self.reject_threshold = reject_threshold
self.song_metadata: Dict[str, Dict] = {}
def load_metadata(self, metadata_path: str):
......@@ -53,75 +51,83 @@ class HybridEngine:
items = json.load(f)
for item in items:
sid = item["song_id"]
if sid not in self.song_metadata:
base = item.get("base_freq", 0)
existing = self.song_metadata.get(sid, {})
if item.get("type") == "reference" or not existing:
self.song_metadata[sid] = {
"song_id": sid,
"base_freq": base,
"audio_path": item.get("audio_path", ""),
"base_freq": item.get("base_freq", existing.get("base_freq", 0)),
"audio_path": item.get("audio_path", existing.get("audio_path", "")),
"type": item.get("type", existing.get("type", "unknown")),
}
@staticmethod
def _normalize_scores(score_pairs: List[tuple], invert: bool = False) -> Dict[str, float]:
    """Min-max normalize ``(song_id, score)`` pairs into per-song scores in [0, 1].

    Args:
        score_pairs: Sequence of ``(song_id, score)`` tuples. The same
            ``song_id`` may appear more than once.
        invert: If true, scores are negated first so that the smallest raw
            score maps to 1.0.

    Returns:
        Dict mapping each ``song_id`` to its normalized score. When an id
        occurs several times its best (highest) normalized score is kept.
        An empty input gives an empty dict; if all scores are equal (this
        includes a single pair) every id maps to 1.0.
    """
    if not score_pairs:
        return {}

    ids = [sid for sid, _ in score_pairs]
    values = np.array([float(score) for _, score in score_pairs], dtype=np.float32)
    if invert:
        values = -values

    vmin = float(values.min())
    vmax = float(values.max())
    if abs(vmax - vmin) < 1e-8:
        # No spread to normalize over; also covers the single-pair case.
        return {sid: 1.0 for sid in ids}

    norm = (values - vmin) / (vmax - vmin)

    # Keep the maximum per id. A plain dict comprehension would let a later,
    # lower score for the same song overwrite its best match.
    best: Dict[str, float] = {}
    for sid, score in zip(ids, norm):
        score = float(score)
        if sid not in best or score > best[sid]:
            best[sid] = score
    return best
def recognize(
self,
audio_path: str,
top_n: int = 5,
mode: str = "auto",
) -> List[Dict]:
) -> Dict:
del mode
start = time.time()
y, _ = librosa.load(audio_path, sr=self.sr, mono=True)
chroma_candidates: List[Candidate] = []
if self.chroma is not None:
chroma_matches = self.chroma.match(y, top_k=50)
seen = set()
for song_id, score in chroma_matches:
if song_id not in seen:
seen.add(song_id)
c = Candidate(song_id, chroma_score=score)
chroma_candidates.append(c)
ecapa_candidates: List[Candidate] = []
if self.ecapa is not None and self.ref_embs is not None:
chroma_matches = self.chroma.match(y, top_k=max(50, top_n * 5)) if self.chroma is not None else []
chroma_norm = self._normalize_scores(chroma_matches)
ecapa_matches = []
if self.ecapa is not None and self.ref_embs is not None and self.ref_ids is not None:
query_emb = self.ecapa.extract_embedding_from_wave(y)
ref_norm = self.ref_embs / (
np.linalg.norm(self.ref_embs, axis=1, keepdims=True) + 1e-12
)
ref_norm = self.ref_embs / (np.linalg.norm(self.ref_embs, axis=1, keepdims=True) + 1e-12)
query_norm = query_emb / (np.linalg.norm(query_emb) + 1e-12)
scores = query_norm @ ref_norm.T
top_indices = np.argsort(-scores)[:top_n]
for idx in top_indices:
c = Candidate(self.ref_ids[idx], ecapa_score=float(scores[idx]))
ecapa_candidates.append(c)
combined: Dict[str, Candidate] = {}
for c in chroma_candidates:
combined[c.song_id] = c
for c in ecapa_candidates:
if c.song_id in combined:
combined[c.song_id].ecapa_score = c.ecapa_score
else:
combined[c.song_id] = c
for sid in list(combined.keys()):
combined[sid].metadata = self.song_metadata.get(sid, {})
results = sorted(
combined.values(),
key=lambda c: c.combined_score,
reverse=True,
)[:top_n]
top_indices = np.argsort(-scores)[: max(top_n * 5, 20)]
ecapa_matches = [(self.ref_ids[idx], float(scores[idx])) for idx in top_indices]
ecapa_norm = self._normalize_scores(ecapa_matches)
all_song_ids = set(chroma_norm) | set(ecapa_norm)
combined: List[Candidate] = []
for song_id in all_song_ids:
candidate = Candidate(
song_id=song_id,
chroma_score=chroma_norm.get(song_id, 0.0),
ecapa_score=ecapa_norm.get(song_id, 0.0),
)
candidate.metadata = self.song_metadata.get(song_id, {})
combined.append(candidate)
combined.sort(key=lambda c: c.combined_score(self.chroma_weight, self.ecapa_weight), reverse=True)
results = combined[:top_n]
elapsed = (time.time() - start) * 1000
output = []
for c in results:
output.append({
"song_id": c.song_id,
"confidence": round(c.combined_score, 4),
"chromaprint_score": round(c.chroma_score, 4),
"ecapa_score": round(c.ecapa_score, 4),
"metadata": c.metadata,
})
fused = c.combined_score(self.chroma_weight, self.ecapa_weight)
output.append(
{
"song_id": c.song_id,
"confidence": round(fused, 4),
"chromaprint_score": round(c.chroma_score, 4),
"ecapa_score": round(c.ecapa_score, 4),
"accepted": fused >= self.reject_threshold,
"metadata": c.metadata,
}
)
return {
"candidates": output,
......
# Open Dataset Integration Plan
## Recommended order
1. **FMA small**
- URL: https://github.com/mdeff/fma
- Why: easiest small realistic music subset for retrieval experiments
2. **MTG-Jamendo**
- URL: https://github.com/MTG/mtg-jamendo-dataset
- Why: larger CC-licensed corpus with scriptable upstream tooling
3. **QBSH / humming corpora**
- Why: add after retrieval baseline is stable
## Repo strategy
- Keep external dataset ingestion optional
- Convert external tracks into:
- `catalog.json` for searchable references
- query segment manifests for evaluation
- Start with small local subsets before full-corpus scaling