synthetic.py 8.77 KB
"""
Synthetic audio dataset generator for ACR demo.

Generates melodies from fundamental frequencies, simulates:
- Different "songs" (unique note sequences at different base frequencies)
- Song fragments (random crops from songs)
- Humming variants (pitch shifted, time stretched versions)
- Hard negatives / confusing variants for robustness testing

This allows the full pipeline to be validated without external data.
"""

import numpy as np
import soundfile as sf
import json
import random
from pathlib import Path
from typing import Tuple
from tqdm import tqdm


# Default sample rate (samples per second). Used as the default value of the
# `sr` parameter of the generator functions in this module.
_SR = 16000


def sine_wave(freq: float, duration: float, sr: int = _SR, amp: float = 0.5) -> np.ndarray:
    """Return a pure sine tone.

    Args:
        freq: Tone frequency in cycles per second.
        duration: Length of the tone in seconds.
        sr: Sample rate; the result holds ``int(sr * duration)`` samples.
        amp: Peak amplitude of the tone.

    Returns:
        1-D float array containing the sampled sine wave.
    """
    n_samples = int(sr * duration)
    # endpoint=False keeps the sample spacing at exactly duration / n_samples.
    timeline = np.linspace(0, duration, n_samples, endpoint=False)
    angular_freq = 2 * np.pi * freq
    return amp * np.sin(angular_freq * timeline)


def harmonic_tone(freq: float, duration: float, sr: int = _SR, n_harmonics: int = 4) -> np.ndarray:
    """Return a band-limited tone made of ``freq`` and its integer harmonics.

    Harmonic ``h`` is weighted ``0.5 / h``. The summed signal is then
    rescaled so that its peak absolute value is 0.5.

    Args:
        freq: Fundamental frequency in cycles per second.
        duration: Length of the tone in seconds.
        sr: Sample rate; the result holds ``int(sr * duration)`` samples.
        n_harmonics: Number of partials to sum, counting the fundamental.

    Returns:
        1-D float array. It is all zeros when no partial lies below the
        Nyquist frequency or when ``n_harmonics`` is not positive.
    """
    t = np.linspace(0, duration, int(sr * duration), endpoint=False)
    y = np.zeros_like(t)
    nyquist = sr / 2
    for h in range(1, n_harmonics + 1):
        # A partial at or above sr / 2 cannot be represented at this sample
        # rate; synthesising it would fold it back as an unrelated
        # (inharmonic) frequency, so it is left out instead.
        if abs(freq * h) >= nyquist:
            continue
        amp = 0.5 / h
        y += amp * np.sin(2 * np.pi * freq * h * t)
    peak = np.max(np.abs(y)) if y.size else 0.0
    if peak > 0:
        y = y / peak * 0.5
    return y


def generate_melody(
    base_freq: float,
    note_count: int = 16,
    note_dur: float = 0.5,
    sr: int = _SR,
    timbre: str = "harmonic",
) -> np.ndarray:
    """Build a random melody by concatenating individually synthesised notes.

    Each note is placed 0, 2, 4, 5, 7, 9, 11 or 12 semitones above
    ``base_freq`` (major-scale steps plus the octave). Its length is
    ``note_dur`` scaled by a random factor in [0.8, 1.2]. Roughly 15% of the
    notes get a short linear fade-in of at most 20 ms.

    Randomness comes from the global ``random`` module, so seed it beforehand
    for reproducible output.

    Args:
        base_freq: Frequency of the lowest selectable pitch.
        note_count: Number of notes to generate.
        note_dur: Nominal note length in seconds.
        sr: Sample rate.
        timbre: ``"sine"`` uses ``sine_wave``; any other value uses
            ``harmonic_tone``.

    Returns:
        1-D float array with all notes back to back. It is empty when
        ``note_count`` is not positive.
    """
    if note_count <= 0:
        # np.concatenate raises ValueError on an empty list; a zero-length
        # melody is the natural result for "no notes".
        return np.zeros(0)

    notes = []
    for _ in range(note_count):
        interval = random.choice([0, 2, 4, 5, 7, 9, 11, 12])
        freq = base_freq * (2 ** (interval / 12))
        dur = note_dur * random.uniform(0.8, 1.2)

        if timbre == "sine":
            note = sine_wave(freq, dur, sr)
        else:
            note = harmonic_tone(freq, dur, sr)

        if random.random() < 0.15:
            # Ramp in over 20 ms, or over the whole note if it is shorter.
            fade = np.linspace(0, 1, min(int(sr * 0.02), len(note)))
            note[: len(fade)] *= fade

        notes.append(note)

    return np.concatenate(notes)


