| import json |
| import requests |
| from typing import Dict, List, Optional, Any, Union, Callable |
| from abc import ABC, abstractmethod |
|
|
|
|
| from .tool import Tool, Toolkit |
| from ..core.logging import logger |
| from ..core.module import BaseModule |
|
|
|
|
| class APITool(Tool): |
| """ |
| API tool wrapper that encapsulates a single API endpoint as a Tool |
| |
| Attributes: |
| name: Tool name |
| description: Tool description |
| inputs: Input parameter schema |
| required: List of required parameters |
| endpoint_config: API endpoint configuration |
| auth_config: Authentication configuration |
| function: Actual execution function |
| """ |
| |
| def __init__( |
| self, |
| name: str, |
| description: str, |
| inputs: Dict[str, Dict[str, Any]], |
| required: Optional[List[str]] = None, |
| endpoint_config: Dict[str, Any] = None, |
| auth_config: Dict[str, Any] = None, |
| function: Callable = None |
| ): |
| super().__init__(name=name, description=description, inputs=inputs, required=required) |
| self.endpoint_config = endpoint_config or {} |
| self.auth_config = auth_config or {} |
| self.function = function |
| |
| @property |
| def __name__(self): |
| return self.name |
| |
| def __call__(self, **kwargs): |
| """Execute the API call""" |
| if not self.function: |
| raise ValueError("Function not set for APITool") |
| |
| try: |
| result = self.function(**kwargs) |
| return self._process_result(result) |
| except Exception as e: |
| logger.error(f"Error calling API tool {self.name}: {str(e)}") |
| raise |
| |
| def _process_result(self, result: Any) -> Any: |
| """Process API response""" |
| if isinstance(result, requests.Response): |
| try: |
| return result.json() |
| except (ValueError, json.JSONDecodeError): |
| return result.text |
| return result |
| |
| @classmethod |
| def validate_attributes(cls): |
| """Validate attributes""" |
| |
| |
| if cls.__name__ == 'APITool': |
| return |
| |
| |
| required_attributes = { |
| "name": str, |
| "description": str, |
| "inputs": dict |
| } |
| |
| for attr, attr_type in required_attributes.items(): |
| if not hasattr(cls, attr): |
| raise ValueError(f"Attribute {attr} is required") |
| if not isinstance(getattr(cls, attr), attr_type): |
| raise ValueError(f"Attribute {attr} must be of type {attr_type}") |
|
|
| if hasattr(cls, 'required') and cls.required: |
| for required_input in cls.required: |
| if required_input not in cls.inputs: |
| raise ValueError(f"Required input '{required_input}' is not found in inputs") |
|
|
|
|
| class APIToolkit(Toolkit): |
| """ |
| API tool collection representing all endpoints of an API service |
| |
| Attributes: |
| name: Service name |
| tools: List of API tools |
| base_url: Base URL |
| auth_config: Authentication configuration |
| common_headers: Common request headers |
| """ |
| |
| def __init__( |
| self, |
| name: str, |
| tools: List[APITool], |
| base_url: str = "", |
| auth_config: Dict[str, Any] = None, |
| common_headers: Dict[str, str] = None |
| ): |
| super().__init__(name=name, tools=tools) |
| self.base_url = base_url |
| self.auth_config = auth_config or {} |
| self.common_headers = common_headers or {} |
| |
| def add_auth_to_headers(self, headers: Dict[str, str]) -> Dict[str, str]: |
| """Add authentication information to request headers""" |
| headers = headers.copy() |
| headers.update(self.common_headers) |
| |
| |
| if "api_key" in self.auth_config: |
| key_name = self.auth_config.get("key_name", "X-API-Key") |
| headers[key_name] = self.auth_config["api_key"] |
| |
| if "bearer_token" in self.auth_config: |
| headers["Authorization"] = f"Bearer {self.auth_config['bearer_token']}" |
| |
| return headers |
|
|
|
|
| class BaseAPIConverter(BaseModule, ABC): |
| """ |
| Base API converter abstract class |
| |
| Responsible for converting an API specification into an APIToolkit |
| """ |
| |
| def __init__( |
| self, |
| input_schema: Union[str, Dict[str, Any]], |
| description: str = "", |
| auth_config: Dict[str, Any] = None |
| ): |
| """ |
| Initialize the API converter |
| |
| Args: |
| input_schema: API specification, can be a file path or a dictionary |
| description: Service description |
| auth_config: Authentication configuration |
| """ |
| super().__init__() |
| self.input_schema = self._load_schema(input_schema) |
| self.description = description |
| self.auth_config = auth_config or {} |
| |
| def _load_schema(self, schema: Union[str, Dict[str, Any]]) -> Dict[str, Any]: |
| """Load API specification""" |
| if isinstance(schema, str): |
| |
| try: |
| with open(schema, 'r', encoding='utf-8') as f: |
| if schema.endswith('.json'): |
| return json.load(f) |
| elif schema.endswith(('.yaml', '.yml')): |
| import yaml |
| return yaml.safe_load(f) |
| else: |
| |
| content = f.read() |
| return json.loads(content) |
| except Exception as e: |
| logger.error(f"Failed to load schema from {schema}: {e}") |
| raise |
| elif isinstance(schema, dict): |
| return schema |
| else: |
| raise ValueError("input_schema must be a file path or dictionary") |
| |
| @abstractmethod |
| def convert_to_toolkit(self) -> APIToolkit: |
| """ |
| Convert API specification to APIToolkit |
| |
| Returns: |
| APIToolkit: Converted toolkit |
| """ |
| pass |
| |
| @abstractmethod |
| def _create_api_function(self, endpoint_config: Dict[str, Any]) -> Callable: |
| """ |
| Create an execution function for a single API endpoint |
| |
| Args: |
| endpoint_config: Endpoint configuration |
| |
| Returns: |
| Callable: API execution function |
| """ |
| pass |
| |
| def _extract_parameters(self, endpoint_config: Dict[str, Any]) -> tuple: |
| """ |
| Extract parameter information from endpoint configuration |
| |
| Args: |
| endpoint_config: Endpoint configuration |
| |
| Returns: |
| tuple: (inputs, required) parameter schema and list of required parameters |
| """ |
| inputs = {} |
| required = [] |
| |
| |
| |
| parameters = endpoint_config.get("parameters", []) |
| |
| for param in parameters: |
| param_name = param.get("name", "") |
| param_type = param.get("type", "string") |
| param_desc = param.get("description", "") |
| is_required = param.get("required", False) |
| |
| inputs[param_name] = { |
| "type": param_type, |
| "description": param_desc |
| } |
| |
| if is_required: |
| required.append(param_name) |
| |
| return inputs, required |
|
|
|
|
| class OpenAPIConverter(BaseAPIConverter): |
| """ |
| OpenAPI (Swagger) specification converter |
| """ |
| |
| def convert_to_toolkit(self) -> APIToolkit: |
| """Convert OpenAPI specification to APIToolkit""" |
| service_name = self.input_schema.get("info", {}).get("title", "API Service") |
| base_url = self._get_base_url() |
| |
| tools = [] |
| paths = self.input_schema.get("paths", {}) |
| |
| for path, methods in paths.items(): |
| for method, operation in methods.items(): |
| if method.upper() in ["GET", "POST", "PUT", "DELETE", "PATCH"]: |
| tool = self._create_tool_from_operation(path, method, operation, base_url) |
| if tool: |
| tools.append(tool) |
| |
| return APIToolkit( |
| name=service_name, |
| tools=tools, |
| base_url=base_url, |
| auth_config=self.auth_config, |
| common_headers={"Content-Type": "application/json"} |
| ) |
| |
| def _get_base_url(self) -> str: |
| """Get base URL from the OpenAPI specification""" |
| servers = self.input_schema.get("servers", []) |
| if servers: |
| return servers[0].get("url", "") |
| |
| |
| host = self.input_schema.get("host", "") |
| base_path = self.input_schema.get("basePath", "") |
| schemes = self.input_schema.get("schemes", ["https"]) |
| |
| if host: |
| return f"{schemes[0]}://{host}{base_path}" |
| |
| return "" |
| |
| def _create_tool_from_operation( |
| self, |
| path: str, |
| method: str, |
| operation: Dict[str, Any], |
| base_url: str |
| ) -> Optional[APITool]: |
| """Create a tool from an OpenAPI operation""" |
| try: |
| |
| operation_id = operation.get("operationId") |
| if not operation_id: |
| |
| clean_path = path.replace("/", "_").replace("{", "").replace("}", "").strip("_") |
| operation_id = f"{method.lower()}_{clean_path}" |
| |
| |
| inputs, required = self._extract_openapi_parameters(operation) |
| |
| |
| api_function = self._create_api_function({ |
| "url": base_url + path, |
| "method": method.upper(), |
| "operation": operation |
| }) |
| |
| return APITool( |
| name=operation_id, |
| description=operation.get("summary", operation.get("description", "")), |
| inputs=inputs, |
| required=required, |
| endpoint_config={ |
| "url": base_url + path, |
| "method": method.upper(), |
| "operation": operation |
| }, |
| auth_config=self.auth_config, |
| function=api_function |
| ) |
| except Exception as e: |
| logger.warning(f"Failed to create tool for {method.upper()} {path}: {e}") |
| return None |
| |
| def _extract_openapi_parameters(self, operation: Dict[str, Any]) -> tuple: |
| """Extract parameters from an OpenAPI operation""" |
| inputs = {} |
| required = [] |
| |
| |
| parameters = operation.get("parameters", []) |
| for param in parameters: |
| param_name = param.get("name", "") |
| param_schema = param.get("schema", {}) |
| param_type = param_schema.get("type", "string") |
| |
| inputs[param_name] = { |
| "type": param_type, |
| "description": param.get("description", "") |
| } |
| |
| if param.get("required", False): |
| required.append(param_name) |
| |
| |
| request_body = operation.get("requestBody", {}) |
| if request_body: |
| content = request_body.get("content", {}) |
| for media_type, media_schema in content.items(): |
| if "application/json" in media_type: |
| schema = media_schema.get("schema", {}) |
| properties = schema.get("properties", {}) |
| |
| for prop_name, prop_schema in properties.items(): |
| inputs[prop_name] = { |
| "type": prop_schema.get("type", "string"), |
| "description": prop_schema.get("description", "") |
| } |
| |
| if prop_name in schema.get("required", []): |
| required.append(prop_name) |
| |
| return inputs, required |
| |
| def _create_api_function(self, endpoint_config: Dict[str, Any]) -> Callable: |
| """Create OpenAPI execution function""" |
| url = endpoint_config["url"] |
| method = endpoint_config["method"] |
| operation = endpoint_config["operation"] |
| |
| def api_call(**kwargs): |
| |
| path_params = {} |
| query_params = {} |
| body_data = {} |
| |
| parameters = operation.get("parameters", []) |
| param_locations = {param["name"]: param.get("in", "query") for param in parameters} |
| |
| for key, value in kwargs.items(): |
| if value is None: |
| continue |
| |
| location = param_locations.get(key, "body") |
| if location == "path": |
| path_params[key] = value |
| elif location == "query": |
| query_params[key] = value |
| else: |
| body_data[key] = value |
| |
| |
| final_url = url |
| for param_name, param_value in path_params.items(): |
| final_url = final_url.replace(f"{{{param_name}}}", str(param_value)) |
| |
| |
| headers = {"Content-Type": "application/json"} |
| if hasattr(self, 'auth_config') and self.auth_config: |
| if "api_key" in self.auth_config: |
| key_name = self.auth_config.get("key_name", "X-API-Key") |
| headers[key_name] = self.auth_config["api_key"] |
| |
| |
| try: |
| if method in ["GET", "DELETE"]: |
| response = requests.request( |
| method=method, |
| url=final_url, |
| params=query_params, |
| headers=headers, |
| timeout=30 |
| ) |
| else: |
| response = requests.request( |
| method=method, |
| url=final_url, |
| params=query_params, |
| json=body_data if body_data else None, |
| headers=headers, |
| timeout=30 |
| ) |
| |
| response.raise_for_status() |
| |
| try: |
| return response.json() |
| except (ValueError, json.JSONDecodeError): |
| return response.text |
| |
| except requests.exceptions.RequestException as e: |
| logger.error(f"API request failed: {e}") |
| raise |
| |
| |
| api_call.__name__ = f"api_call_{method.lower()}" |
| return api_call |
|
|
|
|
| class RapidAPIConverter(OpenAPIConverter): |
| """ |
| RapidAPI-specific converter |
| Inherits from OpenAPIConverter and adds RapidAPI-specific authentication and configuration |
| """ |
| |
| def __init__( |
| self, |
| input_schema: Union[str, Dict[str, Any]], |
| description: str = "", |
| rapidapi_key: str = "", |
| rapidapi_host: str = "", |
| **kwargs |
| ): |
| """ |
| Initialize the RapidAPI converter |
| |
| Args: |
| input_schema: API specification |
| description: Service description |
| rapidapi_key: RapidAPI key |
| rapidapi_host: RapidAPI host |
| """ |
| |
| if not rapidapi_key: |
| from os import getenv |
| from dotenv import load_dotenv |
| load_dotenv() |
| rapidapi_key = getenv("RAPIDAPI_KEY", "") |
| if not rapidapi_key: |
| raise ValueError("rapidapi_key not provided or RAPIDAPI_KEY environment variable not set") |
| if not rapidapi_host: |
| raise ValueError("rapidapi_host not provided or RAPIDAPI_HOST environment variable not set") |
| |
| auth_config = { |
| "api_key": rapidapi_key, |
| "key_name": "X-RapidAPI-Key", |
| "rapidapi_host": rapidapi_host |
| } |
| |
| super().__init__( |
| input_schema=input_schema, |
| description=description, |
| auth_config=auth_config, |
| **kwargs |
| ) |
| |
| def convert_to_toolkit(self) -> APIToolkit: |
| """Convert to a RapidAPI toolkit""" |
| toolkit = super().convert_to_toolkit() |
| |
| |
| rapidapi_headers = { |
| "X-RapidAPI-Key": self.auth_config.get("api_key", ""), |
| "X-RapidAPI-Host": self.auth_config.get("rapidapi_host", "") |
| } |
| |
| toolkit.common_headers.update(rapidapi_headers) |
| |
| return toolkit |
| |
| def _create_api_function(self, endpoint_config: Dict[str, Any]) -> Callable: |
| """Create RapidAPI execution function""" |
| url = endpoint_config["url"] |
| method = endpoint_config["method"] |
| operation = endpoint_config["operation"] |
| |
| def rapidapi_call(**kwargs): |
| |
| path_params = {} |
| query_params = {} |
| body_data = {} |
| |
| parameters = operation.get("parameters", []) |
| param_locations = {param["name"]: param.get("in", "query") for param in parameters} |
| |
| for key, value in kwargs.items(): |
| if value is None: |
| continue |
| |
| location = param_locations.get(key, "body") |
| if location == "path": |
| path_params[key] = value |
| elif location == "query": |
| query_params[key] = value |
| else: |
| body_data[key] = value |
| |
| |
| final_url = url |
| for param_name, param_value in path_params.items(): |
| final_url = final_url.replace(f"{{{param_name}}}", str(param_value)) |
| |
| |
| headers = { |
| "Content-Type": "application/json", |
| "X-RapidAPI-Key": self.auth_config.get("api_key", ""), |
| "X-RapidAPI-Host": self.auth_config.get("rapidapi_host", "") |
| } |
| |
| |
| try: |
| if method in ["GET", "DELETE"]: |
| response = requests.request( |
| method=method, |
| url=final_url, |
| params=query_params, |
| headers=headers, |
| timeout=30 |
| ) |
| else: |
| response = requests.request( |
| method=method, |
| url=final_url, |
| params=query_params, |
| json=body_data if body_data else None, |
| headers=headers, |
| timeout=30 |
| ) |
| |
| response.raise_for_status() |
| |
| try: |
| return response.json() |
| except (ValueError, json.JSONDecodeError): |
| return response.text |
| |
| except requests.exceptions.RequestException as e: |
| logger.error(f"RapidAPI request failed: {e}") |
| raise |
| |
| rapidapi_call.__name__ = f"rapidapi_call_{method.lower()}" |
| return rapidapi_call |
|
|
|
|
| def create_openapi_toolkit( |
| schema_path_or_dict: Union[str, Dict[str, Any]], |
| service_name: str = None, |
| auth_config: Dict[str, Any] = None |
| ) -> APIToolkit: |
| """ |
| Convenience function: create an APIToolkit from an OpenAPI specification |
| |
| Args: |
| schema_path_or_dict: OpenAPI specification file path or dictionary |
| service_name: Service name (optional, will be extracted from the spec) |
| auth_config: Authentication configuration |
| |
| Returns: |
| APIToolkit: Created toolkit |
| """ |
| converter = OpenAPIConverter( |
| input_schema=schema_path_or_dict, |
| description=service_name or "", |
| auth_config=auth_config |
| ) |
| return converter.convert_to_toolkit() |
|
|
|
|
| def create_rapidapi_toolkit( |
| schema_path_or_dict: Union[str, Dict[str, Any]], |
| rapidapi_key: str, |
| rapidapi_host: str, |
| service_name: str = None |
| ) -> APIToolkit: |
| """ |
| Convenience function: create a RapidAPI toolkit |
| |
| Args: |
| schema_path_or_dict: API specification file path or dictionary |
| rapidapi_key: RapidAPI key |
| rapidapi_host: RapidAPI host |
| service_name: Service name (optional) |
| |
| Returns: |
| APIToolkit: Created RapidAPI toolkit |
| """ |
| converter = RapidAPIConverter( |
| input_schema=schema_path_or_dict, |
| description=service_name or "", |
| rapidapi_key=rapidapi_key, |
| rapidapi_host=rapidapi_host |
| ) |
| return converter.convert_to_toolkit() |
|
|