extractor.py 4.2 KB
"""Chromagram 特征提取。

流程:
1. 音频解码:ffmpeg pipe 输出 22050Hz / Mono / f32le,直接读入内存,无临时文件
2. librosa.feature.chroma_cens() 提取 12×T Chromagram(CENS,对速度/音色鲁棒)
3. 主音对齐:将能量最大的音级滚至第 0 行,实现转调不变性
4. scipy.signal.resample(chroma, 128, axis=1) 时间归一化到 12×128
5. .flatten() 展开为 1536 维向量
"""

import logging
import os
import subprocess

import librosa
import numpy as np
from scipy.signal import resample

logger = logging.getLogger(__name__)

# 目标采样率和时间帧数
TARGET_SR = 22050
TARGET_FRAMES = 128
VECTOR_DIM = 12 * TARGET_FRAMES  # 1536


def _load_audio_via_pipe(audio_path: str) -> np.ndarray:
    """使用 ffmpeg pipe 将音频解码为 22050Hz mono float32,不落临时文件到磁盘。"""
    cmd = [
        "ffmpeg", "-y",
        "-i", audio_path,
        "-ar", str(TARGET_SR),
        "-ac", "1",
        "-f", "f32le",
        "pipe:1",
    ]
    result = subprocess.run(cmd, capture_output=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg 解码失败: {result.stderr.decode(errors='replace')}")
    return np.frombuffer(result.stdout, dtype=np.float32)


def extract_chroma_feature_from_samples(
    samples: np.ndarray,
    sr: int,
    hop_length: int = 512,
    win_len_smooth: int = 41,
) -> np.ndarray:
    """从已加载的音频样本提取 1536 维 Chromagram 特征向量。

    若 sr 不等于 TARGET_SR,先用 librosa.resample 在内存中降采样,
    避免重新走 ffmpeg 流程。

    Args:
        samples: 单声道音频样本(任意采样率)。
        sr: samples 对应的采样率。
        hop_length: CQT hop 大小,增大可成比例降低计算量,不影响最终 128 帧精度。
        win_len_smooth: CENS 平滑窗口帧数,应随 hop_length 等比缩小以保持相同的时间覆盖。

    Returns:
        shape 为 (1536,) 的 numpy 数组。
    """
    y = samples if sr == TARGET_SR else librosa.resample(samples, orig_sr=sr, target_sr=TARGET_SR)

    # 提取 CENS Chromagram (12×T)
    chroma = librosa.feature.chroma_cens(y=y, sr=TARGET_SR, hop_length=hop_length, win_len_smooth=win_len_smooth)

    # 主音对齐
    tonic = int(np.argmax(chroma.sum(axis=1)))
    if tonic != 0:
        chroma = np.roll(chroma, -tonic, axis=0)

    # 时间归一化到 12×128
    if chroma.shape[1] != TARGET_FRAMES:
        chroma = resample(chroma, TARGET_FRAMES, axis=1)

    feature = chroma.flatten().astype(np.float32)
    assert feature.shape == (VECTOR_DIM,), (
        f"特征维度错误: 期望 {VECTOR_DIM}, 实际 {feature.shape}"
    )
    return feature


def extract_chroma_matrix_from_samples(
    samples: np.ndarray,
    sr: int,
    hop_length: int = 512,
    win_len_smooth: int = 41,
) -> np.ndarray:
    """从已加载的音频样本提取 12×128 Chromagram 矩阵(供 DTW 精排使用)。"""
    return extract_chroma_feature_from_samples(samples, sr, hop_length=hop_length, win_len_smooth=win_len_smooth).reshape(12, TARGET_FRAMES)


def extract_chroma_feature(audio_path: str, hop_length: int = 512, win_len_smooth: int = 41) -> np.ndarray:
    """从音频文件提取 1536 维 Chromagram 特征向量。

    Args:
        audio_path: 音频文件路径。
        hop_length: CQT hop 大小。
        win_len_smooth: CENS 平滑窗口帧数。

    Returns:
        shape 为 (1536,) 的 numpy 数组。

    Raises:
        FileNotFoundError: 音频文件不存在。
        RuntimeError: ffmpeg 解码失败。
    """
    if not os.path.isfile(audio_path):
        raise FileNotFoundError(f"音频文件不存在: {audio_path}")

    y = _load_audio_via_pipe(audio_path)
    return extract_chroma_feature_from_samples(y, TARGET_SR, hop_length=hop_length, win_len_smooth=win_len_smooth)


def extract_chroma_matrix(audio_path: str, hop_length: int = 512, win_len_smooth: int = 41) -> np.ndarray:
    """从音频文件提取 12×128 Chromagram 矩阵(未展平,供 DTW 精排使用)。

    Returns:
        shape 为 (12, 128) 的 numpy 数组,已做主音对齐。
    """
    return extract_chroma_feature(audio_path, hop_length=hop_length, win_len_smooth=win_len_smooth).reshape(12, TARGET_FRAMES)