File size: 2,451 Bytes
7ff7119
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""rag_index_subgraph — egy doksi chunkokra darabolása + ChromaDB+BM25 indexelés.

The pipeline_graph's `dispatch_rag_index` fans out to every document via the Send API.
For each document this subgraph runs:
  1. chunker_node:      full_text → chunks split at natural break points
  2. embed_upsert_node: embeds the chunks in batches and puts them into the HybridStore

The HybridStore is a singleton (the pipeline_graph receives it at compile time).
The subgraph does not read it from the state: `build_rag_index_subgraph(store)`
binds it into the nodes through a closure.
"""

from __future__ import annotations

from typing import TypedDict

from langgraph.graph import END, START, StateGraph

from graph.states.pipeline_state import IngestedDocument
from store import HybridStore, chunk_document


class RAGIndexState(TypedDict, total=False):
    """Internal state of the rag_index subgraph (not the parent PipelineState).

    Declared with ``total=False``, so every key is optional; the nodes in this
    module read the state with ``.get()`` accordingly.
    """

    # Not read by the nodes in this module — presumably supplied by the parent
    # graph for bookkeeping; verify against the caller.
    file_name: str
    # Input document; the chunker node reads its ``file_name`` and ``full_text``.
    ingested: IngestedDocument
    # Document type forwarded to ``chunk_document``; the chunker node falls back
    # to "egyeb" when the key is absent.
    doc_type: str
    # Written by the chunker node, consumed by the embed_upsert node.
    chunks: list[dict]
    # Written by the embed_upsert node: the value returned by ``store.add_chunks``
    # (0 when there was nothing to index).
    chunks_indexed: int

    # NOTE: the HybridStore instance is deliberately not a state field — it is
    # passed to build_rag_index_subgraph() at build time and captured by the
    # nodes through a closure.


def _make_chunker_node():
    """Create the async ``chunker`` node of the rag_index subgraph.

    Returns:
        A coroutine function that takes the subgraph state and returns a
        partial state update of the form ``{"chunks": [...]}``.
    """

    async def chunker_node(state: RAGIndexState) -> dict:
        """Split the ingested document's text into chunks.

        Yields an empty chunk list when the state carries no ``ingested``
        document or when that document's ``full_text`` is empty/falsy.
        """
        document = state.get("ingested")
        has_text = document is not None and bool(document.full_text)
        if has_text:
            return {
                "chunks": chunk_document(
                    file_name=document.file_name,
                    full_text=document.full_text,
                    # Fall back to the generic "egyeb" type when none was set.
                    doc_type=state.get("doc_type", "egyeb"),
                )
            }
        # Nothing to chunk: missing document or empty text.
        return {"chunks": []}

    return chunker_node


def _make_embed_upsert_node(store: HybridStore):
    """Create the async ``embed_upsert`` node bound to ``store``.

    Args:
        store: the HybridStore handed over by the parent graph; the returned
            node captures it in its closure.

    Returns:
        A coroutine function that takes the subgraph state and returns
        ``{"chunks_indexed": <count>}``.
    """

    async def embed_upsert_node(state: RAGIndexState) -> dict:
        """Hand the state's chunks to the store and report the result.

        When ``chunks`` is missing or empty the store is not touched and the
        reported count is 0; otherwise the count is whatever
        ``store.add_chunks`` returns.
        """
        pending = state.get("chunks")
        indexed = await store.add_chunks(pending) if pending else 0
        return {"chunks_indexed": indexed}

    return embed_upsert_node


def build_rag_index_subgraph(store: HybridStore):
    """Build and compile the subgraph that RAG-indexes a single document.

    The graph is a straight line: START → chunker → embed_upsert → END.

    Args:
        store: the HybridStore singleton; it is captured by the
            ``embed_upsert`` node through a closure.

    Returns:
        The result of ``StateGraph.compile()`` for the assembled graph.
    """
    builder = StateGraph(RAGIndexState)

    # Node name → node callable, registered in pipeline order.
    node_table = (
        ("chunker", _make_chunker_node()),
        ("embed_upsert", _make_embed_upsert_node(store)),
    )
    for node_name, node_fn in node_table:
        builder.add_node(node_name, node_fn)

    # Linear wiring: connect each stage to the one that follows it.
    stages = (START, "chunker", "embed_upsert", END)
    for source, target in zip(stages, stages[1:]):
        builder.add_edge(source, target)

    return builder.compile()