process_library.py 15 KB
"""Process newly added lyric library files.

This script is intended for the recurring workflow after adding files to
``data/library``:

1. Move pure-music placeholder lyric files out of the active library.
2. Move duplicate lyric files out of the active library.
3. Rebuild the duplicate-checking index from retained files.
4. Optionally regenerate and evaluate a production-style eval set.
"""

from __future__ import annotations

import argparse
import csv
import json
import shutil
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.checker import DuplicateDecision
from lyric_dedup.checker import LyricRecord
from lyric_dedup.cli import evaluate_csv
from lyric_dedup.eval_dataset import generate_eval_set
from lyric_dedup.file_import import iter_lyric_files
from lyric_dedup.file_import import read_lyric_file
from lyric_dedup.file_import import record_from_file
from lyric_dedup.normalization import NormalizedLyrics
from lyric_dedup.normalization import normalize_lyrics


PLACEHOLDER_MARKERS = (
    "【曲库专用】",
    "此歌曲为没有填词的纯音乐",
)


@dataclass(frozen=True)
class LibraryProfile:
    path: Path
    record: LyricRecord
    normalized: NormalizedLyrics
    line_count: int
    char_count: int


def main() -> None:
    parser = argparse.ArgumentParser(description="Process lyric library additions.")
    parser.add_argument("--library-dir", default="data/library")
    parser.add_argument("--index", default="outputs/indexes/library_lyrics.pkl")
    parser.add_argument("--quarantine-dir", default="data/quarantine/no_lyrics_placeholders")
    parser.add_argument("--duplicate-quarantine-dir", default="data/quarantine/duplicates")
    parser.add_argument("--dry-run", action="store_true", help="Only report placeholder files; do not move or write outputs.")
    parser.add_argument("--delete-placeholders", action="store_true", help="Delete matched placeholder files instead of moving them.")
    parser.add_argument("--delete-duplicates", action="store_true", help="Delete duplicate lyric files instead of moving them.")
    parser.add_argument("--skip-library-dedup", action="store_true", help="Skip internal duplicate cleanup before rebuilding the index.")
    parser.add_argument("--eval-size", type=int, default=0, help="Generate and evaluate this many synthetic samples. 0 disables eval.")
    parser.add_argument("--positive-ratio", type=float, default=0.2)
    parser.add_argument("--eval-dir", default="data/generated_eval/incoming")
    parser.add_argument("--eval-csv", default="data/generated_eval/eval.csv")
    parser.add_argument("--eval-out", default="outputs/results/library_eval.csv")
    parser.add_argument("--report", default="outputs/results/library_process_report.json")
    args = parser.parse_args()

    library_dir = Path(args.library_dir)
    quarantine_dir = Path(args.quarantine_dir)
    duplicate_quarantine_dir = Path(args.duplicate_quarantine_dir)
    report_path = Path(args.report)

    files_before = iter_lyric_files(library_dir)
    placeholders = _find_placeholder_files(library_dir)
    duplicate_report_path = report_path.with_suffix(".duplicates.csv")

    moved_or_deleted: list[str] = []
    duplicate_actions: list[str] = []
    duplicate_rows: list[dict[str, object]] = []
    short_effective: dict[str, int]
    retained_count = 0
    if not args.dry_run:
        moved_or_deleted = _handle_placeholders(
            placeholders,
            library_dir=library_dir,
            quarantine_dir=quarantine_dir,
            delete=args.delete_placeholders,
        )
        if args.skip_library_dedup:
            profiles = _profile_library(library_dir)
            short_effective = _effective_line_report_from_profiles(profiles)
            retained_count = _build_index_from_profiles(profiles, Path(args.index))
        else:
            profiles = _profile_library(library_dir)
            short_effective = _effective_line_report_from_profiles(profiles)
            retained_count, duplicate_rows, duplicate_actions = _deduplicate_and_build_index(
                profiles,
                library_dir=library_dir,
                index_path=Path(args.index),
                duplicate_quarantine_dir=duplicate_quarantine_dir,
                delete=args.delete_duplicates,
                dry_run=False,
            )
            _write_duplicate_report(duplicate_rows, duplicate_report_path)

        if args.eval_size > 0:
            eval_index_path = Path(args.eval_csv).with_suffix(".index.pkl")
            generate_eval_set(
                library_dir=library_dir,
                output_dir=Path(args.eval_dir),
                csv_path=Path(args.eval_csv),
                size=args.eval_size,
                positive_ratio=args.positive_ratio,
                index_path=Path(args.index),
                eval_index_path=eval_index_path,
            )
            evaluate_csv(
                eval_index_path,
                Path(args.eval_csv),
                Path(args.eval_out),
                base_dir=Path(args.eval_csv).parent,
                positive_decisions={"duplicate"},
                max_candidates=5,
            )
            evaluate_csv(
                eval_index_path,
                Path(args.eval_csv),
                Path(args.eval_out).with_name(Path(args.eval_out).stem + "_review_positive.csv"),
                base_dir=Path(args.eval_csv).parent,
                positive_decisions={"duplicate", "review"},
                max_candidates=5,
            )
    else:
        profiles = _profile_library(library_dir)
        short_effective = _effective_line_report_from_profiles(profiles)
        if not args.skip_library_dedup:
            retained_count, duplicate_rows, duplicate_actions = _deduplicate_and_build_index(
                profiles,
                library_dir=library_dir,
                index_path=Path(args.index),
                duplicate_quarantine_dir=duplicate_quarantine_dir,
                delete=args.delete_duplicates,
                dry_run=True,
            )
        else:
            retained_count = len(profiles)

    report = {
        "timestamp": datetime.now().isoformat(timespec="seconds"),
        "dry_run": args.dry_run,
        "library_dir": str(library_dir),
        "files_before": len(files_before),
        "placeholder_matches": len(placeholders),
        "placeholder_files": [str(path) for path in placeholders],
        "handled_placeholder_files": moved_or_deleted,
        "library_dedup_skipped": args.skip_library_dedup,
        "duplicate_matches": len(duplicate_rows),
        "duplicate_report": str(duplicate_report_path) if duplicate_rows else "",
        "handled_duplicate_files": duplicate_actions[:1000],
        "handled_duplicate_files_truncated": len(duplicate_actions) > 1000,
        "retained_index_records": retained_count,
        "files_after": len(iter_lyric_files(library_dir)),
        "index": str(args.index),
        "eval_size": args.eval_size,
        "eval_csv": str(args.eval_csv) if args.eval_size > 0 else "",
        "eval_out": str(args.eval_out) if args.eval_size > 0 else "",
        "eval_index": str(Path(args.eval_csv).with_suffix(".index.pkl")) if args.eval_size > 0 else "",
        "short_effective_line_counts": short_effective,
    }

    print(json.dumps(report, ensure_ascii=False, indent=2))
    if not args.dry_run:
        report_path.parent.mkdir(parents=True, exist_ok=True)
        report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")


