# paperhawk/graph/pipeline_graph.py
# Author: Nándorfi Vince
# Initial paperhawk push to HF Space (LFS for binaries) — commit 7ff7119
"""Top-level pipeline graph — the full ingest → classify → extract → RAG → risk → report flow.

The pipeline is a hybrid: per-doc Send fan-out across four stages (ingest,
classify, extract, rag-index), then fan-in (the `merge_doc_results` reducer),
then package-level compare + risk + report.

Topology:
START
→ dispatch_ingest (Send: per-doc)
→ ingest_per_doc (subgraph call → ProcessedDocument shell)
→ dispatch_classify (Send: per-doc)
→ classify_node (runs from the Send payload)
→ dispatch_extract (Send: per-doc)
→ extract_per_doc (subgraph call)
→ dispatch_rag_index (Send: per-doc)
→ rag_index_per_doc (subgraph call, store closure)
→ quote_validator_node (anti-hallucination layer 7)
→ compare_node (three-way matching, sync)
→ risk_subgraph (basic + domain × Send + plausibility + duplicate)
→ report_node (JSON structure)
END
"""
from __future__ import annotations
import time
from datetime import datetime
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from graph.states.doc_state import DocState
from graph.states.pipeline_state import (
Classification,
ExtractedData,
IngestedDocument,
PipelineState,
ProcessedDocument,
)
from nodes.extract.extract_node import build_extract_node
from nodes.extract.quote_validator_node import quote_validator_node
from nodes.pipeline.classify_node import build_classify_node
from nodes.pipeline.compare_node import compare_node
from nodes.pipeline.report_node import build_report_node
from store import HybridStore
from subgraphs.ingest_subgraph import build_ingest_subgraph
from subgraphs.rag_index_subgraph import build_rag_index_subgraph
from subgraphs.risk_subgraph import build_risk_subgraph
# ---------------------------------------------------------------------------
# Send dispatchers
# ---------------------------------------------------------------------------
def dispatch_ingest(state: PipelineState) -> list[Send]:
    """Fan-out: route every uploaded file into ``ingest_per_doc``.

    Each file becomes one Send carrying its own DocState payload; when no
    files were uploaded, a single ``noop_ingest`` Send keeps the graph moving.
    """
    uploaded = state.get("files") or []
    sends: list[Send] = []
    for file_name, file_bytes in uploaded:
        payload = {
            "file_name": file_name,
            "file_bytes": file_bytes,
            "started_at": datetime.now(),
        }
        sends.append(Send("ingest_per_doc", payload))
    if not sends:
        return [Send("noop_ingest", {})]
    return sends
def dispatch_classify(state: PipelineState) -> list[Send]:
    """Fan-out: one classify Send per document that survived ingest.

    Documents whose ingest failed (``ingested is None``) are skipped.

    BUG FIX: previously, when documents existed but *none* of them had an
    ``ingested`` result, the comprehension returned an empty list, leaving
    the conditional edge with zero Sends. Now we fall back to
    ``noop_classify`` — consistent with ``dispatch_extract`` and
    ``dispatch_rag_index``.
    """
    documents: list[ProcessedDocument] = state.get("documents") or []
    sends = [
        Send("classify_per_doc", {"ingested": d.ingested})
        for d in documents
        if d.ingested is not None
    ]
    return sends or [Send("noop_classify", {})]
def dispatch_extract(state: PipelineState) -> list[Send]:
    """Fan-out: extract each document that has both an ingest result and a
    classification; fall back to ``noop_extract`` when nothing qualifies."""
    docs: list[ProcessedDocument] = state.get("documents") or []
    sends = [
        Send("extract_per_doc", {
            "ingested": d.ingested,
            "classification": d.classification,
        })
        for d in docs
        if d.ingested is not None and d.classification is not None
    ]
    return sends or [Send("noop_extract", {})]
