| from abc import ABC, abstractmethod |
| from typing import Dict, Any, List, Optional |
| import json |
|
|
| class BaseAgent(ABC): |
| def __init__(self, name: str, role: str, model: str, system_prompt: Optional[str] = None): |
| self.name = name |
| self.role = role |
| self.model = model |
| self.system_prompt = system_prompt or f"You are {name}, acting as a {role}." |
|
|
| @abstractmethod |
| async def run(self, task_description: str, context: List[Dict[str, Any]], use_tools: bool = False, extra_context: str = "") -> Dict[str, Any]: |
| """ |
| Executes a task given its description and previous context. |
| Returns a dictionary containing the output data. |
| """ |
| pass |
|
|
| def _format_context(self, context: List[Dict[str, Any]]) -> str: |
| """Helper to format previous task outputs for the current agent.""" |
| if not context: |
| return "No previous context available." |
| |
| formatted = "Previous tasks context:\n" |
| for item in context: |
| formatted += f"- Task: {item.get('title')}\n Output: {json.dumps(item.get('output_data', {}))}\n" |
| return formatted |
|
|
| def _build_json_prompt(self, task_description: str, context: List[Dict[str, Any]], extra_context: str = "") -> str: |
| return f""" |
| Task: {task_description} |
| |
| {self._format_context(context)} |
| |
| {extra_context} |
| |
| Please provide your output as a JSON object. |
| """ |
|
|
| def _build_chat_messages(self, task_description: str, context: List[Dict[str, Any]], extra_context: str = "") -> List[Dict[str, Any]]: |
| return [ |
| {"role": "system", "content": self.system_prompt}, |
| {"role": "user", "content": self._build_json_prompt(task_description, context, extra_context)} |
| ] |
|
|
| def _parse_json_output(self, content: str) -> Any: |
| """Parse strict JSON first, then tolerate fenced or prefixed JSON.""" |
| if not content: |
| return {} |
|
|
| try: |
| return json.loads(content) |
| except json.JSONDecodeError: |
| pass |
|
|
| try: |
| if "```json" in content: |
| clean = content.split("```json", 1)[1].split("```", 1)[0].strip() |
| elif "```" in content: |
| clean = content.split("```", 1)[1].split("```", 1)[0].strip() |
| else: |
| object_start, array_start = content.find("{"), content.find("[") |
| starts = [index for index in (object_start, array_start) if index != -1] |
| start = min(starts) if starts else -1 |
| if start == array_start: |
| end = content.rfind("]") |
| else: |
| end = content.rfind("}") |
| clean = content[start:end + 1] if start != -1 and end != -1 else content |
| return json.loads(clean) |
| except Exception: |
| return {"raw_text": content} |
|
|
| def _parse_tool_arguments(self, arguments: str | None) -> Dict[str, Any]: |
| parsed = self._parse_json_output(arguments or "{}") |
| return parsed if isinstance(parsed, dict) else {} |
|
|
| async def _append_tool_results(self, messages: List[Dict[str, Any]], tool_calls: Any, tool_registry: Any) -> None: |
| for tool_call in tool_calls or []: |
| tool_name = tool_call.function.name |
| tool_args = self._parse_tool_arguments(tool_call.function.arguments) |
| tool_result = await tool_registry.call_tool(tool_name, tool_args) |
|
|
| messages.append({ |
| "tool_call_id": tool_call.id, |
| "role": "tool", |
| "name": tool_name, |
| "content": str(tool_result), |
| }) |
|
|
| async def _run_openai_compatible( |
| self, |
| provider: str, |
| create_fn, |
| task_description: str, |
| context: List[Dict[str, Any]], |
| use_tools: bool = False, |
| extra_context: str = "", |
| **extra_kwargs |
| ) -> Dict[str, Any]: |
| """ |
| Unified runner for OpenAI-compatible APIs (OpenAI, Groq, etc.) |
| """ |
| from tools.registry import tool_registry |
| |
| messages = self._build_chat_messages(task_description, context, extra_context) |
| |
| is_reasoning_model = "gpt-oss-" in self.model or self.model.startswith("o1-") or self.model.startswith("o3-") |
| |
| kwargs = { |
| "model": self.model, |
| "messages": messages, |
| **extra_kwargs |
| } |
|
|
| |
| if is_reasoning_model: |
| |
| kwargs["temperature"] = extra_kwargs.get("temperature", 1.0) |
| |
| if "max_completion_tokens" not in kwargs: |
| kwargs["max_completion_tokens"] = getattr(self, "max_tokens", 4096) |
| |
| kwargs.pop("max_tokens", None) |
| else: |
| kwargs["temperature"] = getattr(self, "temperature", 0.7) |
| kwargs["max_tokens"] = getattr(self, "max_tokens", 4096) |
| |
| if use_tools: |
| |
| kwargs["tools"] = tool_registry.get_tool_definitions() |
| kwargs["tool_choice"] = "auto" |
|
|
| response = await create_fn(**kwargs) |
| message = response.choices[0].message |
|
|
| |
| if message.tool_calls: |
| messages.append(message) |
| await self._append_tool_results(messages, message.tool_calls, tool_registry) |
| |
| |
| |
| kwargs.pop("tools", None) |
| kwargs.pop("tool_choice", None) |
| |
| final_response = await create_fn(**kwargs) |
| content = final_response.choices[0].message.content |
| else: |
| content = message.content |
|
|
| return self._result(provider, content or "") |
|
|
| def _result(self, provider: str, content: str) -> Dict[str, Any]: |
| return { |
| "agent_name": self.name, |
| "provider": provider, |
| "model": self.model, |
| "raw_output": content, |
| "data": self._parse_json_output(content) |
| } |
|
|