# synthetic.py (7.83 KB)
"""
Synthetic audio dataset generator for ACR demo.

Generates melodies from fundamental frequencies, simulates:
- Different "songs" (unique note sequences at different base frequencies)
- Song fragments (random crops from songs)
- Humming variants (pitch shifted, time stretched versions)

This allows the full pipeline to be validated without external data.
"""

import numpy as np
import soundfile as sf
import json
import random
import os
from pathlib import Path
from typing import List, Tuple
from tqdm import tqdm


# Default sample rate in Hz; used as the default `sr` of every generator below.
_SR = 16000


def sine_wave(freq: float, duration: float, sr: int = _SR, amp: float = 0.5) -> np.ndarray:
    """Return a pure sine tone.

    Args:
        freq: Tone frequency in Hz.
        duration: Length in seconds; the result has ``int(sr * duration)`` samples.
        sr: Sample rate in Hz.
        amp: Peak amplitude of the tone.

    Returns:
        1-D float array holding the sampled sine wave.
    """
    n_samples = int(sr * duration)
    # Sample instants start at 0 and stop one step short of ``duration``.
    times = np.linspace(0, duration, n_samples, endpoint=False)
    phase = 2 * np.pi * freq * times
    return amp * np.sin(phase)


def harmonic_tone(freq: float, duration: float, sr: int = _SR, n_harmonics: int = 4) -> np.ndarray:
    """Return a tone made of a fundamental plus overtones, peak-normalised to 0.5.

    Harmonic ``h`` (counted from 1) is a sine at ``freq * h`` with amplitude
    ``0.5 / h``; the sum is then rescaled so its largest absolute sample is 0.5.

    Args:
        freq: Fundamental frequency in Hz.
        duration: Length in seconds; the result has ``int(sr * duration)`` samples.
        sr: Sample rate in Hz.
        n_harmonics: Number of harmonics to sum, including the fundamental.

    Returns:
        1-D float array. A silent or empty signal (zero duration, ``freq == 0``
        or ``n_harmonics <= 0``) is returned as-is rather than normalised.
    """
    t = np.linspace(0, duration, int(sr * duration), endpoint=False)
    y = np.zeros_like(t)
    for h in range(1, n_harmonics + 1):
        y += (0.5 / h) * np.sin(2 * np.pi * freq * h * t)

    # np.max raises on an empty array, and dividing an all-zero signal by its
    # zero peak would fill the output with NaNs, so only normalise real signal.
    peak = np.max(np.abs(y)) if y.size else 0.0
    if peak > 0:
        y = y / peak * 0.5
    return y


def generate_melody(
    base_freq: float,
    note_count: int = 16,
    note_dur: float = 0.5,
    sr: int = _SR,
    timbre: str = "harmonic",
) -> np.ndarray:
    """Return a random monophonic melody as one concatenated waveform.

    Each note is pitched a randomly chosen interval above ``base_freq`` and
    lasts ``note_dur`` scaled by a random factor in [0.8, 1.2]. Roughly 15% of
    the notes get a 20 ms linear fade-in. All randomness comes from the
    ``random`` module, so seed it for reproducible output.

    Args:
        base_freq: Frequency in Hz that the intervals are measured from.
        note_count: Number of notes to generate.
        note_dur: Nominal note length in seconds.
        sr: Sample rate in Hz.
        timbre: ``"sine"`` for pure tones; any other value uses harmonic tones.

    Returns:
        1-D float array with all notes back to back; empty when
        ``note_count <= 0``.
    """
    if note_count <= 0:
        # np.concatenate raises ValueError on an empty list of arrays.
        return np.zeros(0)

    # Semitone offsets of the major-scale degrees, up to and including the octave.
    intervals = [0, 2, 4, 5, 7, 9, 11, 12]
    synth = sine_wave if timbre == "sine" else harmonic_tone
    fade_len = int(sr * 0.02)

    notes = []
    for _ in range(note_count):
        # Keep the draw order (interval, duration, fade) so seeded runs
        # produce the same melody as before.
        interval = random.choice(intervals)
        freq = base_freq * (2 ** (interval / 12))
        dur = note_dur * random.uniform(0.8, 1.2)

        note = synth(freq, dur, sr)

        if random.random() < 0.15:
            fade = np.linspace(0, 1, min(fade_len, len(note)))
            note[:len(fade)] *= fade

        notes.append(note)

    return np.concatenate(notes)


