checker.py
20 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
"""Incremental lyric duplicate checker."""
from __future__ import annotations
import hashlib
import pickle
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from lyric_dedup.minhash_lsh import MinHashConfig
from lyric_dedup.minhash_lsh import MinHashLSH
from lyric_dedup.normalization import NormalizedLyrics
from lyric_dedup.normalization import fingerprint_text
from lyric_dedup.normalization import lyric_tokens
from lyric_dedup.normalization import normalize_lyrics
class DuplicateDecision(str, Enum):
    """Verdict attached to a single candidate and to the overall check.

    The ``str`` mixin makes every member compare equal to its plain string
    value.
    """

    # The lyric is considered the same as an indexed record.
    DUPLICATE = "duplicate"
    # The signals are ambiguous; a human should confirm.
    REVIEW = "review"
    # No indexed record is similar enough to matter.
    NEW = "new"
@dataclass(frozen=True)
class LyricRecord:
    """Immutable input record: an identifier plus the raw lyric text."""

    # Caller-supplied identifier; used as the key of every index.
    record_id: str
    # Raw, un-normalized lyric text.
    lyrics: str
    # Optional metadata; not read by the matching logic in this module.
    title: str | None = None
    artist: str | None = None
@dataclass(frozen=True)
class CandidateMatch:
    """Similarity report for one indexed record compared against the query."""

    # Identifier of the indexed record that was compared.
    record_id: str
    # Verdict for this candidate alone.
    decision: DuplicateDecision
    # Score in [0, 1] backing ``decision``.
    confidence: float
    # Token-set Jaccard similarity and line coverage over the whole lyric.
    jaccard: float
    line_coverage: float
    # The same two measures restricted to primary (original-language) lines.
    primary_jaccard: float
    primary_line_coverage: float
    # The same two measures restricted to translation lines.
    translation_jaccard: float
    translation_line_coverage: float
    # Lines found in both the query and the candidate.
    matched_unique_lines: tuple[str, ...]
    # Human-readable explanation of the verdict.
    reason: str
@dataclass(frozen=True)
class DuplicateCheckResult:
    """Overall outcome of checking one lyric against the index."""

    # Final verdict for the submitted lyric.
    decision: DuplicateDecision
    # Score in [0, 1] backing the verdict.
    confidence: float
    # Ranked per-record reports that were considered.
    candidates: tuple[CandidateMatch, ...]
    # Normalized full text of the query, as produced by the normalizer.
    normalized_full_text: str
    # Human-readable explanation of the verdict.
    reason: str
@dataclass(frozen=True)
class _IndexedRecord:
    """A record together with every feature derived from it for matching."""

    # The original input record.
    record: LyricRecord
    # Normalizer output for ``record.lyrics``.
    normalized: NormalizedLyrics
    # SHA-256 hex digest of the exact-match fingerprint.
    exact_hash: str
    # Token sets: whole lyric, primary lines only, translation lines only.
    tokens: set[str]
    primary_tokens: set[str]
    translation_tokens: set[str]
    # Cleaned raw lines (and the same lines as a set), used when the lyric
    # has no effective primary lines.
    fallback_lines: tuple[str, ...]
    fallback_tokens: set[str]
    # MinHash signature used for LSH recall.
    signature: tuple[int, ...]
