# paperhawk/subgraphs/risk_subgraph.py
# (initial paperhawk push to HF Space — author: Nándorfi Vince, commit 7ff7119)
"""risk_subgraph — aggregated risk analysis with Send API parallelism.
Topology:
START
→ basic_risk_dispatch (Send: per-doc basic risk)
→ basic_risk / noop_basic
→ domain_dispatch_node (Send: per-doc × per-applicable-check, ~30 parallel)
→ apply_domain_check
→ [if llm provided] llm_risk_dispatch (Send: per-doc LLM risk + 3-filter chain)
→ llm_risk_per_doc / noop_llm
→ plausibility_dispatch (Send: per-doc plausibility)
→ plausibility / noop_plaus
→ evidence_score_node (per-doc info)
→ duplicate_detector_node (package-level, sync, ISA 240)
END
If ``llm=None``, the LLM risk-analysis layer is skipped (Phase-4 backward
compatible). When ``llm`` is provided, the ``llm_risk_subgraph`` runs a 4-node
chain per-doc with Send fan-out: llm_risk → filter_llm_risks →
drop_business_normal → drop_repeats — the full set of anti-hallucination
"5+1" layers.
"""
from __future__ import annotations
from langgraph.graph import END, START, StateGraph
from langgraph.types import Send
from graph.states.pipeline_state import PipelineState, ProcessedDocument, Risk
from nodes.pipeline.duplicate_detector_node import duplicate_detector_node
from nodes.risk.basic_risk_node import basic_risk_node
from nodes.risk.domain_dispatch_node import (
apply_domain_check_node,
domain_dispatch_node,
)
from nodes.risk.evidence_score_node import evidence_score_node
from nodes.risk.plausibility_node import plausibility_node
from subgraphs.llm_risk_subgraph import build_llm_risk_subgraph
# ---------------------------------------------------------------------------
# Send dispatchers (basic + plausibility per-doc)
# ---------------------------------------------------------------------------
def basic_risk_dispatch(state: PipelineState) -> list[Send]:
    """Fan out one ``basic_risk`` Send per classified + extracted document.

    Documents lacking either a classification or an extraction result are
    skipped. When no document qualifies, a single ``noop_basic`` Send is
    returned so the graph always has an outgoing edge to follow.
    """
    docs: list[ProcessedDocument] = state.get("documents") or []
    sends = [
        Send("basic_risk", {
            "doc_file_name": d.ingested.file_name,
            "doc_type": d.classification.doc_type,
            "extracted": d.extracted.raw,
        })
        for d in docs
        if d.classification is not None and d.extracted is not None
    ]
    return sends if sends else [Send("noop_basic", {})]
def plausibility_dispatch(state: PipelineState) -> list[Send]:
    """Fan out one ``plausibility`` Send per classified + extracted document.

    Mirrors :func:`basic_risk_dispatch`: unusable documents are skipped, and
    an empty fan-out degrades to a single ``noop_plaus`` Send.
    """
    docs: list[ProcessedDocument] = state.get("documents") or []
    sends = [
        Send("plausibility", {
            "doc_file_name": d.ingested.file_name,
            "extracted": d.extracted.raw,
        })
        for d in docs
        if d.classification is not None and d.extracted is not None
    ]
    return sends if sends else [Send("noop_plaus", {})]
def llm_risk_dispatch(state: PipelineState) -> list[Send]:
    """Per-doc Send to the ``llm_risk_per_doc`` node.

    Each payload carries the already-discovered basic + domain +
    plausibility risks relevant to that document so the ``llm_risk_node``
    can build the "ALREADY FOUND" block, and so ``drop_repeats_node``
    doesn't drop genuinely new observations. Relevance is decided by the
    ``Risk.affected_document`` field.
    """
    docs: list[ProcessedDocument] = state.get("documents") or []
    known_risks: list[Risk] = state.get("risks") or []
    sends: list[Send] = []
    for doc in docs:
        if doc.classification is None or doc.extracted is None:
            continue
        name = doc.ingested.file_name
        # Risks scoped to this document, plus globally-scoped ones
        # (affected_document=None, e.g. package-level duplicate detection)
        # which don't disturb the per-doc context.
        relevant = [
            r for r in known_risks if r.affected_document in (None, name)
        ]
        sends.append(Send("llm_risk_per_doc", {
            "doc_file_name": name,
            "extracted": doc.extracted.raw,
            "basic_risks": relevant,
        }))
    if not sends:
        return [Send("noop_llm", {})]
    return sends
async def _noop_basic(state: dict) -> dict:
    """No-op join target used when no document qualifies for basic risk."""
    return dict()
async def _noop_plaus(state: dict) -> dict:
    """No-op join target used when no document qualifies for plausibility."""
    return dict()
async def _noop_llm(state: dict) -> dict:
    """No-op join target used when no document qualifies for LLM risk."""
    return dict()