# Chord shapes expressed as semitone offsets above a song's base frequency.
# NOTE: despite the name, each entry is a single chord rather than a
# progression; generate_song picks one entry at random for every half-second
# segment.
_CHORD_PROGRESSIONS = [
    [0, 3, 7],  # minor triad
    [0, 4, 7],  # major triad
    [0, 3, 7, 10],  # minor seventh
    [0, 4, 7, 11],  # major seventh
    [0, 4, 9],  # root, major third, major sixth
    [0, 5, 7],  # suspended fourth
]


def apply_confusion_mix(y: np.ndarray, sr: int, strength: float = 0.22) -> np.ndarray:
    """Mix a fixed three-tone distractor into ``y`` and renormalise.

    The distractor is the sum of 220, 330 and 440 Hz sines, each with a
    random phase in [0, pi] drawn from the global ``random`` module. It is
    scaled to unit peak and added to ``y`` with weight ``strength``. The
    mixture is then rescaled so its peak absolute value is 0.5.

    Args:
        y: 1-D input signal.
        sr: Sample rate of ``y``.
        strength: Weight of the unit-peak distractor relative to ``y``.

    Returns:
        New array with the same length as ``y``. An empty input yields an
        empty array and draws no random numbers.
    """
    if len(y) == 0:
        # np.max on an empty array raises; there is nothing to mix anyway.
        return np.zeros(0)

    t = np.linspace(0, len(y) / sr, len(y), endpoint=False)
    distractor = np.zeros(len(y))
    for f in (220.0, 330.0, 440.0):
        distractor += np.sin(2 * np.pi * f * t + random.uniform(0, np.pi))
    # Floor the divisor so a distractor that is numerically all-zero cannot
    # cause a division by zero.
    distractor /= max(np.max(np.abs(distractor)), 1e-8)

    mixed = y + strength * distractor
    peak = np.max(np.abs(mixed))
    return mixed / peak * 0.5 if peak > 0 else mixed


