# MindMap/src/utils/api_handler.py
# Repository page metadata: initial commit 240e5bc by daemon03 (15.5 kB)
"""
API Handler for Gemini, Tavily, and Knowledge Graph APIs.

Implements optimized sequential async calls for mindmap generation.

This module orchestrates API calls in the following sequence:
1. Tavily API: Gather broad web context and related terms
2. Knowledge Graph API: Get structured entity data for the keyword
3. Gemini API: Synthesize a comprehensive mindmap structure from both results
"""
import asyncio
import json
import re
from typing import Dict, List, Any, Optional
from tavily import TavilyClient
import google.generativeai as genai
from google.cloud import enterpriseknowledgegraph as ekg
import logging
import requests
# Configure logging: request INFO-level output as soon as this module is
# imported, and create the module-level logger that APIHandler writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class APIHandler:
    """
    Handles all API interactions with optimized sequential processing.

    This class manages communication with three external APIs:
    - Tavily: Real-time web search and content extraction
    - Knowledge Graph: Entity lookup through the Knowledge Graph Search
      REST endpoint
    - Gemini: AI-powered synthesis and structure generation
    """

    # Runs of capitalized words ("Neural Networks"), used as a cheap
    # heuristic for candidate key terms. Compiled once for all calls.
    _TERM_PATTERN = re.compile(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b')

    _KG_SEARCH_URL = "https://kgsearch.googleapis.com/v1/entities:search"

    def __init__(self, gemini_key: str, tavily_key: str, kg_api_key: str):
        """
        Initialize API clients with provided credentials.

        A client that fails to initialize is stored as ``None``; the
        corresponding fetch method then returns its fallback value instead
        of raising.

        Args:
            gemini_key: Gemini API key
            tavily_key: Tavily API key
            kg_api_key: Knowledge Graph Search API key (a falsy value
                disables the Knowledge Graph step)
        """
        self.gemini_key = gemini_key
        self.tavily_key = tavily_key
        self.kg_api_key = kg_api_key

        # Initialize Tavily client
        try:
            self.tavily_client = TavilyClient(api_key=tavily_key)
            logger.info("✅ Tavily client initialized")
        except Exception as e:
            logger.error(f"❌ Tavily initialization failed: {e}")
            self.tavily_client = None

        # Initialize Gemini
        try:
            genai.configure(api_key=gemini_key)
            self.gemini_model = genai.GenerativeModel('gemini-2.0-flash')
            logger.info("✅ Gemini client initialized")
        except Exception as e:
            logger.error(f"❌ Gemini initialization failed: {e}")
            self.gemini_model = None

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no logging)
    # ------------------------------------------------------------------

    @classmethod
    def _summarize_search_results(cls, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Condense raw Tavily result entries into key terms, context and sources.

        Key terms keep first-seen order (a dict is used as an ordered set), so
        the 15 terms that survive the cut are the same on every run.
        """
        ordered_terms: Dict[str, None] = {}
        context_parts: List[str] = []
        sources: List[str] = []

        for result in results or []:
            if not isinstance(result, dict):
                continue
            # `or ''` also covers keys that are present but set to None.
            content = result.get('content') or ''
            url = result.get('url') or ''
            if content:
                context_parts.append(content)
            if url:
                sources.append(url)
            # Extract meaningful terms (simple extraction): capitalized
            # phrases longer than 4 characters, at most 20 per result.
            candidates = cls._TERM_PATTERN.findall(content)
            technical_words = [term for term in candidates if len(term) > 4]
            ordered_terms.update(dict.fromkeys(technical_words[:20]))

        return {
            'key_terms': list(ordered_terms)[:15],
            'context': ' '.join(context_parts[:3])[:2000],  # Limit context length
            'sources': sources,
        }

    @staticmethod
    def _redact_secret(text: str, secret: str) -> str:
        """Return *text* with every occurrence of *secret* replaced by ``***``.

        The URL-encoded form is replaced first, because HTTP error messages
        embed the request URL with the encoded query string.
        """
        if not secret:
            return text
        from urllib.parse import quote_plus

        for variant in (quote_plus(secret), secret):
            text = text.replace(variant, '***')
        return text

    @staticmethod
    def _parse_kg_entities(data: Dict[str, Any]) -> List[Dict[str, str]]:
        """Turn a Knowledge Graph search payload into ``name``/``description`` dicts."""
        entities = []
        for item in (data.get('itemListElement') or [])[:5]:
            result = item.get('result') or {}
            entities.append({
                'name': result.get('name') or '',
                'description': result.get('description') or '',
            })
        return entities

    @staticmethod
    def _extract_json_text(response_text: str) -> str:
        """Strip markdown fences or surrounding prose from a model reply.

        Fenced blocks are handled first; if the remaining text still does not
        start with ``{``, the span from the first ``{`` to the last ``}`` is
        used so replies such as ``Here is the JSON: {...}`` still parse.
        """
        text = response_text.strip()
        if '```json' in text:
            text = text.split('```json')[1].split('```')[0].strip()
        elif '```' in text:
            text = text.split('```')[1].split('```')[0].strip()
        if not text.startswith('{'):
            start, end = text.find('{'), text.rfind('}')
            if start != -1 and end > start:
                text = text[start:end + 1]
        return text

    @staticmethod
    def _normalize_mindmap(data: Any, keyword: str) -> Optional[Dict[str, Any]]:
        """Fill in missing top-level keys of a parsed mindmap.

        Returns ``None`` when *data* is not a JSON object or when ``nodes`` /
        ``edges`` are present but not lists, so the caller can fall back.
        """
        if not isinstance(data, dict):
            return None
        data.setdefault('center', keyword)
        data.setdefault('nodes', [])
        data.setdefault('edges', [])
        if not isinstance(data['nodes'], list) or not isinstance(data['edges'], list):
            return None
        return data

    @staticmethod
    def _build_prompt(
        keyword: str,
        context_snippet: str,
        key_terms_str: str,
        entities_str: str
    ) -> str:
        """Assemble the Gemini prompt from the gathered context."""
        # NOTE: the prompt text is kept flush-left on purpose; indentation
        # inside the triple-quoted string would become part of the prompt.
        return f"""You are a technical knowledge expert creating mindmap structures.
Generate a comprehensive radial mindmap structure for: "{keyword}"
Web Context:
{context_snippet}
Related Terms Discovered:
{key_terms_str}
Knowledge Graph Entities:
{entities_str}
Create a JSON mindmap with:
1. Center node: "{keyword}"
2. Primary nodes (5-7): Major categories/aspects of this technical topic
3. Secondary nodes (2-3 per primary): Specific concepts, tools, or subtopics
Requirements:
- Each node must have: id, label, level (1=primary, 2=secondary), description
- Each edge must have: from (node id), to (node id), label (relationship type)
- Use descriptive labels and meaningful relationships
- Keep descriptions concise (under 100 chars)
Output ONLY valid JSON in this exact format:
{{
"center": "{keyword}",
"nodes": [
{{"id": "node1", "label": "Category Name", "level": 1, "description": "Brief explanation"}},
{{"id": "node2", "label": "Subconcept", "level": 2, "description": "Specific detail"}}
],
"edges": [
{{"from": "center", "to": "node1", "label": "includes"}},
{{"from": "node1", "to": "node2", "label": "contains"}}
]
}}
Generate the JSON now:"""

    @staticmethod
    def _error_result(keyword: str, error: str) -> Dict[str, Any]:
        """Build the minimal single-node result returned on a critical failure."""
        return {
            'mindmap': {
                'center': keyword,
                'nodes': [{
                    'id': 'fallback_1',
                    'label': 'Error generating mindmap',
                    'level': 1,
                    'description': 'Please check API configuration'
                }],
                'edges': [{
                    'from': 'center',
                    'to': 'fallback_1',
                    'label': 'error'
                }]
            },
            'metadata': {
                'keyword': keyword,
                'tavily_sources': [],
                'kg_entities_count': 0,
                'total_nodes': 1,
                'total_edges': 1,
                'error': error
            }
        }

    # ------------------------------------------------------------------
    # API steps
    # ------------------------------------------------------------------

    async def fetch_tavily_data(self, keyword: str) -> Dict[str, Any]:
        """
        Step 1: Fetch related terms and context from Tavily API.

        This method performs a web search to gather:
        - Related technical terms
        - Contextual information
        - Source URLs for reference

        Args:
            keyword: Technical keyword to search

        Returns:
            Dictionary containing:
            - key_terms: List of related terms (max 15, first-seen order)
            - context: Aggregated context from top results (max 2000 chars)
            - sources: List of source URLs
            If the client is unavailable all three are empty; if the search
            raises, a placeholder built from the keyword is returned.
        """
        logger.info(f"🔍 Step 1: Fetching Tavily data for '{keyword}'")
        if not self.tavily_client:
            logger.warning("Tavily client not available, using fallback")
            return {'key_terms': [], 'context': '', 'sources': []}

        try:
            # Perform advanced search. NOTE: this client call is not awaited,
            # so it runs synchronously inside this coroutine.
            response = self.tavily_client.search(
                query=f"{keyword} technical overview concepts",
                search_depth="advanced",
                max_results=10,
                include_domains=[],
                exclude_domains=[]
            )
            summary = self._summarize_search_results(response.get('results', []))
            logger.info(
                f"✅ Tavily: Found {len(summary['key_terms'])} key terms "
                f"from {len(summary['sources'])} sources"
            )
            return summary
        except Exception as e:
            logger.error(f"❌ Tavily API error: {e}")
            return {
                'key_terms': [keyword],
                'context': f"Technical information about {keyword}",
                'sources': []
            }

    async def fetch_knowledge_graph_data(self, keyword: str) -> Dict[str, Any]:
        """
        Step 2: Look up entities for the keyword in the Knowledge Graph.

        Args:
            keyword: Technical keyword to search

        Returns:
            ``{'entities': [...], 'relationships': []}`` where each entity has
            ``name`` and ``description`` strings (at most 5 entities), or an
            empty dict when no API key is configured or the request fails.
        """
        logger.info(f"🔍 Step 2: Fetching Knowledge Graph data for '{keyword}'")
        if not self.kg_api_key:
            logger.warning("⚠️ Knowledge Graph skipped (no API key)")
            return {}

        params = {
            'query': keyword,
            'limit': 5,
            'key': self.kg_api_key,
        }
        try:
            # NOTE: requests.get is a blocking call inside this coroutine.
            response = requests.get(self._KG_SEARCH_URL, params=params, timeout=10)
            response.raise_for_status()
            entities = self._parse_kg_entities(response.json())
            logger.info(f"✅ Knowledge Graph: Found {len(entities)} entities")
            return {'entities': entities, 'relationships': []}
        except Exception as e:
            # HTTP errors quote the full request URL, which contains the API
            # key as a query parameter; never write it to the log.
            safe_message = self._redact_secret(str(e), self.kg_api_key)
            logger.error(f" ✗ Error querying Knowledge Graph: {safe_message}")
            return {}

    async def generate_gemini_mindmap(
        self,
        keyword: str,
        tavily_data: Dict[str, Any],
        kg_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Step 3: Use Gemini to synthesize comprehensive mindmap structure.

        Combines data from Tavily and Knowledge Graph to create a
        well-structured, hierarchical mindmap. Any failure (model missing,
        API error, unparseable or malformed reply) yields the fallback
        mindmap built from the Tavily and Knowledge Graph data instead.

        Args:
            keyword: Main technical keyword
            tavily_data: Data from Tavily API (context, terms, sources)
            kg_data: Data from Knowledge Graph API (entities, relationships)

        Returns:
            Dictionary containing complete mindmap structure:
            - center: Central node (keyword)
            - nodes: List of node dictionaries
            - edges: List of edge dictionaries
        """
        logger.info(f"🔍 Step 3: Generating Gemini mindmap for '{keyword}'")
        if not self.gemini_model:
            logger.warning("Gemini model not available, using fallback")
            return self._create_fallback_mindmap(keyword, tavily_data, kg_data)

        try:
            # Prepare context for Gemini
            key_terms_str = ', '.join(tavily_data.get('key_terms', [])[:10])
            entities_info = [
                f"- {entity.get('name', '')}: {entity.get('description', '')}"
                for entity in kg_data.get('entities', [])[:5]
            ]
            entities_str = '\n'.join(entities_info) if entities_info else "No entities found"
            context_snippet = tavily_data.get('context', '')[:1000]
            prompt = self._build_prompt(
                keyword, context_snippet, key_terms_str, entities_str
            )

            # Call Gemini API (synchronous call inside this coroutine)
            response = self.gemini_model.generate_content(prompt)

            # Clean response and parse JSON
            parsed = json.loads(self._extract_json_text(response.text))

            # Validate structure
            mindmap_data = self._normalize_mindmap(parsed, keyword)
            if mindmap_data is None:
                logger.error("❌ Gemini returned JSON that is not a mindmap object")
                return self._create_fallback_mindmap(keyword, tavily_data, kg_data)

            logger.info(f"✅ Gemini: Generated mindmap with {len(mindmap_data['nodes'])} nodes")
            return mindmap_data
        except json.JSONDecodeError as e:
            logger.error(f"❌ Gemini JSON parse error: {e}")
            return self._create_fallback_mindmap(keyword, tavily_data, kg_data)
        except Exception as e:
            logger.error(f"❌ Gemini API error: {e}")
            return self._create_fallback_mindmap(keyword, tavily_data, kg_data)

    def _create_fallback_mindmap(
        self,
        keyword: str,
        tavily_data: Dict[str, Any],
        kg_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Create a basic fallback mindmap when Gemini fails.

        Up to 6 Tavily key terms become primary nodes attached to the center;
        up to 4 Knowledge Graph entities become secondary nodes attached to
        the first primary node, or to the center when there is no primary
        node, so no node is left without an edge.

        Args:
            keyword: Main keyword
            tavily_data: Tavily results
            kg_data: Knowledge Graph results

        Returns:
            Basic mindmap structure
        """
        logger.info("Creating fallback mindmap structure")
        nodes = []
        edges = []

        # Add primary nodes from key terms
        key_terms = tavily_data.get('key_terms', [])[:6]
        for i, term in enumerate(key_terms):
            node_id = f"primary_{i}"
            nodes.append({
                'id': node_id,
                'label': term,
                'level': 1,
                'description': f"Related concept to {keyword}"
            })
            edges.append({
                'from': 'center',
                'to': node_id,
                'label': 'related_to'
            })

        # Add secondary nodes from entities
        parent_id = 'primary_0' if key_terms else 'center'
        for i, entity in enumerate(kg_data.get('entities', [])[:4]):
            node_id = f"secondary_{i}"
            nodes.append({
                'id': node_id,
                'label': entity.get('name') or '',
                'level': 2,
                'description': (entity.get('description') or '')[:100]
            })
            edges.append({
                'from': parent_id,
                'to': node_id,
                'label': 'includes'
            })

        return {
            'center': keyword,
            'nodes': nodes,
            'edges': edges
        }

    async def fetch_all_data(self, keyword: str) -> Dict[str, Any]:
        """
        Orchestrate all API calls in optimized sequence.

        This is the main entry point that executes the 3-step process:
        1. Tavily → Get web context and related terms
        2. Knowledge Graph → Get structured entities for the keyword
        3. Gemini → Synthesize comprehensive mindmap (using both results)

        Args:
            keyword: Technical keyword to analyze

        Returns:
            Complete result dictionary with:
            - mindmap: Full mindmap structure
            - metadata: Additional information (sources, counts, etc.);
              contains an extra ``error`` entry when an unexpected exception
              forced the minimal error mindmap.
        """
        logger.info(f"\n{'='*60}")
        logger.info(f"Starting mindmap generation for: '{keyword}'")
        logger.info(f"{'='*60}")
        try:
            # Step 1: Fetch Tavily data (context + terms)
            tavily_data = await self.fetch_tavily_data(keyword)
            # Step 2: Fetch Knowledge Graph data
            kg_data = await self.fetch_knowledge_graph_data(keyword)
            # Step 3: Generate mindmap with Gemini (using both results)
            mindmap_data = await self.generate_gemini_mindmap(
                keyword,
                tavily_data,
                kg_data
            )

            # Compile metadata
            metadata = {
                'keyword': keyword,
                'tavily_sources': tavily_data.get('sources', []),
                'kg_entities_count': len(kg_data.get('entities', [])),
                'total_nodes': len(mindmap_data.get('nodes', [])),
                'total_edges': len(mindmap_data.get('edges', []))
            }
            logger.info(f"{'='*60}")
            logger.info("✅ Mindmap generation complete!")
            logger.info(f" - Nodes: {metadata['total_nodes']}")
            logger.info(f" - Edges: {metadata['total_edges']}")
            logger.info(f" - Sources: {len(metadata['tavily_sources'])}")
            logger.info(f"{'='*60}\n")
            return {
                'mindmap': mindmap_data,
                'metadata': metadata
            }
        except Exception as e:
            logger.error(f"❌ Critical error in fetch_all_data: {e}")
            # Return minimal fallback
            return self._error_result(keyword, str(e))
# Synchronous wrapper for Streamlit compatibility
def fetch_mindmap_data(
    keyword: str,
    gemini_key: str,
    tavily_key: str,
    kg_api_key: str
) -> Dict[str, Any]:
    """
    Synchronous wrapper for async API calls (Streamlit-compatible).

    Builds an APIHandler from the given credentials and runs its
    ``fetch_all_data`` coroutine to completion.

    Args:
        keyword: Technical keyword to analyze
        gemini_key: Gemini API key
        tavily_key: Tavily API key
        kg_api_key: Knowledge Graph Search API key

    Returns:
        The dictionary produced by ``APIHandler.fetch_all_data``
        (``mindmap`` and ``metadata`` entries).
    """
    handler = APIHandler(
        gemini_key=gemini_key,
        tavily_key=tavily_key,
        kg_api_key=kg_api_key
    )

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread: run the coroutine directly.
        return asyncio.run(handler.fetch_all_data(keyword))

    # asyncio.run() raises RuntimeError when the current thread already runs
    # an event loop (e.g. notebooks). Execute the coroutine on a fresh loop in
    # a worker thread and block here until it finishes.
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(
            lambda: asyncio.run(handler.fetch_all_data(keyword))
        )
        return future.result()
if __name__ == "__main__":
    # Running the module directly only prints a usage hint; no API is called.
    print(
        "API Handler Module - Ready for import",
        "Use fetch_mindmap_data() to generate mindmaps",
        sep="\n",
    )