# ---------------------------------------------------------------------------
# Subgraph builder
# ---------------------------------------------------------------------------
def build_risk_subgraph(llm=None):
    """Compile the risk subgraph (operates on the parent PipelineState).

    Args:
        llm: optional BaseChatModel-like Runnable. When ``None`` the LLM
            risk-analysis layer (assess_risks_llm + 3 filters) is SKIPPED;
            only basic + domain + plausibility + evidence_score +
            duplicate_detector run (Phase-4 backward-compatible mode).
            When provided, the LLM layer runs after domain_dispatch.
    """
    builder = StateGraph(PipelineState)
    use_llm = llm is not None

    # ---- Node registration -------------------------------------------
    # Basic risk (per-doc fan-out)
    builder.add_node("basic_risk_dispatcher", _basic_dispatcher_passthrough)
    builder.add_node("basic_risk", basic_risk_node)
    builder.add_node("noop_basic", _noop_basic)
    # Domain-dispatch + apply (Send fan-out for 12 of the 14 checks)
    builder.add_node("domain_dispatcher", _domain_dispatcher_passthrough)
    builder.add_node("apply_domain_check", apply_domain_check_node)
    # Plausibility (per-doc fan-out)
    builder.add_node("plausibility_dispatcher", _plaus_dispatcher_passthrough)
    builder.add_node("plausibility", plausibility_node)
    builder.add_node("noop_plaus", _noop_plaus)
    # Per-doc info (evidence score) + package-level duplicate detection
    builder.add_node("evidence_score", evidence_score_node)
    builder.add_node("duplicate_detector", duplicate_detector_node)

    if use_llm:
        # LLM risk layer: Send fan-out into a per-doc filter chain.
        llm_chain = build_llm_risk_subgraph(llm)

        async def llm_risk_per_doc(state: dict) -> dict:
            """Run the compiled LLM risk subgraph on one parent Send payload.

            At the end of the subgraph the 3-filter result sits in
            ``risks``; returning it merges it into the parent state's
            ``risks`` reducer.
            """
            result = await llm_chain.ainvoke(state)
            found = result.get("risks") or []
            return {"risks": found} if found else {}

        builder.add_node("llm_risk_dispatcher", _llm_risk_dispatcher_passthrough)
        builder.add_node("llm_risk_per_doc", llm_risk_per_doc)
        builder.add_node("noop_llm", _noop_llm)

    # ---- Wiring: dispatchers → conditional Sends → join nodes --------
    builder.add_edge(START, "basic_risk_dispatcher")
    builder.add_conditional_edges(
        "basic_risk_dispatcher",
        basic_risk_dispatch,
        ["basic_risk", "noop_basic"],
    )
    builder.add_edge("basic_risk", "domain_dispatcher")
    builder.add_edge("noop_basic", "domain_dispatcher")
    builder.add_conditional_edges(
        "domain_dispatcher",
        domain_dispatch_node,
        ["apply_domain_check"],
    )
    if use_llm:
        # apply_domain_check → llm_risk_dispatcher → llm_risk_per_doc
        # → plausibility_dispatcher
        builder.add_edge("apply_domain_check", "llm_risk_dispatcher")
        builder.add_conditional_edges(
            "llm_risk_dispatcher",
            llm_risk_dispatch,
            ["llm_risk_per_doc", "noop_llm"],
        )
        builder.add_edge("llm_risk_per_doc", "plausibility_dispatcher")
        builder.add_edge("noop_llm", "plausibility_dispatcher")
    else:
        # No LLM layer: go straight from domain checks to plausibility.
        builder.add_edge("apply_domain_check", "plausibility_dispatcher")
    builder.add_conditional_edges(
        "plausibility_dispatcher",
        plausibility_dispatch,
        ["plausibility", "noop_plaus"],
    )
    builder.add_edge("plausibility", "evidence_score")
    builder.add_edge("noop_plaus", "evidence_score")
    builder.add_edge("evidence_score", "duplicate_detector")
    builder.add_edge("duplicate_detector", END)
    return builder.compile()
# Passthrough nodes (combined with Send dispatchers for fan-out)
async def _domain_dispatcher_passthrough(state: PipelineState) -> dict:
    """Passthrough anchor node; the Send fan-out lives on its conditional edge."""
    return dict()
async def _basic_dispatcher_passthrough(state: PipelineState) -> dict:
    """Passthrough anchor node; the Send fan-out lives on its conditional edge."""
    return dict()
async def _plaus_dispatcher_passthrough(state: PipelineState) -> dict:
    """Passthrough anchor node; the Send fan-out lives on its conditional edge."""
    return dict()
async def _llm_risk_dispatcher_passthrough(state: PipelineState) -> dict:
    """Passthrough anchor node; the Send fan-out lives on its conditional edge."""
    return dict()