| """risk_subgraph — aggregated risk analysis with Send API parallelism. |
| |
| Topology: |
| |
| START |
| → basic_risk_dispatch (Send: per-doc basic risk) |
| → basic_risk / noop_basic |
| → domain_dispatch_node (Send: per-doc × per-applicable-check, ~30 parallel) |
| → apply_domain_check |
| → [if llm provided] llm_risk_dispatch (Send: per-doc LLM risk + 3-filter chain) |
| → llm_risk_per_doc / noop_llm |
| → plausibility_dispatch (Send: per-doc plausibility) |
| → plausibility / noop_plaus |
| → evidence_score_node (per-doc info) |
| → duplicate_detector_node (package-level, sync, ISA 240) |
| END |
| |
If ``llm=None``, the LLM risk-analysis layer is skipped (Phase-4 backward
compatible). When ``llm`` is provided, the ``llm_risk_subgraph`` runs a 4-node
chain per doc with Send fan-out: llm_risk → filter_llm_risks →
drop_business_normal → drop_repeats — the full anti-hallucination 5+1 layers.
| """ |
|
|
| from __future__ import annotations |
|
|
| from langgraph.graph import END, START, StateGraph |
| from langgraph.types import Send |
|
|
| from graph.states.pipeline_state import PipelineState, ProcessedDocument, Risk |
| from nodes.pipeline.duplicate_detector_node import duplicate_detector_node |
| from nodes.risk.basic_risk_node import basic_risk_node |
| from nodes.risk.domain_dispatch_node import ( |
| apply_domain_check_node, |
| domain_dispatch_node, |
| ) |
| from nodes.risk.evidence_score_node import evidence_score_node |
| from nodes.risk.plausibility_node import plausibility_node |
| from subgraphs.llm_risk_subgraph import build_llm_risk_subgraph |
|
|
|
|
| |
| |
| |
|
|
|
|
def basic_risk_dispatch(state: PipelineState) -> list[Send]:
    """Fan out one ``basic_risk`` Send per classified + extracted document.

    Documents missing either a classification or an extraction result are
    skipped. If nothing qualifies, a single ``noop_basic`` Send is returned
    so the graph still has an outgoing edge to follow.
    """
    docs = state.get("documents") or []
    sends = [
        Send("basic_risk", {
            "doc_file_name": doc.ingested.file_name,
            "doc_type": doc.classification.doc_type,
            "extracted": doc.extracted.raw,
        })
        for doc in docs
        if doc.classification is not None and doc.extracted is not None
    ]
    return sends if sends else [Send("noop_basic", {})]
|
|
|
|
def plausibility_dispatch(state: PipelineState) -> list[Send]:
    """Fan out one ``plausibility`` Send per classified + extracted document.

    Falls back to a single ``noop_plaus`` Send when no document qualifies,
    keeping the conditional edge satisfiable.
    """
    docs = state.get("documents") or []
    sends = [
        Send("plausibility", {
            "doc_file_name": doc.ingested.file_name,
            "extracted": doc.extracted.raw,
        })
        for doc in docs
        if doc.classification is not None and doc.extracted is not None
    ]
    return sends if sends else [Send("noop_plaus", {})]
|
|
|
|
def llm_risk_dispatch(state: PipelineState) -> list[Send]:
    """Per-doc Send to the ``llm_risk_per_doc`` node.

    We pass the risks accumulated so far for each document (basic + domain
    checks — plausibility runs *after* this layer, so its risks cannot be
    present yet) so the ``llm_risk_node`` can build the "ALREADY FOUND"
    block, and so ``drop_repeats_node`` doesn't drop genuinely new
    observations.

    Filtering is by ``Risk.affected_document``: package-level risks
    (``affected_document is None``) are included in every doc's payload.

    Returns a single ``noop_llm`` Send when no document is both classified
    and extracted.
    """
    sends: list[Send] = []
    documents: list[ProcessedDocument] = state.get("documents") or []
    all_risks: list[Risk] = state.get("risks") or []

    for doc in documents:
        if doc.classification is None or doc.extracted is None:
            continue
        file_name = doc.ingested.file_name

        # Risks relevant to this doc (basic + domain); package-level risks
        # with no affected_document are shared with every doc.
        per_doc_risks = [
            r for r in all_risks
            if r.affected_document is None or r.affected_document == file_name
        ]
        sends.append(Send("llm_risk_per_doc", {
            "doc_file_name": file_name,
            "extracted": doc.extracted.raw,
            "basic_risks": per_doc_risks,
        }))
    return sends or [Send("noop_llm", {})]
|
|
|
|
async def _noop_basic(_state: dict) -> dict:
    """Fallback Send target when no document qualifies for basic risk; no state update."""
    return dict()
