Commit 147c79e07bbaa525c4f6c6ac143e9c5c109362f8 by 沈秋雨

Initial WeKnora Ragas evaluation project
WEKNORA_BASE_URL=http://localhost:8080/api/v1
WEKNORA_API_KEY=
WEKNORA_KB_ID=
WEKNORA_KB_NAME=ragas-eval-pilot
# Ragas generation and judge models. These are evaluation-side models, not the
# model configuration used by the WeKnora backend.
OPENAI_API_KEY=replace-me
OPENAI_BASE_URL=https://api.openai.com/v1
# Optional split deployment. Use these when LLM and embedding are served by
# different OpenAI-compatible services, such as vLLM + Infinity.
RAGAS_LLM_API_KEY=replace-me
RAGAS_LLM_BASE_URL=http://localhost:8000/v1
RAGAS_EMBEDDING_API_KEY=replace-me
RAGAS_EMBEDDING_BASE_URL=http://localhost:7997/v1
RAGAS_RERANKER_API_KEY=replace-me
RAGAS_RERANKER_BASE_URL=http://localhost:7998/v1
RAGAS_RERANKER_MODEL=replace-me
RAGAS_GENERATOR_MODEL=gpt-4o-mini
RAGAS_JUDGE_MODEL=gpt-4o-mini
RAGAS_EMBEDDING_MODEL=text-embedding-3-small
TESTSET_SIZE=50
REQUEST_INTERVAL_SECONDS=0.2
.env
.venv/
__pycache__/
*.py[cod]
*.egg-info/
.pytest_cache/
.ruff_cache/
data/raw_docs/pdf/*
data/raw_docs/xlsx/*
data/parsed_docs/*.json
data/parsed_docs/*.jsonl
data/parsed_docs/mineru_raw/*
data/exported/*.json
data/exported/*.jsonl
data/testsets/*.jsonl
data/runs/*.jsonl
data/reports/*.csv
data/reports/*.md
!data/raw_docs/pdf/.gitkeep
!data/raw_docs/xlsx/.gitkeep
!data/parsed_docs/mineru_raw/.gitkeep
# Ragas Standalone Evaluation Project Implementation Checklist
## 1. Goals
Build an independent evaluation project on top of WeKnora's public APIs, without relying on WeKnora's built-in `/evaluation` endpoint.
The project evaluates:
- Retrieval quality: whether WeKnora recalls the correct chunks.
- Generation quality: whether WeKnora answers correctly and faithfully based on the retrieved context.
- End-to-end RAG quality: question -> retrieval -> answer -> Ragas metrics.
The final per-record format fed into Ragas:
```json
{
  "user_input": "The question",
  "response": "The answer generated by WeKnora",
  "retrieved_contexts": ["Retrieved chunk text 1", "Retrieved chunk text 2"],
  "reference": "The ground-truth answer",
  "reference_contexts": ["The source passages supporting the ground-truth answer"]
}
```
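Before anything is sent to Ragas, each record can be checked against the field list above. A minimal sketch (`validate_record` and `REQUIRED_FIELDS` are illustrative helpers, not part of Ragas):

```python
REQUIRED_FIELDS = {
    "user_input": str,
    "response": str,
    "retrieved_contexts": list,
    "reference": str,
    "reference_contexts": list,
}


def validate_record(record: dict) -> list[str]:
    """Return a list of problems; an empty list means the record is usable."""
    problems = []
    for field, expected_type in REQUIRED_FIELDS.items():
        value = record.get(field)
        # Reject missing fields, wrong types, and empty strings/lists alike.
        if not isinstance(value, expected_type) or not value:
            problems.append(f"{field}: missing, empty, or wrong type")
    return problems
```

Running this over every line of the input JSONL before evaluation makes field-level failures visible early instead of surfacing as opaque Ragas errors.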
## 2. Recommended Standalone Project Layout
Create the standalone project with the following structure:
```text
README.md
pyproject.toml
.env.example
configs/
  eval.yaml
data/
  raw_docs/
    pdf/
    xlsx/
  parsed_docs/
    documents.jsonl
    parse_summary.json
    mineru_raw/
  exported/
    knowledge.jsonl
    chunks.jsonl
  testsets/
    testset.raw.jsonl
    testset.reviewed.jsonl
  runs/
    weknora_answers.jsonl
    ragas_input.jsonl
  reports/
    ragas_scores.csv
    summary.md
scripts/
  01_upload_docs.py
  02_wait_ingestion.py
  03_export_chunks.py
  04_parse_docs.py
  05_generate_testset.py
  06_review_testset.py
  07_run_weknora_qa.py
  08_build_ragas_input.py
  09_run_ragas_eval.py
  10_report.py
src/
  weknora_eval/
    api.py
    schemas.py
    loaders.py
    parsers/
      local.py
      mineru.py
    testset.py
    sse.py
    ragas_runner.py
    report.py
```
Example `pyproject.toml`:
```toml
[project]
name = "weknora-ragas-eval"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
    "ragas>=0.3,<0.5",
    "datasets>=2.19.0",
    "pandas>=2.2.0",
    "openpyxl>=3.1.0",
    "requests>=2.32.0",
    "sseclient-py>=1.8.0",
    "python-dotenv>=1.0.0",
    "pyyaml>=6.0.0",
    "langchain>=0.2.0",
    "langchain-community>=0.2.0",
    "langchain-openai>=0.1.0",
    "pypdf>=4.2.0"
]

[project.optional-dependencies]
pdf = [
    "pymupdf>=1.24.0",
    "pdfplumber>=0.11.0"
]
dev = [
    "ruff>=0.6.0",
    "pytest>=8.0.0"
]
```
Installation:
```bash
python -m venv .venv
source .venv/bin/activate
pip install -e .
```
If PDF parsing quality is poor, install the PDF extras:
```bash
pip install -e ".[pdf]"
```
For development and test tooling:
```bash
pip install -e ".[dev,pdf]"
```
## 3. Environment Configuration
`.env.example`
```bash
WEKNORA_BASE_URL=http://localhost:8080/api/v1
WEKNORA_API_KEY=replace-me
WEKNORA_KB_ID=replace-me
# Model services used by Ragas for testset generation and judge scoring.
# These are the evaluation project's own LLM/embedding services, not the model
# configuration of the WeKnora backend.
# When using the official OpenAI API:
OPENAI_API_KEY=replace-me
OPENAI_BASE_URL=https://api.openai.com/v1
# When using an OpenAI-compatible service, e.g. a self-hosted gateway,
# OneAPI, LiteLLM, SiliconFlow, or OpenRouter:
# OPENAI_API_KEY=replace-me
# OPENAI_BASE_URL=https://your-openai-compatible-endpoint/v1
# LLM used by Ragas to auto-generate the QA testset.
RAGAS_GENERATOR_MODEL=gpt-4o-mini
# LLM used by Ragas for metric scoring, i.e. the judge/evaluator.
RAGAS_JUDGE_MODEL=gpt-4o-mini
# Embedding model used by Ragas for semantic-similarity and some
# question-generation steps.
RAGAS_EMBEDDING_MODEL=text-embedding-3-small
TESTSET_SIZE=50
REQUEST_INTERVAL_SECONDS=0.2
```
Where the model variables come from:
| Variable | Purpose | Source |
| --- | --- | --- |
| `RAGAS_GENERATOR_MODEL` | Generates the QA testset | A model name from your evaluation-side LLM service |
| `RAGAS_JUDGE_MODEL` | Scores Ragas metrics such as faithfulness and context recall | A model name from your evaluation-side LLM service |
| `RAGAS_EMBEDDING_MODEL` | Embedding steps in Ragas generation/evaluation | A model name from your evaluation-side embedding service |
| `OPENAI_API_KEY` | API key for calling the evaluation-side model service | Issued by OpenAI or the OpenAI-compatible service |
| `OPENAI_BASE_URL` | Base URL of the evaluation-side model service | The official OpenAI endpoint or a compatible one |
Notes:
- When WeKnora answers questions, it uses the models already configured in the WeKnora backend.
- `RAGAS_GENERATOR_MODEL`, `RAGAS_JUDGE_MODEL`, and `RAGAS_EMBEDDING_MODEL` are the evaluation project's own "evaluation-side models".
- The two sides may share one model service or use separate ones. To keep the system under test and the judge from biasing each other, the judge model should be at least as capable as the WeKnora answering model.
- If you do not use the official OpenAI service, any endpoint compatible with the OpenAI Chat Completions and Embeddings APIs can usually be plugged in via `OPENAI_BASE_URL`.
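Picking the evaluation-side credentials can be sketched as a tiny resolver in which the split `RAGAS_LLM_*` / `RAGAS_EMBEDDING_*` variables from `.env.example` take precedence over the shared `OPENAI_*` pair. `resolve_endpoint` is a hypothetical helper, not an existing module of the project:

```python
import os


def resolve_endpoint(kind: str) -> tuple[str, str]:
    """Pick (api_key, base_url) for kind 'llm' or 'embedding'.

    Split RAGAS_* variables win when set and non-empty; otherwise fall back
    to the shared OPENAI_* pair.
    """
    prefix = f"RAGAS_{kind.upper()}"
    api_key = os.environ.get(f"{prefix}_API_KEY") or os.environ.get("OPENAI_API_KEY", "")
    base_url = os.environ.get(f"{prefix}_BASE_URL") or os.environ.get("OPENAI_BASE_URL", "")
    return api_key, base_url
```

Using `or` rather than a plain `get` default means an empty string in `.env` also falls back, which matches how the committed `configs/eval.yaml` describes its backward-compatible defaults.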
`configs/eval.yaml`
```yaml
weknora:
  base_url: "${WEKNORA_BASE_URL}"
  api_key: "${WEKNORA_API_KEY}"
  knowledge_base_id: "${WEKNORA_KB_ID}"
  timeout_seconds: 300
  request_interval_seconds: 0.2
testset:
  size: 50
  include_pdf: true
  include_xlsx: true
  min_context_chars: 80
  require_manual_review: true
parsing:
  # Options: local or mineru
  provider: "local"
  output_path: "data/parsed_docs/documents.jsonl"
  local:
    pdf_backend: "pymupdf"  # Options: pypdf, pymupdf, pdfplumber
    xlsx_mode: "row_text"   # Options: row_text, markdown_table
    min_chars: 80
  mineru:
    mode: "cli"  # Options: cli, http
    cli_bin: "mineru"
    output_dir: "data/parsed_docs/mineru_raw"
    http_base_url: ""
    api_key: ""
    timeout_seconds: 600
    fallback_to_local: true
qa:
  one_session_per_question: true
  disable_title: true
  enable_memory: false
  channel: "api"
ragas:
  provider: "openai-compatible"
  api_key: "${OPENAI_API_KEY}"
  base_url: "${OPENAI_BASE_URL}"
  generator_model: "${RAGAS_GENERATOR_MODEL}"
  judge_model: "${RAGAS_JUDGE_MODEL}"
  embedding_model: "${RAGAS_EMBEDDING_MODEL}"
  metrics:
    - faithfulness
    - response_relevancy
    - context_precision
    - context_recall
    - factual_correctness
```
## 4. Document Parsing on the Ragas Side
Before Ragas can generate a QA testset, the raw PDF/XLSX files must be converted into unified text Documents. Do not hand file paths directly to Ragas; run a separate parsing step first that produces a standardized `documents.jsonl`.
Two parsing modes are supported:
- Local parsing: suited to quick validation, text-based PDFs, and structurally simple XLSX files.
- MinerU parsing: suited to complex PDFs, scans, and documents heavy with tables, formulas, or multi-column layouts.
### 4.1 Unified Parsing Output
Whether parsing locally or with MinerU, always produce `data/parsed_docs/documents.jsonl`, one Document per line:
```json
{
  "doc_id": "contract.pdf::page-1",
  "source_file": "contract.pdf",
  "file_type": "pdf",
  "page": 1,
  "sheet": null,
  "row_index": null,
  "content": "Parsed body text of page 1...",
  "metadata": {
    "parser": "local:pymupdf"
  }
}
```
Example of an XLSX row-level record:
```json
{
  "doc_id": "sales.xlsx::Sheet1::row-12",
  "source_file": "sales.xlsx",
  "file_type": "xlsx",
  "page": null,
  "sheet": "Sheet1",
  "row_index": 12,
  "content": "Product: Product A\nYear: 2024\nSales: 1,200,000 CNY",
  "metadata": {
    "parser": "local:openpyxl",
    "columns": ["Product", "Year", "Sales"]
  }
}
```
Downstream Ragas testset generation reads only `documents.jsonl`; it never reads the raw PDF/XLSX files.
### 4.2 Local Parsing
Local parsing is the lowest-dependency, fastest way to get the pipeline running.
Available PDF backends:
- `pypdf`: lightweight dependency, good for text-based PDFs.
- `pymupdf`: fast, and usually more robust than pypdf.
- `pdfplumber`: good when some table or layout information must be preserved.
XLSX parsing modes:
- `row_text`: converts each row into `column: value` text; good for QA and retrieval.
- `markdown_table`: converts each sheet into one Markdown table; preserves the overall table structure, but long tables easily become too long.
Local parsing configuration:
```yaml
parsing:
  provider: "local"
  output_path: "data/parsed_docs/documents.jsonl"
  local:
    pdf_backend: "pymupdf"
    xlsx_mode: "row_text"
    min_chars: 80
```
Responsibilities of `scripts/04_parse_docs.py` in local mode:
- Walk `data/raw_docs/pdf` and `data/raw_docs/xlsx`.
- Emit one Document per PDF page or paragraph.
- Emit one Document per XLSX row or sheet.
- Filter out overly short text.
- Write `data/parsed_docs/documents.jsonl`.
- Preserve metadata such as `source_file`, `page`, `sheet`, and `row_index`.
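The `row_text` mode described in 4.2 can be illustrated with a small helper. `row_to_text` is a hypothetical function; the real implementation would live in `src/weknora_eval/parsers/local.py`:

```python
def row_to_text(columns: list[str], row: list) -> str:
    """Render one spreadsheet row as 'column: value' lines (the row_text mode)."""
    parts = []
    for column, value in zip(columns, row):
        # Skip empty cells so sparse rows stay short and readable.
        if value is None or str(value).strip() == "":
            continue
        parts.append(f"{column}: {value}")
    return "\n".join(parts)
```

One Document per row keeps each record small enough for retrieval and makes `row_index` metadata trivial to attach.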
### 4.3 MinerU Parsing
MinerU is an optional, stronger parsing backend. It suits:
- PDFs with complex layouts;
- PDFs containing tables, formulas, or multi-column typesetting;
- scanned or image-based PDFs;
- cases where Markdown output is wanted as QA-generation context.
MinerU supports two integration modes.
#### 4.3.1 MinerU CLI Mode
Configuration:
```yaml
parsing:
  provider: "mineru"
  mineru:
    mode: "cli"
    cli_bin: "mineru"
    output_dir: "data/parsed_docs/mineru_raw"
    timeout_seconds: 600
    fallback_to_local: true
```
Expected behavior:
- `scripts/04_parse_docs.py` invokes the MinerU CLI.
- Each PDF is parsed into `data/parsed_docs/mineru_raw/{file_stem}/`.
- Markdown or JSON is read from the MinerU output.
- The output is converted into the unified `documents.jsonl`.
The CLI invocation must be adapted to the MinerU version actually installed. Encapsulate the MinerU CLI call in `src/weknora_eval/parsers/mineru.py` instead of scattering concrete commands across business scripts.
#### 4.3.2 MinerU HTTP Service Mode
If a MinerU service is already available, it can be called over HTTP.
Configuration:
```yaml
parsing:
  provider: "mineru"
  mineru:
    mode: "http"
    http_base_url: "http://mineru.example.com"
    api_key: "replace-me"
    output_dir: "data/parsed_docs/mineru_raw"
    timeout_seconds: 600
    fallback_to_local: true
```
Expected behavior:
- Upload the PDF to the MinerU HTTP service.
- Wait for the parsing job to complete.
- Download the Markdown/JSON result.
- Convert it into the unified `documents.jsonl`.
The HTTP paths depend on how the MinerU service is actually deployed, so the MinerU HTTP client must be a replaceable module.
### 4.4 Parsing Fallback Strategy
Recommended strategy:
1. Use the provider specified in the configuration by default.
2. If `provider=mineru` and a file fails to parse:
   - record it in `data/parsed_docs/failed_parse.jsonl`;
   - if `fallback_to_local=true`, fall back to local parsing.
3. If the local parsing result is empty or too short:
   - mark the file as a low-quality parse;
   - exclude it from automatic QA generation and queue it for manual handling.
Failure record format:
```json
{
  "source_file": "contract.pdf",
  "parser": "mineru:cli",
  "status": "failed",
  "error": "timeout",
  "fallback_used": "local:pymupdf"
}
```
### 4.5 Parsing Quality Checks
After parsing, generate `data/parsed_docs/parse_summary.json`:
```json
{
  "total_files": 3,
  "parsed_files": 3,
  "failed_files": 0,
  "total_documents": 128,
  "empty_documents": 0,
  "avg_chars": 512.4,
  "parser": "local:pymupdf"
}
```
Minimum quality requirements:
- Every file yields at least one Document.
- `content` is non-empty.
- Most Documents are at least `min_chars` long.
- `source_file` is preserved in the metadata.
## 5. WeKnora API Contract
### 5.1 Upload Documents
Use this endpoint when the evaluation project itself is responsible for uploading the raw PDF/XLSX files to WeKnora.
Request:
```http
POST /api/v1/knowledge-bases/{knowledge_base_id}/knowledge/file
X-API-Key: <api-key>
Content-Type: multipart/form-data
```
Multipart fields:
```text
file=@/path/to/file.pdf
enable_multimodel=false
```
Example response:
```json
{
  "success": true,
  "data": {
    "id": "knowledge-0001",
    "knowledge_base_id": "kb-0001",
    "type": "file",
    "title": "contract.pdf",
    "parse_status": "processing",
    "enable_status": "disabled",
    "file_name": "contract.pdf",
    "file_type": "pdf",
    "error_message": ""
  }
}
```
Fields to persist:
```json
{
  "knowledge_id": "knowledge-0001",
  "file_name": "contract.pdf",
  "file_type": "pdf",
  "parse_status": "processing"
}
```
### 5.2 Poll Document Ingestion Status
Request:
```http
GET /api/v1/knowledge-bases/{knowledge_base_id}/knowledge?page=1&page_size=100
X-API-Key: <api-key>
```
Example response:
```json
{
  "success": true,
  "data": [
    {
      "id": "knowledge-0001",
      "title": "contract.pdf",
      "parse_status": "completed",
      "enable_status": "enabled",
      "file_name": "contract.pdf",
      "file_type": "pdf",
      "processed_at": "2026-04-20T10:03:00+08:00",
      "error_message": ""
    }
  ],
  "page": 1,
  "page_size": 100,
  "total": 1
}
```
Completion condition:
```text
parse_status == "completed"
enable_status == "enabled"
```
Failure condition:
```text
parse_status == "failed"
```
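The completion and failure conditions above translate directly into a polling loop. A sketch with the listing call injected as a callable so it can be tested without a live server (`wait_ingestion` is illustrative; the project's client exposes `wait_ingestion_completed`):

```python
import time
from typing import Callable


def wait_ingestion(
    fetch_knowledge: Callable[[], list],
    poll_interval: float = 2.0,
    timeout: float = 300.0,
) -> dict:
    """Poll until every knowledge record is completed-and-enabled or failed."""
    deadline = time.monotonic() + timeout
    while True:
        completed, failed, pending = [], [], []
        for row in fetch_knowledge():
            if row.get("parse_status") == "failed":
                failed.append(row["id"])
            elif row.get("parse_status") == "completed" and row.get("enable_status") == "enabled":
                completed.append(row["id"])
            else:
                pending.append(row["id"])
        # Stop when nothing is pending, or report leftovers on timeout.
        if not pending or time.monotonic() >= deadline:
            return {"completed": completed, "failed": failed, "pending": pending}
        time.sleep(poll_interval)
```

Returning the three buckets instead of a boolean lets the caller decide whether `failed` or still-`pending` records should abort the run.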
### 5.3 Export Chunks
Request:
```http
GET /api/v1/chunks/{knowledge_id}?page=1&page_size=100
X-API-Key: <api-key>
```
Example response:
```json
{
  "success": true,
  "data": [
    {
      "id": "chunk-0001",
      "knowledge_id": "knowledge-0001",
      "knowledge_base_id": "kb-0001",
      "content": "Chunk text...",
      "chunk_index": 0,
      "is_enabled": true,
      "status": 2,
      "start_at": 0,
      "end_at": 500,
      "chunk_type": "text",
      "parent_chunk_id": "",
      "metadata": null,
      "image_info": ""
    }
  ],
  "page": 1,
  "page_size": 100,
  "total": 35
}
```
Save as `data/exported/chunks.jsonl`:
```json
{
  "chunk_id": "chunk-0001",
  "knowledge_id": "knowledge-0001",
  "knowledge_base_id": "kb-0001",
  "chunk_index": 0,
  "content": "Chunk text...",
  "source_file": "contract.pdf",
  "chunk_type": "text"
}
```
### 5.4 Create a Session
Create a dedicated session per evaluation question so that conversation history cannot influence the answer.
Request:
```http
POST /api/v1/sessions
X-API-Key: <api-key>
Content-Type: application/json
```
Request body:
```json
{
  "title": "ragas-eval-qa-0001",
  "description": "Ragas evaluation session"
}
```
Example response:
```json
{
  "success": true,
  "data": {
    "id": "session-0001",
    "title": "ragas-eval-qa-0001",
    "description": "Ragas evaluation session"
  }
}
```
### 5.5 Run Knowledge-Base QA
Request:
```http
POST /api/v1/knowledge-chat/{session_id}
X-API-Key: <api-key>
Content-Type: application/json
```
Request body:
```json
{
  "query": "What is the payment term in the contract?",
  "knowledge_base_ids": ["kb-0001"],
  "disable_title": true,
  "enable_memory": false,
  "channel": "api"
}
```
To restrict retrieval to specific files, pass:
```json
{
  "query": "What is the payment term in the contract?",
  "knowledge_ids": ["knowledge-0001"],
  "disable_title": true,
  "enable_memory": false,
  "channel": "api"
}
```
Response type: Server-Sent Events.
References event:
```text
event: message
data: {
  "id": "request-0001",
  "response_type": "references",
  "content": "",
  "done": false,
  "knowledge_references": [
    {
      "id": "chunk-0012",
      "content": "The buyer shall complete payment within 30 days of receiving a valid invoice.",
      "knowledge_id": "knowledge-0001",
      "chunk_index": 12,
      "knowledge_title": "contract.pdf",
      "start_at": 1200,
      "end_at": 1480,
      "seq": 12,
      "score": 0.92,
      "match_type": 3,
      "metadata": {},
      "chunk_type": "text",
      "parent_chunk_id": "",
      "image_info": "",
      "knowledge_filename": "contract.pdf",
      "knowledge_source": "file"
    }
  ]
}
```
Answer event:
```text
event: message
data: {
  "id": "request-0001",
  "response_type": "answer",
  "content": "Per the contract, the payment term is within 30 days of receiving a valid invoice.",
  "done": false,
  "knowledge_references": null
}
```
Final event:
```text
event: message
data: {
  "id": "request-0001",
  "response_type": "answer",
  "content": "",
  "done": true,
  "knowledge_references": null
}
```
Fields to extract:
```json
{
  "request_id": "request-0001",
  "response": "Per the contract, the payment term is within 30 days of receiving a valid invoice.",
  "retrieved_contexts": [
    "The buyer shall complete payment within 30 days of receiving a valid invoice."
  ],
  "weknora_references": [
    {
      "id": "chunk-0012",
      "content": "The buyer shall complete payment within 30 days of receiving a valid invoice.",
      "knowledge_id": "knowledge-0001",
      "chunk_index": 12,
      "score": 0.92,
      "knowledge_filename": "contract.pdf"
    }
  ]
}
```
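Extracting these fields amounts to folding the SSE events into one record: concatenate the `answer` fragments and deduplicate references by chunk ID. A sketch assuming each `data:` payload has already been JSON-decoded into a dict (`aggregate_sse_events` is a hypothetical helper):

```python
def aggregate_sse_events(events: list) -> dict:
    """Fold parsed SSE payloads into one QA record.

    Answer fragments are concatenated in order; references are
    deduplicated by chunk id, keeping the first occurrence.
    """
    response_parts = []
    references = {}
    request_id = None
    for event in events:
        request_id = request_id or event.get("id")
        if event.get("response_type") == "references":
            for ref in event.get("knowledge_references") or []:
                references.setdefault(ref["id"], ref)
        elif event.get("response_type") == "answer" and not event.get("done"):
            response_parts.append(event.get("content") or "")
    return {
        "request_id": request_id,
        "response": "".join(response_parts),
        "retrieved_contexts": [ref["content"] for ref in references.values()],
        "weknora_references": list(references.values()),
    }
```

The `done: true` event carries no content, so filtering on `done` keeps it from corrupting the concatenated answer.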
### 5.6 Optional: Load Persisted Messages
Used to verify, or re-fetch, the final assistant answer after the SSE stream completes.
Request:
```http
GET /api/v1/messages/{session_id}/load?limit=10
X-API-Key: <api-key>
```
Example response:
```json
{
  "success": true,
  "data": [
    {
      "id": "assistant-message-0001",
      "session_id": "session-0001",
      "request_id": "request-0001",
      "content": "Per the contract, the payment term is within 30 days of receiving a valid invoice.",
      "role": "assistant",
      "knowledge_references": [
        {
          "id": "chunk-0012",
          "content": "The buyer shall complete payment within 30 days of receiving a valid invoice.",
          "knowledge_id": "knowledge-0001",
          "chunk_index": 12,
          "knowledge_title": "contract.pdf",
          "score": 0.92,
          "match_type": 3,
          "chunk_type": "text",
          "knowledge_filename": "contract.pdf"
        }
      ],
      "is_completed": true,
      "is_fallback": false
    }
  ]
}
```
### 5.7 Optional: Retrieval-Only Endpoint
Used to evaluate retrieval alone, without generation.
Request:
```http
POST /api/v1/knowledge-search
X-API-Key: <api-key>
Content-Type: application/json
```
Request body:
```json
{
  "query": "What is the payment term in the contract?",
  "knowledge_base_ids": ["kb-0001"]
}
```
Example response:
```json
{
  "success": true,
  "data": [
    {
      "id": "chunk-0012",
      "content": "The buyer shall complete payment within 30 days of receiving a valid invoice.",
      "knowledge_id": "knowledge-0001",
      "chunk_index": 12,
      "knowledge_title": "contract.pdf",
      "start_at": 1200,
      "end_at": 1480,
      "seq": 12,
      "score": 0.92,
      "match_type": 3,
      "chunk_type": "text",
      "metadata": {},
      "knowledge_filename": "contract.pdf",
      "knowledge_source": "file"
    }
  ]
}
```
## 6. QA Testset Generation
### 6.1 Input Data
Keep both kinds of input:
- the standardized parses in `data/parsed_docs/documents.jsonl`;
- the chunks exported from WeKnora in `data/exported/chunks.jsonl`.
Recommended order:
1. Generate candidate QA primarily from `documents.jsonl`.
2. Store each QA's source file and supporting passages.
3. Optionally, match `reference_contexts` back to WeKnora chunk IDs so that non-LLM retrieval metrics such as hit@k, recall@k, and MRR can be computed.
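The optional chunk-ID matching can start from a crude containment check between reference passages and exported chunk contents. `match_gold_chunks` is a hypothetical helper; real data may need whitespace normalization or fuzzy matching:

```python
def match_gold_chunks(reference_contexts: list, chunks: list) -> list:
    """Map reference passages to exported chunk IDs by substring containment.

    A chunk matches when it contains the reference passage or vice versa;
    this rough heuristic is enough for hit@k / recall@k bookkeeping.
    """
    gold_ids = []
    for passage in reference_contexts:
        needle = passage.strip()
        for chunk in chunks:
            content = chunk.get("content", "")
            if needle and (needle in content or content in needle):
                if chunk["chunk_id"] not in gold_ids:
                    gold_ids.append(chunk["chunk_id"])
    return gold_ids
```

Records whose passages match no chunk at all are worth flagging during review, since their retrieval metrics would be meaningless.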
### 6.2 Testset Record Format
`data/testsets/testset.raw.jsonl`
```json
{
  "sample_id": "qa-0001",
  "user_input": "What is the payment term in the contract?",
  "reference": "The payment term is within 30 days of receiving a valid invoice.",
  "reference_contexts": [
    "The buyer shall complete payment within 30 days of receiving a valid invoice."
  ],
  "source_file": "contract.pdf",
  "gold_chunk_ids": ["chunk-0012"],
  "question_type": "single_hop",
  "review_status": "pending"
}
```
After manual review, `data/testsets/testset.reviewed.jsonl`
```json
{
  "sample_id": "qa-0001",
  "user_input": "What is the payment term in the contract?",
  "reference": "The payment term is within 30 days of receiving a valid invoice.",
  "reference_contexts": [
    "The buyer shall complete payment within 30 days of receiving a valid invoice."
  ],
  "source_file": "contract.pdf",
  "gold_chunk_ids": ["chunk-0012"],
  "question_type": "single_hop",
  "review_status": "approved"
}
```
### 6.3 Suggested Question Types
Include:
- single-hop factual questions over PDFs;
- multi-hop questions over PDFs, e.g. synthesizing across adjacent sections;
- definition, condition, and exception-clause questions over PDFs;
- single-row lookup questions over XLSX;
- conditional filtering questions over XLSX.
Avoid in the first phase:
- complex table-aggregation questions, unless WeKnora itself is expected to support table computation;
- questions answerable only from image content;
- questions requiring external knowledge;
- ambiguous questions with multiple reasonable answers.
## 7. Building the Ragas Input
For each QA record that passed review:
1. Create a clean session.
2. Call `POST /knowledge-chat/{session_id}`.
3. Parse the references events from the SSE stream.
4. Parse the answer events from the SSE stream.
5. Build one Ragas input record.
`data/runs/ragas_input.jsonl`
```json
{
  "sample_id": "qa-0001",
  "user_input": "What is the payment term in the contract?",
  "response": "Per the contract, the payment term is within 30 days of receiving a valid invoice.",
  "retrieved_contexts": [
    "The buyer shall complete payment within 30 days of receiving a valid invoice."
  ],
  "reference": "The payment term is within 30 days of receiving a valid invoice.",
  "reference_contexts": [
    "The buyer shall complete payment within 30 days of receiving a valid invoice."
  ],
  "session_id": "session-0001",
  "request_id": "request-0001",
  "weknora_references": [
    {
      "id": "chunk-0012",
      "knowledge_id": "knowledge-0001",
      "chunk_index": 12,
      "score": 0.92,
      "knowledge_filename": "contract.pdf"
    }
  ]
}
```
## 8. Ragas Metrics
Recommended first-phase metrics:
| Metric | Required fields | Purpose |
| --- | --- | --- |
| faithfulness | response, retrieved_contexts | Checks whether the answer is supported by the retrieved content. |
| response_relevancy | user_input, response | Checks whether the answer addresses the question. |
| context_precision | user_input, retrieved_contexts, reference | Checks whether the top-ranked retrieved contexts are relevant. |
| context_recall | retrieved_contexts, reference | Checks whether the retrieved contexts contain sufficient evidence. |
| factual_correctness | response, reference | Checks whether the answer agrees factually with the reference answer. |
If the testset contains `gold_chunk_ids`, also compute non-LLM retrieval metrics:
- hit@1
- hit@3
- hit@5
- recall@k
- mrr
- ndcg@k
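Given `gold_chunk_ids` and the ranked chunk IDs returned by WeKnora, these metrics reduce to a few lines each. A sketch with binary relevance; the function names are illustrative:

```python
import math


def hit_at_k(gold: set, ranked: list, k: int) -> float:
    """1.0 if any gold chunk appears in the top k, else 0.0."""
    return 1.0 if any(cid in gold for cid in ranked[:k]) else 0.0


def recall_at_k(gold: set, ranked: list, k: int) -> float:
    """Fraction of gold chunks recovered in the top k."""
    return len(gold & set(ranked[:k])) / len(gold) if gold else 0.0


def mrr(gold: set, ranked: list) -> float:
    """Reciprocal rank of the first gold chunk, 0.0 if none is found."""
    for rank, cid in enumerate(ranked, start=1):
        if cid in gold:
            return 1.0 / rank
    return 0.0


def ndcg_at_k(gold: set, ranked: list, k: int) -> float:
    """Binary-relevance NDCG: DCG of the ranking over the ideal DCG."""
    dcg = sum(
        1.0 / math.log2(rank + 1)
        for rank, cid in enumerate(ranked[:k], start=1)
        if cid in gold
    )
    ideal = sum(1.0 / math.log2(rank + 1) for rank in range(1, min(len(gold), k) + 1))
    return dcg / ideal if ideal else 0.0
```

Averaging these per-sample values over the testset gives the aggregate retrieval numbers to place next to the LLM-based context metrics.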
## 9. Reporting
Generate `data/reports/summary.md`
```markdown
# Ragas Evaluation Report
## Run Info
- WeKnora base URL:
- Knowledge base ID:
- Testset size:
- Approved samples:
- Failed samples:
- Judge model:
## Aggregate Metrics
| Metric | Mean | P50 | Failure threshold |
| --- | --- | --- | --- |
## Retrieval Failure Samples
| sample_id | Question | Expected file | Recalled files | context_recall | Notes |
## Generation Failure Samples
| sample_id | Question | Model answer | Reference answer | faithfulness | factual_correctness |
## Improvement Suggestions
- ...
```
Also save:
- `ragas_scores.csv`: per-sample metric scores.
- `weknora_answers.jsonl`: raw WeKnora output.
- `ragas_input.jsonl`: the data actually fed into Ragas.
- `failed_requests.jsonl`: records of API failures and SSE parsing failures.
## 10. Implementation Checklist
### Stage 1: Project Scaffolding
- [ ] Create an independent repository or directory `weknora-ragas-eval`.
- [ ] Add Python project metadata and dependency pinning.
- [ ] Add `.env.example`.
- [ ] Add `configs/eval.yaml`.
- [ ] Create the directory tree under `data/`.
- [ ] Add structured logging.
- [ ] Add retry and timeout policies.
### Stage 2: WeKnora API Client
- [ ] Implement `create_session`.
- [ ] Implement `upload_file`.
- [ ] Implement `list_knowledge`.
- [ ] Implement `wait_ingestion_completed`.
- [ ] Implement `list_chunks`.
- [ ] Implement `knowledge_chat_sse`.
- [ ] Implement `load_messages`.
- [ ] Implement `knowledge_search`.
- [ ] Persist response bodies on API errors.
- [ ] Implement pagination helpers.
### Stage 3: Document and Chunk Export
- [ ] Upload PDF files.
- [ ] Upload XLSX files.
- [ ] Poll until every document is completed or failed.
- [ ] Export all knowledge metadata.
- [ ] Export all chunks.
- [ ] Filter out disabled chunks.
- [ ] Filter out empty chunks.
- [ ] Preserve source-file metadata.
### Stage 4: Document Parsing on the Ragas Side
- [ ] Implement local PDF parsing.
- [ ] Implement local XLSX parsing.
- [ ] Implement the MinerU CLI adapter.
- [ ] Implement the MinerU HTTP adapter.
- [ ] Convert all parsing results into `documents.jsonl`.
- [ ] Record files that fail to parse.
- [ ] Generate `parse_summary.json`.
- [ ] Support falling back to local parsing when MinerU fails.
### Stage 5: Testset Generation
- [ ] Load `data/parsed_docs/documents.jsonl`.
- [ ] Generate candidate QA with Ragas.
- [ ] Save the raw candidate testset.
- [ ] Add manual-review fields.
- [ ] Produce the reviewed testset.
- [ ] Run minimum quality checks:
  - [ ] The question is answerable from the given context.
  - [ ] The reference answer is grounded.
  - [ ] `reference_contexts` is non-empty.
  - [ ] The source file is recorded.
### Stage 6: Run WeKnora QA
- [ ] Create a clean session per QA record.
- [ ] Call `knowledge-chat`.
- [ ] Parse SSE references events.
- [ ] Parse SSE answer events.
- [ ] Deduplicate references by chunk ID.
- [ ] Save raw answers and references.
- [ ] Record empty-answer failures.
- [ ] Record empty-retrieval failures.
- [ ] Optional: verify the final answer via the message load API.
### Stage 7: Build the Ragas Input
- [ ] Merge the reviewed QA with the WeKnora output.
- [ ] Build `user_input`.
- [ ] Build `response`.
- [ ] Build `retrieved_contexts`.
- [ ] Build `reference`.
- [ ] Build `reference_contexts`.
- [ ] Keep `sample_id`, `session_id`, `request_id`, and references for debugging.
- [ ] Validate that no required field is missing.
### Stage 8: Run the Ragas Evaluation
- [ ] Configure the judge LLM.
- [ ] Configure the embedding model.
- [ ] Run faithfulness.
- [ ] Run response relevancy.
- [ ] Run context precision.
- [ ] Run context recall.
- [ ] Run factual correctness.
- [ ] Save per-sample scores.
- [ ] Save aggregate scores.
- [ ] Catch Ragas exceptions per sample.
### Stage 9: Chunk-ID-Based Retrieval Metrics
- [ ] If `gold_chunk_ids` exist, compute hit@k.
- [ ] Compute recall@k.
- [ ] Compute MRR.
- [ ] Compute NDCG@k.
- [ ] Compare chunk-ID metrics against the Ragas LLM-based context metrics.
### Stage 10: Generate the Report
- [ ] Generate the Markdown report.
- [ ] Write run information.
- [ ] Write aggregate metrics.
- [ ] Write the worst retrieval samples.
- [ ] Write the worst generation samples.
- [ ] Write the count of empty retrievals.
- [ ] Write the count of fallback answers.
- [ ] Write the source-file distribution.
- [ ] Write improvement suggestions.
## 11. Acceptance Criteria
The evaluation project is considered usable once it can:
- [ ] upload a small batch of PDF/XLSX files to WeKnora;
- [ ] detect that document ingestion has completed;
- [ ] export chunks from WeKnora;
- [ ] produce `documents.jsonl` via local or MinerU parsing;
- [ ] create or import at least 10 reviewed-and-approved QA records;
- [ ] call WeKnora for every QA record;
- [ ] parse `response` and `retrieved_contexts`;
- [ ] build valid Ragas input JSONL;
- [ ] produce per-sample Ragas scores;
- [ ] produce a readable summary report;
- [ ] persist all intermediate artifacts for review and debugging.
## 12. First Pilot Run
Close the loop with a very small dataset first:
- 2 PDF files.
- 1 XLSX file.
- 10 manually approved QA records.
- One dedicated session per sample.
- Metrics:
  - faithfulness
  - response_relevancy
  - context_precision
  - context_recall
  - factual_correctness
Expected artifacts:
```text
data/exported/knowledge.jsonl
data/exported/chunks.jsonl
data/parsed_docs/documents.jsonl
data/parsed_docs/parse_summary.json
data/testsets/testset.reviewed.jsonl
data/runs/weknora_answers.jsonl
data/runs/ragas_input.jsonl
data/reports/ragas_scores.csv
data/reports/summary.md
```
Only scale up to 50-300 samples after the pilot confirms that:
- `retrieved_contexts` is not systematically empty;
- `response` is captured correctly;
- the Ragas input fields are valid;
- manual review confirms the QA set is meaningful for evaluation.
# WeKnora Ragas Eval
A standalone Ragas evaluation project for WeKnora. It only calls WeKnora's public APIs and does not depend on WeKnora's built-in `/evaluation` endpoint.
## Installation
```bash
python -m venv .venv
source .venv/bin/activate
pip install -e .
```
For better PDF parsing:
```bash
pip install -e ".[pdf]"
```
Development and test tooling:
```bash
pip install -e ".[dev,pdf]"
```
## Configuration
```bash
cp .env.example .env
```
After editing `.env`, confirm that:
- `WEKNORA_BASE_URL` points to the WeKnora API v1, e.g. `http://localhost:9090/api/v1`.
- `WEKNORA_API_KEY` is a WeKnora API key.
- `WEKNORA_KB_ID` is the target knowledge base ID; if you do not have one yet, run `python scripts/00_create_kb.py` first.
- `WEKNORA_KB_NAME` is the name used when creating the knowledge base.
- `OPENAI_API_KEY`, `OPENAI_BASE_URL`, and the `RAGAS_*_MODEL` variables configure the evaluation-side models.
- If LLM and embedding are deployed separately, point `RAGAS_LLM_BASE_URL` at vLLM's `/v1` and `RAGAS_EMBEDDING_BASE_URL` at Infinity's `/v1`.
## First Pilot
Place the raw files in:
- `data/raw_docs/pdf/`
- `data/raw_docs/xlsx/`
Run in order:
```bash
python scripts/00_create_kb.py
python scripts/01_upload_docs.py
python scripts/02_wait_ingestion.py
python scripts/03_export_chunks.py
python scripts/04_parse_docs.py
python scripts/05_generate_testset.py
python scripts/06_review_testset.py
python scripts/07_run_weknora_qa.py
python scripts/08_build_ragas_input.py
python scripts/09_run_ragas_eval.py
python scripts/10_report.py
```
For the first pilot, use only 2 PDFs, 1 XLSX, and 10 approved QA records; expand the sample size only after confirming that `retrieved_contexts`, `response`, and the Ragas input fields all look correct.
## Main Artifacts
- `data/exported/knowledge.jsonl`
- `data/exported/chunks.jsonl`
- `data/parsed_docs/documents.jsonl`
- `data/parsed_docs/parse_summary.json`
- `data/testsets/testset.raw.jsonl`
- `data/testsets/testset.reviewed.jsonl`
- `data/runs/weknora_answers.jsonl`
- `data/runs/ragas_input.jsonl`
- `data/reports/ragas_scores.csv`
- `data/reports/summary.md`
weknora:
  base_url: "${WEKNORA_BASE_URL}"
  api_key: "${WEKNORA_API_KEY}"
  knowledge_base_id: "${WEKNORA_KB_ID}"
  knowledge_base_name: "${WEKNORA_KB_NAME:-ragas-eval-pilot}"
  knowledge_base_description: "Knowledge base for independent Ragas evaluation."
  timeout_seconds: 300
  request_interval_seconds: "${REQUEST_INTERVAL_SECONDS:-0.2}"
testset:
  size: "${TESTSET_SIZE:-50}"
  include_pdf: true
  include_xlsx: true
  min_context_chars: 80
  require_manual_review: true
parsing:
  provider: "local"
  output_path: "data/parsed_docs/documents.jsonl"
  failed_path: "data/parsed_docs/failed_parse.jsonl"
  summary_path: "data/parsed_docs/parse_summary.json"
  local:
    pdf_backend: "pymupdf"
    xlsx_mode: "row_text"
    min_chars: 80
  mineru:
    mode: "cli"
    cli_bin: "mineru"
    output_dir: "data/parsed_docs/mineru_raw"
    http_base_url: "http://172.23.184.9:8002"
    api_key: "mineru"
    timeout_seconds: 600
    fallback_to_local: false
qa:
  one_session_per_question: true
  disable_title: true
  enable_memory: false
  channel: "api"
  verify_with_messages: false
ragas:
  provider: "openai-compatible"
  # Backward-compatible defaults. If the split LLM/embedding values below are
  # empty, these values are used for both clients.
  api_key: "${OPENAI_API_KEY}"
  base_url: "${OPENAI_BASE_URL}"
  # vLLM OpenAI-compatible endpoint, for example http://localhost:8000/v1.
  llm_api_key: "${RAGAS_LLM_API_KEY}"
  llm_base_url: "${RAGAS_LLM_BASE_URL}"
  # Infinity OpenAI-compatible embedding endpoint, for example
  # http://localhost:7997/v1.
  embedding_api_key: "${RAGAS_EMBEDDING_API_KEY}"
  embedding_base_url: "${RAGAS_EMBEDDING_BASE_URL}"
  # Reserved for future retrieval/rerank metrics. The current Ragas pipeline
  # does not call reranker APIs.
  reranker_api_key: "${RAGAS_RERANKER_API_KEY}"
  reranker_base_url: "${RAGAS_RERANKER_BASE_URL}"
  reranker_model: "${RAGAS_RERANKER_MODEL}"
  generator_model: "${RAGAS_GENERATOR_MODEL}"
  judge_model: "${RAGAS_JUDGE_MODEL}"
  embedding_model: "${RAGAS_EMBEDDING_MODEL}"
  temperature: 0
  max_tokens: 4096
  timeout_seconds: 600
  max_workers: 1
  metrics:
    - faithfulness
    - response_relevancy
    - context_precision
    - context_recall
    - factual_correctness
[project]
name = "weknora-ragas-eval"
version = "0.1.0"
description = "Independent Ragas evaluation pipeline for WeKnora public APIs."
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"ragas>=0.3,<0.5",
"datasets>=2.19.0",
"pandas>=2.2.0",
"openpyxl>=3.1.0",
"requests>=2.32.0",
"sseclient-py>=1.8.0",
"python-dotenv>=1.0.0",
"pyyaml>=6.0.0",
"langchain>=0.2.0",
"langchain-community>=0.2.0",
"langchain-openai>=0.1.0",
"pypdf>=4.2.0"
]
[project.optional-dependencies]
pdf = [
"pymupdf>=1.24.0",
"pdfplumber>=0.11.0"
]
dev = [
"ruff>=0.6.0",
"pytest>=8.0.0"
]
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.ruff]
line-length = 100
target-version = "py310"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B"]
from __future__ import annotations

import sys
from typing import Any

import _bootstrap  # noqa: F401
from weknora_eval.api import bootstrap_client_from_config
from weknora_eval.config import load_config, require_config
from weknora_eval.envfile import set_env_value
from weknora_eval.loaders import setup_logging, write_json


def main() -> int:
    setup_logging()
    config = load_config()
    client = bootstrap_client_from_config(config)
    weknora = config["weknora"]
    existing_id = str(weknora.get("knowledge_base_id") or "")
    name = str(require_config(config, "weknora.knowledge_base_name"))
    if existing_id and existing_id != "replace-me":
        record = {"id": existing_id, "name": name, "source": "env"}
        write_json("data/exported/knowledge_base.json", record)
        print(f"WEKNORA_KB_ID already set: {existing_id}")
        return 0
    created = client.create_knowledge_base(name=name)
    knowledge_base_id = _extract_knowledge_base_id(created)
    if not knowledge_base_id:
        print(f"Created knowledge base but could not extract id from response: {created}")
        return 1
    set_env_value(".env", "WEKNORA_KB_ID", knowledge_base_id)
    write_json("data/exported/knowledge_base.json", {**created, "source": "create"})
    print(f"WEKNORA_KB_ID={knowledge_base_id}")
    print("Wrote ID to .env and data/exported/knowledge_base.json")
    return 0


def _extract_knowledge_base_id(payload: dict[str, Any]) -> str | None:
    candidates = [payload]
    for key in ("data", "knowledge_base"):
        nested = payload.get(key)
        if isinstance(nested, dict):
            candidates.append(nested)
    for row in candidates:
        for key in ("id", "knowledge_base_id", "kb_id", "uuid"):
            value = row.get(key)
            if value:
                return str(value)
    return None


if __name__ == "__main__":
    sys.exit(main())
from __future__ import annotations

import sys
from pathlib import Path

import _bootstrap  # noqa: F401
from weknora_eval.api import client_from_config
from weknora_eval.config import load_config
from weknora_eval.loaders import setup_logging, write_jsonl


def main() -> int:
    setup_logging()
    config = load_config()
    client = client_from_config(config)
    files = sorted(Path("data/raw_docs/pdf").glob("*.pdf")) + sorted(
        Path("data/raw_docs/xlsx").glob("*.xlsx")
    )
    rows = []
    for path in files:
        data = client.upload_file(path)
        rows.append(
            {
                "knowledge_id": data.get("id"),
                "file_name": data.get("file_name") or data.get("title") or path.name,
                "file_type": data.get("file_type") or path.suffix.lstrip("."),
                "parse_status": data.get("parse_status"),
                "enable_status": data.get("enable_status"),
                "raw": data,
            }
        )
    write_jsonl("data/exported/knowledge_uploads.jsonl", rows)
    print(f"Uploaded {len(rows)} files")
    return 0


if __name__ == "__main__":
    sys.exit(main())
from __future__ import annotations

import sys

import _bootstrap  # noqa: F401
from weknora_eval.api import client_from_config
from weknora_eval.config import load_config
from weknora_eval.loaders import read_jsonl, setup_logging, write_jsonl


def main() -> int:
    setup_logging()
    config = load_config()
    client = client_from_config(config)
    uploads = read_jsonl("data/exported/knowledge_uploads.jsonl", missing_ok=True)
    knowledge_ids = {row["knowledge_id"] for row in uploads if row.get("knowledge_id")} or None
    result = client.wait_ingestion_completed(knowledge_ids=knowledge_ids)
    knowledge = client.list_knowledge()
    write_jsonl("data/exported/knowledge.jsonl", knowledge)
    print(
        "Ingestion status: "
        f"completed={len(result['completed'])} failed={len(result['failed'])} "
        f"pending={len(result['pending'])}"
    )
    return 1 if result["failed"] or result["pending"] else 0


if __name__ == "__main__":
    sys.exit(main())
from __future__ import annotations

import sys

import _bootstrap  # noqa: F401
from weknora_eval.api import client_from_config
from weknora_eval.config import load_config
from weknora_eval.loaders import setup_logging, write_jsonl


def main() -> int:
    setup_logging()
    config = load_config()
    client = client_from_config(config)
    knowledge_rows = client.list_knowledge()
    write_jsonl("data/exported/knowledge.jsonl", knowledge_rows)
    knowledge_by_id = {row.get("id"): row for row in knowledge_rows}
    chunk_rows = []
    for knowledge in knowledge_rows:
        knowledge_id = knowledge.get("id")
        if not knowledge_id:
            continue
        if knowledge.get("parse_status") != "completed" or knowledge.get("enable_status") != "enabled":
            continue
        for chunk in client.list_chunks(str(knowledge_id)):
            content = (chunk.get("content") or "").strip()
            if not content:
                continue
            if chunk.get("is_enabled") is False:
                continue
            source = knowledge_by_id.get(chunk.get("knowledge_id")) or knowledge
            chunk_rows.append(
                {
                    "chunk_id": chunk.get("id"),
                    "knowledge_id": chunk.get("knowledge_id") or knowledge_id,
                    "knowledge_base_id": chunk.get("knowledge_base_id")
                    or config["weknora"]["knowledge_base_id"],
                    "chunk_index": chunk.get("chunk_index"),
                    "content": content,
                    "source_file": source.get("file_name") or source.get("title"),
                    "chunk_type": chunk.get("chunk_type"),
                    "raw": chunk,
                }
            )
    write_jsonl("data/exported/chunks.jsonl", chunk_rows)
    print(f"Exported {len(chunk_rows)} chunks from {len(knowledge_rows)} knowledge records")
    return 0


if __name__ == "__main__":
    sys.exit(main())
from __future__ import annotations

import sys

import _bootstrap  # noqa: F401
from weknora_eval.config import load_config
from weknora_eval.loaders import setup_logging
from weknora_eval.parsers.local import parse_raw_docs
from weknora_eval.parsers.mineru import parse_with_mineru


def main() -> int:
    setup_logging()
    config = load_config()
    provider = config.get("parsing", {}).get("provider", "local")
    if provider == "local":
        rows, summary = parse_raw_docs(config)
    elif provider == "mineru":
        rows, summary = parse_with_mineru(config)
    else:
        raise ValueError(f"Unsupported parsing provider: {provider}")
    print(f"Parsed {len(rows)} documents: {summary}")
    return 0 if rows else 1


if __name__ == "__main__":
    sys.exit(main())
from __future__ import annotations

import sys

import _bootstrap  # noqa: F401
from weknora_eval.config import load_config
from weknora_eval.loaders import setup_logging
from weknora_eval.testset import generate_rule_based_testset


def main() -> int:
    setup_logging()
    config = load_config()
    testset = config.get("testset", {})
    rows = generate_rule_based_testset(
        size=int(testset.get("size", 50)),
        min_context_chars=int(testset.get("min_context_chars", 80)),
    )
    print(f"Generated {len(rows)} pending QA candidates at data/testsets/testset.raw.jsonl")
    return 0 if rows else 1


if __name__ == "__main__":
    sys.exit(main())
from __future__ import annotations

import sys

import _bootstrap  # noqa: F401
from weknora_eval.loaders import setup_logging
from weknora_eval.testset import approve_pending_testset, validate_reviewed_testset


def main() -> int:
    setup_logging()
    rows = approve_pending_testset()
    errors = validate_reviewed_testset()
    if errors:
        for error in errors:
            print(error)
        return 1
    print(f"Wrote {len(rows)} approved QA records to data/testsets/testset.reviewed.jsonl")
    return 0 if rows else 1


if __name__ == "__main__":
    sys.exit(main())
from __future__ import annotations
import sys
import _bootstrap # noqa: F401
from weknora_eval.api import client_from_config
from weknora_eval.config import load_config
from weknora_eval.loaders import append_jsonl, read_jsonl, setup_logging, write_jsonl
def main() -> int:
setup_logging()
config = load_config()
client = client_from_config(config)
qa_config = config.get("qa", {})
rows = [row for row in read_jsonl("data/testsets/testset.reviewed.jsonl") if row.get("review_status") == "approved"]
answers = []
for index, row in enumerate(rows, start=1):
sample_id = row["sample_id"]
try:
session = client.create_session(title=f"ragas-eval-{sample_id}")
session_id = session.get("id")
if not session_id:
raise RuntimeError(f"create_session returned no id for {sample_id}")
result = client.knowledge_chat_sse(
session_id=session_id,
query=row["user_input"],
disable_title=bool(qa_config.get("disable_title", True)),
enable_memory=bool(qa_config.get("enable_memory", False)),
channel=str(qa_config.get("channel", "api")),
)
answer = {
"sample_id": sample_id,
"user_input": row["user_input"],
"session_id": session_id,
"request_id": result.get("request_id"),
"response": result.get("response") or "",
"retrieved_contexts": result.get("retrieved_contexts") or [],
"weknora_references": result.get("weknora_references") or [],
"error": None,
}
if not answer["response"]:
answer["error"] = "empty_response"
append_jsonl("data/runs/failed_requests.jsonl", answer)
            elif not answer["retrieved_contexts"]:
                # Mark the stored row too, so answers and failed_requests agree.
                answer["error"] = "empty_retrieval"
                append_jsonl("data/runs/failed_requests.jsonl", answer)
answers.append(answer)
print(f"[{index}/{len(rows)}] {sample_id} response_chars={len(answer['response'])}")
except Exception as exc: # noqa: BLE001
failed = {
"sample_id": sample_id,
"user_input": row.get("user_input"),
"response": "",
"retrieved_contexts": [],
"weknora_references": [],
"session_id": None,
"request_id": None,
"error": str(exc),
}
answers.append(failed)
append_jsonl("data/runs/failed_requests.jsonl", failed)
print(f"[{index}/{len(rows)}] {sample_id} failed: {exc}")
write_jsonl("data/runs/weknora_answers.jsonl", answers)
failures = [row for row in answers if row.get("error") and row.get("error") != "empty_retrieval"]
return 1 if failures else 0
if __name__ == "__main__":
sys.exit(main())
from __future__ import annotations
import sys
import _bootstrap # noqa: F401
from weknora_eval.loaders import append_jsonl, read_jsonl, setup_logging, write_jsonl
def main() -> int:
setup_logging()
testset = {
row["sample_id"]: row
for row in read_jsonl("data/testsets/testset.reviewed.jsonl")
if row.get("review_status") == "approved"
}
answers = {row["sample_id"]: row for row in read_jsonl("data/runs/weknora_answers.jsonl")}
ragas_rows = []
for sample_id, qa in testset.items():
answer = answers.get(sample_id)
if not answer:
append_jsonl("data/runs/failed_requests.jsonl", {"sample_id": sample_id, "error": "missing_answer"})
continue
row = {
"sample_id": sample_id,
"user_input": qa["user_input"],
"response": answer.get("response") or "",
"retrieved_contexts": answer.get("retrieved_contexts") or [],
"reference": qa["reference"],
"reference_contexts": qa.get("reference_contexts") or [],
"session_id": answer.get("session_id"),
"request_id": answer.get("request_id"),
"weknora_references": answer.get("weknora_references") or [],
"source_file": qa.get("source_file"),
"gold_chunk_ids": qa.get("gold_chunk_ids") or [],
}
missing = [
key
for key in ("user_input", "response", "retrieved_contexts", "reference", "reference_contexts")
if not row.get(key)
]
if missing:
append_jsonl(
"data/runs/failed_requests.jsonl",
{"sample_id": sample_id, "error": f"missing_ragas_fields:{','.join(missing)}"},
)
continue
ragas_rows.append(row)
write_jsonl("data/runs/ragas_input.jsonl", ragas_rows)
print(f"Built {len(ragas_rows)} Ragas input rows")
return 0 if ragas_rows else 1
if __name__ == "__main__":
sys.exit(main())
from __future__ import annotations
import sys
import _bootstrap # noqa: F401
from weknora_eval.config import load_config
from weknora_eval.loaders import setup_logging
from weknora_eval.ragas_runner import run_ragas_eval
def main() -> int:
setup_logging()
config = load_config()
scores = run_ragas_eval(config)
print(f"Wrote {len(scores)} Ragas score rows to data/reports/ragas_scores.csv")
return 0
if __name__ == "__main__":
sys.exit(main())
from __future__ import annotations
import sys
import _bootstrap # noqa: F401
from weknora_eval.config import load_config
from weknora_eval.loaders import setup_logging
from weknora_eval.report import generate_summary_report
def main() -> int:
setup_logging()
config = load_config()
generate_summary_report(config)
print("Wrote report to data/reports/summary.md")
return 0
if __name__ == "__main__":
sys.exit(main())
from __future__ import annotations
import sys
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC = PROJECT_ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
"""Independent Ragas evaluation pipeline for WeKnora."""
__all__ = [
"__version__",
]
__version__ = "0.1.0"
from __future__ import annotations
import logging
import time
from pathlib import Path
from typing import Any
from urllib.parse import urljoin
import requests
from weknora_eval.config import require_config
from weknora_eval.loaders import append_jsonl
from weknora_eval.sse import normalize_reference, parse_sse_events
logger = logging.getLogger(__name__)
class WeKnoraApiError(RuntimeError):
pass
class WeKnoraClient:
def __init__(
self,
*,
base_url: str,
api_key: str,
knowledge_base_id: str,
timeout_seconds: int = 300,
request_interval_seconds: float = 0.2,
error_log_path: str | Path = "data/runs/api_errors.jsonl",
max_retries: int = 3,
) -> None:
self.base_url = base_url.rstrip("/") + "/"
self.api_key = api_key
self.knowledge_base_id = knowledge_base_id
self.timeout_seconds = timeout_seconds
self.request_interval_seconds = request_interval_seconds
self.error_log_path = Path(error_log_path)
self.max_retries = max_retries
self.session = requests.Session()
self.session.headers.update({"X-API-Key": api_key})
def create_knowledge_base(self, *, name: str) -> dict[str, Any]:
return self._json_request("POST", "knowledge-bases", json={"name": name})
def create_session(
self,
title: str,
description: str = "Ragas evaluation session",
) -> dict[str, Any]:
payload = {"title": title, "description": description}
return self._json_request("POST", "sessions", json=payload)
def upload_file(self, file_path: str | Path, *, enable_multimodel: bool = False) -> dict[str, Any]:
self._ensure_knowledge_base_id()
target = Path(file_path)
with target.open("rb") as file:
files = {"file": (target.name, file)}
data = {"enable_multimodel": str(enable_multimodel).lower()}
return self._json_request(
"POST",
f"knowledge-bases/{self.knowledge_base_id}/knowledge/file",
files=files,
data=data,
)
def list_knowledge(self, *, page_size: int = 100) -> list[dict[str, Any]]:
self._ensure_knowledge_base_id()
return self._paginate(
f"knowledge-bases/{self.knowledge_base_id}/knowledge",
page_size=page_size,
)
def wait_ingestion_completed(
self,
*,
knowledge_ids: set[str] | None = None,
timeout_seconds: int | None = None,
poll_interval_seconds: float = 5.0,
) -> dict[str, list[dict[str, Any]]]:
deadline = time.monotonic() + (timeout_seconds or self.timeout_seconds)
target_ids = knowledge_ids or set()
while time.monotonic() < deadline:
rows = self.list_knowledge()
if target_ids:
rows = [row for row in rows if row.get("id") in target_ids]
completed = [
row
for row in rows
if row.get("parse_status") == "completed" and row.get("enable_status") == "enabled"
]
failed = [row for row in rows if row.get("parse_status") == "failed"]
if failed:
return {"completed": completed, "failed": failed, "pending": []}
if rows and len(completed) == len(rows):
return {"completed": completed, "failed": [], "pending": []}
pending = [row for row in rows if row not in completed]
logger.info("Waiting for ingestion: completed=%s pending=%s", len(completed), len(pending))
time.sleep(poll_interval_seconds)
        # Deadline reached: take one final snapshot so callers can inspect any
        # residual pending rows alongside completed and failed ones.
rows = self.list_knowledge()
if target_ids:
rows = [row for row in rows if row.get("id") in target_ids]
completed = [
row
for row in rows
if row.get("parse_status") == "completed" and row.get("enable_status") == "enabled"
]
failed = [row for row in rows if row.get("parse_status") == "failed"]
pending = [row for row in rows if row not in completed and row not in failed]
return {"completed": completed, "failed": failed, "pending": pending}
def list_chunks(self, knowledge_id: str, *, page_size: int = 100) -> list[dict[str, Any]]:
return self._paginate(f"chunks/{knowledge_id}", page_size=page_size)
def knowledge_chat_sse(
self,
*,
session_id: str,
query: str,
knowledge_ids: list[str] | None = None,
knowledge_base_ids: list[str] | None = None,
disable_title: bool = True,
enable_memory: bool = False,
channel: str = "api",
) -> dict[str, Any]:
payload: dict[str, Any] = {
"query": query,
"disable_title": disable_title,
"enable_memory": enable_memory,
"channel": channel,
}
if knowledge_ids:
payload["knowledge_ids"] = knowledge_ids
else:
self._ensure_knowledge_base_id()
payload["knowledge_base_ids"] = knowledge_base_ids or [self.knowledge_base_id]
url = self._url(f"knowledge-chat/{session_id}")
response = self.session.post(
url,
json=payload,
timeout=self.timeout_seconds,
stream=True,
headers={"Accept": "text/event-stream"},
)
if response.status_code >= 400:
self._log_error("POST", url, response)
raise WeKnoraApiError(f"POST {url} failed with HTTP {response.status_code}")
        # Aggregate the SSE stream: concatenate incremental answer chunks and
        # dedupe reference events by id across repeated `references` frames.
answer_parts: list[str] = []
references: list[dict[str, Any]] = []
raw_events: list[dict[str, Any]] = []
request_id: str | None = None
seen_reference_ids: set[str] = set()
for event in parse_sse_events(response.iter_lines(decode_unicode=True)):
raw_events.append(event)
data = event.get("data")
if not isinstance(data, dict):
continue
request_id = request_id or data.get("id")
response_type = data.get("response_type")
if response_type == "references":
for reference in data.get("knowledge_references") or []:
normalized = normalize_reference(reference)
reference_id = str(normalized.get("id") or "")
if reference_id and reference_id in seen_reference_ids:
continue
if reference_id:
seen_reference_ids.add(reference_id)
references.append(normalized)
elif response_type == "answer" and not data.get("done"):
answer_parts.append(data.get("content") or "")
retrieved_contexts = [ref["content"] for ref in references if ref.get("content")]
return {
"request_id": request_id,
"response": "".join(answer_parts).strip(),
"retrieved_contexts": retrieved_contexts,
"weknora_references": references,
"raw_events": raw_events,
}
def load_messages(self, session_id: str, *, limit: int = 10) -> list[dict[str, Any]]:
payload = self._json_request("GET", f"messages/{session_id}/load", params={"limit": limit})
if isinstance(payload, list):
return payload
return []
def knowledge_search(
self,
query: str,
*,
knowledge_ids: list[str] | None = None,
knowledge_base_ids: list[str] | None = None,
) -> list[dict[str, Any]]:
payload: dict[str, Any] = {"query": query}
if knowledge_ids:
payload["knowledge_ids"] = knowledge_ids
else:
self._ensure_knowledge_base_id()
payload["knowledge_base_ids"] = knowledge_base_ids or [self.knowledge_base_id]
data = self._json_request("POST", "knowledge-search", json=payload)
return data if isinstance(data, list) else []
def _paginate(self, path: str, *, page_size: int = 100) -> list[dict[str, Any]]:
page = 1
rows: list[dict[str, Any]] = []
while True:
envelope = self._request("GET", path, params={"page": page, "page_size": page_size})
payload = self._decode_envelope(envelope)
if not isinstance(payload, list):
raise WeKnoraApiError(f"Expected list response for {path}, got {type(payload).__name__}")
rows.extend(payload)
total = int(envelope.get("total") or len(rows))
if len(rows) >= total or not payload:
return rows
page += 1
def _json_request(self, method: str, path: str, **kwargs: Any) -> Any:
envelope = self._request(method, path, **kwargs)
return self._decode_envelope(envelope)
def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]:
url = self._url(path)
last_error: Exception | None = None
for attempt in range(1, self.max_retries + 1):
try:
response = self.session.request(
method,
url,
timeout=self.timeout_seconds,
**kwargs,
)
if response.status_code in {429, 500, 502, 503, 504} and attempt < self.max_retries:
time.sleep(attempt)
continue
if response.status_code >= 400:
self._log_error(method, url, response)
raise WeKnoraApiError(f"{method} {url} failed with HTTP {response.status_code}")
time.sleep(self.request_interval_seconds)
return response.json()
            except (requests.RequestException, ValueError) as exc:
                # Retry only transport and JSON-decoding failures. HTTP errors
                # raised above propagate immediately; retryable status codes
                # are already handled by the `continue` branch.
last_error = exc
if attempt >= self.max_retries:
break
time.sleep(attempt)
raise WeKnoraApiError(f"{method} {url} failed: {last_error}") from last_error
def _decode_envelope(self, envelope: dict[str, Any]) -> Any:
if envelope.get("success") is False:
raise WeKnoraApiError(str(envelope))
return envelope.get("data", envelope)
def _url(self, path: str) -> str:
return urljoin(self.base_url, path.lstrip("/"))
def _ensure_knowledge_base_id(self) -> None:
if not self.knowledge_base_id:
raise WeKnoraApiError("Missing knowledge_base_id. Run scripts/00_create_kb.py first.")
def _log_error(self, method: str, url: str, response: requests.Response) -> None:
body = response.text[:5000]
append_jsonl(
self.error_log_path,
{
"method": method,
"url": url,
"status_code": response.status_code,
"response_body": body,
},
)
def client_from_config(config: dict[str, Any]) -> WeKnoraClient:
weknora = config["weknora"]
return WeKnoraClient(
base_url=require_config(config, "weknora.base_url"),
api_key=require_config(config, "weknora.api_key"),
knowledge_base_id=require_config(config, "weknora.knowledge_base_id"),
timeout_seconds=int(weknora.get("timeout_seconds", 300)),
request_interval_seconds=float(weknora.get("request_interval_seconds", 0.2)),
)
def bootstrap_client_from_config(config: dict[str, Any]) -> WeKnoraClient:
weknora = config["weknora"]
return WeKnoraClient(
base_url=require_config(config, "weknora.base_url"),
api_key=require_config(config, "weknora.api_key"),
knowledge_base_id=str(weknora.get("knowledge_base_id") or ""),
timeout_seconds=int(weknora.get("timeout_seconds", 300)),
request_interval_seconds=float(weknora.get("request_interval_seconds", 0.2)),
)
from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Any
import yaml
from dotenv import load_dotenv
# Matches ${VAR} and ${VAR:-default} placeholders (shell-style expansion).
_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")
def _expand_env(value: Any) -> Any:
if isinstance(value, dict):
return {key: _expand_env(item) for key, item in value.items()}
if isinstance(value, list):
return [_expand_env(item) for item in value]
if not isinstance(value, str):
return value
def replace(match: re.Match[str]) -> str:
default = match.group(2) if match.group(2) is not None else ""
return os.getenv(match.group(1), default)
expanded = _ENV_PATTERN.sub(replace, value)
return _coerce_scalar(expanded)
def _coerce_scalar(value: str) -> Any:
lowered = value.lower()
if lowered in {"true", "false"}:
return lowered == "true"
if lowered in {"none", "null"}:
return None
try:
if "." not in value:
return int(value)
return float(value)
except ValueError:
return value
def load_config(path: str | Path = "configs/eval.yaml") -> dict[str, Any]:
load_dotenv()
config_path = Path(path)
with config_path.open("r", encoding="utf-8") as file:
raw = yaml.safe_load(file) or {}
return _expand_env(raw)
def require_config(config: dict[str, Any], dotted_key: str) -> Any:
current: Any = config
for part in dotted_key.split("."):
if not isinstance(current, dict) or part not in current:
raise ValueError(f"Missing required config value: {dotted_key}")
value = current[part]
if value is None or value == "":
raise ValueError(f"Missing required config value: {dotted_key}")
current = value
return current
def project_path(*parts: str) -> Path:
return Path.cwd().joinpath(*parts)
from __future__ import annotations
from pathlib import Path
def set_env_value(path: str | Path, key: str, value: str) -> None:
target = Path(path)
lines = target.read_text(encoding="utf-8").splitlines() if target.exists() else []
prefix = f"{key}="
replacement = f"{key}={value}"
updated = False
output: list[str] = []
for line in lines:
if line.startswith(prefix):
output.append(replacement)
updated = True
else:
output.append(line)
if not updated:
output.append(replacement)
target.write_text("\n".join(output) + "\n", encoding="utf-8")
from __future__ import annotations
import json
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Any
def setup_logging(level: int = logging.INFO) -> None:
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
def ensure_parent(path: str | Path) -> Path:
target = Path(path)
target.parent.mkdir(parents=True, exist_ok=True)
return target
def read_jsonl(path: str | Path, *, missing_ok: bool = False) -> list[dict[str, Any]]:
target = Path(path)
if not target.exists():
if missing_ok:
return []
raise FileNotFoundError(target)
rows: list[dict[str, Any]] = []
with target.open("r", encoding="utf-8") as file:
for line_no, line in enumerate(file, start=1):
stripped = line.strip()
if not stripped:
continue
try:
rows.append(json.loads(stripped))
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSONL at {target}:{line_no}: {exc}") from exc
return rows
def iter_jsonl(path: str | Path, *, missing_ok: bool = False) -> Iterable[dict[str, Any]]:
target = Path(path)
if not target.exists():
if missing_ok:
return
raise FileNotFoundError(target)
with target.open("r", encoding="utf-8") as file:
for line_no, line in enumerate(file, start=1):
stripped = line.strip()
if not stripped:
continue
try:
yield json.loads(stripped)
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSONL at {target}:{line_no}: {exc}") from exc
def write_jsonl(path: str | Path, rows: Iterable[dict[str, Any]]) -> int:
target = ensure_parent(path)
count = 0
with target.open("w", encoding="utf-8") as file:
for row in rows:
file.write(json.dumps(row, ensure_ascii=False) + "\n")
count += 1
return count
def append_jsonl(path: str | Path, row: dict[str, Any]) -> None:
target = ensure_parent(path)
with target.open("a", encoding="utf-8") as file:
file.write(json.dumps(row, ensure_ascii=False) + "\n")
def write_json(path: str | Path, payload: dict[str, Any]) -> None:
target = ensure_parent(path)
with target.open("w", encoding="utf-8") as file:
json.dump(payload, file, ensure_ascii=False, indent=2)
file.write("\n")
def compact_text(value: Any) -> str:
text = "" if value is None else str(value)
return "\n".join(line.strip() for line in text.splitlines() if line.strip()).strip()
"""Document parser adapters."""
from __future__ import annotations
import statistics
from pathlib import Path
from typing import Any
from openpyxl import load_workbook
from weknora_eval.loaders import compact_text, write_json, write_jsonl
from weknora_eval.schemas import ParsedDocument
def parse_raw_docs(config: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
parsing = config["parsing"]
local_config = parsing.get("local", {})
min_chars = int(local_config.get("min_chars", 80))
pdf_backend = local_config.get("pdf_backend", "pypdf")
xlsx_mode = local_config.get("xlsx_mode", "row_text")
docs: list[ParsedDocument] = []
failures: list[dict[str, Any]] = []
for pdf_path in sorted(Path("data/raw_docs/pdf").glob("*.pdf")):
try:
docs.extend(parse_pdf(pdf_path, backend=pdf_backend, min_chars=min_chars))
except Exception as exc: # noqa: BLE001 - parser failures must be persisted.
failures.append(
{
"source_file": pdf_path.name,
"parser": f"local:{pdf_backend}",
"status": "failed",
"error": str(exc),
"fallback_used": None,
}
)
for xlsx_path in sorted(Path("data/raw_docs/xlsx").glob("*.xlsx")):
try:
docs.extend(parse_xlsx(xlsx_path, mode=xlsx_mode, min_chars=min_chars))
except Exception as exc: # noqa: BLE001
failures.append(
{
"source_file": xlsx_path.name,
"parser": "local:openpyxl",
"status": "failed",
"error": str(exc),
"fallback_used": None,
}
)
rows = [doc.to_dict() for doc in docs]
write_jsonl(parsing.get("output_path", "data/parsed_docs/documents.jsonl"), rows)
if failures:
write_jsonl(parsing.get("failed_path", "data/parsed_docs/failed_parse.jsonl"), failures)
summary = build_parse_summary(rows, failures, parser=f"local:{pdf_backend}")
write_json(parsing.get("summary_path", "data/parsed_docs/parse_summary.json"), summary)
return rows, summary
def parse_pdf(path: str | Path, *, backend: str = "pypdf", min_chars: int = 80) -> list[ParsedDocument]:
target = Path(path)
backend = backend.lower()
if backend == "pymupdf":
return _parse_pdf_pymupdf(target, min_chars=min_chars)
if backend == "pdfplumber":
return _parse_pdf_pdfplumber(target, min_chars=min_chars)
if backend == "pypdf":
return _parse_pdf_pypdf(target, min_chars=min_chars)
raise ValueError(f"Unsupported PDF backend: {backend}")
def _parse_pdf_pypdf(path: Path, *, min_chars: int) -> list[ParsedDocument]:
from pypdf import PdfReader
reader = PdfReader(str(path))
docs: list[ParsedDocument] = []
for index, page in enumerate(reader.pages, start=1):
content = compact_text(page.extract_text() or "")
if len(content) < min_chars:
continue
docs.append(_pdf_doc(path, index, content, "local:pypdf"))
return docs
def _parse_pdf_pymupdf(path: Path, *, min_chars: int) -> list[ParsedDocument]:
try:
import fitz
except ImportError as exc:
raise ImportError("pymupdf backend requires `pip install -e '.[pdf]'`") from exc
docs: list[ParsedDocument] = []
with fitz.open(path) as document:
for index, page in enumerate(document, start=1):
content = compact_text(page.get_text("text"))
if len(content) < min_chars:
continue
docs.append(_pdf_doc(path, index, content, "local:pymupdf"))
return docs
def _parse_pdf_pdfplumber(path: Path, *, min_chars: int) -> list[ParsedDocument]:
try:
import pdfplumber
except ImportError as exc:
raise ImportError("pdfplumber backend requires `pip install -e '.[pdf]'`") from exc
docs: list[ParsedDocument] = []
with pdfplumber.open(path) as pdf:
for index, page in enumerate(pdf.pages, start=1):
content = compact_text(page.extract_text() or "")
if len(content) < min_chars:
continue
docs.append(_pdf_doc(path, index, content, "local:pdfplumber"))
return docs
def _pdf_doc(path: Path, page: int, content: str, parser: str) -> ParsedDocument:
return ParsedDocument(
doc_id=f"{path.name}::page-{page}",
source_file=path.name,
file_type="pdf",
page=page,
content=content,
metadata={"parser": parser},
)
def parse_xlsx(path: str | Path, *, mode: str = "row_text", min_chars: int = 80) -> list[ParsedDocument]:
    target = Path(path)
    mode = mode.lower()
    workbook = load_workbook(target, data_only=True, read_only=True)
    try:
        if mode == "row_text":
            return _parse_xlsx_row_text(target, workbook, min_chars=min_chars)
        if mode == "markdown_table":
            return _parse_xlsx_markdown_table(target, workbook, min_chars=min_chars)
        raise ValueError(f"Unsupported XLSX mode: {mode}")
    finally:
        # Read-only workbooks keep the file handle open until explicitly closed.
        workbook.close()
def _parse_xlsx_row_text(path: Path, workbook: Any, *, min_chars: int) -> list[ParsedDocument]:
docs: list[ParsedDocument] = []
for sheet in workbook.worksheets:
rows = list(sheet.iter_rows(values_only=True))
if not rows:
continue
headers = [_cell_to_text(value) or f"col_{index}" for index, value in enumerate(rows[0], start=1)]
for row_index, row in enumerate(rows[1:], start=2):
pairs = []
for header, value in zip(headers, row, strict=False):
cell = _cell_to_text(value)
if cell:
pairs.append(f"{header}: {cell}")
content = "\n".join(pairs).strip()
if len(content) < min_chars:
continue
docs.append(
ParsedDocument(
doc_id=f"{path.name}::{sheet.title}::row-{row_index}",
source_file=path.name,
file_type="xlsx",
sheet=sheet.title,
row_index=row_index,
content=content,
metadata={"parser": "local:openpyxl", "columns": headers},
)
)
return docs
def _parse_xlsx_markdown_table(path: Path, workbook: Any, *, min_chars: int) -> list[ParsedDocument]:
docs: list[ParsedDocument] = []
for sheet in workbook.worksheets:
rows = [
[_cell_to_text(value) for value in row]
for row in sheet.iter_rows(values_only=True)
if any(value is not None for value in row)
]
if not rows:
continue
width = max(len(row) for row in rows)
normalized = [row + [""] * (width - len(row)) for row in rows]
header = normalized[0]
separator = ["---"] * width
body = normalized[1:]
lines = [
"| " + " | ".join(header) + " |",
"| " + " | ".join(separator) + " |",
]
lines.extend("| " + " | ".join(row) + " |" for row in body)
content = "\n".join(lines)
if len(content) < min_chars:
continue
docs.append(
ParsedDocument(
doc_id=f"{path.name}::{sheet.title}",
source_file=path.name,
file_type="xlsx",
sheet=sheet.title,
content=content,
metadata={"parser": "local:openpyxl", "mode": "markdown_table"},
)
)
return docs
def _cell_to_text(value: Any) -> str:
if value is None:
return ""
text = str(value).strip()
return text.replace("\n", " ")
def build_parse_summary(
rows: list[dict[str, Any]],
failures: list[dict[str, Any]],
*,
parser: str,
) -> dict[str, Any]:
source_files = {row.get("source_file") for row in rows if row.get("source_file")}
failed_files = {row.get("source_file") for row in failures if row.get("source_file")}
lengths = [len(row.get("content") or "") for row in rows]
return {
"total_files": len(source_files | failed_files),
"parsed_files": len(source_files),
"failed_files": len(failed_files),
"total_documents": len(rows),
"empty_documents": sum(1 for length in lengths if length == 0),
"avg_chars": round(statistics.mean(lengths), 2) if lengths else 0,
"parser": parser,
}
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Any
import requests
from weknora_eval.loaders import compact_text, write_json, write_jsonl
from weknora_eval.parsers.local import build_parse_summary, parse_pdf
from weknora_eval.schemas import ParsedDocument
class MinerUParseError(RuntimeError):
pass
def parse_with_mineru(config: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
parsing = config["parsing"]
mineru = parsing.get("mineru", {})
mode = mineru.get("mode", "cli")
fallback = bool(mineru.get("fallback_to_local", True))
local_config = parsing.get("local", {})
min_chars = int(local_config.get("min_chars", 80))
docs: list[ParsedDocument] = []
failures: list[dict[str, Any]] = []
for pdf_path in sorted(Path("data/raw_docs/pdf").glob("*.pdf")):
parser_name = f"mineru:{mode}"
try:
if mode == "cli":
docs.extend(parse_pdf_with_cli(pdf_path, mineru, min_chars=min_chars))
elif mode == "http":
docs.extend(parse_pdf_with_http(pdf_path, mineru, min_chars=min_chars))
else:
raise MinerUParseError(f"Unsupported MinerU mode: {mode}")
except Exception as exc: # noqa: BLE001
failure = {
"source_file": pdf_path.name,
"parser": parser_name,
"status": "failed",
"error": str(exc),
"fallback_used": None,
}
if fallback:
try:
backend = local_config.get("pdf_backend", "pypdf")
local_docs = parse_pdf(pdf_path, backend=backend, min_chars=min_chars)
docs.extend(local_docs)
failure["fallback_used"] = f"local:{backend}"
except Exception as fallback_exc: # noqa: BLE001
failure["fallback_error"] = str(fallback_exc)
failures.append(failure)
rows = [doc.to_dict() for doc in docs]
write_jsonl(parsing.get("output_path", "data/parsed_docs/documents.jsonl"), rows)
if failures:
write_jsonl(parsing.get("failed_path", "data/parsed_docs/failed_parse.jsonl"), failures)
summary = build_parse_summary(rows, failures, parser=f"mineru:{mode}")
write_json(parsing.get("summary_path", "data/parsed_docs/parse_summary.json"), summary)
return rows, summary
def parse_pdf_with_cli(
pdf_path: str | Path,
mineru_config: dict[str, Any],
*,
min_chars: int,
) -> list[ParsedDocument]:
target = Path(pdf_path)
output_root = Path(mineru_config.get("output_dir", "data/parsed_docs/mineru_raw"))
output_dir = output_root / target.stem
output_dir.mkdir(parents=True, exist_ok=True)
cli_bin = mineru_config.get("cli_bin", "mineru")
timeout = int(mineru_config.get("timeout_seconds", 600))
# MinerU CLI arguments vary by release. This common invocation is isolated
# here so deployments can replace it without touching pipeline scripts.
result = subprocess.run(
[cli_bin, "-p", str(target), "-o", str(output_dir)],
check=False,
capture_output=True,
text=True,
timeout=timeout,
)
if result.returncode != 0:
raise MinerUParseError(result.stderr.strip() or result.stdout.strip() or "MinerU CLI failed")
markdown_files = sorted(output_dir.rglob("*.md"))
if not markdown_files:
raise MinerUParseError(f"No Markdown output found in {output_dir}")
docs: list[ParsedDocument] = []
for index, markdown_path in enumerate(markdown_files, start=1):
content = compact_text(markdown_path.read_text(encoding="utf-8"))
if len(content) < min_chars:
continue
docs.append(
ParsedDocument(
doc_id=f"{target.name}::mineru-{index}",
source_file=target.name,
file_type="pdf",
content=content,
metadata={
"parser": "mineru:cli",
"mineru_output": str(markdown_path),
},
)
)
return docs
def parse_pdf_with_http(
pdf_path: str | Path,
mineru_config: dict[str, Any],
*,
min_chars: int,
) -> list[ParsedDocument]:
target = Path(pdf_path)
base_url = str(mineru_config.get("http_base_url") or "").rstrip("/")
if not base_url:
raise MinerUParseError("MinerU HTTP mode requires parsing.mineru.http_base_url")
headers = {}
if mineru_config.get("api_key"):
headers["Authorization"] = f"Bearer {mineru_config['api_key']}"
# The checklist does not define a universal MinerU HTTP contract. This
# implementation expects a replaceable service exposing POST /parse and
# returning {"markdown": "..."} or {"documents": [{"content": "..."}]}.
with target.open("rb") as file:
response = requests.post(
f"{base_url}/parse",
files={"file": (target.name, file, "application/pdf")},
headers=headers,
timeout=int(mineru_config.get("timeout_seconds", 600)),
)
if response.status_code >= 400:
raise MinerUParseError(f"MinerU HTTP failed with {response.status_code}: {response.text[:500]}")
payload = response.json()
contents: list[str] = []
if isinstance(payload.get("documents"), list):
contents = [compact_text(item.get("content")) for item in payload["documents"]]
elif payload.get("markdown"):
contents = [compact_text(payload["markdown"])]
else:
raise MinerUParseError("MinerU HTTP response must include `markdown` or `documents`")
docs: list[ParsedDocument] = []
for index, content in enumerate(contents, start=1):
if len(content) < min_chars:
continue
docs.append(
ParsedDocument(
doc_id=f"{target.name}::mineru-http-{index}",
source_file=target.name,
file_type="pdf",
content=content,
metadata={"parser": "mineru:http"},
)
)
return docs
from __future__ import annotations
import os
from pathlib import Path
from typing import Any
import pandas as pd
from weknora_eval.config import require_config
from weknora_eval.loaders import read_jsonl
def run_ragas_eval(
config: dict[str, Any],
*,
input_path: str = "data/runs/ragas_input.jsonl",
output_csv_path: str = "data/reports/ragas_scores.csv",
) -> pd.DataFrame:
from datasets import Dataset
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from ragas import evaluate
from ragas.run_config import RunConfig
ragas_config = config["ragas"]
llm_api_key = _first_non_empty(ragas_config, "llm_api_key", "api_key")
llm_base_url = _first_non_empty(ragas_config, "llm_base_url", "base_url")
embedding_api_key = _first_non_empty(ragas_config, "embedding_api_key", "api_key")
embedding_base_url = _first_non_empty(ragas_config, "embedding_base_url", "base_url")
judge_model = str(require_config(config, "ragas.judge_model"))
embedding_model = str(require_config(config, "ragas.embedding_model"))
temperature = float(ragas_config.get("temperature", 0))
max_tokens = int(ragas_config.get("max_tokens", 4096))
timeout_seconds = int(ragas_config.get("timeout_seconds", 600))
max_workers = int(ragas_config.get("max_workers", 1))
os.environ["OPENAI_API_KEY"] = llm_api_key
if llm_base_url:
os.environ["OPENAI_BASE_URL"] = llm_base_url
rows = read_jsonl(input_path)
dataset = Dataset.from_list(
[
{
"user_input": row["user_input"],
"response": row["response"],
"retrieved_contexts": row["retrieved_contexts"],
"reference": row["reference"],
"reference_contexts": row.get("reference_contexts") or [],
}
for row in rows
]
)
metric_map = _metric_map()
selected_metrics = [
metric_map[name]
for name in ragas_config.get("metrics", metric_map.keys())
if name in metric_map
]
llm = ChatOpenAI(
model=judge_model,
api_key=llm_api_key,
base_url=llm_base_url or None,
temperature=temperature,
max_tokens=max_tokens,
)
embeddings = OpenAIEmbeddings(
model=embedding_model,
api_key=embedding_api_key,
base_url=embedding_base_url or None,
tiktoken_enabled=False,
check_embedding_ctx_length=False,
)
ragas_llm, ragas_embeddings = _wrap_langchain_models(llm, embeddings)
run_config = RunConfig(timeout=timeout_seconds, max_workers=max_workers)
result = evaluate(
dataset,
metrics=selected_metrics,
llm=ragas_llm,
embeddings=ragas_embeddings,
run_config=run_config,
)
    scores = result.to_pandas()
    # Assumes ragas keeps the input row order in to_pandas(); align sample_ids by position.
    for index, row in enumerate(rows):
        scores.loc[index, "sample_id"] = row.get("sample_id")
target = Path(output_csv_path)
target.parent.mkdir(parents=True, exist_ok=True)
scores.to_csv(target, index=False)
return scores
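The metric-selection step inside `run_ragas_eval` silently drops unknown metric names and falls back to every known metric when the config omits the list entirely. That behaviour in isolation:

```python
from __future__ import annotations


def select_metrics(metric_map: dict[str, object], requested: list[str] | None = None) -> list[object]:
    # Keep only configured names that exist in the map; when no list is
    # configured at all, default to every known metric.
    names = requested if requested is not None else list(metric_map)
    return [metric_map[name] for name in names if name in metric_map]
```

An explicitly empty `metrics: []` therefore selects nothing, while a missing key selects everything, which matches how `dict.get("metrics", metric_map.keys())` behaves in the function above.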
def _metric_map() -> dict[str, Any]:
    # Prefer the prebuilt metric instances; fall back to the class-based API
    # on ragas versions that only export the metric classes.
    try:
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
factual_correctness,
response_relevancy,
)
return {
"faithfulness": faithfulness,
"response_relevancy": response_relevancy,
"context_precision": context_precision,
"context_recall": context_recall,
"factual_correctness": factual_correctness,
}
except ImportError:
from ragas.metrics import (
Faithfulness,
FactualCorrectness,
LLMContextPrecisionWithReference,
LLMContextRecall,
ResponseRelevancy,
)
return {
"faithfulness": Faithfulness(),
"response_relevancy": ResponseRelevancy(),
"context_precision": LLMContextPrecisionWithReference(),
"context_recall": LLMContextRecall(),
"factual_correctness": FactualCorrectness(),
}
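The same try/except `ImportError` pattern generalises to any optional dependency; a tiny standalone illustration (`orjson` is just an example of an optional package, not a project requirement):

```python
def resolve_json_backend() -> str:
    # Try the optional accelerated backend first, exactly like _metric_map
    # tries the instance-based ragas API before the class-based one.
    try:
        import orjson  # noqa: F401 -- optional; absent in many environments
        return "orjson"
    except ImportError:
        import json  # noqa: F401 -- stdlib fallback, always available
        return "json"
```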
def _first_non_empty(config: dict[str, Any], *keys: str) -> str:
for key in keys:
value = config.get(key)
if value not in {None, ""}:
return str(value)
raise ValueError(f"Missing required Ragas config value. Checked: {', '.join(keys)}")
def _wrap_langchain_models(llm: Any, embeddings: Any) -> tuple[Any, Any]:
try:
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.llms import LangchainLLMWrapper
except ImportError:
return llm, embeddings
return LangchainLLMWrapper(llm), LangchainEmbeddingsWrapper(embeddings)
from __future__ import annotations
import math
from pathlib import Path
from typing import Any
import pandas as pd
from weknora_eval.loaders import read_jsonl
def retrieval_metrics(
ragas_rows: list[dict[str, Any]],
*,
ks: tuple[int, ...] = (1, 3, 5),
) -> dict[str, float]:
samples = [row for row in ragas_rows if row.get("gold_chunk_ids")]
if not samples:
return {}
totals: dict[str, float] = {f"hit@{k}": 0.0 for k in ks}
totals.update({f"recall@{k}": 0.0 for k in ks})
totals["mrr"] = 0.0
totals["ndcg@5"] = 0.0
for row in samples:
        # Normalise gold ids to str so they compare equal to the str-cast predicted ids.
        gold = {str(chunk_id) for chunk_id in (row.get("gold_chunk_ids") or [])}
refs = row.get("weknora_references") or []
predicted = [str(ref.get("id")) for ref in refs if ref.get("id")]
for k in ks:
top_k = predicted[:k]
hits = len(gold.intersection(top_k))
totals[f"hit@{k}"] += 1.0 if hits else 0.0
totals[f"recall@{k}"] += hits / len(gold)
        first_rank = next(
            (idx for idx, chunk_id in enumerate(predicted, start=1) if chunk_id in gold),
            None,
        )
        if first_rank is not None:
            totals["mrr"] += 1 / first_rank
dcg = 0.0
for idx, chunk_id in enumerate(predicted[:5], start=1):
if chunk_id in gold:
dcg += 1 / math.log2(idx + 1)
ideal_hits = min(len(gold), 5)
idcg = sum(1 / math.log2(idx + 1) for idx in range(1, ideal_hits + 1))
totals["ndcg@5"] += dcg / idcg if idcg else 0.0
return {key: round(value / len(samples), 4) for key, value in totals.items()}
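A worked example of the rank metrics above, computed inline for a single query (ids and ranking chosen for illustration):

```python
import math

gold = {"c2", "c5"}                         # annotated gold chunk ids
predicted = ["c1", "c2", "c3", "c5", "c9"]  # retrieval ranking, best first

# hit@3: at least one gold id appears in the top 3 (c2 sits at rank 2)
hit_at_3 = 1.0 if gold & set(predicted[:3]) else 0.0          # 1.0

# recall@3: fraction of gold ids recovered within the top 3
recall_at_3 = len(gold & set(predicted[:3])) / len(gold)      # 0.5

# MRR: reciprocal rank of the first gold hit
first_rank = next(i for i, c in enumerate(predicted, start=1) if c in gold)
mrr = 1 / first_rank                                          # 0.5

# nDCG@5: discounted gains at ranks 2 and 4 versus the ideal top-2 ranking
dcg = sum(1 / math.log2(i + 1) for i, c in enumerate(predicted[:5], start=1) if c in gold)
idcg = sum(1 / math.log2(i + 1) for i in range(1, min(len(gold), 5) + 1))
ndcg_at_5 = dcg / idcg                                        # ~0.651
```

`retrieval_metrics` averages these per-sample values over every row that carries `gold_chunk_ids`, so samples without gold annotations never dilute the scores.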
def generate_summary_report(
config: dict[str, Any],
*,
scores_csv_path: str = "data/reports/ragas_scores.csv",
ragas_input_path: str = "data/runs/ragas_input.jsonl",
answers_path: str = "data/runs/weknora_answers.jsonl",
output_path: str = "data/reports/summary.md",
) -> str:
ragas_rows = read_jsonl(ragas_input_path, missing_ok=True)
answer_rows = read_jsonl(answers_path, missing_ok=True)
scores = pd.read_csv(scores_csv_path) if Path(scores_csv_path).exists() else pd.DataFrame()
lines = [
"# Ragas 评估报告",
"",
"## 运行信息",
f"- WeKnora Base URL: {config.get('weknora', {}).get('base_url', '')}",
f"- 知识库 ID: {config.get('weknora', {}).get('knowledge_base_id', '')}",
        f"- 评估样本数(审核通过): {len(ragas_rows)}",
f"- 失败样本数: {sum(1 for row in answer_rows if row.get('error'))}",
f"- Judge 模型: {config.get('ragas', {}).get('judge_model', '')}",
"",
"## 聚合指标",
"| 指标 | 平均值 | P50 | 失败阈值 |",
"| --- | --- | --- | --- |",
]
metric_columns = [
column
for column in scores.columns
if column not in {"sample_id", "user_input", "response", "reference"}
and pd.api.types.is_numeric_dtype(scores[column])
]
for column in metric_columns:
lines.append(
f"| {column} | {scores[column].mean():.4f} | {scores[column].median():.4f} | 0.50 |"
)
chunk_metrics = retrieval_metrics(ragas_rows)
if chunk_metrics:
lines.extend(["", "## Chunk ID 检索指标", "| 指标 | 平均值 |", "| --- | --- |"])
for key, value in chunk_metrics.items():
lines.append(f"| {key} | {value:.4f} |")
    lines.extend(
        [
            "",
            "## 检索失败样本",
            "| sample_id | 问题 | 预期文件 | 实际召回文件 | context_recall | 备注 |",
            "| --- | --- | --- | --- | --- | --- |",
        ]
    )
for row in _worst_rows(scores, "context_recall"):
sample = _sample_by_id(ragas_rows, row.get("sample_id"))
actual_files = sorted(
{
ref.get("knowledge_filename") or ""
for ref in sample.get("weknora_references", [])
if ref.get("knowledge_filename")
}
)
lines.append(
f"| {row.get('sample_id', '')} | {_cell(sample.get('user_input'))} | "
f"{_cell(sample.get('source_file'))} | {_cell(', '.join(actual_files))} | "
f"{_score(row.get('context_recall'))} | |"
)
    lines.extend(
        [
            "",
            "## 生成失败样本",
            "| sample_id | 问题 | 模型答案 | 标准答案 | faithfulness | factual_correctness |",
            "| --- | --- | --- | --- | --- | --- |",
        ]
    )
for row in _worst_rows(scores, "faithfulness"):
sample = _sample_by_id(ragas_rows, row.get("sample_id"))
lines.append(
f"| {row.get('sample_id', '')} | {_cell(sample.get('user_input'))} | "
f"{_cell(sample.get('response'))} | {_cell(sample.get('reference'))} | "
f"{_score(row.get('faithfulness'))} | {_score(row.get('factual_correctness'))} |"
)
empty_retrievals = sum(1 for row in ragas_rows if not row.get("retrieved_contexts"))
fallback_answers = sum(1 for row in answer_rows if row.get("is_fallback"))
source_counts: dict[str, int] = {}
for row in ragas_rows:
source = row.get("source_file") or "unknown"
source_counts[source] = source_counts.get(source, 0) + 1
lines.extend(
[
"",
"## 数据质量",
f"- 空检索数量: {empty_retrievals}",
f"- fallback 答案数量: {fallback_answers}",
f"- 来源文件分布: {source_counts}",
"",
"## 改进建议",
"- 优先检查 context_recall 低且 retrieved_contexts 为空的样本。",
"- 对低 faithfulness 且 context_recall 正常的样本,重点检查生成模型和提示词。",
"- 对 Chunk ID 指标低但 Ragas context 指标正常的样本,检查 chunk 切分或 gold_chunk_ids 标注。",
"",
]
)
content = "\n".join(lines)
target = Path(output_path)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(content, encoding="utf-8")
return content
def _worst_rows(scores: pd.DataFrame, column: str, *, limit: int = 10) -> list[dict[str, Any]]:
if scores.empty or column not in scores.columns:
return []
return scores.sort_values(column, ascending=True).head(limit).to_dict(orient="records")
def _sample_by_id(rows: list[dict[str, Any]], sample_id: Any) -> dict[str, Any]:
return next((row for row in rows if row.get("sample_id") == sample_id), {})
def _cell(value: Any, *, max_len: int = 120) -> str:
text = "" if value is None else " ".join(str(value).split())
text = text.replace("|", "\\|")
if len(text) <= max_len:
return text
return text[:max_len].rstrip() + "..."
def _score(value: Any) -> str:
try:
if pd.isna(value):
return ""
return f"{float(value):.4f}"
except (TypeError, ValueError):
return ""
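Because report cells are interpolated straight into markdown table rows, the pipe-escaping and whitespace collapsing in `_cell` is what keeps a multi-line answer from breaking the table. The same transform, standalone with a shortened cap for demonstration:

```python
def to_cell(value, max_len: int = 20) -> str:
    # Collapse runs of whitespace, escape `|`, truncate with an ellipsis --
    # mirroring _cell above (max_len shortened here for demonstration).
    text = "" if value is None else " ".join(str(value).split())
    text = text.replace("|", "\\|")
    return text if len(text) <= max_len else text[:max_len].rstrip() + "..."
```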
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from typing import Any
@dataclass
class ParsedDocument:
doc_id: str
source_file: str
file_type: str
content: str
page: int | None = None
sheet: str | None = None
row_index: int | None = None
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass
class TestsetRecord:
sample_id: str
user_input: str
reference: str
reference_contexts: list[str]
source_file: str | None = None
gold_chunk_ids: list[str] = field(default_factory=list)
question_type: str = "single_hop"
review_status: str = "pending"
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass
class WeKnoraAnswer:
sample_id: str
user_input: str
response: str
retrieved_contexts: list[str]
weknora_references: list[dict[str, Any]]
session_id: str | None = None
request_id: str | None = None
error: str | None = None
def to_dict(self) -> dict[str, Any]:
return asdict(self)
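All three schema dataclasses share the same `asdict`-based serialisation, which makes the JSONL round trip trivial. A minimal sketch of that pattern (the `Record` class here is illustrative, not one of the schemas above):

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class Record:
    sample_id: str
    tags: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # asdict recurses into nested dataclasses, lists and dicts,
        # producing a JSON-serialisable plain dict.
        return asdict(self)


row = Record(sample_id="qa-0001", tags=["pdf"]).to_dict()
restored = Record(**row)  # the dict keys map straight back onto the constructor
```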
from __future__ import annotations
import json
from collections.abc import Iterable, Iterator
from typing import Any
def parse_sse_events(lines: Iterable[str | bytes]) -> Iterator[dict[str, Any]]:
    """Incrementally assemble Server-Sent Events from raw response lines."""
    event_name = "message"
data_lines: list[str] = []
for raw_line in lines:
line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line
line = line.rstrip("\r\n")
if not line:
if data_lines:
yield _build_event(event_name, data_lines)
event_name = "message"
data_lines = []
continue
if line.startswith(":"):
continue
if line.startswith("event:"):
event_name = line.removeprefix("event:").strip()
continue
        if line.startswith("data:"):
            # Per the SSE spec, strip at most one leading space from the value.
            data_lines.append(line.removeprefix("data:").removeprefix(" "))
if data_lines:
yield _build_event(event_name, data_lines)
def _build_event(event_name: str, data_lines: list[str]) -> dict[str, Any]:
raw_data = "\n".join(data_lines)
parsed_data: Any = raw_data
if raw_data and raw_data != "[DONE]":
try:
parsed_data = json.loads(raw_data)
except json.JSONDecodeError:
parsed_data = raw_data
return {"event": event_name, "data": parsed_data}
def normalize_reference(reference: dict[str, Any]) -> dict[str, Any]:
return {
"id": reference.get("id"),
"content": reference.get("content") or "",
"knowledge_id": reference.get("knowledge_id"),
"chunk_index": reference.get("chunk_index"),
"score": reference.get("score"),
"knowledge_filename": reference.get("knowledge_filename")
or reference.get("knowledge_title"),
"match_type": reference.get("match_type"),
"chunk_type": reference.get("chunk_type"),
}
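To see the parser's behaviour on a realistic stream (a comment line, a named event, and the `[DONE]` sentinel), here is a compact self-contained copy of the event-assembly logic run against sample lines:

```python
import json


def parse_events(lines):
    # Minimal SSE assembly mirroring parse_sse_events: a blank line flushes
    # the buffered event; `:` comments are skipped; data is JSON-decoded
    # when possible, with "[DONE]" passed through verbatim.
    name, buf, events = "message", [], []
    for line in lines:
        line = line.rstrip("\r\n")
        if not line:
            if buf:
                raw = "\n".join(buf)
                data = raw
                if raw and raw != "[DONE]":
                    try:
                        data = json.loads(raw)
                    except json.JSONDecodeError:
                        pass
                events.append({"event": name, "data": data})
            name, buf = "message", []
        elif line.startswith(":"):
            continue
        elif line.startswith("event:"):
            name = line.removeprefix("event:").strip()
        elif line.startswith("data:"):
            buf.append(line.removeprefix("data:").strip())
    return events


stream = [": keep-alive", "event: answer", 'data: {"content": "hi"}', "", "data: [DONE]", ""]
events = parse_events(stream)
# events[0] == {"event": "answer", "data": {"content": "hi"}}
# events[1] == {"event": "message", "data": "[DONE]"}
```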
from __future__ import annotations
from typing import Any
from weknora_eval.loaders import read_jsonl, write_jsonl
from weknora_eval.schemas import TestsetRecord
def generate_rule_based_testset(
*,
documents_path: str = "data/parsed_docs/documents.jsonl",
output_path: str = "data/testsets/testset.raw.jsonl",
size: int = 50,
min_context_chars: int = 80,
) -> list[dict[str, Any]]:
documents = [
row
for row in read_jsonl(documents_path)
if len(row.get("content") or "") >= min_context_chars
]
rows: list[dict[str, Any]] = []
for index, document in enumerate(documents[:size], start=1):
context = document["content"]
source_file = document.get("source_file")
question = _default_question(document)
reference = _reference_from_context(context)
rows.append(
TestsetRecord(
sample_id=f"qa-{index:04d}",
user_input=question,
reference=reference,
reference_contexts=[context],
source_file=source_file,
question_type="single_hop",
review_status="pending",
).to_dict()
)
write_jsonl(output_path, rows)
return rows
def approve_pending_testset(
*,
input_path: str = "data/testsets/testset.raw.jsonl",
output_path: str = "data/testsets/testset.reviewed.jsonl",
) -> list[dict[str, Any]]:
rows = read_jsonl(input_path)
reviewed: list[dict[str, Any]] = []
for row in rows:
row = dict(row)
if row.get("review_status") == "rejected":
continue
row["review_status"] = "approved"
reviewed.append(row)
write_jsonl(output_path, reviewed)
return reviewed
def validate_reviewed_testset(path: str = "data/testsets/testset.reviewed.jsonl") -> list[str]:
errors: list[str] = []
for index, row in enumerate(read_jsonl(path), start=1):
prefix = f"{path}:{index}"
if row.get("review_status") != "approved":
errors.append(f"{prefix} review_status must be approved")
for key in ("sample_id", "user_input", "reference"):
if not row.get(key):
errors.append(f"{prefix} missing {key}")
if not row.get("reference_contexts"):
errors.append(f"{prefix} reference_contexts must be non-empty")
return errors
def _default_question(document: dict[str, Any]) -> str:
source = document.get("source_file") or "该文档"
if document.get("file_type") == "xlsx" and document.get("sheet"):
return f"请根据 {source} 的 {document['sheet']} 中对应记录回答:这条记录的主要内容是什么?"
if document.get("page"):
return f"请根据 {source} 第 {document['page']} 页回答:该片段的主要内容是什么?"
return f"请根据 {source} 回答:该片段的主要内容是什么?"
def _reference_from_context(context: str, *, max_chars: int = 500) -> str:
text = " ".join(context.split())
if len(text) <= max_chars:
return text
return text[:max_chars].rstrip() + "..."
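Putting the helpers above together on in-memory documents, with file I/O stripped away (`write_jsonl` replaced by returning the rows, and the reference truncation simplified; the function below is a sketch, not the project API):

```python
def build_testset(documents, size=50, min_context_chars=20):
    # Same selection rule as generate_rule_based_testset: keep documents with
    # enough content, cap at `size`, emit one single-hop QA record each.
    eligible = [d for d in documents if len(d.get("content") or "") >= min_context_chars]
    rows = []
    for i, doc in enumerate(eligible[:size], start=1):
        context = doc["content"]
        rows.append({
            "sample_id": f"qa-{i:04d}",
            "user_input": f"请根据 {doc.get('source_file') or '该文档'} 回答:该片段的主要内容是什么?",
            "reference": " ".join(context.split())[:500],  # truncation simplified (no ellipsis)
            "reference_contexts": [context],
            "review_status": "pending",
        })
    return rows


docs = [
    {"source_file": "a.pdf", "content": "x" * 40},
    {"source_file": "b.pdf", "content": "too short"},
]
rows = build_testset(docs)  # only a.pdf survives the min-length filter
```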