# 02_wait_ingestion.py 2.04 KB
from __future__ import annotations

from collections import Counter
import sys

import _bootstrap  # noqa: F401

from weknora_eval.api import client_from_config
from weknora_eval.config import load_config
from weknora_eval.loaders import read_jsonl, setup_logging, write_jsonl


def main() -> int:
    """Wait on knowledge ingestion, export the knowledge list, and print a report.

    Knowledge ids are collected from ``data/exported/knowledge_uploads.jsonl``
    (read with ``missing_ok=True``). When no ids are found, ``None`` is passed
    to ``client.wait_ingestion_completed`` and the status distribution covers
    every row returned by ``client.list_knowledge()``.

    Returns:
        ``1`` when the wait result contains any failed or pending entries,
        otherwise ``0`` (intended for use as a process exit code).
    """
    setup_logging()
    client = client_from_config(load_config())

    # Gather the ids recorded by the upload step, skipping rows without one.
    upload_rows = read_jsonl("data/exported/knowledge_uploads.jsonl", missing_ok=True)
    tracked_ids = set()
    for upload in upload_rows:
        knowledge_id = upload.get("knowledge_id")
        if knowledge_id:
            tracked_ids.add(knowledge_id)
    if not tracked_ids:
        # An empty set is replaced by None before being handed to the client.
        tracked_ids = None

    outcome = client.wait_ingestion_completed(knowledge_ids=tracked_ids)

    all_knowledge = client.list_knowledge()
    write_jsonl("data/exported/knowledge.jsonl", all_knowledge)

    # Restrict the distribution report to tracked ids when there are any.
    if tracked_ids:
        in_scope = [item for item in all_knowledge if item.get("id") in tracked_ids]
    else:
        in_scope = list(all_knowledge)

    completed_rows = outcome["completed"]
    failed_rows = outcome["failed"]
    pending_rows = outcome["pending"]

    print(
        f"Ingestion status: completed={len(completed_rows)} "
        f"failed={len(failed_rows)} pending={len(pending_rows)}"
    )

    parse_counts = dict(Counter(str(item.get("parse_status")) for item in in_scope))
    enable_counts = dict(Counter(str(item.get("enable_status")) for item in in_scope))
    print(
        f"Status distribution: parse_status={parse_counts} "
        f"enable_status={enable_counts}"
    )

    if pending_rows:
        print("Pending samples:")
        # Only the first five pending entries are shown.
        for entry in pending_rows[:5]:
            label = entry.get("title") or entry.get("file_name")
            reason = entry.get("error_message") or ""
            print(
                f"- id={entry.get('id')} title={label} "
                f"parse_status={entry.get('parse_status')} "
                f"enable_status={entry.get('enable_status')} "
                f"error={reason}"
            )

    if failed_rows:
        print("Failed samples:")
        # Only the first ten failed entries are shown.
        for entry in failed_rows[:10]:
            label = entry.get("title") or entry.get("file_name")
            reason = entry.get("error_message") or ""
            print(f"- id={entry.get('id')} title={label} error={reason}")

    if failed_rows or pending_rows:
        return 1
    return 0


# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)