| """domain_dispatch_node + apply_domain_check_node — 14 domain rules in parallel. |
| |
| ``domain_dispatch_node`` Send-fans-out the (doc, applicable_check) pairs. |
| ``apply_domain_check_node`` runs a single check; the output flows through |
| the ``merge_risks`` reducer back into the global ``risks`` list. |
| |
| Skipped checks (separate entry points): |
| * check_06_evidence_score — called directly after classification |
| * check_12_duplicate_invoice — package-level, separate node |
| """ |
|
|
| from __future__ import annotations |
|
|
| from langgraph.types import Send |
|
|
| from domain_checks import CHECK_REGISTRY, SKIP_FROM_DISPATCH, get_check |
| from graph.states.pipeline_state import PipelineState, ProcessedDocument |
|
|
|
|
def domain_dispatch_node(state: PipelineState) -> list[Send]:
    """Fan out one ``Send`` per (document, applicable check) pair.

    Documents that have no classification or no extraction result are
    skipped. For every remaining document, each entry of ``CHECK_REGISTRY``
    is dispatched to the ``apply_domain_check`` node unless:

    * its ``check_id`` is listed in ``SKIP_FROM_DISPATCH``;
    * it is flagged ``is_hu_specific`` and the document language is not
      Hungarian (``hu`` / ``magyar`` / ``hungarian``, case-insensitive);
    * its ``applies_to`` contains neither ``"*"`` nor the document type.

    A missing or empty language is treated as "not Hungarian", so such a
    document still receives the universal checks instead of aborting the
    whole dispatch.

    Original author's sizing note: a 5-doc package typically issues ~30
    parallel Sends (~50-100ms total batch).

    Args:
        state: Pipeline state; only its ``documents`` entry is read.

    Returns:
        The ``Send`` objects to fan out. Each payload carries ``check_id``,
        ``extracted`` (the document's ``extracted.raw``), ``doc_file_name``
        and ``doc_type``. Empty when there is nothing to dispatch.
    """
    hungarian_tags = {"hu", "magyar", "hungarian"}
    sends: list[Send] = []
    documents: list[ProcessedDocument] = state.get("documents") or []

    for doc in documents:
        if doc.classification is None or doc.extracted is None:
            continue
        doc_type = doc.classification.doc_type
        # Language detection may have produced nothing; fall back to "" so
        # the membership test below simply evaluates to False.
        language = doc.classification.language or ""
        is_hu = language.lower() in hungarian_tags

        for check in CHECK_REGISTRY:
            if check.check_id in SKIP_FROM_DISPATCH:
                continue
            if check.is_hu_specific and not is_hu:
                continue
            if "*" not in check.applies_to and doc_type not in check.applies_to:
                continue
            sends.append(
                Send(
                    "apply_domain_check",
                    {
                        "check_id": check.check_id,
                        "extracted": doc.extracted.raw,
                        "doc_file_name": doc.ingested.file_name,
                        "doc_type": doc_type,
                    },
                )
            )
    return sends
|
|
|
|
async def apply_domain_check_node(state: dict) -> dict:
    """Run a single domain check against one document's extracted data.

    ``state`` is the payload of one ``Send``; the keys read here are
    ``check_id``, ``extracted`` and ``doc_file_name``.

    Args:
        state: Send payload. A missing/empty ``extracted`` is replaced by
            ``{}``; a missing ``doc_file_name`` defaults to ``""``.

    Returns:
        ``{"risks": [...]}`` when the check reports at least one risk,
        otherwise ``{}``. ``{}`` is also returned when ``check_id`` is
        missing/empty or ``get_check`` does not know it.
    """
    check_id = state.get("check_id")
    if not check_id:
        return {}
    check = get_check(check_id)
    if check is None:
        return {}

    extracted = state.get("extracted") or {}
    doc_file_name = state.get("doc_file_name", "")

    # Materialise the result exactly once: a check that returns None must
    # not crash the loop, and a check that yields lazily must not be
    # consumed by the back-fill below and then handed on exhausted.
    risks = list(check.apply(extracted) or ())

    # Back-fill the originating file name only where the check left it unset.
    for risk in risks:
        if risk.affected_document is None:
            risk.affected_document = doc_file_name

    return {"risks": risks} if risks else {}
|
|