"""
Agent Q3 [HQ] — LangGraph StateGraph
Multi-agent routing: classify → reasoner | coder → tandem (optional) → output

The module builds the graph once at import time and exposes it as ``graph``.
"""
import asyncio
from typing import Literal, TypedDict

from langgraph.graph import END, StateGraph

# Substrings whose presence in the conversation text selects the "coder"
# route. Matching is by plain substring on the lower-cased text, so e.g.
# "class" also matches inside "classify".
_CODE_KEYWORDS = (
    "fix",
    "debug",
    "implement",
    "write",
    "function",
    "class",
    "solidity",
    "contract",
)


class AgentState(TypedDict):
    """State dictionary passed between the graph nodes."""

    # Conversation messages; classify_node reads each item's "content" via
    # ``.get``, so items are expected to be mappings.
    messages: list
    # NOTE(review): not read or written anywhere in this module.
    target: str
    # Written by tandem_node from the tandem result's "plan" entry.
    plan: str
    # Final text written by reasoner_node, coder_node or tandem_node.
    output: str
    # Name of the next node, written by classify_node.
    route: str


def classify_node(state: AgentState) -> AgentState:
    """Choose the route for the conversation and store it in ``state["route"]``.

    The contents of all messages are joined, lower-cased and scanned for the
    substrings in ``_CODE_KEYWORDS``. Any hit selects ``"coder"``; otherwise
    the route is ``"reasoner"``.

    Messages without a ``"content"`` key, or whose content is ``None``,
    contribute nothing. Non-string content is converted with ``str`` so the
    join cannot raise ``TypeError``.

    Args:
        state: Graph state; ``state["messages"]`` must be present.

    Returns:
        The same ``state`` object, mutated in place with ``"route"`` set.
    """
    parts = []
    for message in state["messages"]:
        content = message.get("content", "")
        if content is None:
            # A null content would break str.join; treat it as empty.
            continue
        parts.append(content if isinstance(content, str) else str(content))
    text = " ".join(parts).lower()

    # NOTE(review): only "coder" and "reasoner" are ever produced here, so the
    # "tandem" branch wired in build_graph is not reachable through this node.
    state["route"] = (
        "coder" if any(keyword in text for keyword in _CODE_KEYWORDS) else "reasoner"
    )
    return state


async def _route_and_store(state: AgentState, target: str) -> AgentState:
    """Send the messages to ``ComputeRouter`` for ``target`` and store the reply."""
    # Imported lazily, as in the original nodes, so importing this module does
    # not itself import compute_router.
    from compute_router import ComputeRouter

    result = await ComputeRouter().route(state["messages"], target=target)
    # The result is indexed as choices[0].message.content.
    state["output"] = result["choices"][0]["message"]["content"]
    return state


async def reasoner_node(state: AgentState) -> AgentState:
    """Run the conversation through the router with ``target="reasoner"``.

    Returns:
        The same ``state`` object with ``"output"`` set to the first choice's
        message content.
    """
    return await _route_and_store(state, "reasoner")


async def coder_node(state: AgentState) -> AgentState:
    """Run the conversation through the router with ``target="coder"``.

    Returns:
        The same ``state`` object with ``"output"`` set to the first choice's
        message content.
    """
    return await _route_and_store(state, "coder")


async def tandem_node(state: AgentState) -> AgentState:
    """Run ``TandemCore`` on the messages and store its plan and implementation.

    Returns:
        The same ``state`` object with ``"plan"`` set from ``result["plan"]``
        and ``"output"`` set from ``result["implementation"]``.
    """
    # Imported lazily, matching the router nodes.
    from tandem_core import TandemCore

    result = await TandemCore().run(state["messages"])
    state["plan"] = result["plan"]
    state["output"] = result["implementation"]
    return state


def route_decision(state: AgentState) -> Literal["reasoner", "coder", "tandem"]:
    """Return the name of the node to run after classification.

    Falls back to ``"reasoner"`` when ``state`` has no ``"route"`` key.
    """
    return state.get("route", "reasoner")


def build_graph() -> StateGraph:
    """Assemble and compile the routing graph.

    ``classify`` is the entry point; ``route_decision`` then selects one of
    ``reasoner``, ``coder`` or ``tandem``, each of which leads to ``END``.

    Returns:
        The object returned by ``StateGraph.compile()``.
        NOTE(review): the ``StateGraph`` annotation is kept for interface
        stability, but the value returned is the compiled graph.
    """
    g = StateGraph(AgentState)

    g.add_node("classify", classify_node)
    g.add_node("reasoner", reasoner_node)
    g.add_node("coder", coder_node)
    g.add_node("tandem", tandem_node)

    g.set_entry_point("classify")
    g.add_conditional_edges(
        "classify",
        route_decision,
        {
            "reasoner": "reasoner",
            "coder": "coder",
            "tandem": "tandem",
        },
    )

    # Every worker node is terminal.
    g.add_edge("reasoner", END)
    g.add_edge("coder", END)
    g.add_edge("tandem", END)

    return g.compile()


graph = build_graph()