paperhawk / subgraphs /rag_index_subgraph.py
Nándorfi Vince
Initial paperhawk push to HF Space (LFS for binaries)
7ff7119
raw
history blame
2.45 kB
"""rag_index_subgraph — egy doksi chunkokra darabolása + ChromaDB+BM25 indexelés.
A pipeline_graph `dispatch_rag_index` Send API-val fan-out-ol minden doksira.
Ez a subgraph minden doksira lefuttat:
1. chunker_node: full_text → chunkok természetes vágási ponttal
2. embed_upsert_node: a chunkokat batch-ben embeddoljuk és HybridStore-ba tesszük
A HybridStore singleton (a pipeline_graph compile-időben kapja meg).
A subgraph a `state["store"]` kulcson keresztül éri el — closure pattern.
"""
from __future__ import annotations
from typing import TypedDict
from langgraph.graph import END, START, StateGraph
from graph.states.pipeline_state import IngestedDocument
from store import HybridStore, chunk_document
class RAGIndexState(TypedDict, total=False):
"""A rag_index subgraph belső state-je (nem a parent PipelineState)."""
file_name: str
ingested: IngestedDocument
doc_type: str
chunks: list[dict]
chunks_indexed: int
# Closure: a HybridStore instance — a build_rag_index_subgraph() build-időben kapja meg
# és bezárja a node-okba.
def _make_chunker_node():
async def chunker_node(state: RAGIndexState) -> dict:
ing = state.get("ingested")
if ing is None or not ing.full_text:
return {"chunks": []}
chunks = chunk_document(
file_name=ing.file_name,
full_text=ing.full_text,
doc_type=state.get("doc_type", "egyeb"),
)
return {"chunks": chunks}
return chunker_node
def _make_embed_upsert_node(store: HybridStore):
"""Closure-ban kapja meg a HybridStore-t a parent graph-ról."""
async def embed_upsert_node(state: RAGIndexState) -> dict:
chunks = state.get("chunks") or []
if not chunks:
return {"chunks_indexed": 0}
n = await store.add_chunks(chunks)
return {"chunks_indexed": n}
return embed_upsert_node
def build_rag_index_subgraph(store: HybridStore):
"""Compile-olt subgraph egy doksi RAG-indexelésre.
Args:
store: a HybridStore singleton — a node-okba bezárva a closure-ön.
"""
graph = StateGraph(RAGIndexState)
graph.add_node("chunker", _make_chunker_node())
graph.add_node("embed_upsert", _make_embed_upsert_node(store))
graph.add_edge(START, "chunker")
graph.add_edge("chunker", "embed_upsert")
graph.add_edge("embed_upsert", END)
return graph.compile()