File size: 2,806 Bytes
035d186
 
 
 
 
 
 
9de209d
035d186
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c68afb6
035d186
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from __future__ import annotations

import asyncio
import sys
from pathlib import Path
from typing import Any


from agent.config import Config, load_config
from agent.core.agent_loop import Handlers
from agent.core.session import Session
from agent.core.tools import ToolRouter

PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))


def _resolve_project_path(path: str | Path) -> Path:
    candidate = Path(path)
    if candidate.is_absolute():
        return candidate
    return (PROJECT_ROOT / candidate).resolve()


class AgentResponseGenerator:
    """
    Thin async wrapper that executes the existing agent loop once and
    returns the assistant's final message.
    """

    def __init__(self, config_path: str | Path, max_iterations: int = 300) -> None:
        self.config_path = _resolve_project_path(config_path)
        self.config: Config = load_config(str(self.config_path))
        self.max_iterations = max_iterations

    @property
    def model_name(self) -> str:
        """Expose the agent model name for downstream logging."""
        return self.config.model_name

    async def run(self, prompt: str) -> str:
        """
        Execute the agent loop for a single prompt and return the assistant reply.
        """
        tool_router = ToolRouter(self.config.mcpServers)

        async with tool_router:
            session = Session(asyncio.Queue(), config=self.config)
            session.tool_router = tool_router
            await Handlers.run_agent(
                session,
                prompt,
                max_iterations=self.max_iterations,
            )
            return self._latest_assistant_response(session)

    def _latest_assistant_response(self, session: Session) -> str:
        """
        Extract the final assistant response from the session history.
        """
        for message in reversed(session.context_manager.items):
            if getattr(message, "role", None) == "assistant":
                return _content_to_text(getattr(message, "content", ""))

        raise RuntimeError("Agent did not produce an assistant message.")


def _content_to_text(content: Any) -> str:
    """
    Convert LiteLLM content payloads (str or list[dict]) into plain text.
    """
    if isinstance(content, str):
        return content

    if isinstance(content, list):
        parts: list[str] = []
        for block in content:
            if isinstance(block, dict):
                text = block.get("text")
                if text:
                    parts.append(str(text))
            else:
                text = getattr(block, "text", None)
                if text:
                    parts.append(str(text))
        return "\n".join(parts)

    return str(content)