# mineru.py
from __future__ import annotations

import subprocess
from pathlib import Path
from typing import Any

import requests

from weknora_eval.loaders import compact_text, write_json, write_jsonl
from weknora_eval.parsers.local import build_parse_summary, parse_pdf
from weknora_eval.schemas import ParsedDocument


class MinerUParseError(RuntimeError):
    """Raised when MinerU (CLI or HTTP mode) cannot produce usable output for a PDF."""


def parse_with_mineru(config: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Parse every PDF under ``data/raw_docs/pdf`` with MinerU and persist the results.

    Settings are read from ``config["parsing"]``: the ``mineru`` section picks
    the mode (``cli`` by default, or ``http``) and whether a failed file falls
    back to the local parser; the ``local`` section supplies ``min_chars`` and
    the fallback ``pdf_backend``.

    Args:
        config: Pipeline configuration; must contain a ``"parsing"`` mapping.

    Returns:
        A ``(rows, summary)`` tuple: the parsed documents as dicts and the
        summary produced by ``build_parse_summary``.

    Raises:
        KeyError: If ``config`` has no ``"parsing"`` entry.
    """
    parsing = config["parsing"]
    mineru = parsing.get("mineru", {})
    mode = mineru.get("mode", "cli")
    fallback = bool(mineru.get("fallback_to_local", True))
    local_config = parsing.get("local", {})
    min_chars = int(local_config.get("min_chars", 80))
    parser_label = f"mineru:{mode}"

    docs: list[ParsedDocument] = []
    failures: list[dict[str, Any]] = []

    def _recover_locally(pdf_file: Path, record: dict[str, Any]) -> None:
        # Best effort: a fallback error is recorded on the failure entry
        # instead of aborting the whole batch.
        try:
            backend = local_config.get("pdf_backend", "pypdf")
            recovered = parse_pdf(pdf_file, backend=backend, min_chars=min_chars)
            docs.extend(recovered)
            record["fallback_used"] = f"local:{backend}"
        except Exception as fallback_exc:  # noqa: BLE001
            record["fallback_error"] = str(fallback_exc)

    for pdf_path in sorted(Path("data/raw_docs/pdf").glob("*.pdf")):
        try:
            if mode == "cli":
                parsed = parse_pdf_with_cli(pdf_path, mineru, min_chars=min_chars)
            elif mode == "http":
                parsed = parse_pdf_with_http(pdf_path, mineru, min_chars=min_chars)
            else:
                raise MinerUParseError(f"Unsupported MinerU mode: {mode}")
            docs.extend(parsed)
        except Exception as exc:  # noqa: BLE001
            # The MinerU failure is always logged, even when the local
            # fallback subsequently succeeds.
            record: dict[str, Any] = {
                "source_file": pdf_path.name,
                "parser": parser_label,
                "status": "failed",
                "error": str(exc),
                "fallback_used": None,
            }
            if fallback:
                _recover_locally(pdf_path, record)
            failures.append(record)

    rows = [document.to_dict() for document in docs]

    documents_path = parsing.get("output_path", "data/parsed_docs/documents.jsonl")
    write_jsonl(documents_path, rows)

    # The failures file is only written when at least one PDF failed.
    if failures:
        failures_path = parsing.get("failed_path", "data/parsed_docs/failed_parse.jsonl")
        write_jsonl(failures_path, failures)

    summary = build_parse_summary(rows, failures, parser=parser_label)
    summary_path = parsing.get("summary_path", "data/parsed_docs/parse_summary.json")
    write_json(summary_path, summary)
    return rows, summary


def parse_pdf_with_cli(
    pdf_path: str | Path,
    mineru_config: dict[str, Any],
    *,
    min_chars: int,
) -> list[ParsedDocument]:
    """Run the MinerU CLI on one PDF and wrap its Markdown output as documents.

    The CLI writes into ``<output_dir>/<pdf stem>`` (``output_dir`` defaults to
    ``data/parsed_docs/mineru_raw``). Every ``*.md`` file found below that
    directory becomes one document, unless its compacted text is shorter than
    ``min_chars``.

    Args:
        pdf_path: Path of the PDF to parse.
        mineru_config: MinerU settings; reads ``output_dir``, ``cli_bin`` and
            ``timeout_seconds``.
        min_chars: Minimum compacted length a Markdown file needs to be kept.

    Returns:
        The documents built from the CLI's Markdown files.

    Raises:
        MinerUParseError: If the CLI exits non-zero or produces no Markdown.
    """
    target = Path(pdf_path)
    output_dir = Path(mineru_config.get("output_dir", "data/parsed_docs/mineru_raw")) / target.stem
    output_dir.mkdir(parents=True, exist_ok=True)

    # MinerU CLI flags differ between releases. The invocation lives only here
    # so a deployment can swap it out without touching the pipeline scripts.
    command = [mineru_config.get("cli_bin", "mineru"), "-p", str(target), "-o", str(output_dir)]
    completed = subprocess.run(
        command,
        check=False,
        capture_output=True,
        text=True,
        timeout=int(mineru_config.get("timeout_seconds", 600)),
    )
    if completed.returncode != 0:
        # Prefer stderr, then stdout, then a generic message.
        reason = completed.stderr.strip() or completed.stdout.strip() or "MinerU CLI failed"
        raise MinerUParseError(reason)

    markdown_files = sorted(output_dir.rglob("*.md"))
    if not markdown_files:
        raise MinerUParseError(f"No Markdown output found in {output_dir}")

    # Numbering follows the sorted file list, so skipped (too short) files
    # leave gaps in the doc_id sequence.
    numbered_texts = (
        (position, md_file, compact_text(md_file.read_text(encoding="utf-8")))
        for position, md_file in enumerate(markdown_files, start=1)
    )
    return [
        ParsedDocument(
            doc_id=f"{target.name}::mineru-{position}",
            source_file=target.name,
            file_type="pdf",
            content=text,
            metadata={
                "parser": "mineru:cli",
                "mineru_output": str(md_file),
            },
        )
        for position, md_file, text in numbered_texts
        if len(text) >= min_chars
    ]


def parse_pdf_with_http(
    pdf_path: str | Path,
    mineru_config: dict[str, Any],
    *,
    min_chars: int,
) -> list[ParsedDocument]:
    """Parse one PDF by uploading it to a MinerU-compatible HTTP service.

    The PDF is sent as a multipart ``file`` field to ``POST <base_url>/parse``.
    The JSON response must be an object carrying either ``documents`` (a list
    of objects with ``content``) or ``markdown`` (a single string);
    ``documents`` takes precedence when it is a list.

    Args:
        pdf_path: Path of the PDF to upload.
        mineru_config: MinerU settings; reads ``http_base_url`` (required),
            ``api_key`` (sent as a Bearer token when set) and
            ``timeout_seconds`` (default 600).
        min_chars: Minimum compacted length a content entry needs to be kept.

    Returns:
        One document per sufficiently long content entry.

    Raises:
        MinerUParseError: If no base URL is configured, the service answers
            with status >= 400, the body is not JSON, or the JSON does not
            match the expected shape.
    """
    target = Path(pdf_path)
    base_url = str(mineru_config.get("http_base_url") or "").rstrip("/")
    if not base_url:
        raise MinerUParseError("MinerU HTTP mode requires parsing.mineru.http_base_url")

    headers: dict[str, str] = {}
    api_key = mineru_config.get("api_key")
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    # There is no universal MinerU HTTP contract. This implementation expects
    # a replaceable service exposing POST /parse and returning
    # {"markdown": "..."} or {"documents": [{"content": "..."}]}.
    with target.open("rb") as file:
        response = requests.post(
            f"{base_url}/parse",
            files={"file": (target.name, file, "application/pdf")},
            headers=headers,
            timeout=int(mineru_config.get("timeout_seconds", 600)),
        )
    if response.status_code >= 400:
        raise MinerUParseError(f"MinerU HTTP failed with {response.status_code}: {response.text[:500]}")

    # A 2xx answer can still carry HTML or plain text (e.g. from a proxy);
    # report that as a parse failure rather than a bare JSON decode error.
    try:
        payload = response.json()
    except ValueError as exc:
        raise MinerUParseError(
            f"MinerU HTTP returned a non-JSON body: {response.text[:500]}"
        ) from exc

    # Valid JSON that is not an object (list, string, null, ...) has neither field.
    if not isinstance(payload, dict):
        raise MinerUParseError("MinerU HTTP response must include `markdown` or `documents`")

    documents = payload.get("documents")
    contents: list[str] = []
    if isinstance(documents, list):
        for item in documents:
            if not isinstance(item, dict):
                raise MinerUParseError(
                    "MinerU HTTP `documents` entries must be objects with a `content` field"
                )
            contents.append(compact_text(item.get("content")))
    elif payload.get("markdown"):
        contents = [compact_text(payload["markdown"])]
    else:
        raise MinerUParseError("MinerU HTTP response must include `markdown` or `documents`")

    docs: list[ParsedDocument] = []
    for index, content in enumerate(contents, start=1):
        # Indices follow the response order, so dropped entries leave gaps.
        if len(content) < min_chars:
            continue
        docs.append(
            ParsedDocument(
                doc_id=f"{target.name}::mineru-http-{index}",
                source_file=target.name,
                file_type="pdf",
                content=content,
                metadata={"parser": "mineru:http"},
            )
        )
    return docs