Commit f8ad329c f8ad329cb556651f2762949f4906fb6200501f89 by 沈秋雨

更新大样本下测试集生成流程

1 parent 51ddab43
......@@ -78,16 +78,20 @@ CSV 里重点看这些列:
python -m lyric_dedup.cli generate-eval-set \
--library-dir data/library \
--lyrics-dir data/generated_eval/incoming \
--csv data/generated_eval/eval_10.csv \
--size 10 \
--positive-ratio 0.6
--csv data/generated_eval/eval_50000.csv \
--index outputs/indexes/lyrics.pkl \
--size 50000 \
--positive-ratio 0.3
```
生成器的业务口径:
- `应去重` 样本只生成全曲歌词的样式变化,例如时间戳、标点、平台噪声、空行、LRC 样式、附加中文翻译。
- `不应去重` 样本包含片段歌词、短句碰撞、不同歌曲片段混合、同主题新歌词、仅翻译相似。
- 先扫描整个曲库,按有效歌词行数、语言类型、文件来源前缀做分层采样,不再按排序前缀取样。
- `应去重` 样本只生成全曲歌词的样式变化,例如时间戳、标点、平台噪声、空行、重复副歌次数变化、附加中文翻译。
- `不应去重` 样本包含同主题新歌词、hard negative、片段歌词、重复副歌碰撞、仅翻译相似、短歌词/占位边界样本。
- 片段歌词即使命中已有歌曲的一部分,也不应该输出 `duplicate`;最多进入 `review`。
- 如果传入 `--index`,生成器会用现有索引构造更接近线上召回风险的 hard negative。
- 同时会生成 `*.manifest.json`,记录 seed、曲库规模、样本类型分布、语言/来源分桶和样本来源覆盖数。
先准备一个 CSV,例如 `data/eval/eval.csv`
......
......@@ -67,10 +67,10 @@ python scripts/process_library.py \
python scripts/process_library.py \
--library-dir data/library \
--index outputs/indexes/library_lyrics.pkl \
--eval-size 1180 \
--positive-ratio 0.2 \
--eval-csv data/generated_eval/eval_1180.csv \
--eval-out outputs/results/library_eval_1180.csv
--eval-size 50000 \
--positive-ratio 0.3 \
--eval-csv data/generated_eval/eval_50000.csv \
--eval-out outputs/results/library_eval_50000.csv
```
隔离出来的文件默认会移动到:
......@@ -95,22 +95,23 @@ outputs/indexes/library_lyrics.pkl
注意:如果修改了 `data/library`,或修改了预处理/判重逻辑,需要重新执行本步骤。
## 3. 生成 100 条测试样本
## 3. 生成生产评估样本
```bash
python -m lyric_dedup.cli generate-eval-set \
--library-dir data/library \
--lyrics-dir data/generated_eval/incoming \
--csv data/generated_eval/eval_500.csv \
--size 500 \
--positive-ratio 0.2
--csv data/generated_eval/eval_50000.csv \
--index outputs/indexes/library_lyrics.pkl \
--size 50000 \
--positive-ratio 0.3
```
默认生成:
默认生产评估口径
```text
应去重: 60
不应去重: 40
应去重: 30%
不应去重: 70%
```
生成器会先清理 `data/generated_eval/incoming/` 下旧的 `.txt` / `.lrc` 生成文件,再写入新样本。
......@@ -118,8 +119,28 @@ python -m lyric_dedup.cli generate-eval-set \
业务口径:
```text
pos_* = 应去重,全曲歌词样式变化
neg_* = 不应去重,片段/短句碰撞/混合片段/同主题新歌词/仅翻译相似
positive_* = 应去重,全曲歌词样式变化
negative_random_unrelated = 不应去重,同主题新歌词
negative_hard_candidate = 不应去重,系统容易召回的短句/局部重合样本
negative_fragment = 不应去重,单曲片段
negative_shared_chorus = 不应去重,重复副歌碰撞
negative_translation_only = 不应去重,仅翻译相似
edge_short_or_placeholder = 不应去重,短歌词/占位边界样本
```
生成器会扫描整个曲库并按有效歌词行数、语言类型、文件来源前缀分层采样。传入 `--index` 后会用现有索引生成 hard negative。每次还会输出:
```text
data/generated_eval/eval_50000.csv.manifest.json
```
manifest 里重点看:
```text
library_files 曲库歌词文件数
sample_type_counts 各样本类型数量
line_count_bucket_counts / language_bucket_counts / source_bucket_counts
unique_source_records 本次评估覆盖了多少真实源文件
```
## 4. 严格评估:只把 duplicate 算作去重
......@@ -127,9 +148,9 @@ neg_* = 不应去重,片段/短句碰撞/混合片段/同主题新歌词/仅
```bash
python -m lyric_dedup.cli evaluate-csv \
--index outputs/indexes/library_lyrics.pkl \
--csv data/generated_eval/eval_500.csv \
--csv data/generated_eval/eval_50000.csv \
--base-dir data/generated_eval \
--out outputs/results/library_eval_500.csv
--out outputs/results/library_eval_50000.csv
```
这个口径下:
......@@ -151,10 +172,10 @@ false_positive
```bash
python -m lyric_dedup.cli evaluate-csv \
--index outputs/indexes/library_lyrics.pkl \
--csv data/generated_eval/eval_500.csv \
--csv data/generated_eval/eval_50000.csv \
--base-dir data/generated_eval \
--positive-decisions duplicate,review \
--out outputs/results/library_eval_500_review_positive.csv
--out outputs/results/library_eval_50000_review_positive.csv
```
这个口径下:
......
......@@ -48,8 +48,9 @@ def main() -> None:
generate.add_argument("--lyrics-dir", required=True)
generate.add_argument("--csv", required=True)
generate.add_argument("--size", type=int, default=100)
generate.add_argument("--positive-ratio", type=float, default=0.6)
generate.add_argument("--positive-ratio", type=float, default=0.3)
generate.add_argument("--seed", type=int, default=20260602)
generate.add_argument("--index", default="", help="optional existing index for hard-negative generation")
args = parser.parse_args()
if args.command == "build-index":
......@@ -75,6 +76,7 @@ def main() -> None:
size=args.size,
positive_ratio=args.positive_ratio,
seed=args.seed,
index_path=Path(args.index) if args.index else None,
)
print(json.dumps(summary, ensure_ascii=False))
......
"""Generate labeled evaluation samples from an existing lyric library."""
"""Generate production-style labeled evaluation samples from a lyric library."""
from __future__ import annotations
import csv
import hashlib
import json
import random
import re
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.checker import DuplicateDecision
from lyric_dedup.file_import import iter_lyric_files
from lyric_dedup.file_import import read_lyric_file
from lyric_dedup.file_import import record_from_file
from lyric_dedup.normalization import NormalizedLyrics
from lyric_dedup.normalization import fingerprint_text
from lyric_dedup.normalization import normalize_lyrics
DEFAULT_SAMPLE_MIX = {
"positive_full_duplicate": 0.30,
"negative_random_unrelated": 0.20,
"negative_hard_candidate": 0.25,
"negative_fragment": 0.10,
"negative_shared_chorus": 0.05,
"negative_translation_only": 0.05,
"edge_short_or_placeholder": 0.05,
}
@dataclass(frozen=True)
class LyricProfile:
    """Immutable per-file summary of one library lyric, used for stratified sampling."""

    path: Path  # lyric file this profile was built from
    record_id: str
    title: str
    artist: str
    normalized: NormalizedLyrics  # result of normalize_lyrics() for the file
    line_count: int  # number of primary lines (unique lines when there are no primary lines)
    char_count: int  # length of the fingerprint text (or normalized full text)
    line_count_bucket: str  # "zero" / "short" / "normal" / "long"
    language_bucket: str  # "jp_kr" / "mixed" / "zh" / "latin" / "other"
    source_bucket: str  # code taken from the file-name suffix, or "unknown"
    normalized_hash: str  # SHA-256 hex digest of the normalized text
    has_translation: bool  # True when normalization produced translation lines
