| |
| from fastapi import FastAPI, HTTPException |
| import uvicorn |
| import os |
| from dotenv import load_dotenv |
| from tavily import TavilyClient |
| import logging |
|
|
| |
| load_dotenv() |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| logger = logging.getLogger("Tavily_MCP_Server") |
|
|
| |
| TAVILY_API_KEY = os.getenv("TAVILY_API_KEY") |
|
|
| |
| if not TAVILY_API_KEY: |
| try: |
| import toml |
| secrets_path = os.path.join(os.path.dirname(__file__), ".streamlit", "secrets.toml") |
| if os.path.exists(secrets_path): |
| secrets = toml.load(secrets_path) |
| TAVILY_API_KEY = secrets.get("TAVILY_API_KEY") |
| logger.info("Loaded TAVILY_API_KEY from .streamlit/secrets.toml") |
| except Exception as e: |
| logger.warning(f"Could not load from secrets.toml: {e}") |
|
|
| if not TAVILY_API_KEY: |
| logger.warning("TAVILY_API_KEY not found in environment. Search features will fail.") |
| else: |
| logger.info(f"TAVILY_API_KEY found: {TAVILY_API_KEY[:4]}...") |
|
|
| |
| app = FastAPI(title="Aegis Tavily MCP Server") |
| tavily = TavilyClient(api_key=TAVILY_API_KEY) |
|
|
| @app.post("/research") |
| async def perform_research(payload: dict): |
| """ |
| Performs a search for each query using the Tavily API. |
| Expects a payload like: |
| { |
| "queries": ["query1", "query2"], |
| "search_depth": "basic" or "advanced" (optional, default basic) |
| } |
| """ |
| queries = payload.get("queries") |
| search_depth = payload.get("search_depth", "basic") |
|
|
| if not queries or not isinstance(queries, list): |
| logger.error("Validation Error: 'queries' must be a non-empty list.") |
| raise HTTPException(status_code=400, detail="'queries' must be a non-empty list.") |
|
|
| logger.info(f"Received research request for {len(queries)} queries. Search depth: {search_depth}") |
| |
| |
| all_results = [] |
| try: |
| |
| for query in queries: |
| logger.info(f"Performing search for query: '{query}'") |
| |
| response = tavily.search( |
| query=query, |
| search_depth=search_depth, |
| max_results=5 |
| ) |
| |
| all_results.append({"query": query, "results": response["results"]}) |
| |
| logger.info(f"Successfully retrieved results for all queries from Tavily API.") |
| return {"status": "success", "data": all_results} |
|
|
| except Exception as e: |
| logger.error(f"Tavily API Error (likely rate limit): {e}. Switching to MOCK DATA fallback.") |
| |
| mock_results = [] |
| import random |
| from datetime import datetime |
| |
| |
| sentiments = ["Bullish", "Bearish", "Neutral", "Volatile", "Cautious"] |
| events = ["Earnings Surprise", "New Product Launch", "Regulatory Update", "Sector Rotation", "Macro Headwinds"] |
| |
| current_time = datetime.now().strftime("%H:%M") |
| |
| for query in queries: |
| |
| s = random.choice(sentiments) |
| e = random.choice(events) |
| |
| mock_results.append({ |
| "query": query, |
| "results": [ |
| { |
| "title": f"[{current_time}] Market Update: {s} Sentiment for {query}", |
| "content": f"Live market data at {current_time} indicates a {s} trend for {query}. Analysts are tracking a potential {e} that could impact short-term price action. Volume remains high as traders adjust positions.", |
| "url": "http://mock-source.com/market-update" |
| }, |
| { |
| "title": f"[{current_time}] Sector Alert: {e} affecting {query}", |
| "content": f"Breaking: A significant {e} is rippling through the sector, heavily influencing {query}. Experts advise monitoring key resistance levels. (Simulated Real-Time Data)", |
| "url": "http://mock-source.com/sector-alert" |
| } |
| ] |
| }) |
| return {"status": "success", "data": mock_results} |
|
|
| @app.get("/") |
| def read_root(): |
| return {"message": "Aegis Tavily MCP Server is operational."} |
|
|
| if __name__ == "__main__": |
| uvicorn.run(app, host="127.0.0.1", port=8001) |