class DuplicateChecker:
    """In-memory index for checking newly submitted lyrics against known ones.

    Populate the index with ``add_record`` / ``add_normalized_record`` (or
    restore a saved one with ``load``), then call ``check`` or
    ``check_record``. Matching runs in two stages: an exact fingerprint
    lookup, then MinHash-LSH / shared-line / shared-token recall followed by
    per-candidate scoring.
    """

    def __init__(
        self,
        *,
        minhash_config: MinHashConfig | None = None,
        duplicate_jaccard_threshold: float = 0.78,
        duplicate_line_coverage_threshold: float = 0.72,
        review_jaccard_threshold: float = 0.45,
        review_line_coverage_threshold: float = 0.35,
    ) -> None:
        """Create an empty index.

        Args:
            minhash_config: Passed straight to ``MinHashLSH``.
            duplicate_jaccard_threshold: Primary-token Jaccard needed for an
                automatic DUPLICATE verdict.
            duplicate_line_coverage_threshold: Primary-line coverage needed
                for an automatic DUPLICATE verdict.
            review_jaccard_threshold: Jaccard at or above which a candidate
                is at least REVIEW.
            review_line_coverage_threshold: Line coverage at or above which a
                candidate is at least REVIEW.
        """
        self._lsh = MinHashLSH(minhash_config)
        self._records: dict[str, _IndexedRecord] = {}
        # Inverted indexes: feature -> ids of the records that contain it.
        self._exact_hash_to_ids: dict[str, set[str]] = {}
        self._line_to_ids: dict[str, set[str]] = {}
        self._token_to_ids: dict[str, set[str]] = {}
        self.duplicate_jaccard_threshold = duplicate_jaccard_threshold
        self.duplicate_line_coverage_threshold = duplicate_line_coverage_threshold
        self.review_jaccard_threshold = review_jaccard_threshold
        self.review_line_coverage_threshold = review_line_coverage_threshold

    def add_record(self, record: LyricRecord) -> None:
        """Normalize ``record`` and add it to the index."""
        indexed = self._index(record)
        self._add_indexed(record.record_id, indexed)

    def add_normalized_record(self, record: LyricRecord, normalized: NormalizedLyrics) -> None:
        """Add a record when normalized lyrics have already been computed."""
        indexed = self._index_normalized(record, normalized)
        self._add_indexed(record.record_id, indexed)

    def _add_indexed(self, record_id: str, indexed: _IndexedRecord) -> None:
        """Register ``indexed`` under ``record_id`` in every lookup structure.

        Re-adding an existing id replaces it: the old postings are removed
        first, otherwise a stale exact hash would keep reporting the record
        as a 1.0-confidence duplicate of content it no longer holds.
        """
        previous = self._records.get(record_id)
        if previous is not None:
            self._unindex(record_id, previous)

        self._records[record_id] = indexed
        self._exact_hash_to_ids.setdefault(indexed.exact_hash, set()).add(record_id)
        for line in indexed.normalized.unique_lines:
            # Very short lines are not indexed for recall.
            if len(line) >= 4:
                self._line_to_ids.setdefault(line, set()).add(record_id)
        for token in indexed.tokens:
            self._token_to_ids.setdefault(token, set()).add(record_id)
        for token in indexed.fallback_tokens:
            self._token_to_ids.setdefault(token, set()).add(record_id)
        # NOTE(review): an older signature of a replaced record is not removed
        # from the LSH here. Any id it recalls is re-scored against the
        # current record, so it only costs extra recall. Confirm how
        # MinHashLSH.add treats an id that is already present.
        self._lsh.add(record_id, indexed.signature)

    def _unindex(self, record_id: str, indexed: _IndexedRecord) -> None:
        """Remove the postings that ``_add_indexed`` created for ``indexed``."""
        self._discard_posting(self._exact_hash_to_ids, indexed.exact_hash, record_id)
        for line in indexed.normalized.unique_lines:
            if len(line) >= 4:
                self._discard_posting(self._line_to_ids, line, record_id)
        for token in indexed.tokens:
            self._discard_posting(self._token_to_ids, token, record_id)
        for token in indexed.fallback_tokens:
            self._discard_posting(self._token_to_ids, token, record_id)

    @staticmethod
    def _discard_posting(index: dict[str, set[str]], key: str, record_id: str) -> None:
        """Drop ``record_id`` from ``index[key]`` and delete the key once empty."""
        ids = index.get(key)
        if ids is None:
            return
        ids.discard(record_id)
        if not ids:
            del index[key]

    def save(self, path: str | Path) -> None:
        """Persist the in-memory index for later checks."""
        with Path(path).open("wb") as file:
            pickle.dump(self, file, protocol=pickle.HIGHEST_PROTOCOL)

    @classmethod
    def load(cls, path: str | Path) -> "DuplicateChecker":
        """Load a previously persisted index.

        Raises:
            TypeError: If the file does not contain an instance of ``cls``.
        """
        # SECURITY: unpickling can execute arbitrary code. Only load files
        # written by ``save`` from a trusted location; the isinstance check
        # below runs after deserialization and is not a safeguard.
        with Path(path).open("rb") as file:
            checker = pickle.load(file)
        if not isinstance(checker, cls):
            raise TypeError(f"{path} does not contain a DuplicateChecker index")
        return checker

    @property
    def record_count(self) -> int:
        """Number of records currently indexed."""
        return len(self._records)

    def check(self, lyrics: str, *, max_candidates: int = 10) -> DuplicateCheckResult:
        """Check raw lyric text; see ``check_record``."""
        return self.check_record(LyricRecord(record_id="__query__", lyrics=lyrics), max_candidates=max_candidates)

    def check_record(self, record: LyricRecord, *, max_candidates: int = 10) -> DuplicateCheckResult:
        """Classify ``record`` as DUPLICATE, REVIEW or NEW against the index.

        Args:
            record: The lyric to check; it is not added to the index.
            max_candidates: Upper bound on the number of candidates returned.
                It limits the report only, never the verdict; values below
                zero are treated as zero.

        Returns:
            The verdict, its confidence and reason, and the best candidates.
        """
        query = self._index(record)
        normalized_text = query.normalized.normalized_full_text
        limit = max(max_candidates, 0)

        exact_ids = self._exact_hash_to_ids.get(query.exact_hash, set())
        if exact_ids:
            exact_matches = [
                self._rank_exact_candidate(query, self._records[record_id])
                for record_id in sorted(exact_ids)
            ]
            # Stable sort: duplicates first, id order kept inside each group,
            # so the deciding match is always the first one reported.
            exact_matches.sort(key=lambda item: item.decision != DuplicateDecision.DUPLICATE)
            best = exact_matches[0]
            if best.decision == DuplicateDecision.DUPLICATE:
                decision = DuplicateDecision.DUPLICATE
            else:
                decision = DuplicateDecision.REVIEW
            return DuplicateCheckResult(
                decision=decision,
                confidence=best.confidence,
                candidates=tuple(exact_matches[:limit]),
                normalized_full_text=normalized_text,
                reason=best.reason,
            )

        # Score ids in sorted order so ties resolve the same way on every run
        # (the sort below is stable, also with reverse=True).
        ranked = sorted(
            (self._rank_candidate(query, self._records[record_id]) for record_id in sorted(self._recall_candidates(query))),
            key=self._ranking_key,
            reverse=True,
        )
        # Decide on the full ranking and truncate only the report, so a small
        # ``max_candidates`` cannot hide the candidate that sets the verdict.
        top = ranked[0] if ranked else None
        reported = tuple(ranked[:limit])
        if top is not None and top.decision != DuplicateDecision.NEW:
            return DuplicateCheckResult(
                decision=top.decision,
                confidence=top.confidence,
                candidates=reported,
                normalized_full_text=normalized_text,
                reason=top.reason,
            )
        return DuplicateCheckResult(
            decision=DuplicateDecision.NEW,
            confidence=1.0 - (top.confidence if top is not None else 0.0),
            candidates=reported,
            normalized_full_text=normalized_text,
            reason="精确匹配、近重复召回和字面重合信号都较低",
        )

    @staticmethod
    def _ranking_key(item: CandidateMatch) -> tuple[int, float, float]:
        """Sort key: verdict tier (DUPLICATE > REVIEW > NEW), confidence, Jaccard.

        Ranking by tier keeps a low-confidence REVIEW candidate (for example
        a chorus-only match) ahead of NEW candidates with a higher score.
        """
        if item.decision == DuplicateDecision.DUPLICATE:
            tier = 2
        elif item.decision == DuplicateDecision.REVIEW:
            tier = 1
        else:
            tier = 0
        return (tier, item.confidence, item.jaccard)

    def _index(self, record: LyricRecord) -> _IndexedRecord:
        """Normalize ``record.lyrics`` and derive its matching features."""
        normalized = normalize_lyrics(record.lyrics)
        return self._index_normalized(record, normalized)

    def _index_normalized(self, record: LyricRecord, normalized: NormalizedLyrics) -> _IndexedRecord:
        """Derive tokens, fallback lines, signature and exact hash."""
        tokens = lyric_tokens(normalized)
        primary_tokens = lyric_tokens(normalized, lines=normalized.primary_lines)
        translation_tokens = lyric_tokens(normalized, lines=normalized.translation_lines)
        fallback_lines = tuple(_fallback_no_lyrics_lines(record.lyrics))
        fallback_tokens = set(fallback_lines)
        # Sign the most specific non-empty token set available.
        signature = self._lsh.signature(primary_tokens or tokens or fallback_tokens)
        exact_hash = hashlib.sha256(_exact_fingerprint(normalized, fallback_lines).encode("utf-8")).hexdigest()
        return _IndexedRecord(
            record=record,
            normalized=normalized,
            exact_hash=exact_hash,
            tokens=tokens,
            primary_tokens=primary_tokens,
            translation_tokens=translation_tokens,
            fallback_lines=fallback_lines,
            fallback_tokens=fallback_tokens,
            signature=signature,
        )

    def _recall_candidates(self, query: _IndexedRecord) -> set[str]:
        """Collect ids that share an LSH bucket, a line or a token with ``query``."""
        # Copy the LSH result so the updates below can never modify a set
        # that the LSH might still own.
        candidate_ids = set(self._lsh.query(query.signature))
        for line in query.normalized.primary_lines:
            if len(line) >= 4:
                candidate_ids.update(self._line_to_ids.get(line, set()))
        for line in query.normalized.translation_lines:
            if len(line) >= 4:
                candidate_ids.update(self._line_to_ids.get(line, set()))
        for token in query.primary_tokens or query.tokens:
            candidate_ids.update(self._token_to_ids.get(token, set()))
        for token in query.translation_tokens:
            candidate_ids.update(self._token_to_ids.get(token, set()))
        for token in query.fallback_tokens:
            candidate_ids.update(self._token_to_ids.get(token, set()))
        return candidate_ids

    def _rank_exact_candidate(self, query: _IndexedRecord, candidate: _IndexedRecord) -> CandidateMatch:
        """Build the report for a candidate whose exact hash equals the query's.

        The verdict is DUPLICATE unless primary lines exist and either side
        has a low split confidence, in which case it is REVIEW.
        """
        low_confidence_split = (
            query.normalized.split_confidence == "low" or candidate.normalized.split_confidence == "low"
        )
        translation_jaccard = _jaccard(query.translation_tokens, candidate.translation_tokens)
        translation_coverage, _ = _line_coverage_lines(
            query.normalized.translation_lines,
            candidate.normalized.translation_lines,
        )
        no_effective_lyrics = not query.normalized.primary_lines and not candidate.normalized.primary_lines
        if no_effective_lyrics:
            decision = DuplicateDecision.DUPLICATE
            confidence = 1.0
            reason = "无有效歌词,使用文件内容兜底指纹命中"
        elif low_confidence_split:
            decision = DuplicateDecision.REVIEW
            confidence = 0.95
            reason = "原文哈希一致,但疑似整段翻译结构拆分置信度较低,需要人工复核"
        elif query.normalized.translation_lines or candidate.normalized.translation_lines:
            decision = DuplicateDecision.DUPLICATE
            confidence = 1.0
            reason = "规范化后的原文歌词哈希完全一致,翻译行未参与自动判重"
        else:
            decision = DuplicateDecision.DUPLICATE
            confidence = 1.0
            reason = "规范化后的原文歌词哈希完全一致"
        return CandidateMatch(
            record_id=candidate.record.record_id,
            decision=decision,
            confidence=confidence,
            jaccard=1.0,
            line_coverage=1.0,
            primary_jaccard=1.0,
            primary_line_coverage=1.0,
            translation_jaccard=round(translation_jaccard, 4),
            translation_line_coverage=round(translation_coverage, 4),
            matched_unique_lines=query.normalized.primary_lines,
            reason=reason,
        )

    def _rank_candidate(self, query: _IndexedRecord, candidate: _IndexedRecord) -> CandidateMatch:
        """Score a recalled candidate against the query.

        Only primary-line similarity can produce DUPLICATE. Chorus-only
        overlap, translation-only overlap and a low split confidence each
        cap the verdict at REVIEW.
        """
        if not query.normalized.primary_lines or not candidate.normalized.primary_lines:
            return _rank_no_effective_lyrics_candidate(query, candidate)
        jaccard = _jaccard(query.tokens, candidate.tokens)
        coverage, matched_lines = _line_coverage(query.normalized, candidate.normalized)
        primary_jaccard = _jaccard(query.primary_tokens, candidate.primary_tokens)
        primary_coverage, primary_matched_lines = _line_coverage_lines(
            query.normalized.primary_lines,
            candidate.normalized.primary_lines,
        )
        translation_jaccard = _jaccard(query.translation_tokens, candidate.translation_tokens)
        translation_coverage, translation_matched_lines = _line_coverage_lines(
            query.normalized.translation_lines,
            candidate.normalized.translation_lines,
        )
        chorus_only = _is_chorus_only_match(query.normalized, candidate.normalized, primary_matched_lines)
        # Translations overlap while the primary text stays below both
        # review thresholds.
        translation_only = (
            bool(translation_matched_lines)
            and primary_jaccard < self.review_jaccard_threshold
            and primary_coverage < self.review_line_coverage_threshold
            and (translation_jaccard >= self.review_jaccard_threshold or translation_coverage >= self.review_line_coverage_threshold)
        )
        low_confidence_split = (
            query.normalized.split_confidence == "low" or candidate.normalized.split_confidence == "low"
        )
        confidence = round((0.58 * primary_jaccard) + (0.42 * primary_coverage), 4)
        if (
            (primary_jaccard >= self.duplicate_jaccard_threshold or (primary_jaccard >= 0.78 and primary_coverage >= 0.9))
            and primary_coverage >= self.duplicate_line_coverage_threshold
            and not chorus_only
            and not translation_only
            and not low_confidence_split
        ):
            decision = DuplicateDecision.DUPLICATE
            if query.normalized.translation_lines or candidate.normalized.translation_lines:
                reason = "原文歌词高度一致,翻译行未参与自动判重"
            else:
                reason = "原文 n-gram 字面相似度高,且行级覆盖范围广"
        elif (
            chorus_only
            or translation_only
            or low_confidence_split
            or primary_jaccard >= self.review_jaccard_threshold
            or primary_coverage >= self.review_line_coverage_threshold
            or jaccard >= self.review_jaccard_threshold
            or coverage >= self.review_line_coverage_threshold
        ):
            decision = DuplicateDecision.REVIEW
            reason = "候选相似度达到复核阈值,需要人工确认"
            if chorus_only:
                reason = "重合内容主要集中在重复副歌行,不自动判重"
            elif translation_only:
                reason = "仅翻译行相似,原文字面重合不足,不自动判重"
            elif low_confidence_split:
                reason = "疑似整段翻译结构但拆分置信度较低,需要人工复核"
        else:
            decision = DuplicateDecision.NEW
            reason = "候选重合度低于复核阈值"
        return CandidateMatch(
            record_id=candidate.record.record_id,
            decision=decision,
            confidence=confidence,
            jaccard=round(jaccard, 4),
            line_coverage=round(coverage, 4),
            primary_jaccard=round(primary_jaccard, 4),
            primary_line_coverage=round(primary_coverage, 4),
            translation_jaccard=round(translation_jaccard, 4),
            translation_line_coverage=round(translation_coverage, 4),
            matched_unique_lines=tuple(matched_lines),
            reason=reason,
        )
