Spaces:
Sleeping
Sleeping
# Pipeline orchestrator: intent → retrieval → planner (generation) → feedback.
| from backend.config.settings import settings | |
| from backend.pipeline.nodes import feedback, intent, planner, retrieval | |
| from backend.pipeline.state import PipelineState | |
def _route_by_affect(state: PipelineState) -> str:
    """Choose the retrieval path from the detected emotion in ``state``.

    Returns ``"fast"`` only when ``state["affect"]["emotion"]`` equals
    ``"FRUSTRATED"``. Every other case returns ``"full"``: a missing or falsy
    ``affect`` entry, a missing ``emotion`` key, or any other emotion value.
    """
    affect = state.get("affect") or {}
    if affect.get("emotion", "NEUTRAL") == "FRUSTRATED":
        return "fast"
    return "full"
def _route_by_latency(state: PipelineState) -> str:
    """Choose the planner tier from the time already spent upstream.

    Adds ``t_intent`` and ``t_retrieval`` from ``state["latency_log"]``. A
    missing log or a missing key counts as ``0.0``. Returns ``"fallback"`` when
    the sum is strictly greater than ``settings.fallback_latency_threshold``,
    otherwise ``"primary"``.
    """
    timings = state.get("latency_log") or {}
    spent = timings.get("t_intent", 0.0) + timings.get("t_retrieval", 0.0)
    if spent > settings.fallback_latency_threshold:
        return "fallback"
    return "primary"
def _merge(state: PipelineState, update: dict) -> None:
    """Fold one node's output dict into ``state`` in place.

    Keys in ``update`` overwrite existing keys in ``state``. Nothing is returned.
    """
    # The mypy code below suggests PipelineState is a TypedDict, so arbitrary
    # node keys can't be statically checked. The ignore silences that check.
    state.update(update)  # type: ignore[typeddict-item]
    return None
def run_pipeline(state: PipelineState) -> PipelineState:
    """Run the whole pipeline synchronously: intent, retrieval, planner, feedback.

    Each node's returned dict is merged into ``state`` in place. The same
    ``state`` object is returned.

    The intent + retrieval prefix is delegated to ``run_until_planner``, which
    is documented as the streaming endpoint's entry point. The planner tier
    comes from ``choose_planner_tier``. Sharing these helpers keeps the
    streaming and non-streaming paths from drifting apart in how they route
    retrieval or pick the planner tier.
    """
    # Intent, then fast or full retrieval depending on the detected affect.
    # run_until_planner mutates and returns this same object, so its return
    # value is not needed here.
    run_until_planner(state)
    # Fallback planner when intent + retrieval time exceeded
    # settings.fallback_latency_threshold; primary planner otherwise.
    if choose_planner_tier(state) == "fallback":
        _merge(state, planner.run_fallback(state))
    else:
        _merge(state, planner.run_primary(state))
    _merge(state, feedback.run(state))
    return state
def run_until_planner(state: PipelineState) -> PipelineState:
    """Run only the intent and retrieval stages, merging results into ``state``.

    Used by the streaming endpoint, which then drives the planner's token
    stream itself and calls feedback at the end. Retrieval uses
    ``retrieval.run_fast`` when affect routing says ``"fast"``, otherwise
    ``retrieval.run_full``. Returns the same (mutated) ``state`` object.
    """
    _merge(state, intent.run(state))
    # The routing decision must see the intent output merged just above.
    retrieve = (
        retrieval.run_fast
        if _route_by_affect(state) == "fast"
        else retrieval.run_full
    )
    _merge(state, retrieve(state))
    return state
def choose_planner_tier(state: PipelineState) -> str:
    """Return ``"fallback"`` or ``"primary"`` for the planner stage of ``state``.

    This is the public counterpart of ``_route_by_latency``. It lets callers
    that drive the planner themselves reuse the same latency-based decision.
    """
    tier = _route_by_latency(state)
    return tier