def _find_placeholder_files(library_dir: Path) -> list[Path]:
    matches: list[Path] = []
    for path in iter_lyric_files(library_dir):
        text = read_lyric_file(path)
        if any(marker in text for marker in PLACEHOLDER_MARKERS):
            matches.append(path)
    return matches


def _handle_placeholders(
    placeholders: list[Path],
    *,
    library_dir: Path,
    quarantine_dir: Path,
    delete: bool,
) -> list[str]:
    handled: list[str] = []
    if not placeholders:
        return handled
    if not delete:
        quarantine_dir.mkdir(parents=True, exist_ok=True)
    for path in placeholders:
        if delete:
            path.unlink()
            handled.append(f"deleted:{path}")
            continue
        relative = path.resolve().relative_to(library_dir.resolve())
        destination = quarantine_dir / relative
        destination.parent.mkdir(parents=True, exist_ok=True)
        if destination.exists():
            destination = destination.with_name(f"{destination.stem}_{datetime.now().strftime('%Y%m%d%H%M%S')}{destination.suffix}")
        shutil.move(str(path), str(destination))
        handled.append(f"moved:{path}->{destination}")
    return handled


def _profile_library(library_dir: Path) -> list[LibraryProfile]:
    profiles: list[LibraryProfile] = []
    files = iter_lyric_files(library_dir)
    _progress(f"profile active library: 0/{len(files)}")
    for index, path in enumerate(files, start=1):
        record = record_from_file(path, base_dir=library_dir)
        normalized = normalize_lyrics(record.lyrics)
        lines = normalized.primary_lines or normalized.unique_lines
        normalized_text = normalized.normalized_full_text
        profiles.append(
            LibraryProfile(
                path=path,
                record=record,
                normalized=normalized,
                line_count=len(lines),
                char_count=len(normalized_text),
            )
        )
        _progress_count("profile active library", index, len(files), step=5000)
    return profiles


def _build_index_from_profiles(profiles: list[LibraryProfile], index_path: Path) -> int:
    checker = DuplicateChecker()
    for index, profile in enumerate(profiles, start=1):
        checker.add_normalized_record(profile.record, profile.normalized)
        _progress_count("build index", index, len(profiles), step=5000)
    index_path.parent.mkdir(parents=True, exist_ok=True)
    checker.save(index_path)
    return checker.record_count


