# Source: Agentic-Service-Data-Eyond / src/agents/orchestration.py
# Commit: 027123c — "[KM-467] [DED][AI] Orchestration Agent - Init" (ishaq101)
"""Orchestrator agent for intent recognition and planning."""
from langchain_openai import AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from src.config.settings import settings
from src.middlewares.logging import get_logger
from src.models.structured_output import IntentClassification
# Module-level logger scoped to the orchestrator component.
logger = get_logger("orchestrator")
class OrchestratorAgent:
    """Orchestrator agent for intent recognition and planning.

    Classifies the user's latest message into an intent (question, greeting,
    goodbye, other), decides whether the user's documents need to be searched,
    and either rewrites the message into a standalone search query or supplies
    a short direct response. On any failure the message is treated as a
    question so the pipeline degrades gracefully instead of erroring out.
    """

    def __init__(self):
        """Build the LLM, the classification prompt, and the structured chain."""
        # temperature=0 keeps classification deterministic across retries.
        self.llm = AzureChatOpenAI(
            azure_deployment=settings.azureai_deployment_name_4o,
            openai_api_version=settings.azureai_api_version_4o,
            azure_endpoint=settings.azureai_endpoint_url_4o,
            api_key=settings.azureai_api_key_4o,
            temperature=0
        )
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an orchestrator agent. You receive recent conversation history and the user's latest message.
Your task:
1. Determine intent: question, greeting, goodbye, or other
2. Decide whether to search the user's documents (needs_search)
3. If search is needed, rewrite the user's message into a STANDALONE search query that incorporates necessary context from conversation history. If the user says "tell me more" or "how many papers?", the search_query must spell out the full topic explicitly from history.
4. If no search needed, provide a short direct_response (plain text only, no markdown formatting).
Intent Routing:
- question -> needs_search=True, search_query=<standalone rewritten query>
- greeting -> needs_search=False, direct_response="Hello! How can I assist you today?"
- goodbye -> needs_search=False, direct_response="Goodbye! Have a great day!"
- other -> needs_search=True, search_query=<standalone rewritten query>
"""),
            MessagesPlaceholder(variable_name="history"),
            ("user", "{message}")
        ])
        # with_structured_output uses function calling — guarantees valid schema regardless of LLM response style
        self.chain = self.prompt | self.llm.with_structured_output(IntentClassification)

    async def analyze_message(self, message: str, history: list | None = None) -> dict:
        """Analyze user message and determine next actions.

        Args:
            message: The current user message.
            history: Recent conversation as LangChain BaseMessage objects (oldest-first).
                Used to rewrite ambiguous follow-ups into standalone search queries.
                May be None or empty for the first turn.

        Returns:
            A dict with keys ``intent``, ``needs_search``, ``search_query``,
            and ``direct_response`` (the dumped IntentClassification model).
            On LLM failure, a fallback dict routing the raw message to search.
        """
        try:
            # Truncate the preview so long messages don't flood the log.
            logger.info(f"Analyzing message: {message[:50]}...")
            history_messages = history or []
            result: IntentClassification = await self.chain.ainvoke({"message": message, "history": history_messages})
            logger.info(f"Intent: {result.intent}, Needs search: {result.needs_search}, Search query: {result.search_query[:50] if result.search_query else ''}")
            return result.model_dump()
        except Exception as e:
            logger.error("Message analysis failed", error=str(e))
            # Fallback to treating everything as a question
            return {
                "intent": "question",
                "needs_search": True,
                "search_query": message,
                "direct_response": None
            }
# Module-level singleton; importers share this instance instead of constructing their own.
orchestrator = OrchestratorAgent()