| """ |
| Lead Agent - Orchestrates the multi-agent workflow |
| |
| The Lead Agent is responsible for: |
| 1. Analyzing user queries and determining next steps |
| 2. Managing the iterative research/code loop |
| 3. Deciding when enough information has been gathered |
| 4. Coordinating between specialized agents |
| 5. Maintaining the overall workflow state |
| """ |
|
|
| import os |
| from typing import Dict, Any, Literal |
| from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage |
| from langgraph.types import Command |
| from langchain_groq import ChatGroq |
| from observability import agent_span |
| from dotenv import load_dotenv |
|
|
| |
| from memory_system import MemoryManager |
|
|
| load_dotenv("env.local") |
|
|
| |
| memory_manager = MemoryManager() |
|
|
def load_system_prompt() -> str:
    """Load the system prompt for the lead agent.

    Reads the base prompt from ``archive/prompts/system_prompt.txt`` (a path
    relative to the current working directory) and appends the lead-agent
    coordination instructions to it.

    Returns:
        The combined prompt, or a short generic prompt when the base prompt
        file is missing, cannot be opened, or cannot be decoded. This
        function does not raise for file-related problems.
    """
    fallback_prompt = """You are a helpful assistant coordinating a team of specialists to answer questions accurately."""

    # Keep the try body limited to the file access: it is the only part that
    # is expected to fail.
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's locale encoding.
        with open("archive/prompts/system_prompt.txt", "r", encoding="utf-8") as f:
            base_prompt = f.read()
    except FileNotFoundError:
        return fallback_prompt
    except (OSError, UnicodeDecodeError) as e:
        # E.g. the path is a directory, permissions are missing, or the file
        # is not valid UTF-8. Degrade to the fallback instead of crashing.
        print(f"⚠️ Could not load system prompt ({e}), using fallback prompt")
        return fallback_prompt

    return f"""
{base_prompt}

As the Lead Agent, you coordinate a team of specialists:
- Research Agent: Gathers information from web, papers, and knowledge bases
- Code Agent: Performs calculations and executes Python code

Your responsibilities:
1. Analyze the user's question to determine what information and computations are needed
2. Decide whether to delegate to research, code, both, or proceed to final answer
3. Synthesize results from specialists into a coherent draft answer
4. Determine when sufficient information has been gathered

Decision criteria:
- If the question requires factual information, current events, or research → delegate to research
- If the question requires calculations, data analysis, or code execution → delegate to code
- If you have sufficient information to answer → proceed to formatting
- Maximum 3 iterations to prevent infinite loops

Always maintain the exact formatting requirements specified in the system prompt.
"""
|
|
|
|
# Routes the lead agent may hand control to.
_LEAD_ROUTES = ("research", "code", "formatter")

# Characters treated as separators when scanning an LLM reply for a route name.
_ROUTE_SEPARATORS = str.maketrans(dict.fromkeys("\"'`*.,:;!?()[]{}<>", " "))


def _new_lead_llm() -> "ChatGroq":
    """Build the Groq chat model used for both routing and drafting."""
    return ChatGroq(
        model="llama-3.3-70b-versatile",
        temperature=0.1,
        max_tokens=1024,
    )


def _gather_context(state: Dict[str, Any]) -> tuple:
    """Return ``(user_query, research_notes, code_outputs)`` read from *state*.

    ``user_query`` is the content of the first ``HumanMessage`` in
    ``state["messages"]``, or ``""`` when there is none. Keys that are absent
    or explicitly ``None`` are normalized to empty values so later ``len()``
    calls and prompt formatting cannot fail on ``None``.
    """
    messages = state.get("messages") or []
    research_notes = state.get("research_notes") or ""
    code_outputs = state.get("code_outputs") or ""
    user_query = next(
        (msg.content for msg in messages if isinstance(msg, HumanMessage)),
        "",
    )
    return user_query, research_notes, code_outputs


