Commit 51ddab43 51ddab43fb5e3638a8b8c9cd8049679fe8b2ccc7 by 沈秋雨

Add lyric duplicate detection workflow

0 parents
.DS_Store
__pycache__/
*.py[cod]
.pytest_cache/
# Local lyric data and generated artifacts
data/
outputs/
downloaded_lyrics/
downloaded_lyrics_type3/
download_failed*.txt
# Local downloader / scratch utilities
download_lyrics.py
test_db_connection.py
*.env
# Reference project kept locally only
text-dedup-main/
# Virtual environments and editor files
.venv/
venv/
.idea/
.vscode/
# Lyric Duplicate Checker
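The first version handles duplicate checking for newly added lyrics: build an index from the existing `.lrc` / `.txt` lyrics, then query each new lyric against it. Each query returns `duplicate`, `review`, or `new`.
## Build the index
Assume the existing library lives in `data/library/`: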
```bash
python -m lyric_dedup.cli build-index \
--lyrics-dir data/library \
--index outputs/indexes/lyrics.pkl
```
## Check a single new lyric file
```bash
python -m lyric_dedup.cli check-file \
--index outputs/indexes/lyrics.pkl \
--file data/incoming/new_song.lrc
```
## Batch-check a directory of new lyrics
```bash
python -m lyric_dedup.cli batch-check \
--index outputs/indexes/lyrics.pkl \
--lyrics-dir data/incoming \
--out outputs/results/incoming_check.csv
```
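Key columns in the CSV:
- `decision`: the overall verdict.
- `best_candidate_id`: the most similar existing lyric.
- `best_candidate_jaccard`: n-gram literal similarity.
- `best_candidate_line_coverage`: line-level coverage.
- `matched_unique_lines`: the normalized lyric lines that matched.
- `best_candidate_reason`: the reason for the verdict (emitted in Chinese), explaining why the sample was judged a duplicate, sent to review, or judged new.
Suggested production handling: block `duplicate` automatically, send `review` to the manual review pool, and still spot-check `new` samples before they enter the library.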
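The two similarity columns can be illustrated with a small self-contained sketch. The `jaccard` and `line_coverage` helpers mirror the set-based formulas used by the checker; `char_ngrams` is a hypothetical stand-in tokenizer (character trigrams), since the real tokenization lives in the normalization module and may differ.

```python
def char_ngrams(text: str, n: int = 3) -> set[str]:
    """Character n-grams of the text with whitespace removed (assumed tokenizer)."""
    compact = "".join(text.split())
    if len(compact) < n:
        return {compact} if compact else set()
    return {compact[i : i + n] for i in range(len(compact) - n + 1)}


def jaccard(left: set[str], right: set[str]) -> float:
    """Intersection over union; two empty sets count as identical."""
    if not left and not right:
        return 1.0
    if not left or not right:
        return 0.0
    return len(left & right) / len(left | right)


def line_coverage(left: list[str], right: list[str]) -> tuple[float, list[str]]:
    """Shared unique lines divided by the larger side's unique line count."""
    left_lines, right_lines = set(left), set(right)
    if not left_lines and not right_lines:
        return 1.0, []
    if not left_lines or not right_lines:
        return 0.0, []
    matched = sorted(left_lines & right_lines)
    return len(matched) / max(len(left_lines), len(right_lines)), matched


# Hypothetical normalized lines: the incoming lyric is a fragment of the existing one.
existing = ["hello darkness", "my old friend", "talk with you again"]
incoming = ["hello darkness", "my old friend"]
coverage, matched = line_coverage(incoming, existing)
similarity = jaccard(char_ngrams("".join(incoming)), char_ngrams("".join(existing)))
print(round(coverage, 4), matched, round(similarity, 4))
```

Dividing by the larger side keeps a short fragment from reaching full coverage against a long song, which is consistent with fragments going to `review` rather than `duplicate`.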
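## Guard rules for lyrics with original text plus Chinese translation
Lyrics are currently split into three kinds of lines:
- `primary_lines`: original-language lines; automatic duplicate decisions rely mainly on these.
- `translation_lines`: Chinese translation lines, used only for recall and for explaining review decisions.
- `unknown_lines`: lines that cannot be classified reliably.
High-confidence splits:
- A foreign-language line and a Chinese line share the same timestamp.
- Several stable groups of foreign-language line + Chinese line alternate.
Medium-confidence splits:
- A single line clearly contains foreign text and its Chinese translation, for example `I miss you / 今晚我想你`.
Low-confidence splits:
- A whole block of foreign text followed by a whole block of Chinese translation.
Decision policy:
- If the original text is highly consistent, the sample can be `duplicate` even when the new lyric adds a Chinese translation.
- If only the translation lines are similar and the original text is not similar enough, the result can only be `review`; it is never auto-blocked.
- A suspected whole-block translation layout is a low-confidence split, so it goes to `review` first even when the original-text hash matches.
- For ordinary Chinese songs with no detected translation structure, all valid lines are treated as original text.
Because the index stores the split original/translation features, rebuild the index after changing the split rules.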
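The decision policy can be condensed into a small sketch. This is a simplification under stated assumptions: `decide` and its boolean inputs are hypothetical, the confidence labels `"high"` and `"medium"` are assumed names (only `"low"` appears in the checker code), and the real checker derives these signals from thresholds on Jaccard similarity and line coverage.

```python
def decide(primary_match: bool, translation_match: bool, split_confidence: str) -> str:
    """Return 'duplicate', 'review', or 'new' following the policy bullets.

    primary_match: original-language lines are highly consistent (assumed input).
    translation_match: translation lines are similar (assumed input).
    split_confidence: 'high', 'medium', or 'low' (labels assumed for illustration).
    """
    if primary_match:
        # A whole-block translation layout is a low-confidence split:
        # never auto-block it, even when the original text matches.
        if split_confidence == "low":
            return "review"
        # Extra translation lines do not prevent an automatic duplicate.
        return "duplicate"
    if translation_match:
        # Translation-only similarity is surfaced for humans, never auto-blocked.
        return "review"
    return "new"


for case in [(True, True, "high"), (False, True, "high"), (True, False, "low"), (False, False, "high")]:
    print(case, "->", decide(*case))
```

Translation lines only ever push a sample up to `review`; the automatic `duplicate` verdict depends on the original text alone.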
## Evaluate accuracy with a labeled CSV
You can first generate a batch of evaluation samples automatically from the existing library:
```bash
python -m lyric_dedup.cli generate-eval-set \
--library-dir data/library \
--lyrics-dir data/generated_eval/incoming \
--csv data/generated_eval/eval_10.csv \
--size 10 \
--positive-ratio 0.6
```
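Business rules of the generator:
- `应去重` (should dedupe) samples are only style variations of a full song's lyrics, such as timestamps, punctuation, platform noise, blank lines, LRC styling, or an appended Chinese translation.
- `不应去重` (should not dedupe) samples include lyric fragments, short-phrase collisions, mixed fragments from different songs, new lyrics on the same theme, and translation-only similarity.
- A lyric fragment should never produce `duplicate`, even if it matches part of an existing song; at most it goes to `review`.
To evaluate, first prepare a CSV, for example `data/eval/eval.csv`: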
```csv
id,file,expected
case-001,incoming/song_a.lrc,应去重
case-002,incoming/song_b.txt,不应去重
```
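Instead of a file path, you can put the lyrics directly in a `lyrics` column; a literal `\n` in the cell is converted to a line break: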
```csv
id,lyrics,expected
case-003,"我爱你在每个夜里\n听风说话也听见你",duplicate
case-004,"南方的雨穿过街心\n你把故事说给云听",new
```
`expected` accepts these values:
- Should dedupe: `应去重`, `重复`, `duplicate`, `1`, `true`, `yes`
- Should not dedupe: `不应去重`, `不重复`, `new`, `0`, `false`, `no`
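A minimal sketch of how such labels could be normalized into a boolean. The function name `parse_expected`, the case-insensitive matching, and the `ValueError` on unknown labels are assumptions for illustration; the project's own parser is not shown here and may behave differently.

```python
from __future__ import annotations

# Accepted spellings, taken from the two bullets above.
POSITIVE_LABELS = {"应去重", "重复", "duplicate", "1", "true", "yes"}
NEGATIVE_LABELS = {"不应去重", "不重复", "new", "0", "false", "no"}


def parse_expected(value: str | None) -> bool:
    """Map an `expected` cell to True (should dedupe) or False (should not)."""
    label = (value or "").strip().lower()
    if label in POSITIVE_LABELS:
        return True
    if label in NEGATIVE_LABELS:
        return False
    # Failing loudly (an assumed design choice) avoids silently mislabeled rows.
    raise ValueError(f"unrecognized expected label: {value!r}")


print(parse_expected("应去重"), parse_expected("new"))
```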
Run the evaluation:
```bash
python -m lyric_dedup.cli evaluate-csv \
--index outputs/indexes/lyrics.pkl \
--csv data/eval/eval.csv \
--base-dir data \
--out outputs/results/eval_result.csv
```
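By default only a system output of `duplicate` counts as "predicted should dedupe". This suits evaluating the precision of automatic blocking, because false blocks become more visible.
To evaluate recall of suspicious samples instead, where both `duplicate` and `review` count as hits: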
```bash
python -m lyric_dedup.cli evaluate-csv \
--index outputs/indexes/lyrics.pkl \
--csv data/eval/eval.csv \
--base-dir data \
--positive-decisions duplicate,review \
--out outputs/results/eval_result_review_as_positive.csv
```
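Each run writes two files: the result CSV given by `--out` and a summary next to it. For the first command above:
- `outputs/results/eval_result.csv`: the prediction, candidate, reason, and correctness of each sample.
- `outputs/results/eval_result.csv.summary.json`: overall metrics.
Key fields in the summary:
- `accuracy`: overall accuracy.
- `precision`: of the samples predicted as should-dedupe, the share that truly should be deduped. Check this first for automatic blocking.
- `recall`: of the samples that truly should be deduped, the share the system caught.
- `f1`: the combined precision/recall metric.
- `false_positive`: should not be deduped but was predicted as should-dedupe (a false block).
- `false_negative`: should be deduped but was not caught (a miss).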
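For reference, the summary metrics follow the standard definitions and can be recomputed from the four confusion counts. This is a sketch: `summarize` is a hypothetical helper, the example counts are made up, and returning 0.0 when a denominator is zero is an assumed convention.

```python
def summarize(tp: int, fp: int, tn: int, fn: int) -> dict[str, float]:
    """Compute accuracy, precision, recall, and F1 from confusion counts."""
    total = tp + fp + tn + fn
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {
        "accuracy": (tp + tn) / total if total else 0.0,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Made-up counts for a 100-sample run, for illustration only.
metrics = summarize(tp=54, fp=2, tn=38, fn=6)
print({name: round(value, 4) for name, value in metrics.items()})
```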
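# Lyric duplicate-check test workflow
This document records the full set of commands for building an index from an existing lyrics directory, generating a test set, running batch evaluation, and inspecting the results.
## 1. Prepare directories
Put the existing library in: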
```text
data/library/
```
Supported files:
```text
.lrc
.txt
```
Generated test samples are written to:
```text
data/generated_eval/incoming/
```
The labeled test-set CSV is written to:
```text
data/generated_eval/eval_500.csv
```
Evaluation results are written to:
```text
outputs/results/
```
## 2. Build the index for the existing library
If you have just added a batch of samples to `data/library`, run the processing script first:
```bash
python scripts/process_library.py \
--library-dir data/library \
--index outputs/indexes/library_lyrics.pkl
```
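The script will:
```text
1. Scan for and quarantine instrumental placeholder samples, such as files containing 【曲库专用】 or “此歌曲为没有填词的纯音乐”.
2. Rebuild outputs/indexes/library_lyrics.pkl.
3. Write the processing report outputs/results/library_process_report.json.
```
To preview which files would be processed without moving anything or rebuilding the index: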
```bash
python scripts/process_library.py \
--library-dir data/library \
--dry-run
```
To also generate and evaluate 1180 test samples in the same run:
```bash
python scripts/process_library.py \
--library-dir data/library \
--index outputs/indexes/library_lyrics.pkl \
--eval-size 1180 \
--positive-ratio 0.2 \
--eval-csv data/generated_eval/eval_1180.csv \
--eval-out outputs/results/library_eval_1180.csv
```
Quarantined files are moved by default to:
```text
data/quarantine/no_lyrics_placeholders/
```
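The placeholder scan can be pictured with the following sketch, which only lists matching files in the spirit of `--dry-run`. The two marker strings come from the description above; the function names, the recursive directory walk, and the UTF-8 decoding are assumptions, and the actual `scripts/process_library.py` logic may differ.

```python
from pathlib import Path

# Marker strings quoted in the script description above.
PLACEHOLDER_MARKERS = ("【曲库专用】", "此歌曲为没有填词的纯音乐")
LYRIC_SUFFIXES = {".lrc", ".txt"}


def is_no_lyrics_placeholder(text: str) -> bool:
    """True when the text carries one of the instrumental placeholder markers."""
    return any(marker in text for marker in PLACEHOLDER_MARKERS)


def find_placeholders(library_dir: Path) -> list[Path]:
    """List lyric files that look like placeholders, without moving anything."""
    matches: list[Path] = []
    for path in sorted(library_dir.rglob("*")):
        if path.is_file() and path.suffix.lower() in LYRIC_SUFFIXES:
            text = path.read_text(encoding="utf-8", errors="ignore")
            if is_no_lyrics_placeholder(text):
                matches.append(path)
    return matches


if __name__ == "__main__":
    library = Path("data/library")
    if library.is_dir():
        for placeholder in find_placeholders(library):
            print(placeholder)
```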
You can also build the index manually:
```bash
python -m lyric_dedup.cli build-index \
--lyrics-dir data/library \
--index outputs/indexes/library_lyrics.pkl
```
Index file:
```text
outputs/indexes/library_lyrics.pkl
```
Note: if you change `data/library`, or change the preprocessing / duplicate-decision logic, rerun this step.
## 3. Generate 500 test samples
```bash
python -m lyric_dedup.cli generate-eval-set \
--library-dir data/library \
--lyrics-dir data/generated_eval/incoming \
--csv data/generated_eval/eval_500.csv \
--size 500 \
--positive-ratio 0.2
```
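With the CLI defaults (`--size 100`, `--positive-ratio 0.6`) the generator produces:
```text
应去重 (should dedupe): 60
不应去重 (should not dedupe): 40
```
The command above overrides both values (`--size 500`, `--positive-ratio 0.2`), so the split changes accordingly.
The generator first deletes old generated `.txt` / `.lrc` files under `data/generated_eval/incoming/`, then writes the new samples.
Business rules:
```text
pos_* = should dedupe: style variations of a full song's lyrics
neg_* = should not dedupe: fragment / short-phrase collision / mixed fragments / new lyrics on the same theme / translation-only similarity
```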
## 4. Strict evaluation: count only duplicate as dedupe
```bash
python -m lyric_dedup.cli evaluate-csv \
--index outputs/indexes/library_lyrics.pkl \
--csv data/generated_eval/eval_500.csv \
--base-dir data/generated_eval \
--out outputs/results/library_eval_500.csv
```
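Under this rule:
```text
duplicate -> predicted should dedupe
review -> predicted should not dedupe
new -> predicted should not dedupe
```
This suits evaluating the precision of automatic blocking. Focus on:
```text
false_positive
```
## 5. Recall evaluation: count both duplicate and review as catching suspicious samples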
```bash
python -m lyric_dedup.cli evaluate-csv \
--index outputs/indexes/library_lyrics.pkl \
--csv data/generated_eval/eval_500.csv \
--base-dir data/generated_eval \
--positive-decisions duplicate,review \
--out outputs/results/library_eval_500_review_positive.csv
```
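Under this rule:
```text
duplicate -> predicted should dedupe
review -> predicted should dedupe
new -> predicted should not dedupe
```
This suits evaluating recall of suspicious samples. Focus on: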
```text
false_negative
```
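The difference between the strict and recall rules comes down to which decisions count as positive. The sketch below tallies the same labeled outcomes under both rules; `confusion_counts` is a hypothetical helper and the four samples are made up for illustration.

```python
from collections import Counter


def confusion_counts(samples, positive_decisions):
    """Tally outcomes for (expected_duplicate, decision) pairs."""
    counts = Counter()
    for expected_duplicate, decision in samples:
        # A prediction is positive when the decision is in the chosen set,
        # which corresponds to the --positive-decisions option.
        predicted_duplicate = decision in positive_decisions
        if expected_duplicate and predicted_duplicate:
            counts["true_positive"] += 1
        elif not expected_duplicate and predicted_duplicate:
            counts["false_positive"] += 1
        elif expected_duplicate:
            counts["false_negative"] += 1
        else:
            counts["true_negative"] += 1
    return counts


# Made-up labeled outcomes: (expected_duplicate, system decision).
samples = [(True, "duplicate"), (True, "review"), (False, "review"), (False, "new")]
strict = confusion_counts(samples, {"duplicate"})
recall_mode = confusion_counts(samples, {"duplicate", "review"})
print(dict(strict))
print(dict(recall_mode))
```

Moving `review` into the positive set turns missed duplicates into hits but also turns reviewed non-duplicates into false positives, which is why the two rules are read with different metrics in mind.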
## 6. View overall metrics
Strict rule:
```bash
cat outputs/results/library_eval_500.csv.summary.json
```
Recall rule:
```bash
cat outputs/results/library_eval_500_review_positive.csv.summary.json
```
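Metric meanings:
```text
accuracy        overall accuracy
precision       of the samples predicted should-dedupe, the share that truly should be deduped
recall          of the samples that truly should be deduped, the share the system caught
f1              combined precision/recall metric
true_positive   should dedupe and predicted should dedupe
false_positive  should not dedupe but predicted should dedupe (false block)
true_negative   should not dedupe and predicted should not dedupe
false_negative  should dedupe but predicted should not dedupe (miss)
```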
## 7. View per-sample results
```bash
open outputs/results/library_eval_500.csv
```
If `open` is not available, print the CSV directly:
```bash
python -c 'import csv; rows=csv.DictReader(open("outputs/results/library_eval_500.csv", encoding="utf-8")); [print(r["id"], r["decision"], r["correct"], r["reason"], sep=" | ") for r in rows]'
```
## 8. View failed samples
Failed samples under the strict rule:
```bash
python -c 'import csv; rows=csv.DictReader(open("outputs/results/library_eval_500.csv", encoding="utf-8")); [print(r["id"], r["source"], r["decision"], r["reason"], sep=" | ") for r in rows if r["correct"] == "False"]'
```
To see the full candidate list for one sample:
```bash
python -m lyric_dedup.cli check-file \
--index outputs/indexes/library_lyrics.pkl \
--file data/generated_eval/incoming/neg_068_mixed_fragments.txt \
--max-candidates 10
```
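When there are many failures, grouping them is often more useful than reading them one by one. The sketch below counts incorrect rows by expected label and system decision. It assumes the result CSV has the `expected_duplicate`, `decision`, and `correct` columns with booleans written as `True` / `False`; `failure_breakdown` is a hypothetical helper, not part of the CLI.

```python
import csv
from collections import Counter
from pathlib import Path


def failure_breakdown(result_csv: Path) -> Counter:
    """Count incorrect rows by (expected_duplicate, decision)."""
    with result_csv.open(encoding="utf-8", newline="") as file:
        return Counter(
            (row["expected_duplicate"], row["decision"])
            for row in csv.DictReader(file)
            if row["correct"] == "False"
        )


if __name__ == "__main__":
    result_path = Path("outputs/results/library_eval_500.csv")
    if result_path.exists():
        for (expected, decision), count in failure_breakdown(result_path).most_common():
            print(f"expected_duplicate={expected} decision={decision}: {count}")
```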
## 9. Verify the test-set distribution
```bash
python -c 'import csv, collections; rows=list(csv.DictReader(open("data/generated_eval/eval_500.csv", encoding="utf-8"))); print(len(rows)); print(collections.Counter(r["expected"] for r in rows)); print(collections.Counter(r["sample_type"] for r in rows)); print(collections.Counter(r["sample_type"] for r in rows if r["expected"]=="应去重")); print(collections.Counter(r["sample_type"] for r in rows if r["expected"]=="不应去重"))'
```
Verify the number of files in the generated directory:
```bash
find data/generated_eval/incoming -type f | wc -l
```
## 10. Run the code tests
```bash
python -m pytest tests
```
Compile check:
```bash
python -m compileall -q lyric_dedup tests
```
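## 11. About uniqueness within the test set
The 100 automatically generated samples form a rule-coverage test set; they are not guaranteed to be distinct from one another after normalization.
If the 100 test samples must be mutually distinct while still using the default ratio:
```text
size = 100
positive_ratio = 0.6
```
then you need at least:
```text
60 mutually distinct seed lyrics
```
Reason: should-dedupe samples are full-song variants, so multiple style variations of the same song still normalize to the same song.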
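The seed requirement is simple arithmetic: one distinct seed song per should-dedupe sample. The sketch below assumes the generator derives the positive count as `round(size * positive_ratio)`; that rounding rule is an assumption, and `min_unique_seeds` is a hypothetical helper.

```python
def min_unique_seeds(size: int, positive_ratio: float) -> int:
    """Distinct seed lyrics needed so that no two positive samples share a song.

    Assumes the positive count is round(size * positive_ratio).
    """
    return round(size * positive_ratio)


print(min_unique_seeds(100, 0.6))  # default size and ratio
print(min_unique_seeds(500, 0.2))  # values used in section 3
```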
A more reliable way to measure real-world accuracy is to prepare a hand-labeled CSV:
```csv
id,file,expected
case-001,incoming/song_a.lrc,应去重
case-002,incoming/song_b.txt,不应去重
```
Then run `evaluate-csv` from section 4 or section 5 directly.
"""Lyric duplicate detection utilities."""
from lyric_dedup.checker import DuplicateCheckResult
from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.checker import DuplicateDecision
from lyric_dedup.checker import LyricRecord
__all__ = [
    "DuplicateCheckResult",
    "DuplicateChecker",
    "DuplicateDecision",
    "LyricRecord",
]
"""Incremental lyric duplicate checker."""
from __future__ import annotations
import hashlib
import pickle
from dataclasses import dataclass
from enum import StrEnum
from pathlib import Path
from lyric_dedup.minhash_lsh import MinHashConfig
from lyric_dedup.minhash_lsh import MinHashLSH
from lyric_dedup.normalization import NormalizedLyrics
from lyric_dedup.normalization import fingerprint_text
from lyric_dedup.normalization import lyric_tokens
from lyric_dedup.normalization import normalize_lyrics
class DuplicateDecision(StrEnum):  # StrEnum requires Python 3.11+
    DUPLICATE = "duplicate"
    REVIEW = "review"
    NEW = "new"
@dataclass(frozen=True)
class LyricRecord:
record_id: str
lyrics: str
title: str | None = None
artist: str | None = None
@dataclass(frozen=True)
class CandidateMatch:
record_id: str
decision: DuplicateDecision
confidence: float
jaccard: float
line_coverage: float
primary_jaccard: float
primary_line_coverage: float
translation_jaccard: float
translation_line_coverage: float
matched_unique_lines: tuple[str, ...]
reason: str
@dataclass(frozen=True)
class DuplicateCheckResult:
decision: DuplicateDecision
confidence: float
candidates: tuple[CandidateMatch, ...]
normalized_full_text: str
reason: str
@dataclass(frozen=True)
class _IndexedRecord:
record: LyricRecord
normalized: NormalizedLyrics
exact_hash: str
tokens: set[str]
primary_tokens: set[str]
translation_tokens: set[str]
fallback_lines: tuple[str, ...]
fallback_tokens: set[str]
signature: tuple[int, ...]
class DuplicateChecker:
"""In-memory first version for checking newly submitted lyrics.
The API is intentionally small: add records with ``add_record``, then call
``check`` for a new lyric. ``save`` and ``load`` persist the whole checker
with pickle.
"""
def __init__(
self,
*,
minhash_config: MinHashConfig | None = None,
duplicate_jaccard_threshold: float = 0.78,
duplicate_line_coverage_threshold: float = 0.72,
review_jaccard_threshold: float = 0.45,
review_line_coverage_threshold: float = 0.35,
) -> None:
self._lsh = MinHashLSH(minhash_config)
self._records: dict[str, _IndexedRecord] = {}
self._exact_hash_to_ids: dict[str, set[str]] = {}
self._line_to_ids: dict[str, set[str]] = {}
self._token_to_ids: dict[str, set[str]] = {}
self.duplicate_jaccard_threshold = duplicate_jaccard_threshold
self.duplicate_line_coverage_threshold = duplicate_line_coverage_threshold
self.review_jaccard_threshold = review_jaccard_threshold
self.review_line_coverage_threshold = review_line_coverage_threshold
def add_record(self, record: LyricRecord) -> None:
indexed = self._index(record)
self._records[record.record_id] = indexed
self._exact_hash_to_ids.setdefault(indexed.exact_hash, set()).add(record.record_id)
for line in indexed.normalized.unique_lines:
if len(line) >= 4:
self._line_to_ids.setdefault(line, set()).add(record.record_id)
for token in indexed.tokens:
self._token_to_ids.setdefault(token, set()).add(record.record_id)
for token in indexed.fallback_tokens:
self._token_to_ids.setdefault(token, set()).add(record.record_id)
self._lsh.add(record.record_id, indexed.signature)
    def save(self, path: str | Path) -> None:
        """Persist the in-memory index for later checks."""
        with Path(path).open("wb") as file:
            pickle.dump(self, file, protocol=pickle.HIGHEST_PROTOCOL)

    @classmethod
    def load(cls, path: str | Path) -> "DuplicateChecker":
        """Load a previously persisted index.

        Unpickling can execute arbitrary code, so only load index files that
        were produced by a trusted ``save`` call.
        """
        with Path(path).open("rb") as file:
            checker = pickle.load(file)
        if not isinstance(checker, cls):
            raise TypeError(f"{path} does not contain a DuplicateChecker index")
        return checker
@property
def record_count(self) -> int:
return len(self._records)
def check(self, lyrics: str, *, max_candidates: int = 10) -> DuplicateCheckResult:
return self.check_record(LyricRecord(record_id="__query__", lyrics=lyrics), max_candidates=max_candidates)
def check_record(self, record: LyricRecord, *, max_candidates: int = 10) -> DuplicateCheckResult:
query = self._index(record)
exact_ids = self._exact_hash_to_ids.get(query.exact_hash, set())
if exact_ids:
candidates = tuple(self._rank_exact_candidate(query, self._records[record_id]) for record_id in sorted(exact_ids)[:max_candidates])
duplicate = next((candidate for candidate in candidates if candidate.decision == DuplicateDecision.DUPLICATE), None)
if duplicate is not None:
return DuplicateCheckResult(
decision=DuplicateDecision.DUPLICATE,
confidence=duplicate.confidence,
candidates=candidates,
normalized_full_text=query.normalized.normalized_full_text,
reason=duplicate.reason,
)
return DuplicateCheckResult(
decision=DuplicateDecision.REVIEW,
confidence=candidates[0].confidence,
candidates=candidates,
normalized_full_text=query.normalized.normalized_full_text,
reason=candidates[0].reason,
)
candidate_ids = self._recall_candidates(query)
ranked = sorted(
(self._rank_candidate(query, self._records[record_id]) for record_id in candidate_ids),
key=lambda item: (item.decision == DuplicateDecision.DUPLICATE, item.confidence, item.jaccard),
reverse=True,
)[:max_candidates]
duplicate = next((candidate for candidate in ranked if candidate.decision == DuplicateDecision.DUPLICATE), None)
if duplicate is not None:
return DuplicateCheckResult(
decision=DuplicateDecision.DUPLICATE,
confidence=duplicate.confidence,
candidates=tuple(ranked),
normalized_full_text=query.normalized.normalized_full_text,
reason=duplicate.reason,
)
review = next((candidate for candidate in ranked if candidate.decision == DuplicateDecision.REVIEW), None)
if review is not None:
return DuplicateCheckResult(
decision=DuplicateDecision.REVIEW,
confidence=review.confidence,
candidates=tuple(ranked),
normalized_full_text=query.normalized.normalized_full_text,
reason=review.reason,
)
return DuplicateCheckResult(
decision=DuplicateDecision.NEW,
confidence=1.0 - (ranked[0].confidence if ranked else 0.0),
candidates=tuple(ranked),
normalized_full_text=query.normalized.normalized_full_text,
reason="精确匹配、近重复召回和字面重合信号都较低",
)
def _index(self, record: LyricRecord) -> _IndexedRecord:
normalized = normalize_lyrics(record.lyrics)
tokens = lyric_tokens(normalized)
primary_tokens = lyric_tokens(normalized, lines=normalized.primary_lines)
translation_tokens = lyric_tokens(normalized, lines=normalized.translation_lines)
fallback_lines = tuple(_fallback_no_lyrics_lines(record.lyrics))
fallback_tokens = set(fallback_lines)
signature = self._lsh.signature(primary_tokens or tokens or fallback_tokens)
exact_hash = hashlib.sha256(_exact_fingerprint(normalized, fallback_lines).encode("utf-8")).hexdigest()
return _IndexedRecord(
record=record,
normalized=normalized,
exact_hash=exact_hash,
tokens=tokens,
primary_tokens=primary_tokens,
translation_tokens=translation_tokens,
fallback_lines=fallback_lines,
fallback_tokens=fallback_tokens,
signature=signature,
)
def _recall_candidates(self, query: _IndexedRecord) -> set[str]:
candidate_ids = self._lsh.query(query.signature)
for line in query.normalized.primary_lines:
if len(line) >= 4:
candidate_ids.update(self._line_to_ids.get(line, set()))
for line in query.normalized.translation_lines:
if len(line) >= 4:
candidate_ids.update(self._line_to_ids.get(line, set()))
for token in query.primary_tokens or query.tokens:
candidate_ids.update(self._token_to_ids.get(token, set()))
for token in query.translation_tokens:
candidate_ids.update(self._token_to_ids.get(token, set()))
for token in query.fallback_tokens:
candidate_ids.update(self._token_to_ids.get(token, set()))
return candidate_ids
def _rank_exact_candidate(self, query: _IndexedRecord, candidate: _IndexedRecord) -> CandidateMatch:
low_confidence_split = (
query.normalized.split_confidence == "low" or candidate.normalized.split_confidence == "low"
)
translation_jaccard = _jaccard(query.translation_tokens, candidate.translation_tokens)
translation_coverage, _ = _line_coverage_lines(
query.normalized.translation_lines,
candidate.normalized.translation_lines,
)
no_effective_lyrics = not query.normalized.primary_lines and not candidate.normalized.primary_lines
if no_effective_lyrics:
decision = DuplicateDecision.DUPLICATE
confidence = 1.0
reason = "无有效歌词,使用文件内容兜底指纹命中"
elif low_confidence_split:
decision = DuplicateDecision.REVIEW
confidence = 0.95
reason = "原文哈希一致,但疑似整段翻译结构拆分置信度较低,需要人工复核"
elif query.normalized.translation_lines or candidate.normalized.translation_lines:
decision = DuplicateDecision.DUPLICATE
confidence = 1.0
reason = "规范化后的原文歌词哈希完全一致,翻译行未参与自动判重"
else:
decision = DuplicateDecision.DUPLICATE
confidence = 1.0
reason = "规范化后的原文歌词哈希完全一致"
return CandidateMatch(
record_id=candidate.record.record_id,
decision=decision,
confidence=confidence,
jaccard=1.0,
line_coverage=1.0,
primary_jaccard=1.0,
primary_line_coverage=1.0,
translation_jaccard=round(translation_jaccard, 4),
translation_line_coverage=round(translation_coverage, 4),
matched_unique_lines=query.normalized.primary_lines,
reason=reason,
)
def _rank_candidate(self, query: _IndexedRecord, candidate: _IndexedRecord) -> CandidateMatch:
if not query.normalized.primary_lines or not candidate.normalized.primary_lines:
return _rank_no_effective_lyrics_candidate(query, candidate)
jaccard = _jaccard(query.tokens, candidate.tokens)
coverage, matched_lines = _line_coverage(query.normalized, candidate.normalized)
primary_jaccard = _jaccard(query.primary_tokens, candidate.primary_tokens)
primary_coverage, primary_matched_lines = _line_coverage_lines(
query.normalized.primary_lines,
candidate.normalized.primary_lines,
)
translation_jaccard = _jaccard(query.translation_tokens, candidate.translation_tokens)
translation_coverage, translation_matched_lines = _line_coverage_lines(
query.normalized.translation_lines,
candidate.normalized.translation_lines,
)
chorus_only = _is_chorus_only_match(query.normalized, candidate.normalized, primary_matched_lines)
translation_only = (
bool(translation_matched_lines)
and primary_jaccard < self.review_jaccard_threshold
and primary_coverage < self.review_line_coverage_threshold
and (translation_jaccard >= self.review_jaccard_threshold or translation_coverage >= self.review_line_coverage_threshold)
)
low_confidence_split = (
query.normalized.split_confidence == "low" or candidate.normalized.split_confidence == "low"
)
confidence = round((0.58 * primary_jaccard) + (0.42 * primary_coverage), 4)
if (
(primary_jaccard >= self.duplicate_jaccard_threshold or (primary_jaccard >= 0.78 and primary_coverage >= 0.9))
and primary_coverage >= self.duplicate_line_coverage_threshold
and not chorus_only
and not translation_only
and not low_confidence_split
):
decision = DuplicateDecision.DUPLICATE
if query.normalized.translation_lines or candidate.normalized.translation_lines:
reason = "原文歌词高度一致,翻译行未参与自动判重"
else:
reason = "原文 n-gram 字面相似度高,且行级覆盖范围广"
elif (
chorus_only
or translation_only
or low_confidence_split
or primary_jaccard >= self.review_jaccard_threshold
or primary_coverage >= self.review_line_coverage_threshold
or jaccard >= self.review_jaccard_threshold
or coverage >= self.review_line_coverage_threshold
):
decision = DuplicateDecision.REVIEW
reason = "候选相似度达到复核阈值,需要人工确认"
if chorus_only:
reason = "重合内容主要集中在重复副歌行,不自动判重"
elif translation_only:
reason = "仅翻译行相似,原文字面重合不足,不自动判重"
elif low_confidence_split:
reason = "疑似整段翻译结构但拆分置信度较低,需要人工复核"
else:
decision = DuplicateDecision.NEW
reason = "候选重合度低于复核阈值"
return CandidateMatch(
record_id=candidate.record.record_id,
decision=decision,
confidence=confidence,
jaccard=round(jaccard, 4),
line_coverage=round(coverage, 4),
primary_jaccard=round(primary_jaccard, 4),
primary_line_coverage=round(primary_coverage, 4),
translation_jaccard=round(translation_jaccard, 4),
translation_line_coverage=round(translation_coverage, 4),
matched_unique_lines=tuple(matched_lines),
reason=reason,
)
def _rank_no_effective_lyrics_candidate(query: _IndexedRecord, candidate: _IndexedRecord) -> CandidateMatch:
fallback_jaccard = _jaccard(query.fallback_tokens, candidate.fallback_tokens)
fallback_coverage, matched_lines = _line_coverage_lines(query.fallback_lines, candidate.fallback_lines)
if fallback_jaccard >= 0.35 and fallback_coverage >= 0.35 and len(matched_lines) >= 2:
return CandidateMatch(
record_id=candidate.record.record_id,
decision=DuplicateDecision.DUPLICATE,
confidence=round((0.58 * fallback_jaccard) + (0.42 * fallback_coverage), 4),
jaccard=round(fallback_jaccard, 4),
line_coverage=round(fallback_coverage, 4),
primary_jaccard=0.0,
primary_line_coverage=0.0,
translation_jaccard=0.0,
translation_line_coverage=0.0,
matched_unique_lines=tuple(matched_lines),
reason="无有效歌词,文件内容兜底特征高度相似",
)
if fallback_jaccard >= 0.2 or fallback_coverage >= 0.2:
return CandidateMatch(
record_id=candidate.record.record_id,
decision=DuplicateDecision.REVIEW,
confidence=round((0.58 * fallback_jaccard) + (0.42 * fallback_coverage), 4),
jaccard=round(fallback_jaccard, 4),
line_coverage=round(fallback_coverage, 4),
primary_jaccard=0.0,
primary_line_coverage=0.0,
translation_jaccard=0.0,
translation_line_coverage=0.0,
matched_unique_lines=tuple(matched_lines),
reason="无有效歌词,文件内容兜底特征部分相似,需要人工复核",
)
return CandidateMatch(
record_id=candidate.record.record_id,
decision=DuplicateDecision.NEW,
confidence=0.0,
jaccard=round(fallback_jaccard, 4),
line_coverage=round(fallback_coverage, 4),
primary_jaccard=0.0,
primary_line_coverage=0.0,
translation_jaccard=0.0,
translation_line_coverage=0.0,
matched_unique_lines=(),
reason="无有效歌词,且文件内容兜底特征未命中",
)
def _jaccard(left: set[str], right: set[str]) -> float:
    if not left and not right:
        return 1.0
    if not left or not right:
        return 0.0
    return len(left & right) / len(left | right)
def _exact_fingerprint(normalized: NormalizedLyrics, fallback_lines: tuple[str, ...]) -> str:
primary_text = fingerprint_text(normalized)
if primary_text:
return f"lyrics|{primary_text}"
return "no_effective_lyrics_content|" + "\n".join(fallback_lines)
def _fallback_no_lyrics_lines(text: str) -> list[str]:
import re
import unicodedata
lines: list[str] = []
for raw_line in unicodedata.normalize("NFKC", text).splitlines():
line = raw_line.strip().lower()
line = re.sub(r"\[(?:\d{1,2}:)?\d{1,2}:\d{2}(?:[.:]\d{1,3})?\]", "", line)
line = re.sub(r"[【\[].{0,80}?[】\]]", "", line)
if "歌词来自" in line or "qq音乐" in line or "网易云" in line or "酷狗" in line:
continue
if "未经" in line or "不得翻唱" in line or "不得翻录" in line or "著作权" in line:
continue
punctuation = ",。!?;:、“”‘’·…—~!¥()【】《》〈〉「」『』﹏,.;:!?()[]{}<>|/\\_-"
line = "".join(" " if char in punctuation else char for char in line)
line = re.sub(r"\s+", " ", line).strip()
if line:
lines.append(line)
return list(dict.fromkeys(lines))
def _line_coverage(left: NormalizedLyrics, right: NormalizedLyrics) -> tuple[float, list[str]]:
    return _line_coverage_lines(left.unique_lines, right.unique_lines)


def _line_coverage_lines(left: tuple[str, ...], right: tuple[str, ...]) -> tuple[float, list[str]]:
    left_lines = set(left)
    right_lines = set(right)
    if not left_lines and not right_lines:
        return 1.0, []
    if not left_lines or not right_lines:
        return 0.0, []
    matched = sorted(left_lines & right_lines)
    return len(matched) / max(len(left_lines), len(right_lines)), matched
def _is_chorus_only_match(left: NormalizedLyrics, right: NormalizedLyrics, matched_lines: list[str]) -> bool:
if not matched_lines:
return False
matched = set(matched_lines)
repeated_matches = [
line
for line in matched
if left.line_counts.get(line, 0) >= 2 or right.line_counts.get(line, 0) >= 2
]
if len(matched) <= 2 and repeated_matches:
return True
if repeated_matches and len(repeated_matches) / len(matched) >= 0.8:
matched_ratio_left = sum(left.line_counts.get(line, 0) for line in matched) / max(left.content_line_count, 1)
matched_ratio_right = sum(right.line_counts.get(line, 0) for line in matched) / max(right.content_line_count, 1)
return min(matched_ratio_left, matched_ratio_right) < 0.7
return False
"""Command line tools for lyric duplicate checking."""
from __future__ import annotations
import argparse
import csv
import json
from pathlib import Path
from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.checker import LyricRecord
from lyric_dedup.eval_dataset import generate_eval_set
from lyric_dedup.file_import import iter_lyric_files
from lyric_dedup.file_import import read_lyric_file
from lyric_dedup.file_import import record_from_file
from lyric_dedup.file_import import records_from_dir
def main() -> None:
parser = argparse.ArgumentParser(prog="lyric-dedup")
subparsers = parser.add_subparsers(dest="command", required=True)
build = subparsers.add_parser("build-index", help="build an index from .lrc/.txt files")
build.add_argument("--lyrics-dir", required=True)
build.add_argument("--index", required=True)
check = subparsers.add_parser("check-file", help="check one .lrc/.txt file against an index")
check.add_argument("--index", required=True)
check.add_argument("--file", required=True)
check.add_argument("--max-candidates", type=int, default=10)
batch = subparsers.add_parser("batch-check", help="check a directory of .lrc/.txt files against an index")
batch.add_argument("--index", required=True)
batch.add_argument("--lyrics-dir", required=True)
batch.add_argument("--out", required=True)
batch.add_argument("--max-candidates", type=int, default=5)
evaluate = subparsers.add_parser("evaluate-csv", help="evaluate labeled duplicate samples from a CSV file")
evaluate.add_argument("--index", required=True)
evaluate.add_argument("--csv", required=True)
evaluate.add_argument("--out", required=True)
evaluate.add_argument("--base-dir", default="")
evaluate.add_argument("--positive-decisions", default="duplicate")
evaluate.add_argument("--max-candidates", type=int, default=5)
generate = subparsers.add_parser("generate-eval-set", help="generate labeled eval samples from a lyric library")
generate.add_argument("--library-dir", required=True)
generate.add_argument("--lyrics-dir", required=True)
generate.add_argument("--csv", required=True)
generate.add_argument("--size", type=int, default=100)
generate.add_argument("--positive-ratio", type=float, default=0.6)
generate.add_argument("--seed", type=int, default=20260602)
args = parser.parse_args()
if args.command == "build-index":
build_index(Path(args.lyrics_dir), Path(args.index))
elif args.command == "check-file":
check_file(Path(args.index), Path(args.file), args.max_candidates)
elif args.command == "batch-check":
batch_check(Path(args.index), Path(args.lyrics_dir), Path(args.out), args.max_candidates)
elif args.command == "evaluate-csv":
evaluate_csv(
Path(args.index),
Path(args.csv),
Path(args.out),
base_dir=Path(args.base_dir) if args.base_dir else None,
positive_decisions={item.strip() for item in args.positive_decisions.split(",") if item.strip()},
max_candidates=args.max_candidates,
)
elif args.command == "generate-eval-set":
summary = generate_eval_set(
library_dir=Path(args.library_dir),
output_dir=Path(args.lyrics_dir),
csv_path=Path(args.csv),
size=args.size,
positive_ratio=args.positive_ratio,
seed=args.seed,
)
print(json.dumps(summary, ensure_ascii=False))
def build_index(lyrics_dir: Path, index_path: Path) -> None:
checker = DuplicateChecker()
records = records_from_dir(lyrics_dir)
for record in records:
checker.add_record(record)
index_path.parent.mkdir(parents=True, exist_ok=True)
checker.save(index_path)
print(json.dumps({"indexed": checker.record_count, "index": str(index_path)}, ensure_ascii=False))
def check_file(index_path: Path, file_path: Path, max_candidates: int) -> None:
checker = DuplicateChecker.load(index_path)
record = record_from_file(file_path)
result = checker.check_record(record, max_candidates=max_candidates)
print(json.dumps(_result_to_dict(result, source=str(file_path)), ensure_ascii=False, indent=2))
def batch_check(index_path: Path, lyrics_dir: Path, out_path: Path, max_candidates: int) -> None:
checker = DuplicateChecker.load(index_path)
out_path.parent.mkdir(parents=True, exist_ok=True)
rows: list[dict[str, object]] = []
for path in iter_lyric_files(lyrics_dir):
record = record_from_file(path, base_dir=lyrics_dir)
result = checker.check_record(record, max_candidates=max_candidates)
best = result.candidates[0] if result.candidates else None
rows.append(
{
"source": str(path),
"record_id": record.record_id,
"decision": result.decision.value,
"confidence": result.confidence,
"reason": result.reason,
"best_candidate_id": best.record_id if best else "",
"best_candidate_decision": best.decision.value if best else "",
"best_candidate_confidence": best.confidence if best else "",
"best_candidate_jaccard": best.jaccard if best else "",
"best_candidate_line_coverage": best.line_coverage if best else "",
"best_candidate_primary_jaccard": best.primary_jaccard if best else "",
"best_candidate_primary_line_coverage": best.primary_line_coverage if best else "",
"best_candidate_translation_jaccard": best.translation_jaccard if best else "",
"best_candidate_translation_line_coverage": best.translation_line_coverage if best else "",
"best_candidate_reason": best.reason if best else "",
"matched_unique_lines": " | ".join(best.matched_unique_lines) if best else "",
}
)
if out_path.suffix.lower() == ".jsonl":
with out_path.open("w", encoding="utf-8") as file:
for row in rows:
file.write(json.dumps(row, ensure_ascii=False) + "\n")
else:
with out_path.open("w", encoding="utf-8", newline="") as file:
writer = csv.DictWriter(file, fieldnames=list(rows[0].keys()) if rows else ["source"])
writer.writeheader()
writer.writerows(rows)
summary = {
"checked": len(rows),
"duplicate": sum(1 for row in rows if row["decision"] == "duplicate"),
"review": sum(1 for row in rows if row["decision"] == "review"),
"new": sum(1 for row in rows if row["decision"] == "new"),
"out": str(out_path),
}
print(json.dumps(summary, ensure_ascii=False))
def evaluate_csv(
index_path: Path,
csv_path: Path,
out_path: Path,
*,
base_dir: Path | None,
positive_decisions: set[str],
max_candidates: int,
) -> None:
checker = DuplicateChecker.load(index_path)
rows: list[dict[str, object]] = []
with csv_path.open(encoding="utf-8-sig", newline="") as file:
reader = csv.DictReader(file)
if reader.fieldnames is None:
raise ValueError("评估 CSV 需要表头")
for row_number, row in enumerate(reader, start=2):
sample_id = row.get("id") or row.get("sample_id") or str(row_number)
record, source = _record_from_eval_row(row, csv_path=csv_path, base_dir=base_dir)
expected_duplicate = _parse_expected(row.get("expected") or row.get("label") or row.get("target"))
result = checker.check_record(record, max_candidates=max_candidates)
predicted_duplicate = result.decision.value in positive_decisions
best = result.candidates[0] if result.candidates else None
rows.append(
{
"id": sample_id,
"source": source,
"expected_duplicate": expected_duplicate,
"decision": result.decision.value,
"predicted_duplicate": predicted_duplicate,
"correct": expected_duplicate == predicted_duplicate,
"confidence": result.confidence,
"reason": result.reason,
"best_candidate_id": best.record_id if best else "",
"best_candidate_decision": best.decision.value if best else "",
"best_candidate_confidence": best.confidence if best else "",
"best_candidate_jaccard": best.jaccard if best else "",
"best_candidate_line_coverage": best.line_coverage if best else "",
"best_candidate_primary_jaccard": best.primary_jaccard if best else "",
"best_candidate_primary_line_coverage": best.primary_line_coverage if best else "",
"best_candidate_translation_jaccard": best.translation_jaccard if best else "",
"best_candidate_translation_line_coverage": best.translation_line_coverage if best else "",
"best_candidate_reason": best.reason if best else "",
"matched_unique_lines": " | ".join(best.matched_unique_lines) if best else "",
}
)
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", encoding="utf-8", newline="") as file:
writer = csv.DictWriter(file, fieldnames=list(rows[0].keys()) if rows else ["id"])
writer.writeheader()
writer.writerows(rows)
summary = _evaluation_summary(rows, positive_decisions=positive_decisions, out_path=out_path)
summary_path = out_path.with_suffix(out_path.suffix + ".summary.json")
summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(summary, ensure_ascii=False))
def _result_to_dict(result, *, source: str) -> dict[str, object]:
return {
"source": source,
"decision": result.decision.value,
"confidence": result.confidence,
"reason": result.reason,
"candidates": [
{
"record_id": candidate.record_id,
"decision": candidate.decision.value,
"confidence": candidate.confidence,
"jaccard": candidate.jaccard,
"line_coverage": candidate.line_coverage,
"primary_jaccard": candidate.primary_jaccard,
"primary_line_coverage": candidate.primary_line_coverage,
"translation_jaccard": candidate.translation_jaccard,
"translation_line_coverage": candidate.translation_line_coverage,
"reason": candidate.reason,
"matched_unique_lines": list(candidate.matched_unique_lines),
}
for candidate in result.candidates
],
}
def _lyrics_from_eval_row(row: dict[str, str], *, csv_path: Path, base_dir: Path | None) -> tuple[str, str]:
lyrics = (row.get("lyrics") or "").strip()
if lyrics:
return lyrics.replace("\\n", "\n"), "inline"
file_value = (row.get("file") or row.get("path") or row.get("source") or "").strip()
if not file_value:
raise ValueError("评估 CSV 每行需要提供 lyrics,或 file/path/source 文件路径")
file_path = Path(file_value)
if not file_path.is_absolute():
file_path = (base_dir or csv_path.parent) / file_path
return read_lyric_file(file_path), str(file_path)
def _record_from_eval_row(row: dict[str, str], *, csv_path: Path, base_dir: Path | None):
lyrics = (row.get("lyrics") or "").strip()
if lyrics:
return (
LyricRecord(
record_id=row.get("id") or row.get("sample_id") or "__eval__",
lyrics=lyrics.replace("\\n", "\n"),
title=row.get("title") or None,
artist=row.get("artist") or None,
),
"inline",
)
file_value = (row.get("file") or row.get("path") or row.get("source") or "").strip()
if not file_value:
raise ValueError("评估 CSV 每行需要 lyrics,或 file/path/source 文件路径")
file_path = Path(file_value)
if not file_path.is_absolute():
file_path = (base_dir or csv_path.parent) / file_path
record = record_from_file(file_path)
if row.get("title") or row.get("artist"):
record = LyricRecord(
record_id=record.record_id,
lyrics=record.lyrics,
title=row.get("title") or record.title,
artist=row.get("artist") or record.artist,
)
return record, str(file_path)
def _parse_expected(value: str | None) -> bool:
if value is None:
raise ValueError("评估 CSV 每行需要 expected/label/target 列")
normalized = value.strip().lower()
positives = {"1", "true", "yes", "y", "duplicate", "dup", "重复", "应去重", "去重", "是"}
negatives = {"0", "false", "no", "n", "new", "not_duplicate", "non_duplicate", "不重复", "不应去重", "新歌", "否"}
if normalized in positives:
return True
if normalized in negatives:
return False
raise ValueError(f"无法识别 expected 值: {value!r}")
def _evaluation_summary(
rows: list[dict[str, object]],
*,
positive_decisions: set[str],
out_path: Path,
) -> dict[str, object]:
tp = sum(1 for row in rows if row["expected_duplicate"] is True and row["predicted_duplicate"] is True)
fp = sum(1 for row in rows if row["expected_duplicate"] is False and row["predicted_duplicate"] is True)
tn = sum(1 for row in rows if row["expected_duplicate"] is False and row["predicted_duplicate"] is False)
fn = sum(1 for row in rows if row["expected_duplicate"] is True and row["predicted_duplicate"] is False)
total = len(rows)
precision = tp / (tp + fp) if tp + fp else 0.0
recall = tp / (tp + fn) if tp + fn else 0.0
accuracy = (tp + tn) / total if total else 0.0
f1 = (2 * precision * recall / (precision + recall)) if precision + recall else 0.0
return {
"total": total,
"positive_decisions": sorted(positive_decisions),
"accuracy": round(accuracy, 4),
"precision": round(precision, 4),
"recall": round(recall, 4),
"f1": round(f1, 4),
"true_positive": tp,
"false_positive": fp,
"true_negative": tn,
"false_negative": fn,
"duplicate": sum(1 for row in rows if row["decision"] == "duplicate"),
"review": sum(1 for row in rows if row["decision"] == "review"),
"new": sum(1 for row in rows if row["decision"] == "new"),
"out": str(out_path),
"summary": str(out_path.with_suffix(out_path.suffix + ".summary.json")),
}
if __name__ == "__main__":
main()
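The figures written by `_evaluation_summary` follow the usual confusion-matrix definitions. Below is a minimal standalone sketch, assuming each sample has already been reduced to an `(expected, predicted)` pair of booleans. The helper name `binary_metrics` is hypothetical and not part of the package.

```python
def binary_metrics(pairs: list[tuple[bool, bool]]) -> dict[str, float]:
    """Compute accuracy, precision, recall and F1 from (expected, predicted) pairs."""
    tp = sum(1 for expected, predicted in pairs if expected and predicted)
    fp = sum(1 for expected, predicted in pairs if not expected and predicted)
    tn = sum(1 for expected, predicted in pairs if not expected and not predicted)
    fn = sum(1 for expected, predicted in pairs if expected and not predicted)
    total = len(pairs)
    # Guard every division so an empty or one-sided sample set yields 0.0.
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    accuracy = (tp + tn) / total if total else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Three true duplicates (two caught, one missed) and two new songs (one false alarm).
sample_pairs = [(True, True), (True, True), (True, False), (False, True), (False, False)]
metrics = binary_metrics(sample_pairs)
print(metrics)
```

Treating `review` as a positive decision can only move samples from false negative to true positive and from true negative to false positive, so recall cannot decrease while precision may move in either direction.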
"""Generate labeled evaluation samples from an existing lyric library."""
from __future__ import annotations
import csv
import random
import re
from dataclasses import dataclass
from pathlib import Path
from lyric_dedup.file_import import iter_lyric_files
from lyric_dedup.file_import import read_lyric_file
from lyric_dedup.file_import import record_from_file
from lyric_dedup.normalization import normalize_lyrics
@dataclass(frozen=True)
class GeneratedSample:
sample_id: str
file: str
expected: str
sample_type: str
source: str
title: str = ""
artist: str = ""
def generate_eval_set(
*,
library_dir: Path,
output_dir: Path,
csv_path: Path,
size: int = 100,
positive_ratio: float = 0.6,
seed: int = 20260602,
) -> dict[str, object]:
rng = random.Random(seed)
source_files = iter_lyric_files(library_dir)
if not source_files:
raise ValueError(f"{library_dir} 下没有 .lrc/.txt 歌词文件")
output_dir.mkdir(parents=True, exist_ok=True)
csv_path.parent.mkdir(parents=True, exist_ok=True)
_clean_generated_output_dir(output_dir)
positives = round(size * positive_ratio)
negatives = size - positives
samples: list[GeneratedSample] = []
for index in range(positives):
source = source_files[index % len(source_files)]
samples.append(_positive_sample(index + 1, source, output_dir, csv_path.parent, rng))
for index in range(negatives):
left = source_files[index % len(source_files)]
right = source_files[(index + 1) % len(source_files)]
samples.append(_negative_sample(positives + index + 1, left, right, output_dir, csv_path.parent, rng))
rng.shuffle(samples)
with csv_path.open("w", encoding="utf-8", newline="") as file:
writer = csv.DictWriter(file, fieldnames=["id", "file", "expected", "sample_type", "source", "title", "artist"])
writer.writeheader()
writer.writerows(
{
"id": sample.sample_id,
"file": sample.file,
"expected": sample.expected,
"sample_type": sample.sample_type,
"source": sample.source,
"title": sample.title,
"artist": sample.artist,
}
for sample in samples
)
return {
"size": size,
"positive": positives,
"negative": negatives,
"library_files": len(source_files),
"lyrics_dir": str(output_dir),
"csv": str(csv_path),
}
def _positive_sample(index: int, source: Path, output_dir: Path, csv_base: Path, rng: random.Random) -> GeneratedSample:
raw = read_lyric_file(source)
source_record = record_from_file(source)
variants = [
("exact_copy", raw),
("timestamped", _add_timestamps(_content_lines(raw))),
("punctuation_noise", _add_punctuation_noise(_content_lines(raw), rng)),
("with_platform_noise", _with_platform_noise(_content_lines(raw))),
("blank_line_noise", _add_blank_line_noise(_content_lines(raw))),
("lrc_with_platform_noise", _add_timestamps(_content_lines(_with_platform_noise(_content_lines(raw))))),
("translation_added", _translation_added(_content_lines(raw))),
]
sample_type, text = variants[(index - 1) % len(variants)]
name = f"pos_{index:03d}_{sample_type}.txt"
path = output_dir / name
path.write_text(text, encoding="utf-8")
return GeneratedSample(
sample_id=f"pos-{index:03d}",
file=str(path.relative_to(csv_base)),
expected="应去重",
sample_type=sample_type,
source=str(source),
title=source_record.title or "",
artist=source_record.artist or "",
)
def _negative_sample(index: int, left: Path, right: Path, output_dir: Path, csv_base: Path, rng: random.Random) -> GeneratedSample:
left_lines = _normalized_lines(left)
right_lines = _normalized_lines(right)
variants = [
("single_song_fragment", _single_song_fragment(left_lines)),
("short_shared_snippet", _short_shared_snippet(left_lines, rng)),
("mixed_fragments", _mixed_fragments(left_lines, right_lines, rng)),
("same_theme_synthetic", _same_theme_synthetic(index)),
("translation_only_like", _translation_only_like(left_lines)),
]
sample_type, text = variants[(index - 1) % len(variants)]
name = f"neg_{index:03d}_{sample_type}.txt"
path = output_dir / name
path.write_text(text, encoding="utf-8")
return GeneratedSample(
sample_id=f"neg-{index:03d}",
file=str(path.relative_to(csv_base)),
expected="不应去重",
sample_type=sample_type,
source=f"{left} | {right}",
)
def _content_lines(text: str) -> list[str]:
lines = [line.strip() for line in text.splitlines() if line.strip()]
return lines or [text.strip()]
def _clean_generated_output_dir(output_dir: Path) -> None:
for path in output_dir.iterdir():
if path.is_file() and path.suffix.lower() in {".txt", ".lrc"}:
path.unlink()
def _normalized_lines(path: Path) -> list[str]:
normalized = normalize_lyrics(read_lyric_file(path))
return list(normalized.primary_lines or normalized.unique_lines)
def _add_timestamps(lines: list[str]) -> str:
    # Carry seconds over into minutes so lines past the 59th do not reuse a timestamp.
    return "\n".join(f"[{idx // 60:02d}:{idx % 60:02d}.00]{line}" for idx, line in enumerate(lines, start=1))
def _add_punctuation_noise(lines: list[str], rng: random.Random) -> str:
marks = ["!", "?", "...", ",", "。"]
return "\n".join(f"{line}{rng.choice(marks)}" for line in lines)
def _with_platform_noise(lines: list[str]) -> str:
return "\n".join(["歌词来自QQ音乐", "作词:测试", *lines, "未经著作权人许可 不得翻唱"])
def _add_blank_line_noise(lines: list[str]) -> str:
result: list[str] = []
for idx, line in enumerate(lines, start=1):
result.append(line)
if idx % 4 == 0:
result.append("")
return "\n".join(result)
def _translation_added(lines: list[str]) -> str:
result: list[str] = []
for idx, line in enumerate(lines, start=1):
result.append(line)
if _looks_foreign(line) and idx <= 24:
result.append(_pseudo_translation(idx))
return "\n".join(result)
def _single_song_fragment(lines: list[str]) -> str:
if len(lines) <= 4:
return "\n".join(lines[: max(1, len(lines) // 2)])
fragment_len = max(2, min(8, len(lines) // 4))
start = max(0, (len(lines) - fragment_len) // 2)
return "\n".join(lines[start : start + fragment_len])
def _short_shared_snippet(lines: list[str], rng: random.Random) -> str:
snippet = rng.sample(lines, k=min(2, len(lines))) if lines else []
synthetic = [
"清晨的风吹过新的街口",
"我把昨天放进安静的口袋",
*snippet,
"故事从这里重新开始",
"灯光落下我继续往前走",
]
return "\n".join(synthetic)
def _mixed_fragments(left_lines: list[str], right_lines: list[str], rng: random.Random) -> str:
left_pick = rng.sample(left_lines, k=min(2, len(left_lines))) if left_lines else []
right_pick = rng.sample(right_lines, k=min(2, len(right_lines))) if right_lines else []
filler = ["新的旋律慢慢靠近", "陌生的名字写在风里", "没有人停在原地"]
return "\n".join([*left_pick, *filler, *right_pick])
def _same_theme_synthetic(index: int) -> str:
themes = [
"我在夜里想起远方的你",
"城市灯火陪我走过雨季",
"那些没说完的话留在风里",
"明天醒来我们各自继续",
f"这是第 {index} 个全新测试样本",
]
return "\n".join(themes)
def _translation_only_like(lines: list[str]) -> str:
foreign_count = sum(1 for line in lines if _looks_foreign(line))
if foreign_count < 2:
return _same_theme_synthetic(foreign_count + len(lines))
return "\n".join(_pseudo_translation(idx) for idx in range(1, min(8, foreign_count) + 1))
def _pseudo_translation(index: int) -> str:
translations = [
"今晚我仍然想念你",
"风会带走所有疲惫",
"黑暗里也会有光",
"别让昨天困住自己",
"我们终会继续向前",
"雨停以后天空会亮",
"把遗憾留在旧时光",
"你已经足够好了",
]
return translations[(index - 1) % len(translations)]
def _looks_foreign(line: str) -> bool:
latin = len(re.findall(r"[A-Za-z]", line))
cjk = len(re.findall(r"[\u4e00-\u9fff]", line))
return latin > 0 and cjk == 0
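`generate_eval_set` splits the requested size with `round(size * positive_ratio)` and then picks a variant for each sample by cycling through a list by index. Here is a small sketch of that arithmetic using only the standard library. `split_counts` and `variant_for` are hypothetical names used for illustration.

```python
def split_counts(size: int, positive_ratio: float) -> tuple[int, int]:
    """Return (positives, negatives) the way round() splits them."""
    positives = round(size * positive_ratio)
    return positives, size - positives


def variant_for(index: int, variants: list[str]) -> str:
    """Pick a variant round-robin for a 1-based sample index."""
    return variants[(index - 1) % len(variants)]


positive_variants = ["exact_copy", "timestamped", "punctuation_noise"]
print(split_counts(10, 0.6))
print(split_counts(5, 0.5))
print([variant_for(i, positive_variants) for i in range(1, 6)])
```

Python's `round` sends exact halves to the nearest even integer, so a size of 5 with a ratio of 0.5 yields 2 positives rather than 3.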
"""Import LRC/TXT lyric files into records."""
from __future__ import annotations
import hashlib
from pathlib import Path
from lyric_dedup.checker import LyricRecord
SUPPORTED_SUFFIXES = {".lrc", ".txt"}
def iter_lyric_files(root: str | Path) -> list[Path]:
base = Path(root)
return sorted(
path
for path in base.rglob("*")
if path.is_file() and path.suffix.lower() in SUPPORTED_SUFFIXES
)
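def read_lyric_file(path: str | Path) -> str:
    file_path = Path(path)
    data = file_path.read_bytes()
    # "utf-8-sig" also decodes UTF-8 without a BOM, so a separate "utf-8" attempt is unnecessary.
    for encoding in ("utf-8-sig", "gb18030", "big5"):
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            continue
    # Nothing decoded cleanly: keep the file usable by substituting replacement characters.
    return data.decode("utf-8", errors="replace")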
def record_from_file(path: str | Path, *, base_dir: str | Path | None = None) -> LyricRecord:
file_path = Path(path)
lyrics = read_lyric_file(file_path)
title, artist = _metadata_from_name(file_path.stem)
record_id = _record_id(file_path, base_dir)
return LyricRecord(record_id=record_id, lyrics=lyrics, title=title, artist=artist)
def records_from_dir(root: str | Path) -> list[LyricRecord]:
return [record_from_file(path, base_dir=root) for path in iter_lyric_files(root)]
def _record_id(path: Path, base_dir: str | Path | None) -> str:
if base_dir is None:
source = str(path.resolve())
else:
source = str(path.resolve().relative_to(Path(base_dir).resolve()))
digest = hashlib.sha1(source.encode("utf-8")).hexdigest()[:12]
return f"{digest}:{source}"
def _metadata_from_name(stem: str) -> tuple[str | None, str | None]:
    """Guess (title, artist) from a file stem such as "Artist - Title" or "Title-Artist"."""
    cleaned = stem.removesuffix("-歌词").removesuffix("_歌词").removesuffix(" 歌词").strip()
    if " - " in cleaned:
        artist, title = cleaned.split(" - ", 1)
        return title.strip() or None, artist.strip() or None
    for sep in ("-", "_"):
        if sep in cleaned:
            title, artist = cleaned.rsplit(sep, 1)
            return title.strip() or None, artist.strip() or None
    # No separator: the whole cleaned stem is the title (without the "歌词" suffix).
    return cleaned or None, None
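`read_lyric_file` tries a fixed list of encodings in order and falls back to lossy UTF-8 decoding. The sketch below applies the same idea to raw bytes and also reports which encoding succeeded. `decode_with_fallback` and the `"utf-8-replace"` label are hypothetical, not part of `lyric_dedup`.

```python
def decode_with_fallback(
    data: bytes,
    encodings: tuple[str, ...] = ("utf-8-sig", "gb18030", "big5"),
) -> tuple[str, str]:
    """Return (text, encoding) for the first encoding that decodes cleanly."""
    for encoding in encodings:
        try:
            return data.decode(encoding), encoding
        except UnicodeDecodeError:
            continue
    # Last resort: keep going with replacement characters instead of failing.
    return data.decode("utf-8", errors="replace"), "utf-8-replace"


print(decode_with_fallback("我爱你".encode("utf-8")))
print(decode_with_fallback(b"\xef\xbb\xbf" + "我爱你".encode("utf-8")))
print(decode_with_fallback("我爱你".encode("gb18030")))
```

The order of the list is a judgment call: a permissive codec placed early may decode bytes written in a later codec without raising, which yields wrong characters rather than an error.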
"""Small in-memory MinHash LSH index for incremental lyric lookup."""
from __future__ import annotations
import hashlib
from collections import defaultdict
from dataclasses import dataclass
_MAX_HASH = (1 << 64) - 1
@dataclass(frozen=True)
class MinHashConfig:
num_perm: int = 96
bands: int = 24
seed: int = 17
@property
def rows_per_band(self) -> int:
if self.num_perm % self.bands != 0:
raise ValueError("num_perm must be divisible by bands")
return self.num_perm // self.bands
class MinHashLSH:
def __init__(self, config: MinHashConfig | None = None) -> None:
self.config = config or MinHashConfig()
self._buckets: dict[tuple[int, tuple[int, ...]], set[str]] = defaultdict(set)
def signature(self, tokens: set[str]) -> tuple[int, ...]:
if not tokens:
return tuple([_MAX_HASH] * self.config.num_perm)
signature = [_MAX_HASH] * self.config.num_perm
for token in tokens:
encoded = token.encode("utf-8")
for idx in range(self.config.num_perm):
digest = hashlib.blake2b(
encoded,
digest_size=8,
person=f"lyr{self.config.seed + idx:05d}".encode("ascii")[:16],
).digest()
value = int.from_bytes(digest, "big")
if value < signature[idx]:
signature[idx] = value
return tuple(signature)
def add(self, record_id: str, signature: tuple[int, ...]) -> None:
for key in self._band_keys(signature):
self._buckets[key].add(record_id)
def query(self, signature: tuple[int, ...]) -> set[str]:
candidates: set[str] = set()
for key in self._band_keys(signature):
candidates.update(self._buckets.get(key, set()))
return candidates
def _band_keys(self, signature: tuple[int, ...]) -> list[tuple[int, tuple[int, ...]]]:
rows = self.config.rows_per_band
return [(band, signature[band * rows : (band + 1) * rows]) for band in range(self.config.bands)]
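With the default `MinHashConfig` (96 permutations in 24 bands, so 4 rows per band), the chance that two lyric token sets share at least one LSH bucket follows the standard banding curve 1 - (1 - s^r)^b, where s is their Jaccard similarity, r the rows per band and b the number of bands. Here is a sketch of that curve, assuming idealized independent hash functions. `candidate_probability` is a hypothetical helper.

```python
def candidate_probability(jaccard: float, *, rows_per_band: int = 4, bands: int = 24) -> float:
    """Probability that two sets collide in at least one band under ideal MinHash."""
    # One band matches only if all of its rows agree: s ** r.
    band_match = jaccard ** rows_per_band
    # The pair becomes a candidate unless every band fails to match.
    return 1.0 - (1.0 - band_match) ** bands


for s in (0.2, 0.4, 0.6, 0.8):
    print(f"jaccard={s:.1f} -> candidate probability {candidate_probability(s):.3f}")
```

The curve rises most steeply near (1/b)^(1/r), roughly 0.45 for these defaults, so pairs well above that similarity are very likely to be recalled and pairs well below it are mostly filtered out before exact scoring.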
"""Lyric-specific normalization and feature extraction."""
from __future__ import annotations
import re
import string
import unicodedata
from collections import Counter
from dataclasses import dataclass
_TRADITIONAL_TO_SIMPLIFIED = str.maketrans(
{
"愛": "爱",
"會": "会",
"個": "个",
"妳": "你",
"們": "们",
"麼": "么",
"夢": "梦",
"憶": "忆",
"風": "风",
"無": "无",
"與": "与",
"聽": "听",
"說": "说",
"見": "见",
"話": "话",
"還": "还",
"這": "这",
"那": "那",
"裡": "里",
"裏": "里",
"過": "过",
"來": "来",
"進": "进",
"去": "去",
"給": "给",
"讓": "让",
"嗎": "吗",
"為": "为",
"誰": "谁",
"對": "对",
"錯": "错",
"淚": "泪",
"寫": "写",
"雲": "云",
"藍": "蓝",
"紅": "红",
"綠": "绿",
"黃": "黄",
"長": "长",
"遠": "远",
"燈": "灯",
"臺": "台",
"台": "台",
"後": "后",
"從": "从",
"時": "时",
"間": "间",
"葉": "叶",
"歲": "岁",
"聲": "声",
"邊": "边",
"歡": "欢",
"繼": "继",
"續": "续",
"難": "难",
"雙": "双",
"舊": "旧",
"離": "离",
}
)
_TIMESTAMP_RE = re.compile(r"\[((?:\d{1,2}:)?\d{1,2}:\d{2}(?:[.:]\d{1,3})?)\]")
_BRACKET_RE = re.compile(r"[\[(【<《].{0,40}?[\])】>》]")
_ROLE_PREFIX_RE = re.compile(r"^\s*(?:男|女|合|主歌|副歌|verse|chorus|bridge|rap)\s*:\s*", re.IGNORECASE)
_CREDIT_PREFIX_RE = re.compile(
r"^\s*(?:作词|作詞|作曲|编曲|編曲|制作|製作|监制|監製|录音|錄音|混音|母带|"
r"出品|发行|發行|歌词|歌詞|lyric(?:s)?|composer|writer|producer|arranger|"
r"copyright|未经|未經|qq音乐|酷狗|网易云|網易雲|lrc)",
re.IGNORECASE,
)
_WATERMARK_RE = re.compile(
r"(?:qq音乐|酷狗音乐|网易云音乐|網易雲音樂|虾米音乐|歌词网|歌詞網|"
r"music\.163\.com|www\.|http[s]?://|\blrc\b)",
re.IGNORECASE,
)
_CJK_RE = re.compile(r"[\u4e00-\u9fff]")
_LATIN_RE = re.compile(r"[a-zA-Z]")
_KANA_RE = re.compile(r"[\u3040-\u30ff]")
_HANGUL_RE = re.compile(r"[\uac00-\ud7af]")
_WORD_RE = re.compile(r"[a-z0-9]+|[\u4e00-\u9fff]", re.IGNORECASE)
_INLINE_SPLIT_RE = re.compile(r"\s+(?:/|\|)\s+|(?<=[A-Za-z])\s*[-—]\s*(?=[\u4e00-\u9fff])")
@dataclass(frozen=True)
class _LineEntry:
text: str
timestamp: str | None
language: str
source_index: int
@dataclass(frozen=True)
class NormalizedLyrics:
raw_text: str
normalized_full_text: str
normalized_lines: tuple[str, ...]
unique_lines: tuple[str, ...]
line_counts: dict[str, int]
content_line_count: int
primary_lines: tuple[str, ...]
translation_lines: tuple[str, ...]
unknown_lines: tuple[str, ...]
line_roles: tuple[str, ...]
split_confidence: str
split_reason: str
def normalize_lyrics(text: str) -> NormalizedLyrics:
"""Normalize lyrics while preserving line-level structure for ranking."""
entries: list[_LineEntry] = []
for index, raw_line in enumerate(unicodedata.normalize("NFKC", text).splitlines()):
entries.extend(_clean_line_entries(raw_line, index))
cleaned_lines = [entry.text for entry in entries]
roles, confidence, reason = _assign_line_roles(entries)
primary_lines = tuple(entry.text for entry, role in zip(entries, roles, strict=False) if role == "primary")
translation_lines = tuple(entry.text for entry, role in zip(entries, roles, strict=False) if role == "translation")
unknown_lines = tuple(entry.text for entry, role in zip(entries, roles, strict=False) if role == "unknown")
if not primary_lines:
primary_lines = tuple(cleaned_lines)
roles = tuple("primary" for _ in cleaned_lines)
if cleaned_lines and confidence == "none":
reason = "未检测到可分离的翻译结构,全部有效行按原文处理"
counts = Counter(cleaned_lines)
unique_lines = tuple(dict.fromkeys(cleaned_lines))
return NormalizedLyrics(
raw_text=text,
normalized_full_text="\n".join(cleaned_lines),
normalized_lines=tuple(cleaned_lines),
unique_lines=unique_lines,
line_counts=dict(counts),
content_line_count=len(cleaned_lines),
primary_lines=tuple(dict.fromkeys(primary_lines)),
translation_lines=tuple(dict.fromkeys(translation_lines)),
unknown_lines=tuple(dict.fromkeys(unknown_lines)),
line_roles=tuple(roles),
split_confidence=confidence,
split_reason=reason,
)
def fingerprint_text(normalized: NormalizedLyrics) -> str:
"""Return a text form suitable for exact hashing.
Repeated adjacent or non-adjacent lyric lines are collapsed so different chorus
repeat counts do not prevent exact duplicate detection.
"""
return "\n".join(normalized.primary_lines or normalized.unique_lines)
def lyric_tokens(
    normalized: NormalizedLyrics,
    ngram_size: int = 3,
    *,
    lines: tuple[str, ...] | None = None,
) -> set[str]:
    """Build mixed CJK/Latin n-gram tokens.

    Tokens are collected into a set, so a line repeated many times (for example a
    chorus) contributes its n-grams only once.
    """
    tokens: set[str] = set()
    selected_lines = lines if lines is not None else normalized.unique_lines
    for line in selected_lines:
        units = _token_units(line)
        if len(units) < ngram_size:
            # Lines shorter than one n-gram are kept as a single token.
            if units:
                tokens.add(" ".join(units))
            continue
        for start in range(len(units) - ngram_size + 1):
            tokens.add(" ".join(units[start : start + ngram_size]))
    return tokens
def _clean_line_entries(raw_line: str, source_index: int) -> list[_LineEntry]:
timestamp_match = _TIMESTAMP_RE.search(raw_line)
timestamp = timestamp_match.group(1) if timestamp_match else None
line = _TIMESTAMP_RE.sub("", raw_line)
line = _ROLE_PREFIX_RE.sub("", line).strip()
inline_entries = _split_inline_translation(line, timestamp, source_index)
if inline_entries:
return inline_entries
return _entry_from_text(line, timestamp, source_index)
def _split_inline_translation(line: str, timestamp: str | None, source_index: int) -> list[_LineEntry]:
parts = [part.strip() for part in _INLINE_SPLIT_RE.split(line, maxsplit=1)]
if len(parts) != 2:
return []
left_entries = _entry_from_text(parts[0], timestamp, source_index)
right_entries = _entry_from_text(parts[1], timestamp, source_index)
if not left_entries or not right_entries:
return []
left_lang = left_entries[0].language
right_lang = right_entries[0].language
if _is_foreign_language(left_lang) and right_lang == "zh":
return [left_entries[0], right_entries[0]]
if left_lang == "zh" and _is_foreign_language(right_lang):
return [right_entries[0], left_entries[0]]
return []
def _entry_from_text(text: str, timestamp: str | None, source_index: int) -> list[_LineEntry]:
line = _BRACKET_RE.sub("", text)
line = line.strip().lower().translate(_TRADITIONAL_TO_SIMPLIFIED)
if not line or _is_noise_line(line):
return []
line = _strip_symbols(line)
if not line:
return []
return [_LineEntry(text=line, timestamp=timestamp, language=_detect_language(line), source_index=source_index)]
def _assign_line_roles(entries: list[_LineEntry]) -> tuple[tuple[str, ...], str, str]:
if not entries:
return (), "none", "没有有效歌词行"
timestamp_roles = _roles_by_same_timestamp(entries)
if timestamp_roles is not None:
return timestamp_roles, "high", "同时间戳下检测到外文行和中文行配对"
inline_roles = _roles_by_inline_translation(entries)
if inline_roles is not None:
return inline_roles, "medium", "同一原始行内检测到明显的外文和中文翻译"
alternating_roles = _roles_by_alternating_translation(entries)
if alternating_roles is not None:
return alternating_roles, "high", "检测到稳定的外文行和中文翻译行交替结构"
block_roles = _roles_by_translation_block(entries)
if block_roles is not None:
return block_roles, "low", "检测到疑似原文段落加中文翻译段落,置信度较低"
return tuple("primary" for _ in entries), "none", "未检测到可分离的翻译结构,全部有效行按原文处理"
def _roles_by_same_timestamp(entries: list[_LineEntry]) -> tuple[str, ...] | None:
roles = ["unknown"] * len(entries)
groups: dict[str, list[int]] = {}
for idx, entry in enumerate(entries):
if entry.timestamp:
groups.setdefault(entry.timestamp, []).append(idx)
paired = 0
for indexes in groups.values():
if len(indexes) < 2:
continue
foreign = [idx for idx in indexes if _is_foreign_language(entries[idx].language)]
chinese = [idx for idx in indexes if entries[idx].language == "zh"]
if not foreign or not chinese:
continue
for idx in foreign:
roles[idx] = "primary"
for idx in chinese:
roles[idx] = "translation"
paired += 1
if paired == 0:
return None
for idx, role in enumerate(roles):
if role == "unknown":
roles[idx] = "primary"
return tuple(roles)
def _roles_by_alternating_translation(entries: list[_LineEntry]) -> tuple[str, ...] | None:
roles = ["unknown"] * len(entries)
pairs = 0
idx = 0
while idx < len(entries) - 1:
current = entries[idx]
nxt = entries[idx + 1]
if _is_foreign_language(current.language) and nxt.language == "zh":
roles[idx] = "primary"
roles[idx + 1] = "translation"
pairs += 1
idx += 2
continue
idx += 1
if pairs < 2:
return None
assigned = sum(1 for role in roles if role != "unknown")
if assigned / len(entries) < 0.65:
return None
for idx, role in enumerate(roles):
if role == "unknown":
roles[idx] = "primary"
return tuple(roles)
def _roles_by_inline_translation(entries: list[_LineEntry]) -> tuple[str, ...] | None:
roles = ["primary"] * len(entries)
pairs = 0
by_source: dict[int, list[int]] = {}
for idx, entry in enumerate(entries):
by_source.setdefault(entry.source_index, []).append(idx)
for indexes in by_source.values():
if len(indexes) != 2:
continue
first, second = indexes
if _is_foreign_language(entries[first].language) and entries[second].language == "zh":
roles[first] = "primary"
roles[second] = "translation"
pairs += 1
elif entries[first].language == "zh" and _is_foreign_language(entries[second].language):
roles[first] = "translation"
roles[second] = "primary"
pairs += 1
return tuple(roles) if pairs else None
def _roles_by_translation_block(entries: list[_LineEntry]) -> tuple[str, ...] | None:
if len(entries) < 4:
return None
midpoint = len(entries) // 2
first = entries[:midpoint]
second = entries[midpoint:]
first_foreign = sum(1 for entry in first if _is_foreign_language(entry.language))
second_zh = sum(1 for entry in second if entry.language == "zh")
if first_foreign / len(first) >= 0.75 and second_zh / len(second) >= 0.75:
return tuple("primary" if idx < midpoint else "translation" for idx in range(len(entries)))
return None
def _detect_language(line: str) -> str:
cjk = len(_CJK_RE.findall(line))
latin = len(_LATIN_RE.findall(line))
kana = len(_KANA_RE.findall(line))
hangul = len(_HANGUL_RE.findall(line))
if hangul:
return "kr"
if kana:
return "jp"
if cjk and latin:
return "mixed"
if cjk:
return "zh"
if latin:
return "latin"
return "other"
def _is_foreign_language(language: str) -> bool:
return language in {"latin", "jp", "kr", "other"}
def _is_noise_line(line: str) -> bool:
if _CREDIT_PREFIX_RE.search(line) or _WATERMARK_RE.search(line):
return True
has_cjk_or_latin = bool(_CJK_RE.search(line) or _LATIN_RE.search(line))
if not has_cjk_or_latin:
return True
compact = _strip_symbols(line)
return len(compact) <= 1
def _strip_symbols(line: str) -> str:
    # ASCII punctuation comes from string.punctuation; only non-ASCII marks are listed here.
    punctuation = string.punctuation + "。、“”‘’·…—¥【】《》〈〉「」『』﹏"
    line = "".join(" " if char in punctuation else char for char in line)
    line = re.sub(r"\s+", " ", line)
    line = re.sub(r"(?<=[\u4e00-\u9fff])\s+(?=[\u4e00-\u9fff])", "", line)
    return line.strip()
def _token_units(line: str) -> list[str]:
    # CJK characters and Latin/digit runs are both emitted as lowercase units.
    return [match.group(0).lower() for match in _WORD_RE.finditer(line)]
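The token helpers above turn each normalized line into single-character units for CJK text and word units for Latin text, then form n-grams whose set overlap gives a Jaccard score. Below is a condensed standalone sketch of that idea, not the package's own code path. `line_ngrams` and `jaccard` are hypothetical names.

```python
import re

WORD_RE = re.compile(r"[a-z0-9]+|[\u4e00-\u9fff]", re.IGNORECASE)


def line_ngrams(line: str, n: int = 3) -> set[str]:
    """Split a line into Latin words / single CJK characters and form n-grams."""
    units = [match.group(0).lower() for match in WORD_RE.finditer(line)]
    if len(units) < n:
        # Short lines contribute a single token made of all their units.
        return {" ".join(units)} if units else set()
    return {" ".join(units[i : i + n]) for i in range(len(units) - n + 1)}


def jaccard(left: set[str], right: set[str]) -> float:
    """Jaccard similarity |A & B| / |A | B|, defined as 0.0 for two empty sets."""
    union = left | right
    return len(left & right) / len(union) if union else 0.0


a = line_ngrams("城市的灯慢慢亮起")
b = line_ngrams("城市灯火慢慢亮起")
print(sorted(a & b), round(jaccard(a, b), 3))
```

One substituted character disturbs every trigram that spans it, so short lines with a small wording change lose most of their n-gram overlap even though a reader would call them nearly identical.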
"""Process newly added lyric library files.
This script is intended for the recurring workflow after adding files to
``data/library``:
1. Move pure-music placeholder lyric files out of the active library.
2. Rebuild the duplicate-checking index.
3. Optionally regenerate and evaluate a synthetic regression set.
"""
from __future__ import annotations
import argparse
import csv
import json
import shutil
import sys
from datetime import datetime
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.cli import evaluate_csv
from lyric_dedup.eval_dataset import generate_eval_set
from lyric_dedup.file_import import iter_lyric_files
from lyric_dedup.file_import import read_lyric_file
from lyric_dedup.file_import import records_from_dir
from lyric_dedup.normalization import normalize_lyrics
PLACEHOLDER_MARKERS = (
"【曲库专用】",
"此歌曲为没有填词的纯音乐",
)
def main() -> None:
parser = argparse.ArgumentParser(description="Process lyric library additions.")
parser.add_argument("--library-dir", default="data/library")
parser.add_argument("--index", default="outputs/indexes/library_lyrics.pkl")
parser.add_argument("--quarantine-dir", default="data/quarantine/no_lyrics_placeholders")
parser.add_argument("--dry-run", action="store_true", help="Only report placeholder files; do not move or write outputs.")
parser.add_argument("--delete-placeholders", action="store_true", help="Delete matched placeholder files instead of moving them.")
parser.add_argument("--eval-size", type=int, default=0, help="Generate and evaluate this many synthetic samples. 0 disables eval.")
parser.add_argument("--positive-ratio", type=float, default=0.2)
parser.add_argument("--eval-dir", default="data/generated_eval/incoming")
parser.add_argument("--eval-csv", default="data/generated_eval/eval.csv")
parser.add_argument("--eval-out", default="outputs/results/library_eval.csv")
parser.add_argument("--report", default="outputs/results/library_process_report.json")
args = parser.parse_args()
library_dir = Path(args.library_dir)
quarantine_dir = Path(args.quarantine_dir)
report_path = Path(args.report)
files_before = iter_lyric_files(library_dir)
placeholders = _find_placeholder_files(library_dir)
short_effective = _effective_line_report(library_dir)
moved_or_deleted: list[str] = []
if not args.dry_run:
moved_or_deleted = _handle_placeholders(
placeholders,
library_dir=library_dir,
quarantine_dir=quarantine_dir,
delete=args.delete_placeholders,
)
_build_index(library_dir, Path(args.index))
if args.eval_size > 0:
generate_eval_set(
library_dir=library_dir,
output_dir=Path(args.eval_dir),
csv_path=Path(args.eval_csv),
size=args.eval_size,
positive_ratio=args.positive_ratio,
)
evaluate_csv(
Path(args.index),
Path(args.eval_csv),
Path(args.eval_out),
base_dir=Path(args.eval_csv).parent,
positive_decisions={"duplicate"},
max_candidates=5,
)
evaluate_csv(
Path(args.index),
Path(args.eval_csv),
Path(args.eval_out).with_name(Path(args.eval_out).stem + "_review_positive.csv"),
base_dir=Path(args.eval_csv).parent,
positive_decisions={"duplicate", "review"},
max_candidates=5,
)
report = {
"timestamp": datetime.now().isoformat(timespec="seconds"),
"dry_run": args.dry_run,
"library_dir": str(library_dir),
"files_before": len(files_before),
"placeholder_matches": len(placeholders),
"placeholder_files": [str(path) for path in placeholders],
"handled_placeholder_files": moved_or_deleted,
"files_after": len(iter_lyric_files(library_dir)),
"index": str(args.index),
"eval_size": args.eval_size,
"eval_csv": str(args.eval_csv) if args.eval_size > 0 else "",
"eval_out": str(args.eval_out) if args.eval_size > 0 else "",
"short_effective_line_counts": short_effective,
}
print(json.dumps(report, ensure_ascii=False, indent=2))
if not args.dry_run:
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
def _find_placeholder_files(library_dir: Path) -> list[Path]:
matches: list[Path] = []
for path in iter_lyric_files(library_dir):
text = read_lyric_file(path)
if any(marker in text for marker in PLACEHOLDER_MARKERS):
matches.append(path)
return matches
def _handle_placeholders(
placeholders: list[Path],
*,
library_dir: Path,
quarantine_dir: Path,
delete: bool,
) -> list[str]:
handled: list[str] = []
if not placeholders:
return handled
if not delete:
quarantine_dir.mkdir(parents=True, exist_ok=True)
for path in placeholders:
if delete:
path.unlink()
handled.append(f"deleted:{path}")
continue
relative = path.resolve().relative_to(library_dir.resolve())
destination = quarantine_dir / relative
destination.parent.mkdir(parents=True, exist_ok=True)
if destination.exists():
destination = destination.with_name(f"{destination.stem}_{datetime.now().strftime('%Y%m%d%H%M%S')}{destination.suffix}")
shutil.move(str(path), str(destination))
handled.append(f"moved:{path}->{destination}")
return handled
def _build_index(library_dir: Path, index_path: Path) -> None:
checker = DuplicateChecker()
for record in records_from_dir(library_dir):
checker.add_record(record)
index_path.parent.mkdir(parents=True, exist_ok=True)
checker.save(index_path)
def _effective_line_report(library_dir: Path) -> dict[str, int]:
buckets = {
"total": 0,
"zero_effective_lines": 0,
"one_to_three_effective_lines": 0,
"four_to_five_effective_lines": 0,
"six_plus_effective_lines": 0,
}
for path in iter_lyric_files(library_dir):
buckets["total"] += 1
normalized = normalize_lyrics(read_lyric_file(path))
line_count = len(normalized.primary_lines or normalized.unique_lines)
if line_count == 0:
buckets["zero_effective_lines"] += 1
elif line_count <= 3:
buckets["one_to_three_effective_lines"] += 1
elif line_count <= 5:
buckets["four_to_five_effective_lines"] += 1
else:
buckets["six_plus_effective_lines"] += 1
return buckets
if __name__ == "__main__":
main()
import csv
from lyric_dedup import DuplicateChecker
from lyric_dedup import DuplicateDecision
from lyric_dedup import LyricRecord
from lyric_dedup.cli import evaluate_csv
from lyric_dedup.eval_dataset import generate_eval_set
from lyric_dedup.file_import import record_from_file
from lyric_dedup.normalization import normalize_lyrics
BASE_LYRIC = """
[00:01.00]作词:Someone
[00:02.00]我爱你在每个夜里
[00:03.00]听风说话也听见你
[00:04.00]城市的灯慢慢亮起
[00:05.00]我把回忆写进歌曲
[00:06.00]啦啦啦 我们不分离
[00:07.00]啦啦啦 我们不分离
[00:08.00]明天还会继续想你
"""
def test_normalization_removes_lyric_noise_and_simplifies() -> None:
    normalized = normalize_lyrics("[00:01.20]我愛你!\nQQ音乐 www.example.com\n(副歌)\n聽風說話\n")
    assert normalized.normalized_lines == ("我爱你", "听风说话")
    assert normalized.normalized_full_text == "我爱你\n听风说话"
    assert normalized.primary_lines == ("我爱你", "听风说话")
def test_exact_duplicate_handles_timestamps_punctuation_traditional_and_chorus_counts() -> None:
    checker = DuplicateChecker()
    checker.add_record(LyricRecord("song-1", BASE_LYRIC))
    result = checker.check(
        """
        我愛你,在每個夜裡!!!
        聽風說話,也聽見你
        城市的燈慢慢亮起
        我把回憶寫進歌曲
        啦啦啦 我們不分離
        明天還會繼續想你
        """
    )
    assert result.decision == DuplicateDecision.DUPLICATE
    assert result.confidence == 1.0
    assert result.candidates[0].record_id == "song-1"
def test_short_shared_repeated_chorus_is_review_not_duplicate() -> None:
    checker = DuplicateChecker()
    checker.add_record(
        LyricRecord(
            "song-1",
            """
            海边的风吹过旧信
            你说夏天不会远去
            啦啦啦 我们不分离
            啦啦啦 我们不分离
            转身以后各自旅行
            """,
        )
    )
    result = checker.check(
        """
        山谷的雨落在清晨
        我把名字交给星辰
        啦啦啦 我们不分离
        啦啦啦 我们不分离
        世界安静等一个人
        """
    )
    assert result.decision == DuplicateDecision.REVIEW
    assert result.candidates[0].reason == "重合内容主要集中在重复副歌行,不自动判重"
def test_substantial_line_overlap_is_duplicate_after_lsh_recall() -> None:
    checker = DuplicateChecker()
    checker.add_record(LyricRecord("song-1", BASE_LYRIC))
    result = checker.check(
        """
        我爱你在每个夜里
        听风说话也听见你
        城市灯火慢慢亮起
        我把回忆写进歌曲
        啦啦啦 我们不分离
        明天还会继续想你
        """
    )
    assert result.decision == DuplicateDecision.DUPLICATE
    assert result.candidates[0].jaccard >= 0.78
    assert result.candidates[0].line_coverage >= 0.72
def test_fragment_of_full_song_is_not_duplicate() -> None:
    checker = DuplicateChecker()
    checker.add_record(LyricRecord("song-1", BASE_LYRIC))
    result = checker.check(
        """
        听风说话也听见你
        城市的灯慢慢亮起
        我把回忆写进歌曲
        """
    )
    assert result.decision != DuplicateDecision.DUPLICATE
    assert result.candidates[0].primary_line_coverage < 0.72
def test_no_effective_lyrics_use_metadata_fallback_without_empty_hash_collision() -> None:
    placeholder = """
    作词:DJ金木
    作曲:DJ金木
    编曲:DJ金木
    混音:DJ金木
    【未经著作权人许可 不得翻唱 翻录或使用】
    """
    checker = DuplicateChecker()
    checker.add_record(LyricRecord("song-1", placeholder, title="Amnesia(House)", artist="DJ金木"))
    checker.add_record(LyricRecord("song-2", placeholder, title="Angel(纯音乐)", artist="DJ金木"))
    same_song = checker.check_record(
        LyricRecord("__query__", placeholder, title="Amnesia(House)", artist="DJ金木")
    )
    different_title = checker.check_record(
        LyricRecord("__query__", placeholder, title="Different Song", artist="DJ金木")
    )
    assert same_song.decision == DuplicateDecision.DUPLICATE
    assert same_song.reason == "无有效歌词,使用文件内容兜底指纹命中"
    assert different_title.decision == DuplicateDecision.DUPLICATE
def test_no_effective_lyrics_metadata_fallback_ignores_placeholder_noise() -> None:
    source = """
    作词:DJ金木
    作曲:DJ金木
    编曲:DJ金木
    混音:DJ金木
    【未经著作权人许可 不得翻唱 翻录或使用】
    """
    noisy = """
    [00:01.00]歌词来自QQ音乐
    [00:02.00]作词:测试
    [00:03.00]作词:DJ金木!
    [00:04.00]作曲:DJ金木...
    [00:05.00]未经著作权人许可 不得翻唱
    """
    checker = DuplicateChecker()
    checker.add_record(LyricRecord("song-1", source, title="Amnesia(House)", artist="DJ金木"))
    result = checker.check_record(LyricRecord("__query__", noisy, title="Amnesia(House)", artist="DJ金木"))
    assert result.decision == DuplicateDecision.DUPLICATE
    assert result.reason == "无有效歌词,文件内容兜底特征高度相似"
def test_unrelated_lyrics_with_shared_watermark_are_new() -> None:
    checker = DuplicateChecker()
    checker.add_record(
        LyricRecord(
            "song-1",
            """
            歌词来自QQ音乐
            北方的雪落在窗前
            我等一封迟来的信
            """,
        )
    )
    result = checker.check(
        """
        歌词来自QQ音乐
        南方的雨穿过街心
        你把故事说给云听
        """
    )
    assert result.decision == DuplicateDecision.NEW
    assert result.candidates == ()
def test_mixed_chinese_english_tokenization_recalls_candidate() -> None:
    checker = DuplicateChecker()
    checker.add_record(
        LyricRecord(
            "song-1",
            """
            say hello 在风里
            hold me close tonight
            我们穿过蓝色街道
            never let me go
            """,
        )
    )
    result = checker.check(
        """
        say hello 在风里
        hold me close tonight
        我们穿过蓝色街道
        never let me go
        """
    )
    assert result.decision == DuplicateDecision.DUPLICATE
def test_checker_can_persist_index(tmp_path) -> None:
    index_path = tmp_path / "lyrics.pkl"
    checker = DuplicateChecker()
    checker.add_record(LyricRecord("song-1", BASE_LYRIC))
    checker.save(index_path)
    loaded = DuplicateChecker.load(index_path)
    result = loaded.check(BASE_LYRIC)
    assert loaded.record_count == 1
    assert result.decision == DuplicateDecision.DUPLICATE
def test_record_from_lrc_file(tmp_path) -> None:
    lyric_file = tmp_path / "周杰伦 - 测试歌.lrc"
    lyric_file.write_text("[00:01.00]我愛你\n", encoding="utf-8")
    record = record_from_file(lyric_file, base_dir=tmp_path)
    assert record.title == "测试歌"
    assert record.artist == "周杰伦"
    assert record.lyrics == "[00:01.00]我愛你\n"
def test_record_from_song_artist_lyrics_filename(tmp_path) -> None:
    lyric_file = tmp_path / "Amnesia(House)-DJ金木-歌词.txt"
    lyric_file.write_text("作词:DJ金木\n", encoding="utf-8")
    record = record_from_file(lyric_file, base_dir=tmp_path)
    assert record.title == "Amnesia(House)"
    assert record.artist == "DJ金木"
def test_evaluate_csv_reports_binary_metrics(tmp_path) -> None:
    library = tmp_path / "library"
    incoming = tmp_path / "incoming"
    library.mkdir()
    incoming.mkdir()
    (library / "歌手A - 夜里.lrc").write_text(BASE_LYRIC, encoding="utf-8")
    (incoming / "dup.lrc").write_text(BASE_LYRIC.replace("我爱你", "我愛你"), encoding="utf-8")
    (incoming / "new.txt").write_text("南方的雨穿过街心\n你把故事说给云听\n", encoding="utf-8")
    checker = DuplicateChecker()
    checker.add_record(record_from_file(library / "歌手A - 夜里.lrc", base_dir=library))
    index_path = tmp_path / "lyrics.pkl"
    checker.save(index_path)
    eval_csv = tmp_path / "eval.csv"
    eval_csv.write_text(
        "id,file,expected\n"
        "case-1,incoming/dup.lrc,应去重\n"
        "case-2,incoming/new.txt,不应去重\n",
        encoding="utf-8",
    )
    out_path = tmp_path / "eval_out.csv"
    evaluate_csv(
        index_path,
        eval_csv,
        out_path,
        base_dir=tmp_path,
        positive_decisions={"duplicate"},
        max_candidates=5,
    )
    rows = list(csv.DictReader(out_path.open(encoding="utf-8")))
    assert [row["correct"] for row in rows] == ["True", "True"]
    assert rows[0]["reason"] == "规范化后的原文歌词哈希完全一致"
    assert (tmp_path / "eval_out.csv.summary.json").exists()
def test_generated_eval_set_marks_fragments_as_negative(tmp_path) -> None:
    library = tmp_path / "library"
    incoming = tmp_path / "generated" / "incoming"
    eval_csv = tmp_path / "generated" / "eval.csv"
    library.mkdir()
    (library / "song.txt").write_text(BASE_LYRIC, encoding="utf-8")
    generate_eval_set(library_dir=library, output_dir=incoming, csv_path=eval_csv, size=20, positive_ratio=0.5)
    rows = list(csv.DictReader(eval_csv.open(encoding="utf-8")))
    positive_types = {row["sample_type"] for row in rows if row["expected"] == "应去重"}
    fragment_rows = [row for row in rows if row["sample_type"] == "single_song_fragment"]
    assert "trimmed_version" not in positive_types
    assert "single_song_fragment" not in positive_types
    assert fragment_rows
    assert all(row["expected"] == "不应去重" for row in fragment_rows)
def test_foreign_original_with_added_chinese_translation_is_duplicate() -> None:
    checker = DuplicateChecker()
    checker.add_record(
        LyricRecord(
            "song-1",
            """
            I miss you tonight
            Under the moonlight
            Never let me go
            """,
        )
    )
    result = checker.check(
        """
        I miss you tonight
        今晚我想你
        Under the moonlight
        月光之下
        Never let me go
        永远不要让我离开
        """
    )
    assert result.decision == DuplicateDecision.DUPLICATE
    assert result.reason == "规范化后的原文歌词哈希完全一致,翻译行未参与自动判重"
def test_same_timestamp_translation_split_is_high_confidence() -> None:
    normalized = normalize_lyrics(
        """
        [00:01.00]I miss you tonight
        [00:01.00]今晚我想你
        [00:02.00]Under the moonlight
        [00:02.00]月光之下
        """
    )
    assert normalized.primary_lines == ("i miss you tonight", "under the moonlight")
    assert normalized.translation_lines == ("今晚我想你", "月光之下")
    assert normalized.split_confidence == "high"
def test_translation_only_overlap_is_review_not_duplicate() -> None:
    checker = DuplicateChecker()
    checker.add_record(
        LyricRecord(
            "song-1",
            """
            I miss you tonight
            今晚我想你
            Under the moonlight
            月光之下
            Never let me go
            永远不要让我离开
            """,
        )
    )
    result = checker.check(
        """
        Te extrano esta noche
        今晚我想你
        Bajo la luna
        月光之下
        No me dejes ir
        永远不要让我离开
        """
    )
    assert result.decision == DuplicateDecision.REVIEW
    assert result.reason == "仅翻译行相似,原文字面重合不足,不自动判重"
    assert result.candidates[0].translation_jaccard >= 0.45
def test_block_translation_split_is_review_when_primary_matches() -> None:
    checker = DuplicateChecker()
    checker.add_record(
        LyricRecord(
            "song-1",
            """
            I miss you tonight
            Under the moonlight
            Never let me go
            """,
        )
    )
    result = checker.check(
        """
        I miss you tonight
        Under the moonlight
        Never let me go
        今晚我想你
        月光之下
        永远不要让我离开
        """
    )
    assert result.decision == DuplicateDecision.REVIEW
    assert result.reason == "原文哈希一致,但疑似整段翻译结构拆分置信度较低,需要人工复核"