Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| from __future__ import annotations | |
| import asyncio | |
| import sys | |
| from pathlib import Path | |
| from typing import Any | |
| from agent.config import Config, load_config | |
| from agent.core.agent_loop import Handlers | |
| from agent.core.session import Session | |
| from agent.core.tools import ToolRouter | |
| PROJECT_ROOT = Path(__file__).resolve().parents[1] | |
| if str(PROJECT_ROOT) not in sys.path: | |
| sys.path.insert(0, str(PROJECT_ROOT)) | |
| def _resolve_project_path(path: str | Path) -> Path: | |
| candidate = Path(path) | |
| if candidate.is_absolute(): | |
| return candidate | |
| return (PROJECT_ROOT / candidate).resolve() | |
| class AgentResponseGenerator: | |
| """ | |
| Thin async wrapper that executes the existing agent loop once and | |
| returns the assistant's final message. | |
| """ | |
| def __init__(self, config_path: str | Path, max_iterations: int = 300) -> None: | |
| self.config_path = _resolve_project_path(config_path) | |
| self.config: Config = load_config(str(self.config_path)) | |
| self.max_iterations = max_iterations | |
| def model_name(self) -> str: | |
| """Expose the agent model name for downstream logging.""" | |
| return self.config.model_name | |
| async def run(self, prompt: str) -> str: | |
| """ | |
| Execute the agent loop for a single prompt and return the assistant reply. | |
| """ | |
| tool_router = ToolRouter(self.config.mcpServers) | |
| async with tool_router: | |
| session = Session(asyncio.Queue(), config=self.config) | |
| session.tool_router = tool_router | |
| await Handlers.run_agent( | |
| session, | |
| prompt, | |
| max_iterations=self.max_iterations, | |
| ) | |
| return self._latest_assistant_response(session) | |
| def _latest_assistant_response(self, session: Session) -> str: | |
| """ | |
| Extract the final assistant response from the session history. | |
| """ | |
| for message in reversed(session.context_manager.items): | |
| if getattr(message, "role", None) == "assistant": | |
| return _content_to_text(getattr(message, "content", "")) | |
| raise RuntimeError("Agent did not produce an assistant message.") | |
| def _content_to_text(content: Any) -> str: | |
| """ | |
| Convert LiteLLM content payloads (str or list[dict]) into plain text. | |
| """ | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts: list[str] = [] | |
| for block in content: | |
| if isinstance(block, dict): | |
| text = block.get("text") | |
| if text: | |
| parts.append(str(text)) | |
| else: | |
| text = getattr(block, "text", None) | |
| if text: | |
| parts.append(str(text)) | |
| return "\n".join(parts) | |
| return str(content) | |