def _rank_no_effective_lyrics_candidate(query: _IndexedRecord, candidate: _IndexedRecord) -> CandidateMatch:
    """Score a pair using only the fallback (cleaned raw-line) features.

    The primary and translation measures are reported as 0.0 because they
    are not computed on this path.
    """
    token_overlap = _jaccard(query.fallback_tokens, candidate.fallback_tokens)
    line_overlap, shared_lines = _line_coverage_lines(query.fallback_lines, candidate.fallback_lines)
    blended = round((0.58 * token_overlap) + (0.42 * line_overlap), 4)

    if token_overlap >= 0.35 and line_overlap >= 0.35 and len(shared_lines) >= 2:
        # Strong overlap on at least two distinct lines.
        verdict = DuplicateDecision.DUPLICATE
        score = blended
        reported_lines = tuple(shared_lines)
        explanation = "无有效歌词,文件内容兜底特征高度相似"
    elif token_overlap >= 0.2 or line_overlap >= 0.2:
        # Partial overlap: leave it to a human.
        verdict = DuplicateDecision.REVIEW
        score = blended
        reported_lines = tuple(shared_lines)
        explanation = "无有效歌词,文件内容兜底特征部分相似,需要人工复核"
    else:
        verdict = DuplicateDecision.NEW
        score = 0.0
        reported_lines = ()
        explanation = "无有效歌词,且文件内容兜底特征未命中"

    return CandidateMatch(
        record_id=candidate.record.record_id,
        decision=verdict,
        confidence=score,
        jaccard=round(token_overlap, 4),
        line_coverage=round(line_overlap, 4),
        primary_jaccard=0.0,
        primary_line_coverage=0.0,
        translation_jaccard=0.0,
        translation_line_coverage=0.0,
        matched_unique_lines=reported_lines,
        reason=explanation,
    )
