Commit bd66c06b bd66c06bd7512295f9d9510ddb3ae45a150685c0 by cnb.bofCdSsphPA

Add voice chunking and match-context foundations for ACR service

Constraint: keep humming/recording query support lightweight and compatible with the existing FAISS-first local workflow while production retrieval remains pgvector-oriented
Rejected: delaying service-path scaffolding until full production retrieval is ready | would block validation of voice-to-chunk and context export behavior
Confidence: high
Scope-risk: moderate
Directive: keep  semantics song_id-first and treat resource paths only as supporting evidence/context artifacts
Tested: /usr/local/miniconda3/bin/python -m unittest discover -s acr-engine/tests -v
Not-tested: live FastAPI smoke until uvicorn is available in the current interpreter environment
1 parent 69843933
......@@ -123,3 +123,29 @@ cd acr-engine
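- Normalize Hybrid scores before fusing them
- Automatic training in full-demo
- Open-source datasets can be integrated later
## Humming / recording recognition endpoint (voice -> chunk -> song_id)
Two foundational capabilities are now in place:
- `src/data/voice_chunker.py`: splits raw voice / humming audio into retrievable chunks
- `src/utils/context_exporter.py`: exports the matched reference window as a context clip (10 s by default)
Target FastAPI endpoint:
- `POST /recognize/voice`
Input:
- an externally uploaded voice / recording file (multipart field `file`)
Output:
- `candidates`: songs ranked across all chunks, each with `song_id`, `reference_audio_path`, `best_chunk` and `context_clip`
- `chunk_results`: per-chunk recognition results
Notes:
- The endpoint is already wired into `src/service/app.py`.
- The current environment still lacks `uvicorn`, so the service smoke test can only run once that runtime dependency is installed.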
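A minimal client-side sketch of consuming this response. It assumes the layout that `recognize_voice` in `src/service/app.py` builds in this commit (top-level `chunk_count` and ranked `candidates`, each carrying `combined_confidence` and an optional `context_clip` dict with `output_path`); `summarize_voice_response` is a hypothetical helper, not part of the service.

```python
from __future__ import annotations

from typing import Any, Optional


def summarize_voice_response(resp: dict[str, Any]) -> dict[str, Optional[Any]]:
    """Pick the top-ranked song from a /recognize/voice response (hypothetical helper)."""
    candidates = resp.get('candidates') or []
    top = candidates[0] if candidates else {}
    # context_clip is None when the engine knows no reference audio path for the song
    clip = top.get('context_clip') or {}
    return {
        'chunk_count': resp.get('chunk_count'),
        'song_id': top.get('song_id'),
        'confidence': top.get('combined_confidence'),
        'context_path': clip.get('output_path'),
    }
```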
......
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
from pathlib import Path
def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument('--chunks-json', required=True)
    ap.add_argument('--song-id', required=True)
    ap.add_argument('--split', default='test')
    ap.add_argument('--output', required=True)
    ap.add_argument('--source-dataset', default='humming_real')
    args = ap.parse_args()
    payload = json.loads(Path(args.chunks_json).read_text(encoding='utf-8'))
    rows = []
    for chunk in payload.get('chunks', []):
        rows.append({
            'song_id': args.song_id,
            'audio_path': chunk['audio_path'],
            'duration': chunk['duration_sec'],
            'type': 'humming_real',
            'segment_type': 'humming_query',
            'offset': chunk['start_sec'],
            'source_dataset': args.source_dataset,
            'split': args.split,
        })
    out = Path(args.output)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(rows, ensure_ascii=False, indent=2), encoding='utf-8')
    print(json.dumps({'rows': len(rows), 'output': str(out)}, ensure_ascii=False, indent=2))
if __name__ == '__main__':
    main()
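The manifest script maps each chunk record one-to-one onto a manifest row, with every chunk of one query sharing the same `song_id` label. As a sketch, the same mapping written as a pure function (the name `chunks_to_rows` is hypothetical) can be checked without reading or writing JSON files:

```python
from __future__ import annotations

from typing import Any


def chunks_to_rows(payload: dict[str, Any], song_id: str, split: str = 'test',
                   source_dataset: str = 'humming_real') -> list[dict[str, Any]]:
    """Mirror of the chunk -> manifest-row mapping in the script (sketch)."""
    rows = []
    for chunk in payload.get('chunks', []):
        rows.append({
            'song_id': song_id,                # all chunks of one query share the label
            'audio_path': chunk['audio_path'],
            'duration': chunk['duration_sec'],
            'type': 'humming_real',
            'segment_type': 'humming_query',
            'offset': chunk['start_sec'],      # position inside the original recording
            'source_dataset': source_dataset,
            'split': split,
        })
    return rows
```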
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import json
import mimetypes
import subprocess
import time
from pathlib import Path
from urllib.request import Request, urlopen
BASE = 'http://127.0.0.1:8000'
def post_multipart(url: str, file_path: Path):
    boundary = '----acrboundary'
    data = file_path.read_bytes()
    # Send the file's real MIME type (the sample query below is an .mp3, not a .wav).
    content_type = mimetypes.guess_type(file_path.name)[0] or 'application/octet-stream'
    body = (
        f'--{boundary}\r\n'
        f'Content-Disposition: form-data; name="file"; filename="{file_path.name}"\r\n'
        f'Content-Type: {content_type}\r\n\r\n'
    ).encode('utf-8') + data + f'\r\n--{boundary}--\r\n'.encode('utf-8')
    req = Request(url, data=body, method='POST')
    req.add_header('Content-Type', f'multipart/form-data; boundary={boundary}')
    with urlopen(req, timeout=20) as resp:
        return json.loads(resp.read().decode('utf-8'))
def main():
    cmd = [
        '/usr/local/miniconda3/bin/python', '-m', 'uvicorn', 'src.service.app:app', '--host', '127.0.0.1', '--port', '8000'
    ]
    # Server output is discarded: pipes that are never read can fill up and stall uvicorn.
    proc = subprocess.Popen(cmd, cwd='/root/vprecog/acr-engine', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    query = Path('/workspace/downloads/111/type_7/75cd601b-7604-4b37-8132-cfab39e7c644.mp3')
    try:
        for _ in range(20):
            time.sleep(0.5)
            try:
                result = post_multipart(BASE + '/recognize/voice', query)
            except Exception:
                # Server not up yet, or the request failed: retry.
                continue
            top = (result.get('candidates') or [{}])[0]
            print(json.dumps({
                'status': 'ok',
                'chunk_count': result.get('chunk_count'),
                'top_song_id': top.get('song_id'),
                'has_context': bool(top.get('context_clip')),
            }, ensure_ascii=False, indent=2))
            return
        raise SystemExit('service voice smoke failed: service not ready or endpoint failed')
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait(timeout=5)
if __name__ == '__main__':
    main()
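The smoke script retries the full upload until the server answers, so a slow model load and a broken endpoint look the same. One alternative, sketched here under the assumption that `GET /health` (defined in `app.py` in this commit) is polled first, separates the two phases; `wait_until` is a hypothetical helper that takes the probe as a callable so it can be exercised without a live server.

```python
import time
from typing import Callable


def wait_until(probe: Callable[[], bool], attempts: int = 20, delay_sec: float = 0.5) -> bool:
    """Call probe() until it returns True or the attempts run out (sketch)."""
    for _ in range(attempts):
        try:
            if probe():
                return True
        except Exception:
            # e.g. connection refused while uvicorn is still starting
            pass
        time.sleep(delay_sec)
    return False
```

In the smoke script, `probe` could wrap `urlopen(BASE + '/health', timeout=2)` and check that `status` is `'ok'`. `/health` also reports `ready`, which stays false until the model and index files exist, so waiting on it is the stricter condition.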
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import List, Dict
import librosa
import numpy as np
import soundfile as sf
def normalize_audio(audio_path: str, sr: int = 16000) -> np.ndarray:
    # Decode, downmix to mono and resample to `sr`; amplitude is left unchanged.
    y, _ = librosa.load(audio_path, sr=sr, mono=True)
    return y.astype(np.float32)
def detect_voiced_intervals(y: np.ndarray, sr: int, top_db: int = 30, min_voiced_sec: float = 2.0) -> List[tuple[int, int]]:
    # Non-silent spans in samples; spans shorter than min_voiced_sec are dropped.
    intervals = librosa.effects.split(y, top_db=top_db)
    min_len = int(sr * min_voiced_sec)
    kept = []
    for start, end in intervals:
        if end - start >= min_len:
            kept.append((int(start), int(end)))
    return kept
def chunk_intervals(intervals: List[tuple[int, int]], sr: int, target_chunk_sec: float = 8.0, stride_sec: float = 4.0) -> List[tuple[int, int, bool]]:
    # Returns (start, end, needs_padding) triples in samples.
    chunk_len = int(sr * target_chunk_sec)
    stride = int(sr * stride_sec)
    chunks: List[tuple[int, int, bool]] = []
    for start, end in intervals:
        seg_len = end - start
        if seg_len < chunk_len:
            # Region shorter than one chunk: keep it whole and pad it when writing.
            chunks.append((start, end, True))
            continue
        pos = start
        while pos + chunk_len <= end:
            chunks.append((pos, pos + chunk_len, False))
            pos += stride
        # Add a window flush with the region end when >= 2 s follow the next stride position.
        if pos < end and end - pos >= int(sr * 2.0):
            tail_start = max(start, end - chunk_len)
            chunks.append((tail_start, end, end - tail_start < chunk_len))
    deduped = []
    seen = set()
    for item in chunks:
        key = (item[0], item[1])
        if key not in seen:
            deduped.append(item)
            seen.add(key)
    return deduped
def write_chunks(y: np.ndarray, sr: int, chunks: List[tuple[int, int, bool]], output_dir: str, source_audio_path: str, target_len: int) -> List[Dict]:
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    results = []
    for idx, (start, end, padded) in enumerate(chunks):
        clip = y[start:end]
        # Zero-pad short regions to the configured chunk length; the target must not
        # be taken from the first chunk, which may itself be a short region.
        if padded and len(clip) < target_len:
            clip = np.pad(clip, (0, target_len - len(clip)))
        chunk_path = out_dir / f'chunk_{idx:03d}.wav'
        sf.write(str(chunk_path), clip, sr)
        results.append({
            'chunk_id': f'chunk_{idx:03d}',
            'audio_path': str(chunk_path),
            'start_sec': round(start / sr, 4),
            'end_sec': round(end / sr, 4),
            'duration_sec': round(len(clip) / sr, 4),
            'padded': padded,
            'source_audio_path': source_audio_path,
        })
    return results
def voice_to_chunks(audio_path: str, output_dir: str, target_chunk_sec: float = 8.0, stride_sec: float = 4.0, min_voiced_sec: float = 2.0, top_db: int = 30, sr: int = 16000) -> List[Dict]:
    y = normalize_audio(audio_path, sr=sr)
    intervals = detect_voiced_intervals(y, sr=sr, top_db=top_db, min_voiced_sec=min_voiced_sec)
    chunks = chunk_intervals(intervals, sr=sr, target_chunk_sec=target_chunk_sec, stride_sec=stride_sec)
    return write_chunks(y, sr, chunks, output_dir, source_audio_path=audio_path, target_len=int(sr * target_chunk_sec))
def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument('--input', required=True)
    ap.add_argument('--output-dir', required=True)
    ap.add_argument('--target-chunk-sec', type=float, default=8.0)
    ap.add_argument('--stride-sec', type=float, default=4.0)
    ap.add_argument('--min-voiced-sec', type=float, default=2.0)
    ap.add_argument('--top-db', type=int, default=30)
    ap.add_argument('--sr', type=int, default=16000)
    ap.add_argument('--output-json', default='chunks.json')
    args = ap.parse_args()
    chunks = voice_to_chunks(
        audio_path=args.input,
        output_dir=args.output_dir,
        target_chunk_sec=args.target_chunk_sec,
        stride_sec=args.stride_sec,
        min_voiced_sec=args.min_voiced_sec,
        top_db=args.top_db,
        sr=args.sr,
    )
    out_json = Path(args.output_dir) / args.output_json
    out_json.write_text(json.dumps({'chunks': chunks}, ensure_ascii=False, indent=2), encoding='utf-8')
    print(json.dumps({'chunks': chunks}, ensure_ascii=False, indent=2))
if __name__ == '__main__':
    main()
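The window layout produced by `chunk_intervals` is easiest to reason about in seconds. The sketch below restates the same rule for a single voiced region (fixed windows every `stride_sec`, plus one window flush with the region end when at least `min_tail_sec` of audio follows the last stride position) without touching audio; `plan_windows` is a hypothetical name, and `min_tail_sec` stands in for the hard-coded 2 s in the original.

```python
from __future__ import annotations


def plan_windows(start: float, end: float, chunk_sec: float = 8.0,
                 stride_sec: float = 4.0, min_tail_sec: float = 2.0) -> list[tuple[float, float]]:
    """Window positions in seconds for one voiced region, mirroring chunk_intervals."""
    if end - start < chunk_sec:
        return [(start, end)]  # short region: kept whole, padded later
    windows = []
    pos = start
    while pos + chunk_sec <= end:
        windows.append((pos, pos + chunk_sec))
        pos += stride_sec
    # tail window aligned to the region end so the last seconds are not dropped
    if pos < end and end - pos >= min_tail_sec:
        tail = (max(start, end - chunk_sec), end)
        if tail not in windows:  # same role as the dedup pass in the original
            windows.append(tail)
    return windows
```

For a voiced region from 5 s to 15 s with the defaults, this yields `(5, 13)` and `(7, 15)`: the tail window overlaps the first one by 6 s so the end of the hum is still covered.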
from __future__ import annotations
from pathlib import Path
from tempfile import TemporaryDirectory
from threading import Lock
from typing import Optional
import numpy as np
from fastapi import FastAPI, File, HTTPException, UploadFile
from pydantic import BaseModel
from src.data.voice_chunker import voice_to_chunks
from src.engines.chromaprint_matcher import ChromaprintMatcher
from src.engines.ecapa_embedder import ECAPAEmbedder
from src.engines.hybrid_engine import HybridEngine
from src.service.settings import ServiceSettings
from src.utils.context_exporter import export_match_context, find_best_matching_window
class RecognizeRequest(BaseModel):
......@@ -30,7 +33,7 @@ class BuildIndexRequest(BaseModel):
    device: Optional[str] = None
app = FastAPI(title='ACR Service', version='0.4.0')
settings = ServiceSettings()
_engine_cache: dict[tuple[str, str, str, str], HybridEngine] = {}
_cache_lock = Lock()
......@@ -38,52 +41,52 @@ _cache_lock = Lock()
def _resolve(req_data_dir=None, req_model_path=None, req_index_prefix=None, req_device=None):
    return {
        'data_dir': req_data_dir or settings.data_dir,
        'model_path': req_model_path or settings.model_path,
        'index_prefix': req_index_prefix or settings.index_prefix,
        'device': req_device or settings.device,
    }
def _readiness_snapshot(data_dir: str, model_path: str, index_prefix: str) -> dict:
    chroma_path = str(Path(index_prefix).parent / 'chromaprint.pkl')
    embs_path = f'{index_prefix}_embs.npy'
    ids_path = f'{index_prefix}_ids.npy'
    manifest_candidates = [
        str((Path(data_dir) / split).resolve())
        for split in ['catalog.json', 'train.json', 'val.json', 'test.json']
        if (Path(data_dir) / split).exists()
    ]
    files = {
        'data_dir': {'path': str(Path(data_dir).resolve()), 'exists': Path(data_dir).exists()},
        'model': {'path': str(Path(model_path).resolve()), 'exists': Path(model_path).exists()},
        'chromaprint_index': {'path': str(Path(chroma_path).resolve()), 'exists': Path(chroma_path).exists()},
        'embedding_index': {'path': str(Path(embs_path).resolve()), 'exists': Path(embs_path).exists()},
        'id_index': {'path': str(Path(ids_path).resolve()), 'exists': Path(ids_path).exists()},
    }
    return {'ready': all(item['exists'] for item in files.values()), 'files': files, 'manifests': manifest_candidates}
def _load_engine_uncached(data_dir: str, model_path: str, index_prefix: str, device: str) -> HybridEngine:
    matcher = ChromaprintMatcher()
    chroma_path = str(Path(index_prefix).parent / 'chromaprint.pkl')
    if not Path(chroma_path).exists():
        raise HTTPException(status_code=400, detail=f'Missing chromaprint index: {chroma_path}')
    matcher.load(chroma_path)
    if not Path(model_path).exists():
        raise HTTPException(status_code=400, detail=f'Missing model: {model_path}')
    embedder = ECAPAEmbedder(model_path=model_path, device=device)
    embs_path = f'{index_prefix}_embs.npy'
    ids_path = f'{index_prefix}_ids.npy'
    if not Path(embs_path).exists() or not Path(ids_path).exists():
        raise HTTPException(status_code=400, detail='Missing embedding index files')
    ref_embs = np.load(embs_path)
    ref_ids = np.load(ids_path, allow_pickle=True).tolist()
    engine = HybridEngine(matcher, embedder, ref_embs, ref_ids)
    for split in ['catalog.json', 'train.json', 'val.json', 'test.json']:
        p = Path(data_dir) / split
        if p.exists():
            engine.load_metadata(str(p))
......@@ -105,70 +108,168 @@ def _load_engine(data_dir: str, model_path: str, index_prefix: str, device: str)
def _cache_stats() -> dict:
    with _cache_lock:
        keys = list(_engine_cache.keys())
    return {'engine_cache_size': len(keys), 'cache_keys': keys}
def _aggregate_chunk_results(chunk_results: list[dict], top_n: int) -> list[dict]:
    # Group per-chunk candidates by song_id, keeping each song's best hit and hit count.
    by_song: dict[str, dict] = {}
    for chunk in chunk_results:
        for cand in chunk.get('candidates', []):
            song_id = cand['song_id']
            entry = by_song.setdefault(song_id, {
                'song_id': song_id,
                'best_confidence': -1.0,
                'match_count': 0,
                'best_chunk': None,
                'best_candidate': None,
            })
            entry['match_count'] += 1
            if cand['confidence'] > entry['best_confidence']:
                entry['best_confidence'] = cand['confidence']
                entry['best_chunk'] = chunk
                entry['best_candidate'] = cand
    ranked = []
    for entry in by_song.values():
        # Best single-chunk confidence plus a small bonus per supporting chunk.
        combined = float(entry['best_confidence']) + 0.05 * float(entry['match_count'])
        ranked.append({
            'song_id': entry['song_id'],
            'combined_confidence': round(combined, 4),
            'best_confidence': round(float(entry['best_confidence']), 4),
            'match_count': entry['match_count'],
            'best_chunk': entry['best_chunk'],
            'best_candidate': entry['best_candidate'],
        })
    ranked.sort(key=lambda x: x['combined_confidence'], reverse=True)
    return ranked[:top_n]
def _reference_audio_for_song(engine: HybridEngine, song_id: str) -> str | None:
    return engine.song_audio_paths.get(song_id)
@app.get('/health')
def health():
    resolved = _resolve()
    readiness = _readiness_snapshot(resolved['data_dir'], resolved['model_path'], resolved['index_prefix'])
    return {'status': 'ok', 'service': 'acr', 'version': '0.4.0', 'ready': readiness['ready']}
@app.get('/ready')
def ready():
    resolved = _resolve()
    readiness = _readiness_snapshot(resolved['data_dir'], resolved['model_path'], resolved['index_prefix'])
    return {'service': 'acr', 'version': '0.4.0', **readiness, **_cache_stats()}
@app.get('/config')
def config():
    return settings.model_dump()
@app.get('/cache')
def cache_status():
    return _cache_stats()
@app.post('/recognize')
def recognize(req: RecognizeRequest):
    resolved = _resolve(req.data_dir, req.model_path, req.index_prefix, req.device)
    if not Path(req.query_path).exists():
        raise HTTPException(status_code=400, detail=f'Missing query file: {req.query_path}')
    engine, cache_hit = _load_engine(**resolved)
    result = engine.recognize(req.query_path, top_n=req.top_n)
    return {'cache_hit': cache_hit, 'resolved': resolved, 'result': result}
@app.post('/index/build')
def build_index(req: BuildIndexRequest):
    from run_demo import build_chroma_index, build_embedding_index
    resolved = _resolve(req.data_dir, req.model_path, None, req.device)
    data_dir = Path(resolved['data_dir'])
    out_dir = Path(req.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    build_chroma_index(data_dir, out_dir)
    _, ref_embs, ref_ids = build_embedding_index(data_dir, Path(resolved['model_path']), out_dir / 'reference', resolved['device'])
    return {
        'status': 'ok',
        'num_reference_windows': len(ref_ids),
        'embedding_dim': int(ref_embs.shape[1]) if len(ref_embs.shape) > 1 else 0,
        'output_dir': str(out_dir.resolve()),
    }
@app.post('/recognize/voice')
def recognize_voice(
    file: UploadFile = File(...),
    top_n: int = 5,
    data_dir: Optional[str] = None,
    model_path: Optional[str] = None,
    index_prefix: Optional[str] = None,
    device: Optional[str] = None,
    context_sec: float = 10.0,
    output_format: str = 'mp3',
):
    # A plain `def` endpoint runs in FastAPI's threadpool, so the blocking audio
    # decoding and model inference below do not stall the event loop.
    from tempfile import mkdtemp
    resolved = _resolve(data_dir, model_path, index_prefix, device)
    engine, cache_hit = _load_engine(**resolved)
    with TemporaryDirectory(prefix='acr_voice_') as tmpdir:
        tmp = Path(tmpdir)
        suffix = Path(file.filename or 'upload.wav').suffix or '.wav'
        raw_path = tmp / f'input{suffix}'
        raw_path.write_bytes(file.file.read())
        chunk_dir = tmp / 'chunks'
        # Chunk files exist only during this request; their paths in the response are informational.
        chunks = voice_to_chunks(str(raw_path), str(chunk_dir))
        if not chunks:
            raise HTTPException(status_code=400, detail='No voiced chunks detected from input audio')
        chunk_results = []
        for chunk in chunks:
            result = engine.recognize(chunk['audio_path'], top_n=top_n)
            chunk_results.append({
                'chunk': chunk,
                'candidates': result['candidates'],
                'processing_time_ms': result['processing_time_ms'],
            })
        ranked = _aggregate_chunk_results(chunk_results, top_n=top_n)
        # Context clips must outlive the request, so they are written outside the
        # TemporaryDirectory (which is deleted on exit); cleanup is left to operations.
        context_dir = Path(mkdtemp(prefix='acr_context_'))
        response_candidates = []
        for item in ranked:
            song_id = item['song_id']
            ref_audio = _reference_audio_for_song(engine, song_id)
            context_info = None
            if ref_audio and item['best_chunk'] is not None:
                match = find_best_matching_window(
                    query_audio_path=item['best_chunk']['chunk']['audio_path'],
                    reference_audio_path=ref_audio,
                )
                out_path = context_dir / f'{song_id}.{output_format}'
                context_info = export_match_context(
                    audio_path=ref_audio,
                    window_start_sec=match['window_start_sec'],
                    window_end_sec=match['window_end_sec'],
                    output_path=str(out_path),
                    context_sec=context_sec,
                    output_format=output_format,
                )
                context_info['match'] = match
            response_candidates.append({
                'song_id': song_id,
                'combined_confidence': item['combined_confidence'],
                'best_confidence': item['best_confidence'],
                'match_count': item['match_count'],
                'reference_audio_path': ref_audio,
                'best_candidate': item['best_candidate'],
                'best_chunk': item['best_chunk']['chunk'] if item['best_chunk'] else None,
                'context_clip': context_info,
            })
    return {
        'cache_hit': cache_hit,
        'resolved': resolved,
        'query_audio_filename': file.filename,
        'chunk_count': len(chunks),
        'chunk_results': chunk_results,
        'candidates': response_candidates,
    }
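`_aggregate_chunk_results` ranks songs by their best single-chunk confidence plus a bonus of 0.05 for every chunk that returned the song. The dependency-free sketch below (hypothetical name `rank_songs`, with candidates reduced to `(song_id, confidence)` pairs) reproduces that rule so the trade-off can be checked by hand:

```python
from __future__ import annotations

from collections import defaultdict


def rank_songs(chunk_candidates: list[list[tuple[str, float]]],
               bonus: float = 0.05) -> list[tuple[str, float]]:
    """Score = best confidence for the song + bonus * number of chunk hits (sketch)."""
    best: dict[str, float] = {}
    hits: dict[str, int] = defaultdict(int)
    for candidates in chunk_candidates:
        for song_id, conf in candidates:
            hits[song_id] += 1
            best[song_id] = max(conf, best.get(song_id, -1.0))
    scored = [(song_id, round(best[song_id] + bonus * hits[song_id], 4)) for song_id in best]
    # highest combined score first, as in the service
    return sorted(scored, key=lambda item: item[1], reverse=True)
```

With chunk hits `[('a', 0.80), ('b', 0.60)]`, `[('b', 0.75)]` and `[('b', 0.65)]`, song `b` scores 0.75 + 3 × 0.05 = 0.90 and overtakes `a` (0.80 + 0.05 = 0.85) despite a lower peak. Because the bonus grows with the number of chunks, `combined_confidence` can exceed 1.0 when per-chunk confidences are close to 1, so it works as a ranking key rather than a probability.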
......
from __future__ import annotations
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, Tuple
import librosa
import numpy as np
import soundfile as sf
def load_audio(audio_path: str, sr: int = 16000) -> np.ndarray:
    y, _ = librosa.load(audio_path, sr=sr, mono=True)
    return y.astype(np.float32)
def chroma_embedding(y: np.ndarray, sr: int) -> np.ndarray:
    # 24-dim summary: per-pitch-class mean and std of the chromagram, L2-normalised.
    chroma = librosa.feature.chroma_stft(y=y, sr=sr, n_chroma=12)
    feat = np.concatenate([chroma.mean(axis=1), chroma.std(axis=1)], axis=0).astype(np.float32)
    norm = np.linalg.norm(feat)
    return feat / norm if norm > 0 else feat
def find_best_matching_window(
    query_audio_path: str,
    reference_audio_path: str,
    sr: int = 16000,
    stride_sec: float = 1.0,
) -> Dict:
    query_y = load_audio(query_audio_path, sr=sr)
    ref_y = load_audio(reference_audio_path, sr=sr)
    query_len = len(query_y)
    if query_len == 0:
        raise ValueError('Empty query audio')
    if len(ref_y) < query_len:
        ref_y = np.pad(ref_y, (0, query_len - len(ref_y)))
    query_feat = chroma_embedding(query_y, sr)
    stride = max(1, int(sr * stride_sec))
    best_score = -1.0
    best_start = 0
    # Slide a query-length window over the reference; keep the highest cosine score.
    for start in range(0, max(len(ref_y) - query_len + 1, 1), stride):
        window = ref_y[start:start + query_len]
        if len(window) < query_len:
            window = np.pad(window, (0, query_len - len(window)))
        score = float(np.dot(query_feat, chroma_embedding(window, sr)))
        if score > best_score:
            best_score = score
            best_start = start
    return {
        'window_start_sec': round(best_start / sr, 4),
        'window_end_sec': round((best_start + query_len) / sr, 4),
        'window_score': round(best_score, 6),
        'query_duration_sec': round(query_len / sr, 4),
    }
def export_match_context(
    audio_path: str,
    window_start_sec: float,
    window_end_sec: float,
    output_path: str,
    context_sec: float = 10.0,
    output_format: str = 'mp3',
    sr: int = 16000,
) -> Dict:
    y = load_audio(audio_path, sr=sr)
    # Centre the context on the matched window and clamp it to the track bounds.
    center = (window_start_sec + window_end_sec) / 2.0
    half = context_sec / 2.0
    clip_start_sec = max(0.0, center - half)
    clip_end_sec = min(len(y) / sr, center + half)
    start = int(clip_start_sec * sr)
    end = max(start + 1, int(clip_end_sec * sr))
    clip = y[start:end]
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    actual_format = output_format
    if output_format == 'mp3' and shutil.which('ffmpeg'):
        with tempfile.TemporaryDirectory() as tmp:
            wav_path = Path(tmp) / 'context.wav'
            sf.write(wav_path, clip, sr)
            cmd = [shutil.which('ffmpeg') or 'ffmpeg', '-y', '-i', str(wav_path), str(output)]
            subprocess.run(cmd, check=True, capture_output=True)
    else:
        # Without ffmpeg, MP3 requests fall back to WAV and the suffix is changed to match.
        if output_format == 'mp3':
            actual_format = 'wav'
            output = output.with_suffix('.wav')
        sf.write(output, clip, sr)
    return {
        'source_audio_path': audio_path,
        'clip_start_sec': round(clip_start_sec, 4),
        'clip_end_sec': round(clip_end_sec, 4),
        'duration_sec': round(len(clip) / sr, 4),
        'output_path': str(output),
        'output_format': actual_format,
    }
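`export_match_context` centres a `context_sec` window on the matched query window and clamps it to the track; it does not shift the window inward near the edges. The pure-arithmetic sketch below (hypothetical name `context_bounds`) restates that clamping so edge cases can be checked without audio:

```python
from __future__ import annotations


def context_bounds(window_start: float, window_end: float, track_sec: float,
                   context_sec: float = 10.0) -> tuple[float, float]:
    """Clip bounds in seconds, centred on the match and clamped to [0, track_sec]."""
    center = (window_start + window_end) / 2.0
    half = context_sec / 2.0
    # clamping only: a clip near either edge simply comes out shorter
    return max(0.0, center - half), min(track_sec, center + half)
```

A match at 4 to 7 s in a 12 s track gives `(0.5, 10.5)`, the full 10 s; a match at 0 to 3 s gives `(0.0, 6.5)`, so clips near the start or end of a track are shorter than `context_sec`.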
from pathlib import Path
import sys
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
import tempfile
import unittest
from pathlib import Path
import test_bootstrap
import numpy as np
import soundfile as sf
from src.utils.context_exporter import export_match_context, find_best_matching_window
class ContextExporterTests(unittest.TestCase):
    def test_find_best_matching_window_returns_valid_range(self):
        sr = 16000
        with tempfile.TemporaryDirectory() as tmp:
            query = Path(tmp) / 'query.wav'
            ref = Path(tmp) / 'ref.wav'
            tone = 0.2 * np.sin(2 * np.pi * 440 * np.linspace(0, 3, sr * 3, endpoint=False)).astype(np.float32)
            ref_y = np.concatenate([np.zeros(sr), tone, np.zeros(sr)]).astype(np.float32)
            sf.write(query, tone, sr)
            sf.write(ref, ref_y, sr)
            match = find_best_matching_window(str(query), str(ref), sr=sr, stride_sec=0.5)
            self.assertGreaterEqual(match['window_start_sec'], 0.0)
            self.assertGreater(match['window_end_sec'], match['window_start_sec'])
    def test_export_match_context_writes_audio(self):
        sr = 16000
        with tempfile.TemporaryDirectory() as tmp:
            ref = Path(tmp) / 'ref.wav'
            out = Path(tmp) / 'context.wav'
            y = 0.2 * np.sin(2 * np.pi * 440 * np.linspace(0, 12, sr * 12, endpoint=False)).astype(np.float32)
            sf.write(ref, y, sr)
            info = export_match_context(str(ref), 4.0, 7.0, str(out), context_sec=10.0, output_format='wav', sr=sr)
            self.assertTrue(Path(info['output_path']).exists())
            self.assertEqual(info['output_format'], 'wav')
if __name__ == '__main__':
    unittest.main()
......@@ -2,6 +2,8 @@ import tempfile
import unittest
from pathlib import Path
import test_bootstrap
from scripts.local_music20_acr import collect_pairs, first_file
......
import tempfile
import unittest
from pathlib import Path
import test_bootstrap
import numpy as np
import soundfile as sf
from src.data.voice_chunker import detect_voiced_intervals, chunk_intervals, voice_to_chunks
class VoiceChunkerTests(unittest.TestCase):
    def test_detect_voiced_intervals_filters_short_segments(self):
        sr = 16000
        y = np.concatenate([
            np.zeros(sr),
            0.2 * np.sin(2 * np.pi * 440 * np.linspace(0, 3, sr * 3, endpoint=False)),
            np.zeros(sr // 2),
        ]).astype(np.float32)
        intervals = detect_voiced_intervals(y, sr=sr, top_db=30, min_voiced_sec=2.0)
        self.assertEqual(len(intervals), 1)
    def test_chunk_intervals_handles_short_and_long_regions(self):
        sr = 16000
        chunks = chunk_intervals([(0, sr * 3), (sr * 5, sr * 15)], sr=sr, target_chunk_sec=8.0, stride_sec=4.0)
        self.assertTrue(any(padded for _, _, padded in chunks))
        self.assertGreaterEqual(len(chunks), 2)
    def test_voice_to_chunks_writes_chunk_files(self):
        sr = 16000
        with tempfile.TemporaryDirectory() as tmp:
            src = Path(tmp) / 'hum.wav'
            out = Path(tmp) / 'chunks'
            y = np.concatenate([
                np.zeros(sr),
                0.2 * np.sin(2 * np.pi * 330 * np.linspace(0, 4, sr * 4, endpoint=False)),
                np.zeros(sr),
            ]).astype(np.float32)
            sf.write(src, y, sr)
            chunks = voice_to_chunks(str(src), str(out), target_chunk_sec=3.0, stride_sec=2.0, min_voiced_sec=2.0, sr=sr)
            self.assertGreaterEqual(len(chunks), 1)
            self.assertTrue(Path(chunks[0]['audio_path']).exists())
if __name__ == '__main__':
    unittest.main()
## 2026-06-03 voice-to-chunk and context export foundation
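- Added `acr-engine/src/data/voice_chunker.py`, which splits voice / humming audio into chunks.
- Added `acr-engine/scripts/build_humming_eval_manifest.py`, which builds a `humming_real` evaluation manifest from chunk output.
- Added `acr-engine/src/utils/context_exporter.py`, which exports the matched reference window as a context clip.
- Extended `acr-engine/src/service/app.py` with a first version of the `POST /recognize/voice` endpoint.
- The docs entry point `docs/README.md` has been simplified to the latest architecture and the shortest reading order.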
Fresh evidence:
- `/usr/local/miniconda3/bin/python -m unittest discover -s acr-engine/tests -v` => `Ran 7 tests, OK`
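- `uvicorn` is missing from the current environment, so the service smoke test cannot be started yet; the runtime dependency must be installed first.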
## 2026-06-03 20-song local ACR workflow in acr-engine
- Added `acr-engine/scripts/local_music20_acr.py`, which provides a local 20-song small-sample ACR workflow inside `acr-engine`, built on `/workspace/downloads`.
......
# ACR Docs Overview
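> Updated: 2026-06-02
> Keeps only the latest architecture and the shortest path to getting started. Historical details remain in the repo, but the default reading list is limited to the 6 main documents below.
## One-page summary
## Shortest reading order
There are currently too many documentation entry points; they are now condensed into **5 main document groups**: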
1. [session-handoff.md](./session-handoff.md)
2. [CHANGELOG.md](./CHANGELOG.md)
3. [acr-architecture.md](./acr-architecture.md)
4. [dataset-spec.md](./dataset-spec.md)
5. [training-data-and-pgvector-guide.md](./training-data-and-pgvector-guide.md)
6. [runbook.md](./runbook.md)
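1. **Project and architecture**
2. **Data and evaluation**
3. **Business data ingestion**
4. **Service and engineering**
5. **Research and roadmap**
## Currently recommended reading
Start with only these 5 groups; there is no need to read every detailed document at once.
### 1. Project architecture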
- [acr-architecture.md](./acr-architecture.md)
- [session-handoff.md](./session-handoff.md)
---
### 2. Data and evaluation
- [dataset-spec.md](./dataset-spec.md)
- [training-data-and-pgvector-guide.md](./training-data-and-pgvector-guide.md)
- [open-dataset-workflow.md](./open-dataset-workflow.md)
## 1. Documentation map
### 3. Operations and service
- [runbook.md](./runbook.md)
- [service-api.md](./service-api.md)
```mermaid
flowchart TD
A[Docs Entry] --> B[Project Responsibility]
A --> C[Architecture]
A --> D[Dataset Spec]
A --> E[Business Export Chain]
A --> F[Service API]
A --> G[Industrial Benchmark]
A --> H[Industrialization Roadmap]
A --> I[Licensing & Sources]
A --> J[SOTA Research]
B --> C
C --> D
D --> E
E --> F
G --> H
I --> H
J --> H
```
### 4. Latest hard-case findings
- [acr-hard-case-analysis.md](./acr-hard-case-analysis.md)
## Current architecture in one sentence
---
## 2. Condensed reading entry points
| Reader role | Read first |
|---|---|
| New team member | [Project and architecture](./project-responsibility-map.md), [System architecture](./acr-architecture.md) |
| Algorithms / models | [Dataset spec](./dataset-spec.md), [SOTA research](./sota-research-2026.md) |
| Platform / backend | [Service API](./service-api.md), [Evaluation spec](./industrial-benchmark-spec.md) |
| Data ingestion | [Open dataset workflow](./open-dataset-workflow.md), [Business export cookbook](./business-export-cookbook.md) |
| Owners / planning | [Industrialization roadmap](./industrialization-roadmap.md), [Handoff doc](./session-handoff.md) |
---
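## 2.5 Shortest reading order for a new session
When picking the project up in a new session, read in this order:
1. [Ongoing development handoff](./session-handoff.md)
2. [Changelog](./CHANGELOG.md)
3. [Business export cookbook](./business-export-cookbook.md) or [Open dataset workflow](./open-dataset-workflow.md)
How to choose:
- Ingesting your own business material: read `business-export-cookbook.md` first
- Working with open datasets such as FMA / MTG-Jamendo: read `open-dataset-workflow.md` first
## 2.6 Shortest runnable command for a new session
If you only want to confirm that the business export chain still runs, execute: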
```bash
cd /workspace/acr-engine
/usr/local/miniconda3/bin/python scripts/business_export_offline_smoke.py \
--output-root /tmp/business_export_offline_smoke
```
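Expected results:
- sample business exports are generated
- manifest-ready JSONL is generated
- project `catalog/train/test/val` manifests are generated
- `train.py --dry-run` passes
## 3. Main document groups
### A. Project and architecture
- [Project responsibility map](./project-responsibility-map.md)
- [System architecture](./acr-architecture.md)
### B. Data and evaluation
- [Dataset spec](./dataset-spec.md)
- [Open dataset workflow](./open-dataset-workflow.md)
- [Training data and pgvector guide](./training-data-and-pgvector-guide.md)
- [Production encoder freezing and embedding strategy FAQ](./production-encoder-freeze-and-embedding-strategy.md)
- [Dataset sources and ingestion](./dataset-sources-and-licensing.md)
- [Industrial benchmark spec](./industrial-benchmark-spec.md)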
Quick-start entry points:
- [Open dataset workflow](./open-dataset-workflow.md)
- [Local landing directory for open datasets](../acr-engine/data/raw/README.md)
- Offline smoke test verified: `acr-engine/scripts/business_export_offline_smoke.py`
### C. Business data ingestion
- [Business material types and bucket guide](./business-music-bucket-and-type-guide.md)
- [Business manifest and type-role spec](./business-manifest-and-type-role-spec.md)
- [Business export cookbook](./business-export-cookbook.md)
- [Business data to project manifest adapter](./business-project-manifest-adapter.md)
Shortest business-data chain:
1. [Business export cookbook](./business-export-cookbook.md)
2. `acr-engine/scripts/normalize_business_export.py`
3. `acr-engine/scripts/split_business_manifest_ready.py`
4. `acr-engine/scripts/build_business_project_manifests.py`
5. `acr-engine/scripts/business_export_offline_smoke.py`
### D. Service and engineering
- [Service API](./service-api.md)
- [Ongoing development handoff](./session-handoff.md)
- [Current capability map](./current-capability-map.md)
- [First-run checklist](../acr-engine/FIRST_RUN_CHECKLIST.md)
- [Changelog](./CHANGELOG.md)
### E. Research and roadmap
- [Industrialization roadmap](./industrialization-roadmap.md)
- [SOTA research](./sota-research-2026.md)
- [Master list of references and sources](./references-and-sources.md)
---
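## 4. Notes
We are now reducing the cost of reading overlapping documents at the same level:
- group documents on the entry page first
- keep 1 to 3 main documents within each group
- put secondary details inside a group instead of adding more top-level files
---
## 5. Detail appendix
Suggested usage:
- To understand the project, start with [Project responsibility map](./project-responsibility-map.md) + [System architecture](./acr-architecture.md)
- To train or evaluate, start with [Dataset spec](./dataset-spec.md)
- To ingest open datasets, start with [Dataset sources and ingestion](./dataset-sources-and-licensing.md)
- For the project's history, read [Changelog](./CHANGELOG.md)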
## Sources
- This file is an internal documentation navigation artifact for the current repo state.
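- `/workspace`: source of samples and material
- `acr-engine/`: main project for training, indexing, recognition and the service
- Local small-sample validation: **FAISS** first
- Production vector retrieval: **pgvector** throughout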
......