"""ingest_subgraph — a per-doc ingest egy compile-olt subgraph-ban.
A pipeline_graph `dispatch_ingest` a Send API-val fan-out-olja a fájlokat,
mindegyik egy DocState-tel megy be ide. A subgraph kimenete:
* doc.ingested kitöltve egy IngestedDocument-tel
* doc.error mezőbe kerül a hiba ha a betöltés elesik
* vissza a parent state-be a `documents` reducer-én át
Topológia:
format_router (suffix-alapú: pdf/docx/image/txt)
├→ pdf_loader_node (PyMuPDF + Tesseract + vision-fallback)
├→ docx_loader_node
├→ image_loader_node (vision-first)
└→ txt_loader_node
↓
ingested_collector (DocState → ProcessedDocument shell)
↓
END
Async-first: minden node `async def`. A blocking PyMuPDF/python-docx/Pillow
hívásokat `asyncio.to_thread()` wrapper-rel csomagoljuk, hogy a párhuzamos
fan-out tényleg gyorsuljon.
"""
from __future__ import annotations
import asyncio
from datetime import datetime
from pathlib import Path
from langgraph.graph import END, START, StateGraph
from graph.states.doc_state import DocState
from graph.states.pipeline_state import ProcessedDocument
from ingest.docx_loader import load_docx
from ingest.image_loader import load_image
from ingest.pdf_loader import load_pdf
from ingest.txt_loader import load_txt
# ---------------------------------------------------------------------------
# Format router — döntés melyik loader fut le
# ---------------------------------------------------------------------------
def _format_route(state: DocState) -> str:
"""A file_name suffix alapján melyik loader node-ra megy."""
name = state.get("file_name", "").lower()
suffix = Path(name).suffix.lstrip(".")
if suffix == "pdf":
return "pdf_loader"
if suffix == "docx":
return "docx_loader"
if suffix in {"png", "jpg", "jpeg"}:
return "image_loader"
if suffix == "txt":
return "txt_loader"
# Ismeretlen: txt-ként próbáljuk (best-effort)
return "txt_loader"
# ---------------------------------------------------------------------------
# Loader node-ok (async wrapper a blocking lib-eken)
# ---------------------------------------------------------------------------
async def pdf_loader_node(state: DocState) -> dict:
    """Load a PDF off the event loop (3-level fallback: PyMuPDF + Tesseract + vision)."""
    try:
        document = await asyncio.to_thread(
            load_pdf, state["file_name"], state["file_bytes"]
        )
    except Exception as exc:
        return {"ingested": None, "error": f"PDF betöltés hiba: {exc}"}
    return {"ingested": document, "error": None}
async def docx_loader_node(state: DocState) -> dict:
    """Load a DOCX off the event loop via the blocking python-docx loader."""
    try:
        document = await asyncio.to_thread(
            load_docx, state["file_name"], state["file_bytes"]
        )
    except Exception as exc:
        return {"ingested": None, "error": f"DOCX betöltés hiba: {exc}"}
    return {"ingested": document, "error": None}
async def image_loader_node(state: DocState) -> dict:
    """Load a PNG/JPG image (vision-first) off the event loop."""
    try:
        # Suffix falls back to "png" when the name carries no extension.
        ext = Path(state["file_name"]).suffix.lstrip(".").lower() or "png"
        document = await asyncio.to_thread(
            load_image, state["file_name"], state["file_bytes"], ext
        )
    except Exception as exc:
        return {"ingested": None, "error": f"Kép betöltés hiba: {exc}"}
    return {"ingested": document, "error": None}
async def txt_loader_node(state: DocState) -> dict:
    """Load a plain-text document off the event loop."""
    try:
        document = await asyncio.to_thread(
            load_txt, state["file_name"], state["file_bytes"]
        )
    except Exception as exc:
        return {"ingested": None, "error": f"TXT betöltés hiba: {exc}"}
    return {"ingested": document, "error": None}
async def ingested_collector_node(state: DocState) -> dict:
    """Terminal node of the subgraph — unifies the output toward the parent state.

    On success a `ProcessedDocument(ingested=...)` can be built downstream;
    on failure the classify/extract subgraphs look at `state["error"]` and
    skip the document.
    """
    # Intentionally a no-op: the parent reducer places the ProcessedDocument
    # into the documents list at the next step (classify_node). This node is
    # a placeholder for tracing — a fixed endpoint inside the subgraph.
    return {}
# ---------------------------------------------------------------------------
# Subgraph builder
# ---------------------------------------------------------------------------
def build_ingest_subgraph():
    """Compiled subgraph for ingesting a single document.

    Input: DocState (file_name + file_bytes + started_at).
    Output: DocState with `ingested` filled in, or `error` set.

    The compiled graph can be invoked standalone (`compiled.invoke({...})`),
    which helps with integration tests.
    """
    loaders = {
        "pdf_loader": pdf_loader_node,
        "docx_loader": docx_loader_node,
        "image_loader": image_loader_node,
        "txt_loader": txt_loader_node,
    }
    builder = StateGraph(DocState)
    for node_name, node_fn in loaders.items():
        builder.add_node(node_name, node_fn)
    builder.add_node("ingested_collector", ingested_collector_node)
    # Conditional edge from START — the suffix decides which loader runs.
    builder.add_conditional_edges(
        START,
        _format_route,
        {node_name: node_name for node_name in loaders},
    )
    # Every loader → ingested_collector → END.
    for node_name in loaders:
        builder.add_edge(node_name, "ingested_collector")
    builder.add_edge("ingested_collector", END)
    return builder.compile()
# ---------------------------------------------------------------------------
# Pipeline-szintű convenience wrapper a per-doc subgraph hívására
# ---------------------------------------------------------------------------
async def ingest_one_doc(file_name: str, file_bytes: bytes) -> ProcessedDocument | None:
    """Run one document through the ingest subgraph; return a ProcessedDocument shell.

    Returns None when loading failed (downstream skip + risk log). Handy for
    integration tests: the whole subgraph is exercised end-to-end without an LLM.
    """
    subgraph = build_ingest_subgraph()
    initial_state = {
        "file_name": file_name,
        "file_bytes": file_bytes,
        "started_at": datetime.now(),
    }
    final_state = await subgraph.ainvoke(initial_state)
    loaded = final_state.get("ingested")
    if loaded is None:
        return None
    return ProcessedDocument(ingested=loaded)