Commit abad6fcea91dbc01b7be3bdeaa11857b7635d1eb by 沈秋雨

Generate Ragas QA directly from WeKnora chunks

1 parent 463ef51b
@@ -22,6 +22,7 @@ RAGAS_JUDGE_MODEL=gpt-4o-mini
RAGAS_EMBEDDING_MODEL=text-embedding-3-small
TESTSET_SIZE=50
TESTSET_RAGAS_MODE=direct
TESTSET_MAX_DOCUMENT_CHARS=2000
TESTSET_SOURCE_MULTIPLIER=3
TESTSET_GENERATOR_MAX_TOKENS=4096
......
@@ -68,7 +68,7 @@ python scripts/10_report.py
For the first round, use only 2 PDFs, 1 XLSX, and 10 approved QA pairs; expand the sample size only after confirming that `retrieved_contexts`, `response`, and the Ragas input fields all look correct.
By default, `04_parse_docs.py` builds the testset sources from the WeKnora-exported `data/exported/chunks.jsonl` rather than calling an external PDF parser again. `05_generate_testset.py` defaults to generating QA automatically with Ragas and the evaluation-side LLM; generation runs with `TESTSET_RAGAS_MODE=direct`, which assembles the WeKnora chunks directly into a Ragas KnowledgeGraph and produces single-hop QA, skipping the default Ragas document-preprocessing chain that would re-extract headlines, summaries, and entities. During generation, `TESTSET_MAX_DOCUMENT_CHARS` caps the length of each source context and `TESTSET_GENERATOR_MAX_TOKENS` bounds the generation output budget, so neither gets conflated with the `ragas.max_tokens` used in later evaluation. `local`, `mineru`, and `rule_based` remain only as optional experimental/fallback configurations.
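A rough sketch of how the two generation-stage budgets divide the work (the truncation mirrors `_truncate_for_generation` in `scripts/05_generate_testset.py`; the standalone wiring here is illustrative, not the script's actual code):

```python
import os
from langchain_openai import ChatOpenAI

# Input-side budget: cap each source chunk before it enters the prompt.
MAX_DOC_CHARS = int(os.environ.get("TESTSET_MAX_DOCUMENT_CHARS", "2000"))

def truncate_for_generation(content: str, max_chars: int = MAX_DOC_CHARS) -> str:
    return content if len(content) <= max_chars else content[:max_chars]

# Output-side budget: bounds only the QA-generation LLM; the separate
# ragas.max_tokens setting is reserved for the later evaluation stage.
generator_llm = ChatOpenAI(
    model="gpt-4o-mini",  # RAGAS_JUDGE_MODEL default from .env.example
    max_tokens=int(os.environ.get("TESTSET_GENERATOR_MAX_TOKENS", "4096")),
)
```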
## Key Outputs
......
@@ -293,10 +293,13 @@ max_tokens: 4096
If `05_generate_testset.py` hits `LLMDidNotFinishException` while generating QA, don't start by blindly raising `ragas.max_tokens`; step `05` has its own generation budget and input-length controls:
```bash
TESTSET_RAGAS_MODE=direct
TESTSET_GENERATOR_MAX_TOKENS=4096
TESTSET_MAX_DOCUMENT_CHARS=2000
```
`direct` mode skips Ragas's default `HeadlinesExtractor`, `SummaryExtractor`, and `NERExtractor` document-preprocessing chain, assembling the WeKnora chunks directly into a Ragas KnowledgeGraph and generating single-hop QA. `prechunked` and `langchain_docs` are kept only for comparison experiments and are not recommended when local vLLM structured output is unstable.
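Condensed from this commit's `_generate_ragas_direct` (`prepared_chunks` and `ragas_llm` are stand-ins for the script's chunk loader and wrapped LLM; the script additionally supplies three domain personas, omitted here):

```python
from ragas.testset import TestsetGenerator
from ragas.testset.graph import KnowledgeGraph, Node, NodeType
from ragas.testset.synthesizers.single_hop.specific import SingleHopSpecificQuerySynthesizer

# Each WeKnora chunk becomes a CHUNK node with pre-filled entities/themes,
# so the extractor-based preprocessing transforms never need to run.
kg = KnowledgeGraph(
    nodes=[
        Node(
            type=NodeType.CHUNK,
            properties={
                "page_content": text,
                "document_metadata": {"source_file": source},
                "entities": terms,
                "themes": terms,
            },
        )
        for text, source, terms in prepared_chunks  # stand-in chunk iterable
    ]
)
generator = TestsetGenerator(llm=ragas_llm, embedding_model=None, knowledge_graph=kg)
testset = generator.generate(
    testset_size=3,
    query_distribution=[(SingleHopSpecificQuerySynthesizer(llm=ragas_llm), 1.0)],
    raise_exceptions=False,
)
```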
If vLLM still reports unfinished generations, first drop `TESTSET_SIZE` to 3 and set `TESTSET_MAX_DOCUMENT_CHARS` to 1000-1500 to verify the pipeline; `ragas.max_tokens` is meant for the later evaluation stage and should not be inflated indefinitely to cover testset-generation output length.
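For that smoke test, overrides along these lines (hypothetical values inside the suggested ranges) keep both prompts and outputs small:

```python
import os

# Shrink the run before re-invoking scripts/05_generate_testset.py.
os.environ["TESTSET_SIZE"] = "3"                     # tiny testset, just to validate the pipeline
os.environ["TESTSET_MAX_DOCUMENT_CHARS"] = "1200"    # anywhere in 1000-1500 works
os.environ["TESTSET_GENERATOR_MAX_TOKENS"] = "4096"  # generation budget; leave ragas.max_tokens alone
```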
### WeKnora answers have no retrieved_contexts
......
@@ -10,6 +10,7 @@ weknora:
testset:
  size: "${TESTSET_SIZE:-50}"
  generator: "ragas"  # ragas or rule_based
  ragas_mode: "${TESTSET_RAGAS_MODE:-direct}"  # direct, prechunked, or langchain_docs
  include_pdf: true
  include_xlsx: true
  min_context_chars: 80
......
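The `${VAR:-default}` placeholders above are resolved from the environment; a hypothetical resolver (the real lookup lives in `weknora_eval.config` and may differ):

```python
import os
import re

# Hypothetical: expand "${NAME:-default}" using the process environment.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*):-([^}]*)\}")

def resolve_placeholders(raw: str) -> str:
    return _PLACEHOLDER.sub(lambda m: os.environ.get(m.group(1), m.group(2)), raw)

print(resolve_placeholders("${TESTSET_RAGAS_MODE:-direct}"))  # "direct" when unset
```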
@@ -8,7 +8,10 @@ from typing import Any
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from ragas.run_config import RunConfig
from ragas.testset.graph import KnowledgeGraph, Node, NodeType
from ragas.testset.persona import Persona
from ragas.testset import TestsetGenerator
from ragas.testset.synthesizers.single_hop.specific import SingleHopSpecificQuerySynthesizer
from weknora_eval.config import require_config
from weknora_eval.loaders import read_jsonl, write_jsonl
@@ -46,6 +49,7 @@ def generate_ragas_testset(
    generator_max_tokens = int(
        testset_config.get("generator_max_tokens", ragas_config.get("max_tokens", 4096))
    )
    ragas_mode = str(testset_config.get("ragas_mode", "direct"))
    source_rows = [
        row
@@ -71,11 +75,12 @@
        for row in selected_source_rows
    ]
    logger.info(
        "Generating Ragas testset: target_size=%s source_documents=%s max_document_chars=%s generator_max_tokens=%s ragas_mode=%s",
        size,
        len(documents),
        max_document_chars,
        generator_max_tokens,
        ragas_mode,
    )
    llm = ChatOpenAI(
@@ -86,32 +91,165 @@
        max_tokens=generator_max_tokens,
        timeout=int(ragas_config.get("timeout_seconds", 600)),
    )
    run_config = RunConfig(
        timeout=int(ragas_config.get("timeout_seconds", 600)),
        max_workers=int(ragas_config.get("max_workers", 1)),
    )
    if ragas_mode == "direct":
        result = _generate_ragas_direct(llm, documents, size, run_config)
    elif ragas_mode == "prechunked":
        result = _generate_ragas_prechunked(
            config, ragas_config, llm, documents, size, run_config
        )
    elif ragas_mode == "langchain_docs":
        result = _generate_ragas_langchain_docs(
            config, ragas_config, llm, documents, size, run_config
        )
    else:
        raise ValueError(f"Unsupported testset.ragas_mode: {ragas_mode}")
    ragas_rows = result.to_list()
    rows = _normalize_ragas_rows(ragas_rows, selected_source_rows)
    write_jsonl(output_path, rows)
    return rows


def _generate_ragas_direct(
    llm: ChatOpenAI,
    documents: list[Document],
    size: int,
    run_config: RunConfig,
) -> Any:
    ragas_llm = _wrap_langchain_llm(llm)
    # Build the knowledge graph straight from WeKnora chunks; pre-filling
    # "entities"/"themes" means Ragas never runs its extractor transforms.
    kg = KnowledgeGraph(
        nodes=[
            Node(
                type=NodeType.CHUNK,
                properties={
                    "page_content": document.page_content,
                    "document_metadata": document.metadata,
                    "entities": _generation_terms(document),
                    "themes": _generation_terms(document),
                },
            )
            for document in documents
            if document.page_content.strip()
        ]
    )
    generator = TestsetGenerator(
        llm=ragas_llm,
        embedding_model=None,  # direct mode never embeds, so no embedding client is needed
        knowledge_graph=kg,
        persona_list=[
            Persona(
                name="合同审核人员",  # contract reviewer
                role_description="关注合同条款、权利归属、授权范围和履约义务。",
            ),
            Persona(
                name="业务运营人员",  # business operations staff
                role_description="关注文档中可用于业务执行和信息核验的事实。",
            ),
            Persona(
                name="法务合规人员",  # legal & compliance staff
                role_description="关注协议、版权、授权、责任和风险表述。",
            ),
        ],
    )
    generate_kwargs: dict[str, Any] = {
        "testset_size": size,
        "query_distribution": [(SingleHopSpecificQuerySynthesizer(llm=ragas_llm), 1.0)],
        "num_personas": 3,
        "run_config": run_config,
        "raise_exceptions": False,
    }
    # Older Ragas releases do not accept batch_size; pass it only when supported.
    if "batch_size" in inspect.signature(generator.generate).parameters:
        generate_kwargs["batch_size"] = 1
    return generator.generate(**generate_kwargs)


def _generate_ragas_prechunked(
    config: dict[str, Any],
    ragas_config: dict[str, Any],
    llm: ChatOpenAI,
    documents: list[Document],
    size: int,
    run_config: RunConfig,
) -> Any:
    embeddings = _build_embeddings(config, ragas_config)
    ragas_llm, ragas_embeddings = _wrap_langchain_models(llm, embeddings)
    generator = TestsetGenerator(llm=ragas_llm, embedding_model=ragas_embeddings)
    return generator.generate_with_chunks(
        documents,
        testset_size=size,
        run_config=run_config,
        raise_exceptions=False,
    )


def _generate_ragas_langchain_docs(
    config: dict[str, Any],
    ragas_config: dict[str, Any],
    llm: ChatOpenAI,
    documents: list[Document],
    size: int,
    run_config: RunConfig,
) -> Any:
    embeddings = _build_embeddings(config, ragas_config)
    ragas_llm, ragas_embeddings = _wrap_langchain_models(llm, embeddings)
    generator = TestsetGenerator(llm=ragas_llm, embedding_model=ragas_embeddings)
    generate_kwargs: dict[str, Any] = {
        "testset_size": size,
        "run_config": run_config,
        "raise_exceptions": False,
    }
    # Older Ragas releases do not accept batch_size; pass it only when supported.
    if "batch_size" in inspect.signature(generator.generate_with_langchain_docs).parameters:
        generate_kwargs["batch_size"] = 1
    return generator.generate_with_langchain_docs(documents, **generate_kwargs)


def _build_embeddings(
    config: dict[str, Any], ragas_config: dict[str, Any]
) -> OpenAIEmbeddings:
    return OpenAIEmbeddings(
        model=str(require_config(config, "ragas.embedding_model")),
        api_key=_required_ragas_value(ragas_config, "embedding_api_key"),
        base_url=_required_ragas_value(ragas_config, "embedding_base_url"),
        tiktoken_enabled=False,
        check_embedding_ctx_length=False,
        request_timeout=int(ragas_config.get("timeout_seconds", 600)),
    )


def _wrap_langchain_llm(llm: Any) -> Any:
    try:
        from ragas.llms import LangchainLLMWrapper
    except ImportError:
        # Fall back to the raw LangChain client if this Ragas build
        # does not expose LangchainLLMWrapper.
        return llm
    return LangchainLLMWrapper(llm)


def _generation_terms(document: Document) -> list[str]:
    # Cheap, local substitute for Ragas's NER/theme extraction: match a fixed
    # list of contract/copyright domain keywords against the chunk text.
    text = f"{document.metadata.get('source_file') or ''} {document.page_content}"
    candidates = [
        "合同条款",  # contract clauses
        "权利归属",  # ownership of rights
        "著作权",  # copyright
        "邻接权",  # neighboring rights
        "录音权利",  # sound-recording rights
        "词权利",  # lyrics rights
        "曲权利",  # composition rights
        "授权范围",  # scope of authorization
        "作品信息",  # work information
        "甲方",  # Party A
        "乙方",  # Party B
        "协议",  # agreement
        "付款",  # payment
        "违约责任",  # liability for breach
        "期限",  # term / duration
    ]
    terms = [term for term in candidates if term in text]
    source_file = str(document.metadata.get("source_file") or "").strip()
    if source_file:
        # Also use the file name (sans extension, capped at 40 chars) as a term.
        terms.append(source_file.rsplit(".", 1)[0][:40])
    return terms[:6] or ["文档内容"]  # "document content" fallback

def _truncate_for_generation(content: str, max_chars: int) -> str:
......