"""
D1 database integration for OpenManus.

Provides an interface to Cloudflare D1 database operations.
"""
|
|
|
import json
from typing import Any, Dict, List, Optional, Union

from app.logger import logger

from .client import CloudflareClient, CloudflareError
|
|
|
|
|
class D1Database:
    """Cloudflare D1 database client.

    Wraps a ``CloudflareClient`` and exposes raw query execution plus
    convenience helpers for the ``users``, ``sessions``, ``conversations``,
    ``agent_executions`` and ``files`` tables created by
    :meth:`initialize_schema`.

    Row-returning helpers expect query responses shaped like
    ``{"success": bool, "result": [{"results": [row, ...]}]}``.
    """

    # Statuses for which update_agent_execution also stamps ``completed_at``.
    _TERMINAL_STATUSES = ("completed", "failed")

    def __init__(self, client: "CloudflareClient", database_id: str):
        """Bind the client to one D1 database.

        Args:
            client: Cloudflare client used for every request; its
                ``account_id`` is read to build the REST endpoint.
            database_id: Identifier of the D1 database.
        """
        self.client = client
        self.database_id = database_id
        self.base_endpoint = f"accounts/{client.account_id}/d1/database/{database_id}"

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _rows(response: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the rows of the first statement result, or ``[]``."""
        if response.get("success") and response.get("result"):
            return response["result"][0].get("results", [])
        return []

    @staticmethod
    def _first_row(
        response: Dict[str, Any], json_field: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Return the first row of ``response`` or ``None`` when there is none.

        When ``json_field`` is given and that column holds a truthy value, the
        value is decoded with ``json.loads`` in place before returning.
        """
        rows = D1Database._rows(response)
        if not rows:
            return None

        row = rows[0]
        if json_field is not None and row.get(json_field):
            row[json_field] = json.loads(row[json_field])
        return row

    # ------------------------------------------------------------------
    # Raw execution
    # ------------------------------------------------------------------

    async def execute_query(
        self, sql: str, params: Optional[List[Any]] = None, use_worker: bool = True
    ) -> Dict[str, Any]:
        """Execute a single SQL statement.

        Args:
            sql: SQL text, using ``?`` placeholders for bound values.
            params: Values for the placeholders; omitted from the request
                when ``None`` or empty.
            use_worker: When true, post to the worker route
                ``api/database/query``; otherwise post to the REST endpoint
                ``{base_endpoint}/query``.

        Returns:
            The response returned by the client, unmodified.

        Raises:
            CloudflareError: Logged and re-raised when the request fails.
        """
        query_data: Dict[str, Any] = {"sql": sql}
        if params:
            query_data["params"] = params

        try:
            if use_worker:
                return await self.client.post(
                    "api/database/query", data=query_data, use_worker=True
                )
            return await self.client.post(
                f"{self.base_endpoint}/query", data=query_data
            )
        except CloudflareError as e:
            logger.error(f"D1 query execution failed: {e}")
            raise

    async def batch_execute(
        self, queries: List[Dict[str, Any]], use_worker: bool = True
    ) -> Dict[str, Any]:
        """Execute multiple statements in one request.

        Args:
            queries: Statement dicts (each carrying at least a ``"sql"`` key),
                sent as ``{"queries": queries}``.
            use_worker: When true, post to the worker route
                ``api/database/batch``; otherwise post to the REST endpoint
                ``{base_endpoint}/query``.

        Returns:
            The response returned by the client, unmodified.

        Raises:
            CloudflareError: Logged and re-raised when the request fails.
        """
        batch_data = {"queries": queries}

        try:
            if use_worker:
                return await self.client.post(
                    "api/database/batch", data=batch_data, use_worker=True
                )
            # NOTE(review): the REST fallback reuses the single-statement
            # /query endpoint with a {"queries": [...]} payload -- confirm the
            # D1 REST API accepts this shape.
            return await self.client.post(
                f"{self.base_endpoint}/query", data=batch_data
            )
        except CloudflareError as e:
            logger.error(f"D1 batch execution failed: {e}")
            raise

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    async def create_user(
        self,
        user_id: str,
        username: str,
        email: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Insert a user, or update the existing row with the same id.

        ``metadata`` is stored as a JSON string (``{}`` when not given). On an
        id conflict the username, email and metadata are overwritten and
        ``updated_at`` is refreshed.

        Returns:
            The raw response from :meth:`execute_query`.
        """
        sql = """
            INSERT INTO users (id, username, email, metadata)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                username = excluded.username,
                email = excluded.email,
                metadata = excluded.metadata,
                updated_at = strftime('%s', 'now')
        """
        params = [user_id, username, email, json.dumps(metadata or {})]
        return await self.execute_query(sql, params)

    async def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the user with ``user_id`` (metadata JSON-decoded), or ``None``."""
        result = await self.execute_query(
            "SELECT * FROM users WHERE id = ?", [user_id]
        )
        return self._first_row(result, json_field="metadata")

    async def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
        """Return the user named ``username`` (metadata JSON-decoded), or ``None``."""
        result = await self.execute_query(
            "SELECT * FROM users WHERE username = ?", [username]
        )
        return self._first_row(result, json_field="metadata")

    # ------------------------------------------------------------------
    # Sessions
    # ------------------------------------------------------------------

    async def create_session(
        self,
        session_id: str,
        user_id: str,
        session_data: Dict[str, Any],
        expires_at: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Insert a session row; ``session_data`` is stored as a JSON string.

        Args:
            session_id: Primary key of the new session.
            user_id: Owner of the session.
            session_data: JSON-serialisable payload.
            expires_at: Expiry value compared against ``strftime('%s', 'now')``
                by :meth:`get_session`; ``None`` means the session never
                expires.

        Returns:
            The raw response from :meth:`execute_query`.
        """
        sql = """
            INSERT INTO sessions (id, user_id, session_data, expires_at)
            VALUES (?, ?, ?, ?)
        """
        params = [session_id, user_id, json.dumps(session_data), expires_at]
        return await self.execute_query(sql, params)

    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the unexpired session with ``session_id``, or ``None``.

        Expired sessions are filtered out by the query itself; the
        ``session_data`` column is JSON-decoded when present.
        """
        sql = """
            SELECT * FROM sessions
            WHERE id = ? AND (expires_at IS NULL OR expires_at > strftime('%s', 'now'))
        """
        result = await self.execute_query(sql, [session_id])
        return self._first_row(result, json_field="session_data")

    async def delete_session(self, session_id: str) -> Dict[str, Any]:
        """Delete the session with ``session_id`` and return the raw response."""
        return await self.execute_query(
            "DELETE FROM sessions WHERE id = ?", [session_id]
        )

    # ------------------------------------------------------------------
    # Conversations
    # ------------------------------------------------------------------

    async def create_conversation(
        self,
        conversation_id: str,
        user_id: str,
        title: Optional[str] = None,
        messages: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Insert a conversation; ``messages`` is stored as JSON (``[]`` if unset)."""
        sql = """
            INSERT INTO conversations (id, user_id, title, messages)
            VALUES (?, ?, ?, ?)
        """
        params = [conversation_id, user_id, title, json.dumps(messages or [])]
        return await self.execute_query(sql, params)

    async def get_conversation(self, conversation_id: str) -> Optional[Dict[str, Any]]:
        """Return the conversation (messages JSON-decoded), or ``None``."""
        result = await self.execute_query(
            "SELECT * FROM conversations WHERE id = ?", [conversation_id]
        )
        return self._first_row(result, json_field="messages")

    async def update_conversation_messages(
        self, conversation_id: str, messages: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Replace a conversation's messages and refresh ``updated_at``."""
        sql = """
            UPDATE conversations
            SET messages = ?, updated_at = strftime('%s', 'now')
            WHERE id = ?
        """
        params = [json.dumps(messages), conversation_id]
        return await self.execute_query(sql, params)

    async def get_user_conversations(
        self, user_id: str, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """List a user's conversations, most recently updated first.

        Only id, user_id, title and timestamps are selected; the messages
        column is not fetched. At most ``limit`` rows are returned.
        """
        sql = """
            SELECT id, user_id, title, created_at, updated_at
            FROM conversations
            WHERE user_id = ?
            ORDER BY updated_at DESC
            LIMIT ?
        """
        result = await self.execute_query(sql, [user_id, limit])
        return self._rows(result)

    # ------------------------------------------------------------------
    # Agent executions
    # ------------------------------------------------------------------

    async def create_agent_execution(
        self,
        execution_id: str,
        user_id: str,
        session_id: Optional[str] = None,
        task_description: Optional[str] = None,
        status: str = "pending",
    ) -> Dict[str, Any]:
        """Insert an agent execution record and return the raw response."""
        sql = """
            INSERT INTO agent_executions (id, user_id, session_id, task_description, status)
            VALUES (?, ?, ?, ?, ?)
        """
        params = [execution_id, user_id, session_id, task_description, status]
        return await self.execute_query(sql, params)

    async def update_agent_execution(
        self,
        execution_id: str,
        status: Optional[str] = None,
        result: Optional[str] = None,
        execution_time: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Update the provided fields of an agent execution record.

        Args:
            execution_id: Record to update.
            status: New status; ``None`` or an empty string leaves it
                unchanged. ``"completed"`` and ``"failed"`` also stamp
                ``completed_at``.
            result: New result text; only ``None`` leaves it unchanged, so an
                empty string is written as an empty result.
            execution_time: New execution time; ``None`` leaves it unchanged.

        Returns:
            The raw response from :meth:`execute_query`, or
            ``{"success": True, "message": "No updates provided"}`` when there
            is nothing to change (no request is sent in that case).
        """
        updates: List[str] = []
        params: List[Any] = []

        if status:
            updates.append("status = ?")
            params.append(status)

        # ``is not None`` rather than truthiness: an empty result string is a
        # legitimate value and must not be silently dropped.
        if result is not None:
            updates.append("result = ?")
            params.append(result)

        if execution_time is not None:
            updates.append("execution_time = ?")
            params.append(execution_time)

        if status in self._TERMINAL_STATUSES:
            updates.append("completed_at = strftime('%s', 'now')")

        if not updates:
            return {"success": True, "message": "No updates provided"}

        # Only fixed column fragments are interpolated; values stay bound.
        sql = f"""
            UPDATE agent_executions
            SET {', '.join(updates)}
            WHERE id = ?
        """
        params.append(execution_id)
        return await self.execute_query(sql, params)

    async def get_agent_execution(self, execution_id: str) -> Optional[Dict[str, Any]]:
        """Return the agent execution with ``execution_id``, or ``None``."""
        result = await self.execute_query(
            "SELECT * FROM agent_executions WHERE id = ?", [execution_id]
        )
        return self._first_row(result)

    async def get_user_executions(
        self, user_id: str, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """List a user's agent executions, newest first, at most ``limit`` rows."""
        sql = """
            SELECT * FROM agent_executions
            WHERE user_id = ?
            ORDER BY created_at DESC
            LIMIT ?
        """
        result = await self.execute_query(sql, [user_id, limit])
        return self._rows(result)

    # ------------------------------------------------------------------
    # Files
    # ------------------------------------------------------------------

    async def create_file_record(
        self,
        file_id: str,
        user_id: str,
        filename: str,
        file_key: str,
        file_size: int,
        content_type: str,
        bucket: str = "storage",
    ) -> Dict[str, Any]:
        """Insert a file metadata record and return the raw response."""
        sql = """
            INSERT INTO files (id, user_id, filename, file_key, file_size, content_type, bucket)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """
        params = [file_id, user_id, filename, file_key, file_size, content_type, bucket]
        return await self.execute_query(sql, params)

    async def get_file_record(self, file_id: str) -> Optional[Dict[str, Any]]:
        """Return the file record with ``file_id``, or ``None``."""
        result = await self.execute_query(
            "SELECT * FROM files WHERE id = ?", [file_id]
        )
        return self._first_row(result)

    async def get_user_files(
        self, user_id: str, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """List a user's file records, newest first, at most ``limit`` rows."""
        sql = """
            SELECT * FROM files
            WHERE user_id = ?
            ORDER BY created_at DESC
            LIMIT ?
        """
        result = await self.execute_query(sql, [user_id, limit])
        return self._rows(result)

    async def delete_file_record(self, file_id: str) -> Dict[str, Any]:
        """Delete the file record with ``file_id`` and return the raw response."""
        return await self.execute_query(
            "DELETE FROM files WHERE id = ?", [file_id]
        )

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    async def initialize_schema(self) -> Dict[str, Any]:
        """Create all tables and indexes if they do not already exist.

        Every statement uses ``IF NOT EXISTS``. Tables are listed before the
        indexes that reference them, and everything is sent as a single
        :meth:`batch_execute` call.

        Returns:
            The raw response from :meth:`batch_execute`.
        """
        schema_queries = [
            {
                "sql": """CREATE TABLE IF NOT EXISTS users (
                    id TEXT PRIMARY KEY,
                    username TEXT UNIQUE NOT NULL,
                    email TEXT UNIQUE,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    updated_at INTEGER DEFAULT (strftime('%s', 'now')),
                    metadata TEXT
                )"""
            },
            {
                "sql": """CREATE TABLE IF NOT EXISTS sessions (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    session_data TEXT,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    expires_at INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )"""
            },
            {
                "sql": """CREATE TABLE IF NOT EXISTS conversations (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    title TEXT,
                    messages TEXT,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    updated_at INTEGER DEFAULT (strftime('%s', 'now')),
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )"""
            },
            {
                "sql": """CREATE TABLE IF NOT EXISTS files (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    filename TEXT NOT NULL,
                    file_key TEXT NOT NULL,
                    file_size INTEGER,
                    content_type TEXT,
                    bucket TEXT DEFAULT 'storage',
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )"""
            },
            {
                "sql": """CREATE TABLE IF NOT EXISTS agent_executions (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    session_id TEXT,
                    task_description TEXT,
                    status TEXT DEFAULT 'pending',
                    result TEXT,
                    execution_time INTEGER,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    completed_at INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )"""
            },
        ]

        index_queries = [
            {
                "sql": "CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)"
            },
            {
                "sql": "CREATE INDEX IF NOT EXISTS idx_conversations_user_id ON conversations(user_id)"
            },
            {"sql": "CREATE INDEX IF NOT EXISTS idx_files_user_id ON files(user_id)"},
            {
                "sql": "CREATE INDEX IF NOT EXISTS idx_agent_executions_user_id ON agent_executions(user_id)"
            },
        ]

        all_queries = schema_queries + index_queries
        return await self.batch_execute(all_queries)
|
|
|