@dataclass(frozen=True)
class GeneratedSample:
sample_id: str
......@@ -21,8 +55,14 @@ class GeneratedSample:
expected: str
sample_type: str
source: str
source_record_id: str = ""
candidate_record_id: str = ""
line_count_bucket: str = ""
language_bucket: str = ""
source_bucket: str = ""
title: str = ""
artist: str = ""
notes: str = ""
def generate_eval_set(
......@@ -31,104 +71,555 @@ def generate_eval_set(
output_dir: Path,
csv_path: Path,
size: int = 100,
positive_ratio: float = 0.6,
positive_ratio: float = 0.30,
seed: int = 20260602,
index_path: Path | None = None,
) -> dict[str, object]:
"""Generate a stratified production evaluation set.
``positive_ratio`` is kept for CLI compatibility. It overrides the default
positive quota while keeping the remaining negative categories proportional.
"""
if size <= 0:
raise ValueError("size must be positive")
rng = random.Random(seed)
source_files = iter_lyric_files(library_dir)
if not source_files:
profiles = profile_library(library_dir)
if not profiles:
raise ValueError(f"{library_dir} 下没有 .lrc/.txt 歌词文件")
output_dir.mkdir(parents=True, exist_ok=True)
csv_path.parent.mkdir(parents=True, exist_ok=True)
_clean_generated_output_dir(output_dir)
positives = round(size * positive_ratio)
negatives = size - positives
checker = DuplicateChecker.load(index_path) if index_path else None
plan = _sample_plan(size, positive_ratio=positive_ratio)
groups = _profile_groups(profiles)
samples: list[GeneratedSample] = []
for index in range(positives):
source = source_files[index % len(source_files)]
samples.append(_positive_sample(index + 1, source, output_dir, csv_path.parent, rng))
for index in range(negatives):
left = source_files[index % len(source_files)]
right = source_files[(index + 1) % len(source_files)]
samples.append(_negative_sample(positives + index + 1, left, right, output_dir, csv_path.parent, rng))
samples.extend(
_build_positive_samples(
_stratified_sample(groups["normal"], plan["positive_full_duplicate"], rng),
output_dir,
csv_path.parent,
rng,
start_index=len(samples) + 1,
)
)
samples.extend(
_build_random_unrelated_samples(
plan["negative_random_unrelated"],
output_dir,
csv_path.parent,
rng,
start_index=len(samples) + 1,
)
)
samples.extend(
_build_hard_candidate_samples(
groups["normal"],
plan["negative_hard_candidate"],
output_dir,
csv_path.parent,
rng,
checker=checker,
start_index=len(samples) + 1,
)
)
samples.extend(
_build_fragment_samples(
_stratified_sample(groups["fragmentable"], plan["negative_fragment"], rng),
output_dir,
csv_path.parent,
rng,
start_index=len(samples) + 1,
)
)
samples.extend(
_build_shared_chorus_samples(
_stratified_sample(groups["normal"], plan["negative_shared_chorus"], rng),
output_dir,
csv_path.parent,
rng,
start_index=len(samples) + 1,
)
)
samples.extend(
_build_translation_only_samples(
_stratified_sample(groups["foreign"], plan["negative_translation_only"], rng),
output_dir,
csv_path.parent,
rng,
start_index=len(samples) + 1,
)
)
samples.extend(
_build_edge_samples(
_stratified_sample(groups["edge"], plan["edge_short_or_placeholder"], rng),
output_dir,
csv_path.parent,
rng,
start_index=len(samples) + 1,
)
)
if len(samples) < size:
samples.extend(
_build_random_unrelated_samples(
size - len(samples),
output_dir,
csv_path.parent,
rng,
start_index=len(samples) + 1,
)
)
samples = samples[:size]
rng.shuffle(samples)
_write_csv(samples, csv_path, seed=seed)
manifest = _write_manifest(
profiles=profiles,
samples=samples,
csv_path=csv_path,
output_dir=output_dir,
seed=seed,
plan=plan,
index_path=index_path,
)
return manifest
def profile_library(library_dir: Path) -> list[LyricProfile]:
    """Profile every lyric file found under ``library_dir``.

    Each file is loaded as a record, normalized, and summarized into a
    ``LyricProfile`` carrying the bucket labels used for stratified sampling.

    Args:
        library_dir: Root directory of the lyric library.

    Returns:
        One profile per file, in the order ``iter_lyric_files`` yields them.
    """

    def build(lyric_path):
        rec = record_from_file(lyric_path, base_dir=library_dir)
        norm = normalize_lyrics(rec.lyrics)
        # Primary lines are preferred; unique lines are the fallback.
        effective_lines = norm.primary_lines or norm.unique_lines
        n_lines = len(effective_lines)
        # The fingerprint text drives both the size and the hash; fall back to
        # the normalized full text when the fingerprint is empty.
        text = fingerprint_text(norm) or norm.normalized_full_text
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        return LyricProfile(
            path=lyric_path,
            record_id=rec.record_id,
            title=rec.title or "",
            artist=rec.artist or "",
            normalized=norm,
            line_count=n_lines,
            char_count=len(text),
            line_count_bucket=_line_count_bucket(n_lines),
            language_bucket=_language_bucket(effective_lines),
            source_bucket=_source_bucket(lyric_path),
            normalized_hash=digest,
            has_translation=bool(norm.translation_lines),
        )

    return [build(lyric_path) for lyric_path in iter_lyric_files(library_dir)]
def _sample_plan(size: int, *, positive_ratio: float) -> dict[str, int]:
    """Turn the default sample mix into integer per-category quotas.

    Args:
        size: Total number of samples to plan for.
        positive_ratio: Share given to ``positive_full_duplicate``; clamped to
            the range [0, 1]. The negative categories split the remainder in
            the same proportions they have in ``DEFAULT_SAMPLE_MIX``.

    Returns:
        Mapping of category name to sample count.
    """
    positive_key = "positive_full_duplicate"
    ratio = max(0.0, min(1.0, positive_ratio))
    negative_weight = sum(
        weight for name, weight in DEFAULT_SAMPLE_MIX.items() if name != positive_key
    )
    shares = {
        name: ratio if name == positive_key else (1.0 - ratio) * (weight / negative_weight)
        for name, weight in DEFAULT_SAMPLE_MIX.items()
    }
    # Quotas are truncated first; the shortfall is then handed out one sample
    # at a time, largest share first.
    plan = {name: int(size * share) for name, share in shares.items()}
    shortfall = size - sum(plan.values())
    largest_first = sorted(shares, key=shares.get, reverse=True)
    for name in largest_first[: max(shortfall, 0)]:
        plan[name] += 1
    return plan
def _profile_groups(profiles: list[LyricProfile]) -> dict[str, list[LyricProfile]]:
    """Partition profiles into the pools the sample builders draw from.

    Returns a mapping with the keys ``normal`` (>= 6 lines), ``fragmentable``
    (>= 12 lines), ``foreign`` (latin/mixed/jp_kr with >= 4 lines) and ``edge``
    (<= 5 lines). An empty pool falls back to ``normal`` and then to the full
    profile list; an empty ``normal`` falls back directly to the full list.
    """
    foreign_languages = {"latin", "mixed", "jp_kr"}
    normal: list[LyricProfile] = []
    edge: list[LyricProfile] = []
    fragmentable: list[LyricProfile] = []
    foreign: list[LyricProfile] = []
    for item in profiles:
        if item.line_count >= 6:
            normal.append(item)
        if item.line_count <= 5:
            edge.append(item)
        if item.line_count >= 12:
            fragmentable.append(item)
        if item.language_bucket in foreign_languages and item.line_count >= 4:
            foreign.append(item)

    fallback = normal or profiles
    return {
        "normal": fallback,
        "fragmentable": fragmentable or fallback,
        "foreign": foreign or fallback,
        "edge": edge or fallback,
    }
def _stratified_sample(profiles: list[LyricProfile], count: int, rng: random.Random) -> list[LyricProfile]:
    """Pick ``count`` profiles round-robin across strata.

    A stratum is the triple (line-count bucket, language bucket, source
    bucket). Strata are visited in a shuffled order and each contributes one
    not-yet-used profile per round until ``count`` is reached or every stratum
    is exhausted. Any remaining slots are filled with ``rng.choice`` over all
    profiles, so repeats are possible.

    Returns an empty list when ``count <= 0`` or ``profiles`` is empty.
    """
    if count <= 0 or not profiles:
        return []

    strata: dict[tuple[str, str, str], list[LyricProfile]] = {}
    for item in profiles:
        stratum = (item.line_count_bucket, item.language_bucket, item.source_bucket)
        strata.setdefault(stratum, []).append(item)

    # RNG order matters for reproducibility: shuffle the visiting order first,
    # then shuffle the members of each stratum.
    visit_order = list(strata)
    rng.shuffle(visit_order)
    pools = {stratum: rng.sample(members, len(members)) for stratum, members in strata.items()}

    picked: list[LyricProfile] = []
    while len(picked) < count:
        # Drop exhausted strata, keeping the visiting order of the rest.
        visit_order = [stratum for stratum in visit_order if pools[stratum]]
        if not visit_order:
            break
        for stratum in visit_order[: count - len(picked)]:
            picked.append(pools[stratum].pop())

    # Top up with replacement once every stratum is empty.
    picked.extend(rng.choice(profiles) for _ in range(count - len(picked)))
    return picked
def _build_positive_samples(
    profiles: list[LyricProfile],
    output_dir: Path,
    csv_base: Path,
    rng: random.Random,
    *,
    start_index: int,
) -> list[GeneratedSample]:
    """Write one "应去重" sample file per profile, cycling through style variants.

    Args:
        profiles: Source profiles, one sample each.
        output_dir: Directory the sample files are written to.
        csv_base: Directory the recorded file paths are made relative to.
        rng: Random source passed to the punctuation-noise variant.
        start_index: Sample number given to the first profile.

    Returns:
        The generated sample records, in profile order.
    """
    built: list[GeneratedSample] = []
    for index, profile in enumerate(profiles, start=start_index):
        raw = read_lyric_file(profile.path)
        lines = _content_lines(raw)
        # Every variant is built on every iteration, including the one that
        # draws from rng, so RNG consumption does not depend on which is used.
        variants = [
            ("positive_exact_copy", raw),
            ("positive_timestamped", _add_timestamps(lines)),
            ("positive_punctuation_noise", _add_punctuation_noise(lines, rng)),
            ("positive_platform_noise", _with_platform_noise(lines)),
            ("positive_blank_line_noise", _add_blank_line_noise(lines)),
            ("positive_chorus_count_changed", _change_repeated_line_counts(lines)),
            ("positive_translation_added", _translation_added(lines)),
        ]
        sample_type, text = variants[(index - start_index) % len(variants)]
        target = _write_sample_file(output_dir, f"pos_{index:05d}_{sample_type}.txt", text)
        built.append(_sample_from_profile(index, target, csv_base, "应去重", sample_type, profile))
    return built
def _build_random_unrelated_samples(
    count: int,
    output_dir: Path,
    csv_base: Path,
    rng: random.Random,
    *,
    start_index: int,
) -> list[GeneratedSample]:
    """Write ``count`` synthetic "不应去重" samples that have no library source.

    Sample numbers run from ``start_index`` upwards. Each record has
    ``source`` set to ``"synthetic"``.
    """
    built: list[GeneratedSample] = []
    for index in range(start_index, start_index + count):
        lyric = _same_theme_synthetic(index, rng)
        target = _write_sample_file(
            output_dir,
            f"neg_{index:05d}_negative_random_unrelated.txt",
            lyric,
        )
        record = GeneratedSample(
            sample_id=f"sample-{index:05d}",
            file=str(target.relative_to(csv_base)),
            expected="不应去重",
            sample_type="negative_random_unrelated",
            source="synthetic",
            notes="same-theme synthetic full lyric not copied from library",
        )
        built.append(record)
    return built
def _build_hard_candidate_samples(
    profiles: list[LyricProfile],
    count: int,
    output_dir: Path,
    csv_base: Path,
    rng: random.Random,
    *,
    checker: DuplicateChecker | None,
    start_index: int,
) -> list[GeneratedSample]:
    """Write up to ``count`` hard-negative samples built from short shared snippets.

    Args:
        profiles: Pool the source profiles are drawn from.
        count: Number of samples wanted; ``<= 0`` yields an empty list.
        output_dir: Directory the sample files are written to.
        csv_base: Directory the recorded file paths are made relative to.
        rng: Random source for sampling and snippet generation.
        checker: Optional index-backed checker. When given, each snippet is
            checked and a matching candidate's record id is stored on the sample.
        start_index: Sample number given to the first sample.

    Returns:
        The generated sample records.
    """
    if count <= 0:
        return []

    # Three times as many sources are drawn as are needed; every source yields
    # a sample, so only the first ``count`` are consumed.
    sources = _stratified_sample(profiles, count * 3, rng)

    built: list[GeneratedSample] = []
    for index, profile in enumerate(sources[:count], start=start_index):
        normalized = profile.normalized
        snippet = _short_shared_snippet(list(normalized.primary_lines or normalized.unique_lines), rng)

        matched_id = ""
        if checker is not None:
            candidates = checker.check(snippet, max_candidates=5).candidates
            # Prefer a non-NEW candidate from a different record; otherwise
            # fall back to the first candidate, if there is one.
            for item in candidates:
                if item.record_id != profile.record_id and item.decision != DuplicateDecision.NEW:
                    matched = item
                    break
            else:
                matched = candidates[0] if candidates else None
            if matched:
                matched_id = matched.record_id

        target = _write_sample_file(output_dir, f"neg_{index:05d}_negative_hard_candidate.txt", snippet)
        built.append(
            _sample_from_profile(
                index,
                target,
                csv_base,
                "不应去重",
                "negative_hard_candidate",
                profile,
                candidate_record_id=matched_id,
                notes="shares a few real lines plus new filler; should not auto duplicate",
            )
        )
    return built
def _build_fragment_samples(
    profiles: list[LyricProfile],
    output_dir: Path,
    csv_base: Path,
    rng: random.Random,
    *,
    start_index: int,
) -> list[GeneratedSample]:
    """Write one "不应去重" sample per profile holding only a fragment of that lyric.

    Sample numbers start at ``start_index`` and follow profile order.
    """
    built: list[GeneratedSample] = []
    for index, profile in enumerate(profiles, start=start_index):
        normalized = profile.normalized
        source_lines = list(normalized.primary_lines or normalized.unique_lines)
        fragment = _single_song_fragment(source_lines, rng)
        target = _write_sample_file(output_dir, f"neg_{index:05d}_negative_fragment.txt", fragment)
        record = _sample_from_profile(
            index,
            target,
            csv_base,
            "不应去重",
            "negative_fragment",
            profile,
            notes="partial lyric fragment only",
        )
        built.append(record)
    return built
def _build_shared_chorus_samples(
    profiles: list[LyricProfile],
    output_dir: Path,
    csv_base: Path,
    rng: random.Random,
    *,
    start_index: int,
) -> list[GeneratedSample]:
    """Write one "不应去重" sample per profile that reuses only repeated lines.

    Each sample wraps the lines chosen by ``_repeated_or_sampled_lines``
    (emitted twice in a row) in fixed new opening and closing lines.

    Args:
        profiles: Source profiles, one sample each.
        output_dir: Directory the sample files are written to.
        csv_base: Directory the recorded file paths are made relative to.
        rng: Random source used to choose the shared lines.
        start_index: Sample number given to the first profile.

    Returns:
        The generated sample records, in profile order.
    """
    samples: list[GeneratedSample] = []
    for offset, profile in enumerate(profiles):
        # The per-profile full line list the original built here was never
        # read, so it is no longer computed.
        repeated = _repeated_or_sampled_lines(profile.normalized, rng)
        text = "\n".join(
            [
                "清晨的光落在新的街口",
                "我把故事重新写给以后",
                *repeated,
                *repeated,
                "所有答案都从这里开始",
            ]
        )
        index = start_index + offset
        path = _write_sample_file(output_dir, f"neg_{index:05d}_negative_shared_chorus.txt", text)
        samples.append(
            _sample_from_profile(
                index,
                path,
                csv_base,
                "不应去重",
                "negative_shared_chorus",
                profile,
                notes="shared repeated lines with new surrounding content",
            )
        )
    return samples
def _build_translation_only_samples(
    profiles: list[LyricProfile],
    output_dir: Path,
    csv_base: Path,
    rng: random.Random,
    *,
    start_index: int,
) -> list[GeneratedSample]:
    """Write one "不应去重" sample per profile containing translation text only.

    The profile's own translation lines are used when it has any. Otherwise
    between 4 and 8 pseudo-translation lines are generated. The lines are
    shuffled and at most 8 are written.
    """
    built: list[GeneratedSample] = []
    for index, profile in enumerate(profiles, start=start_index):
        translated = list(profile.normalized.translation_lines)
        if not translated:
            placeholder_count = min(8, max(profile.line_count, 4))
            translated = [_pseudo_translation(number) for number in range(1, placeholder_count + 1)]
        rng.shuffle(translated)
        body = "\n".join(translated[:8])
        target = _write_sample_file(output_dir, f"neg_{index:05d}_negative_translation_only.txt", body)
        record = _sample_from_profile(
            index,
            target,
            csv_base,
            "不应去重",
            "negative_translation_only",
            profile,
            notes="translation-like text without matching original lyric",
        )
        built.append(record)
    return built
def _build_edge_samples(
    profiles: list[LyricProfile],
    output_dir: Path,
    csv_base: Path,
    rng: random.Random,
    *,
    start_index: int,
) -> list[GeneratedSample]:
    """Write one "不应去重" edge-case sample per profile.

    Profiles with at most one effective line get a fully synthetic lyric.
    All others get a short snippet derived from their own lines.
    """
    built: list[GeneratedSample] = []
    for index, profile in enumerate(profiles, start=start_index):
        if profile.line_count <= 1:
            text = _same_theme_synthetic(index, rng)
            notes = "zero or one effective line; use synthetic edge negative"
        else:
            normalized = profile.normalized
            source_lines = list(normalized.primary_lines or normalized.unique_lines)
            text = _short_shared_snippet(source_lines, rng)
            notes = "short lyric edge case with limited overlap"
        target = _write_sample_file(output_dir, f"neg_{index:05d}_edge_short_or_placeholder.txt", text)
        record = _sample_from_profile(
            index,
            target,
            csv_base,
            "不应去重",
            "edge_short_or_placeholder",
            profile,
            notes=notes,
        )
        built.append(record)
    return built
def _sample_from_profile(
    index: int,
    path: Path,
    csv_base: Path,
    expected: str,
    sample_type: str,
    profile: LyricProfile,
    *,
    candidate_record_id: str = "",
    notes: str = "",
) -> GeneratedSample:
    """Build the record for a sample file that was derived from ``profile``.

    Args:
        index: Sample number, rendered as ``sample-NNNNN``.
        path: Written sample file; stored relative to ``csv_base``.
        csv_base: Directory the stored file path is made relative to.
        expected: Expected label for the sample.
        sample_type: Category name of the sample.
        profile: Library profile the sample came from; supplies the source
            path, record id, bucket labels, title and artist.
        candidate_record_id: Optional id of a matched index candidate.
        notes: Optional free-text note.
    """
    provenance = {
        "source": str(profile.path),
        "source_record_id": profile.record_id,
        "line_count_bucket": profile.line_count_bucket,
        "language_bucket": profile.language_bucket,
        "source_bucket": profile.source_bucket,
        "title": profile.title,
        "artist": profile.artist,
    }
    return GeneratedSample(
        sample_id=f"sample-{index:05d}",
        file=str(path.relative_to(csv_base)),
        expected=expected,
        sample_type=sample_type,
        candidate_record_id=candidate_record_id,
        notes=notes,
        **provenance,
    )
def _write_sample_file(output_dir: Path, name: str, text: str) -> Path:
path = output_dir / name
path.write_text(text.strip() + "\n", encoding="utf-8")
return path
def _write_csv(samples: list[GeneratedSample], csv_path: Path, *, seed: int) -> None:
fieldnames = [
"id",
"file",
"expected",
"sample_type",
"source",
"source_record_id",
"candidate_record_id",
"line_count_bucket",
"language_bucket",
"source_bucket",
"title",
"artist",
"seed",
"notes",
]
with csv_path.open("w", encoding="utf-8", newline="") as file:
writer = csv.DictWriter(file, fieldnames=["id", "file", "expected", "sample_type", "source", "title", "artist"])
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(
for sample in samples:
writer.writerow(
{
"id": sample.sample_id,
"file": sample.file,
"expected": sample.expected,
"sample_type": sample.sample_type,
"source": sample.source,
"source_record_id": sample.source_record_id,
"candidate_record_id": sample.candidate_record_id,
"line_count_bucket": sample.line_count_bucket,
"language_bucket": sample.language_bucket,
"source_bucket": sample.source_bucket,
"title": sample.title,
"artist": sample.artist,
"seed": seed,
"notes": sample.notes,
}
for sample in samples
)
return {
"size": size,
"positive": positives,
"negative": negatives,
"library_files": len(source_files),
def _write_manifest(
*,
profiles: list[LyricProfile],
samples: list[GeneratedSample],
csv_path: Path,
output_dir: Path,
seed: int,
plan: dict[str, int],
index_path: Path | None,
) -> dict[str, object]:
manifest = {
"seed": seed,
"library_files": len(profiles),
"sample_size": len(samples),
"plan": plan,
"index": str(index_path) if index_path else "",
"lyrics_dir": str(output_dir),
"csv": str(csv_path),
"manifest": str(csv_path.with_suffix(csv_path.suffix + ".manifest.json")),
"sample_type_counts": dict(Counter(sample.sample_type for sample in samples)),
"expected_counts": dict(Counter(sample.expected for sample in samples)),
"line_count_bucket_counts": dict(Counter(profile.line_count_bucket for profile in profiles)),
"language_bucket_counts": dict(Counter(profile.language_bucket for profile in profiles)),
"source_bucket_counts": dict(Counter(profile.source_bucket for profile in profiles).most_common(50)),
"unique_source_records": len({sample.source_record_id for sample in samples if sample.source_record_id}),
}
def _positive_sample(index: int, source: Path, output_dir: Path, csv_base: Path, rng: random.Random) -> GeneratedSample:
raw = read_lyric_file(source)
source_record = record_from_file(source)
variants = [
("exact_copy", raw),
("timestamped", _add_timestamps(_content_lines(raw))),
("punctuation_noise", _add_punctuation_noise(_content_lines(raw), rng)),
("with_platform_noise", _with_platform_noise(_content_lines(raw))),
("blank_line_noise", _add_blank_line_noise(_content_lines(raw))),
("lrc_with_platform_noise", _add_timestamps(_content_lines(_with_platform_noise(_content_lines(raw))))),
("translation_added", _translation_added(_content_lines(raw))),
]
sample_type, text = variants[(index - 1) % len(variants)]
name = f"pos_{index:03d}_{sample_type}.txt"
path = output_dir / name
path.write_text(text, encoding="utf-8")
return GeneratedSample(
sample_id=f"pos-{index:03d}",
file=str(path.relative_to(csv_base)),
expected="应去重",
sample_type=sample_type,
source=str(source),
title=source_record.title or "",
artist=source_record.artist or "",
)
def _negative_sample(index: int, left: Path, right: Path, output_dir: Path, csv_base: Path, rng: random.Random) -> GeneratedSample:
left_lines = _normalized_lines(left)
right_lines = _normalized_lines(right)
variants = [
("single_song_fragment", _single_song_fragment(left_lines)),
("short_shared_snippet", _short_shared_snippet(left_lines, rng)),
("mixed_fragments", _mixed_fragments(left_lines, right_lines, rng)),
("same_theme_synthetic", _same_theme_synthetic(index)),
("translation_only_like", _translation_only_like(left_lines)),
]
sample_type, text = variants[(index - 1) % len(variants)]
name = f"neg_{index:03d}_{sample_type}.txt"
path = output_dir / name
path.write_text(text, encoding="utf-8")
return GeneratedSample(
sample_id=f"neg-{index:03d}",
file=str(path.relative_to(csv_base)),
expected="不应去重",
sample_type=sample_type,
source=f"{left} | {right}",
csv_path.with_suffix(csv_path.suffix + ".manifest.json").write_text(
json.dumps(manifest, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return manifest
def _content_lines(text: str) -> list[str]:
......@@ -142,9 +633,40 @@ def _clean_generated_output_dir(output_dir: Path) -> None:
path.unlink()
def _normalized_lines(path: Path) -> list[str]:
    """Read and normalize the lyric file at ``path`` and return its lines.

    Primary lines are returned when there are any, otherwise unique lines.
    """
    result = normalize_lyrics(read_lyric_file(path))
    chosen = result.primary_lines
    if not chosen:
        chosen = result.unique_lines
    return list(chosen)
def _line_count_bucket(line_count: int) -> str:
if line_count == 0:
return "zero"
if line_count <= 5:
return "short"
if line_count <= 40:
return "normal"
return "long"
def _language_bucket(lines: tuple[str, ...]) -> str:
text = "\n".join(lines)
cjk = len(re.findall(r"[\u4e00-\u9fff]", text))
latin = len(re.findall(r"[A-Za-z]", text))
kana = len(re.findall(r"[\u3040-\u30ff]", text))
hangul = len(re.findall(r"[\uac00-\ud7af]", text))
if kana or hangul:
return "jp_kr"
if cjk and latin:
return "mixed"
if cjk:
return "zh"
if latin:
return "latin"
return "other"
def _source_bucket(path: Path) -> str:
stem = path.stem
parts = stem.split("_")
if len(parts) >= 2:
code = re.sub(r"\d+$", "", parts[-1])
return code or "unknown"
return "unknown"
def _add_timestamps(lines: list[str]) -> str:
......@@ -169,6 +691,17 @@ def _add_blank_line_noise(lines: list[str]) -> str:
return "\n".join(result)
def _change_repeated_line_counts(lines: list[str]) -> str:
seen: set[str] = set()
result: list[str] = []
for line in lines:
if line in seen:
continue
seen.add(line)
result.append(line)
return "\n".join(result or lines)
def _translation_added(lines: list[str]) -> str:
result: list[str] = []
for idx, line in enumerate(lines, start=1):
......@@ -178,11 +711,11 @@ def _translation_added(lines: list[str]) -> str:
return "\n".join(result)
def _single_song_fragment(lines: list[str]) -> str:
def _single_song_fragment(lines: list[str], rng: random.Random) -> str:
if len(lines) <= 4:
return "\n".join(lines[: max(1, len(lines) // 2)])
fragment_len = max(2, min(8, len(lines) // 4))
start = max(0, (len(lines) - fragment_len) // 2)
fragment_len = max(2, min(8, len(lines) // rng.choice([3, 4, 5])))
start = rng.randrange(0, max(1, len(lines) - fragment_len + 1))
return "\n".join(lines[start : start + fragment_len])
......@@ -198,29 +731,26 @@ def _short_shared_snippet(lines: list[str], rng: random.Random) -> str:
return "\n".join(synthetic)
def _mixed_fragments(left_lines: list[str], right_lines: list[str], rng: random.Random) -> str:
left_pick = rng.sample(left_lines, k=min(2, len(left_lines))) if left_lines else []
right_pick = rng.sample(right_lines, k=min(2, len(right_lines))) if right_lines else []
filler = ["新的旋律慢慢靠近", "陌生的名字写在风里", "没有人停在原地"]
return "\n".join([*left_pick, *filler, *right_pick])
def _repeated_or_sampled_lines(normalized: NormalizedLyrics, rng: random.Random) -> list[str]:
    """Pick up to two lines to act as a shared chorus.

    Lines whose count in ``normalized.line_counts`` is at least 2 are
    preferred. When there are none, the pick is made from the primary lines
    (or the unique lines). Returns an empty list when no lines are available.
    """
    chorus = [line for line, occurrences in normalized.line_counts.items() if occurrences >= 2]
    pool = chorus or list(normalized.primary_lines or normalized.unique_lines)
    if not pool:
        return []
    return rng.sample(pool, k=min(2, len(pool)))
def _same_theme_synthetic(index: int) -> str:
themes = [
"我在夜里想起远方的你",
"城市灯火陪我走过雨季",
"那些没说完的话留在风里",
"明天醒来我们各自继续",
def _same_theme_synthetic(index: int, rng: random.Random) -> str:
starts = ["我在夜里想起远方的你", "城市灯火陪我走过雨季", "风把旧名字吹向清晨"]
middles = ["那些没说完的话留在风里", "新的路口慢慢亮起", "时间把答案交给下一站"]
ends = ["明天醒来我们各自继续", "我会把今天写成新的旋律", "故事从这里重新开始"]
return "\n".join(
[
rng.choice(starts),
rng.choice(middles),
rng.choice(ends),
f"这是第 {index} 个全新测试样本",
]
return "\n".join(themes)
def _translation_only_like(lines: list[str]) -> str:
foreign_count = sum(1 for line in lines if _looks_foreign(line))
if foreign_count < 2:
return _same_theme_synthetic(foreign_count + len(lines))
return "\n".join(_pseudo_translation(idx) for idx in range(1, min(8, foreign_count) + 1))
)
def _pseudo_translation(index: int) -> str:
......
......@@ -77,6 +77,7 @@ def main() -> None:
csv_path=Path(args.eval_csv),
size=args.eval_size,
positive_ratio=args.positive_ratio,
index_path=Path(args.index),
)
evaluate_csv(
Path(args.index),
......
import csv
import json
from lyric_dedup import DuplicateChecker
from lyric_dedup import DuplicateDecision
......@@ -285,23 +286,32 @@ def test_evaluate_csv_reports_binary_metrics(tmp_path) -> None:
assert (tmp_path / "eval_out.csv.summary.json").exists()
def test_generated_eval_set_marks_fragments_as_negative(tmp_path) -> None:
def test_generated_eval_set_uses_stratified_production_mix(tmp_path) -> None:
library = tmp_path / "library"
incoming = tmp_path / "generated" / "incoming"
eval_csv = tmp_path / "generated" / "eval.csv"
library.mkdir()
(library / "song.txt").write_text(BASE_LYRIC, encoding="utf-8")
for idx in range(12):
prefix = "AY" if idx % 2 == 0 else "WHHY"
(library / f"{idx}_{prefix}{idx:06d}.txt").write_text(
BASE_LYRIC.replace("我爱你", f"我想你{idx}").replace("城市", f"城市{idx}"),
encoding="utf-8",
)
generate_eval_set(library_dir=library, output_dir=incoming, csv_path=eval_csv, size=20, positive_ratio=0.5)
generate_eval_set(library_dir=library, output_dir=incoming, csv_path=eval_csv, size=30, positive_ratio=0.3)
rows = list(csv.DictReader(eval_csv.open(encoding="utf-8")))
positive_types = {row["sample_type"] for row in rows if row["expected"] == "应去重"}
fragment_rows = [row for row in rows if row["sample_type"] == "single_song_fragment"]
assert "trimmed_version" not in positive_types
assert "single_song_fragment" not in positive_types
assert fragment_rows
assert all(row["expected"] == "不应去重" for row in fragment_rows)
manifest = json.loads((tmp_path / "generated" / "eval.csv.manifest.json").read_text(encoding="utf-8"))
negative_types = {row["sample_type"] for row in rows if row["expected"] == "不应去重"}
assert len(rows) == 30
assert manifest["library_files"] == 12
assert manifest["sample_size"] == 30
assert manifest["unique_source_records"] > 1
assert "positive_full_duplicate" in manifest["plan"]
assert "negative_fragment" in negative_types
assert "negative_hard_candidate" in negative_types
assert all(row["expected"] == "不应去重" for row in rows if row["sample_type"].startswith("negative_"))
def test_foreign_original_with_added_chinese_translation_is_duplicate() -> None:
......