def apply_humming_style(y: np.ndarray, sr: int) -> np.ndarray:
    """Apply a rising amplitude ramp and a moving-average smoother to ``y``.

    The signal is multiplied by a linear ramp from 0.7 to 1.0. It is then
    smoothed with a box filter of ``max(5, sr // 400)`` samples and rescaled
    to a peak absolute value of 0.5.

    Args:
        y: 1-D input signal.
        sr: Sample rate of ``y``; it determines the smoothing window width.

    Returns:
        New array with exactly ``len(y)`` samples. An empty input yields an
        empty array, and an all-zero input stays all-zero.
    """
    n = len(y)
    if n == 0:
        # np.convolve rejects empty operands.
        return np.zeros(0)

    env = np.linspace(0.7, 1.0, n)
    hum = y * env

    width = max(5, sr // 400)
    kernel = np.ones(width) / width
    if n >= width:
        hum = np.convolve(hum, kernel, mode="same")
    else:
        # mode="same" returns max(len(hum), len(kernel)) samples, which would
        # lengthen a short input. Take the centred len(y) samples of the full
        # convolution instead, using the same alignment "same" uses.
        start = (width - 1) // 2
        hum = np.convolve(hum, kernel, mode="full")[start : start + n]

    peak = np.max(np.abs(hum))
    return hum / peak * 0.5 if peak > 0 else hum


def generate_song(
    song_id: str,
    base_freq: float,
    duration: float = 30.0,
    sr: int = _SR,
    with_vocals: bool = True,
) -> Tuple[np.ndarray, float]:
    """Synthesise one song: decaying random chords plus an optional melody.

    The song is split into half-second segments. Each segment gets one chord
    picked at random from ``_CHORD_PROGRESSIONS``. Every chord tone is a
    ``harmonic_tone`` shaped by an exponential decay and mixed in at gain
    0.3. With ``with_vocals`` a melody from ``generate_melody`` is added one
    octave above ``base_freq`` at gain 0.2. The result is rescaled to a peak
    absolute value of 0.5.

    Randomness comes from the global ``random`` module.

    Args:
        song_id: Not used by the synthesis; kept for the callers' interface.
        base_freq: Root frequency for chords.
        duration: Song length in seconds.
        sr: Sample rate.
        with_vocals: Whether to overlay the melody line.

    Returns:
        ``(samples, duration)``, where ``samples`` holds
        ``int(sr * duration)`` values and ``duration`` is the argument
        passed in.
    """
    segments_per_sec = 2
    total_segments = int(duration * segments_per_sec)
    y = np.zeros(int(sr * duration))

    for i in range(total_segments):
        start_sample = int(i / segments_per_sec * sr)
        end_sample = int((i + 1) / segments_per_sec * sr)
        seg_len = end_sample - start_sample

        # View into the output buffer; adding into it updates y in place.
        window = y[start_sample : start_sample + seg_len]
        # The decay envelope is the same for every tone of the chord.
        env = np.exp(-np.linspace(0, 3, seg_len))

        chord = random.choice(_CHORD_PROGRESSIONS)
        for interval in chord:
            freq = base_freq * (2 ** (interval / 12))
            note = harmonic_tone(freq, seg_len / sr, sr)
            # harmonic_tone derives its length from a float duration, so it
            # can come back one sample short. Trim before multiplying so the
            # envelope never hits a shape mismatch.
            n = min(len(window), len(note))
            window[:n] += note[:n] * env[:n] * 0.3

    if with_vocals:
        melody = generate_melody(base_freq * 2, note_count=int(duration * 2), note_dur=0.5, sr=sr)
        min_len = min(len(y), len(melody))
        y[:min_len] += melody[:min_len] * 0.2

    peak = np.max(np.abs(y)) if y.size else 0.0
    if peak > 0:
        y = y / peak * 0.5

    return y, duration


def generate_dataset(
    output_dir: str,
    num_songs: int = 50,
    song_duration: float = 30.0,
    num_segments_per_song: int = 6,
    segment_duration: float = 5.0,
    sr: int = _SR,
    seed: int = 42,
) -> Path:
    """Generate songs, query segments and metadata files under ``output_dir``.

    Layout written:
        songs/<song_id>.wav        full reference songs
        segments/<song_id>_seg_*   fixed-length crops (query variants)
        catalog.json               every reference-song entry
        train.json/val.json/test.json   per-split metadata

    Songs are split by index: the first 70% go to train, the next 15% to
    val and the rest to test. A song's segments always land in the same
    split as the song.

    Segment variants by index ``j`` within a song:
        * ``j <  n // 2``: clean crop.
        * ``j >= n // 2``: passed through the project's ``AugmentPipeline``.
        * ``j == n - 2`` (only if ``n >= 4``): additionally
          ``apply_humming_style``, labelled ``humming_like``.
        * ``j == n - 1``: additionally ``apply_confusion_mix``, labelled
          ``confused``.

    Args:
        output_dir: Destination directory; created if missing.
        num_songs: Number of reference songs.
        song_duration: Length of each song in seconds.
        num_segments_per_song: Crops taken from each song (``n`` above).
        segment_duration: Length of each crop in seconds; shorter crops are
            zero-padded.
        sr: Sample rate of all written audio.
        seed: Seed applied to both ``random`` and ``numpy.random``.

    Returns:
        ``output_dir`` as a ``Path``.
    """
    random.seed(seed)
    np.random.seed(seed)

    root = Path(output_dir)
    songs_dir = root / "songs"
    segs_dir = root / "segments"
    songs_dir.mkdir(parents=True, exist_ok=True)
    segs_dir.mkdir(parents=True, exist_ok=True)

    base_freqs = [
        130.81, 146.83, 164.81, 174.61, 196.0, 220.0, 246.94,
        261.63, 293.66, 329.63, 349.23, 392.0, 440.0, 493.88,
        523.25, 587.33, 659.25, 698.46, 783.99, 880.0, 987.77,
    ]

    train_meta = []
    val_meta = []
    test_meta = []

    # Loop invariants.
    train_cutoff = int(num_songs * 0.7)
    val_cutoff = int(num_songs * 0.85)
    target_len = int(segment_duration * sr)
    segment_count = 0

    print(f"Generating {num_songs} synthetic songs...")
    for i in tqdm(range(num_songs)):
        song_id = f"song_{i:04d}"
        base_freq = base_freqs[i % len(base_freqs)]
        # Once the frequency table is exhausted, reuse it transposed up by
        # two more semitones per pass.
        key_offset = (i // len(base_freqs)) * 2
        base_freq *= 2 ** (key_offset / 12)

        # Every entry for this song (segments and reference) shares a split.
        if i < train_cutoff:
            split_meta = train_meta
        elif i < val_cutoff:
            split_meta = val_meta
        else:
            split_meta = test_meta

        y, dur = generate_song(song_id, base_freq, duration=song_duration, sr=sr)
        song_path = songs_dir / f"{song_id}.wav"
        sf.write(str(song_path), y, sr)

        max_offset = max(0, dur - segment_duration)
        for j in range(num_segments_per_song):
            offset = random.uniform(0, max_offset)
            start_s = int(offset * sr)
            seg = y[start_s : start_s + target_len]

            if len(seg) < target_len:
                seg = np.pad(seg, (0, target_len - len(seg)))

            variant_type = "clean"
            out_seg = seg.copy()
            if j >= num_segments_per_song // 2:
                # Project-local dependency, imported only when needed. The
                # pipeline is rebuilt per segment exactly as before because
                # its internals are not visible from this module.
                from src.utils.augment import AugmentPipeline
                aug = AugmentPipeline(sr)
                out_seg = aug(out_seg)
                variant_type = "augmented"

            if j == num_segments_per_song - 1:
                out_seg = apply_confusion_mix(out_seg, sr)
                variant_type = "confused"
            elif j == num_segments_per_song - 2 and num_segments_per_song >= 4:
                out_seg = apply_humming_style(out_seg, sr)
                variant_type = "humming_like"

            if variant_type != "clean":
                seg_name = f"{song_id}_seg_{j:02d}_{variant_type}.wav"
            else:
                seg_name = f"{song_id}_seg_{j:02d}.wav"
            seg_path = segs_dir / seg_name
            sf.write(str(seg_path), out_seg, sr)
            segment_count += 1

            meta_entry = {
                "song_id": song_id,
                "audio_path": f"segments/{seg_name}",
                "duration": segment_duration,
                "type": variant_type,
                "offset": offset,
            }

            # Coarse position label derived from where the crop starts.
            if offset < dur * 0.2:
                seg_type = "intro"
            elif offset > dur * 0.7:
                seg_type = "outro"
            else:
                seg_type = "mid"
            meta_entry["segment_type"] = seg_type

            split_meta.append(meta_entry)

        split_meta.append(
            {
                "song_id": song_id,
                "audio_path": f"songs/{song_id}.wav",
                "duration": dur,
                "base_freq": base_freq,
                "type": "reference",
            }
        )

    catalog_meta = [item for item in train_meta + val_meta + test_meta if item.get("type") == "reference"]
    with open(root / "catalog.json", "w", encoding="utf-8") as f:
        json.dump(catalog_meta, f, indent=2)

    for name, data in [("train", train_meta), ("val", val_meta), ("test", test_meta)]:
        with open(root / f"{name}.json", "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        print(f"  {name}: {len(data)} entries")

    print(f"\nDataset generated at {root}")
    print(f"  Songs: {num_songs}")
    # The split lists also hold one reference entry per song, so count the
    # written segments directly rather than summing the list lengths.
    print(f"  Total segments: {segment_count}")
    return root


if __name__ == "__main__":
    # Command-line entry point: every generate_dataset parameter is exposed
    # as a flag whose default matches the function's own default.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--output", type=str, default="data/synthetic")
    parser.add_argument("--num-songs", type=int, default=50)
    parser.add_argument("--song-duration", type=float, default=30.0)
    parser.add_argument("--segments-per-song", type=int, default=6)
    parser.add_argument("--segment-duration", type=float, default=5.0)
    # Previously fixed at the function defaults and not reachable from the CLI.
    parser.add_argument("--sample-rate", type=int, default=_SR)
    parser.add_argument("--seed", type=int, default=42)
    args = parser.parse_args()

    generate_dataset(
        output_dir=args.output,
        num_songs=args.num_songs,
        song_duration=args.song_duration,
        num_segments_per_song=args.segments_per_song,
        segment_duration=args.segment_duration,
        sr=args.sample_rate,
        seed=args.seed,
    )