def _make_dispatch_rag_index(store: HybridStore):
    """Build the rag-index dispatcher.

    NOTE(review): ``store`` is not referenced inside the closure — indexing
    itself is closed over in ``_make_rag_index_per_doc``. The parameter is
    kept for signature symmetry with the other store-aware factories.
    """
    def dispatch_rag_index(state: PipelineState) -> list[Send]:
        docs: list[ProcessedDocument] = state.get("documents") or []
        sends = [
            Send("rag_index_per_doc", {
                "ingested": d.ingested,
                "doc_type": d.classification.doc_type if d.classification else "egyeb",
            })
            for d in docs
            if d.ingested is not None
        ]
        return sends or [Send("noop_rag", {})]

    return dispatch_rag_index
# ---------------------------------------------------------------------------
# Per-doc subgraph wrappers (they write back into the parent state)
# ---------------------------------------------------------------------------
def _make_ingest_per_doc():
    """Factory: wrap the ingest subgraph so its output upserts into the
    parent ``documents`` list."""
    subgraph = build_ingest_subgraph()

    async def ingest_per_doc(state: DocState) -> dict:
        outcome = await subgraph.ainvoke(state)
        ingested = outcome.get("ingested")
        if ingested is None:
            # Failed ingest contributes nothing; the reducer keeps other docs.
            return {}
        # ProcessedDocument shell — merge_doc_results upserts on file_name.
        return {"documents": [ProcessedDocument(ingested=ingested)]}

    return ingest_per_doc
def _make_classify_per_doc(llm=None):
    """Factory: async per-doc wrapper around the configured classify node."""
    node = build_classify_node(llm=llm)

    async def classify_per_doc(state: dict) -> dict:
        result = await node(state)
        return result

    return classify_per_doc
def _make_extract_per_doc(llm=None):
    """Factory: async per-doc wrapper around the configured extract node."""
    node = build_extract_node(llm=llm)

    async def extract_per_doc(state: dict) -> dict:
        result = await node(state)
        return result

    return extract_per_doc
def _make_rag_index_per_doc(store: HybridStore):
    """Factory: per-doc RAG-indexing wrapper, closing over the hybrid store."""
    subgraph = build_rag_index_subgraph(store)

    async def rag_index_per_doc(state: dict) -> dict:
        ingested = state["ingested"]
        result = await subgraph.ainvoke({
            "ingested": ingested,
            "doc_type": state.get("doc_type", "egyeb"),
        })
        # Emit an update carrying chunks_indexed for this document —
        # the merge_doc_results reducer upserts it on file_name.
        doc = ProcessedDocument(
            ingested=ingested,
            rag_chunks_indexed=result.get("chunks_indexed", 0),
        )
        return {"documents": [doc]} if ingested else {}

    return rag_index_per_doc
async def _noop(state: dict) -> dict:
    """No-op node: contributes nothing to the state (used as Send fallback
    target and as a fan-in join point)."""
    return {}
# ---------------------------------------------------------------------------
# Wall-clock timer (start + finish)
# ---------------------------------------------------------------------------
async def start_timer_node(state: PipelineState) -> dict:
    """Stamp the pipeline's wall-clock start.

    Writes ``started_at`` (datetime, used by finish_timer_node) and
    ``_internal_start`` (epoch seconds via time.time()).
    """
    return {
        "_internal_start": time.time(),
        "started_at": datetime.now(),
    }
async def finish_timer_node(state: PipelineState) -> dict:
    """Close the wall-clock timer.

    Returns ``finished_at`` plus ``processing_seconds`` computed against
    ``started_at`` (0.0 when the start marker is missing).

    BUG FIX: ``datetime.now()`` was called twice, so ``finished_at`` could
    disagree with the timestamp used for the elapsed computation; a single
    snapshot keeps the two fields consistent.
    """
    now = datetime.now()
    started = state.get("started_at")
    elapsed = (now - started).total_seconds() if started is not None else 0.0
    return {
        "finished_at": now,
        "processing_seconds": round(elapsed, 3),
    }