def _jaccard(left: set[str], right: set[str]) -> float:
    """Return the Jaccard similarity of two sets.

    Two empty sets count as identical (1.0); exactly one empty set counts as
    fully dissimilar (0.0).
    """
    if not (left or right):
        return 1.0
    if not (left and right):
        return 0.0
    shared = len(left & right)
    # |A ∪ B| = |A| + |B| - |A ∩ B|
    return shared / (len(left) + len(right) - shared)
def _exact_fingerprint(normalized: NormalizedLyrics, fallback_lines: tuple[str, ...]) -> str:
    """Build the string that is hashed for exact-match lookup.

    The lyric fingerprint is used when it is non-empty; otherwise the
    fallback lines are used. The two cases carry different prefixes, so they
    cannot produce the same string.
    """
    primary_text = fingerprint_text(normalized)
    if not primary_text:
        joined_fallback = "\n".join(fallback_lines)
        return "no_effective_lyrics_content|" + joined_fallback
    return f"lyrics|{primary_text}"
def _fallback_no_lyrics_lines(text: str) -> list[str]:
    """Reduce raw text to cleaned, de-duplicated lines in first-seen order.

    Each line is NFKC-normalized, lower-cased, stripped of timestamp tags and
    short bracketed tags, dropped if it contains a source or copyright
    marker, and has punctuation replaced by single spaces.
    """
    import re
    import unicodedata

    timestamp_tag = re.compile(r"\[(?:\d{1,2}:)?\d{1,2}:\d{2}(?:[.:]\d{1,3})?\]")
    bracket_tag = re.compile(r"[【\[].{0,80}?[】\]]")
    whitespace_run = re.compile(r"\s+")
    skip_markers = ("歌词来自", "qq音乐", "网易云", "酷狗", "未经", "不得翻唱", "不得翻录", "著作权")
    punctuation = ",。!?;:、“”‘’·…—~!¥()【】《》〈〉「」『』﹏,.;:!?()[]{}<>|/\\_-"
    # Translation table that maps every punctuation character to a space.
    to_space = dict.fromkeys(map(ord, punctuation), " ")

    # A dict keeps insertion order, which gives first-seen de-duplication.
    seen: dict[str, None] = {}
    for raw_line in unicodedata.normalize("NFKC", text).splitlines():
        untagged = bracket_tag.sub("", timestamp_tag.sub("", raw_line.strip().lower()))
        if any(marker in untagged for marker in skip_markers):
            continue
        cleaned = whitespace_run.sub(" ", untagged.translate(to_space)).strip()
        if cleaned:
            seen.setdefault(cleaned, None)
    return list(seen)
