Commit 66bac6d5 66bac6d5362a372f5bcc3627eb1a33c147795bbc by 沈秋雨

Improve ingestion status diagnostics

1 parent 56b1b00a
1 from __future__ import annotations 1 from __future__ import annotations
2 2
3 from collections import Counter
3 import sys 4 import sys
4 5
5 import _bootstrap # noqa: F401 6 import _bootstrap # noqa: F401
...@@ -18,12 +19,37 @@ def main() -> int: ...@@ -18,12 +19,37 @@ def main() -> int:
18 result = client.wait_ingestion_completed(knowledge_ids=knowledge_ids) 19 result = client.wait_ingestion_completed(knowledge_ids=knowledge_ids)
19 knowledge = client.list_knowledge() 20 knowledge = client.list_knowledge()
20 write_jsonl("data/exported/knowledge.jsonl", knowledge) 21 write_jsonl("data/exported/knowledge.jsonl", knowledge)
22 target_knowledge = [
23 row for row in knowledge if not knowledge_ids or row.get("id") in knowledge_ids
24 ]
21 25
22 print( 26 print(
23 "Ingestion status: " 27 "Ingestion status: "
24 f"completed={len(result['completed'])} failed={len(result['failed'])} " 28 f"completed={len(result['completed'])} failed={len(result['failed'])} "
25 f"pending={len(result['pending'])}" 29 f"pending={len(result['pending'])}"
26 ) 30 )
31 print(
32 "Status distribution: "
33 f"parse_status={dict(Counter(str(row.get('parse_status')) for row in target_knowledge))} "
34 f"enable_status={dict(Counter(str(row.get('enable_status')) for row in target_knowledge))}"
35 )
36 if result["pending"]:
37 print("Pending samples:")
38 for row in result["pending"][:5]:
39 print(
40 "- "
41 f"id={row.get('id')} title={row.get('title') or row.get('file_name')} "
42 f"parse_status={row.get('parse_status')} enable_status={row.get('enable_status')} "
43 f"error={row.get('error_message') or ''}"
44 )
45 if result["failed"]:
46 print("Failed samples:")
47 for row in result["failed"][:10]:
48 print(
49 "- "
50 f"id={row.get('id')} title={row.get('title') or row.get('file_name')} "
51 f"error={row.get('error_message') or ''}"
52 )
27 return 1 if result["failed"] or result["pending"] else 0 53 return 1 if result["failed"] or result["pending"] else 0
28 54
29 55
......
...@@ -2,6 +2,7 @@ from __future__ import annotations ...@@ -2,6 +2,7 @@ from __future__ import annotations
2 2
3 import logging 3 import logging
4 import time 4 import time
5 from collections import Counter
5 from pathlib import Path 6 from pathlib import Path
6 from typing import Any 7 from typing import Any
7 from urllib.parse import urljoin 8 from urllib.parse import urljoin
...@@ -90,7 +91,7 @@ class WeKnoraClient: ...@@ -90,7 +91,7 @@ class WeKnoraClient:
90 completed = [ 91 completed = [
91 row 92 row
92 for row in rows 93 for row in rows
93 if row.get("parse_status") == "completed" and row.get("enable_status") == "enabled" 94 if self._is_ingestion_completed(row)
94 ] 95 ]
95 failed = [row for row in rows if row.get("parse_status") == "failed"] 96 failed = [row for row in rows if row.get("parse_status") == "failed"]
96 97
...@@ -100,7 +101,13 @@ class WeKnoraClient: ...@@ -100,7 +101,13 @@ class WeKnoraClient:
100 return {"completed": completed, "failed": [], "pending": []} 101 return {"completed": completed, "failed": [], "pending": []}
101 102
102 pending = [row for row in rows if row not in completed] 103 pending = [row for row in rows if row not in completed]
103 logger.info("Waiting for ingestion: completed=%s pending=%s", len(completed), len(pending)) 104 logger.info(
105 "Waiting for ingestion: completed=%s pending=%s parse_status=%s enable_status=%s",
106 len(completed),
107 len(pending),
108 dict(Counter(str(row.get("parse_status")) for row in rows)),
109 dict(Counter(str(row.get("enable_status")) for row in rows)),
110 )
104 time.sleep(poll_interval_seconds) 111 time.sleep(poll_interval_seconds)
105 112
106 rows = self.list_knowledge() 113 rows = self.list_knowledge()
...@@ -109,7 +116,7 @@ class WeKnoraClient: ...@@ -109,7 +116,7 @@ class WeKnoraClient:
109 completed = [ 116 completed = [
110 row 117 row
111 for row in rows 118 for row in rows
112 if row.get("parse_status") == "completed" and row.get("enable_status") == "enabled" 119 if self._is_ingestion_completed(row)
113 ] 120 ]
114 failed = [row for row in rows if row.get("parse_status") == "failed"] 121 failed = [row for row in rows if row.get("parse_status") == "failed"]
115 pending = [row for row in rows if row not in completed and row not in failed] 122 pending = [row for row in rows if row not in completed and row not in failed]
...@@ -118,6 +125,13 @@ class WeKnoraClient: ...@@ -118,6 +125,13 @@ class WeKnoraClient:
118 def list_chunks(self, knowledge_id: str, *, page_size: int = 100) -> list[dict[str, Any]]: 125 def list_chunks(self, knowledge_id: str, *, page_size: int = 100) -> list[dict[str, Any]]:
119 return self._paginate(f"chunks/{knowledge_id}", page_size=page_size) 126 return self._paginate(f"chunks/{knowledge_id}", page_size=page_size)
120 127
128 def _is_ingestion_completed(self, row: dict[str, Any]) -> bool:
129 parse_status = row.get("parse_status")
130 enable_status = row.get("enable_status")
131 parsed = parse_status in {"completed", "success", "done"} or parse_status in {2, "2"}
132 enabled = enable_status in {"enabled", "success", "done"} or enable_status in {1, 2, "1", "2"}
133 return parsed and enabled
134
121 def knowledge_chat_sse( 135 def knowledge_chat_sse(
122 self, 136 self,
123 *, 137 *,
......