# mineru.py - MinerU-backed PDF parsing (CLI or HTTP mode) with optional local fallback.
from __future__ import annotations

import subprocess
from pathlib import Path
from typing import Any

import requests

from weknora_eval.loaders import compact_text, write_json, write_jsonl
from weknora_eval.parsers.local import build_parse_summary, parse_pdf
from weknora_eval.raw_docs import iter_pdf_files
from weknora_eval.schemas import ParsedDocument


class MinerUParseError(RuntimeError):
    """Raised when MinerU (CLI or HTTP mode) cannot produce usable output for a PDF."""


def parse_with_mineru(config: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Parse every PDF from ``iter_pdf_files`` with MinerU and persist the results.

    Settings are read from ``config["parsing"]``:

    * ``mineru.mode`` -- ``"cli"`` (default) or ``"http"``; any other value is
      recorded as a failure for each PDF.
    * ``mineru.fallback_to_local`` -- when truthy (default), a PDF that MinerU
      fails on is re-parsed with the local parser using ``local.pdf_backend``
      (default ``"pypdf"``).
    * ``local.min_chars`` -- minimum content length for a document to be kept
      (default 80); also passed to the local fallback.
    * ``output_path`` / ``failed_path`` / ``summary_path`` -- where documents,
      failure records and the summary are written.

    A failure record keeps ``status == "failed"`` even when the fallback
    succeeds; ``fallback_used`` (or ``fallback_error``) tells the two apart.
    The failures file is only written when at least one PDF failed.

    Args:
        config: Pipeline configuration containing a ``"parsing"`` mapping.

    Returns:
        ``(rows, summary)`` where ``rows`` are the parsed documents as dicts
        and ``summary`` is the value returned by ``build_parse_summary``.

    Raises:
        KeyError: If ``config`` has no ``"parsing"`` entry.
    """
    parsing = config["parsing"]
    # ``.get(key, {})`` only applies its default when the key is absent. A
    # section that is present but set to None would come back as None and
    # break the ``.get`` calls below, so normalise it to an empty mapping.
    mineru = parsing.get("mineru") or {}
    mode = mineru.get("mode", "cli")
    fallback = bool(mineru.get("fallback_to_local", True))
    local_config = parsing.get("local") or {}
    min_chars = int(local_config.get("min_chars", 80))

    docs: list[ParsedDocument] = []
    failures: list[dict[str, Any]] = []

    for pdf_path in iter_pdf_files():
        parser_name = f"mineru:{mode}"
        try:
            if mode == "cli":
                docs.extend(parse_pdf_with_cli(pdf_path, mineru, min_chars=min_chars))
            elif mode == "http":
                docs.extend(parse_pdf_with_http(pdf_path, mineru, min_chars=min_chars))
            else:
                raise MinerUParseError(f"Unsupported MinerU mode: {mode}")
        except Exception as exc:  # noqa: BLE001
            # Any error (MinerU, subprocess, network, I/O) is recorded per file
            # so one bad PDF does not abort the whole batch.
            failure = {
                "source_file": pdf_path.name,
                "parser": parser_name,
                "status": "failed",
                "error": str(exc),
                "fallback_used": None,
            }
            if fallback:
                try:
                    backend = local_config.get("pdf_backend", "pypdf")
                    local_docs = parse_pdf(pdf_path, backend=backend, min_chars=min_chars)
                    docs.extend(local_docs)
                    failure["fallback_used"] = f"local:{backend}"
                except Exception as fallback_exc:  # noqa: BLE001
                    failure["fallback_error"] = str(fallback_exc)
            failures.append(failure)

    rows = [doc.to_dict() for doc in docs]
    write_jsonl(parsing.get("output_path", "data/parsed_docs/documents.jsonl"), rows)
    if failures:
        write_jsonl(parsing.get("failed_path", "data/parsed_docs/failed_parse.jsonl"), failures)

    summary = build_parse_summary(rows, failures, parser=f"mineru:{mode}")
    write_json(parsing.get("summary_path", "data/parsed_docs/parse_summary.json"), summary)
    return rows, summary


def parse_pdf_with_cli(
    pdf_path: str | Path,
    mineru_config: dict[str, Any],
    *,
    min_chars: int,
) -> list[ParsedDocument]:
    """Run the MinerU command-line tool on one PDF and load its Markdown output.

    The tool writes into ``<output_dir>/<pdf stem>`` (``output_dir`` defaults to
    ``data/parsed_docs/mineru_raw``). Every ``*.md`` file found under that
    directory afterwards, in sorted path order, becomes one document unless its
    compacted text is shorter than ``min_chars``.

    Args:
        pdf_path: PDF to parse.
        mineru_config: MinerU settings; reads ``output_dir``, ``cli_bin``
            (default ``"mineru"``) and ``timeout_seconds`` (default 600).
        min_chars: Minimum compacted content length for a document to be kept.

    Returns:
        The kept documents; may be empty when every Markdown file is too short.

    Raises:
        MinerUParseError: If the CLI exits non-zero or no Markdown file is found.
    """
    source = Path(pdf_path)
    raw_root = Path(mineru_config.get("output_dir", "data/parsed_docs/mineru_raw"))
    work_dir = raw_root / source.stem
    work_dir.mkdir(parents=True, exist_ok=True)
    executable = mineru_config.get("cli_bin", "mineru")
    time_limit = int(mineru_config.get("timeout_seconds", 600))

    # MinerU CLI arguments vary by release. This common invocation is isolated
    # here so deployments can replace it without touching pipeline scripts.
    command = [executable, "-p", str(source), "-o", str(work_dir)]
    completed = subprocess.run(
        command,
        check=False,
        capture_output=True,
        text=True,
        timeout=time_limit,
    )
    if completed.returncode != 0:
        # Prefer stderr, then stdout, then a generic message.
        reason = completed.stderr.strip() or completed.stdout.strip() or "MinerU CLI failed"
        raise MinerUParseError(reason)

    rendered = sorted(work_dir.rglob("*.md"))
    if not rendered:
        raise MinerUParseError(f"No Markdown output found in {work_dir}")

    parsed: list[ParsedDocument] = []
    # The position counts every Markdown file, including ones skipped for being
    # too short, so doc ids follow the sorted file order.
    for position, md_file in enumerate(rendered, start=1):
        body = compact_text(md_file.read_text(encoding="utf-8"))
        if len(body) >= min_chars:
            parsed.append(
                ParsedDocument(
                    doc_id=f"{source.name}::mineru-{position}",
                    source_file=source.name,
                    file_type="pdf",
                    content=body,
                    metadata={
                        "parser": "mineru:cli",
                        "mineru_output": str(md_file),
                    },
                )
            )
    return parsed


def parse_pdf_with_http(
    pdf_path: str | Path,
    mineru_config: dict[str, Any],
    *,
    min_chars: int,
) -> list[ParsedDocument]:
    """Upload one PDF to a MinerU HTTP service and wrap the returned text as documents.

    The PDF is posted as a multipart ``files`` part to
    ``<http_base_url><http_parse_path>`` (path defaults to ``/file_parse``),
    together with any extra ``http_form_fields``. Text is pulled out of the
    JSON response with ``extract_mineru_contents``; each piece that is at least
    ``min_chars`` long becomes one document.

    Args:
        pdf_path: PDF to upload.
        mineru_config: MinerU settings; reads ``http_base_url`` (required),
            ``api_key`` (sent as a Bearer token when set), ``http_parse_path``,
            ``http_form_fields`` and ``timeout_seconds`` (default 600).
        min_chars: Minimum content length for a document to be kept.

    Returns:
        The kept documents; may be empty when every piece is too short.

    Raises:
        MinerUParseError: If ``http_base_url`` is missing, the service answers
            with a status >= 400, or the response holds no recognizable text
            (the raw payload is saved to disk in that case).

    Errors from opening the file, the HTTP request itself, or JSON decoding are
    not wrapped and propagate unchanged.
    """
    target = Path(pdf_path)
    base_url = str(mineru_config.get("http_base_url") or "").rstrip("/")
    if not base_url:
        raise MinerUParseError("MinerU HTTP mode requires parsing.mineru.http_base_url")

    headers = {}
    if mineru_config.get("api_key"):
        headers["Authorization"] = f"Bearer {mineru_config['api_key']}"

    endpoint = str(mineru_config.get("http_parse_path") or "/file_parse")
    if not endpoint.startswith("/"):
        endpoint = "/" + endpoint

    form_fields: dict[str, Any] = {}
    for key, value in (mineru_config.get("http_form_fields") or {}).items():
        # Compare explicitly rather than testing membership in a set: a set
        # lookup hashes the value and raises TypeError for lists and dicts.
        if value is None or value == "":
            continue
        if isinstance(value, (list, tuple)):
            # requests encodes a list value as one form field per item.
            form_fields[str(key)] = [str(item) for item in value]
        else:
            form_fields[str(key)] = str(value)

    with target.open("rb") as file:
        response = requests.post(
            f"{base_url}{endpoint}",
            files=[("files", (target.name, file, "application/pdf"))],
            data=form_fields,
            headers=headers,
            timeout=int(mineru_config.get("timeout_seconds", 600)),
        )
    if response.status_code >= 400:
        error_detail = _mineru_error_detail(response)
        raise MinerUParseError(f"MinerU HTTP failed with {response.status_code}: {error_detail}")

    payload = response.json()
    contents = extract_mineru_contents(payload)
    if not contents:
        # Keep the raw payload so the unexpected response shape can be inspected.
        raw_path = _write_unrecognized_mineru_payload(target, payload, mineru_config)
        raise MinerUParseError(
            "MinerU HTTP response did not include recognizable text content. "
            f"Saved raw response to {raw_path}"
        )

    docs: list[ParsedDocument] = []
    for index, content in enumerate(contents, start=1):
        if len(content) < min_chars:
            continue
        docs.append(
            ParsedDocument(
                doc_id=f"{target.name}::mineru-http-{index}",
                source_file=target.name,
                file_type="pdf",
                content=content,
                metadata={
                    "parser": "mineru:http",
                    "mineru_endpoint": endpoint,
                    "mineru_form_fields": form_fields,
                },
            )
        )
    return docs


def _mineru_error_detail(response: requests.Response) -> str:
    """Summarize a failed MinerU HTTP response for use in an error message.

    Args:
        response: The HTTP response whose body describes the failure.

    Returns:
        ``"task_id=... status=... error=..."`` when the body is a JSON object
        with a truthy ``error`` field; otherwise the first 1000 characters of
        the raw response text.
    """
    try:
        payload = response.json()
    except ValueError:
        return response.text[:1000]
    # A JSON body is not necessarily an object; a list, string or number has
    # no ``.get`` and must fall through to the raw-text summary.
    error = payload.get("error") if isinstance(payload, dict) else None
    if error:
        task_id = payload.get("task_id")
        status = payload.get("status")
        return f"task_id={task_id} status={status} error={error}"
    return response.text[:1000]


def extract_mineru_contents(payload: Any) -> list[str]:
    """Return the distinct text pieces found in a MinerU response payload.

    Text is gathered by ``_collect_text_values``; duplicates are dropped while
    keeping the order of first appearance, and empty strings are discarded.
    """
    gathered: list[str] = []
    _collect_text_values(payload, gathered)

    unique: list[str] = []
    seen: set[str] = set()
    for piece in gathered:
        if piece and piece not in seen:
            seen.add(piece)
            unique.append(piece)
    return unique


def _collect_text_values(value: Any, contents: list[str]) -> None:
    """Recursively append text found in ``value`` to ``contents``.

    * A string is compacted and kept only if the result has at least 20
      characters.
    * A list is searched element by element.
    * A dict is searched only under a fixed set of keys: text-like keys first,
      then container-like keys, each in the order listed below.
    * Any other value is ignored.

    NOTE(review): dict entries under keys outside the two tuples below are
    never visited, so a response that nests its text under other keys yields
    nothing -- confirm against the response schema of the deployed service.
    """
    if isinstance(value, str):
        cleaned = compact_text(value)
        if len(cleaned) >= 20:
            contents.append(cleaned)
    elif isinstance(value, list):
        for element in value:
            _collect_text_values(element, contents)
    elif isinstance(value, dict):
        text_keys = (
            "markdown",
            "md",
            "content",
            "text",
            "plain_text",
            "page_content",
            "document",
        )
        container_keys = ("documents", "pages", "chunks", "data", "result", "results")
        # Text keys are visited before container keys, matching the output order
        # callers rely on for document numbering.
        for name in text_keys + container_keys:
            if name in value:
                _collect_text_values(value[name], contents)


def _write_unrecognized_mineru_payload(
    pdf_path: Path,
    payload: dict[str, Any],
    mineru_config: dict[str, Any],
) -> Path:
    """Save a MinerU response that yielded no text so it can be inspected later.

    The payload is written as ``<pdf stem>.response.json`` inside the configured
    ``output_dir`` (default ``data/parsed_docs/mineru_raw``), which is created
    if needed.

    Returns:
        The path the payload was written to.
    """
    dump_dir = Path(mineru_config.get("output_dir", "data/parsed_docs/mineru_raw"))
    dump_dir.mkdir(parents=True, exist_ok=True)
    destination = dump_dir / (pdf_path.stem + ".response.json")
    write_json(destination, payload)
    return destination