File size: 2,451 Bytes
7ff7119 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | """rag_index_subgraph — egy doksi chunkokra darabolása + ChromaDB+BM25 indexelés.
A pipeline_graph `dispatch_rag_index` Send API-val fan-out-ol minden doksira.
Ez a subgraph minden doksira lefuttat:
1. chunker_node: full_text → chunkok természetes vágási ponttal
2. embed_upsert_node: a chunkokat batch-ben embeddoljuk és HybridStore-ba tesszük
A HybridStore singleton (a pipeline_graph compile-időben kapja meg).
A subgraph a `state["store"]` kulcson keresztül éri el — closure pattern.
"""
from __future__ import annotations
from typing import TypedDict
from langgraph.graph import END, START, StateGraph
from graph.states.pipeline_state import IngestedDocument
from store import HybridStore, chunk_document
class RAGIndexState(TypedDict, total=False):
"""A rag_index subgraph belső state-je (nem a parent PipelineState)."""
file_name: str
ingested: IngestedDocument
doc_type: str
chunks: list[dict]
chunks_indexed: int
# Closure: a HybridStore instance — a build_rag_index_subgraph() build-időben kapja meg
# és bezárja a node-okba.
def _make_chunker_node():
async def chunker_node(state: RAGIndexState) -> dict:
ing = state.get("ingested")
if ing is None or not ing.full_text:
return {"chunks": []}
chunks = chunk_document(
file_name=ing.file_name,
full_text=ing.full_text,
doc_type=state.get("doc_type", "egyeb"),
)
return {"chunks": chunks}
return chunker_node
def _make_embed_upsert_node(store: HybridStore):
"""Closure-ban kapja meg a HybridStore-t a parent graph-ról."""
async def embed_upsert_node(state: RAGIndexState) -> dict:
chunks = state.get("chunks") or []
if not chunks:
return {"chunks_indexed": 0}
n = await store.add_chunks(chunks)
return {"chunks_indexed": n}
return embed_upsert_node
def build_rag_index_subgraph(store: HybridStore):
"""Compile-olt subgraph egy doksi RAG-indexelésre.
Args:
store: a HybridStore singleton — a node-okba bezárva a closure-ön.
"""
graph = StateGraph(RAGIndexState)
graph.add_node("chunker", _make_chunker_node())
graph.add_node("embed_upsert", _make_embed_upsert_node(store))
graph.add_edge(START, "chunker")
graph.add_edge("chunker", "embed_upsert")
graph.add_edge("embed_upsert", END)
return graph.compile()
|