""" API Handler for Gemini, Tavily, and Knowledge Graph APIs Implements optimized sequential async calls for mindmap generation This module orchestrates API calls in the following sequence: 1. Tavily API: Gather broad web context and related terms 2. Knowledge Graph API: Get structured entity data using Tavily results 3. Gemini API: Synthesize comprehensive mindmap structure """ import asyncio import json import re from typing import Dict, List, Any, Optional from tavily import TavilyClient import google.generativeai as genai from google.cloud import enterpriseknowledgegraph as ekg import logging import requests # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class APIHandler: """ Handles all API interactions with optimized sequential processing This class manages communication with three external APIs: - Tavily: Real-time web search and content extraction - Knowledge Graph: Structured entity and relationship data - Gemini: AI-powered synthesis and structure generation """ def __init__(self, gemini_key: str, tavily_key: str, kg_api_key: str): """ Initialize API clients with provided credentials Args: gemini_key: Gemini API key tavily_key: Tavily API key google_project_id: Google Cloud project ID """ self.gemini_key = gemini_key self.tavily_key = tavily_key self.kg_api_key = kg_api_key # Initialize Tavily client try: self.tavily_client = TavilyClient(api_key=tavily_key) logger.info("✅ Tavily client initialized") except Exception as e: logger.error(f"❌ Tavily initialization failed: {e}") self.tavily_client = None # Initialize Gemini try: genai.configure(api_key=gemini_key) self.gemini_model = genai.GenerativeModel('gemini-2.0-flash') logger.info("✅ Gemini client initialized") except Exception as e: logger.error(f"❌ Gemini initialization failed: {e}") self.gemini_model = None async def fetch_tavily_data(self, keyword: str) -> Dict[str, Any]: """ Step 1: Fetch related terms and context from Tavily API This method performs a web search to gather: - Related technical terms - Contextual information - Source URLs for reference Args: keyword: Technical keyword to search Returns: Dictionary containing: - key_terms: List of related terms (max 15) - context: Aggregated context from top results - sources: List of source URLs """ logger.info(f"🔍 Step 1: Fetching Tavily data for '{keyword}'") if not self.tavily_client: logger.warning("Tavily client not available, using fallback") return {'key_terms': [], 'context': '', 'sources': []} try: # Perform advanced search response = self.tavily_client.search( query=f"{keyword} technical overview concepts", search_depth="advanced", max_results=10, include_domains=[], exclude_domains=[] ) # Extract key terms and context key_terms = set() context_parts = [] sources = [] for result in response.get('results', []): content = result.get('content', '') url = result.get('url', '') # Store context and source if content: context_parts.append(content) if url: sources.append(url) # Extract meaningful terms (simple extraction) words = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', content) technical_words = [w for w in words if len(w) > 4] key_terms.update(technical_words[:20]) # Limit and format results key_terms_list = list(key_terms)[:15] context = ' '.join(context_parts[:3])[:2000] # Limit context length logger.info(f"✅ Tavily: Found {len(key_terms_list)} key terms from {len(sources)} sources") return { 'key_terms': key_terms_list, 'context': context, 'sources': sources } except Exception as e: logger.error(f"❌ Tavily API error: {e}") return { 'key_terms': [keyword], 'context': f"Technical information about {keyword}", 'sources': [] } async def fetch_knowledge_graph_data(self, keyword: str) -> Dict[str, Any]: if not self.kg_api_key: print("⚠️ Knowledge Graph skipped (no API key)") return {} url = "https://kgsearch.googleapis.com/v1/entities:search" params = { 'query': keyword, 'limit': 5, 'key': self.kg_api_key, } try: response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() entities = [] for item in data.get('itemListElement', [])[:5]: result = item.get('result', {}) entities.append({ 'name': result.get('name', ''), 'description': result.get('description', '') }) return {'entities': entities, 'relationships': []} except Exception as e: print(f" ✗ Error querying Knowledge Graph: {e}") return {} async def generate_gemini_mindmap( self, keyword: str, tavily_data: Dict[str, Any], kg_data: Dict[str, Any] ) -> Dict[str, Any]: """ Step 3: Use Gemini to synthesize comprehensive mindmap structure Combines data from Tavily and Knowledge Graph to create a well-structured, hierarchical mindmap. Args: keyword: Main technical keyword tavily_data: Data from Tavily API (context, terms, sources) kg_data: Data from Knowledge Graph API (entities, relationships) Returns: Dictionary containing complete mindmap structure: - center: Central node (keyword) - nodes: List of node dictionaries - edges: List of edge dictionaries """ logger.info(f"🔍 Step 3: Generating Gemini mindmap for '{keyword}'") if not self.gemini_model: logger.warning("Gemini model not available, using fallback") return self._create_fallback_mindmap(keyword, tavily_data, kg_data) try: # Prepare context for Gemini key_terms_str = ', '.join(tavily_data.get('key_terms', [])[:10]) entities_info = [] for entity in kg_data.get('entities', [])[:5]: entities_info.append( f"- {entity['name']}: {entity['description']}" ) entities_str = '\n'.join(entities_info) if entities_info else "No entities found" context_snippet = tavily_data.get('context', '')[:1000] # Construct enriched prompt prompt = f"""You are a technical knowledge expert creating mindmap structures. Generate a comprehensive radial mindmap structure for: "{keyword}" Web Context: {context_snippet} Related Terms Discovered: {key_terms_str} Knowledge Graph Entities: {entities_str} Create a JSON mindmap with: 1. Center node: "{keyword}" 2. Primary nodes (5-7): Major categories/aspects of this technical topic 3. Secondary nodes (2-3 per primary): Specific concepts, tools, or subtopics Requirements: - Each node must have: id, label, level (1=primary, 2=secondary), description - Each edge must have: from (node id), to (node id), label (relationship type) - Use descriptive labels and meaningful relationships - Keep descriptions concise (under 100 chars) Output ONLY valid JSON in this exact format: {{ "center": "{keyword}", "nodes": [ {{"id": "node1", "label": "Category Name", "level": 1, "description": "Brief explanation"}}, {{"id": "node2", "label": "Subconcept", "level": 2, "description": "Specific detail"}} ], "edges": [ {{"from": "center", "to": "node1", "label": "includes"}}, {{"from": "node1", "to": "node2", "label": "contains"}} ] }} Generate the JSON now:""" # Call Gemini API response = self.gemini_model.generate_content(prompt) response_text = response.text.strip() # Clean response (remove markdown code blocks if present) if '```json' in response_text: response_text = response_text.split('```json')[1].split('```')[0].strip() elif '```' in response_text: response_text = response_text.split('```')[1].split('```')[0].strip() # Parse JSON mindmap_data = json.loads(response_text) # Validate structure if 'center' not in mindmap_data: mindmap_data['center'] = keyword if 'nodes' not in mindmap_data: mindmap_data['nodes'] = [] if 'edges' not in mindmap_data: mindmap_data['edges'] = [] logger.info(f"✅ Gemini: Generated mindmap with {len(mindmap_data['nodes'])} nodes") return mindmap_data except json.JSONDecodeError as e: logger.error(f"❌ Gemini JSON parse error: {e}") return self._create_fallback_mindmap(keyword, tavily_data, kg_data) except Exception as e: logger.error(f"❌ Gemini API error: {e}") return self._create_fallback_mindmap(keyword, tavily_data, kg_data) def _create_fallback_mindmap( self, keyword: str, tavily_data: Dict[str, Any], kg_data: Dict[str, Any] ) -> Dict[str, Any]: """ Create a basic fallback mindmap when Gemini fails Args: keyword: Main keyword tavily_data: Tavily results kg_data: Knowledge Graph results Returns: Basic mindmap structure """ logger.info("Creating fallback mindmap structure") nodes = [] edges = [] # Add primary nodes from key terms key_terms = tavily_data.get('key_terms', [])[:6] for i, term in enumerate(key_terms): node_id = f"primary_{i}" nodes.append({ 'id': node_id, 'label': term, 'level': 1, 'description': f"Related concept to {keyword}" }) edges.append({ 'from': 'center', 'to': node_id, 'label': 'related_to' }) # Add secondary nodes from entities entities = kg_data.get('entities', [])[:4] for i, entity in enumerate(entities): node_id = f"secondary_{i}" nodes.append({ 'id': node_id, 'label': entity['name'], 'level': 2, 'description': entity['description'][:100] }) # Connect to first primary node if available if key_terms: edges.append({ 'from': 'primary_0', 'to': node_id, 'label': 'includes' }) return { 'center': keyword, 'nodes': nodes, 'edges': edges } async def fetch_all_data(self, keyword: str) -> Dict[str, Any]: """ Orchestrate all API calls in optimized sequence This is the main entry point that executes the 3-step process: 1. Tavily → Get web context and related terms 2. Knowledge Graph → Get structured entities (using Tavily results) 3. Gemini → Synthesize comprehensive mindmap (using both) Args: keyword: Technical keyword to analyze Returns: Complete result dictionary with: - mindmap: Full mindmap structure - metadata: Additional information (sources, counts, etc.) """ logger.info(f"\n{'='*60}") logger.info(f"Starting mindmap generation for: '{keyword}'") logger.info(f"{'='*60}") try: # Step 1: Fetch Tavily data (context + terms) tavily_data = await self.fetch_tavily_data(keyword) # Step 2: Fetch Knowledge Graph data (using Tavily results) kg_data = await self.fetch_knowledge_graph_data(keyword) # Step 3: Generate mindmap with Gemini (using both results) mindmap_data = await self.generate_gemini_mindmap( keyword, tavily_data, kg_data ) # Compile metadata metadata = { 'keyword': keyword, 'tavily_sources': tavily_data.get('sources', []), 'kg_entities_count': len(kg_data.get('entities', [])), 'total_nodes': len(mindmap_data.get('nodes', [])), 'total_edges': len(mindmap_data.get('edges', [])) } logger.info(f"{'='*60}") logger.info(f"✅ Mindmap generation complete!") logger.info(f" - Nodes: {metadata['total_nodes']}") logger.info(f" - Edges: {metadata['total_edges']}") logger.info(f" - Sources: {len(metadata['tavily_sources'])}") logger.info(f"{'='*60}\n") return { 'mindmap': mindmap_data, 'metadata': metadata } except Exception as e: logger.error(f"❌ Critical error in fetch_all_data: {e}") # Return minimal fallback return { 'mindmap': { 'center': keyword, 'nodes': [{ 'id': 'fallback_1', 'label': 'Error generating mindmap', 'level': 1, 'description': 'Please check API configuration' }], 'edges': [{ 'from': 'center', 'to': 'fallback_1', 'label': 'error' }] }, 'metadata': { 'keyword': keyword, 'tavily_sources': [], 'kg_entities_count': 0, 'total_nodes': 1, 'total_edges': 1, 'error': str(e) } } # Synchronous wrapper for Streamlit compatibility def fetch_mindmap_data( keyword: str, gemini_key: str, tavily_key: str, kg_api_key: str ) -> Dict[str, Any]: """ Synchronous wrapper for async API calls (Streamlit-compatible) """ handler = APIHandler( gemini_key=gemini_key, tavily_key=tavily_key, kg_api_key=kg_api_key ) return asyncio.run(handler.fetch_all_data(keyword)) if __name__ == "__main__": # Test the API handler print("API Handler Module - Ready for import") print("Use fetch_mindmap_data() to generate mindmaps")