extractor.py
4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Chromagram 特征提取。
流程:
1. 音频解码:ffmpeg pipe 输出 22050Hz / Mono / f32le,直接读入内存,无临时文件
2. librosa.feature.chroma_cens() 提取 12×T Chromagram(CENS,对速度/音色鲁棒)
3. 主音对齐:将能量最大的音级滚至第 0 行,实现转调不变性
4. scipy.signal.resample(chroma, 128, axis=1) 时间归一化到 12×128
5. .flatten() 展开为 1536 维向量
"""
import logging
import os
import subprocess
import librosa
import numpy as np
from scipy.signal import resample
logger = logging.getLogger(__name__)
# 目标采样率和时间帧数
TARGET_SR = 22050
TARGET_FRAMES = 128
VECTOR_DIM = 12 * TARGET_FRAMES # 1536
def _load_audio_via_pipe(audio_path: str) -> np.ndarray:
"""使用 ffmpeg pipe 将音频解码为 22050Hz mono float32,不落临时文件到磁盘。"""
cmd = [
"ffmpeg", "-y",
"-i", audio_path,
"-ar", str(TARGET_SR),
"-ac", "1",
"-f", "f32le",
"pipe:1",
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg 解码失败: {result.stderr.decode(errors='replace')}")
return np.frombuffer(result.stdout, dtype=np.float32)
def extract_chroma_feature_from_samples(
samples: np.ndarray,
sr: int,
hop_length: int = 512,
win_len_smooth: int = 41,
) -> np.ndarray:
"""从已加载的音频样本提取 1536 维 Chromagram 特征向量。
若 sr 不等于 TARGET_SR,先用 librosa.resample 在内存中降采样,
避免重新走 ffmpeg 流程。
Args:
samples: 单声道音频样本(任意采样率)。
sr: samples 对应的采样率。
hop_length: CQT hop 大小,增大可成比例降低计算量,不影响最终 128 帧精度。
win_len_smooth: CENS 平滑窗口帧数,应随 hop_length 等比缩小以保持相同的时间覆盖。
Returns:
shape 为 (1536,) 的 numpy 数组。
"""
y = samples if sr == TARGET_SR else librosa.resample(samples, orig_sr=sr, target_sr=TARGET_SR)
# 提取 CENS Chromagram (12×T)
chroma = librosa.feature.chroma_cens(y=y, sr=TARGET_SR, hop_length=hop_length, win_len_smooth=win_len_smooth)
# 主音对齐
tonic = int(np.argmax(chroma.sum(axis=1)))
if tonic != 0:
chroma = np.roll(chroma, -tonic, axis=0)
# 时间归一化到 12×128
if chroma.shape[1] != TARGET_FRAMES:
chroma = resample(chroma, TARGET_FRAMES, axis=1)
feature = chroma.flatten().astype(np.float32)
assert feature.shape == (VECTOR_DIM,), (
f"特征维度错误: 期望 {VECTOR_DIM}, 实际 {feature.shape}"
)
return feature
def extract_chroma_matrix_from_samples(
samples: np.ndarray,
sr: int,
hop_length: int = 512,
win_len_smooth: int = 41,
) -> np.ndarray:
"""从已加载的音频样本提取 12×128 Chromagram 矩阵(供 DTW 精排使用)。"""
return extract_chroma_feature_from_samples(samples, sr, hop_length=hop_length, win_len_smooth=win_len_smooth).reshape(12, TARGET_FRAMES)
def extract_chroma_feature(audio_path: str, hop_length: int = 512, win_len_smooth: int = 41) -> np.ndarray:
"""从音频文件提取 1536 维 Chromagram 特征向量。
Args:
audio_path: 音频文件路径。
hop_length: CQT hop 大小。
win_len_smooth: CENS 平滑窗口帧数。
Returns:
shape 为 (1536,) 的 numpy 数组。
Raises:
FileNotFoundError: 音频文件不存在。
RuntimeError: ffmpeg 解码失败。
"""
if not os.path.isfile(audio_path):
raise FileNotFoundError(f"音频文件不存在: {audio_path}")
y = _load_audio_via_pipe(audio_path)
return extract_chroma_feature_from_samples(y, TARGET_SR, hop_length=hop_length, win_len_smooth=win_len_smooth)
def extract_chroma_matrix(audio_path: str, hop_length: int = 512, win_len_smooth: int = 41) -> np.ndarray:
"""从音频文件提取 12×128 Chromagram 矩阵(未展平,供 DTW 精排使用)。
Returns:
shape 为 (12, 128) 的 numpy 数组,已做主音对齐。
"""
return extract_chroma_feature(audio_path, hop_length=hop_length, win_len_smooth=win_len_smooth).reshape(12, TARGET_FRAMES)