"""DocState — Send API payload for processing a single document.

The ``dispatch_ingest`` function in pipeline_graph fans out over
``state["files"]``, sending a ``Send("ingest_doc", DocState(...))`` for each
file.

DocState is minimal — only what the per-doc subgraphs need. At the end, the
``_collect_doc`` node converts it back to a ProcessedDocument and merges it
into the parent state via the ``merge_doc_results`` reducer.

Inter-subgraph data flow:
  ingest_subgraph   → doc.ingested filled
  classify_node     → doc.classification filled
  extract_subgraph  → doc.extracted filled
  rag_index_subgr.  → doc.rag_chunks_indexed incremented
"""

from __future__ import annotations

from datetime import datetime
from typing import TypedDict

from graph.states.pipeline_state import (
    Classification,
    ExtractedData,
    IngestedDocument,
)


class DocState(TypedDict, total=False):
    """Per-document transient state under the Send API fan-out."""

    # Input (set by dispatch_ingest)
    file_name: str
    file_bytes: bytes
    started_at: datetime

    # Per-subgraph intermediate results (subgraph fills, parent collects)
    ingested: IngestedDocument | None
    classification: Classification | None
    extracted: ExtractedData | None
    rag_chunks_indexed: int

    # Errors (downstream nodes see this and either skip or convert to risk)
    error: str | None
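The fan-out described in the module docstring can be sketched roughly as below. This is a hedged, self-contained illustration, not the real `dispatch_ingest`: the shape of `state["files"]` (assumed here to be a mapping of file name to raw bytes) and the plain tuples standing in for langgraph `Send("ingest_doc", ...)` objects are assumptions, and the `DocState` stub repeats only the input fields.

```python
from datetime import datetime
from typing import TypedDict


class DocState(TypedDict, total=False):
    # Input fields only — stand-in for the full DocState defined above.
    file_name: str
    file_bytes: bytes
    started_at: datetime


def dispatch_ingest(state: dict) -> list[tuple[str, DocState]]:
    """Build one per-document payload per file.

    In the real graph each tuple would instead be a langgraph
    ``Send("ingest_doc", payload)`` routing the payload to the
    ``ingest_doc`` subgraph entry point.
    """
    return [
        (
            "ingest_doc",
            DocState(
                file_name=name,
                file_bytes=data,
                started_at=datetime.now(),
            ),
        )
        for name, data in state["files"].items()
    ]


payloads = dispatch_ingest({"files": {"a.pdf": b"%PDF", "b.txt": b"hi"}})
print([payload["file_name"] for _, payload in payloads])
```

Because `total=False`, none of the intermediate keys (`ingested`, `error`, ...) need to be present at dispatch time; each subgraph fills in its own slice later.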