import os import json import logging import time from typing import List, Dict import truststore truststore.inject_into_ssl() # Set environment variables to help with corporate proxies os.environ['SSL_CERT_FILE'] = "" # Force use of system store os.environ['REQUESTS_CA_BUNDLE'] = "" os.environ['CURL_CA_BUNDLE'] = "" from smolagents import InferenceClientModel, tool, CodeAgent from .prompts import COORDINATOR_DIRECTION, SUBAGENT_DIRECTION from tavily import TavilyClient logger = logging.getLogger(__name__) class Coordinator: def __init__( self, model_name: str = "Qwen/Qwen2.5-7B-Instruct", subagent_model_id: str = "Qwen/Qwen2.5-7B-Instruct", hf_key: str = None ): self.hf_key = hf_key or os.getenv("HF_KEY") or os.getenv("HF_TOKEN") self.coordinator_model = InferenceClientModel( model_id=model_name, api_key=self.hf_key, ) self.subagent_model = InferenceClientModel( model_id=subagent_model_id, api_key=self.hf_key, ) self.tavily_key = os.getenv("TAVILY_API_KEY") if not self.tavily_key: raise ValueError("TAVILY_API_KEY environment variable is missing.") self.tavily_client = TavilyClient(api_key=self.tavily_key) def coordinate(self, user_query: str, research_plan: str, subtasks: List[Dict]) -> str: logger.info("Initializing Coordinator and sub-agents...") @tool def web_search(query: str) -> str: """ Search the web for real-time information using Tavily. """ try: response = self.tavily_client.search(query=query, search_depth="advanced", max_results=5) results = response.get("results", []) formatted_results = [] logger.info(f"Tavily search results: {results}") for res in results: formatted_results.append(f"Title: {res.get('title')}\nURL: {res.get('url')}\nContent: {res.get('content')}\n") return "\n---\n".join(formatted_results) if formatted_results else "No relevant results found." except Exception as e: logger.error(f"Tavily search error: {e}") return f"Search failed: {e}" findings = [] for task in subtasks: subtask_id = task.get('id') title = task.get('title') description = task.get('description') logger.info(f"Starting sub-agent for task: {subtask_id}") subagent = CodeAgent( tools=[web_search], model=self.subagent_model, add_base_tools=False, max_steps=2, ) subagent_prompt = SUBAGENT_DIRECTION.format( user_query=user_query, research_plan=research_plan, subtask_id=subtask_id, subtask_title=title, subtask_description=description, ) try: finding = subagent.run(subagent_prompt) findings.append(f"FINDINGS FOR TASK {subtask_id}: {title}\n\n{finding}") # Save individual sub-agent finding os.makedirs("research_outputs", exist_ok=True) with open(f"research_outputs/subtask_{subtask_id}.txt", "w", encoding="utf-8") as f: f.write(finding) logger.info(f"Sub-agent for {subtask_id} complete. Finding saved to research_outputs/subtask_{subtask_id}.txt") except Exception as e: logger.error(f"Error in sub-agent {subtask_id}: {e}") findings.append(f"FINDINGS FOR TASK {subtask_id}: {title}\n\nERROR: Failed to complete task. {e}") # Final Synthesis using the model directly logger.info("Synthesizing final report...") system_prompt = COORDINATOR_DIRECTION.format( user_query=user_query, research_plan=research_plan, subtasks_json=json.dumps(subtasks, indent=2) ) synthesis_input = "\n\n".join(findings) user_prompt = f"Here are the findings from the specialized sub-agents. Please synthesize them into a cohesive final research report as per the original project guidelines.\n\nSUB-AGENT FINDINGS:\n{synthesis_input}" try: response = self.coordinator_model(messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ]) final_report = response.content # Clean up potential tags if from a reasoning model if "" in final_report and "" in final_report: final_report = final_report.split("")[-1].strip() elif "" in final_report: final_report = final_report.split("")[-1].strip() if "\n\n" in final_report: final_report = final_report.split("\n\n", 1)[-1] # Save final report with open("research_outputs/final_report.md", "w", encoding="utf-8") as f: f.write(final_report) logger.info("Final report saved to research_outputs/final_report.md") return final_report except Exception as e: logger.error(f"Error during final synthesis: {e}") return f"# Research Output\n\nFailed to synthesize final report. Error: {e}\n\n## Raw Findings\n\n{synthesis_input}" if __name__ == "__main__": from dotenv import load_dotenv load_dotenv()