""" AI-Powered Search for Telegram Analytics Supports: Ollama (local), Groq (free API), Google Gemini (free API) """ import sqlite3 import json import re from datetime import datetime from typing import List, Dict, Any, Optional import os # Try to import AI libraries try: import requests HAS_REQUESTS = True except ImportError: HAS_REQUESTS = False try: from groq import Groq HAS_GROQ = True except ImportError: HAS_GROQ = False try: import google.generativeai as genai HAS_GEMINI = True except ImportError: HAS_GEMINI = False class AISearchEngine: """AI-powered natural language search for Telegram messages.""" def __init__(self, db_path: str, provider: str = "ollama", api_key: str = None): """ Initialize AI search engine. Args: db_path: Path to SQLite database provider: "ollama", "groq", or "gemini" api_key: API key for Groq or Gemini (not needed for Ollama) """ self.db_path = db_path self.provider = provider self.api_key = api_key or os.getenv(f"{provider.upper()}_API_KEY") # Initialize provider if provider == "groq" and HAS_GROQ: self.client = Groq(api_key=self.api_key) self.model = "llama-3.1-70b-versatile" elif provider == "gemini" and HAS_GEMINI: genai.configure(api_key=self.api_key) # Using 2.5 Flash - free tier, fast, good for SQL self.client = genai.GenerativeModel("gemini-2.5-flash") elif provider == "ollama": self.ollama_url = os.getenv("OLLAMA_URL", "http://localhost:11434") self.model = os.getenv("OLLAMA_MODEL", "llama3.1") else: raise ValueError(f"Provider {provider} not available. Install required packages.") def _get_db_schema(self) -> str: """Dynamically read schema from the actual database to stay in sync.""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Get all tables and their columns cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name") tables = [row[0] for row in cursor.fetchall()] schema_parts = ["Database Schema:"] for table in tables: cursor.execute(f"PRAGMA table_info({table})") cols = cursor.fetchall() col_names = [f"{c[1]} ({c[2]})" for c in cols] schema_parts.append(f" - {table}: {', '.join(col_names)}") # Note virtual tables (FTS5) separately cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND sql LIKE '%fts5%'") fts_tables = [row[0] for row in cursor.fetchall()] if fts_tables: schema_parts.append(f"\n FTS5 tables (use MATCH for search): {', '.join(fts_tables)}") conn.close() schema_parts.append(""" Key notes: - date_unixtime: Unix timestamp (INTEGER), use for date comparisons - date: ISO format string (TEXT), use for display - text_plain: Message text content - text_length: Character count of the message - has_links: 1 if message contains URL, 0 otherwise (note: plural) - has_media: 1 if message has any media attachment - has_photo: 1 if message has a photo specifically - from_id: TEXT user ID (e.g., 'user356173100') - For text search: SELECT * FROM messages WHERE id IN (SELECT rowid FROM messages_fts WHERE messages_fts MATCH 'term') """) return '\n'.join(schema_parts) def _get_sample_data(self) -> str: """Get sample data for context.""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Get user list cursor.execute(""" SELECT from_name, COUNT(*) as cnt FROM messages WHERE from_name IS NOT NULL GROUP BY from_name ORDER BY cnt DESC LIMIT 10 """) users = cursor.fetchall() # Get date range cursor.execute("SELECT MIN(date), MAX(date) FROM messages") date_range = cursor.fetchone() conn.close() return f""" Top users: {', '.join([u[0] for u in users])} Date range: {date_range[0]} to {date_range[1]} """ def _build_prompt(self, user_query: str) -> str: """Build prompt for AI model.""" schema = self._get_db_schema() sample = self._get_sample_data() return f"""You are a SQL query generator for a Telegram chat database. Your task is to convert natural language questions into SQLite queries. {schema} {sample} IMPORTANT RULES: 1. Return ONLY valid SQLite query, no explanations 2. For text search, use: SELECT * FROM messages WHERE id IN (SELECT id FROM messages_fts WHERE messages_fts MATCH 'search_term') 3. For Hebrew text, the FTS5 will handle it correctly 4. Always include relevant columns like date, from_name, text_plain 5. Limit results to 50 unless specified 6. For "who" questions, GROUP BY from_name and COUNT(*) 7. For "when" questions, include date in SELECT 8. For threads/replies, JOIN messages m2 ON m1.reply_to_message_id = m2.id User question: {user_query} SQLite query:""" def _call_ollama(self, prompt: str) -> str: """Call Ollama API.""" if not HAS_REQUESTS: raise ImportError("requests library required for Ollama") response = requests.post( f"{self.ollama_url}/api/generate", json={ "model": self.model, "prompt": prompt, "stream": False, "options": { "temperature": 0.1, "num_predict": 500 } }, timeout=60 ) response.raise_for_status() return response.json()["response"] def _call_groq(self, prompt: str) -> str: """Call Groq API.""" response = self.client.chat.completions.create( model=self.model, messages=[{"role": "user", "content": prompt}], temperature=0.1, max_tokens=500 ) return response.choices[0].message.content def _call_gemini(self, prompt: str) -> str: """Call Google Gemini API.""" response = self.client.generate_content(prompt) return response.text def _generate_sql(self, user_query: str) -> str: """Generate SQL from natural language query.""" prompt = self._build_prompt(user_query) if self.provider == "ollama": response = self._call_ollama(prompt) elif self.provider == "groq": response = self._call_groq(prompt) elif self.provider == "gemini": response = self._call_gemini(prompt) else: raise ValueError(f"Unknown provider: {self.provider}") # Extract SQL from response sql = response.strip() # Clean up common issues - handle various code block formats sql = re.sub(r'^```\w*\s*', '', sql) # Remove opening ```sql or ``` sql = re.sub(r'\s*```$', '', sql) # Remove closing ``` sql = re.sub(r'^```', '', sql, flags=re.MULTILINE) # Remove any remaining ``` sql = sql.strip() # Try to extract SELECT statement if there's text before it select_match = re.search(r'(SELECT\s+.+?)(?:;|$)', sql, re.IGNORECASE | re.DOTALL) if select_match: sql = select_match.group(1).strip() # Ensure it's a SELECT query for safety if not sql.upper().startswith("SELECT"): raise ValueError(f"AI generated non-SELECT query: {sql[:100]}") return sql def _execute_sql(self, sql: str) -> List[Dict[str, Any]]: """Execute SQL and return results as list of dicts.""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() try: cursor.execute(sql) rows = cursor.fetchall() results = [dict(row) for row in rows] except sqlite3.Error as e: results = [{"error": str(e), "sql": sql}] finally: conn.close() return results def _generate_answer(self, user_query: str, results: List[Dict], sql: str) -> str: """Generate natural language answer from results.""" if not results: return "לא נמצאו תוצאות." if "error" in results[0]: return f"שגיאה בשאילתה: {results[0]['error']}" # Build answer prompt results_str = json.dumps(results[:20], ensure_ascii=False, indent=2) answer_prompt = f"""Based on the following query results, provide a concise answer in Hebrew. User question: {user_query} Query results (JSON): {results_str} Total results: {len(results)} Provide a helpful, concise answer in Hebrew. Include specific names, dates, and numbers from the results. If showing a list, format it nicely. Keep it brief but informative.""" if self.provider == "ollama": answer = self._call_ollama(answer_prompt) elif self.provider == "groq": answer = self._call_groq(answer_prompt) elif self.provider == "gemini": answer = self._call_gemini(answer_prompt) return answer def context_search(self, query: str, user_name: str = None) -> Dict[str, Any]: """ Hybrid context-aware search - combines FTS5 keyword search with AI reasoning. 1. AI extracts user name and relevant keywords from query 2. FTS5 finds messages matching keywords (fast, searches ALL messages) 3. AI reads relevant messages and reasons to find the answer Example: "באיזה בית חולים האחות עובדת?" - Extracts: user="האחות", keywords=["בית חולים", "עבודה", "מחלקה", "סורוקה", ...] - FTS5 finds messages from האחות containing these keywords - AI reads and infers the answer """ try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row # Step 1: AI extracts user name AND relevant keywords extract_prompt = f"""Analyze this question and extract: 1. USER_NAME: The specific person being asked about (or NONE if not about a specific person) 2. KEYWORDS: Hebrew keywords to search for in their messages (related to the question topic) Question: {query} Return in this exact format (one per line): USER_NAME: KEYWORDS: Example for "באיזה בית חולים האחות עובדת?": USER_NAME: האחות KEYWORDS: בית חולים, עבודה, מחלקה, סורוקה, רמבם, איכילוב, שיבא, הדסה, טיפול נמרץ, אחות Extract:""" if self.provider == "gemini": extraction = self._call_gemini(extract_prompt).strip() elif self.provider == "groq": extraction = self._call_groq(extract_prompt).strip() else: extraction = self._call_ollama(extract_prompt).strip() # Parse extraction user_name = None keywords = [] for line in extraction.split('\n'): if line.startswith('USER_NAME:'): name = line.replace('USER_NAME:', '').strip() if name.upper() != 'NONE' and len(name) < 50: user_name = name elif line.startswith('KEYWORDS:'): kw_str = line.replace('KEYWORDS:', '').strip() keywords = [k.strip() for k in kw_str.split(',') if k.strip()] messages = [] # Step 2: Hybrid retrieval - FTS5 keyword search + recent messages if user_name and keywords: # Build FTS5 query for keywords fts_query = ' OR '.join(keywords[:10]) # Limit to 10 keywords # Search for messages from user containing keywords cursor = conn.execute(""" SELECT date, from_name, text FROM messages WHERE from_name LIKE ? AND id IN (SELECT id FROM messages_fts WHERE messages_fts MATCH ?) ORDER BY date DESC LIMIT 100 """, (f"%{user_name}%", fts_query)) messages = [dict(row) for row in cursor.fetchall()] # Also add some recent messages for context (might contain relevant info without keywords) cursor = conn.execute(""" SELECT date, from_name, text FROM messages WHERE from_name LIKE ? ORDER BY date DESC LIMIT 50 """, (f"%{user_name}%",)) recent = [dict(row) for row in cursor.fetchall()] # Combine and deduplicate seen_texts = {m['text'] for m in messages if m['text']} for m in recent: if m['text'] and m['text'] not in seen_texts: messages.append(m) seen_texts.add(m['text']) elif user_name: # No keywords, just get user's messages cursor = conn.execute(""" SELECT date, from_name, text FROM messages WHERE from_name LIKE ? ORDER BY date DESC LIMIT 200 """, (f"%{user_name}%",)) messages = [dict(row) for row in cursor.fetchall()] elif keywords: # No user, search all messages for keywords fts_query = ' OR '.join(keywords[:10]) cursor = conn.execute(""" SELECT date, from_name, text FROM messages WHERE id IN (SELECT id FROM messages_fts WHERE messages_fts MATCH ?) ORDER BY date DESC LIMIT 100 """, (fts_query,)) messages = [dict(row) for row in cursor.fetchall()] else: # Fallback: recent messages cursor = conn.execute(""" SELECT date, from_name, text FROM messages WHERE text IS NOT NULL AND text != '' ORDER BY date DESC LIMIT 100 """) messages = [dict(row) for row in cursor.fetchall()] conn.close() if not messages: return { "query": query, "answer": "לא נמצאו הודעות רלוונטיות", "context_messages": 0, "keywords_used": keywords, "mode": "context_search" } # Step 3: AI reasons over the retrieved messages context_text = "\n".join([ f"[{m['date']}] {m['from_name']}: {m['text'][:500]}" for m in messages if m['text'] ]) reason_prompt = f"""You are analyzing a Telegram chat history to answer a question. Read the messages carefully and infer the answer from context clues. The user may not have stated things directly - look for hints, mentions, and implications. Question: {query} Chat messages (sorted by relevance and date): {context_text} Based on these messages, answer the question in Hebrew. If you can infer information (like workplace, location, profession) from context clues, do so. Cite specific messages when possible. If you truly cannot find any relevant information, say so. Answer:""" if self.provider == "gemini": answer = self._call_gemini(reason_prompt) elif self.provider == "groq": answer = self._call_groq(reason_prompt) else: answer = self._call_ollama(reason_prompt) return { "query": query, "answer": answer, "context_user": user_name, "context_messages": len(messages), "keywords_used": keywords, "mode": "context_search" } except Exception as e: return { "query": query, "error": f"Context search error: {str(e)}", "mode": "context_search" } def search(self, query: str, generate_answer: bool = True) -> Dict[str, Any]: """ Perform AI-powered search. Args: query: Natural language question in Hebrew or English generate_answer: Whether to generate natural language answer Returns: Dict with sql, results, and optionally answer """ try: # Generate SQL sql = self._generate_sql(query) # Execute query results = self._execute_sql(sql) response = { "query": query, "sql": sql, "results": results, "count": len(results) } # Generate natural language answer if generate_answer and results and "error" not in results[0]: response["answer"] = self._generate_answer(query, results, sql) return response except Exception as e: return { "query": query, "error": str(e), "results": [] } def get_thread(self, message_id: int) -> List[Dict[str, Any]]: """Get full conversation thread for a message.""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() thread = [] visited = set() def get_parent(msg_id): """Recursively get parent messages.""" if msg_id in visited: return visited.add(msg_id) cursor.execute(""" SELECT message_id, date, from_name, text, reply_to_message_id FROM messages WHERE message_id = ? """, (msg_id,)) row = cursor.fetchone() if row: if row['reply_to_message_id']: get_parent(row['reply_to_message_id']) thread.append(dict(row)) def get_children(msg_id): """Get all replies to a message.""" cursor.execute(""" SELECT message_id, date, from_name, text, reply_to_message_id FROM messages WHERE reply_to_message_id = ? ORDER BY date """, (msg_id,)) for row in cursor.fetchall(): if row['message_id'] not in visited: visited.add(row['message_id']) thread.append(dict(row)) get_children(row['message_id']) # Get the original message and its parents get_parent(message_id) # Get all replies get_children(message_id) conn.close() # Sort by date thread.sort(key=lambda x: x['date']) return thread def find_similar_messages(self, message_id: int, limit: int = 10) -> List[Dict[str, Any]]: """Find messages similar to the given message using trigrams.""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get the original message cursor.execute("SELECT text FROM messages WHERE message_id = ?", (message_id,)) row = cursor.fetchone() if not row or not row['text']: return [] # Use FTS5 to find similar messages words = row['text'].split()[:5] # Use first 5 words search_term = ' OR '.join(words) cursor.execute(""" SELECT m.message_id, m.date, m.from_name, m.text FROM messages m WHERE m.id IN ( SELECT id FROM messages_fts WHERE messages_fts MATCH ? ) AND m.message_id != ? LIMIT ? """, (search_term, message_id, limit)) results = [dict(row) for row in cursor.fetchall()] conn.close() return results class ChatViewer: """View chat messages like Telegram.""" def __init__(self, db_path: str): self.db_path = db_path def get_messages(self, offset: int = 0, limit: int = 50, user_id: str = None, search: str = None, date_from: str = None, date_to: str = None, has_media: bool = None, has_link: bool = None) -> Dict[str, Any]: """ Get messages with Telegram-like pagination. Returns messages in reverse chronological order (newest first). """ conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Build query conditions = [] params = [] if user_id: conditions.append("from_id = ?") params.append(user_id) if date_from: conditions.append("date >= ?") params.append(date_from) if date_to: conditions.append("date <= ?") params.append(date_to) if has_media is not None: if has_media: conditions.append("media_type IS NOT NULL") else: conditions.append("media_type IS NULL") if has_link is not None: conditions.append("has_link = ?") params.append(1 if has_link else 0) # Handle search if search: conditions.append("""id IN ( SELECT id FROM messages_fts WHERE messages_fts MATCH ? )""") params.append(search) where_clause = " AND ".join(conditions) if conditions else "1=1" # Get total count cursor.execute(f"SELECT COUNT(*) FROM messages WHERE {where_clause}", params) total = cursor.fetchone()[0] # Get messages query = f""" SELECT m.message_id, m.date, m.from_id, m.from_name, m.text, m.reply_to_message_id, m.forwarded_from, m.media_type, m.has_link, m.char_count, r.from_name as reply_to_name, r.text as reply_to_text FROM messages m LEFT JOIN messages r ON m.reply_to_message_id = r.message_id WHERE {where_clause} ORDER BY m.date DESC LIMIT ? OFFSET ? """ params.extend([limit, offset]) cursor.execute(query, params) messages = [dict(row) for row in cursor.fetchall()] conn.close() return { "messages": messages, "total": total, "offset": offset, "limit": limit, "has_more": offset + limit < total } def get_message_context(self, message_id: int, before: int = 10, after: int = 10) -> Dict[str, Any]: """Get messages around a specific message (for context view).""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Get the target message date cursor.execute("SELECT date FROM messages WHERE message_id = ?", (message_id,)) row = cursor.fetchone() if not row: return {"messages": [], "target_id": message_id} target_date = row['date'] # Get messages before cursor.execute(""" SELECT message_id, date, from_id, from_name, text, reply_to_message_id, media_type, has_link FROM messages WHERE date < ? ORDER BY date DESC LIMIT ? """, (target_date, before)) before_msgs = list(reversed([dict(row) for row in cursor.fetchall()])) # Get target message cursor.execute(""" SELECT message_id, date, from_id, from_name, text, reply_to_message_id, media_type, has_link FROM messages WHERE message_id = ? """, (message_id,)) target_msg = dict(cursor.fetchone()) # Get messages after cursor.execute(""" SELECT message_id, date, from_id, from_name, text, reply_to_message_id, media_type, has_link FROM messages WHERE date > ? ORDER BY date ASC LIMIT ? """, (target_date, after)) after_msgs = [dict(row) for row in cursor.fetchall()] conn.close() return { "messages": before_msgs + [target_msg] + after_msgs, "target_id": message_id } def get_user_conversation(self, user1_id: str, user2_id: str, limit: int = 100) -> List[Dict]: """Get conversation between two users (their replies to each other).""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(""" SELECT m1.message_id, m1.date, m1.from_id, m1.from_name, m1.text, m1.reply_to_message_id, m2.from_name as reply_to_name FROM messages m1 LEFT JOIN messages m2 ON m1.reply_to_message_id = m2.message_id WHERE (m1.from_id = ? AND m2.from_id = ?) OR (m1.from_id = ? AND m2.from_id = ?) ORDER BY m1.date DESC LIMIT ? """, (user1_id, user2_id, user2_id, user1_id, limit)) results = [dict(row) for row in cursor.fetchall()] conn.close() return results # CLI for testing if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="AI-powered Telegram search") parser.add_argument("--db", required=True, help="Database path") parser.add_argument("--provider", default="ollama", choices=["ollama", "groq", "gemini"]) parser.add_argument("--query", help="Search query") parser.add_argument("--api-key", help="API key for cloud providers") args = parser.parse_args() if args.query: engine = AISearchEngine(args.db, args.provider, args.api_key) result = engine.search(args.query) print(f"\nQuery: {result['query']}") print(f"SQL: {result.get('sql', 'N/A')}") print(f"Results: {result.get('count', 0)}") if 'answer' in result: print(f"\nAnswer:\n{result['answer']}") if result.get('results'): print(f"\nFirst 3 results:") for r in result['results'][:3]: print(json.dumps(r, ensure_ascii=False, indent=2))