| """rag_index_subgraph — egy doksi chunkokra darabolása + ChromaDB+BM25 indexelés. |
| |
| A pipeline_graph `dispatch_rag_index` Send API-val fan-out-ol minden doksira. |
| Ez a subgraph minden doksira lefuttat: |
| 1. chunker_node: full_text → chunkok természetes vágási ponttal |
| 2. embed_upsert_node: a chunkokat batch-ben embeddoljuk és HybridStore-ba tesszük |
| |
| A HybridStore singleton (a pipeline_graph compile-időben kapja meg). |
| A subgraph a `state["store"]` kulcson keresztül éri el — closure pattern. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import TypedDict |
|
|
| from langgraph.graph import END, START, StateGraph |
|
|
| from graph.states.pipeline_state import IngestedDocument |
| from store import HybridStore, chunk_document |
|
|
|
|
| class RAGIndexState(TypedDict, total=False): |
| """A rag_index subgraph belső state-je (nem a parent PipelineState).""" |
|
|
| file_name: str |
| ingested: IngestedDocument |
| doc_type: str |
| chunks: list[dict] |
| chunks_indexed: int |
|
|
| |
| |
|
|
|
|
| def _make_chunker_node(): |
| async def chunker_node(state: RAGIndexState) -> dict: |
| ing = state.get("ingested") |
| if ing is None or not ing.full_text: |
| return {"chunks": []} |
| chunks = chunk_document( |
| file_name=ing.file_name, |
| full_text=ing.full_text, |
| doc_type=state.get("doc_type", "egyeb"), |
| ) |
| return {"chunks": chunks} |
|
|
| return chunker_node |
|
|
|
|
| def _make_embed_upsert_node(store: HybridStore): |
| """Closure-ban kapja meg a HybridStore-t a parent graph-ról.""" |
|
|
| async def embed_upsert_node(state: RAGIndexState) -> dict: |
| chunks = state.get("chunks") or [] |
| if not chunks: |
| return {"chunks_indexed": 0} |
| n = await store.add_chunks(chunks) |
| return {"chunks_indexed": n} |
|
|
| return embed_upsert_node |
|
|
|
|
| def build_rag_index_subgraph(store: HybridStore): |
| """Compile-olt subgraph egy doksi RAG-indexelésre. |
| |
| Args: |
| store: a HybridStore singleton — a node-okba bezárva a closure-ön. |
| """ |
| graph = StateGraph(RAGIndexState) |
| graph.add_node("chunker", _make_chunker_node()) |
| graph.add_node("embed_upsert", _make_embed_upsert_node(store)) |
| graph.add_edge(START, "chunker") |
| graph.add_edge("chunker", "embed_upsert") |
| graph.add_edge("embed_upsert", END) |
| return graph.compile() |
|
|