def _deduplicate_and_build_index(
    profiles: list[LibraryProfile],
    *,
    library_dir: Path,
    index_path: Path,
    duplicate_quarantine_dir: Path,
    delete: bool,
    dry_run: bool,
) -> tuple[int, list[dict[str, object]], list[str]]:
    checker = DuplicateChecker()
    duplicate_rows: list[dict[str, object]] = []
    duplicate_actions: list[str] = []
    ordered = sorted(profiles, key=_profile_quality_key)
    _progress(f"deduplicate active library: 0/{len(ordered)}")
    for index, profile in enumerate(ordered, start=1):
        result = checker.check_record(profile.record, max_candidates=1)
        best = result.candidates[0] if result.candidates else None
        if result.decision == DuplicateDecision.DUPLICATE and best is not None:
            duplicate_rows.append(
                {
                    "duplicate_path": str(profile.path),
                    "duplicate_record_id": profile.record.record_id,
                    "kept_record_id": best.record_id,
                    "decision": result.decision.value,
                    "confidence": result.confidence,
                    "reason": result.reason,
                    "best_candidate_jaccard": best.jaccard,
                    "best_candidate_line_coverage": best.line_coverage,
                    "best_candidate_primary_jaccard": best.primary_jaccard,
                    "best_candidate_primary_line_coverage": best.primary_line_coverage,
                    "matched_unique_lines": " | ".join(best.matched_unique_lines),
                    "line_count": profile.line_count,
                    "char_count": profile.char_count,
                }
            )
            if not dry_run:
                duplicate_actions.append(
                    _handle_duplicate_file(
                        profile.path,
                        library_dir=library_dir,
                        duplicate_quarantine_dir=duplicate_quarantine_dir,
                        delete=delete,
                    )
                )
        else:
            checker.add_normalized_record(profile.record, profile.normalized)
        _progress_count("deduplicate active library", index, len(ordered), step=5000)

    if not dry_run:
        index_path.parent.mkdir(parents=True, exist_ok=True)
        checker.save(index_path)
    return checker.record_count, duplicate_rows, duplicate_actions


def _handle_duplicate_file(
    path: Path,
    *,
    library_dir: Path,
    duplicate_quarantine_dir: Path,
    delete: bool,
) -> str:
    if delete:
        path.unlink()
        return f"deleted:{path}"
    duplicate_quarantine_dir.mkdir(parents=True, exist_ok=True)
    relative = path.resolve().relative_to(library_dir.resolve())
    destination = duplicate_quarantine_dir / relative
    destination.parent.mkdir(parents=True, exist_ok=True)
    if destination.exists():
        destination = destination.with_name(f"{destination.stem}_{datetime.now().strftime('%Y%m%d%H%M%S')}{destination.suffix}")
    shutil.move(str(path), str(destination))
    return f"moved:{path}->{destination}"


def _profile_quality_key(profile: LibraryProfile) -> tuple[int, int, int, str]:
    # Sort ascending; negative values make higher-quality records come first.
    filename_quality = 0 if not profile.path.name.startswith("None_") else 1
    return (filename_quality, -profile.line_count, -profile.char_count, str(profile.path))


def _write_duplicate_report(rows: list[dict[str, object]], report_path: Path) -> None:
    if not rows:
        return
    report_path.parent.mkdir(parents=True, exist_ok=True)
    with report_path.open("w", encoding="utf-8", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)


def _effective_line_report(library_dir: Path) -> dict[str, int]:
    return _effective_line_report_from_profiles(_profile_library(library_dir))


def _effective_line_report_from_profiles(profiles: list[LibraryProfile]) -> dict[str, int]:
    buckets = {
        "total": 0,
        "zero_effective_lines": 0,
        "one_to_three_effective_lines": 0,
        "four_to_five_effective_lines": 0,
        "six_plus_effective_lines": 0,
    }
    for profile in profiles:
        buckets["total"] += 1
        line_count = profile.line_count
        if line_count == 0:
            buckets["zero_effective_lines"] += 1
        elif line_count <= 3:
            buckets["one_to_three_effective_lines"] += 1
        elif line_count <= 5:
            buckets["four_to_five_effective_lines"] += 1
        else:
            buckets["six_plus_effective_lines"] += 1
    return buckets


def _progress(message: str) -> None:
    print(f"[process-library] {message}", file=sys.stderr, flush=True)


def _progress_count(label: str, current: int, total: int, *, step: int = 1000) -> None:
    if total <= 0:
        return
    if current == 1 or current == total or current % step == 0:
        _progress(f"{label}: {current}/{total}")


if __name__ == "__main__":
    main()