def _line_coverage(left: NormalizedLyrics, right: NormalizedLyrics) -> tuple[float, list[str]]:
    """Line coverage between the ``unique_lines`` of two normalized lyrics."""
    left_unique = left.unique_lines
    right_unique = right.unique_lines
    return _line_coverage_lines(left_unique, right_unique)
def _line_coverage_lines(left: tuple[str, ...], right: tuple[str, ...]) -> tuple[float, list[str]]:
    """Return ``(coverage, shared_lines)`` for two collections of lines.

    Coverage is the number of distinct shared lines divided by the larger
    distinct-line count. Two empty inputs give ``(1.0, [])``; exactly one
    empty input gives ``(0.0, [])``. Shared lines are returned sorted.
    """
    distinct_left, distinct_right = set(left), set(right)
    if not (distinct_left or distinct_right):
        return 1.0, []
    if not (distinct_left and distinct_right):
        return 0.0, []
    shared = sorted(distinct_left.intersection(distinct_right))
    larger_side = max(len(distinct_left), len(distinct_right))
    return len(shared) / larger_side, shared
def _is_chorus_only_match(left: NormalizedLyrics, right: NormalizedLyrics, matched_lines: list[str]) -> bool:
    """Tell whether the shared lines are mostly lines repeated within a lyric.

    A shared line counts as repeated when it occurs at least twice on either
    side. The match is chorus-only when at most two distinct lines are shared
    and any of them is repeated, or when at least 80% of the shared lines are
    repeated and, on at least one side, they make up less than 70% of the
    content lines.
    """
    if not matched_lines:
        return False
    shared = set(matched_lines)
    repeated_count = sum(
        1
        for line in shared
        if left.line_counts.get(line, 0) >= 2 or right.line_counts.get(line, 0) >= 2
    )
    if repeated_count == 0:
        return False
    if len(shared) <= 2:
        return True
    if repeated_count / len(shared) < 0.8:
        return False

    def matched_share(lyrics: NormalizedLyrics) -> float:
        # Fraction of the lyric's content lines taken up by the shared lines.
        occurrences = sum(lyrics.line_counts.get(line, 0) for line in shared)
        return occurrences / max(lyrics.content_line_count, 1)

    return min(matched_share(left), matched_share(right)) < 0.7