shwetangisingh's picture
Streaming candidate picker + side-index feedback loops
df78c68
# Pipeline orchestrator: intent → retrieval → generation → feedback.
from backend.config.settings import settings
from backend.pipeline.nodes import feedback, intent, planner, retrieval
from backend.pipeline.state import PipelineState
def _route_by_affect(state: PipelineState) -> str:
emotion = (state.get("affect") or {}).get("emotion", "NEUTRAL")
return "fast" if emotion == "FRUSTRATED" else "full"
def _route_by_latency(state: PipelineState) -> str:
    """Choose the planner tier from the time already spent upstream.

    Adds the ``"t_intent"`` and ``"t_retrieval"`` entries of
    ``state["latency_log"]``. Missing entries count as ``0.0``, and a missing
    or falsy log counts as empty. The sum is compared with
    ``settings.fallback_latency_threshold``. The unit is presumably seconds;
    NOTE(review): confirm this against the code that records latency_log.

    Returns:
        ``"fallback"`` when the elapsed total is strictly greater than the
        threshold, otherwise ``"primary"``.
    """
    timings = state.get("latency_log") or {}
    spent = timings.get("t_intent", 0.0) + timings.get("t_retrieval", 0.0)
    if spent > settings.fallback_latency_threshold:
        return "fallback"
    return "primary"
def _merge(state: PipelineState, update: dict) -> None:
state.update(update) # type: ignore[typeddict-item]
def run_pipeline(state: PipelineState) -> PipelineState:
    """Run the whole pipeline: intent → retrieval → planner → feedback.

    The stages before the planner come from ``run_until_planner``, and the
    planner tier comes from ``choose_planner_tier``. This blocking entry point
    therefore uses exactly the same routing as the streaming endpoint, and
    the two paths cannot drift apart.

    Each stage's returned dict is merged into ``state`` in place. The same
    mapping is returned.
    """
    # Intent detection plus affect-routed retrieval (fast or full).
    state = run_until_planner(state)

    # Planner tier depends on how much time intent and retrieval used.
    if choose_planner_tier(state) == "fallback":
        _merge(state, planner.run_fallback(state))
    else:
        _merge(state, planner.run_primary(state))

    _merge(state, feedback.run(state))
    return state
def run_until_planner(state: PipelineState) -> PipelineState:
    """Execute only the stages that precede planning: intent, then retrieval.

    The streaming endpoint calls this first. It then drives the planner's
    token stream itself and invokes feedback once streaming finishes.

    Retrieval runs in ``run_fast`` mode when ``_route_by_affect`` says
    ``"fast"`` and in ``run_full`` mode otherwise. ``state`` is updated in
    place and returned.
    """
    _merge(state, intent.run(state))
    # Routing reads the affect that intent.run just merged into state.
    retrieve = (
        retrieval.run_fast
        if _route_by_affect(state) == "fast"
        else retrieval.run_full
    )
    _merge(state, retrieve(state))
    return state
def choose_planner_tier(state: PipelineState) -> str:
    """Public entry point for planner-tier selection.

    Delegates to ``_route_by_latency`` so that every caller uses one routing
    rule.

    Returns:
        ``"fallback"`` or ``"primary"``.
    """
    tier = _route_by_latency(state)
    return tier