""" LLM-backed Agent - Calls real Azure AI Foundry model endpoints. Reuses the proven agent infrastructure from the DDFT/EECT frameworks (AzureOpenAIAgent, AzureAIAgent) but wrapped for the CGAE economy loop. Each LLMAgent: - Has a real model backing it (e.g., gpt-5, deepseek-v3.1, phi-4) - Executes tasks by sending prompts to the model and receiving outputs - Has its robustness measured by actual CDCT/DDFT/EECT audits (or synthetics until wired) - Competes in the CGAE economy alongside other LLM-backed agents """ from __future__ import annotations import logging import os import time from dataclasses import dataclass from threading import Lock from typing import Optional from openai import AzureOpenAI, OpenAI logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Retry handler (inline to avoid import path issues with framework code) # --------------------------------------------------------------------------- @dataclass class RetryConfig: max_retries: int = 3 base_delay: float = 2.0 max_delay: float = 60.0 def call_with_retry(api_call, config: RetryConfig, log_prefix: str = ""): retries = 0 while True: try: return api_call() except Exception as e: retries += 1 if retries > config.max_retries: logger.error(f"{log_prefix} Final attempt failed: {e}") raise delay = min(config.max_delay, config.base_delay * (2 ** (retries - 1))) logger.warning( f"{log_prefix} Attempt {retries}/{config.max_retries} failed: {e}. " f"Retrying in {delay:.1f}s..." ) time.sleep(delay) # --------------------------------------------------------------------------- # Client pools (thread-safe singletons) # --------------------------------------------------------------------------- _azure_openai_clients: dict[str, AzureOpenAI] = {} _azure_openai_lock = Lock() _openai_clients: dict[str, OpenAI] = {} _openai_lock = Lock() def _get_azure_openai_client(api_key: str, endpoint: str, api_version: str) -> AzureOpenAI: key = f"{endpoint}:{api_version}" if key not in _azure_openai_clients: with _azure_openai_lock: if key not in _azure_openai_clients: _azure_openai_clients[key] = AzureOpenAI( api_key=api_key, azure_endpoint=endpoint, api_version=api_version, ) return _azure_openai_clients[key] def _get_openai_client(base_url: str, api_key: str) -> OpenAI: key = f"{base_url}" if key not in _openai_clients: with _openai_lock: if key not in _openai_clients: _openai_clients[key] = OpenAI(base_url=base_url, api_key=api_key) return _openai_clients[key] # --------------------------------------------------------------------------- # LLM Agent # --------------------------------------------------------------------------- class LLMAgent: """ A live LLM agent backed by an Azure AI Foundry model endpoint. Provides: - chat(messages) -> str: Send messages, get response - execute_task(prompt, system_prompt) -> str: Execute a task - Token/call tracking for cost accounting """ def __init__(self, model_config: dict): self.model_name: str = model_config["model_name"] self.deployment_name: str = model_config.get("deployment_name", model_config.get("model_id", "")) self.provider: str = model_config["provider"] self.family: str = model_config.get("family", "Unknown") self.retry_config = RetryConfig() # Tracking self.total_calls: int = 0 self.total_input_tokens: int = 0 self.total_output_tokens: int = 0 self.total_errors: int = 0 self.total_latency_ms: float = 0.0 if self.provider == "bedrock": # Bedrock uses ABSK bearer token + direct HTTP self._bedrock_key = os.environ.get("AWS_BEARER_TOKEN_BEDROCK", "") model_id = model_config.get("model_id", self.deployment_name) region = model_config.get("region", "us-east-1") self._bedrock_url = f"https://bedrock-runtime.{region}.amazonaws.com/model/{model_id}/converse" self._client = None if not self._bedrock_key: raise EnvironmentError(f"Missing AWS_BEARER_TOKEN_BEDROCK for {self.model_name}") else: # Azure OpenAI / Azure AI Foundry / Gemma (OpenAI-compatible) api_key_var = model_config["api_key_env_var"] endpoint_var = model_config["endpoint_env_var"] self._api_key = os.environ.get(api_key_var, "") self._endpoint = os.environ.get(endpoint_var, "") self._api_version = model_config.get("api_version", "2025-03-01-preview") if not self._api_key: raise EnvironmentError(f"Missing env var {api_key_var} for model {self.model_name}") if not self._endpoint: raise EnvironmentError(f"Missing env var {endpoint_var} for model {self.model_name}") if self.provider == "azure_openai": self._client = _get_azure_openai_client( self._api_key, self._endpoint, self._api_version ) elif self.provider == "azure_ai": self._client = _get_openai_client(self._endpoint, self._api_key) else: raise ValueError(f"Unsupported provider: {self.provider}") def chat(self, messages: list[dict]) -> str: """ Send messages to the model and return the response text. Tracks tokens and latency for cost accounting. """ log_prefix = f"[{self.model_name}]" if self.provider == "bedrock": return self._chat_bedrock(messages, log_prefix) def _call(): kwargs = { "model": self.deployment_name, "messages": messages, "timeout": 180, } if self.provider == "azure_openai": kwargs["max_completion_tokens"] = 8192 else: kwargs["temperature"] = 0.0 kwargs["max_tokens"] = 4096 start = time.time() response = self._client.chat.completions.create(**kwargs) latency = (time.time() - start) * 1000 self.total_calls += 1 self.total_latency_ms += latency if response.usage: self.total_input_tokens += response.usage.prompt_tokens or 0 self.total_output_tokens += response.usage.completion_tokens or 0 return response.choices[0].message.content try: return call_with_retry(_call, self.retry_config, log_prefix) except Exception as e: self.total_errors += 1 raise def _chat_bedrock(self, messages: list[dict], log_prefix: str) -> str: """Bedrock Converse API via direct HTTP with ABSK bearer token.""" import requests def _call(): # Bedrock expects system messages in a separate 'system' field system_parts = [] user_messages = [] for m in messages: if m["role"] == "system": system_parts.append({"text": m["content"]}) else: user_messages.append({"role": m["role"], "content": [{"text": m["content"]}]}) body = { "messages": user_messages, "inferenceConfig": {"temperature": 0.0, "maxTokens": 4096}, } if system_parts: body["system"] = system_parts start = time.time() resp = requests.post( self._bedrock_url, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {self._bedrock_key}", }, json=body, timeout=300, ) resp.raise_for_status() latency = (time.time() - start) * 1000 self.total_calls += 1 self.total_latency_ms += latency data = resp.json() usage = data.get("usage", {}) self.total_input_tokens += usage.get("inputTokens", 0) self.total_output_tokens += usage.get("outputTokens", 0) content = data["output"]["message"]["content"] for block in content: if "text" in block: return block["text"] return str(content) try: return call_with_retry(_call, self.retry_config, log_prefix) except Exception as e: self.total_errors += 1 raise def execute_task(self, prompt: str, system_prompt: Optional[str] = None) -> str: """Execute a task with an optional system prompt.""" messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) return self.chat(messages) def usage_summary(self) -> dict: """Return usage stats for cost accounting.""" return { "model": self.model_name, "total_calls": self.total_calls, "total_input_tokens": self.total_input_tokens, "total_output_tokens": self.total_output_tokens, "total_errors": self.total_errors, "avg_latency_ms": ( self.total_latency_ms / self.total_calls if self.total_calls > 0 else 0 ), } def __repr__(self): return f"LLMAgent({self.model_name}, provider={self.provider})" # --------------------------------------------------------------------------- # Factory # --------------------------------------------------------------------------- def create_llm_agent(model_config: dict) -> LLMAgent: """Create an LLM agent from a model config dict.""" return LLMAgent(model_config) def create_llm_agents(model_configs: list[dict]) -> dict[str, LLMAgent]: """Create all LLM agents from a list of configs. Returns {model_name: agent}.""" agents = {} for config in model_configs: try: agent = create_llm_agent(config) agents[agent.model_name] = agent logger.info(f"Created LLM agent: {agent.model_name} ({agent.provider})") except EnvironmentError as e: logger.warning(f"Skipping {config['model_name']}: {e}") return agents