# ---------------------------------------------------------------------------
# Top-level builder
# ---------------------------------------------------------------------------
def build_pipeline_graph(store: HybridStore, *, llm=None, checkpointer=None):
    """Build and compile the top-level pipeline graph.

    Args:
        store: the HybridStore singleton (closed over by the per-doc
            rag_index_per_doc node).
        llm: optional BaseChatModel-like Runnable. When given, the LLM
            risk-analysis layer (assess_risks_llm + 3 filters) is enabled
            inside the risk_subgraph — for behavioral parity with
            `prototype-agentic` this should ALWAYS be supplied from the UI
            (see app/main.py).
        checkpointer: optional (SqliteSaver / InMemorySaver). None → no
            checkpointing.

    Returns:
        The compiled LangGraph runnable.
    """
    risk_subgraph = build_risk_subgraph(llm=llm)
    graph = StateGraph(PipelineState)
    # Entry / timer
    graph.add_node("start_timer", start_timer_node)
    # Per-doc ingest fan-out
    graph.add_node("ingest_per_doc", _make_ingest_per_doc())
    graph.add_node("noop_ingest", _noop)
    # Per-doc classify fan-out
    graph.add_node("classify_per_doc", _make_classify_per_doc(llm=llm))
    graph.add_node("noop_classify", _noop)
    # Per-doc extract fan-out
    graph.add_node("extract_per_doc", _make_extract_per_doc(llm=llm))
    graph.add_node("noop_extract", _noop)
    # Per-doc rag index fan-out
    graph.add_node("rag_index_per_doc", _make_rag_index_per_doc(store))
    graph.add_node("noop_rag", _noop)
    # Quote validator (post-extract anti-hallucination layer)
    graph.add_node("quote_validator", quote_validator_node)
    # Three-way compare
    graph.add_node("compare", compare_node)
    # Risk subgraph
    graph.add_node("risk", risk_subgraph)
    # Report (with an LLM exec summary when llm is given)
    graph.add_node("report", build_report_node(llm=llm))
    graph.add_node("finish_timer", finish_timer_node)
    # ----- Edges -----
    graph.add_edge(START, "start_timer")
    graph.add_conditional_edges(
        "start_timer",
        dispatch_ingest,
        ["ingest_per_doc", "noop_ingest"],
    )
    # Ingest fan-in → classify dispatch
    graph.add_node("ingest_join", _noop)
    graph.add_edge("ingest_per_doc", "ingest_join")
    graph.add_edge("noop_ingest", "ingest_join")
    graph.add_conditional_edges(
        "ingest_join",
        dispatch_classify,
        ["classify_per_doc", "noop_classify"],
    )
    # Classify fan-in → extract dispatch
    graph.add_node("classify_join", _noop)
    graph.add_edge("classify_per_doc", "classify_join")
    graph.add_edge("noop_classify", "classify_join")
    graph.add_conditional_edges(
        "classify_join",
        dispatch_extract,
        ["extract_per_doc", "noop_extract"],
    )
    # Extract fan-in → quote_validator → rag_index dispatch
    graph.add_node("extract_join", _noop)
    graph.add_edge("extract_per_doc", "extract_join")
    graph.add_edge("noop_extract", "extract_join")
    graph.add_edge("extract_join", "quote_validator")
    graph.add_conditional_edges(
        "quote_validator",
        _make_dispatch_rag_index(store),
        ["rag_index_per_doc", "noop_rag"],
    )
    # Rag fan-in → compare → risk → report → finish
    graph.add_node("rag_join", _noop)
    graph.add_edge("rag_index_per_doc", "rag_join")
    graph.add_edge("noop_rag", "rag_join")
    graph.add_edge("rag_join", "compare")
    graph.add_edge("compare", "risk")
    # IMPORTANT: finish_timer runs BEFORE report so that processing_seconds
    # is available for the performance metrics
    graph.add_edge("risk", "finish_timer")
    graph.add_edge("finish_timer", "report")
    graph.add_edge("report", END)
    if checkpointer is not None:
        return graph.compile(checkpointer=checkpointer)
    return graph.compile()