Commit fec2556e fec2556ea008688f2ceac576f400a5d1cc9c22d7 by 沈秋雨

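Simplify the dedup pipeline and keep only the path that uses PostgreSQL as the database

Use OpenCC for Simplified/Traditional Chinese conversion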
1 parent d39197d3
# Lyric Duplicate Checker
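# Lyric Duplicate Checking System
The first version targets duplicate checks on newly added lyrics: build an index from the existing `.lrc` / `.txt` lyrics, then query each new lyric against it and return `duplicate`, `review`, or `new`.
This project checks lyrics for duplicates using PostgreSQL as both the data store and the candidate recall layer. The Python side only handles lyric normalization, candidate scoring, and the final decision; it no longer builds or loads a local `.pkl` index.
## Build the index
## Architecture
Assume the existing library is in `data/library/`: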
```text
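PostgreSQL:
  lyrics             stores raw lyrics, normalized text, primary/translation text, exact_hash
  lyric_lines        stores normalized lyric lines and line_hash
  exact_hash index   exact-duplicate recall
  pg_trgm index      optional approximate-text recall
  line_hash index    line-level overlap recall
Python:
  normalize_lyrics   lyric cleanup, timestamp/platform-noise handling, Traditional/Simplified conversion, translation-line splitting
  DuplicateChecker   only scores and ranks the candidates recalled by PostgreSQL
  decision rules     outputs duplicate / review / new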
```
Core principles:
```text
The database recalls candidates.
Python makes the final decision.
pickle, the local MinHash index, and outputs/indexes/*.pkl are no longer part of the production path.
```
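A minimal sketch of how the two hash columns in the architecture could be derived, assuming SHA-256 over UTF-8 text. The function names, the newline-joined fingerprint, and the 4-character minimum line length are illustrative assumptions, not the project's confirmed implementation.

```python
import hashlib


def sha256_hex(text: str) -> str:
    """Return the hex SHA-256 digest of UTF-8 encoded text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def exact_hash(normalized_lines: list[str]) -> str:
    # Assumption: the exact fingerprint is the normalized lines joined by newlines.
    return sha256_hex("\n".join(normalized_lines))


def line_hashes(normalized_lines: list[str], min_length: int = 4) -> set[str]:
    # Assumption: very short lines are skipped because they collide too easily.
    return {sha256_hex(line) for line in normalized_lines if len(line) >= min_length}


library_song = ["我爱你在每个夜里", "听风说话也听见你"]
same_song = ["我爱你在每个夜里", "听风说话也听见你"]
fragment = ["听风说话也听见你"]

# Identical normalized text yields the same exact hash.
print(exact_hash(library_song) == exact_hash(same_song))
# A fragment shares line hashes with the full song but not the exact hash.
print(len(line_hashes(library_song) & line_hashes(fragment)))
```

An exact-hash match supports exact-duplicate recall, while shared line hashes only indicate overlap, which is why the final decision is left to the Python scoring step.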
## Install dependencies
```bash
python -m lyric_dedup.cli build-index \
--lyrics-dir data/library \
--index outputs/indexes/lyrics.pkl
python -m pip install -r requirements.txt
```
## Check a single new lyric file
## Initialize PostgreSQL
Create the database:
```bash
python -m lyric_dedup.cli check-file \
--index outputs/indexes/lyrics.pkl \
--file data/incoming/new_song.lrc
createdb lyric_dedup
```
## Batch-check an incoming directory
Initialize the table schema and indexes:
```bash
python -m lyric_dedup.cli batch-check \
--index outputs/indexes/lyrics.pkl \
--lyrics-dir data/incoming \
--out outputs/results/incoming_check.csv
python scripts/init_postgres.py \
--dsn postgresql:///lyric_dedup
```
Key columns in the CSV:
This creates:
```text
lyrics
lyric_lines
pg_trgm extension
exact_hash / primary_text_trgm / line_hash indexes
```
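- `decision`: the overall decision.
- `best_candidate_id`: the most similar existing lyric.
- `best_candidate_jaccard`: n-gram literal similarity.
- `best_candidate_line_coverage`: line-level coverage.
- `matched_unique_lines`: the normalized lyric lines that matched.
- `best_candidate_reason`: the decision reason (in Chinese), explaining why the lyric was judged duplicate, review, or new.
## Import the library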
```bash
python scripts/import_library_postgres.py \
--dsn postgresql:///lyric_dedup \
--lyrics-dir data/library
```
The import script will:
```text
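1. Scan data/library for .lrc / .txt files.
2. Read and normalize the lyrics.
3. Write to lyrics and lyric_lines.
4. By default, soft delete records whose exact_hash is identical, keeping only the higher-quality one.
5. Write the duplicate report to outputs/results/postgres_exact_duplicates.csv.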
```
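The exact-dedup step can be pictured as grouping records by `exact_hash` and keeping one record per group. The sketch below is hedged: the record fields and the quality rule (more lyric lines wins, ties broken by record id) are hypothetical, since the actual quality criterion is not described here.

```python
from collections import defaultdict


def plan_soft_deletes(records: list[dict]) -> list[str]:
    """Return the record ids to soft delete, keeping one record per exact_hash."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for record in records:
        groups[record["exact_hash"]].append(record)

    to_delete: list[str] = []
    for group in groups.values():
        # Hypothetical quality score: more lyric lines wins, ties broken by record id.
        ranked = sorted(group, key=lambda r: (-r["line_count"], r["record_id"]))
        to_delete.extend(r["record_id"] for r in ranked[1:])
    return sorted(to_delete)


records = [
    {"record_id": "a", "exact_hash": "h1", "line_count": 10},
    {"record_id": "b", "exact_hash": "h1", "line_count": 12},
    {"record_id": "c", "exact_hash": "h2", "line_count": 5},
]
print(plan_soft_deletes(records))
```

Returning ids rather than deleting rows matches the soft-delete idea: the caller can mark those rows as deleted and still report them afterwards.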
To import only, without exact deduplication:
```bash
python scripts/import_library_postgres.py \
--dsn postgresql:///lyric_dedup \
--lyrics-dir data/library \
--skip-dedup-exact
```
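Production guidance: `duplicate` can be blocked automatically; `review` goes to the manual review pool; `new` can still be spot-checked before it enters the library.
## Check a single lyric file
## Guard rules for original + Chinese-translation lyrics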
```bash
python -m lyric_dedup.cli check-file \
--dsn postgresql:///lyric_dedup \
--file data/incoming/new_song.lrc
```
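Lyrics are currently split into three kinds of lines:
Common parameters:
- `primary_lines`: original-language lines; automatic duplicate decisions rely mainly on these.
- `translation_lines`: Chinese translation lines, used only for recall and for explaining reviews.
- `unknown_lines`: lines that cannot be classified reliably.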
```text
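--recall-limit          maximum candidates returned by each PostgreSQL recall layer
--max-candidates        number of candidates kept for the final ranking and output
--enable-trgm           enable pg_trgm approximate-text recall
--trgm-threshold        pg_trgm similarity threshold
--statement-timeout-ms  PostgreSQL statement_timeout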
```
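High-confidence splits include:
Returned fields:
- An original-language line and a Chinese line under the same timestamp.
- Several stable groups of alternating original-language and Chinese lines.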
```text
decision         duplicate / review / new
duplicate        true for duplicate or review, false for new
confidence       confidence of the current decision
reason           decision reason (in Chinese)
candidate_count  number of candidates in the final ranking
```
Medium-confidence splits include:
## Start the API
- An obvious original / Chinese-translation pair on one line, for example `I miss you / 今晚我想你`.
```bash
export LYRIC_DEDUP_DSN=postgresql:///lyric_dedup
uvicorn lyric_dedup_server.app:app --host 0.0.0.0 --port 8000
```
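Low-confidence splits include:
Endpoint:
- A full block of original-language lyrics followed by a full block of Chinese translation.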
```text
POST /api/v1/check
```
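Decision policy:
Request example:
- When the original text matches closely, the result can be `duplicate` even if the new lyric adds Chinese translation.
- When only the translation lines are similar and the original text is not similar enough, the result can only be `review`; it is never judged a duplicate automatically.
- A suspected whole-block translation structure is a low-confidence split, so the result is `review` first even when the original-text hash matches.
- For an ordinary Chinese song with no detected translation structure, all valid lines are treated as original text.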
```json
{
"url": "https://example.com/song.lrc",
"title": "Song Title",
"artist": "Artist"
}
```
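The request above could be sent from Python roughly as follows. This is a sketch using only the standard library; the `http://localhost:8000` base URL, the JSON content type, and the helper name `build_check_request` are assumptions, and the response shape is not shown because it is not specified here.

```python
import json
import urllib.request


def build_check_request(base_url: str, lyric_url: str, title: str, artist: str) -> urllib.request.Request:
    """Build a POST request for the check endpoint without sending it."""
    payload = {"url": lyric_url, "title": title, "artist": artist}
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/v1/check",
        data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_check_request(
    "http://localhost:8000",  # assumed local address of the running API
    "https://example.com/song.lrc",
    "Song Title",
    "Artist",
)
print(request.get_method(), request.full_url)

# To actually send it (requires the API to be running):
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(json.load(response))
```

Building the request separately from sending it keeps the payload easy to inspect before any network call is made.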
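Because the index stores the split primary/translation features, it must be rebuilt after the splitting rules change.
The service downloads the `.lrc` / `.txt` file at the URL, recalls candidates from PostgreSQL, and makes the decision. If the result is `new` and the request includes a URL, the service writes the new lyric to PostgreSQL.
## Evaluate accuracy with a labeled CSV
## Generate an evaluation set
You can first generate a batch of evaluation samples automatically from the existing library:
Standard production profile: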
```bash
python -m lyric_dedup.cli generate-eval-set \
--library-dir data/library \
--lyrics-dir data/generated_eval/incoming \
--csv data/generated_eval/eval_50000.csv \
--index outputs/indexes/lyrics.pkl \
--eval-index data/generated_eval/eval_50000.csv.index.pkl \
--size 50000 \
--csv data/generated_eval/eval_5000.csv \
--size 5000 \
--positive-ratio 0.3
```
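The default `--profile standard` generates the standard production evaluation set. A hard set closer to business edge cases can also be generated:
Hard business-edge profile: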
```bash
python -m lyric_dedup.cli generate-eval-set \
......@@ -93,79 +157,55 @@ python -m lyric_dedup.cli generate-eval-set \
--library-dir data/library \
--lyrics-dir data/generated_eval/hard_incoming \
--csv data/generated_eval/eval_hard_5000.csv \
--eval-index data/generated_eval/eval_hard_5000.csv.index.pkl \
--size 5000 \
--positive-ratio 0.3
```
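Standard profile:
- Scan the whole library first and use stratified sampling by valid lyric line count, language type, and file-source prefix, instead of taking a prefix of the sorted list.
- `应去重` (should dedup) samples are generated only as style variations of full-song lyrics, such as timestamps, punctuation, platform noise, blank lines, a different number of chorus repeats, appended Chinese translation, and a few typos or English misspellings.
- `不应去重` (should not dedup) samples are mainly real holdout full lyrics, plus lyric fragments, repeated-chorus collisions, translation-only similarity, new lyrics on the same theme, and short-lyric/placeholder edge samples.
- A lyric fragment should not produce `duplicate` even when it matches part of an existing song; at most it goes to `review`.
- The generator additionally writes `--eval-index`, an index that excludes the holdout songs; use it when evaluating the generated CSV.
- A `*.manifest.json` is also generated, recording the seed, library size, holdout count, sample-type distribution, language/source buckets, and the number of source records covered.
The generator writes only:
The hard profile does not deliberately construct abnormal input; it mainly covers cases that are more likely to hit edge conditions in production:
- `应去重`: same-song platform-version noise, a fairly complete lyric missing one section, a whole block of appended Chinese translation, realistic data-entry/OCR typos, and mixed timestamps and platform metadata.
- `不应去重`: real holdout new songs, near-neighbor new songs picked preferentially from the holdout for sharing lines with the library, long but incomplete single-song fragments, multi-song medley fragments, repeated-chorus collisions, translation-only similarity, and short-lyric edge cases.
First prepare a CSV, for example `data/eval/eval.csv`: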
```csv
id,file,expected
case-001,incoming/song_a.lrc,应去重
case-002,incoming/song_b.txt,不应去重
```
Alternatively, skip the file path and put the lyrics directly in the `lyrics` column:
```csv
id,lyrics,expected
case-003,"我爱你在每个夜里\n听风说话也听见你",duplicate
case-004,"南方的雨穿过街心\n你把故事说给云听",new
```text
evaluation CSV
sample lyric files
manifest.json
```
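`expected` accepts these values:
`.index.pkl` is no longer generated. During evaluation PostgreSQL recalls the candidates, and the holdout sample itself is excluded using the `source_record_id` in the CSV.
- Should dedup: `应去重`, `重复`, `duplicate`, `1`, `true`, `yes`
- Should not dedup: `不应去重`, `不重复`, `new`, `0`, `false`, `no`
## Evaluate with PostgreSQL
Run the evaluation:
Strict auto-block mode: only `duplicate` counts as a predicted duplicate.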
```bash
python -m lyric_dedup.cli evaluate-csv \
--index outputs/indexes/lyrics.pkl \
--csv data/eval/eval.csv \
--base-dir data \
--out outputs/results/eval_result.csv
python scripts/evaluate_postgres.py \
--dsn postgresql:///lyric_dedup \
--csv data/generated_eval/eval_hard_5000.csv \
--base-dir data/generated_eval \
--out outputs/results/postgres_eval_hard_5000.csv
```
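By default, only a system output of `duplicate` counts as "predicted should-dedup". This suits evaluating the precision of automatic blocking, where false blocks show up more clearly.
To evaluate "suspicious-sample recall" instead, where both `duplicate` and `review` count as hits:
Suspicious-sample recall mode: both `duplicate` and `review` count as caught.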
```bash
python -m lyric_dedup.cli evaluate-csv \
--index outputs/indexes/lyrics.pkl \
--csv data/eval/eval.csv \
--base-dir data \
python scripts/evaluate_postgres.py \
--dsn postgresql:///lyric_dedup \
--csv data/generated_eval/eval_hard_5000.csv \
--base-dir data/generated_eval \
--positive-decisions duplicate,review \
--out outputs/results/eval_result_review_as_positive.csv
--out outputs/results/postgres_eval_hard_5000_review_positive.csv
```
Two files are generated:
The evaluation generates:
- `outputs/results/eval_result.csv`: the prediction, candidates, reason, and correctness of each sample.
- `outputs/results/eval_result.csv.summary.json`: overall metrics.
```text
outputs/results/*.csv
outputs/results/*.csv.summary.json
```
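Key fields in the summary:
Key fields in the summary:
- `accuracy`: overall accuracy.
- `precision`: of the samples predicted as should-dedup, the share that truly should be deduplicated. Prioritize this for automatic blocking.
- `recall`: of the samples that truly should be deduplicated, the share the system caught.
- `f1`: the combined metric of precision and recall.
- `false_positive`: should not dedup but judged as should-dedup; a false block.
- `false_negative`: should dedup but not caught; a miss.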
```text
precision             auto-block precision; watch false_positive
recall                recall of should-dedup samples; watch false_negative
f1                    combined metric of precision and recall
duplicate/review/new  distribution of the three decisions
```
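For reference, the summary metrics follow the usual confusion-matrix definitions. The sketch below is illustrative only: the function name is hypothetical, and returning 0.0 when a denominator is zero is an assumed convention rather than the evaluator's confirmed behavior.

```python
def summarize(true_positive: int, false_positive: int, true_negative: int, false_negative: int) -> dict[str, float]:
    """Compute accuracy, precision, recall, and F1 from confusion-matrix counts."""
    total = true_positive + false_positive + true_negative + false_negative
    predicted_positive = true_positive + false_positive
    actual_positive = true_positive + false_negative

    # Assumed convention: report 0.0 instead of failing on an empty denominator.
    accuracy = (true_positive + true_negative) / total if total else 0.0
    precision = true_positive / predicted_positive if predicted_positive else 0.0
    recall = true_positive / actual_positive if actual_positive else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


print(summarize(true_positive=30, false_positive=10, true_negative=50, false_negative=10))
```

A high `false_positive` count lowers precision (more false blocks), while a high `false_negative` count lowers recall (more misses).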
......
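# Lyric Duplicate Checking Test Workflow
This document records the complete commands for building an index from an existing lyric directory, generating a test set, running batch evaluation, and viewing the results.
This document records the project's current PostgreSQL-only test workflow. The current path no longer uses `outputs/indexes/*.pkl` and no longer generates `*.index.pkl` evaluation indexes.
## 1. Prepare directories
## 1. Prepare data
The existing library goes in:
Existing library: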
```text
data/library/
......@@ -17,125 +17,111 @@ data/library/
.txt
```
Generated test samples go in:
Generated evaluation sample directories:
```text
data/generated_eval/incoming/
data/generated_eval/hard_incoming/
```
The labeled test-set CSV goes in:
Evaluation results directory:
```text
data/generated_eval/eval_100.csv
outputs/results/
```
Evaluation results go in:
## 2. Initialize PostgreSQL
```text
outputs/results/
```
Create the database:
## 2. Build the index for the existing library
```bash
createdb lyric_dedup
```
If a batch of samples was just added to `data/library`, run the processing script first:
Initialize the schema:
```bash
python scripts/process_library.py \
--library-dir data/library \
--index outputs/indexes/library_lyrics.pkl
python scripts/init_postgres.py \
--dsn postgresql:///lyric_dedup
```
This script will:
Check the tables:
```text
1. Scan for and quarantine instrumental placeholder samples, for example files containing 【曲库专用】 or “此歌曲为没有填词的纯音乐”.
2. Rebuild outputs/indexes/library_lyrics.pkl.
3. Write the processing report to outputs/results/library_process_report.json.
```bash
psql postgresql:///lyric_dedup -c '\dt'
```
To preview which files would be processed without actually moving them or rebuilding the index:
## 3. Import the library
```bash
python scripts/process_library.py \
--library-dir data/library \
--dry-run
python scripts/import_library_postgres.py \
--dsn postgresql:///lyric_dedup \
--lyrics-dir data/library
```
To also generate and evaluate 500 test samples along the way:
After the import finishes, check the counts:
```bash
python scripts/process_library.py \
--library-dir data/library \
--index outputs/indexes/library_lyrics.pkl \
--eval-size 50000 \
--positive-ratio 0.3 \
--eval-csv data/generated_eval/eval_50000.csv \
--eval-out outputs/results/library_eval_50000.csv
psql postgresql:///lyric_dedup -c 'select count(*) from lyrics where deleted_at is null;'
psql postgresql:///lyric_dedup -c 'select count(*) from lyric_lines;'
```
Quarantined files are moved by default to:
By default the import script soft deletes duplicate records whose exact_hash is identical, and writes:
```text
data/quarantine/no_lyrics_placeholders/
outputs/results/postgres_exact_duplicates.csv
```
You can also just build the index manually:
To additionally review suspected duplicates with high line-level coverage:
```bash
python -m lyric_dedup.cli build-index \
python scripts/import_library_postgres.py \
--dsn postgresql:///lyric_dedup \
--lyrics-dir data/library \
--index outputs/indexes/library_lyrics.pkl
--line-duplicate-report outputs/results/postgres_line_duplicates.csv
```
Index file:
## 4. Check a single file
```text
outputs/indexes/library_lyrics.pkl
```bash
python -m lyric_dedup.cli check-file \
--dsn postgresql:///lyric_dedup \
--file test_api/test_lyric.txt
```
Note: if `data/library` or the preprocessing/dedup logic has changed, this step must be run again.
## 3. Generate production evaluation samples
To enable trigram text recall:
```bash
python -m lyric_dedup.cli generate-eval-set \
--library-dir data/library \
--lyrics-dir data/generated_eval/incoming \
--csv data/generated_eval/eval_50000.csv \
--index outputs/indexes/library_lyrics.pkl \
--eval-index data/generated_eval/eval_50000.csv.index.pkl \
--size 50000 \
--positive-ratio 0.3
python -m lyric_dedup.cli check-file \
--dsn postgresql:///lyric_dedup \
--file test_api/test_lyric.txt \
--enable-trgm \
--trgm-threshold 0.3
```
To generate a hard-profile test set closer to business edge cases:
## 5. Generate the standard evaluation set
```bash
python -m lyric_dedup.cli generate-eval-set \
--profile hard \
--library-dir data/library \
--lyrics-dir data/generated_eval/hard_incoming \
--csv data/generated_eval/eval_hard_5000.csv \
--index outputs/indexes/library_lyrics.pkl \
--eval-index data/generated_eval/eval_hard_5000.csv.index.pkl \
--lyrics-dir data/generated_eval/incoming \
--csv data/generated_eval/eval_5000.csv \
--size 5000 \
--positive-ratio 0.3
```
Default production evaluation profile:
Standard profile:
```text
should dedup: 30%
should not dedup: 70%
```
The generator first clears old generated `.txt` / `.lrc` files under `data/generated_eval/incoming/`, then writes the new samples.
Business profile:
Sample types:
```text
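positive_* = should dedup; full-song style variations, including a few typo / English-misspelling perturbations
negative_real_holdout_full_song = should not dedup; complete real lyrics, already excluded from the evaluation index
positive_* = should dedup; full-song style variations such as timestamps, punctuation, platform noise, blank lines, a different number of chorus repeats, appended translation, and a few typos
negative_real_holdout_full_song = should not dedup; complete real lyrics, with the record itself excluded from the evaluation candidates
negative_fragment = should not dedup; single-song fragment
negative_shared_chorus = should not dedup; repeated-chorus collision
negative_translation_only = should not dedup; translation-only similarity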
......@@ -143,7 +129,19 @@ negative_same_theme_synthetic = should not dedup; new lyrics on the same theme
edge_short_or_placeholder = should not dedup; short-lyric/placeholder edge sample
```
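The hard profile additionally emphasizes real business edge cases rather than deliberately constructed abnormal puzzles:
## 6. Generate the hard evaluation set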
```bash
python -m lyric_dedup.cli generate-eval-set \
--profile hard \
--library-dir data/library \
--lyrics-dir data/generated_eval/hard_incoming \
--csv data/generated_eval/eval_hard_5000.csv \
--size 5000 \
--positive-ratio 0.3
```
The hard profile emphasizes real business edge cases and does not deliberately construct abnormal input:
```text
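positive_realistic_variant = should dedup; same-song platform-version noise, fairly complete lyric missing a section, whole-block appended translation, real data-entry/OCR errors
......@@ -152,84 +150,50 @@ negative_long_fragment = should not dedup; long but incomplete single-song fragment
negative_catalog_mashup = should not dedup; medley/mashup-style input assembled from fragments of several real lyrics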
```
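The generator scans the whole library and uses stratified sampling by valid lyric line count, language type, and file-source prefix. It sets aside a batch of complete holdout lyrics as real new-song negative samples and builds an evaluation index that excludes the holdout. Each run also writes:
## 7. Strict evaluation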
```text
data/generated_eval/eval_50000.csv.manifest.json
data/generated_eval/eval_50000.csv.index.pkl
```
Key fields in the manifest:
```text
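library_files          number of lyric files in the library
holdout_records        number of records excluded from the evaluation index and used as real new-song negatives
sample_type_counts     count of each sample type
line_count_bucket_counts / language_bucket_counts / source_bucket_counts
unique_source_records  number of real source files covered by this evaluation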
```
## 4. Strict evaluation: count only duplicate as dedup
The strict mode counts only `duplicate` as a predicted duplicate:
```bash
python -m lyric_dedup.cli evaluate-csv \
--index data/generated_eval/eval_50000.csv.index.pkl \
--csv data/generated_eval/eval_50000.csv \
python scripts/evaluate_postgres.py \
--dsn postgresql:///lyric_dedup \
--csv data/generated_eval/eval_hard_5000.csv \
--base-dir data/generated_eval \
--out outputs/results/library_eval_50000.csv
--out outputs/results/postgres_eval_hard_5000.csv
```
In this mode:
```text
duplicate -> predicted should dedup
review    -> predicted should not dedup
new       -> predicted should not dedup
```
Suited to evaluating the precision of automatic blocking; focus on:
Suited to checking automatic-blocking quality; focus on:
```text
precision
false_positive
```
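## 5. Recall evaluation: count both duplicate and review as catching a suspicious sample
## 8. Recall evaluation
The recall mode counts both `duplicate` and `review` as catching a suspicious sample: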
```bash
python -m lyric_dedup.cli evaluate-csv \
--index data/generated_eval/eval_50000.csv.index.pkl \
--csv data/generated_eval/eval_50000.csv \
python scripts/evaluate_postgres.py \
--dsn postgresql:///lyric_dedup \
--csv data/generated_eval/eval_hard_5000.csv \
--base-dir data/generated_eval \
--positive-decisions duplicate,review \
--out outputs/results/library_eval_50000_review_positive.csv
--out outputs/results/postgres_eval_hard_5000_review_positive.csv
```
In this mode:
```text
duplicate -> predicted should dedup
review    -> predicted should dedup
new       -> predicted should not dedup
```
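The two scoring modes differ only in which decisions count as a positive prediction. A minimal sketch of that mapping, assuming `--positive-decisions` is parsed as a comma-separated list (the helper names are illustrative):

```python
def parse_positive_decisions(raw: str) -> set[str]:
    """Split a comma-separated option value into a set of decision names."""
    return {item.strip() for item in raw.split(",") if item.strip()}


def predicted_duplicate(decision: str, positive_decisions: set[str]) -> bool:
    """Return True when the decision counts as 'predicted should dedup'."""
    return decision in positive_decisions


decisions = ["duplicate", "review", "new"]
strict_mode = parse_positive_decisions("duplicate")
recall_mode = parse_positive_decisions("duplicate,review")

print([predicted_duplicate(d, strict_mode) for d in decisions])
print([predicted_duplicate(d, recall_mode) for d in decisions])
```

Running the same result set through both modes shows how `review` samples move from the negative side to the positive side without any change to the underlying decisions.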
Suited to evaluating suspicious-sample recall; focus on:
Suited to checking the risk of misses; focus on:
```text
recall
false_negative
```
## 6. View overall metrics
Strict mode:
## 9. View the summary
```bash
cat outputs/results/library_eval_100.csv.summary.json
```
Recall mode:
```bash
cat outputs/results/library_eval_100_review_positive.csv.summary.json
cat outputs/results/postgres_eval_hard_5000.csv.summary.json
cat outputs/results/postgres_eval_hard_5000_review_positive.csv.summary.json
```
Metric meanings:
......@@ -245,84 +209,16 @@ true_negative should not dedup and predicted should not dedup
false_negative should dedup but predicted should not dedup; a miss
```
## 7. View per-sample results
```bash
open outputs/results/library_eval_100.csv
```
If `open` is not available, inspect the CSV directly:
```bash
python -c 'import csv; rows=csv.DictReader(open("outputs/results/library_eval_100.csv", encoding="utf-8")); [print(r["id"], r["decision"], r["correct"], r["reason"], sep=" | ") for r in rows]'
```
## 8. View failed samples
## 10. View failed samples
Failed samples in strict mode:
```bash
python -c 'import csv; rows=csv.DictReader(open("outputs/results/library_eval_100.csv", encoding="utf-8")); [print(r["id"], r["source"], r["decision"], r["reason"], sep=" | ") for r in rows if r["correct"] == "False"]'
```
View the full candidate list for one sample:
```bash
python -m lyric_dedup.cli check-file \
--index outputs/indexes/library_lyrics.pkl \
--file data/generated_eval/incoming/neg_068_mixed_fragments.txt \
--max-candidates 10
python -c 'import csv; rows=csv.DictReader(open("outputs/results/postgres_eval_hard_5000.csv", encoding="utf-8")); [print(r["id"], r["expected_duplicate"], r["decision"], r["reason"], sep=" | ") for r in rows if r["correct"] == "False"]'
```
## 9. Verify the test-set distribution
Count by sample type:
```bash
python -c 'import csv, collections; rows=list(csv.DictReader(open("data/generated_eval/eval_10.csv", encoding="utf-8"))); print(len(rows)); print(collections.Counter(r["expected"] for r in rows)); print(collections.Counter(r["sample_type"] for r in rows)); print(collections.Counter(r["sample_type"] for r in rows if r["expected"]=="应去重")); print(collections.Counter(r["sample_type"] for r in rows if r["expected"]=="不应去重"))'
python -c 'import csv,collections; meta={r["id"]:r for r in csv.DictReader(open("data/generated_eval/eval_hard_5000.csv", encoding="utf-8-sig"))}; rows=csv.DictReader(open("outputs/results/postgres_eval_hard_5000.csv", encoding="utf-8")); c=collections.Counter(meta.get(r["id"],{}).get("sample_type","") for r in rows if r["correct"]=="False"); print(c)'
```
Verify the number of files in the generated directory:
```bash
find data/generated_eval/incoming -type f | wc -l
```
## 10. Run the code tests
```bash
python -m pytest tests
```
Compile check:
```bash
python -m compileall -q lyric_dedup tests
```
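## 11. On test-set uniqueness
The current auto-generated 100 samples form a rule-coverage test set; samples are not guaranteed to be distinct from one another after normalization.
If the 100 test samples must be mutually distinct while still using the default ratio: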
```text
size = 100
positive_ratio = 0.6
```
then at least the following is required:
```text
60 mutually distinct seed lyrics
```
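Reason: should-dedup samples are full-song variants, and several style variations of one song are still the same song after normalization.
A more reliable way to measure real accuracy is to prepare a manually labeled CSV: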
```csv
id,file,expected
case-001,incoming/song_a.lrc,应去重
case-002,incoming/song_b.txt,不应去重
```
Then run `evaluate-csv` from section 4 or section 5 directly.
......
"""Incremental lyric duplicate checker."""
"""Lyric candidate ranking and duplicate decision rules."""
from __future__ import annotations
import hashlib
import pickle
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from lyric_dedup.minhash_lsh import MinHashConfig
from lyric_dedup.minhash_lsh import MinHashLSH
from lyric_dedup.normalization import NormalizedLyrics
from lyric_dedup.normalization import fingerprint_text
from lyric_dedup.normalization import lyric_tokens
......@@ -64,103 +60,61 @@ class _IndexedRecord:
translation_tokens: set[str]
fallback_lines: tuple[str, ...]
fallback_tokens: set[str]
signature: tuple[int, ...]
class DuplicateChecker:
"""In-memory first version for checking newly submitted lyrics.
The API is intentionally small: build or load records with ``add_record``, then
call ``check`` for a new lyric. Persistence can serialize the indexed fields
later without changing result semantics.
"""
"""Rank PostgreSQL-recalled candidates and produce the final decision."""
def __init__(
self,
*,
minhash_config: MinHashConfig | None = None,
duplicate_jaccard_threshold: float = 0.78,
duplicate_line_coverage_threshold: float = 0.72,
duplicate_high_coverage_jaccard_threshold: float = 0.78,
duplicate_high_coverage_line_coverage_threshold: float = 0.90,
review_jaccard_threshold: float = 0.45,
review_line_coverage_threshold: float = 0.35,
review_query_coverage_threshold: float = 0.40,
chorus_short_line_count_threshold: int = 6,
chorus_material_overlap_threshold: float = 0.20,
chorus_material_query_coverage_threshold: float = 0.40,
confidence_jaccard_weight: float = 0.58,
confidence_line_coverage_weight: float = 0.42,
) -> None:
self._lsh = MinHashLSH(minhash_config)
self._records: dict[str, _IndexedRecord] = {}
self._exact_hash_to_ids: dict[str, set[str]] = {}
self._line_to_ids: dict[str, set[str]] = {}
self._token_to_ids: dict[str, set[str]] = {}
self.duplicate_jaccard_threshold = duplicate_jaccard_threshold
self.duplicate_line_coverage_threshold = duplicate_line_coverage_threshold
self.duplicate_high_coverage_jaccard_threshold = duplicate_high_coverage_jaccard_threshold
self.duplicate_high_coverage_line_coverage_threshold = duplicate_high_coverage_line_coverage_threshold
self.review_jaccard_threshold = review_jaccard_threshold
self.review_line_coverage_threshold = review_line_coverage_threshold
self.review_query_coverage_threshold = review_query_coverage_threshold
self.chorus_short_line_count_threshold = chorus_short_line_count_threshold
self.chorus_material_overlap_threshold = chorus_material_overlap_threshold
self.chorus_material_query_coverage_threshold = chorus_material_query_coverage_threshold
self.confidence_jaccard_weight = confidence_jaccard_weight
self.confidence_line_coverage_weight = confidence_line_coverage_weight
def check_record_against_candidates(
self,
record: LyricRecord,
candidates: list[LyricRecord],
*,
max_candidates: int = 10,
) -> DuplicateCheckResult:
"""Rank explicitly supplied candidates without doing in-memory recall.
def add_record(self, record: LyricRecord) -> None:
indexed = self._index(record)
self._add_indexed(record.record_id, indexed)
def add_normalized_record(self, record: LyricRecord, normalized: NormalizedLyrics) -> None:
"""Add a record when normalized lyrics have already been computed."""
indexed = self._index_normalized(record, normalized)
self._add_indexed(record.record_id, indexed)
def _add_indexed(self, record_id: str, indexed: _IndexedRecord) -> None:
self._records[record_id] = indexed
self._exact_hash_to_ids.setdefault(indexed.exact_hash, set()).add(record_id)
for line in indexed.normalized.unique_lines:
if len(line) >= 4:
self._line_to_ids.setdefault(line, set()).add(record_id)
for token in indexed.tokens:
self._token_to_ids.setdefault(token, set()).add(record_id)
for token in indexed.fallback_tokens:
self._token_to_ids.setdefault(token, set()).add(record_id)
self._lsh.add(record_id, indexed.signature)
def save(self, path: str | Path) -> None:
"""Persist the in-memory index for later checks."""
with Path(path).open("wb") as file:
pickle.dump(self, file, protocol=pickle.HIGHEST_PROTOCOL)
@classmethod
def load(cls, path: str | Path) -> "DuplicateChecker":
"""Load a previously persisted index."""
with Path(path).open("rb") as file:
checker = pickle.load(file)
if not isinstance(checker, cls):
raise TypeError(f"{path} does not contain a DuplicateChecker index")
return checker
@property
def record_count(self) -> int:
return len(self._records)
def check(self, lyrics: str, *, max_candidates: int = 10) -> DuplicateCheckResult:
return self.check_record(LyricRecord(record_id="__query__", lyrics=lyrics), max_candidates=max_candidates)
def check_record(self, record: LyricRecord, *, max_candidates: int = 10) -> DuplicateCheckResult:
PostgreSQL-backed callers should use this method after database recall so
there is only one retrieval path: PG returns candidates, Python ranks and
decides.
"""
query = self._index(record)
exact_ids = self._exact_hash_to_ids.get(query.exact_hash, set())
if exact_ids:
candidates = tuple(self._rank_exact_candidate(query, self._records[record_id]) for record_id in sorted(exact_ids)[:max_candidates])
duplicate = next((candidate for candidate in candidates if candidate.decision == DuplicateDecision.DUPLICATE), None)
if duplicate is not None:
return DuplicateCheckResult(
decision=DuplicateDecision.DUPLICATE,
confidence=duplicate.confidence,
candidates=candidates,
normalized_full_text=query.normalized.normalized_full_text,
reason=duplicate.reason,
)
return DuplicateCheckResult(
decision=DuplicateDecision.REVIEW,
confidence=candidates[0].confidence,
candidates=candidates,
normalized_full_text=query.normalized.normalized_full_text,
reason=candidates[0].reason,
)
candidate_ids = self._recall_candidates(query)
ranked = sorted(
(self._rank_candidate(query, self._records[record_id]) for record_id in candidate_ids),
(
self._rank_exact_candidate(query, indexed)
if indexed.exact_hash == query.exact_hash
else self._rank_candidate(query, indexed)
for indexed in (self._index(candidate) for candidate in candidates)
),
key=lambda item: (item.decision == DuplicateDecision.DUPLICATE, item.confidence, item.jaccard),
reverse=True,
)[:max_candidates]
......@@ -203,7 +157,6 @@ class DuplicateChecker:
translation_tokens = lyric_tokens(normalized, lines=normalized.translation_lines)
fallback_lines = tuple(_fallback_no_lyrics_lines(record.lyrics))
fallback_tokens = set(fallback_lines)
signature = self._lsh.signature(primary_tokens or tokens or fallback_tokens)
exact_hash = hashlib.sha256(_exact_fingerprint(normalized, fallback_lines).encode("utf-8")).hexdigest()
return _IndexedRecord(
record=record,
......@@ -214,25 +167,8 @@ class DuplicateChecker:
translation_tokens=translation_tokens,
fallback_lines=fallback_lines,
fallback_tokens=fallback_tokens,
signature=signature,
)
def _recall_candidates(self, query: _IndexedRecord) -> set[str]:
candidate_ids = self._lsh.query(query.signature)
for line in query.normalized.primary_lines:
if len(line) >= 4:
candidate_ids.update(self._line_to_ids.get(line, set()))
for line in query.normalized.translation_lines:
if len(line) >= 4:
candidate_ids.update(self._line_to_ids.get(line, set()))
for token in query.primary_tokens or query.tokens:
candidate_ids.update(self._token_to_ids.get(token, set()))
for token in query.translation_tokens:
candidate_ids.update(self._token_to_ids.get(token, set()))
for token in query.fallback_tokens:
candidate_ids.update(self._token_to_ids.get(token, set()))
return candidate_ids
def _rank_exact_candidate(self, query: _IndexedRecord, candidate: _IndexedRecord) -> CandidateMatch:
low_confidence_split = (
query.normalized.split_confidence == "low" or candidate.normalized.split_confidence == "low"
......@@ -306,25 +242,47 @@ class DuplicateChecker:
or jaccard >= self.review_jaccard_threshold
or (
primary_coverage >= self.review_line_coverage_threshold
and query_primary_coverage >= 0.40
and query_primary_coverage >= self.review_query_coverage_threshold
)
or (
coverage >= self.review_line_coverage_threshold
and query_coverage >= 0.40
and query_coverage >= self.review_query_coverage_threshold
)
)
has_material_chorus_overlap = chorus_only and (
query.normalized.content_line_count <= 6
or (primary_jaccard >= 0.20 and query_primary_coverage >= 0.40)
or (jaccard >= 0.20 and query_coverage >= 0.40)
or (primary_coverage >= 0.20 and query_primary_coverage >= 0.40)
or (coverage >= 0.20 and query_coverage >= 0.40)
query.normalized.content_line_count <= self.chorus_short_line_count_threshold
or (
primary_jaccard >= self.chorus_material_overlap_threshold
and query_primary_coverage >= self.chorus_material_query_coverage_threshold
)
or (
jaccard >= self.chorus_material_overlap_threshold
and query_coverage >= self.chorus_material_query_coverage_threshold
)
or (
primary_coverage >= self.chorus_material_overlap_threshold
and query_primary_coverage >= self.chorus_material_query_coverage_threshold
)
or (
coverage >= self.chorus_material_overlap_threshold
and query_coverage >= self.chorus_material_query_coverage_threshold
)
)
has_low_confidence_split_overlap = low_confidence_split and has_review_level_overlap
confidence = round((0.58 * primary_jaccard) + (0.42 * primary_coverage), 4)
confidence = round(
(self.confidence_jaccard_weight * primary_jaccard)
+ (self.confidence_line_coverage_weight * primary_coverage),
4,
)
if (
(primary_jaccard >= self.duplicate_jaccard_threshold or (primary_jaccard >= 0.78 and primary_coverage >= 0.9))
(
primary_jaccard >= self.duplicate_jaccard_threshold
or (
primary_jaccard >= self.duplicate_high_coverage_jaccard_threshold
and primary_coverage >= self.duplicate_high_coverage_line_coverage_threshold
)
)
and primary_coverage >= self.duplicate_line_coverage_threshold
and not chorus_only
and not translation_only
......
"""Command line tools for lyric duplicate checking."""
"""PostgreSQL-backed command line tools for lyric duplicate checking."""
from __future__ import annotations
import argparse
import csv
import json
import sys
from pathlib import Path
from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.checker import LyricRecord
from lyric_dedup.eval_dataset import generate_eval_set
from lyric_dedup.file_import import iter_lyric_files
from lyric_dedup.file_import import read_lyric_file
from lyric_dedup.file_import import record_from_file
from lyric_dedup.file_import import records_from_dir
def main() -> None:
parser = argparse.ArgumentParser(prog="lyric-dedup")
subparsers = parser.add_subparsers(dest="command", required=True)
build = subparsers.add_parser("build-index", help="build an index from .lrc/.txt files")
build.add_argument("--lyrics-dir", required=True)
build.add_argument("--index", required=True)
check = subparsers.add_parser("check-file", help="check one .lrc/.txt file against an index")
check.add_argument("--index", required=True)
check = subparsers.add_parser("check-file", help="check one .lrc/.txt file using PostgreSQL recall")
check.add_argument("--dsn", default="postgresql:///lyric_dedup")
check.add_argument("--file", required=True)
check.add_argument("--max-candidates", type=int, default=10)
batch = subparsers.add_parser("batch-check", help="check a directory of .lrc/.txt files against an index")
batch.add_argument("--index", required=True)
batch.add_argument("--lyrics-dir", required=True)
batch.add_argument("--out", required=True)
batch.add_argument("--max-candidates", type=int, default=5)
evaluate = subparsers.add_parser("evaluate-csv", help="evaluate labeled duplicate samples from a CSV file")
evaluate.add_argument("--index", required=True)
evaluate.add_argument("--csv", required=True)
evaluate.add_argument("--out", required=True)
evaluate.add_argument("--base-dir", default="")
evaluate.add_argument("--positive-decisions", default="duplicate")
evaluate.add_argument("--max-candidates", type=int, default=5)
check.add_argument("--max-candidates", type=int, default=5)
check.add_argument("--recall-limit", type=int, default=100)
check.add_argument("--enable-trgm", action="store_true")
check.add_argument("--trgm-threshold", type=float, default=0.3)
check.add_argument("--statement-timeout-ms", type=int, default=5000)
generate = subparsers.add_parser("generate-eval-set", help="generate labeled eval samples from a lyric library")
generate.add_argument("--library-dir", required=True)
......@@ -51,8 +30,6 @@ def main() -> None:
generate.add_argument("--size", type=int, default=100)
generate.add_argument("--positive-ratio", type=float, default=0.3)
generate.add_argument("--seed", type=int, default=20260602)
generate.add_argument("--index", default="", help="optional source index path recorded in the manifest")
generate.add_argument("--eval-index", default="", help="output index built from non-holdout records for this eval set")
generate.add_argument(
"--profile",
choices=("standard", "hard"),
......@@ -61,21 +38,8 @@ def main() -> None:
)
args = parser.parse_args()
if args.command == "build-index":
build_index(Path(args.lyrics_dir), Path(args.index))
elif args.command == "check-file":
check_file(Path(args.index), Path(args.file), args.max_candidates)
elif args.command == "batch-check":
batch_check(Path(args.index), Path(args.lyrics_dir), Path(args.out), args.max_candidates)
elif args.command == "evaluate-csv":
evaluate_csv(
Path(args.index),
Path(args.csv),
Path(args.out),
base_dir=Path(args.base_dir) if args.base_dir else None,
positive_decisions={item.strip() for item in args.positive_decisions.split(",") if item.strip()},
max_candidates=args.max_candidates,
)
if args.command == "check-file":
check_file_pg(args)
elif args.command == "generate-eval-set":
summary = generate_eval_set(
library_dir=Path(args.library_dir),
......@@ -84,315 +48,40 @@ def main() -> None:
size=args.size,
positive_ratio=args.positive_ratio,
seed=args.seed,
index_path=Path(args.index) if args.index else None,
eval_index_path=Path(args.eval_index) if args.eval_index else None,
profile=args.profile,
)
print(json.dumps(summary, ensure_ascii=False))
def build_index(lyrics_dir: Path, index_path: Path) -> None:
checker = DuplicateChecker()
records = records_from_dir(lyrics_dir)
for record in records:
checker.add_record(record)
index_path.parent.mkdir(parents=True, exist_ok=True)
checker.save(index_path)
print(json.dumps({"indexed": checker.record_count, "index": str(index_path)}, ensure_ascii=False))
def check_file(index_path: Path, file_path: Path, max_candidates: int) -> None:
checker = DuplicateChecker.load(index_path)
record = record_from_file(file_path)
result = checker.check_record(record, max_candidates=max_candidates)
print(json.dumps(_result_to_dict(result, source=str(file_path)), ensure_ascii=False, indent=2))
def batch_check(index_path: Path, lyrics_dir: Path, out_path: Path, max_candidates: int) -> None:
checker = DuplicateChecker.load(index_path)
out_path.parent.mkdir(parents=True, exist_ok=True)
rows: list[dict[str, object]] = []
for path in iter_lyric_files(lyrics_dir):
record = record_from_file(path, base_dir=lyrics_dir)
result = checker.check_record(record, max_candidates=max_candidates)
best = result.candidates[0] if result.candidates else None
rows.append(
{
"source": str(path),
"record_id": record.record_id,
"decision": result.decision.value,
"confidence": result.confidence,
"reason": result.reason,
"best_candidate_id": best.record_id if best else "",
"best_candidate_decision": best.decision.value if best else "",
"best_candidate_confidence": best.confidence if best else "",
"best_candidate_jaccard": best.jaccard if best else "",
"best_candidate_line_coverage": best.line_coverage if best else "",
"best_candidate_primary_jaccard": best.primary_jaccard if best else "",
"best_candidate_primary_line_coverage": best.primary_line_coverage if best else "",
"best_candidate_translation_jaccard": best.translation_jaccard if best else "",
"best_candidate_translation_line_coverage": best.translation_line_coverage if best else "",
"best_candidate_reason": best.reason if best else "",
"matched_unique_lines": " | ".join(best.matched_unique_lines) if best else "",
}
)
if out_path.suffix.lower() == ".jsonl":
with out_path.open("w", encoding="utf-8") as file:
for row in rows:
file.write(json.dumps(row, ensure_ascii=False) + "\n")
else:
with out_path.open("w", encoding="utf-8", newline="") as file:
writer = csv.DictWriter(file, fieldnames=list(rows[0].keys()) if rows else ["source"])
writer.writeheader()
writer.writerows(rows)
summary = {
"checked": len(rows),
"duplicate": sum(1 for row in rows if row["decision"] == "duplicate"),
"review": sum(1 for row in rows if row["decision"] == "review"),
"new": sum(1 for row in rows if row["decision"] == "new"),
"out": str(out_path),
}
print(json.dumps(summary, ensure_ascii=False))
def check_file_pg(args: argparse.Namespace) -> None:
from lyric_dedup_server.config import ServerConfig
from lyric_dedup_server.service import DedupService
def evaluate_csv(
index_path: Path,
csv_path: Path,
out_path: Path,
*,
base_dir: Path | None,
positive_decisions: set[str],
max_candidates: int,
) -> None:
_progress(f"load index: {index_path}")
checker = DuplicateChecker.load(index_path)
rows: list[dict[str, object]] = []
total = _csv_data_row_count(csv_path)
_progress(f"evaluate csv: 0/{total}")
out_path.parent.mkdir(parents=True, exist_ok=True)
with csv_path.open(encoding="utf-8-sig", newline="") as file:
reader = csv.DictReader(file)
if reader.fieldnames is None:
raise ValueError("评估 CSV 需要表头")  # message: "evaluation CSV requires a header row"
fieldnames = [
"id",
"source",
"expected_duplicate",
"decision",
"predicted_duplicate",
"correct",
"confidence",
"reason",
"best_candidate_id",
"best_candidate_decision",
"best_candidate_confidence",
"best_candidate_jaccard",
"best_candidate_line_coverage",
"best_candidate_primary_jaccard",
"best_candidate_primary_line_coverage",
"best_candidate_translation_jaccard",
"best_candidate_translation_line_coverage",
"best_candidate_reason",
"matched_unique_lines",
]
with out_path.open("w", encoding="utf-8", newline="") as out_file:
writer = csv.DictWriter(out_file, fieldnames=fieldnames)
writer.writeheader()
for index, row in enumerate(reader, start=1):
row_out = _evaluate_row(
row,
row_number=index + 1,
checker=checker,
csv_path=csv_path,
base_dir=base_dir,
positive_decisions=positive_decisions,
max_candidates=max_candidates,
record = record_from_file(Path(args.file))
config = ServerConfig(
dsn=args.dsn,
max_candidates=args.max_candidates,
recall_limit=args.recall_limit,
enable_trgm=args.enable_trgm,
trgm_threshold=args.trgm_threshold,
statement_timeout_ms=args.statement_timeout_ms,
)
rows.append(row_out)
writer.writerow(row_out)
_progress_count("evaluate csv", index, total, step=1000)
summary = _evaluation_summary(rows, positive_decisions=positive_decisions, out_path=out_path)
summary_path = out_path.with_suffix(out_path.suffix + ".summary.json")
summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
_progress("evaluation complete")
print(json.dumps(summary, ensure_ascii=False))
def _result_to_dict(result, *, source: str) -> dict[str, object]:
return {
"source": source,
"decision": result.decision.value,
"confidence": result.confidence,
"reason": result.reason,
"candidates": [
service = DedupService(config=config)
result = service.check(record.lyrics, title=record.title, artist=record.artist)
print(
json.dumps(
{
"record_id": candidate.record_id,
"decision": candidate.decision.value,
"confidence": candidate.confidence,
"jaccard": candidate.jaccard,
"line_coverage": candidate.line_coverage,
"primary_jaccard": candidate.primary_jaccard,
"primary_line_coverage": candidate.primary_line_coverage,
"translation_jaccard": candidate.translation_jaccard,
"translation_line_coverage": candidate.translation_line_coverage,
"reason": candidate.reason,
"matched_unique_lines": list(candidate.matched_unique_lines),
}
for candidate in result.candidates
],
}
def _evaluate_row(
row: dict[str, str],
*,
row_number: int,
checker: DuplicateChecker,
csv_path: Path,
base_dir: Path | None,
positive_decisions: set[str],
max_candidates: int,
) -> dict[str, object]:
sample_id = row.get("id") or row.get("sample_id") or str(row_number)
record, source = _record_from_eval_row(row, csv_path=csv_path, base_dir=base_dir)
expected_duplicate = _parse_expected(row.get("expected") or row.get("label") or row.get("target"))
result = checker.check_record(record, max_candidates=max_candidates)
predicted_duplicate = result.decision.value in positive_decisions
best = result.candidates[0] if result.candidates else None
return {
"id": sample_id,
"source": source,
"expected_duplicate": expected_duplicate,
"decision": result.decision.value,
"predicted_duplicate": predicted_duplicate,
"correct": expected_duplicate == predicted_duplicate,
"source": args.file,
"decision": result.decision,
"duplicate": result.duplicate,
"confidence": result.confidence,
"reason": result.reason,
"best_candidate_id": best.record_id if best else "",
"best_candidate_decision": best.decision.value if best else "",
"best_candidate_confidence": best.confidence if best else "",
"best_candidate_jaccard": best.jaccard if best else "",
"best_candidate_line_coverage": best.line_coverage if best else "",
"best_candidate_primary_jaccard": best.primary_jaccard if best else "",
"best_candidate_primary_line_coverage": best.primary_line_coverage if best else "",
"best_candidate_translation_jaccard": best.translation_jaccard if best else "",
"best_candidate_translation_line_coverage": best.translation_line_coverage if best else "",
"best_candidate_reason": best.reason if best else "",
"matched_unique_lines": " | ".join(best.matched_unique_lines) if best else "",
}
def _lyrics_from_eval_row(row: dict[str, str], *, csv_path: Path, base_dir: Path | None) -> tuple[str, str]:
lyrics = (row.get("lyrics") or "").strip()
if lyrics:
return lyrics.replace("\\n", "\n"), "inline"
file_value = (row.get("file") or row.get("path") or row.get("source") or "").strip()
if not file_value:
raise ValueError("评估 CSV 每行需要提供 lyrics,或 file/path/source 文件路径")  # message: "each evaluation CSV row must provide lyrics, or a file/path/source file path"
file_path = Path(file_value)
if not file_path.is_absolute():
file_path = (base_dir or csv_path.parent) / file_path
return read_lyric_file(file_path), str(file_path)
def _record_from_eval_row(row: dict[str, str], *, csv_path: Path, base_dir: Path | None):
lyrics = (row.get("lyrics") or "").strip()
if lyrics:
return (
LyricRecord(
record_id=row.get("id") or row.get("sample_id") or "__eval__",
lyrics=lyrics.replace("\\n", "\n"),
title=row.get("title") or None,
artist=row.get("artist") or None,
),
"inline",
"candidate_count": result.candidate_count,
},
ensure_ascii=False,
indent=2,
)
file_value = (row.get("file") or row.get("path") or row.get("source") or "").strip()
if not file_value:
raise ValueError("评估 CSV 每行需要 lyrics,或 file/path/source 文件路径")  # message: "each evaluation CSV row needs lyrics, or a file/path/source file path"
file_path = Path(file_value)
if not file_path.is_absolute():
file_path = (base_dir or csv_path.parent) / file_path
record = record_from_file(file_path)
if row.get("title") or row.get("artist"):
record = LyricRecord(
record_id=record.record_id,
lyrics=record.lyrics,
title=row.get("title") or record.title,
artist=row.get("artist") or record.artist,
)
return record, str(file_path)
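def _parse_expected(value: str | None) -> bool:
    if value is None:
        # Message: "each evaluation CSV row needs an expected/label/target column"
        raise ValueError("评估 CSV 每行需要 expected/label/target 列")
    normalized = value.strip().lower()
    # Chinese labels: 重复 = duplicate, 应去重 = should dedupe, 去重 = dedupe, 是 = yes
    positives = {"1", "true", "yes", "y", "duplicate", "dup", "重复", "应去重", "去重", "是"}
    # Chinese labels: 不重复 = not duplicate, 不应去重 = should not dedupe, 新歌 = new song, 否 = no
    negatives = {"0", "false", "no", "n", "new", "not_duplicate", "non_duplicate", "不重复", "不应去重", "新歌", "否"}
    if normalized in positives:
        return True
    if normalized in negatives:
        return False
    # Message: "unrecognized expected value"
    raise ValueError(f"无法识别 expected 值: {value!r}")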
def _evaluation_summary(
    rows: list[dict[str, object]],
    *,
    positive_decisions: set[str],
    out_path: Path,
) -> dict[str, object]:
    tp = sum(1 for row in rows if row["expected_duplicate"] is True and row["predicted_duplicate"] is True)
    fp = sum(1 for row in rows if row["expected_duplicate"] is False and row["predicted_duplicate"] is True)
    tn = sum(1 for row in rows if row["expected_duplicate"] is False and row["predicted_duplicate"] is False)
    fn = sum(1 for row in rows if row["expected_duplicate"] is True and row["predicted_duplicate"] is False)
    total = len(rows)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    accuracy = (tp + tn) / total if total else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if precision + recall else 0.0
    return {
        "total": total,
        "positive_decisions": sorted(positive_decisions),
        "accuracy": round(accuracy, 4),
        "precision": round(precision, 4),
        "recall": round(recall, 4),
        "f1": round(f1, 4),
        "true_positive": tp,
        "false_positive": fp,
        "true_negative": tn,
        "false_negative": fn,
        "duplicate": sum(1 for row in rows if row["decision"] == "duplicate"),
        "review": sum(1 for row in rows if row["decision"] == "review"),
        "new": sum(1 for row in rows if row["decision"] == "new"),
        "out": str(out_path),
        "summary": str(out_path.with_suffix(out_path.suffix + ".summary.json")),
    }
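As a quick sanity check on the metric definitions in `_evaluation_summary`, the sketch below recomputes the same zero-safe formulas from raw confusion counts. The helper name `metrics_from_counts` and the sample counts are illustrative assumptions, not part of this commit.

```python
def metrics_from_counts(tp: int, fp: int, tn: int, fn: int) -> dict[str, float]:
    """Recompute accuracy/precision/recall/F1 with the same zero guards as above."""
    total = tp + fp + tn + fn
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    accuracy = (tp + tn) / total if total else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if precision + recall else 0.0
    return {
        "accuracy": round(accuracy, 4),
        "precision": round(precision, 4),
        "recall": round(recall, 4),
        "f1": round(f1, 4),
    }


# Hypothetical run: 8 true positives, 2 false positives, 85 true negatives, 5 false negatives.
sample = metrics_from_counts(tp=8, fp=2, tn=85, fn=5)
empty = metrics_from_counts(tp=0, fp=0, tn=0, fn=0)
```

The guards matter for small evaluation sets: when no row is predicted positive, precision falls back to 0.0 instead of raising `ZeroDivisionError`.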
def _csv_data_row_count(csv_path: Path) -> int:
    with csv_path.open(encoding="utf-8-sig", newline="") as file:
        reader = csv.reader(file)
        next(reader, None)
        return sum(1 for _ in reader)


def _progress(message: str) -> None:
    print(f"[eval] {message}", file=sys.stderr, flush=True)


def _progress_count(label: str, current: int, total: int, *, step: int = 1000) -> None:
    if total <= 0:
        return
    if current == 1 or current == total or current % step == 0:
        _progress(f"{label}: {current}/{total}")
if __name__ == "__main__":
......
......@@ -12,7 +12,6 @@ from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.checker import LyricRecord
from lyric_dedup.file_import import iter_lyric_files
from lyric_dedup.file_import import record_from_file
......@@ -133,8 +132,6 @@ def generate_eval_set(
)
holdout_ids = {profile.record_id for profile in holdout_profiles}
indexed_profiles = [profile for profile in profiles if profile.record_id not in holdout_ids] or profiles
eval_index_path = eval_index_path or csv_path.with_suffix(csv_path.suffix + ".index.pkl")
_build_eval_index(indexed_profiles, eval_index_path)
groups = _profile_groups(indexed_profiles)
samples: list[GeneratedSample] = []
......@@ -373,25 +370,6 @@ def _stratified_unique_sample(profiles: list[LyricProfile], count: int, rng: ran
return _stratified_sample(profiles, min(count, len(profiles)), rng)
def _build_eval_index(profiles: list[LyricProfile], index_path: Path) -> None:
_progress(f"build eval index excluding holdout: {index_path}")
checker = DuplicateChecker()
total = len(profiles)
for index, profile in enumerate(profiles, start=1):
checker.add_normalized_record(
LyricRecord(
record_id=profile.record_id,
lyrics=profile.raw_text,
title=profile.title or None,
artist=profile.artist or None,
),
profile.normalized,
)
_progress_count("build eval index", index, total, step=5000)
index_path.parent.mkdir(parents=True, exist_ok=True)
checker.save(index_path)
def _build_positive_samples(
profiles: list[LyricProfile],
output_dir: Path,
......@@ -889,7 +867,7 @@ def _write_manifest(
"sample_size": len(samples),
"plan": plan,
"source_index": str(index_path) if index_path else "",
"eval_index": str(eval_index_path),
"eval_index": str(eval_index_path) if eval_index_path else "",
"holdout_records": holdout_count,
"lyrics_dir": str(output_dir),
"csv": str(csv_path),
......
"""Small in-memory MinHash LSH index for incremental lyric lookup."""
from __future__ import annotations

import hashlib
from collections import defaultdict
from dataclasses import dataclass

_MAX_HASH = (1 << 64) - 1


@dataclass(frozen=True)
class MinHashConfig:
    num_perm: int = 96
    bands: int = 24
    seed: int = 17

    @property
    def rows_per_band(self) -> int:
        if self.num_perm % self.bands != 0:
            raise ValueError("num_perm must be divisible by bands")
        return self.num_perm // self.bands


class MinHashLSH:
    def __init__(self, config: MinHashConfig | None = None) -> None:
        self.config = config or MinHashConfig()
        self._buckets: dict[tuple[int, tuple[int, ...]], set[str]] = defaultdict(set)

    def signature(self, tokens: set[str]) -> tuple[int, ...]:
        if not tokens:
            return tuple([_MAX_HASH] * self.config.num_perm)
        signature = [_MAX_HASH] * self.config.num_perm
        for token in tokens:
            encoded = token.encode("utf-8")
            for idx in range(self.config.num_perm):
                digest = hashlib.blake2b(
                    encoded,
                    digest_size=8,
                    person=f"lyr{self.config.seed + idx:05d}".encode("ascii")[:16],
                ).digest()
                value = int.from_bytes(digest, "big")
                if value < signature[idx]:
                    signature[idx] = value
        return tuple(signature)

    def add(self, record_id: str, signature: tuple[int, ...]) -> None:
        for key in self._band_keys(signature):
            self._buckets[key].add(record_id)

    def query(self, signature: tuple[int, ...]) -> set[str]:
        candidates: set[str] = set()
        for key in self._band_keys(signature):
            candidates.update(self._buckets.get(key, set()))
        return candidates

    def _band_keys(self, signature: tuple[int, ...]) -> list[tuple[int, tuple[int, ...]]]:
        rows = self.config.rows_per_band
        return [(band, signature[band * rows : (band + 1) * rows]) for band in range(self.config.bands)]
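For readers unfamiliar with the removed index, the following standalone sketch illustrates the two ideas it combined: a MinHash signature whose matching positions estimate Jaccard similarity, and banding, where two records become candidates if any band of their signatures is identical. It reuses the default sizes above (96 permutations, 24 bands), but the salt prefix `demo`, the helper names, and the sample token sets are assumptions made for illustration.

```python
import hashlib

NUM_PERM = 96  # signature length, matching the default num_perm above
BANDS = 24     # 24 bands of 4 rows each, matching the default bands above
ROWS = NUM_PERM // BANDS


def signature(tokens: set[str]) -> tuple[int, ...]:
    """Return one 64-bit min-hash per permutation for a non-empty token set."""
    values = []
    for idx in range(NUM_PERM):
        # BLAKE2b's personalization field (max 16 bytes) acts as the per-permutation salt.
        person = f"demo{idx:05d}".encode("ascii")
        values.append(
            min(
                int.from_bytes(
                    hashlib.blake2b(token.encode("utf-8"), digest_size=8, person=person).digest(),
                    "big",
                )
                for token in tokens
            )
        )
    return tuple(values)


def estimated_jaccard(a: tuple[int, ...], b: tuple[int, ...]) -> float:
    """Fraction of signature positions that agree; an estimate of set Jaccard."""
    return sum(x == y for x, y in zip(a, b)) / NUM_PERM


def shares_band(a: tuple[int, ...], b: tuple[int, ...]) -> bool:
    """True if at least one band (a run of ROWS positions) is identical."""
    return any(a[i:i + ROWS] == b[i:i + ROWS] for i in range(0, NUM_PERM, ROWS))


base = {f"gram{i}" for i in range(20)}
near = (base - {"gram0"}) | {"other"}  # true Jaccard with base = 19 / 21
far = {f"else{i}" for i in range(20)}  # true Jaccard with base = 0

sig_base, sig_near, sig_far = signature(base), signature(near), signature(far)
```

With these sizes a band matches only when four consecutive min-hashes agree, so near-identical sets almost always collide in some band while unrelated sets almost never do. After this commit, PostgreSQL takes over that candidate-recall role.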
......@@ -8,69 +8,10 @@ import unicodedata
from collections import Counter
from dataclasses import dataclass
import opencc
_TRADITIONAL_TO_SIMPLIFIED = str.maketrans(
{
"愛": "爱",
"會": "会",
"個": "个",
"妳": "你",
"們": "们",
"麼": "么",
"夢": "梦",
"憶": "忆",
"風": "风",
"無": "无",
"與": "与",
"聽": "听",
"說": "说",
"見": "见",
"話": "话",
"還": "还",
"這": "这",
"那": "那",
"裡": "里",
"裏": "里",
"過": "过",
"來": "来",
"進": "进",
"去": "去",
"給": "给",
"讓": "让",
"嗎": "吗",
"為": "为",
"誰": "谁",
"對": "对",
"錯": "错",
"淚": "泪",
"寫": "写",
"雲": "云",
"藍": "蓝",
"紅": "红",
"綠": "绿",
"黃": "黄",
"長": "长",
"遠": "远",
"燈": "灯",
"臺": "台",
"台": "台",
"後": "后",
"從": "从",
"時": "时",
"間": "间",
"葉": "叶",
"歲": "岁",
"聲": "声",
"邊": "边",
"歡": "欢",
"繼": "继",
"續": "续",
"難": "难",
"雙": "双",
"舊": "旧",
"離": "离",
}
)
_T2S_CONVERTER = opencc.OpenCC("t2s.json")
_TIMESTAMP_RE = re.compile(r"\[((?:\d{1,2}:)?\d{1,2}:\d{2}(?:[.:]\d{1,3})?)\]")
_BRACKET_RE = re.compile(r"[\[((【<《].{0,40}?[\]))】>》]")
......@@ -212,7 +153,7 @@ def _split_inline_translation(line: str, timestamp: str | None, source_index: in
def _entry_from_text(text: str, timestamp: str | None, source_index: int) -> list[_LineEntry]:
line = _BRACKET_RE.sub("", text)
line = line.strip().lower().translate(_TRADITIONAL_TO_SIMPLIFIED)
line = _T2S_CONVERTER.convert(line.strip().lower())
if not line or _is_noise_line(line):
return []
line = _strip_symbols(line)
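The motivation for swapping the hand-written table for OpenCC can be seen with a small stdlib-only sketch. It copies three entries from the removed `_TRADITIONAL_TO_SIMPLIFIED` table and feeds it a character the table never listed (戀); the names `PARTIAL_TABLE` and `convert_with_table` are hypothetical.

```python
# Three entries copied from the removed table; the removed table had no entry for 戀.
PARTIAL_TABLE = str.maketrans({"愛": "爱", "夢": "梦", "風": "风"})


def convert_with_table(text: str) -> str:
    """Per-character translation: anything missing from the table passes through unchanged."""
    return text.translate(PARTIAL_TABLE)


traditional = "戀愛"
simplified = "恋爱"

converted = convert_with_table(traditional)
still_differs = converted != simplified  # 戀 was not converted, so the lines do not match
```

Because unmapped characters pass through silently, a traditional and a simplified copy of the same lyric can normalize to different strings and different line hashes. A dictionary-backed converter such as the `opencc.OpenCC("t2s.json")` instance introduced here avoids maintaining that table by hand.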
......
from .config import ServerConfig
from .service import DedupService
__all__ = ["ServerConfig", "DedupService"]
__all__ = ["ServerConfig"]
......
......@@ -4,14 +4,101 @@ from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
def _load_env_file() -> None:
    """Load root .env values without overriding real environment variables."""
    env_path = Path(__file__).resolve().parent.parent / ".env"
    if not env_path.exists():
        return
    with env_path.open(encoding="utf-8") as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'"))


_load_env_file()
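The precedence rule in `_load_env_file` (real environment variables win over `.env` values) can be checked in isolation. The sketch below applies the same parsing steps to a plain dictionary instead of `os.environ`; the function name `apply_env_lines` and the sample values are assumptions for illustration.

```python
def apply_env_lines(lines: list[str], env: dict[str, str]) -> dict[str, str]:
    """Apply KEY=VALUE lines to `env` without overriding keys that already exist."""
    for raw_line in lines:
        line = raw_line.strip()
        # Skip blanks, comments, and lines that are not KEY=VALUE pairs.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        # setdefault keeps a value that is already present, e.g. one exported in the shell.
        env.setdefault(key.strip(), value.strip().strip('"').strip("'"))
    return env


env = {"LYRIC_DEDUP_DSN": "postgresql:///from_shell"}
apply_env_lines(
    [
        "# local overrides",
        "LYRIC_DEDUP_DSN=postgresql:///from_file",
        'LYRIC_DEDUP_RECALL_LIMIT="200"',
        "not a pair",
    ],
    env,
)
```

Note that the `ServerConfig` defaults call `os.getenv` in the class body, so they are read once when the module is imported. That is why `_load_env_file()` runs at import time, before the dataclass is defined.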
@dataclass
class ServerConfig:
# PostgreSQL DSN used by the dedup service.
dsn: str = os.getenv("LYRIC_DEDUP_DSN", "postgresql:///lyric_dedup")
# Maximum ranked candidates returned in the final API result.
max_candidates: int = int(os.getenv("LYRIC_DEDUP_MAX_CANDIDATES", "5"))
# Maximum candidates recalled from each PostgreSQL recall tier.
recall_limit: int = int(os.getenv("LYRIC_DEDUP_RECALL_LIMIT", "100"))
# Whether to use pg_trgm similarity recall in addition to exact hash and line hash recall.
enable_trgm: bool = os.getenv("LYRIC_DEDUP_ENABLE_TRGM", "false").lower() == "true"
# PostgreSQL pg_trgm recall threshold; lower values recall more candidates and cost more.
trgm_threshold: float = float(os.getenv("LYRIC_DEDUP_TRGM_THRESHOLD", "0.3"))
# PostgreSQL statement timeout for one dedup check, in milliseconds.
statement_timeout_ms: int = int(os.getenv("LYRIC_DEDUP_STATEMENT_TIMEOUT_MS", "5000"))
# HTTP download timeout for fetching lyric URLs, in seconds.
download_timeout: int = int(os.getenv("LYRIC_DEDUP_DOWNLOAD_TIMEOUT", "10"))
# Minimum primary n-gram Jaccard similarity required for automatic duplicate.
# Raising this makes automatic duplicate stricter; lowering it may increase false positives.
duplicate_jaccard_threshold: float = float(os.getenv("LYRIC_DEDUP_DUPLICATE_JACCARD_THRESHOLD", "0.78"))
# Minimum line coverage required for automatic duplicate.
# This is the main guard against treating partial lyric fragments as full duplicates.
duplicate_line_coverage_threshold: float = float(
os.getenv("LYRIC_DEDUP_DUPLICATE_LINE_COVERAGE_THRESHOLD", "0.72")
)
# Alternate automatic duplicate path: a candidate at this Jaccard level can still be judged duplicate when line coverage is very high.
# Keep this aligned with duplicate_jaccard_threshold to avoid an unintended duplicate backdoor.
duplicate_high_coverage_jaccard_threshold: float = float(
os.getenv("LYRIC_DEDUP_DUPLICATE_HIGH_COVERAGE_JACCARD_THRESHOLD", "0.78")
)
# Line coverage required by the alternate high-coverage duplicate path.
# Raising this makes the alternate duplicate path stricter for near-complete variants.
duplicate_high_coverage_line_coverage_threshold: float = float(
os.getenv("LYRIC_DEDUP_DUPLICATE_HIGH_COVERAGE_LINE_COVERAGE_THRESHOLD", "0.90")
)
# Minimum primary/full n-gram Jaccard similarity that can send a candidate to review.
# Raising this reduces review volume; lowering it catches weaker suspicious overlaps.
review_jaccard_threshold: float = float(os.getenv("LYRIC_DEDUP_REVIEW_JACCARD_THRESHOLD", "0.45"))
# Minimum line coverage that can send a candidate to review when query coverage is also material.
# Raising this reduces fragment/short-overlap reviews; lowering it increases suspicious recall.
review_line_coverage_threshold: float = float(os.getenv("LYRIC_DEDUP_REVIEW_LINE_COVERAGE_THRESHOLD", "0.35"))
# Minimum share of query lines that must match before line coverage alone can trigger review.
# Raising this makes partial-fragment review stricter.
review_query_coverage_threshold: float = float(os.getenv("LYRIC_DEDUP_REVIEW_QUERY_COVERAGE_THRESHOLD", "0.40"))
# Line-count cutoff for treating a query as very short; repeated-chorus overlap on such a query can force a review.
# Raising this catches more short chorus-like inputs; lowering it reduces review volume.
chorus_short_line_count_threshold: int = int(os.getenv("LYRIC_DEDUP_CHORUS_SHORT_LINE_COUNT_THRESHOLD", "6"))
# Minimum similarity/coverage signal for repeated-chorus overlap to be considered material.
# Raising this makes chorus-only review stricter.
chorus_material_overlap_threshold: float = float(os.getenv("LYRIC_DEDUP_CHORUS_MATERIAL_OVERLAP_THRESHOLD", "0.20"))
# Minimum query-side coverage for repeated-chorus overlap to be considered material.
# Raising this reduces review decisions caused by small shared chorus fragments.
chorus_material_query_coverage_threshold: float = float(
os.getenv("LYRIC_DEDUP_CHORUS_MATERIAL_QUERY_COVERAGE_THRESHOLD", "0.40")
)
# Weight assigned to primary n-gram Jaccard when computing confidence.
# This affects the reported confidence score, not the duplicate/review threshold checks directly.
confidence_jaccard_weight: float = float(os.getenv("LYRIC_DEDUP_CONFIDENCE_JACCARD_WEIGHT", "0.58"))
# Weight assigned to primary line coverage when computing confidence.
# Keep this coordinated with confidence_jaccard_weight; defaults sum to 1.0.
confidence_line_coverage_weight: float = float(os.getenv("LYRIC_DEDUP_CONFIDENCE_LINE_COVERAGE_WEIGHT", "0.42"))
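To make the interplay of these defaults concrete, here is a minimal sketch of how the thresholds might combine. The real rules live in `DuplicateChecker`, which this diff does not show, so the combination logic below (a plain weighted sum for confidence, AND within each duplicate path, OR between the review conditions, and no chorus handling) is an assumption derived only from the comments above. The function names are hypothetical.

```python
# Default values copied from ServerConfig above.
DUP_JACCARD, DUP_COVERAGE = 0.78, 0.72
DUP_HIGH_JACCARD, DUP_HIGH_COVERAGE = 0.78, 0.90
REVIEW_JACCARD, REVIEW_COVERAGE, REVIEW_QUERY_COVERAGE = 0.45, 0.35, 0.40
W_JACCARD, W_COVERAGE = 0.58, 0.42


def confidence(jaccard: float, line_coverage: float) -> float:
    """Assumed form: weighted sum of the two signals (the weights sum to 1.0)."""
    return W_JACCARD * jaccard + W_COVERAGE * line_coverage


def classify(jaccard: float, line_coverage: float, query_coverage: float) -> str:
    """Assumed decision order: duplicate paths first, then review, otherwise new."""
    main_path = jaccard >= DUP_JACCARD and line_coverage >= DUP_COVERAGE
    high_coverage_path = jaccard >= DUP_HIGH_JACCARD and line_coverage >= DUP_HIGH_COVERAGE
    if main_path or high_coverage_path:
        return "duplicate"
    by_jaccard = jaccard >= REVIEW_JACCARD
    by_lines = line_coverage >= REVIEW_COVERAGE and query_coverage >= REVIEW_QUERY_COVERAGE
    if by_jaccard or by_lines:
        return "review"
    return "new"
```

Under these assumptions and the default values, the high-coverage path never fires on its own, since any candidate that satisfies it also satisfies the main path. It only becomes an independent route if `duplicate_high_coverage_jaccard_threshold` is set below `duplicate_jaccard_threshold`, which is the backdoor the comment warns about.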
......
......@@ -189,10 +189,25 @@ class DedupService:
candidates: list[LyricRecord],
) -> CheckResult:
"""Run DuplicateChecker against recalled candidates."""
checker = DuplicateChecker()
for candidate in candidates:
checker.add_record(candidate)
result = checker.check_record(record, max_candidates=self.config.max_candidates)
checker = DuplicateChecker(
duplicate_jaccard_threshold=self.config.duplicate_jaccard_threshold,
duplicate_line_coverage_threshold=self.config.duplicate_line_coverage_threshold,
duplicate_high_coverage_jaccard_threshold=self.config.duplicate_high_coverage_jaccard_threshold,
duplicate_high_coverage_line_coverage_threshold=self.config.duplicate_high_coverage_line_coverage_threshold,
review_jaccard_threshold=self.config.review_jaccard_threshold,
review_line_coverage_threshold=self.config.review_line_coverage_threshold,
review_query_coverage_threshold=self.config.review_query_coverage_threshold,
chorus_short_line_count_threshold=self.config.chorus_short_line_count_threshold,
chorus_material_overlap_threshold=self.config.chorus_material_overlap_threshold,
chorus_material_query_coverage_threshold=self.config.chorus_material_query_coverage_threshold,
confidence_jaccard_weight=self.config.confidence_jaccard_weight,
confidence_line_coverage_weight=self.config.confidence_line_coverage_weight,
)
result = checker.check_record_against_candidates(
record,
candidates,
max_candidates=self.config.max_candidates,
)
return CheckResult(
duplicate=result.decision in (DuplicateDecision.DUPLICATE, DuplicateDecision.REVIEW),
decision=result.decision.value,
......
......@@ -3,6 +3,7 @@ pytest>=8.0
# PostgreSQL storage prototype
psycopg[binary]>=3.2
OpenCC>=1.3.1
# Existing MySQL/COS lyric download utilities
pymysql>=1.1
......
......@@ -249,9 +249,7 @@ def _check_against_candidates(
max_candidates: int,
):
checker = DuplicateChecker()
for candidate in candidates:
checker.add_record(candidate)
return checker.check_record(record, max_candidates=max_candidates)
return checker.check_record_against_candidates(record, candidates, max_candidates=max_candidates)
def _record_from_eval_row(row: dict[str, str], *, csv_path: Path, base_dir: Path | None) -> tuple[LyricRecord, str]:
......
"""Process newly added lyric library files.
This script is intended for the recurring workflow after adding files to
``data/library``:
1. Move pure-music placeholder lyric files out of the active library.
2. Move duplicate lyric files out of the active library.
3. Rebuild the duplicate-checking index from retained files.
4. Optionally regenerate and evaluate a production-style eval set.
"""
from __future__ import annotations
import argparse
import csv
import json
import shutil
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.checker import DuplicateDecision
from lyric_dedup.checker import LyricRecord
from lyric_dedup.cli import evaluate_csv
from lyric_dedup.eval_dataset import generate_eval_set
from lyric_dedup.file_import import iter_lyric_files
from lyric_dedup.file_import import read_lyric_file
from lyric_dedup.file_import import record_from_file
from lyric_dedup.normalization import NormalizedLyrics
from lyric_dedup.normalization import normalize_lyrics
PLACEHOLDER_MARKERS = (
    "【曲库专用】",  # "[for library use only]"
    "此歌曲为没有填词的纯音乐",  # "this song is instrumental music with no lyrics"
)
@dataclass(frozen=True)
class LibraryProfile:
path: Path
record: LyricRecord
normalized: NormalizedLyrics
line_count: int
char_count: int
def main() -> None:
parser = argparse.ArgumentParser(description="Process lyric library additions.")
parser.add_argument("--library-dir", default="data/library")
parser.add_argument("--index", default="outputs/indexes/library_lyrics.pkl")
parser.add_argument("--quarantine-dir", default="data/quarantine/no_lyrics_placeholders")
parser.add_argument("--duplicate-quarantine-dir", default="data/quarantine/duplicates")
parser.add_argument("--dry-run", action="store_true", help="Only report placeholder files; do not move or write outputs.")
parser.add_argument("--delete-placeholders", action="store_true", help="Delete matched placeholder files instead of moving them.")
parser.add_argument("--delete-duplicates", action="store_true", help="Delete duplicate lyric files instead of moving them.")
parser.add_argument("--skip-library-dedup", action="store_true", help="Skip internal duplicate cleanup before rebuilding the index.")
parser.add_argument("--eval-size", type=int, default=0, help="Generate and evaluate this many synthetic samples. 0 disables eval.")
parser.add_argument("--positive-ratio", type=float, default=0.2)
parser.add_argument("--eval-dir", default="data/generated_eval/incoming")
parser.add_argument("--eval-csv", default="data/generated_eval/eval.csv")
parser.add_argument("--eval-out", default="outputs/results/library_eval.csv")
parser.add_argument("--report", default="outputs/results/library_process_report.json")
args = parser.parse_args()
library_dir = Path(args.library_dir)
quarantine_dir = Path(args.quarantine_dir)
duplicate_quarantine_dir = Path(args.duplicate_quarantine_dir)
report_path = Path(args.report)
files_before = iter_lyric_files(library_dir)
placeholders = _find_placeholder_files(library_dir)
duplicate_report_path = report_path.with_suffix(".duplicates.csv")
moved_or_deleted: list[str] = []
duplicate_actions: list[str] = []
duplicate_rows: list[dict[str, object]] = []
short_effective: dict[str, int]
retained_count = 0
if not args.dry_run:
moved_or_deleted = _handle_placeholders(
placeholders,
library_dir=library_dir,
quarantine_dir=quarantine_dir,
delete=args.delete_placeholders,
)
if args.skip_library_dedup:
profiles = _profile_library(library_dir)
short_effective = _effective_line_report_from_profiles(profiles)
retained_count = _build_index_from_profiles(profiles, Path(args.index))
else:
profiles = _profile_library(library_dir)
short_effective = _effective_line_report_from_profiles(profiles)
retained_count, duplicate_rows, duplicate_actions = _deduplicate_and_build_index(
profiles,
library_dir=library_dir,
index_path=Path(args.index),
duplicate_quarantine_dir=duplicate_quarantine_dir,
delete=args.delete_duplicates,
dry_run=False,
)
_write_duplicate_report(duplicate_rows, duplicate_report_path)
if args.eval_size > 0:
eval_index_path = Path(args.eval_csv).with_suffix(".index.pkl")
generate_eval_set(
library_dir=library_dir,
output_dir=Path(args.eval_dir),
csv_path=Path(args.eval_csv),
size=args.eval_size,
positive_ratio=args.positive_ratio,
index_path=Path(args.index),
eval_index_path=eval_index_path,
)
evaluate_csv(
eval_index_path,
Path(args.eval_csv),
Path(args.eval_out),
base_dir=Path(args.eval_csv).parent,
positive_decisions={"duplicate"},
max_candidates=5,
)
evaluate_csv(
eval_index_path,
Path(args.eval_csv),
Path(args.eval_out).with_name(Path(args.eval_out).stem + "_review_positive.csv"),
base_dir=Path(args.eval_csv).parent,
positive_decisions={"duplicate", "review"},
max_candidates=5,
)
else:
profiles = _profile_library(library_dir)
short_effective = _effective_line_report_from_profiles(profiles)
if not args.skip_library_dedup:
retained_count, duplicate_rows, duplicate_actions = _deduplicate_and_build_index(
profiles,
library_dir=library_dir,
index_path=Path(args.index),
duplicate_quarantine_dir=duplicate_quarantine_dir,
delete=args.delete_duplicates,
dry_run=True,
)
else:
retained_count = len(profiles)
report = {
"timestamp": datetime.now().isoformat(timespec="seconds"),
"dry_run": args.dry_run,
"library_dir": str(library_dir),
"files_before": len(files_before),
"placeholder_matches": len(placeholders),
"placeholder_files": [str(path) for path in placeholders],
"handled_placeholder_files": moved_or_deleted,
"library_dedup_skipped": args.skip_library_dedup,
"duplicate_matches": len(duplicate_rows),
"duplicate_report": str(duplicate_report_path) if duplicate_rows else "",
"handled_duplicate_files": duplicate_actions[:1000],
"handled_duplicate_files_truncated": len(duplicate_actions) > 1000,
"retained_index_records": retained_count,
"files_after": len(iter_lyric_files(library_dir)),
"index": str(args.index),
"eval_size": args.eval_size,
"eval_csv": str(args.eval_csv) if args.eval_size > 0 else "",
"eval_out": str(args.eval_out) if args.eval_size > 0 else "",
"eval_index": str(Path(args.eval_csv).with_suffix(".index.pkl")) if args.eval_size > 0 else "",
"short_effective_line_counts": short_effective,
}
print(json.dumps(report, ensure_ascii=False, indent=2))
if not args.dry_run:
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
def _find_placeholder_files(library_dir: Path) -> list[Path]:
matches: list[Path] = []
for path in iter_lyric_files(library_dir):
text = read_lyric_file(path)
if any(marker in text for marker in PLACEHOLDER_MARKERS):
matches.append(path)
return matches
def _handle_placeholders(
placeholders: list[Path],
*,
library_dir: Path,
quarantine_dir: Path,
delete: bool,
) -> list[str]:
handled: list[str] = []
if not placeholders:
return handled
if not delete:
quarantine_dir.mkdir(parents=True, exist_ok=True)
for path in placeholders:
if delete:
path.unlink()
handled.append(f"deleted:{path}")
continue
relative = path.resolve().relative_to(library_dir.resolve())
destination = quarantine_dir / relative
destination.parent.mkdir(parents=True, exist_ok=True)
if destination.exists():
destination = destination.with_name(f"{destination.stem}_{datetime.now().strftime('%Y%m%d%H%M%S')}{destination.suffix}")
shutil.move(str(path), str(destination))
handled.append(f"moved:{path}->{destination}")
return handled
def _profile_library(library_dir: Path) -> list[LibraryProfile]:
profiles: list[LibraryProfile] = []
files = iter_lyric_files(library_dir)
_progress(f"profile active library: 0/{len(files)}")
for index, path in enumerate(files, start=1):
record = record_from_file(path, base_dir=library_dir)
normalized = normalize_lyrics(record.lyrics)
lines = normalized.primary_lines or normalized.unique_lines
normalized_text = normalized.normalized_full_text
profiles.append(
LibraryProfile(
path=path,
record=record,
normalized=normalized,
line_count=len(lines),
char_count=len(normalized_text),
)
)
_progress_count("profile active library", index, len(files), step=5000)
return profiles
def _build_index_from_profiles(profiles: list[LibraryProfile], index_path: Path) -> int:
checker = DuplicateChecker()
for index, profile in enumerate(profiles, start=1):
checker.add_normalized_record(profile.record, profile.normalized)
_progress_count("build index", index, len(profiles), step=5000)
index_path.parent.mkdir(parents=True, exist_ok=True)
checker.save(index_path)
return checker.record_count
def _deduplicate_and_build_index(
    profiles: list[LibraryProfile],
    *,
    library_dir: Path,
    index_path: Path,
    duplicate_quarantine_dir: Path,
    delete: bool,
    dry_run: bool,
) -> tuple[int, list[dict[str, object]], list[str]]:
    checker = DuplicateChecker()
    duplicate_rows: list[dict[str, object]] = []
    duplicate_actions: list[str] = []
    ordered = sorted(profiles, key=_profile_quality_key)
    _progress(f"deduplicate active library: 0/{len(ordered)}")
    for index, profile in enumerate(ordered, start=1):
        result = checker.check_record(profile.record, max_candidates=1)
        best = result.candidates[0] if result.candidates else None
        if result.decision == DuplicateDecision.DUPLICATE and best is not None:
            duplicate_rows.append(
                {
                    "duplicate_path": str(profile.path),
                    "duplicate_record_id": profile.record.record_id,
                    "kept_record_id": best.record_id,
                    "decision": result.decision.value,
                    "confidence": result.confidence,
                    "reason": result.reason,
                    "best_candidate_jaccard": best.jaccard,
                    "best_candidate_line_coverage": best.line_coverage,
                    "best_candidate_primary_jaccard": best.primary_jaccard,
                    "best_candidate_primary_line_coverage": best.primary_line_coverage,
                    "matched_unique_lines": " | ".join(best.matched_unique_lines),
                    "line_count": profile.line_count,
                    "char_count": profile.char_count,
                }
            )
            if not dry_run:
                duplicate_actions.append(
                    _handle_duplicate_file(
                        profile.path,
                        library_dir=library_dir,
                        duplicate_quarantine_dir=duplicate_quarantine_dir,
                        delete=delete,
                    )
                )
        else:
            checker.add_normalized_record(profile.record, profile.normalized)
        _progress_count("deduplicate active library", index, len(ordered), step=5000)
    if not dry_run:
        index_path.parent.mkdir(parents=True, exist_ok=True)
        checker.save(index_path)
    return checker.record_count, duplicate_rows, duplicate_actions

def _handle_duplicate_file(
    path: Path,
    *,
    library_dir: Path,
    duplicate_quarantine_dir: Path,
    delete: bool,
) -> str:
    if delete:
        path.unlink()
        return f"deleted:{path}"
    duplicate_quarantine_dir.mkdir(parents=True, exist_ok=True)
    relative = path.resolve().relative_to(library_dir.resolve())
    destination = duplicate_quarantine_dir / relative
    destination.parent.mkdir(parents=True, exist_ok=True)
    if destination.exists():
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        destination = destination.with_name(f"{destination.stem}_{timestamp}{destination.suffix}")
    shutil.move(str(path), str(destination))
    return f"moved:{path}->{destination}"

def _profile_quality_key(profile: LibraryProfile) -> tuple[int, int, int, str]:
    # Sort ascending; negative values make higher-quality records come first.
    filename_quality = 0 if not profile.path.name.startswith("None_") else 1
    return (filename_quality, -profile.line_count, -profile.char_count, str(profile.path))

def _write_duplicate_report(rows: list[dict[str, object]], report_path: Path) -> None:
    if not rows:
        return
    report_path.parent.mkdir(parents=True, exist_ok=True)
    with report_path.open("w", encoding="utf-8", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)

def _effective_line_report(library_dir: Path) -> dict[str, int]:
    return _effective_line_report_from_profiles(_profile_library(library_dir))


def _effective_line_report_from_profiles(profiles: list[LibraryProfile]) -> dict[str, int]:
    buckets = {
        "total": 0,
        "zero_effective_lines": 0,
        "one_to_three_effective_lines": 0,
        "four_to_five_effective_lines": 0,
        "six_plus_effective_lines": 0,
    }
    for profile in profiles:
        buckets["total"] += 1
        line_count = profile.line_count
        if line_count == 0:
            buckets["zero_effective_lines"] += 1
        elif line_count <= 3:
            buckets["one_to_three_effective_lines"] += 1
        elif line_count <= 5:
            buckets["four_to_five_effective_lines"] += 1
        else:
            buckets["six_plus_effective_lines"] += 1
    return buckets

def _progress(message: str) -> None:
    print(f"[process-library] {message}", file=sys.stderr, flush=True)


def _progress_count(label: str, current: int, total: int, *, step: int = 1000) -> None:
    if total <= 0:
        return
    if current == 1 or current == total or current % step == 0:
        _progress(f"{label}: {current}/{total}")


if __name__ == "__main__":
    main()
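# Lyric Dedup Sample Set

Baseline lyric: `test_api/test_lyric.txt`

These samples check two kinds of behavior in the current dedup system:

- `positive_*`: should be judged a duplicate or near-duplicate of the baseline lyric.
- `negative_*`: should not be judged a duplicate; used to catch false positives when the theme, keywords, or style are similar.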
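
The naming convention can be turned into an expected label mechanically. The following is a minimal sketch, assuming the file name prefix is the only signal; `expected_duplicate` is a hypothetical helper and is not part of the repository.

```python
from pathlib import Path


def expected_duplicate(sample_path: str) -> bool:
    """Derive the expected outcome from the sample file name prefix.

    Hypothetical helper: returns True for `positive_*` samples and
    False for `negative_*` samples, and rejects any other name.
    """
    name = Path(sample_path).name
    if name.startswith("positive_"):
        return True
    if name.startswith("negative_"):
        return False
    raise ValueError(f"unrecognized sample name: {name}")
```

Rejecting unknown prefixes keeps the baseline file itself, or a misnamed sample, from being silently counted in either group.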
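
## Sample Descriptions

| File | Expected | What it tests |
| --- | --- | --- |
| `positive_01_format_spacing_punctuation_duplicate.txt` | Dedup hit | Same text with the title and separator lines removed, blank lines changed, and punctuation weakened |
| `positive_02_minor_wording_typos_duplicate.txt` | Dedup hit | Near-duplicate with a few typos, synonyms, and minor word-order changes |
| `positive_03_section_order_shift_duplicate.txt` | Dedup hit | Section order changed, but the core text overlaps heavily |
| `positive_04_partial_core_chorus_duplicate.txt` | Dedup hit | Partial-duplicate detection when only the core chorus/climax fragment is submitted |
| `negative_01_same_theme_new_lyrics_not_duplicate.txt` | Should not hit | Same themes (early morning, Chang'an, snow, chasing dreams), but every line is original |
| `negative_02_same_keywords_different_scene_not_duplicate.txt` | Should not hit | Reuses high-frequency keywords; the narrative scene and syntax clearly differ |
| `negative_03_style_similar_low_overlap_not_duplicate.txt` | Should not hit | Similar traditional-Chinese-style + rap + urban fusion style, but low textual overlap |
| `negative_04_common_hook_phrases_not_duplicate.txt` | Should not hit | Contains only common phrases and imagery; guards against false positives from shared expressions in short texts |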
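
Once each sample has a decision, the expectations in the table above can be checked in one pass. The sketch below is illustrative only: `summarize_sample_run` and its input mapping are hypothetical, and whether `review` should count as a hit is left as a parameter (the default treats only `duplicate` as a hit).

```python
def summarize_sample_run(
    decisions: dict[str, str],
    positive_decisions: frozenset[str] = frozenset({"duplicate"}),
) -> dict[str, list[str]]:
    """Compare per-sample decisions with the filename-based expectations.

    `decisions` maps a sample file name to the decision string produced
    for it (assumed to be one of "duplicate", "review", "new").
    """
    missed: list[str] = []      # positive samples that were not hit
    false_hits: list[str] = []  # negative samples that were hit
    for name, decision in sorted(decisions.items()):
        hit = decision in positive_decisions
        if name.startswith("positive_") and not hit:
            missed.append(name)
        elif name.startswith("negative_") and hit:
            false_hits.append(name)
    return {"missed": missed, "false_hits": false_hits}
```

An empty `missed` list and an empty `false_hits` list mean every sample behaved as the table expects under the chosen `positive_decisions`.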
......@@ -4,7 +4,6 @@ import json
from lyric_dedup import DuplicateChecker
from lyric_dedup import DuplicateDecision
from lyric_dedup import LyricRecord
from lyric_dedup.cli import evaluate_csv
from lyric_dedup.eval_dataset import generate_eval_set
from lyric_dedup.file_import import record_from_file
from lyric_dedup.normalization import normalize_lyrics
......@@ -22,6 +21,14 @@ BASE_LYRIC = """
"""
def check_against(candidates: list[LyricRecord], lyrics: str, *, max_candidates: int = 10):
return DuplicateChecker().check_record_against_candidates(
LyricRecord("__query__", lyrics),
candidates,
max_candidates=max_candidates,
)
def test_normalization_removes_lyric_noise_and_simplifies() -> None:
normalized = normalize_lyrics("[00:01.20]我愛你!\nQQ音乐 www.example.com\n(副歌)\n聽風說話\n")
......@@ -31,10 +38,8 @@ def test_normalization_removes_lyric_noise_and_simplifies() -> None:
def test_exact_duplicate_handles_timestamps_punctuation_traditional_and_chorus_counts() -> None:
checker = DuplicateChecker()
checker.add_record(LyricRecord("song-1", BASE_LYRIC))
result = checker.check(
result = check_against(
[LyricRecord("song-1", BASE_LYRIC)],
"""
我愛你,在每個夜裡!!!
聽風說話,也聽見你
......@@ -51,8 +56,8 @@ def test_exact_duplicate_handles_timestamps_punctuation_traditional_and_chorus_c
def test_short_shared_repeated_chorus_is_review_not_duplicate() -> None:
checker = DuplicateChecker()
checker.add_record(
result = check_against(
[
LyricRecord(
"song-1",
"""
......@@ -63,9 +68,7 @@ def test_short_shared_repeated_chorus_is_review_not_duplicate() -> None:
转身以后各自旅行
""",
)
)
result = checker.check(
],
"""
山谷的雨落在清晨
我把名字交给星辰
......@@ -79,11 +82,9 @@ def test_short_shared_repeated_chorus_is_review_not_duplicate() -> None:
assert result.candidates[0].reason == "重合内容主要集中在重复副歌行,不自动判重"
def test_substantial_line_overlap_is_duplicate_after_lsh_recall() -> None:
checker = DuplicateChecker()
checker.add_record(LyricRecord("song-1", BASE_LYRIC))
result = checker.check(
def test_substantial_line_overlap_is_duplicate_after_pg_recall() -> None:
result = check_against(
[LyricRecord("song-1", BASE_LYRIC)],
"""
我爱你在每个夜里
听风说话也听见你
......@@ -100,10 +101,8 @@ def test_substantial_line_overlap_is_duplicate_after_lsh_recall() -> None:
def test_fragment_of_full_song_is_not_duplicate() -> None:
checker = DuplicateChecker()
checker.add_record(LyricRecord("song-1", BASE_LYRIC))
result = checker.check(
result = check_against(
[LyricRecord("song-1", BASE_LYRIC)],
"""
听风说话也听见你
城市的灯慢慢亮起
......@@ -116,8 +115,8 @@ def test_fragment_of_full_song_is_not_duplicate() -> None:
def test_catalog_mashup_fragments_are_new_not_review() -> None:
checker = DuplicateChecker()
checker.add_record(
result = check_against(
[
LyricRecord(
"song-1",
"""
......@@ -127,9 +126,7 @@ def test_catalog_mashup_fragments_are_new_not_review() -> None:
还要瞒着所有人不说
第一首歌的结尾
""",
)
)
checker.add_record(
),
LyricRecord(
"song-2",
"""
......@@ -139,9 +136,7 @@ def test_catalog_mashup_fragments_are_new_not_review() -> None:
我们走过人群
第二首歌的结尾
""",
)
)
checker.add_record(
),
LyricRecord(
"song-3",
"""
......@@ -151,10 +146,8 @@ def test_catalog_mashup_fragments_are_new_not_review() -> None:
你没有再回来
第三首歌的结尾
""",
)
)
result = checker.check(
),
],
"""
每天都在伪装幸福快乐
还要瞒着所有人不说
......@@ -169,8 +162,8 @@ def test_catalog_mashup_fragments_are_new_not_review() -> None:
def test_large_mashup_with_one_recognizable_song_fragment_is_new() -> None:
checker = DuplicateChecker()
checker.add_record(
result = check_against(
[
LyricRecord(
"song-1",
"""
......@@ -188,9 +181,7 @@ def test_large_mashup_with_one_recognizable_song_fragment_is_new() -> None:
无人在只剩下我自己
""",
)
)
result = checker.check(
],
"""
scroll through the pictures from a year ago
the pixels change but the feelings dont grow
......@@ -238,15 +229,13 @@ def test_no_effective_lyrics_use_metadata_fallback_without_empty_hash_collision(
混音:DJ金木
【未经著作权人许可 不得翻唱 翻录或使用】
"""
checker = DuplicateChecker()
checker.add_record(LyricRecord("song-1", placeholder, title="Amnesia(House)", artist="DJ金木"))
checker.add_record(LyricRecord("song-2", placeholder, title="Angel(纯音乐)", artist="DJ金木"))
same_song = checker.check_record(
LyricRecord("__query__", placeholder, title="Amnesia(House)", artist="DJ金木")
same_song = DuplicateChecker().check_record_against_candidates(
LyricRecord("__query__", placeholder, title="Amnesia(House)", artist="DJ金木"),
[LyricRecord("song-1", placeholder, title="Amnesia(House)", artist="DJ金木")],
)
different_title = checker.check_record(
LyricRecord("__query__", placeholder, title="Different Song", artist="DJ金木")
different_title = DuplicateChecker().check_record_against_candidates(
LyricRecord("__query__", placeholder, title="Different Song", artist="DJ金木"),
[LyricRecord("song-2", placeholder, title="Angel(纯音乐)", artist="DJ金木")],
)
assert same_song.decision == DuplicateDecision.DUPLICATE
......@@ -269,18 +258,18 @@ def test_no_effective_lyrics_metadata_fallback_ignores_placeholder_noise() -> No
[00:04.00]作曲:DJ金木...
[00:05.00]未经著作权人许可 不得翻唱
"""
checker = DuplicateChecker()
checker.add_record(LyricRecord("song-1", source, title="Amnesia(House)", artist="DJ金木"))
result = checker.check_record(LyricRecord("__query__", noisy, title="Amnesia(House)", artist="DJ金木"))
result = DuplicateChecker().check_record_against_candidates(
LyricRecord("__query__", noisy, title="Amnesia(House)", artist="DJ金木"),
[LyricRecord("song-1", source, title="Amnesia(House)", artist="DJ金木")],
)
assert result.decision == DuplicateDecision.DUPLICATE
assert result.reason == "无有效歌词,文件内容兜底特征高度相似"
def test_unrelated_lyrics_with_shared_watermark_are_new() -> None:
checker = DuplicateChecker()
checker.add_record(
result = check_against(
[
LyricRecord(
"song-1",
"""
......@@ -289,9 +278,7 @@ def test_unrelated_lyrics_with_shared_watermark_are_new() -> None:
我等一封迟来的信
""",
)
)
result = checker.check(
],
"""
歌词来自QQ音乐
南方的雨穿过街心
......@@ -300,12 +287,12 @@ def test_unrelated_lyrics_with_shared_watermark_are_new() -> None:
)
assert result.decision == DuplicateDecision.NEW
assert result.candidates == ()
assert result.candidates[0].decision == DuplicateDecision.NEW
def test_mixed_chinese_english_tokenization_recalls_candidate() -> None:
checker = DuplicateChecker()
checker.add_record(
result = check_against(
[
LyricRecord(
"song-1",
"""
......@@ -315,9 +302,7 @@ def test_mixed_chinese_english_tokenization_recalls_candidate() -> None:
never let me go
""",
)
)
result = checker.check(
],
"""
say hello 在风里
hold me close tonight
......@@ -329,17 +314,14 @@ def test_mixed_chinese_english_tokenization_recalls_candidate() -> None:
assert result.decision == DuplicateDecision.DUPLICATE
def test_checker_can_persist_index(tmp_path) -> None:
index_path = tmp_path / "lyrics.pkl"
checker = DuplicateChecker()
checker.add_record(LyricRecord("song-1", BASE_LYRIC))
checker.save(index_path)
loaded = DuplicateChecker.load(index_path)
result = loaded.check(BASE_LYRIC)
def test_checker_can_rank_explicit_pg_recalled_candidates_without_in_memory_recall() -> None:
result = DuplicateChecker().check_record_against_candidates(
LyricRecord("__query__", BASE_LYRIC),
candidates=[],
)
assert loaded.record_count == 1
assert result.decision == DuplicateDecision.DUPLICATE
assert result.decision == DuplicateDecision.NEW
assert result.candidates == ()
def test_record_from_lrc_file(tmp_path) -> None:
......@@ -363,44 +345,6 @@ def test_record_from_song_artist_lyrics_filename(tmp_path) -> None:
assert record.artist == "DJ金木"
def test_evaluate_csv_reports_binary_metrics(tmp_path) -> None:
library = tmp_path / "library"
incoming = tmp_path / "incoming"
library.mkdir()
incoming.mkdir()
(library / "歌手A - 夜里.lrc").write_text(BASE_LYRIC, encoding="utf-8")
(incoming / "dup.lrc").write_text(BASE_LYRIC.replace("我爱你", "我愛你"), encoding="utf-8")
(incoming / "new.txt").write_text("南方的雨穿过街心\n你把故事说给云听\n", encoding="utf-8")
checker = DuplicateChecker()
checker.add_record(record_from_file(library / "歌手A - 夜里.lrc", base_dir=library))
index_path = tmp_path / "lyrics.pkl"
checker.save(index_path)
eval_csv = tmp_path / "eval.csv"
eval_csv.write_text(
"id,file,expected\n"
"case-1,incoming/dup.lrc,应去重\n"
"case-2,incoming/new.txt,不应去重\n",
encoding="utf-8",
)
out_path = tmp_path / "eval_out.csv"
evaluate_csv(
index_path,
eval_csv,
out_path,
base_dir=tmp_path,
positive_decisions={"duplicate"},
max_candidates=5,
)
rows = list(csv.DictReader(out_path.open(encoding="utf-8")))
assert [row["correct"] for row in rows] == ["True", "True"]
assert rows[0]["reason"] == "规范化后的原文歌词哈希完全一致"
assert (tmp_path / "eval_out.csv.summary.json").exists()
def test_generated_eval_set_uses_stratified_production_mix(tmp_path) -> None:
library = tmp_path / "library"
incoming = tmp_path / "generated" / "incoming"
......@@ -424,7 +368,7 @@ def test_generated_eval_set_uses_stratified_production_mix(tmp_path) -> None:
assert manifest["sample_size"] == 30
assert manifest["unique_source_records"] > 1
assert manifest["holdout_records"] > 1
assert (tmp_path / "generated" / "eval.csv.index.pkl").exists()
assert manifest["eval_index"] == ""
assert "positive_full_duplicate" in manifest["plan"]
assert "negative_real_holdout_full_song" in negative_types
assert "negative_fragment" in negative_types
......@@ -466,8 +410,8 @@ def test_generated_hard_eval_set_uses_business_realistic_edge_mix(tmp_path) -> N
def test_foreign_original_with_added_chinese_translation_is_duplicate() -> None:
checker = DuplicateChecker()
checker.add_record(
result = check_against(
[
LyricRecord(
"song-1",
"""
......@@ -476,9 +420,7 @@ def test_foreign_original_with_added_chinese_translation_is_duplicate() -> None:
Never let me go
""",
)
)
result = checker.check(
],
"""
I miss you tonight
今晚我想你
......@@ -509,8 +451,8 @@ def test_same_timestamp_translation_split_is_high_confidence() -> None:
def test_translation_only_overlap_is_review_not_duplicate() -> None:
checker = DuplicateChecker()
checker.add_record(
result = check_against(
[
LyricRecord(
"song-1",
"""
......@@ -522,9 +464,7 @@ def test_translation_only_overlap_is_review_not_duplicate() -> None:
永远不要让我离开
""",
)
)
result = checker.check(
],
"""
Te extrano esta noche
今晚我想你
......@@ -541,8 +481,8 @@ def test_translation_only_overlap_is_review_not_duplicate() -> None:
def test_block_translation_split_is_review_when_primary_matches() -> None:
checker = DuplicateChecker()
checker.add_record(
result = check_against(
[
LyricRecord(
"song-1",
"""
......@@ -551,9 +491,7 @@ def test_block_translation_split_is_review_when_primary_matches() -> None:
Never let me go
""",
)
)
result = checker.check(
],
"""
I miss you tonight
Under the moonlight
......