def _similar_qa_context(user_query: str) -> str:
    """Return a prompt fragment with similar past Q&A, or ``""``.

    The memory lookup is best-effort: any failure is reported and the agent
    continues without the extra context.
    """
    if not user_query:
        return ""
    try:
        similar_qa = memory_manager.get_similar_qa(user_query)
    except Exception as e:
        print(f"⚠️ Memory lookup failed, continuing without similar Q&A: {e}")
        return ""
    if similar_qa:
        return f"\n\nSimilar previous Q&A:\n{similar_qa}"
    return ""


def _build_decision_prompt(user_query, loop_counter, research_notes, code_outputs, similar_context) -> str:
    """Render the routing prompt that asks the LLM for the next step."""
    notes_text = research_notes or "None yet"
    outputs_text = code_outputs or "None yet"
    return f"""
Based on the user's question and current progress, decide the next action.

Original Question: {user_query}

Current Progress:
- Loop iteration: {loop_counter}
- Research gathered: {len(research_notes)} characters
- Code outputs: {len(code_outputs)} characters

Research Notes So Far:
{notes_text}

Code Outputs So Far:
{outputs_text}

{similar_context}

Analyze what's still needed:
1. Is factual information, current events, or research missing? → route to "research"
2. Are calculations, data analysis, or code execution needed? → route to "code"
3. Do we have sufficient information to provide a complete answer? → route to "formatter"

Respond with ONLY one of: research, code, formatter
"""


def _build_draft_prompt(user_query, research_notes, code_outputs) -> str:
    """Render the prompt that asks the LLM to synthesize a draft answer."""
    return f"""
Create a comprehensive answer based on all gathered information:

Original Question: {user_query}

Research Information:
{research_notes}

Code Results:
{code_outputs}

Instructions:
1. Synthesize all available information to answer the question
2. If computational results are available, include them
3. If research provides context, incorporate it
4. Provide a clear, direct answer to the user's question
5. Focus on accuracy and completeness

What is your answer to the user's question?
"""


def _draft_answer(llm, system_prompt, user_query, research_notes, code_outputs):
    """Ask *llm* for a draft answer and return its content.

    Exceptions from the LLM call propagate; callers decide on the fallback.
    """
    draft_response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=_build_draft_prompt(user_query, research_notes, code_outputs)),
    ])
    if hasattr(draft_response, 'content'):
        return draft_response.content
    return str(draft_response)


def _fallback_draft(research_notes, code_outputs) -> str:
    """Return a draft assembled from the raw notes when the LLM call failed."""
    return f"Based on the available information:\n\nResearch: {research_notes}\nCalculations: {code_outputs}"


def _parse_decision(raw_content: Any) -> str:
    """Map a raw LLM reply to one of the valid routes, or ``""``.

    An exact match (after trimming and lower-casing) wins. Otherwise the reply
    is accepted only when it mentions exactly one valid route once quotes and
    punctuation are ignored, so replies such as ``"formatter"``, ``code.`` or
    ``Decision: research`` are understood, while ambiguous replies that name
    several routes are rejected.
    """
    text = raw_content if isinstance(raw_content, str) else str(raw_content)
    cleaned = text.strip().lower()
    if cleaned in _LEAD_ROUTES:
        return cleaned
    mentioned = set(cleaned.translate(_ROUTE_SEPARATORS).split()) & set(_LEAD_ROUTES)
    if len(mentioned) == 1:
        return mentioned.pop()
    return ""


