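"""Local (offline) document parsers for the weknora_eval pipeline.

Emits one ParsedDocument per PDF page (pypdf, pymupdf, or pdfplumber backend)
and one per XLSX row or sheet (openpyxl), dropping fragments shorter than a
configurable character floor.
"""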
from __future__ import annotations

import statistics
from pathlib import Path
from typing import Any

from openpyxl import load_workbook

from weknora_eval.loaders import compact_text, write_json, write_jsonl
from weknora_eval.schemas import ParsedDocument


def parse_raw_docs(config: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Parse every PDF/XLSX under data/raw_docs/ and persist rows, failures, and a summary."""
    parsing = config["parsing"]
    local_config = parsing.get("local", {})
    min_chars = int(local_config.get("min_chars", 80))
    pdf_backend = local_config.get("pdf_backend", "pypdf")
    xlsx_mode = local_config.get("xlsx_mode", "row_text")

    docs: list[ParsedDocument] = []
    failures: list[dict[str, Any]] = []

    for pdf_path in sorted(Path("data/raw_docs/pdf").glob("*.pdf")):
        try:
            docs.extend(parse_pdf(pdf_path, backend=pdf_backend, min_chars=min_chars))
        except Exception as exc:  # noqa: BLE001 - parser failures must be persisted.
            failures.append(
                {
                    "source_file": pdf_path.name,
                    "parser": f"local:{pdf_backend}",
                    "status": "failed",
                    "error": str(exc),
                    "fallback_used": None,
                }
            )

    for xlsx_path in sorted(Path("data/raw_docs/xlsx").glob("*.xlsx")):
        try:
            docs.extend(parse_xlsx(xlsx_path, mode=xlsx_mode, min_chars=min_chars))
        except Exception as exc:  # noqa: BLE001
            failures.append(
                {
                    "source_file": xlsx_path.name,
                    "parser": "local:openpyxl",
                    "status": "failed",
                    "error": str(exc),
                    "fallback_used": None,
                }
            )

    rows = [doc.to_dict() for doc in docs]
    write_jsonl(parsing.get("output_path", "data/parsed_docs/documents.jsonl"), rows)
    if failures:
        write_jsonl(parsing.get("failed_path", "data/parsed_docs/failed_parse.jsonl"), failures)

    summary = build_parse_summary(rows, failures, parser=f"local:{pdf_backend}")
    write_json(parsing.get("summary_path", "data/parsed_docs/parse_summary.json"), summary)
    return rows, summary
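
# Shape of the config consumed above, as a sketch inferred from the .get()
# lookups (all keys optional except "parsing"; values shown are the defaults):
#
#     config = {
#         "parsing": {
#             "local": {
#                 "min_chars": 80,
#                 "pdf_backend": "pypdf",    # or "pymupdf" / "pdfplumber"
#                 "xlsx_mode": "row_text",   # or "markdown_table"
#             },
#             "output_path": "data/parsed_docs/documents.jsonl",
#             "failed_path": "data/parsed_docs/failed_parse.jsonl",
#             "summary_path": "data/parsed_docs/parse_summary.json",
#         }
#     }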


def parse_pdf(path: str | Path, *, backend: str = "pypdf", min_chars: int = 80) -> list[ParsedDocument]:
    """Dispatch to the configured PDF backend; one ParsedDocument per page."""
    target = Path(path)
    backend = backend.lower()
    if backend == "pymupdf":
        return _parse_pdf_pymupdf(target, min_chars=min_chars)
    if backend == "pdfplumber":
        return _parse_pdf_pdfplumber(target, min_chars=min_chars)
    if backend == "pypdf":
        return _parse_pdf_pypdf(target, min_chars=min_chars)
    raise ValueError(f"Unsupported PDF backend: {backend}")


def _parse_pdf_pypdf(path: Path, *, min_chars: int) -> list[ParsedDocument]:
    """Per-page extraction via pypdf."""
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    docs: list[ParsedDocument] = []
    for index, page in enumerate(reader.pages, start=1):
        content = compact_text(page.extract_text() or "")
        if len(content) < min_chars:
            continue
        docs.append(_pdf_doc(path, index, content, "local:pypdf"))
    return docs


def _parse_pdf_pymupdf(path: Path, *, min_chars: int) -> list[ParsedDocument]:
    """Per-page extraction via PyMuPDF."""
    try:
        import fitz  # PyMuPDF's import name.
    except ImportError as exc:
        raise ImportError("pymupdf backend requires `pip install -e '.[pdf]'`") from exc

    docs: list[ParsedDocument] = []
    with fitz.open(path) as document:
        for index, page in enumerate(document, start=1):
            content = compact_text(page.get_text("text"))
            if len(content) < min_chars:
                continue
            docs.append(_pdf_doc(path, index, content, "local:pymupdf"))
    return docs


def _parse_pdf_pdfplumber(path: Path, *, min_chars: int) -> list[ParsedDocument]:
    """Per-page extraction via pdfplumber."""
    try:
        import pdfplumber
    except ImportError as exc:
        raise ImportError("pdfplumber backend requires `pip install -e '.[pdf]'`") from exc

    docs: list[ParsedDocument] = []
    with pdfplumber.open(path) as pdf:
        for index, page in enumerate(pdf.pages, start=1):
            content = compact_text(page.extract_text() or "")
            if len(content) < min_chars:
                continue
            docs.append(_pdf_doc(path, index, content, "local:pdfplumber"))
    return docs


def _pdf_doc(path: Path, page: int, content: str, parser: str) -> ParsedDocument:
    """Build a page-level ParsedDocument with a doc_id like "report.pdf::page-3"."""
    return ParsedDocument(
        doc_id=f"{path.name}::page-{page}",
        source_file=path.name,
        file_type="pdf",
        page=page,
        content=content,
        metadata={"parser": parser},
    )


def parse_xlsx(path: str | Path, *, mode: str = "row_text", min_chars: int = 80) -> list[ParsedDocument]:
    """Extract ParsedDocuments from an XLSX workbook: one per row or one per sheet."""
    target = Path(path)
    mode = mode.lower()
    workbook = load_workbook(target, data_only=True, read_only=True)
    try:
        if mode == "row_text":
            return _parse_xlsx_row_text(target, workbook, min_chars=min_chars)
        if mode == "markdown_table":
            return _parse_xlsx_markdown_table(target, workbook, min_chars=min_chars)
        raise ValueError(f"Unsupported XLSX mode: {mode}")
    finally:
        # Read-only workbooks hold the file handle open until explicitly closed.
        workbook.close()


def _parse_xlsx_row_text(path: Path, workbook: Any, *, min_chars: int) -> list[ParsedDocument]:
    """Emit one document per data row, rendered as "header: value" lines."""
    docs: list[ParsedDocument] = []
    for sheet in workbook.worksheets:
        rows = list(sheet.iter_rows(values_only=True))
        if not rows:
            continue
        headers = [_cell_to_text(value) or f"col_{index}" for index, value in enumerate(rows[0], start=1)]
        for row_index, row in enumerate(rows[1:], start=2):
            pairs = []
            for header, value in zip(headers, row, strict=False):
                cell = _cell_to_text(value)
                if cell:
                    pairs.append(f"{header}: {cell}")
            content = "\n".join(pairs).strip()
            if len(content) < min_chars:
                continue
            docs.append(
                ParsedDocument(
                    doc_id=f"{path.name}::{sheet.title}::row-{row_index}",
                    source_file=path.name,
                    file_type="xlsx",
                    sheet=sheet.title,
                    row_index=row_index,
                    content=content,
                    metadata={"parser": "local:openpyxl", "columns": headers},
                )
            )
    return docs
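
# Example "row_text" rendering for a row under headers ["name", "price"]
# (values illustrative):
#
#     name: Widget
#     price: 9.99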


def _parse_xlsx_markdown_table(path: Path, workbook: Any, *, min_chars: int) -> list[ParsedDocument]:
    """Emit one document per sheet, rendered as a Markdown table."""
    docs: list[ParsedDocument] = []
    for sheet in workbook.worksheets:
        rows = [
            # Escape literal pipes so cell text cannot break the table markup.
            [_cell_to_text(value).replace("|", "\\|") for value in row]
            for row in sheet.iter_rows(values_only=True)
            if any(value is not None for value in row)
        ]
        if not rows:
            continue
        width = max(len(row) for row in rows)
        normalized = [row + [""] * (width - len(row)) for row in rows]
        header = normalized[0]
        separator = ["---"] * width
        body = normalized[1:]
        lines = [
            "| " + " | ".join(header) + " |",
            "| " + " | ".join(separator) + " |",
        ]
        lines.extend("| " + " | ".join(row) + " |" for row in body)
        content = "\n".join(lines)
        if len(content) < min_chars:
            continue
        docs.append(
            ParsedDocument(
                doc_id=f"{path.name}::{sheet.title}",
                source_file=path.name,
                file_type="xlsx",
                sheet=sheet.title,
                content=content,
                metadata={"parser": "local:openpyxl", "mode": "markdown_table"},
            )
        )
    return docs
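
# Example "markdown_table" rendering (values illustrative):
#
#     | name | price |
#     | --- | --- |
#     | Widget | 9.99 |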


def _cell_to_text(value: Any) -> str:
    """Render a cell value as single-line text; None becomes the empty string."""
    if value is None:
        return ""
    text = str(value).strip()
    return text.replace("\n", " ")


def build_parse_summary(
    rows: list[dict[str, Any]],
    failures: list[dict[str, Any]],
    *,
    parser: str,
) -> dict[str, Any]:
    """Aggregate file- and document-level counters into the parse-summary payload."""
    source_files = {row.get("source_file") for row in rows if row.get("source_file")}
    failed_files = {row.get("source_file") for row in failures if row.get("source_file")}
    lengths = [len(row.get("content") or "") for row in rows]
    return {
        "total_files": len(source_files | failed_files),
        "parsed_files": len(source_files),
        "failed_files": len(failed_files),
        "total_documents": len(rows),
        "empty_documents": sum(1 for length in lengths if length == 0),
        "avg_chars": round(statistics.mean(lengths), 2) if lengths else 0,
        "parser": parser,
    }
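

if __name__ == "__main__":
    # Illustrative smoke run, not part of the original module: parse everything
    # under data/raw_docs/ with default settings and print the summary counters.
    _rows, run_summary = parse_raw_docs({"parsing": {}})
    print(run_summary)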