# Chord shapes as semitone offsets above the root; each entry is a single chord
# (not a progression). The chord names in the comments assume a root of C.
_CHORD_PROGRESSIONS = [
    [0, 3, 7],       # Cm
    [0, 4, 7],       # C
    [0, 3, 7, 10],   # Cm7
    [0, 4, 7, 11],   # Cmaj7
    [0, 4, 9],       # C6 without the fifth (same notes as Am/C), not a sus4
    [0, 5, 7],       # Csus4 (a sus2 would be [0, 2, 7])
]


def generate_song(
    song_id: str,
    base_freq: float,
    duration: float = 30.0,
    sr: int = _SR,
    with_vocals: bool = True,
) -> Tuple[np.ndarray, float]:
    """Synthesise one song: decaying random chords plus an optional melody.

    The song is cut into two segments per second. Each segment plays one chord
    picked at random from ``_CHORD_PROGRESSIONS``, every chord tone shaped by an
    exponential decay. When ``with_vocals`` is set, a random melody one octave
    above ``base_freq`` is mixed in at 0.2 gain. The mix is finally scaled so
    its peak is 0.5. Randomness comes from the ``random`` module.

    Args:
        song_id: Identifier of the song. Unused here; kept for interface
            compatibility.
        base_freq: Root frequency in Hz for the chords.
        duration: Song length in seconds.
        sr: Sample rate in Hz.
        with_vocals: Whether to overlay the melody line.

    Returns:
        ``(waveform, duration)`` where ``waveform`` has ``int(sr * duration)``
        samples. Durations too short for any chord or note yield silence
        instead of raising.
    """
    segments_per_sec = 2
    total_segments = int(duration * segments_per_sec)
    y = np.zeros(int(sr * duration))

    for i in range(total_segments):
        t_start = i / segments_per_sec
        t_end = (i + 1) / segments_per_sec
        start_sample = int(t_start * sr)
        seg_len = int(t_end * sr) - start_sample

        # The decay envelope depends only on the segment length, so build it
        # once per segment rather than once per chord tone.
        env = np.exp(-np.linspace(0, 3, seg_len))

        chord = random.choice(_CHORD_PROGRESSIONS)
        for interval in chord:
            freq = base_freq * (2 ** (interval / 12))
            tone = harmonic_tone(freq, seg_len / sr, sr)
            # Float rounding in ``sr * (seg_len / sr)`` can leave the tone one
            # sample short, so trim tone and envelope to a common length
            # before multiplying to avoid a broadcast error.
            n = min(seg_len, len(tone))
            y[start_sample:start_sample + n] += tone[:n] * env[:n] * 0.3

    melody_notes = int(duration * 2)
    # Skip the melody when there is no room for a single note; an empty note
    # list cannot be concatenated.
    if with_vocals and melody_notes > 0:
        melody = generate_melody(base_freq * 2, note_count=melody_notes, note_dur=0.5, sr=sr)
        min_len = min(len(y), len(melody))
        y[:min_len] += melody[:min_len] * 0.2

    # np.max raises on an empty buffer (zero-length song).
    peak = np.max(np.abs(y)) if y.size else 0.0
    if peak > 0:
        y = y / peak * 0.5

    return y, duration


def _split_name(index: int, num_songs: int) -> str:
    """Return the split ("train", "val" or "test") for song number ``index``."""
    if index < int(num_songs * 0.7):
        return "train"
    if index < int(num_songs * 0.85):
        return "val"
    return "test"


def _segment_position(offset: float, song_duration: float) -> str:
    """Label a crop "intro", "mid" or "outro" by where it starts in the song."""
    if offset < song_duration * 0.2:
        return "intro"
    if offset > song_duration * 0.7:
        return "outro"
    return "mid"