def lead_agent(state: Dict[str, Any]) -> Command[Literal["research", "code", "formatter", "__end__"]]:
    """
    Lead Agent node that orchestrates the workflow.

    Makes decisions about:
    - Whether more research is needed
    - Whether code execution is needed
    - When to proceed to final formatting
    - When the loop should terminate

    Reads from ``state``: ``loop_counter`` (default 0), ``max_iterations``
    (default 3), ``messages``, ``research_notes``, ``code_outputs`` and, for
    tracing metadata only, ``user_id`` and ``session_id``.

    Behavior:
    - Once ``loop_counter >= max_iterations`` no routing decision is
      requested; a draft answer is generated and control goes to
      ``formatter``.
    - Otherwise the LLM picks ``research``, ``code`` or ``formatter``. An
      unrecognizable reply defaults to ``research``. When ``formatter`` is
      chosen a draft answer is generated first.
    - If drafting fails, a draft built from the raw notes is used instead. Any
      other error in the routing path also routes to ``formatter``, with a
      draft that describes the error.

    Returns Command with routing decision and state updates. Every update
    contains ``loop_counter`` (incremented by one) and ``next``;
    ``draft_answer`` is included whenever the route is ``formatter``.
    """
    loop_counter = state.get('loop_counter', 0)
    max_iterations = state.get('max_iterations', 3)

    print(f"🎯 Lead Agent: Processing request (iteration {loop_counter})")

    if loop_counter >= max_iterations:
        print("🔄 Maximum iterations reached, proceeding to formatter")

        user_query, research_notes, code_outputs = _gather_context(state)
        try:
            draft_content = _draft_answer(
                _new_lead_llm(),
                load_system_prompt(),
                user_query,
                research_notes,
                code_outputs,
            )
            print(f"📝 Lead Agent: Created draft answer at max iterations ({len(draft_content)} characters)")
        except Exception as e:
            print(f"⚠️ Error creating draft answer at max iterations: {e}")
            draft_content = _fallback_draft(research_notes, code_outputs)

        return Command(
            goto="formatter",
            update={
                "loop_counter": loop_counter + 1,
                "next": "formatter",
                "draft_answer": draft_content,
            },
        )

    try:
        system_prompt = load_system_prompt()
        llm = _new_lead_llm()
        user_query, research_notes, code_outputs = _gather_context(state)

        with agent_span(
            "lead",
            metadata={
                "loop_counter": loop_counter,
                "research_notes_length": len(research_notes),
                "code_outputs_length": len(code_outputs),
                "user_id": state.get("user_id", "unknown"),
                "session_id": state.get("session_id", "unknown"),
            },
        ) as span:
            similar_context = _similar_qa_context(user_query)
            decision_prompt = _build_decision_prompt(
                user_query, loop_counter, research_notes, code_outputs, similar_context
            )

            response = llm.invoke([
                SystemMessage(content=system_prompt),
                HumanMessage(content=decision_prompt),
            ])
            raw_decision = getattr(response, "content", response)
            decision = _parse_decision(raw_decision)
            if not decision:
                shown = str(raw_decision).strip().lower()
                print(f"⚠️ Invalid decision '{shown}', defaulting to 'research'")
                decision = "research"

            updates = {
                "loop_counter": loop_counter + 1,
                "next": decision,
            }

            if decision == "formatter":
                # The formatter expects a draft, so synthesize one before
                # handing over; fall back to the raw notes if the LLM fails.
                try:
                    draft_content = _draft_answer(
                        llm, system_prompt, user_query, research_notes, code_outputs
                    )
                    updates["draft_answer"] = draft_content
                    print(f"📝 Lead Agent: Created draft answer ({len(draft_content)} characters)")
                except Exception as e:
                    print(f"⚠️ Error creating draft answer: {e}")
                    updates["draft_answer"] = _fallback_draft(research_notes, code_outputs)

            print(f"🎯 Lead Agent Decision: {decision} (iteration {loop_counter + 1})")

            if span:
                span.update_trace(output={"decision": decision, "updates": updates})

            return Command(
                goto=decision,
                update=updates,
            )

    except Exception as e:
        # Last-resort boundary: never let the node raise into the graph;
        # surface the error to the user through the formatter instead.
        print(f"❌ Lead Agent Error: {e}")

        return Command(
            goto="formatter",
            update={
                "draft_answer": f"I encountered an error while processing your request: {str(e)}",
                "loop_counter": loop_counter + 1,
                "next": "formatter",
            },
        )