|
|
|
|
async def _noop_plaus(_state: dict) -> dict:
    """Fallback Send target when no document qualifies for plausibility; no state update."""
    return dict()
|
|
|
|
async def _noop_llm(_state: dict) -> dict:
    """Fallback Send target when no document qualifies for LLM risk; no state update."""
    return dict()
|
|
|
|
| |
| |
| |
|
|
|
|
def build_risk_subgraph(llm=None):
    """Compile the risk subgraph (operates on the parent PipelineState).

    Args:
        llm: optional BaseChatModel-like Runnable. When ``None`` the LLM
            risk-analysis layer (assess_risks_llm + 3 filters) is SKIPPED
            and only basic + domain + plausibility + evidence_score +
            duplicate_detector run (Phase-4 backward-compatible mode).
            When provided, the LLM layer is wired in between the domain
            checks and the plausibility fan-out.
    """
    sg = StateGraph(PipelineState)
    with_llm = llm is not None

    # --- node registration --------------------------------------------------
    sg.add_node("basic_risk_dispatcher", _basic_dispatcher_passthrough)
    sg.add_node("basic_risk", basic_risk_node)
    sg.add_node("noop_basic", _noop_basic)

    sg.add_node("domain_dispatcher", _domain_dispatcher_passthrough)
    sg.add_node("apply_domain_check", apply_domain_check_node)

    sg.add_node("plausibility_dispatcher", _plaus_dispatcher_passthrough)
    sg.add_node("plausibility", plausibility_node)
    sg.add_node("noop_plaus", _noop_plaus)

    sg.add_node("evidence_score", evidence_score_node)
    sg.add_node("duplicate_detector", duplicate_detector_node)

    if with_llm:
        compiled_llm = build_llm_risk_subgraph(llm)

        async def llm_risk_per_doc(state: dict) -> dict:
            """Run the compiled LLM risk subgraph on one parent Send payload.

            The subgraph leaves its 3-filter result in ``risks``; returning
            it here lets the parent state's ``risks`` reducer merge it in.
            """
            out = await compiled_llm.ainvoke(state)
            found = out.get("risks") or []
            return {"risks": found} if found else {}

        sg.add_node("llm_risk_dispatcher", _llm_risk_dispatcher_passthrough)
        sg.add_node("llm_risk_per_doc", llm_risk_per_doc)
        sg.add_node("noop_llm", _noop_llm)

    # --- edge wiring ----------------------------------------------------------
    sg.add_edge(START, "basic_risk_dispatcher")
    sg.add_conditional_edges(
        "basic_risk_dispatcher",
        basic_risk_dispatch,
        ["basic_risk", "noop_basic"],
    )
    sg.add_edge("basic_risk", "domain_dispatcher")
    sg.add_edge("noop_basic", "domain_dispatcher")

    sg.add_conditional_edges(
        "domain_dispatcher",
        domain_dispatch_node,
        ["apply_domain_check"],
    )

    if with_llm:
        # Domain checks feed the LLM layer; both LLM outcomes converge on
        # the plausibility dispatcher.
        sg.add_edge("apply_domain_check", "llm_risk_dispatcher")
        sg.add_conditional_edges(
            "llm_risk_dispatcher",
            llm_risk_dispatch,
            ["llm_risk_per_doc", "noop_llm"],
        )
        sg.add_edge("llm_risk_per_doc", "plausibility_dispatcher")
        sg.add_edge("noop_llm", "plausibility_dispatcher")
    else:
        # No LLM: domain checks flow straight into plausibility.
        sg.add_edge("apply_domain_check", "plausibility_dispatcher")

    sg.add_conditional_edges(
        "plausibility_dispatcher",
        plausibility_dispatch,
        ["plausibility", "noop_plaus"],
    )
    sg.add_edge("plausibility", "evidence_score")
    sg.add_edge("noop_plaus", "evidence_score")

    sg.add_edge("evidence_score", "duplicate_detector")
    sg.add_edge("duplicate_detector", END)

    return sg.compile()
|
|
|
|
| |
async def _domain_dispatcher_passthrough(_state: PipelineState) -> dict:
    """Anchor node the domain-check Send fan-out hangs off; contributes no state update."""
    return dict()
|
|
|
|
async def _basic_dispatcher_passthrough(_state: PipelineState) -> dict:
    """Anchor node for the basic-risk Send fan-out; contributes no state update."""
    return dict()
|
|
|
|
async def _plaus_dispatcher_passthrough(_state: PipelineState) -> dict:
    """Anchor node for the plausibility Send fan-out; contributes no state update."""
    return dict()
|
|
|
|
async def _llm_risk_dispatcher_passthrough(_state: PipelineState) -> dict:
    """Anchor node for the LLM-risk Send fan-out; contributes no state update."""
    return dict()
|
|