mineru.py
6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Any
import requests
from weknora_eval.loaders import compact_text, write_json, write_jsonl
from weknora_eval.parsers.local import build_parse_summary, parse_pdf
from weknora_eval.schemas import ParsedDocument
class MinerUParseError(RuntimeError):
    """Signal that a MinerU parse attempt (CLI or HTTP) could not be completed."""
def parse_with_mineru(config: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Parse every PDF under ``data/raw_docs/pdf`` with MinerU and persist the results.

    Settings are read from ``config["parsing"]``. Its ``mineru`` section picks
    the mode (``"cli"`` when unset, or ``"http"``) and whether a PDF that fails
    is retried with the local parser (``fallback_to_local``, enabled when
    unset). Its ``local`` section supplies ``min_chars`` (80 when unset) and
    the fallback ``pdf_backend`` (``"pypdf"`` when unset).

    Side effects: writes the document rows to ``output_path``, the failure
    records to ``failed_path`` (only when at least one PDF failed) and the
    summary to ``summary_path``; each path has a default under
    ``data/parsed_docs``.

    Returns:
        ``(rows, summary)``: the serialised documents and the value returned
        by ``build_parse_summary``.
    """
    parsing_cfg = config["parsing"]
    mineru_cfg = parsing_cfg.get("mineru", {})
    mode = mineru_cfg.get("mode", "cli")
    use_fallback = bool(mineru_cfg.get("fallback_to_local", True))
    local_cfg = parsing_cfg.get("local", {})
    min_chars = int(local_cfg.get("min_chars", 80))
    parser_label = f"mineru:{mode}"

    collected: list[ParsedDocument] = []
    failure_records: list[dict[str, Any]] = []

    pdf_files = sorted(Path("data/raw_docs/pdf").glob("*.pdf"))
    for pdf_file in pdf_files:
        try:
            if mode == "cli":
                produced = parse_pdf_with_cli(pdf_file, mineru_cfg, min_chars=min_chars)
            elif mode == "http":
                produced = parse_pdf_with_http(pdf_file, mineru_cfg, min_chars=min_chars)
            else:
                raise MinerUParseError(f"Unsupported MinerU mode: {mode}")
            collected.extend(produced)
        except Exception as exc:  # noqa: BLE001
            # One bad PDF must not abort the batch: record it and move on.
            record = {
                "source_file": pdf_file.name,
                "parser": parser_label,
                "status": "failed",
                "error": str(exc),
                "fallback_used": None,
            }
            if use_fallback:
                try:
                    backend = local_cfg.get("pdf_backend", "pypdf")
                    recovered = parse_pdf(pdf_file, backend=backend, min_chars=min_chars)
                    collected.extend(recovered)
                    record["fallback_used"] = f"local:{backend}"
                except Exception as fallback_exc:  # noqa: BLE001
                    # The fallback failed too; keep both error messages.
                    record["fallback_error"] = str(fallback_exc)
            failure_records.append(record)

    rows = [document.to_dict() for document in collected]

    output_path = parsing_cfg.get("output_path", "data/parsed_docs/documents.jsonl")
    write_jsonl(output_path, rows)

    if failure_records:
        failed_path = parsing_cfg.get("failed_path", "data/parsed_docs/failed_parse.jsonl")
        write_jsonl(failed_path, failure_records)

    summary = build_parse_summary(rows, failure_records, parser=parser_label)
    summary_path = parsing_cfg.get("summary_path", "data/parsed_docs/parse_summary.json")
    write_json(summary_path, summary)
    return rows, summary
def parse_pdf_with_cli(
    pdf_path: str | Path,
    mineru_config: dict[str, Any],
    *,
    min_chars: int,
) -> list[ParsedDocument]:
    """Run the MinerU command-line tool on one PDF and wrap its Markdown output.

    Args:
        pdf_path: PDF file to parse.
        mineru_config: MinerU settings; reads ``output_dir``, ``cli_bin`` and
            ``timeout_seconds``, each with a default.
        min_chars: Markdown files whose compacted text is shorter than this
            are dropped.

    Returns:
        One ``ParsedDocument`` per sufficiently long Markdown file found under
        the per-PDF output directory.

    Raises:
        MinerUParseError: If the CLI exits with a non-zero status or leaves no
            Markdown file in the output directory.
    """
    pdf_file = Path(pdf_path)
    raw_root = Path(mineru_config.get("output_dir", "data/parsed_docs/mineru_raw"))
    # Each PDF gets its own sub-directory, named after the file stem.
    work_dir = raw_root / pdf_file.stem
    work_dir.mkdir(parents=True, exist_ok=True)

    executable = mineru_config.get("cli_bin", "mineru")
    timeout_seconds = int(mineru_config.get("timeout_seconds", 600))

    # MinerU's CLI flags differ between releases. The invocation is kept in
    # this one place so a deployment can swap it without editing pipeline
    # scripts.
    command = [executable, "-p", str(pdf_file), "-o", str(work_dir)]
    completed = subprocess.run(
        command,
        check=False,
        capture_output=True,
        text=True,
        timeout=timeout_seconds,
    )
    if completed.returncode != 0:
        # Prefer stderr, then stdout, then a generic message.
        message = completed.stderr.strip() or completed.stdout.strip() or "MinerU CLI failed"
        raise MinerUParseError(message)

    md_files = sorted(work_dir.rglob("*.md"))
    if not md_files:
        raise MinerUParseError(f"No Markdown output found in {work_dir}")

    parsed: list[ParsedDocument] = []
    for position, md_file in enumerate(md_files, start=1):
        text = compact_text(md_file.read_text(encoding="utf-8"))
        if len(text) < min_chars:
            # Too short to keep; the position still counts, so ids may skip.
            continue
        document = ParsedDocument(
            doc_id=f"{pdf_file.name}::mineru-{position}",
            source_file=pdf_file.name,
            file_type="pdf",
            content=text,
            metadata={
                "parser": "mineru:cli",
                "mineru_output": str(md_file),
            },
        )
        parsed.append(document)
    return parsed
def parse_pdf_with_http(
    pdf_path: str | Path,
    mineru_config: dict[str, Any],
    *,
    min_chars: int,
) -> list[ParsedDocument]:
    """Upload one PDF to a MinerU HTTP service and wrap the returned text.

    Args:
        pdf_path: PDF file to upload.
        mineru_config: MinerU settings; reads ``http_base_url`` (required),
            ``api_key`` (optional, sent as a bearer token) and
            ``timeout_seconds`` (600 when unset).
        min_chars: Returned texts shorter than this after compaction are
            dropped.

    Returns:
        One ``ParsedDocument`` per sufficiently long text in the response.

    Raises:
        MinerUParseError: If ``http_base_url`` is missing, the service answers
            with a status of 400 or above, the body is not valid JSON, or the
            JSON does not have the expected shape.
    """
    target = Path(pdf_path)
    base_url = str(mineru_config.get("http_base_url") or "").rstrip("/")
    if not base_url:
        raise MinerUParseError("MinerU HTTP mode requires parsing.mineru.http_base_url")

    headers: dict[str, str] = {}
    api_key = mineru_config.get("api_key")
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    # The checklist does not define a universal MinerU HTTP contract. This
    # implementation expects a replaceable service exposing POST /parse and
    # returning {"markdown": "..."} or {"documents": [{"content": "..."}]}.
    with target.open("rb") as handle:
        response = requests.post(
            f"{base_url}/parse",
            files={"file": (target.name, handle, "application/pdf")},
            headers=headers,
            timeout=int(mineru_config.get("timeout_seconds", 600)),
        )
    if response.status_code >= 400:
        raise MinerUParseError(f"MinerU HTTP failed with {response.status_code}: {response.text[:500]}")

    # A success status does not guarantee a JSON body (e.g. an HTML page from
    # a proxy), so report that as a parse error rather than a decode error.
    try:
        payload = response.json()
    except ValueError as exc:
        raise MinerUParseError(f"MinerU HTTP returned invalid JSON: {response.text[:500]}") from exc

    # Only a JSON object can carry the expected keys; an array or scalar
    # would otherwise fail on `.get` with an unrelated AttributeError.
    if not isinstance(payload, dict):
        raise MinerUParseError("MinerU HTTP response must include `markdown` or `documents`")

    contents: list[str] = []
    documents = payload.get("documents")
    if isinstance(documents, list):
        for item in documents:
            if not isinstance(item, dict):
                raise MinerUParseError("MinerU HTTP `documents` entries must be objects with `content`")
            # NOTE(review): `content` may be absent (None) here; presumably
            # compact_text tolerates that -- confirm against its definition.
            contents.append(compact_text(item.get("content")))
    elif payload.get("markdown"):
        contents = [compact_text(payload["markdown"])]
    else:
        raise MinerUParseError("MinerU HTTP response must include `markdown` or `documents`")

    docs: list[ParsedDocument] = []
    for index, content in enumerate(contents, start=1):
        if len(content) < min_chars:
            # Too short to keep; the index still advances, so ids may skip.
            continue
        docs.append(
            ParsedDocument(
                doc_id=f"{target.name}::mineru-http-{index}",
                source_file=target.name,
                file_type="pdf",
                content=content,
                metadata={"parser": "mineru:http"},
            )
        )
    return docs