enrich_songcentric_manifest_with_local_features.py
4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import hashlib
import json
import math
import wave
from pathlib import Path
def load_jsonl(path: Path):
    """Yield one decoded JSON value per non-blank line of a JSON Lines file.

    The file is decoded as UTF-8 (a leading byte-order mark is tolerated) and
    is split on real newlines only.  ``str.splitlines`` would also break on
    characters such as U+2028, U+2029 and U+0085, which may appear unescaped
    inside JSON strings and are written raw by
    ``json.dumps(..., ensure_ascii=False)``; splitting there would cut a
    record in half.

    Args:
        path: Location of the JSONL file to read.

    Yields:
        The value parsed from each non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened (raised on first iteration).
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Iterating a text-mode file splits on "\n", "\r" and "\r\n" only.
    with path.open(encoding='utf-8-sig') as handle:
        for raw_line in handle:
            record = raw_line.strip()
            if record:
                yield json.loads(record)
def _sample_energy(frames: bytes, sampwidth: int, max_samples: int = 4000) -> int:
    """Sum the absolute amplitude of the first ``max_samples`` PCM samples.

    Samples are taken in stream order, so for multi-channel audio the
    channels are interleaved and all count towards the limit.

    Args:
        frames: Raw PCM bytes as returned by ``Wave_read.readframes``.
        sampwidth: Bytes per sample as reported by ``Wave_read.getsampwidth``.
        max_samples: Upper bound on the number of samples inspected.

    Returns:
        The integer sum of absolute sample values (0 for empty input).
    """
    if sampwidth == 1:
        # 8-bit samples are treated as unsigned with the zero level at 128.
        return sum(abs(value - 128) for value in frames[:max_samples])
    usable = min(len(frames), max_samples * sampwidth)
    usable -= usable % sampwidth  # never decode a truncated trailing sample
    # Wider samples are decoded as signed little-endian integers of the
    # file's real width; assuming 2 bytes would scramble 24/32-bit audio.
    return sum(
        abs(int.from_bytes(frames[pos:pos + sampwidth], 'little', signed=True))
        for pos in range(0, usable, sampwidth)
    )


def read_wav_stats(path: Path, start_ms: int, end_ms: int) -> dict:
    """Read a time window from a WAV file and summarise its raw PCM bytes.

    The requested window is clamped to the bounds of the file, so a window
    that starts before 0 ms or extends past the end yields whatever part of
    it overlaps the audio (possibly nothing) instead of raising.

    Args:
        path: WAV file to open.
        start_ms: Window start in milliseconds from the beginning of the file.
        end_ms: Window end in milliseconds from the beginning of the file.

    Returns:
        A dict with:
          * ``digest``: SHA-256 hex digest of the bytes read,
          * ``energy``: sum of absolute amplitudes over the first 4000 samples,
          * ``rate``: frame rate of the file,
          * ``channels``: number of channels,
          * ``bytes_read``: number of PCM bytes in the window.

    Raises:
        wave.Error: If the file is not a WAV file the ``wave`` module can read.
        OSError: If the file cannot be opened.
    """
    with wave.open(str(path), 'rb') as wf:
        rate = wf.getframerate()
        sampwidth = wf.getsampwidth()
        n_channels = wf.getnchannels()
        total_frames = wf.getnframes()
        # setpos() rejects positions outside [0, nframes], so clamp both ends.
        first_frame = min(max(int(start_ms * rate / 1000), 0), total_frames)
        last_frame = min(max(int(end_ms * rate / 1000), 0), total_frames)
        wf.setpos(first_frame)
        frames = wf.readframes(max(last_frame - first_frame, 0))
    return {
        'digest': hashlib.sha256(frames).hexdigest(),
        'energy': _sample_energy(frames, sampwidth),
        'rate': rate,
        'channels': n_channels,
        'bytes_read': len(frames),
    }
def _window_features(stats: dict, window: dict) -> list:
    """Build the fingerprint and embedding feature records for one window.

    Args:
        stats: Result of ``read_wav_stats`` for the window.
        window: Manifest window dict; its ``start_ms`` and ``end_ms`` are
            embedded in the embedding URI.

    Returns:
        A two-element list: the fingerprint record, then the embedding record.
    """
    digest = stats['digest']
    fingerprint = {
        'feature_type': 'fingerprint',
        'model_name': 'local_wavehash',
        'model_version': 'v1',
        'feature_set_name': 'wavehash_5s',
        'fingerprint_value': digest[:32],
        'checksum': f"fp:{digest[:16]}",
        'metadata_json': {'energy': stats['energy'], 'bytes_read': stats['bytes_read']},
    }
    embedding = {
        'feature_type': 'embedding',
        'model_name': 'local_wavehash_embed',
        'model_version': 'v1',
        'feature_set_name': 'wavehash_embed_5s',
        'feature_schema_ver': 'v1',
        'embedding_dim': 8,
        'embedding_uri': f"inline://{digest[:16]}:{window['start_ms']}:{window['end_ms']}",
        'vector_table_name': 'audio_embedding_vector_8_placeholder',
        'checksum': f"emb:{digest[:16]}",
        'metadata_json': {'energy': stats['energy'], 'rate': stats['rate'], 'channels': stats['channels']},
    }
    return [fingerprint, embedding]


def main(argv: list[str] | None = None) -> int:
    """Enrich a JSONL manifest with features derived from local WAV files.

    Every window of every row gets a ``features`` list (created if missing).
    When the row's ``asset.storage_uri`` names an existing ``.wav`` file, a
    fingerprint record and an embedding record computed from that window's
    audio are appended to the list.  The enriched rows are written as JSONL
    and a JSON summary is printed (and optionally written to a file).

    Command-line options:
        --input-manifest   JSONL manifest to read (required).
        --output-manifest  JSONL manifest to write (required).
        --report-output    Optional path for the JSON summary report.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status, always 0 on success.

    Raises:
        KeyError: If a row lacks ``asset``/``storage_uri`` or an enriched
            window lacks ``start_ms``/``end_ms``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-manifest', required=True)
    parser.add_argument('--output-manifest', required=True)
    parser.add_argument('--report-output')
    args = parser.parse_args(argv)

    in_path = Path(args.input_manifest).resolve()
    out_path = Path(args.output_manifest).resolve()
    out_path.parent.mkdir(parents=True, exist_ok=True)
    report_path = Path(args.report_output).resolve() if args.report_output else None
    if report_path:
        report_path.parent.mkdir(parents=True, exist_ok=True)

    rows = []
    feature_count = 0
    wav_assets = 0
    # All rows are collected before anything is written, so the output
    # manifest may safely be the same file as the input manifest.
    for row in load_jsonl(in_path):
        asset_path = Path(row['asset']['storage_uri'])
        # The asset is the same for every window of the row: check it once.
        is_local_wav = asset_path.suffix.lower() == '.wav' and asset_path.exists()
        if is_local_wav:
            # Count the asset once per row, not once per window.
            wav_assets += 1
        for window in row.get('windows', []):
            features = window.setdefault('features', [])
            if not is_local_wav:
                continue
            stats = read_wav_stats(asset_path, window['start_ms'], window['end_ms'])
            added = _window_features(stats, window)
            features.extend(added)
            feature_count += len(added)
        rows.append(row)

    # ensure_ascii=False emits raw non-ASCII text, so the encoding is pinned
    # to UTF-8 instead of depending on the locale.
    out_path.write_text(
        ''.join(json.dumps(r, ensure_ascii=False) + '\n' for r in rows),
        encoding='utf-8',
    )

    report = {
        'input_manifest': str(in_path),
        'output_manifest': str(out_path),
        'rows': len(rows),
        'wav_assets_seen': wav_assets,
        'features_added': feature_count,
    }
    report_text = json.dumps(report, ensure_ascii=False, indent=2)
    if report_path:
        report_path.write_text(report_text, encoding='utf-8')
    print(report_text)
    return 0
if __name__ == "__main__":
    # Run the CLI and use main()'s return value as the process exit status.
    raise SystemExit(main())