Commit 21677240 216772407a081c1c8d391a03bc7d687b995dd040 by 沈秋雨

接口化服务

1 parent ed19c4ee
......@@ -284,6 +284,7 @@ class DuplicateChecker:
query.normalized.primary_lines,
candidate.normalized.primary_lines,
)
query_primary_coverage = _matched_query_line_ratio(query.normalized.primary_lines, primary_matched_lines)
translation_jaccard = _jaccard(query.translation_tokens, candidate.translation_tokens)
translation_coverage, translation_matched_lines = _line_coverage_lines(
query.normalized.translation_lines,
......@@ -299,6 +300,27 @@ class DuplicateChecker:
low_confidence_split = (
query.normalized.split_confidence == "low" or candidate.normalized.split_confidence == "low"
)
query_coverage = _matched_query_line_ratio(query.normalized.unique_lines, matched_lines)
has_review_level_overlap = (
primary_jaccard >= self.review_jaccard_threshold
or jaccard >= self.review_jaccard_threshold
or (
primary_coverage >= self.review_line_coverage_threshold
and query_primary_coverage >= 0.40
)
or (
coverage >= self.review_line_coverage_threshold
and query_coverage >= 0.40
)
)
has_material_chorus_overlap = chorus_only and (
query.normalized.content_line_count <= 6
or (primary_jaccard >= 0.20 and query_primary_coverage >= 0.40)
or (jaccard >= 0.20 and query_coverage >= 0.40)
or (primary_coverage >= 0.20 and query_primary_coverage >= 0.40)
or (coverage >= 0.20 and query_coverage >= 0.40)
)
has_low_confidence_split_overlap = low_confidence_split and has_review_level_overlap
confidence = round((0.58 * primary_jaccard) + (0.42 * primary_coverage), 4)
if (
......@@ -314,21 +336,18 @@ class DuplicateChecker:
else:
reason = "原文 n-gram 字面相似度高,且行级覆盖范围广"
elif (
chorus_only
has_material_chorus_overlap
or translation_only
or low_confidence_split
or primary_jaccard >= self.review_jaccard_threshold
or primary_coverage >= self.review_line_coverage_threshold
or jaccard >= self.review_jaccard_threshold
or coverage >= self.review_line_coverage_threshold
or has_low_confidence_split_overlap
or has_review_level_overlap
):
decision = DuplicateDecision.REVIEW
reason = "候选相似度达到复核阈值,需要人工确认"
if chorus_only:
if has_material_chorus_overlap:
reason = "重合内容主要集中在重复副歌行,不自动判重"
elif translation_only:
reason = "仅翻译行相似,原文字面重合不足,不自动判重"
elif low_confidence_split:
elif has_low_confidence_split_overlap:
reason = "疑似整段翻译结构但拆分置信度较低,需要人工复核"
else:
decision = DuplicateDecision.NEW
......@@ -446,6 +465,13 @@ def _line_coverage_lines(left: tuple[str, ...], right: tuple[str, ...]) -> tuple
return len(matched) / max(len(left_lines), len(right_lines)), matched
def _matched_query_line_ratio(query_lines: tuple[str, ...], matched_lines: list[str]) -> float:
    """Return distinct matched lines divided by distinct query lines.

    Both inputs are de-duplicated before counting.  A query without any
    lines yields ``0.0`` instead of dividing by zero.
    """
    distinct_query = frozenset(query_lines)
    total = len(distinct_query)
    if total == 0:
        return 0.0
    distinct_matched = frozenset(matched_lines)
    return len(distinct_matched) / total
def _is_chorus_only_match(left: NormalizedLyrics, right: NormalizedLyrics, matched_lines: list[str]) -> bool:
if not matched_lines:
return False
......
from .config import ServerConfig
from .service import DedupService
__all__ = ["ServerConfig", "DedupService"]
"""FastAPI application for lyric duplicate checking."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from .config import ServerConfig
from .service import DedupService
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# App lifecycle
# ---------------------------------------------------------------------------
app = FastAPI(title="Lyric Dedup API", version="0.1.0")
_config: ServerConfig | None = None
_service: DedupService | None = None
@app.on_event("startup")
def _startup() -> None:
    """Create the module-level config and service when the app starts.

    Populates the ``_config`` and ``_service`` globals read by the request
    handlers, then logs a start-up line.  Credentials embedded in the DSN are
    masked first so that passwords never reach the log output.
    """
    global _config, _service
    import re

    _config = ServerConfig()
    _service = DedupService(config=_config)

    # URI form:     postgresql://user:secret@host/db -> postgresql://user:***@host/db
    safe_dsn = re.sub(r"(://[^:/?#@\s]*):[^@/\s]*@", r"\1:***@", _config.dsn)
    # Keyword form: "host=... password=secret"       -> "host=... password=***"
    safe_dsn = re.sub(r"(?i)(\bpassword\s*=\s*)('[^']*'|\S+)", r"\1***", safe_dsn)

    logger.info("Lyric Dedup API started (DSN=%s, trgm=%s)", safe_dsn, _config.enable_trgm)
# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
class CheckRequest(BaseModel):
    # Request body accepted by POST /api/v1/check.

    # Required. Location of the lyric file; the handler only accepts paths
    # ending in .txt or .lrc.
    url: str = Field(..., description="URL of the LRC/TXT lyric file")
    # Optional metadata forwarded to the dedup service together with the lyrics.
    title: str | None = Field(None, description="Song title (optional)")
    artist: str | None = Field(None, description="Artist name (optional)")
class CheckResponse(BaseModel):
    # Success payload returned by POST /api/v1/check.

    # Overall verdict flag copied from the service result.
    duplicate: bool
    # Decision label, confidence score and human-readable explanation copied
    # from the service result; each may be omitted.
    decision: str | None = None
    confidence: float | None = None
    reason: str | None = None
class HealthResponse(BaseModel):
    # Payload of GET /health; the handler always reports "ok".
    status: str
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/health", response_model=HealthResponse)
def health() -> dict[str, str]:
    """Liveness probe: unconditionally report ``ok``."""
    return dict(status="ok")
@app.post("/api/v1/check", response_model=CheckResponse)
def check_lyric(req: CheckRequest) -> Any:
    """Download the lyric file at ``req.url`` and run the duplicate check.

    Returns a ``CheckResponse`` on success.  Failures are reported as JSON
    ``{"detail": ...}`` bodies: 503 when the service has not been created,
    400 for an unsupported extension or a download/decoding problem, and 500
    for unexpected errors while downloading or checking.
    """

    def _reject(status: int, detail: str) -> JSONResponse:
        # Every error path shares the same {"detail": ...} envelope.
        return JSONResponse(status_code=status, content={"detail": detail})

    if _service is None:
        return _reject(503, "service not initialized")

    # Only .txt / .lrc lyric files are accepted.
    if not _is_valid_lyric_url(req.url):
        return _reject(400, "仅支持 .txt 或 .lrc 格式的歌词文件")

    try:
        lyrics = _download_lyrics(req.url)
    except ValueError as exc:
        # The downloader reports expected failures (HTTP error, timeout,
        # undecodable bytes) as ValueError; surface them as client errors.
        return _reject(400, str(exc))
    except Exception as exc:
        logger.exception("unexpected error during download")
        return _reject(500, f"下载歌词失败: {exc}")

    try:
        result = _service.check(
            lyrics,
            title=req.title,
            artist=req.artist,
            source_url=req.url,
        )
    except Exception as exc:
        logger.exception("unexpected error during dedup check")
        return _reject(500, f"歌词去重检测失败: {exc}")

    return CheckResponse(
        duplicate=result.duplicate,
        decision=result.decision,
        confidence=result.confidence,
        reason=result.reason,
    )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_ENCODING_CHAIN = ("utf-8-sig", "utf-8", "gb18030", "big5")
_ALLOWED_EXTENSIONS = {".txt", ".lrc"}
# The URL comes straight from the client and is later handed to urllib, whose
# default opener also serves file:// and ftp:// URLs.  Restrict to remote
# HTTP(S) so the endpoint cannot be used to read local files.
_ALLOWED_SCHEMES = frozenset({"http", "https"})


def _is_valid_lyric_url(url: str) -> bool:
    """Return True when *url* is an http(s) URL whose path ends in .txt/.lrc.

    The extension check is case-insensitive and ignores any query string or
    fragment.  URLs with another scheme, without a host, or that cannot be
    parsed at all are rejected.
    """
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
    except ValueError:
        # e.g. a malformed IPv6 literal such as "http://[oops"
        return False
    if parsed.scheme.lower() not in _ALLOWED_SCHEMES or not parsed.netloc:
        return False
    return Path(parsed.path).suffix.lower() in _ALLOWED_EXTENSIONS
def _download_lyrics(url: str) -> str:
    """Download a lyric file and decode it with an encoding fallback chain.

    The body is read with the configured download timeout (10 seconds when no
    config has been loaded) and capped at 10 MiB so that a client-supplied
    URL cannot make the server buffer an arbitrarily large response.

    Raises:
        ValueError: on any download failure (HTTP error, URL error, timeout,
            other I/O error), when the body exceeds the size cap, or when
            none of the encodings in ``_ENCODING_CHAIN`` can decode it.
    """
    import urllib.error
    import urllib.request

    max_bytes = 10 * 1024 * 1024
    timeout = _config.download_timeout if _config else 10

    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            # Read one byte past the cap so an oversized body is detectable
            # without ever holding more than max_bytes + 1 bytes in memory.
            data = resp.read(max_bytes + 1)
    except urllib.error.HTTPError as exc:
        # HTTPError subclasses URLError, so it has to be matched first.
        raise ValueError(f"下载失败: HTTP {exc.code}") from exc
    except urllib.error.URLError as exc:
        raise ValueError(f"下载失败: {exc.reason}") from exc
    except TimeoutError as exc:
        raise ValueError("下载超时") from exc
    except Exception as exc:
        raise ValueError(f"下载失败: {exc}") from exc

    if len(data) > max_bytes:
        raise ValueError("下载失败: 歌词文件过大(超过 10 MiB)")

    for encoding in _ENCODING_CHAIN:
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            continue
    raise ValueError("无法解析文件编码,支持: utf-8-sig / utf-8 / gb18030 / big5")
"""Server configuration loaded from environment variables."""
from __future__ import annotations
import os
from dataclasses import dataclass, field
@dataclass
class ServerConfig:
    """Server settings read from ``LYRIC_DEDUP_*`` environment variables.

    Every default is resolved when an instance is created, not when the
    module is imported, so environment changes made before ``ServerConfig()``
    is called are honoured.  Explicit constructor arguments still override
    the environment.

    Raises:
        ValueError: at construction time if a numeric variable is set to a
            value that ``int()`` / ``float()`` cannot parse.
    """

    # PostgreSQL connection string.
    dsn: str = field(
        default_factory=lambda: os.getenv("LYRIC_DEDUP_DSN", "postgresql:///lyric_dedup")
    )
    # Upper bound on candidates considered per check.
    max_candidates: int = field(
        default_factory=lambda: int(os.getenv("LYRIC_DEDUP_MAX_CANDIDATES", "5"))
    )
    # Row limit for recall queries.
    recall_limit: int = field(
        default_factory=lambda: int(os.getenv("LYRIC_DEDUP_RECALL_LIMIT", "100"))
    )
    # Enabled only when the variable is the word "true" (case-insensitive).
    enable_trgm: bool = field(
        default_factory=lambda: os.getenv("LYRIC_DEDUP_ENABLE_TRGM", "false").lower() == "true"
    )
    # Trigram similarity threshold.
    trgm_threshold: float = field(
        default_factory=lambda: float(os.getenv("LYRIC_DEDUP_TRGM_THRESHOLD", "0.3"))
    )
    # Statement timeout in milliseconds.
    statement_timeout_ms: int = field(
        default_factory=lambda: int(os.getenv("LYRIC_DEDUP_STATEMENT_TIMEOUT_MS", "5000"))
    )
    # Download timeout; presumably seconds — TODO confirm against the caller.
    download_timeout: int = field(
        default_factory=lambda: int(os.getenv("LYRIC_DEDUP_DOWNLOAD_TIMEOUT", "10"))
    )
"""Core deduplication service: PostgreSQL recall + DuplicateChecker."""
from __future__ import annotations
import hashlib
import logging
from dataclasses import dataclass, field
from typing import Any
import psycopg
from lyric_dedup.checker import DuplicateChecker
from lyric_dedup.checker import DuplicateDecision
from lyric_dedup.checker import LyricRecord
from lyric_dedup.normalization import fingerprint_text
from lyric_dedup.normalization import normalize_lyrics
from .config import ServerConfig
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class CheckResult:
    """Immutable outcome of one duplicate check."""

    # Overall verdict flag (required).
    duplicate: bool
    # Decision label as a plain string; empty when not set.
    decision: str = ""
    # Confidence score reported by the checker.
    confidence: float = 0.0
    # Human-readable explanation of the decision.
    reason: str = ""
    # Number of candidates reported alongside the decision.
    candidate_count: int = 0
@dataclass
class DedupService:
    """Thin wrapper around the PostgreSQL recall + DuplicateChecker pipeline."""

    # Connection string, recall limits and timeouts used by every check.
    config: ServerConfig
    # Defaults to this module's logger; excluded from the dataclass repr.
    _logger: logging.Logger = field(default_factory=lambda: logger, repr=False)

    def check(
        self,
        lyrics_text: str,
        title: str | None = None,
        artist: str | None = None,
        source_url: str | None = None,
    ) -> CheckResult:
        """Core entry: recall candidates from PG, decide, and store new lyrics.

        The lyrics are passed in already downloaded and decoded.  A fresh
        connection is opened for each call.  When the decision is ``"new"``
        and ``source_url`` is given, the lyric is inserted into the database
        before returning.

        Args:
            lyrics_text: Full text of the lyric being checked.
            title: Optional song title attached to the query record.
            artist: Optional artist name attached to the query record.
            source_url: Where the lyric came from; required for the lyric to
                be persisted on a ``"new"`` decision.

        Returns:
            The ``CheckResult`` produced by ``_check_against_candidates``.
        """
        record = LyricRecord(
            record_id="__query__",
            lyrics=lyrics_text,
            title=title,
            artist=artist,
        )
        with psycopg.connect(self.config.dsn) as conn:
            with conn.cursor() as cursor:
                # set_config(..., false) applies the setting to the whole
                # session rather than only the current transaction.
                cursor.execute("select set_config('statement_timeout', %s, false)", (str(self.config.statement_timeout_ms),))
                cursor.execute("select set_config('pg_trgm.similarity_threshold', %s, false)", (str(self.config.trgm_threshold),))
            candidates = self._recall_candidates(conn, record)
            result = self._check_against_candidates(record, candidates)
            # result.decision is the enum's string value here; this assumes
            # DuplicateDecision.NEW.value == "new" — TODO confirm.
            if result.decision == "new" and source_url:
                self._insert_new_record(conn, record, source_url)
        return result

    def _insert_new_record(self, conn: Any, record: LyricRecord, source_url: str) -> None:
        """Insert new lyric into PostgreSQL (lyrics + lyric_lines tables).

        The row is keyed by a record id derived from ``source_url``; an
        existing row with the same id is overwritten and un-deleted.  All of
        its ``lyric_lines`` rows are replaced, then the transaction is
        committed.
        """
        # NUL characters are stripped from every text value before it is
        # bound as a query parameter.
        raw_text = _pg_text(record.lyrics)[0] or ""
        normalized = normalize_lyrics(raw_text)
        primary_text = _pg_text("\n".join(normalized.primary_lines))[0]
        # An empty translation is stored as NULL rather than ''.
        translation_text = _pg_text("\n".join(normalized.translation_lines))[0] or None
        normalized_text = _pg_text(normalized.normalized_full_text)[0]
        exact_text = fingerprint_text(normalized)
        exact_hash = hashlib.sha256(exact_text.encode("utf-8")).hexdigest()
        with conn.cursor() as cursor:
            cursor.execute(
                """
                insert into lyrics (
                record_id, source_path, title, artist, raw_text, normalized_text,
                primary_text, translation_text, exact_hash, split_confidence,
                split_reason, line_count, updated_at, deleted_at
                ) values (
                %(record_id)s, %(source_path)s, %(title)s, %(artist)s, %(raw_text)s,
                %(normalized_text)s, %(primary_text)s, %(translation_text)s,
                %(exact_hash)s, %(split_confidence)s, %(split_reason)s,
                %(line_count)s, now(), null
                )
                on conflict (record_id) do update set
                source_path = excluded.source_path, title = excluded.title,
                artist = excluded.artist, raw_text = excluded.raw_text,
                normalized_text = excluded.normalized_text, primary_text = excluded.primary_text,
                translation_text = excluded.translation_text, exact_hash = excluded.exact_hash,
                split_confidence = excluded.split_confidence, split_reason = excluded.split_reason,
                line_count = excluded.line_count, updated_at = now(), deleted_at = null
                returning id
                """,
                {
                    "record_id": _build_record_id(source_url),
                    "source_path": source_url,
                    "title": _pg_text(record.title)[0],
                    "artist": _pg_text(record.artist)[0],
                    "raw_text": raw_text,
                    "normalized_text": normalized_text,
                    "primary_text": primary_text,
                    "translation_text": translation_text,
                    "exact_hash": exact_hash,
                    "split_confidence": _pg_text(normalized.split_confidence)[0],
                    "split_reason": _pg_text(normalized.split_reason)[0],
                    # Falls back to unique_lines when there are no primary lines.
                    "line_count": len(normalized.primary_lines or normalized.unique_lines),
                },
            )
            # "returning id" yields the row id for both insert and update.
            lyric_id = cursor.fetchone()[0]
            # Replace, rather than merge, any line rows from an earlier version.
            cursor.execute("delete from lyric_lines where lyric_id = %s", (lyric_id,))
            line_rows: list[tuple] = list(_line_rows(lyric_id, "primary", normalized.primary_lines))
            line_rows.extend(_line_rows(lyric_id, "translation", normalized.translation_lines))
            line_rows.extend(_line_rows(lyric_id, "unknown", normalized.unknown_lines))
            if line_rows:
                cursor.executemany(
                    "insert into lyric_lines (lyric_id, role, line_no, normalized_line, line_hash) values (%s, %s, %s, %s, %s)",
                    line_rows,
                )
        conn.commit()

    def _recall_candidates(self, conn: Any, record: LyricRecord) -> list[LyricRecord]:
        """Three-tier recall: exact_hash → pg_trgm → line_hash.

        Each tier is limited to ``config.recall_limit`` rows and skips
        soft-deleted lyrics.  Results are merged by record id, so a lyric
        found by an earlier tier is not replaced by a later one.
        """
        query_lyrics = _pg_text(record.lyrics)[0] or ""
        normalized = normalize_lyrics(query_lyrics)
        exact_text = fingerprint_text(normalized)
        exact_hash = hashlib.sha256(exact_text.encode("utf-8")).hexdigest()
        primary_text = "\n".join(normalized.primary_lines)
        # One SHA-256 per non-empty primary line, matched against
        # lyric_lines.line_hash in tier 3.
        line_hashes = [hashlib.sha256(line.encode("utf-8")).hexdigest() for line in normalized.primary_lines if line]
        candidates: dict[str, LyricRecord] = {}
        # Always empty in this method, so the exclusion filter below is a no-op.
        exclude_record_ids: list[str] = []
        with conn.cursor() as cursor:
            # Tier 1: exact hash match
            cursor.execute(
                """
                select record_id, raw_text, title, artist
                from lyrics
                where deleted_at is null
                and exact_hash = %s
                and not (record_id = any(%s))
                limit %s
                """,
                (exact_hash, exclude_record_ids, self.config.recall_limit),
            )
            _add_rows(candidates, cursor.fetchall())
            # Tier 2: pg_trgm similarity (optional)
            if self.config.enable_trgm and primary_text:
                # "%%" is the escaped form of pg_trgm's "%" similarity operator.
                cursor.execute(
                    """
                    select record_id, raw_text, title, artist
                    from lyrics
                    where deleted_at is null
                    and not (record_id = any(%s))
                    and primary_text %% %s
                    order by similarity(primary_text, %s) desc
                    limit %s
                    """,
                    (exclude_record_ids, primary_text, primary_text, self.config.recall_limit),
                )
                _add_rows(candidates, cursor.fetchall())
            # Tier 3: line hash match
            if line_hashes:
                # Lyrics sharing the most primary-line hashes come first.
                cursor.execute(
                    """
                    select l.record_id, l.raw_text, l.title, l.artist
                    from lyric_lines ll
                    join lyrics l on l.id = ll.lyric_id
                    where l.deleted_at is null
                    and not (l.record_id = any(%s))
                    and ll.role = 'primary'
                    and ll.line_hash = any(%s)
                    group by l.id
                    order by count(*) desc
                    limit %s
                    """,
                    (exclude_record_ids, line_hashes, self.config.recall_limit),
                )
                _add_rows(candidates, cursor.fetchall())
        return list(candidates.values())

    def _check_against_candidates(
        self,
        record: LyricRecord,
        candidates: list[LyricRecord],
    ) -> CheckResult:
        """Run DuplicateChecker against recalled candidates.

        A fresh checker is built per call and loaded with only the recalled
        candidates.  Both DUPLICATE and REVIEW decisions are reported with
        ``duplicate=True``.
        """
        checker = DuplicateChecker()
        for candidate in candidates:
            checker.add_record(candidate)
        result = checker.check_record(record, max_candidates=self.config.max_candidates)
        return CheckResult(
            duplicate=result.decision in (DuplicateDecision.DUPLICATE, DuplicateDecision.REVIEW),
            decision=result.decision.value,
            confidence=result.confidence,
            reason=result.reason,
            candidate_count=len(result.candidates),
        )
def _add_rows(candidates: dict[str, LyricRecord], rows: list[tuple[object, ...]]) -> None:
    """Merge ``(record_id, raw_text, title, artist)`` rows into *candidates*.

    Rows are keyed by the stringified record id; an id that is already
    present keeps its existing entry.
    """
    for row in rows:
        record_id, raw_text, title, artist = row
        key = str(record_id)
        entry = LyricRecord(
            record_id=key,
            lyrics=str(raw_text),
            title=None if title is None else str(title),
            artist=None if artist is None else str(artist),
        )
        candidates.setdefault(key, entry)
def _build_record_id(source_url: str) -> str:
    """Build a record id of the form ``url:{first 12 hex chars of SHA-1}:{url}``."""
    short_hash = hashlib.sha1(source_url.encode("utf-8")).hexdigest()[:12]
    return ":".join(("url", short_hash, source_url))
def _line_rows(lyric_id: int, role: str, lines: tuple[str, ...]) -> list[tuple]:
    """Build ``(lyric_id, role, line_no, text, sha256)`` tuples for *lines*.

    ``line_no`` is the zero-based position in *lines*.  NUL characters are
    removed from each line before it is hashed and stored.
    """

    def _as_row(position: int, text: str) -> tuple:
        cleaned = _pg_text(text)[0] or ""
        digest = hashlib.sha256(cleaned.encode("utf-8")).hexdigest()
        return (lyric_id, role, position, cleaned, digest)

    return [_as_row(position, text) for position, text in enumerate(lines)]
def _pg_text(value: str | None) -> tuple[str | None, bool]:
    """Strip NUL characters from *value*.

    Returns ``(text, had_nul)``: the text without any ``\\x00`` characters
    and whether at least one was removed.  ``None`` passes through as
    ``(None, False)``.
    """
    if value is None:
        return None, False
    had_nul = "\x00" in value
    cleaned = value.replace("\x00", "") if had_nul else value
    return cleaned, had_nul
......@@ -8,3 +8,7 @@ psycopg[binary]>=3.2
pymysql>=1.1
cos-python-sdk-v5>=1.9
tqdm>=4.66
# HTTP API server
fastapi>=0.110.0
uvicorn[standard]>=0.29.0
......
"""测试环境配置,从 .env 或环境变量读取 OSS 凭据"""
import os
from pathlib import Path
# Load KEY=VALUE pairs from a .env file sitting next to this module, if any.
# Variables already present in the process environment are left untouched.
_env_path = Path(__file__).parent / ".env"
if _env_path.exists():
    with open(_env_path, encoding="utf-8") as _f:
        for _line in _f:
            _line = _line.strip()
            # Skip blank lines, comments and anything that is not an assignment.
            if not _line or _line.startswith("#") or "=" not in _line:
                continue
            _key, _sep, _value = _line.partition("=")
            os.environ.setdefault(_key.strip(), _value.strip())

# OSS credentials and endpoints; the internal endpoint falls back to the
# public one when it is not configured separately.
OSS_ACCESS_KEY_ID = os.environ.get("OSS_ACCESS_KEY_ID", "")
OSS_ACCESS_KEY_SECRET = os.environ.get("OSS_ACCESS_KEY_SECRET", "")
OSS_ENDPOINT = os.environ.get("OSS_ENDPOINT", "oss-cn-hangzhou.aliyuncs.com")
OSS_BUCKET_NAME = os.environ.get("OSS_BUCKET_NAME", "")
OSS_ENDPOINT_INTERNAL = os.environ.get("OSS_ENDPOINT_INTERNAL", OSS_ENDPOINT)
"""
阿里云OSS文件上传模块
"""
import uuid
import oss2
import os
from datetime import datetime, timedelta
from .config import OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT, OSS_BUCKET_NAME, OSS_ENDPOINT_INTERNAL
class OSSUploader:
    """Uploader for Alibaba Cloud OSS built on an ``oss2.Bucket``."""

    def __init__(self):
        """Create the OSS client from the module-level ``OSS_*`` settings."""
        self.access_key_id = OSS_ACCESS_KEY_ID
        self.access_key_secret = OSS_ACCESS_KEY_SECRET
        self.endpoint = OSS_ENDPOINT
        self.bucket_name = OSS_BUCKET_NAME
        self.endpoint_internal = OSS_ENDPOINT_INTERNAL
        # Credentials object
        self.auth = oss2.Auth(self.access_key_id, self.access_key_secret)
        # Bucket handle used by every upload
        self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)

    @staticmethod
    def _endpoint_host(endpoint):
        """Return the bare host of *endpoint*, without scheme or path.

        ``str.lstrip("https://")`` must not be used for this: it strips any
        leading run of the characters ``h t p s : /`` and therefore also eats
        the start of hosts such as ``share.example.com``.
        """
        host = endpoint
        for scheme in ("https://", "http://"):
            if host.startswith(scheme):
                host = host[len(scheme):]
                break
        return host.split("/")[0]

    def upload_file(self, local_file_path, oss_object_name=None):
        """
        Upload a local file to OSS.

        Args:
            local_file_path: Path of the local file.
            oss_object_name: Object name to use; when omitted, a random UUID
                plus the local file's extension is generated.

        Returns:
            tuple: (True, public_url) on success, (False, error_message) on failure.
        """
        try:
            if not os.path.exists(local_file_path):
                return False, "本地文件不存在"
            # Generate an object name when the caller did not supply one.
            if not oss_object_name:
                _, ext = os.path.splitext(local_file_path)
                oss_object_name = f"{uuid.uuid4()}{ext}"
            # Store the object under a per-day folder.
            # NOTE(review): assumed to apply to caller-supplied names as well —
            # confirm against the original indentation.
            date = datetime.now().strftime("%Y%m%d")
            oss_object_name = f"public_test/{date}/{oss_object_name}"
            # Upload the file
            self.bucket.put_object_from_file(oss_object_name, local_file_path)
            # Build the URL as https://{bucket}.{endpoint host}/{object}
            endpoint_host = self._endpoint_host(self.endpoint)
            file_url = f"https://{self.bucket_name}.{endpoint_host}/{oss_object_name}"
            return True, file_url
        except Exception as e:
            # Best effort: report the failure to the caller instead of raising.
            return False, str(e)

    def upload_data(self, data, oss_object_name):
        """
        Upload in-memory data to OSS.

        Args:
            data: Payload to upload (str or bytes).
            oss_object_name: Object name to store the payload under.

        Returns:
            dict: ``success`` plus, on success, ``oss_object_name``,
            ``file_url``, ``etag`` and ``size``; on failure, ``error``.
        """
        try:
            # Upload the payload
            result = self.bucket.put_object(oss_object_name, data)
            # NOTE(review): this URL is {endpoint}/{bucket}/{object}, unlike the
            # https://{bucket}.{host}/{object} form built by upload_file, and
            # has no scheme when the endpoint has none — confirm which is wanted.
            file_url = f"{self.endpoint.rstrip('/')}/{self.bucket_name}/{oss_object_name}"
            return {
                "success": True,
                "oss_object_name": oss_object_name,
                "file_url": file_url,
                "etag": result.etag,
                # For str payloads this is the character count, not bytes.
                "size": len(data) if isinstance(data, (str, bytes)) else 0
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
def get_bucket():
    """Build an ``oss2.Bucket`` from the module-level OSS settings."""
    return oss2.Bucket(
        oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET),
        OSS_ENDPOINT,
        OSS_BUCKET_NAME,
    )
def clean_expire_file():
    """Delete expired date-named folders directly under ``temp_ai/``.

    Lists the first level below ``temp_ai/``, keeps only directory-like
    entries whose last path segment parses as ``YYYYMMDD``, and removes every
    object under those dated strictly before yesterday.  Plain files and
    folders with non-date names are left alone.  Any error during the run is
    printed together with its traceback instead of being raised.
    """
    print(f"\n[{datetime.now()}] 开始执行每日清理任务...")
    root_prefix = 'temp_ai/'
    bucket = get_bucket()

    # Step 1: folders dated before this day are considered expired.
    cutoff = (datetime.now() - timedelta(days=1)).date()
    print(f"保留阈值: {cutoff} (即 {cutoff} 之前的数据将被删除)")

    # Step 2: walk the first level under the root prefix.
    try:
        for entry in oss2.ObjectIterator(bucket, prefix=root_prefix, delimiter='/'):
            if hasattr(entry, 'prefix'):
                # Virtual directory (common prefix).
                path = entry.prefix
            elif hasattr(entry, 'key') and entry.key.endswith('/'):
                # Explicitly created folder object.
                path = entry.key
            else:
                # A loose file (or an unrecognised entry): nothing to clean.
                continue

            # Never treat the root prefix itself as a dated folder.
            if path == root_prefix:
                continue

            # The last path segment is the folder name, e.g. '20251229'.
            folder_name = path.strip('/').rsplit('/', 1)[-1]
            try:
                folder_date = datetime.strptime(folder_name, "%Y%m%d").date()
                if folder_date < cutoff:
                    print(f"[删除] 发现过期目录: {path}")
                    # Removes everything under the prefix, including the
                    # folder object itself if one exists.
                    delete_objects_by_prefix(bucket, path)
            except ValueError:
                print(f"[跳过] 非日期命名目录: {path}")
    except Exception as e:
        import traceback
        print(f"[严重错误] 任务执行失败: {e}")
        traceback.print_exc()
def delete_objects_by_prefix(bucket, prefix):
    """Delete every object under *prefix*, in batches of up to 1000 keys.

    Errors are printed rather than raised, so a failed clean-up of one folder
    does not abort the caller's loop.
    """
    print(f" -> 正在清理目录: {prefix} ...")
    pending = []
    try:
        for obj in oss2.ObjectIterator(bucket, prefix=prefix):
            pending.append(obj.key)
            if len(pending) < 1000:
                continue
            # Batch is full: flush it and start a new one.
            bucket.batch_delete_objects(pending)
            pending = []
        # Flush whatever is left over from the last partial batch.
        if pending:
            bucket.batch_delete_objects(pending)
        print(f" -> 目录 {prefix} 清理完毕。")
    except Exception as e:
        print(f" [错误] 删除过程出错: {e}")
# 创建OSS上传器实例
oss_uploader = OSSUploader()
if __name__ == '__main__':
clean_expire_file()
\ No newline at end of file
"""歌词去重 API 测试脚本
用法:
# 上传指定歌词文件并调用去重 API
python test_api/test_dedup_api.py --file data/library/None_WHHY134166.lrc
# 指定标题和歌手
python test_api/test_dedup_api.py --file data/library/None_WHHY134166.lrc --title "夜曲" --artist "周杰伦"
# 仅上传不调用 API
python test_api/test_dedup_api.py --file data/library/None_WHHY134166.lrc --upload-only
# 仅调用 API(使用已有 URL)
python test_api/test_dedup_api.py --url "https://hikoon-ai-test.oss-cn-hangzhou.aliyuncs.com/temp_ai/20250603/xxx.lrc"
# 指定 API 地址
python test_api/test_dedup_api.py --file data/library/None_WHHY134166.lrc --api-url "http://localhost:8000"
"""
import argparse
import json
import os
import sys
# 确保项目根目录在 path 中
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
import urllib.request
import urllib.error
from test_api.config import OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT, OSS_BUCKET_NAME
from test_api.oss_uploader import OSSUploader
def upload_lyric_file(file_path: str) -> str:
    """Upload a lyric file to OSS and return its public URL.

    Prints the error and exits the process with status 1 if the upload fails.
    """
    ok, payload = OSSUploader().upload_file(file_path)
    if ok:
        return payload
    print(f"上传失败: {payload}")
    sys.exit(1)
def call_dedup_api(url: str, title: str | None, artist: str | None, api_base: str) -> dict:
    """POST the lyric URL and metadata to the dedup API and return the JSON reply.

    Prints a diagnostic and exits the process with status 1 when the API
    answers with an HTTP error or cannot be reached.
    """
    body = json.dumps({"url": url, "title": title, "artist": artist}).encode("utf-8")
    endpoint = f"{api_base.rstrip('/')}/api/v1/check"
    request = urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # HTTPError carries the response body; show it to aid debugging.
        detail = exc.read().decode("utf-8", errors="replace")
        print(f"API 请求失败 (HTTP {exc.code}): {detail}")
        sys.exit(1)
    except urllib.error.URLError as exc:
        print(f"API 请求失败: {exc.reason}")
        print("请确认 API 服务已启动: uvicorn lyric_dedup_server.app:app --host 0.0.0.0 --port 8000")
        sys.exit(1)
def main():
    """Command-line entry: optionally upload a lyric file, then call the dedup API."""
    parser = argparse.ArgumentParser(description="歌词去重 API 测试")
    parser.add_argument("--file", "-f", help="本地歌词文件路径")
    parser.add_argument("--url", "-u", help="已上传的歌词 URL(跳过上传步骤)")
    parser.add_argument("--title", "-t", help="歌曲标题(可选)")
    parser.add_argument("--artist", "-a", help="歌手名(可选)")
    parser.add_argument("--api-url", default="http://localhost:8000", help="API 服务地址 (默认 http://localhost:8000)")
    parser.add_argument("--upload-only", action="store_true", help="仅上传到 OSS,不调用 API")
    args = parser.parse_args()

    if not (args.file or args.url):
        parser.error("需要指定 --file 或 --url")

    # Step 1: upload the local file, or reuse the URL given on the command line.
    if args.file:
        # Relative paths are resolved against the project root.
        if os.path.isabs(args.file):
            abs_path = args.file
        else:
            abs_path = os.path.join(PROJECT_ROOT, args.file)
        if not os.path.exists(abs_path):
            print(f"文件不存在: {abs_path}")
            sys.exit(1)
        print(f"正在上传: {abs_path}")
        lyric_url = upload_lyric_file(abs_path)
        print(f"上传成功: {lyric_url}")
    else:
        lyric_url = args.url
        print(f"使用已有 URL: {lyric_url}")

    if args.upload_only:
        return

    # Step 2: call the dedup API and print its verdict.
    print("\n正在调用去重 API...")
    result = call_dedup_api(
        lyric_url,
        title=args.title,
        artist=args.artist,
        api_base=args.api_url,
    )
    print("\n结果:")
    print(f" duplicate: {result.get('duplicate')}")
    print(f" decision: {result.get('decision', 'N/A')}")
    print(f" confidence: {result.get('confidence', 'N/A')}")
    print(f" reason: {result.get('reason', 'N/A')}")
if __name__ == "__main__":
main()
## 消失的波段
### 【主歌 1】 — *(压抑、低沉的叙事)*
霓虹灯……在车窗外退后,
霓虹——和夜色融为一体。
收音机里,只剩沙沙的电流……
(像你在旧地址留下的呼吸……)
有些习惯……总是很难去修正,
比如——在人群中,辨认你的背影。
### 【主歌 2】 — *(情绪渐进,带有一丝无奈)*
朋友圈里……你更新了风景,
坐标是——没听过的、陌、生、城、市。
我们从无话不说……退回到【静音】,
像两条失去交集的——平行线。
那些没有寄出的长信……
最后都变成,草稿箱里的——灰、尘。
### 【副歌】 —— *(情感爆发,高亢而撕裂)*
我们成了彼此消 逝 的 波 段 !!
在同一个频段……却再也无法呼喊!
那些同频共振的夜晚……
最终被淹没在——嘈杂的市中心!!
我调整着微弱的接收信号……
却只听到——时光断裂的声音!!!
### 【桥段】 —— *(节奏加快,连续的内心追问)*
是不是所有的连接……都有保质期?!
到期后……就自动切断了所有联系?!
我们在各自的轨道里——加!速!运!行!
再也找不到……那天傍晚的引力。
### 【副歌】 —— *(最后一次宣泄,带有哭腔的强音)*
我们成了彼此消 逝 的 波 段 ——!!
在同一个频段……却再也无法呼喊!
那些同频共振的夜晚……
最终被淹没在——嘈杂的市中心!!
我调整着微弱的接收信号……
却只听到……(时光断裂的声音)……
### 【尾奏】 —— *(情绪下沉,最终归于死寂)*
【信号中断……请勿追赶。】
城市入睡……灯光渐暗……
一个人的波段。
(查……无……此……人……)
【 挂 断 。】
### 副歌
我们成了彼此消失的波段
在同一个频段却再也无法呼喊
那些同频共振的夜晚
最终被淹没在嘈杂的市中心
我调整着微弱的接收信号
却只听到时光断裂的声音
### 桥段
是不是所有的连接都有保质期
到期后就自动切断了所有联系
我们在各自的轨道里加速运行
再也找不到那天傍晚的引力
### 副歌
我们成了彼此消失的波段
在同一个频段却再也无法呼喊
那些同频共振的夜晚
最终被淹没在嘈杂的市中心
我调整着微弱的接收信号
却只听到时光断裂的声音
### 尾奏
信号中断,请勿追赶
城市入睡,灯光渐暗
一个人的波段
查无此人
挂断
\ No newline at end of file
......@@ -115,6 +115,121 @@ def test_fragment_of_full_song_is_not_duplicate() -> None:
assert result.candidates[0].primary_line_coverage < 0.72
def test_catalog_mashup_fragments_are_new_not_review() -> None:
    """A query stitched from two lines of each of three catalog songs is NEW."""
    catalog = {
        "song-1": """
            第一首歌的清晨
            第一首歌的街口
            每天都在伪装幸福快乐
            还要瞒着所有人不说
            第一首歌的结尾
            """,
        "song-2": """
            第二首歌的海边
            第二首歌的远方
            想起那年夏天
            我们走过人群
            第二首歌的结尾
            """,
        "song-3": """
            第三首歌的月光
            第三首歌的旧梦
            风吹过了窗前
            你没有再回来
            第三首歌的结尾
            """,
    }
    # Lines 3-4 of every catalog song, concatenated in catalog order.
    mashup = """
        每天都在伪装幸福快乐
        还要瞒着所有人不说
        想起那年夏天
        我们走过人群
        风吹过了窗前
        你没有再回来
        """

    checker = DuplicateChecker()
    for record_id, lyrics in catalog.items():
        checker.add_record(LyricRecord(record_id, lyrics))

    outcome = checker.check(mashup)

    assert outcome.decision == DuplicateDecision.NEW
def test_large_mashup_with_one_recognizable_song_fragment_is_new() -> None:
    """A long mashup embedding one catalog song among twenty other lines is NEW."""
    catalog_lyrics = """
        桃花春风十里
        花瓣飘散满地
        对不起我无法忘记你
        一去遥遥无期
        一个人一支笔
        多想你能留在我这里
        天空下起了雨
        淋湿我的心里
        久别中多少人都不是你
        屋檐下一人想起
        关于你的回忆
        无人在只剩下我自己
        """
    # All twelve catalog lines appear verbatim in the middle of the query.
    mashup = """
        scroll through the pictures from a year ago
        the pixels change but the feelings dont grow
        an empty inbox and a dial tone heart
        we built a network just to tear it apart
        im tracking signals that have long gone cold
        living a script that has already been sold
        当我睁开了眼睛
        感受到一片的灰烬
        我的梦一直都fighting 可是我没
        也许我只有加足马力
        让他们看见都诧异
        留下的华丽的背影 才
        桃花春风十里
        花瓣飘散满地
        对不起我无法忘记你
        一去遥遥无期
        一个人一支笔
        多想你能留在我这里
        天空下起了雨
        淋湿我的心里
        久别中多少人都不是你
        屋檐下一人想起
        关于你的回忆
        无人在只剩下我自己
        疼痛感很弱
        我想我堕落
        哎呦 我逃脱
        是不是我的
        不管你拿不拿走
        我反正都不会动
        哎呦 我难过
        反复的折磨
        """

    checker = DuplicateChecker()
    checker.add_record(LyricRecord("song-1", catalog_lyrics))

    outcome = checker.check(mashup)

    assert outcome.decision == DuplicateDecision.NEW
def test_no_effective_lyrics_use_metadata_fallback_without_empty_hash_collision() -> None:
placeholder = """
作词:DJ金木
......