File size: 2,598 Bytes
"""domain_dispatch_node + apply_domain_check_node — 14 domain rules in parallel.

``domain_dispatch_node`` fans out one ``Send`` per (doc, applicable_check) pair.
``apply_domain_check_node`` runs a single check; its output flows through
the ``merge_risks`` reducer back into the global ``risks`` list.

Checks skipped by the dispatcher (they have separate entry points):
    * check_06_evidence_score — called directly after classification
    * check_12_duplicate_invoice — package-level, separate node
"""
from __future__ import annotations
from langgraph.types import Send
from domain_checks import CHECK_REGISTRY, SKIP_FROM_DISPATCH, get_check
from graph.states.pipeline_state import PipelineState, ProcessedDocument
def domain_dispatch_node(state: PipelineState) -> list[Send]:
    """Fan out one ``Send`` per (document, applicable check) pair.

    A check is dispatched for a document when all of the following hold:

    * its ``check_id`` is not listed in ``SKIP_FROM_DISPATCH``;
    * it is not ``is_hu_specific``, or the document's classified language is
      one of ``hu`` / ``magyar`` / ``hungarian`` (case-insensitive);
    * its ``applies_to`` contains ``"*"`` or the document's ``doc_type``.

    Documents without a classification or without extracted data are
    skipped. For a 5-doc package, this typically issues ~30 parallel Sends
    (~50-100ms total batch).

    Args:
        state: Pipeline state; only the ``"documents"`` entry is read. A
            missing or ``None`` entry is treated as an empty package.

    Returns:
        One ``Send`` targeting ``"apply_domain_check"`` per dispatched pair,
        ordered by document and then by registry order. Each payload carries
        ``check_id``, ``extracted``, ``doc_file_name`` and ``doc_type``.
        The list is empty when there is nothing to dispatch.
    """
    documents: list[ProcessedDocument] = state.get("documents") or []
    if not documents:
        return []

    # The skip list does not depend on the document, so apply it once
    # instead of re-testing every check for every document.
    dispatchable = [
        check
        for check in CHECK_REGISTRY
        if check.check_id not in SKIP_FROM_DISPATCH
    ]

    sends: list[Send] = []
    for doc in documents:
        if doc.classification is None or doc.extracted is None:
            continue

        doc_type = doc.classification.doc_type
        # Guard against a missing (None/empty) language: treat it as
        # "not Hungarian" rather than crashing the whole fan-out.
        language = (doc.classification.language or "").lower()
        is_hu = language in {"hu", "magyar", "hungarian"}

        for check in dispatchable:
            if check.is_hu_specific and not is_hu:
                continue
            if "*" not in check.applies_to and doc_type not in check.applies_to:
                continue
            sends.append(
                Send(
                    "apply_domain_check",
                    {
                        "check_id": check.check_id,
                        "extracted": doc.extracted.raw,
                        "doc_file_name": doc.ingested.file_name,
                        "doc_type": doc_type,
                    },
                )
            )
    return sends
async def apply_domain_check_node(state: dict) -> dict:
    """Run a single domain check against one document's extracted data.

    Args:
        state: The ``Send`` payload built by the dispatcher. Keys read here:
            ``check_id`` (which check to run), ``extracted`` (data handed to
            the check; ``None``/missing becomes ``{}``) and ``doc_file_name``
            (fallback for a risk's ``affected_document``; defaults to ``""``).
            The dispatcher also sends ``doc_type``, which this node does not
            use.

    Returns:
        ``{"risks": [...]}`` when the check reports at least one risk,
        otherwise ``{}``. This covers a missing ``check_id``, an unknown
        check, and a check that finds nothing.
    """
    check_id = state.get("check_id")
    if not check_id:
        return {}

    check = get_check(check_id)
    if check is None:
        return {}

    extracted = state.get("extracted") or {}
    doc_file_name = state.get("doc_file_name", "")

    # Normalise the result to a list. A check returning ``None`` must not
    # crash the loop below, and a one-shot iterable must not be exhausted
    # by the loop before it is returned.
    found = check.apply(extracted)
    risks = list(found) if found is not None else []

    # The check usually fills affected_document, but we add a safety net:
    for risk in risks:
        if risk.affected_document is None:
            risk.affected_document = doc_file_name

    return {"risks": risks} if risks else {}
|