Spaces:
Sleeping
Sleeping
| import os | |
| from typing import Annotated, TypedDict | |
| # # OpenAI... | |
| # from langchain_openai import ChatOpenAI | |
| # Groq LLM... | |
| from langchain_groq import ChatGroq | |
| from langchain_core.messages import BaseMessage | |
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.graph.message import add_messages | |
| from langgraph.prebuilt import ToolNode, tools_condition | |
| from src.agent.tools import search_documentation | |
| # Import our custom tool | |
| from src.agent.tools import search_documentation | |
| # Fix the infinite loop using system prompt(Guardrail) | |
| from langchain_core.messages import SystemMessage | |
| # Human in the loop using LangGraphs checkpointers... | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # 1. UPGRADED STATE | |
| class AgentState(TypedDict): | |
| # 'add_messages' ensures we append to the history, never overwrite it. | |
| messages: Annotated[list[BaseMessage], add_messages] | |
| # 2. INITIALIZE THE BRAIN | |
| # We instantiate the LLM and "bind" our tool to it. | |
| # # Make sure you export GROQ_API_KEY in your terminal before running! | |
| # os.environ["GROQ_API_KEY"] = "gsk_jd" | |
| # We use Meta's Llama 3 8B model hosted on Groq for incredible speed | |
| llm = ChatGroq(model="llama-3.1-8b-instant", temperature=0) | |
| # # This sends the JSON schema we looked at yesterday directly to OpenAI. | |
| # llm = ChatOpenAI(model="gpt-4-turbo", temperature=0) | |
| tools = [search_documentation] | |
| llm_with_tools = llm.bind_tools(tools) | |
| # 3. THE NODES | |
| """ | |
| Chat node for seamless interaction with the LLM model... | |
| """ | |
| def chatbot_node(state: AgentState): | |
| """ | |
| This node intercepts the history, adds strict behavioral rules, | |
| and then passes it to the LLM. | |
| The LLM will either return a standard text message, OR a special "ToolCall" message. | |
| """ | |
| print("\n--- [NODE: Chatbot] Thinking... ---") | |
| # 1. The Circuit Breaker System Prompt --> Prompt based Flow control | |
| system_message = SystemMessage(content=( | |
| "You are an elite AI Engineering assistant. " | |
| "You have access to a search_documentation tool. " | |
| "CRITICAL RULE: If you use the tool and it returns 'No relevant information found', " | |
| "you MUST NOT use the tool again. Immediately stop and tell the user " | |
| "'I do not have enough information to answer that based on the documentation.' " | |
| "Do not guess. Do not hallucinate." | |
| )) | |
| # 2. Prepend the system rules to the chat history | |
| messages_to_send = [system_message] + state["messages"] | |
| # 3. Invoke the LLM with the strict rules applied | |
| response = llm_with_tools.invoke(messages_to_send) | |
| # We return the message wrapped in a list to trigger the 'add_messages' append behavior | |
| return {"messages": [response]} | |
| # LangGraph has a built-in node specifically for executing tools! | |
| # It reads the "ToolCall" message, runs our Python function, and returns a "ToolMessage". | |
| tool_node = ToolNode(tools=tools) | |
| # ============================================= | |
| # 4. COMPILE THE GRAPH with human in the loop... | |
| # ============================================= | |
| workflow = StateGraph(AgentState) | |
| # Add our two worker nodes | |
| workflow.add_node("chatbot", chatbot_node) | |
| workflow.add_node("tools", tool_node) | |
| # Set the entry point | |
| workflow.add_edge(START, "chatbot") | |
| # 5. THE MAGIC ROUTING | |
| # 'tools_condition' is a built-in LangGraph edge. | |
| # It looks at the last message from the chatbot. | |
| # If it has a tool call, it routes to "tools". If it's just text, it routes to END. | |
| workflow.add_conditional_edges("chatbot", tools_condition) | |
| # After a tool finishes running, ALWAYS loop back to the chatbot | |
| # so it can read the database results and formulate a final answer! | |
| workflow.add_edge("tools", "chatbot") | |
| # Initialize the short-term memory vault | |
| memory = MemorySaver() | |
| # Compile with the memory and the breakpoint! | |
| app = workflow.compile( | |
| checkpointer=memory, | |
| interrupt_before=["tools"] # tell the graph to pause before executing this node. | |
| ) | |
| from langchain_core.messages import HumanMessage | |
| if __name__ == "__main__": | |
| # Ensure your API key is available | |
| # os.environ["OPENAI_API_KEY"] = "YOUR_REAL_OPENAI_API_KEY" | |
| print("========== AGENT TEST ==========") | |
| initial_state = { | |
| "messages": [HumanMessage(content="What does OmniRouter do?")] | |
| } | |
| # stream() allows us to see the exact output of each node as it executes! | |
| for event in app.stream(initial_state): | |
| for node_name, node_state in event.items(): | |
| print(f"Update from node '{node_name}':") | |
| # Print the content of the very last message added to the state | |
| print(f" -> {node_state['messages'][-1].content}\n") |