| """Orchestrator agent for intent recognition and planning.""" |
|
|
| from langchain_openai import AzureChatOpenAI |
| from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder |
| from src.config.settings import settings |
| from src.middlewares.logging import get_logger |
| from src.models.structured_output import IntentClassification |
|
|
| logger = get_logger("orchestrator") |
|
|
|
|
| class OrchestratorAgent: |
| """Orchestrator agent for intent recognition and planning.""" |
|
|
| def __init__(self): |
| self.llm = AzureChatOpenAI( |
| azure_deployment=settings.azureai_deployment_name_4o, |
| openai_api_version=settings.azureai_api_version_4o, |
| azure_endpoint=settings.azureai_endpoint_url_4o, |
| api_key=settings.azureai_api_key_4o, |
| temperature=0 |
| ) |
|
|
| self.prompt = ChatPromptTemplate.from_messages([ |
| ("system", """You are an orchestrator agent. You receive recent conversation history and the user's latest message. |
| |
| Your task: |
| 1. Determine intent: question, greeting, goodbye, or other |
| 2. Decide whether to search the user's documents (needs_search) |
| 3. If search is needed, rewrite the user's message into a STANDALONE search query that incorporates necessary context from conversation history. If the user says "tell me more" or "how many papers?", the search_query must spell out the full topic explicitly from history. |
| 4. If no search needed, provide a short direct_response (plain text only, no markdown formatting). |
| |
| Intent Routing: |
| - question -> needs_search=True, search_query=<standalone rewritten query> |
| - greeting -> needs_search=False, direct_response="Hello! How can I assist you today?" |
| - goodbye -> needs_search=False, direct_response="Goodbye! Have a great day!" |
| - other -> needs_search=True, search_query=<standalone rewritten query> |
| """), |
| MessagesPlaceholder(variable_name="history"), |
| ("user", "{message}") |
| ]) |
|
|
| |
| self.chain = self.prompt | self.llm.with_structured_output(IntentClassification) |
|
|
| async def analyze_message(self, message: str, history: list = None) -> dict: |
| """Analyze user message and determine next actions. |
| |
| Args: |
| message: The current user message. |
| history: Recent conversation as LangChain BaseMessage objects (oldest-first). |
| Used to rewrite ambiguous follow-ups into standalone search queries. |
| """ |
| try: |
| logger.info(f"Analyzing message: {message[:50]}...") |
|
|
| history_messages = history or [] |
| result: IntentClassification = await self.chain.ainvoke({"message": message, "history": history_messages}) |
|
|
| logger.info(f"Intent: {result.intent}, Needs search: {result.needs_search}, Search query: {result.search_query[:50] if result.search_query else ''}") |
| return result.model_dump() |
|
|
| except Exception as e: |
| logger.error("Message analysis failed", error=str(e)) |
| |
| return { |
| "intent": "question", |
| "needs_search": True, |
| "search_query": message, |
| "direct_response": None |
| } |
|
|
|
|
| orchestrator = OrchestratorAgent() |
|
|