def generate_dataset(
    output_dir: str,
    num_songs: int = 50,
    song_duration: float = 30.0,
    num_segments_per_song: int = 6,
    segment_duration: float = 5.0,
    sr: int = _SR,
    seed: int = 42,
) -> Path:
    """Generate songs, random crops and split metadata under ``output_dir``.

    Writes ``songs/<song_id>.wav`` for every song, ``segments/*.wav`` for every
    crop, and ``train.json`` / ``val.json`` / ``test.json`` metadata files.
    Songs are assigned to splits by index (first 70% train, next 15% val, rest
    test, thresholds truncated to integers), so all crops of a song land in
    the same split. For very small ``num_songs`` the train and val splits can
    be empty. Each split file lists the song's segment entries followed by one
    song-level entry.

    The first half of each song's crops (by index) is stored clean; the rest is
    passed through ``src.utils.augment.AugmentPipeline`` and stored with an
    ``_aug`` suffix.

    Args:
        output_dir: Destination directory; created if missing.
        num_songs: Number of songs to synthesise.
        song_duration: Length of each song in seconds.
        num_segments_per_song: Number of crops taken from each song.
        segment_duration: Length of each crop in seconds; crops running past
            the end of the song are zero-padded.
        sr: Sample rate in Hz.
        seed: Seed applied to both ``random`` and ``numpy.random``.

    Returns:
        ``output_dir`` as a ``Path``.
    """
    random.seed(seed)
    np.random.seed(seed)

    output_dir = Path(output_dir)
    songs_dir = output_dir / "songs"
    segs_dir = output_dir / "segments"
    songs_dir.mkdir(parents=True, exist_ok=True)
    segs_dir.mkdir(parents=True, exist_ok=True)

    base_freqs = [130.81, 146.83, 164.81, 174.61, 196.0, 220.0, 246.94,
                  261.63, 293.66, 329.63, 349.23, 392.0, 440.0, 493.88,
                  523.25, 587.33, 659.25, 698.46, 783.99, 880.0, 987.77]

    splits = {"train": [], "val": [], "test": []}
    seg_samples = int(segment_duration * sr)
    # Counted separately because the split lists also hold song-level entries.
    segment_count = 0

    print(f"Generating {num_songs} synthetic songs...")
    for i in tqdm(range(num_songs)):
        song_id = f"song_{i:04d}"
        base_freq = base_freqs[i % len(base_freqs)]
        # Every full pass through base_freqs raises the key by two semitones.
        key_offset = (i // len(base_freqs)) * 2
        base_freq *= (2 ** (key_offset / 12))
        split_meta = splits[_split_name(i, num_songs)]

        y, dur = generate_song(song_id, base_freq, duration=song_duration, sr=sr)
        sf.write(str(songs_dir / f"{song_id}.wav"), y, sr)

        max_offset = max(0, dur - segment_duration)
        for j in range(num_segments_per_song):
            offset = random.uniform(0, max_offset)
            start_s = int(offset * sr)
            seg = y[start_s:start_s + seg_samples]

            if len(seg) < seg_samples:
                seg = np.pad(seg, (0, seg_samples - len(seg)))

            is_augmented = j >= num_segments_per_song // 2

            if is_augmented:
                # Imported lazily so the module loads without the project
                # augmentation package; repeat imports are cached by Python.
                from src.utils.augment import AugmentPipeline
                seg_out = AugmentPipeline(sr)(seg.copy())
                seg_name = f"{song_id}_seg_{j:02d}_aug.wav"
            else:
                seg_out = seg
                seg_name = f"{song_id}_seg_{j:02d}.wav"

            sf.write(str(segs_dir / seg_name), seg_out, sr)
            split_meta.append({
                "song_id": song_id,
                "audio_path": f"segments/{seg_name}",
                "duration": segment_duration,
                "type": "augmented" if is_augmented else "clean",
                "offset": offset,
                "segment_type": _segment_position(offset, dur),
            })
            segment_count += 1

        split_meta.append({
            "song_id": song_id,
            "audio_path": f"songs/{song_id}.wav",
            "duration": dur,
            "base_freq": base_freq,
        })

    for name, data in splits.items():
        with open(output_dir / f"{name}.json", "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        print(f"  {name}: {len(data)} entries")

    print(f"\nDataset generated at {output_dir}")
    print(f"  Songs: {num_songs}")
    print(f"  Total segments: {segment_count}")
    return output_dir


if __name__ == "__main__":
    # Command-line entry point: every flag maps onto a generate_dataset()
    # argument, and every default matches that function's default.
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate a synthetic audio dataset (songs, segments, split metadata)."
    )
    parser.add_argument("--output", type=str, default="data/synthetic",
                        help="directory the dataset is written to")
    parser.add_argument("--num-songs", type=int, default=50,
                        help="number of songs to synthesise")
    parser.add_argument("--song-duration", type=float, default=30.0,
                        help="length of each song in seconds")
    parser.add_argument("--segments-per-song", type=int, default=6,
                        help="number of crops taken from each song")
    parser.add_argument("--segment-duration", type=float, default=5.0,
                        help="length of each crop in seconds")
    # Previously fixed at the function defaults; exposed so runs can be
    # reproduced or varied from the command line.
    parser.add_argument("--sample-rate", type=int, default=_SR,
                        help="sample rate in Hz")
    parser.add_argument("--seed", type=int, default=42,
                        help="random seed")
    args = parser.parse_args()

    generate_dataset(
        output_dir=args.output,
        num_songs=args.num_songs,
        song_duration=args.song_duration,
        num_segments_per_song=args.segments_per_song,
        segment_duration=args.segment_duration,
        sr=args.sample_rate,
        seed=args.seed,
    )