| import json |
| import pickle |
| import csv |
| import yaml |
| import xml.etree.ElementTree as ET |
| import os |
| from typing import Dict, Any, List |
| from pathlib import Path |
| from abc import ABC, abstractmethod |
|
|
| |
| try: |
| import pymupdf |
| PDF_AVAILABLE = True |
| except ImportError: |
| PDF_AVAILABLE = False |
|
|
| try: |
| from PIL import Image |
| PILLOW_AVAILABLE = True |
| except ImportError: |
| PILLOW_AVAILABLE = False |
|
|
| try: |
| from openpyxl import Workbook, load_workbook |
| EXCEL_AVAILABLE = True |
| except ImportError: |
| EXCEL_AVAILABLE = False |
|
|
| from ..core.module import BaseModule |
| from ..core.logging import logger |
|
|
|
|
| class StorageBase(BaseModule, ABC): |
| """ |
| Abstract base class for comprehensive storage operations supporting various file types. |
| Provides unified interface for local and remote storage operations. |
| """ |
| |
def __init__(self, base_path: str = ".", **kwargs):
    """
    Set up the storage object and trigger backend-specific initialization.

    Args:
        base_path (str): Root directory/prefix that user-supplied paths are
            resolved against (default: current directory)
        **kwargs: Forwarded unchanged to the parent class initializer
    """
    super().__init__(**kwargs)
    self.base_path = base_path

    # Extension -> handler table consulted by append(); an extension that is
    # missing from this table cannot be appended to.
    self.appendable_formats = dict((
        ('.txt', self._append_text),
        ('.json', self._append_json),
        ('.csv', self._append_csv),
        ('.yaml', self._append_yaml),
        ('.yml', self._append_yaml),
        ('.pickle', self._append_pickle),
        ('.xlsx', self._append_excel),
    ))

    # Runs last so the subclass hook can see base_path and the handler table.
    self._initialize_storage()
| |
| @abstractmethod |
| def _initialize_storage(self): |
| """ |
| Initialize storage-specific setup. Override in subclasses for storage-specific initialization. |
| """ |
| pass |
| |
| |
| |
| @abstractmethod |
| def _read_raw(self, path: str, **kwargs) -> bytes: |
| """Read raw file content - must be implemented by subclasses""" |
| pass |
| |
| @abstractmethod |
| def _write_raw(self, path: str, content: bytes, **kwargs) -> bool: |
| """Write raw file content - must be implemented by subclasses""" |
| pass |
| |
| @abstractmethod |
| def _delete_raw(self, path: str) -> bool: |
| """Delete file or directory - must be implemented by subclasses""" |
| pass |
| |
| @abstractmethod |
| def _list_raw(self, path: str = None, **kwargs) -> List[Dict[str, Any]]: |
| """List files and directories - must be implemented by subclasses""" |
| pass |
| |
| @abstractmethod |
| def _exists_raw(self, path: str) -> bool: |
| """Check if path exists - must be implemented by subclasses""" |
| pass |
| |
| @abstractmethod |
| def _create_directory_raw(self, path: str) -> bool: |
| """Create directory - must be implemented by subclasses""" |
| pass |
| |
| |
| |
def translate_in(self, file_path: str) -> str:
    """
    Resolve a user-supplied path against ``base_path``.

    Args:
        file_path (str): User-provided file path (relative or absolute)

    Returns:
        str: The path handed to the raw storage operations. Absolute paths
        are returned verbatim. For remote storage (instances exposing both
        ``bucket_name`` and ``supabase``) the result is a slash-joined key
        with no leading slash taken from ``base_path``. Otherwise it is the
        normalized OS path ``base_path/file_path``.
    """
    # Absolute paths bypass base_path entirely.
    if os.path.isabs(file_path):
        return file_path

    # Remote backends are detected by duck typing on these two attributes.
    if hasattr(self, 'bucket_name') and hasattr(self, 'supabase'):
        # Stripping leading slashes is a no-op for bases that have none, so a
        # single code path covers both cases. An empty result ("" or "/")
        # means "no prefix": return the key untouched instead of prepending a
        # stray "/" that translate_out() would not be able to undo.
        clean_base = self.base_path.lstrip('/')
        if not clean_base:
            return file_path
        return f"{clean_base}/{file_path}"

    # Local storage: join with the OS separator and collapse "." / ".." parts.
    return os.path.normpath(os.path.join(self.base_path, file_path))
| |
def translate_out(self, full_path: str) -> str:
    """
    Convert a resolved storage path back to a path relative to ``base_path``.

    Args:
        full_path (str): Full system path (or remote key)

    Returns:
        str: ``full_path`` with the ``base_path`` prefix removed. An empty
        string is returned when ``full_path`` is the base itself, and
        ``full_path`` is returned unchanged when it does not live under the
        base or when no meaningful base is configured (".", "" or None).
    """
    if self.base_path in [".", "", None]:
        return full_path

    # Remote backends are detected by duck typing on these two attributes.
    if hasattr(self, 'bucket_name') and hasattr(self, 'supabase'):
        clean_base = self.base_path.lstrip('/')
        if clean_base:
            key_prefix = f"{clean_base}/"
            if full_path.startswith(key_prefix):
                return full_path[len(key_prefix):]
            if full_path == clean_base:
                return ""
        return full_path

    base_abs = os.path.abspath(self.base_path)
    full_abs = os.path.abspath(full_path)

    if full_abs == base_abs:
        return ""

    # Compare against the base *plus a separator*: a bare startswith(base)
    # would wrongly treat a sibling such as "/data/foobar" as living inside
    # "/data/foo". rstrip() keeps a root base ("/") from becoming "//".
    dir_prefix = base_abs.rstrip(os.sep) + os.sep
    if full_abs.startswith(dir_prefix):
        return full_abs[len(dir_prefix):]

    return full_path
| |
| |
| |
def get_file_type(self, file_path: str) -> str:
    """Return the lower-cased final suffix of ``file_path`` (including the dot), or "" if it has none."""
    suffix = Path(file_path).suffix
    return suffix.lower()
|
|
def get_file_info(self, file_path: str) -> Dict[str, Any]:
    """
    Describe an existing file.

    Args:
        file_path (str): User-provided path, resolved through translate_in()

    Returns:
        Dict[str, Any]: On success the resolved path, file name, lower-cased
        extension and ``exists: True``; otherwise ``success: False`` with an
        ``error`` message.
    """
    try:
        target_path = self.translate_in(file_path)

        if self._exists_raw(target_path):
            resolved = Path(target_path)
            return {
                "success": True,
                "file_path": target_path,
                "file_name": resolved.name,
                "file_extension": resolved.suffix.lower(),
                "exists": True
            }

        return {"success": False, "error": f"File {file_path} does not exist"}
    except Exception as e:
        logger.error(f"Error getting file info for {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
def create_directory(self, path: str) -> Dict[str, Any]:
    """
    Create a directory at ``path`` (resolved through translate_in()).

    Returns:
        Dict[str, Any]: ``success`` flag plus the resolved ``path`` and either
        a ``message`` (success) or an ``error`` (failure).
    """
    try:
        target_path = self.translate_in(path)
        if not self._create_directory_raw(target_path):
            return {"success": False, "error": "Failed to create directory", "path": target_path}
        return {"success": True, "path": target_path, "message": "Directory created successfully"}
    except Exception as e:
        logger.error(f"Error creating directory {path}: {str(e)}")
        return {"success": False, "error": str(e), "path": path}
| |
def exists(self, path: str) -> bool:
    """Return the backend's existence check for ``path`` after resolving it through translate_in()."""
    return self._exists_raw(self.translate_in(path))
| |
| |
| |
def delete(self, path: str) -> Dict[str, Any]:
    """
    Delete the file or directory at ``path`` (resolved through translate_in()).

    Returns:
        Dict[str, Any]: ``success`` flag plus the resolved ``path`` and either
        a ``message`` (success) or an ``error`` (failure).
    """
    try:
        target_path = self.translate_in(path)
        if not self._delete_raw(target_path):
            return {"success": False, "error": "Failed to delete", "path": target_path}
        return {"success": True, "path": target_path, "message": "Deleted successfully"}
    except Exception as e:
        logger.error(f"Error deleting {path}: {str(e)}")
        return {"success": False, "error": str(e), "path": path}
| |
def move(self, source: str, destination: str) -> Dict[str, Any]:
    """
    Move/rename a file by copying its bytes to the destination and then
    deleting the source.

    Args:
        source (str): Path of the file to move (resolved through translate_in())
        destination (str): Target path (resolved through translate_in())

    Returns:
        Dict[str, Any]: ``success`` flag with the resolved ``source`` and
        ``destination`` plus a ``message`` or ``error``.
    """
    try:
        resolved_source = self.translate_in(source)
        resolved_destination = self.translate_in(destination)

        # Moving a file onto itself must be a no-op: the copy-then-delete
        # sequence below would otherwise delete the only copy of the data.
        if resolved_source == resolved_destination:
            if not self._exists_raw(resolved_source):
                return {"success": False, "error": f"Source {source} does not exist", "source": resolved_source, "destination": resolved_destination}
            return {"success": True, "source": resolved_source, "destination": resolved_destination, "message": "Source and destination are the same; nothing to move"}

        content = self._read_raw(resolved_source)

        if not self._write_raw(resolved_destination, content):
            return {"success": False, "error": "Failed to write to destination", "source": resolved_source, "destination": resolved_destination}

        # The move is only complete once the source is gone; report a failed
        # delete instead of claiming success with the file in both places.
        if not self._delete_raw(resolved_source):
            return {"success": False, "error": "Copied to destination but failed to delete source", "source": resolved_source, "destination": resolved_destination}

        return {"success": True, "source": resolved_source, "destination": resolved_destination, "message": "Moved successfully"}
    except Exception as e:
        logger.error(f"Error moving {source} to {destination}: {str(e)}")
        return {"success": False, "error": str(e), "source": source, "destination": destination}
| |
def copy(self, source: str, destination: str) -> Dict[str, Any]:
    """
    Copy a file by reading its raw bytes and writing them to the destination.

    Args:
        source (str): Path of the file to copy (resolved through translate_in())
        destination (str): Target path (resolved through translate_in())

    Returns:
        Dict[str, Any]: ``success`` flag with the resolved ``source`` and
        ``destination`` plus a ``message`` or ``error``.
    """
    try:
        src_path = self.translate_in(source)
        dst_path = self.translate_in(destination)

        payload = self._read_raw(src_path)
        if not self._write_raw(dst_path, payload):
            return {"success": False, "error": "Failed to write to destination", "source": src_path, "destination": dst_path}

        return {"success": True, "source": src_path, "destination": dst_path, "message": "Copied successfully"}
    except Exception as e:
        logger.error(f"Error copying {source} to {destination}: {str(e)}")
        return {"success": False, "error": str(e), "source": source, "destination": destination}
| |
def list(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> Dict[str, Any]:
    """
    List files and directories below ``path``.

    Args:
        path (str): Directory to list; a falsy value lists ``base_path`` itself
        max_depth (int): Forwarded to the backend listing (default: 3)
        include_hidden (bool): Forwarded to the backend listing (default: False)

    Returns:
        Dict[str, Any]: ``success``, the listed ``path``, the backend ``items``
        and their ``total_count``; or ``success: False`` with an ``error``.
    """
    try:
        if path:
            target_path = self.translate_in(path)
        else:
            target_path = str(self.base_path)

        items = self._list_raw(target_path, max_depth=max_depth, include_hidden=include_hidden)
        return {
            "success": True,
            "path": target_path,
            "items": items,
            "total_count": len(items)
        }
    except Exception as e:
        logger.error(f"Error listing {path}: {str(e)}")
        return {"success": False, "error": str(e), "path": path}
| |
def save(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
    """
    Save content to a file, choosing the writer from the file extension.

    Known extensions are delegated to the matching ``_save_*`` method. Any
    other extension is written as raw bytes: ``str`` is encoded with
    ``kwargs['encoding']`` (default utf-8), bytes-like content is written
    verbatim, and everything else is stringified first.

    Args:
        file_path (str): Path where the file should be saved
        content (Any): Content to save to the file
        **kwargs: Additional arguments forwarded to the writer (encoding, format, etc.)

    Returns:
        Dict[str, Any]: Result of the operation with success status and details.
        Failure results from this method carry both ``message`` and ``error``
        so callers can inspect ``error`` as with the other operations.
    """
    # Extension -> writer method name. Names (not bound methods) are stored so
    # that only the writer actually needed is looked up on the instance.
    writer_names = {
        '.json': '_save_json',
        '.txt': '_save_text',
        '.md': '_save_text',
        '.log': '_save_text',
        '.csv': '_save_csv',
        '.yaml': '_save_yaml',
        '.yml': '_save_yaml',
        '.xml': '_save_xml',
        '.xlsx': '_save_excel',
        '.pickle': '_save_pickle',
        '.pdf': '_save_pdf',
        '.png': '_save_image',
        '.jpg': '_save_image',
        '.jpeg': '_save_image',
        '.gif': '_save_image',
        '.bmp': '_save_image',
        '.tiff': '_save_image',
    }

    try:
        file_extension = self.get_file_type(file_path)
        target_file_path = self.translate_in(file_path)

        writer_name = writer_names.get(file_extension)
        if writer_name is not None:
            return getattr(self, writer_name)(target_file_path, content, **kwargs)

        # Unknown extension: fall back to a raw byte write.
        if isinstance(content, (bytes, bytearray)):
            # bytes() keeps bytearray data intact instead of writing its repr.
            content_bytes = bytes(content)
        elif isinstance(content, str):
            content_bytes = content.encode(kwargs.get('encoding', 'utf-8'))
        else:
            content_bytes = str(content).encode(kwargs.get('encoding', 'utf-8'))

        if self._write_raw(target_file_path, content_bytes, **kwargs):
            return {
                "success": True,
                "message": f"File '{file_path}' saved successfully",
                "file_path": file_path,
                "full_path": target_file_path,
                "size": len(content_bytes)
            }

        failure_text = f"Failed to save file '{file_path}'"
        return {
            "success": False,
            "message": failure_text,
            "error": failure_text,
            "file_path": file_path,
            "full_path": target_file_path
        }
    except Exception as e:
        logger.error(f"Error saving file {file_path}: {str(e)}")
        return {
            "success": False,
            "message": f"Error saving file: {str(e)}",
            "error": str(e),
            "file_path": file_path
        }
| |
def read(self, file_path: str, **kwargs) -> Dict[str, Any]:
    """
    Read a file, choosing the reader from the file extension.

    Extensions without a dedicated reader are read as text.

    Args:
        file_path (str): User-provided path, resolved through translate_in()
        **kwargs: Forwarded to the selected reader

    Returns:
        Dict[str, Any]: Whatever the selected reader returns, or
        ``success: False`` with an ``error`` if dispatching itself fails.
    """
    # Extension -> reader method name; looked up lazily on the instance.
    reader_names = {
        '.json': '_read_json',
        '.yaml': '_read_yaml',
        '.yml': '_read_yaml',
        '.csv': '_read_csv',
        '.xlsx': '_read_excel',
        '.xml': '_read_xml',
        '.pickle': '_read_pickle',
        '.pdf': '_read_pdf',
    }
    reader_names.update(dict.fromkeys(('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'), '_read_image'))

    try:
        target_file_path = self.translate_in(file_path)
        file_extension = Path(target_file_path).suffix.lower()

        reader = getattr(self, reader_names.get(file_extension, '_read_text'))
        return reader(target_file_path, **kwargs)
    except Exception as e:
        logger.error(f"Error reading {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
def append(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
    """
    Append content to a file whose extension has a registered append handler.

    Args:
        file_path (str): User-provided path, resolved through translate_in()
        content (Any): Content to append
        **kwargs: Forwarded to the append handler

    Returns:
        Dict[str, Any]: The handler's result, or ``success: False`` with an
        ``error`` when the extension is not in ``appendable_formats``.
    """
    try:
        target_file_path = self.translate_in(file_path)
        file_extension = Path(target_file_path).suffix.lower()

        handler = self.appendable_formats.get(file_extension)
        if handler is None:
            return {"success": False, "error": f"Append not supported for {file_extension} files"}

        return handler(target_file_path, content, **kwargs)
    except Exception as e:
        logger.error(f"Error appending to {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
| |
| def _save_text(self, file_path: str, content: Any, encoding: str = 'utf-8', **kwargs) -> Dict[str, Any]: |
| """Save text content to a file""" |
| try: |
| |
| if isinstance(content, str): |
| content_bytes = content.encode(encoding) |
| else: |
| content_bytes = str(content).encode(encoding) |
| |
| |
| success = self._write_raw(file_path, content_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"File saved to {file_path}", |
| "file_path": file_path, |
| "content_length": len(content_bytes) |
| } |
| else: |
| return {"success": False, "error": "Failed to write file", "file_path": file_path} |
| except Exception as e: |
| logger.error(f"Error saving text file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _read_text(self, file_path: str, encoding: str = 'utf-8', **kwargs) -> Dict[str, Any]: |
| """Read text content from a file""" |
| try: |
| |
| content_bytes = self._read_raw(file_path, **kwargs) |
| content = content_bytes.decode(encoding) |
| |
| return { |
| "success": True, |
| "content": content, |
| "file_path": file_path, |
| "content_length": len(content) |
| } |
| except Exception as e: |
| logger.error(f"Error reading text file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _append_text(self, file_path: str, content: str, encoding: str = 'utf-8', **kwargs) -> Dict[str, Any]: |
| """Append text content to a file""" |
| try: |
| |
| content_bytes = str(content).encode(encoding) |
| |
| |
| existing_bytes = b"" |
| if self._exists_raw(file_path): |
| existing_bytes = self._read_raw(file_path, **kwargs) |
| |
| |
| combined_bytes = existing_bytes + content_bytes |
| |
| |
| success = self._write_raw(file_path, combined_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"Content appended to file {file_path}", |
| "file_path": file_path |
| } |
| else: |
| return {"success": False, "error": "Failed to append to file", "file_path": file_path} |
| except Exception as e: |
| logger.error(f"Error appending to text file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| |
| def _save_json(self, file_path: str, content: Any, indent: int = 2, **kwargs) -> Dict[str, Any]: |
| """Save JSON content to a file""" |
| try: |
| |
| if isinstance(content, str): |
| |
| json.loads(content) |
| json_content = content |
| else: |
| json_content = json.dumps(content, indent=indent, ensure_ascii=False) |
| |
| |
| content_bytes = json_content.encode('utf-8') |
| |
| |
| success = self._write_raw(file_path, content_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"JSON file saved to {file_path}", |
| "file_path": file_path |
| } |
| else: |
| return {"success": False, "error": "Failed to write file", "file_path": file_path} |
| except Exception as e: |
| logger.error(f"Error saving JSON file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _read_json(self, file_path: str, **kwargs) -> Dict[str, Any]: |
| """Read JSON content from a file""" |
| try: |
| |
| content_bytes = self._read_raw(file_path, **kwargs) |
| content_str = content_bytes.decode('utf-8') |
| |
| |
| content = json.loads(content_str) |
| |
| return { |
| "success": True, |
| "content": content, |
| "file_path": file_path |
| } |
| except Exception as e: |
| logger.error(f"Error reading JSON file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _append_json(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]: |
| """Append content to JSON file (for arrays)""" |
| try: |
| |
| existing_content = [] |
| if self._exists_raw(file_path): |
| existing_bytes = self._read_raw(file_path, **kwargs) |
| existing_str = existing_bytes.decode('utf-8') |
| existing_content = json.loads(existing_str) |
| |
| |
| if isinstance(existing_content, list): |
| if isinstance(content, list): |
| existing_content.extend(content) |
| else: |
| existing_content.append(content) |
| elif isinstance(existing_content, dict): |
| if isinstance(content, dict): |
| existing_content.update(content) |
| else: |
| return {"success": False, "error": "Cannot append non-dict to JSON dict"} |
| else: |
| existing_content = [existing_content] |
| if isinstance(content, list): |
| existing_content.extend(content) |
| else: |
| existing_content.append(content) |
| |
| |
| json_content = json.dumps(existing_content, indent=2, ensure_ascii=False) |
| content_bytes = json_content.encode('utf-8') |
| |
| |
| success = self._write_raw(file_path, content_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"Content appended to JSON file {file_path}", |
| "file_path": file_path |
| } |
| else: |
| return {"success": False, "error": "Failed to append to file", "file_path": file_path} |
| except Exception as e: |
| logger.error(f"Error appending to JSON file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| |
| def _save_csv(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]: |
| """Save CSV content to a file - handles both raw CSV strings and structured data""" |
| try: |
| if not content: |
| return {"success": False, "error": "No content to save"} |
| |
| from io import StringIO |
| |
| |
| csv_buffer = StringIO() |
| |
| |
| if isinstance(content, str): |
| csv_content = content |
| rows = content.count('\n') |
| |
| elif isinstance(content, list) and content and isinstance(content[0], dict): |
| fieldnames = content[0].keys() |
| writer = csv.DictWriter(csv_buffer, fieldnames=fieldnames) |
| writer.writeheader() |
| writer.writerows(content) |
| csv_content = csv_buffer.getvalue() |
| rows = len(content) |
| |
| elif isinstance(content, list) and content and isinstance(content[0], list): |
| writer = csv.writer(csv_buffer) |
| writer.writerows(content) |
| csv_content = csv_buffer.getvalue() |
| rows = len(content) |
| else: |
| return {"success": False, "error": "CSV content must be a string, list of dictionaries, or list of lists"} |
| |
| |
| content_bytes = csv_content.encode('utf-8') |
| success = self._write_raw(file_path, content_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"CSV file saved to {file_path}", |
| "file_path": file_path, |
| "rows": rows |
| } |
| else: |
| return {"success": False, "error": "Failed to write file", "file_path": file_path} |
| |
| except Exception as e: |
| logger.error(f"Error saving CSV file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _read_csv(self, file_path: str, **kwargs) -> Dict[str, Any]: |
| """Read CSV content from a file""" |
| try: |
| |
| content_bytes = self._read_raw(file_path, **kwargs) |
| content_str = content_bytes.decode('utf-8') |
| |
| |
| from io import StringIO |
| reader = csv.DictReader(StringIO(content_str)) |
| content = list(reader) |
| |
| return { |
| "success": True, |
| "content": content, |
| "file_path": file_path, |
| "rows": len(content) |
| } |
| except Exception as e: |
| logger.error(f"Error reading CSV file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _append_csv(self, file_path: str, content: List[Dict[str, Any]], **kwargs) -> Dict[str, Any]: |
| """Append content to CSV file""" |
| try: |
| if not content: |
| return {"success": False, "error": "No content to append"} |
| |
| |
| existing_content = [] |
| if self._exists_raw(file_path): |
| existing_bytes = self._read_raw(file_path, **kwargs) |
| existing_str = existing_bytes.decode('utf-8') |
| from io import StringIO |
| reader = csv.DictReader(StringIO(existing_str)) |
| existing_content = list(reader) |
| |
| |
| combined_content = existing_content + content |
| |
| |
| from io import StringIO |
| csv_buffer = StringIO() |
| if combined_content: |
| fieldnames = combined_content[0].keys() |
| writer = csv.DictWriter(csv_buffer, fieldnames=fieldnames) |
| writer.writeheader() |
| writer.writerows(combined_content) |
| |
| csv_content = csv_buffer.getvalue() |
| content_bytes = csv_content.encode('utf-8') |
| success = self._write_raw(file_path, content_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"Content appended to CSV file {file_path}", |
| "file_path": file_path, |
| "appended_rows": len(content) |
| } |
| else: |
| return {"success": False, "error": "Failed to append to file", "file_path": file_path} |
| except Exception as e: |
| logger.error(f"Error appending to CSV file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| |
def _save_yaml(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
    """
    Serialize content with ``yaml.dump`` (block style, unicode allowed) and
    write it as UTF-8.

    Args:
        file_path (str): Resolved path to write to
        content (Any): Object to serialize
        **kwargs: Forwarded to ``_write_raw``

    Returns:
        Dict[str, Any]: ``success`` flag, ``file_path`` and a ``message`` or ``error``.
    """
    try:
        yaml_content = yaml.dump(content, default_flow_style=False, allow_unicode=True)

        if not self._write_raw(file_path, yaml_content.encode('utf-8'), **kwargs):
            return {"success": False, "error": "Failed to write file", "file_path": file_path}

        return {
            "success": True,
            "message": f"YAML file saved to {file_path}",
            "file_path": file_path
        }
    except Exception as e:
        logger.error(f"Error saving YAML file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
def _read_yaml(self, file_path: str, **kwargs) -> Dict[str, Any]:
    """
    Read a UTF-8 encoded YAML file and parse it with ``yaml.safe_load``.

    Args:
        file_path (str): Resolved path to read
        **kwargs: Forwarded to ``_read_raw``

    Returns:
        Dict[str, Any]: ``success``, the parsed ``content`` and ``file_path``;
        or an ``error`` on failure.
    """
    try:
        raw_text = self._read_raw(file_path, **kwargs).decode('utf-8')
        return {
            "success": True,
            "content": yaml.safe_load(raw_text),
            "file_path": file_path
        }
    except Exception as e:
        logger.error(f"Error reading YAML file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
def _append_yaml(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
    """
    Merge content into a YAML file and write the result back.

    Merge rules, based on what the file currently holds:
      * list   -> a list ``content`` is extended, anything else is appended
      * dict   -> ``content`` must be a dict and is merged with ``update``
      * other  -> the existing value is wrapped in a list, then treated as a list
    A missing file, or one that parses to a falsy value, counts as an empty list.

    Args:
        file_path (str): Resolved path of the YAML file
        content (Any): Content to append
        **kwargs: Forwarded to ``_read_raw`` / ``_write_raw``

    Returns:
        Dict[str, Any]: ``success`` flag plus a ``message`` or ``error``.
    """
    try:
        existing_content = []
        if self._exists_raw(file_path):
            existing_str = self._read_raw(file_path, **kwargs).decode('utf-8')
            existing_content = yaml.safe_load(existing_str) or []

        if isinstance(existing_content, dict):
            if not isinstance(content, dict):
                return {"success": False, "error": "Cannot append non-dict to YAML dict"}
            existing_content.update(content)
        else:
            if not isinstance(existing_content, list):
                existing_content = [existing_content]
            if isinstance(content, list):
                existing_content.extend(content)
            else:
                existing_content.append(content)

        yaml_content = yaml.dump(existing_content, default_flow_style=False, allow_unicode=True)

        if not self._write_raw(file_path, yaml_content.encode('utf-8'), **kwargs):
            return {"success": False, "error": "Failed to append to file", "file_path": file_path}

        return {
            "success": True,
            "message": f"Content appended to YAML file {file_path}",
            "file_path": file_path
        }
    except Exception as e:
        logger.error(f"Error appending to YAML file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
| |
| def _save_xml(self, file_path: str, content: Any, root_tag: str = "root", **kwargs) -> Dict[str, Any]: |
| """Save XML content to a file""" |
| try: |
| |
| if isinstance(content, str): |
| |
| try: |
| ET.fromstring(content) |
| xml_content = content |
| except ET.ParseError: |
| |
| root = ET.Element(root_tag) |
| root.text = content |
| xml_content = ET.tostring(root, encoding='unicode') |
| |
| elif isinstance(content, dict): |
| def dict_to_xml(data, root): |
| for key, value in data.items(): |
| child = ET.SubElement(root, key) |
| if isinstance(value, dict): |
| dict_to_xml(value, child) |
| else: |
| child.text = str(value) |
| |
| root = ET.Element(root_tag) |
| dict_to_xml(content, root) |
| xml_content = ET.tostring(root, encoding='unicode') |
| else: |
| |
| root = ET.Element(root_tag) |
| root.text = str(content) |
| xml_content = ET.tostring(root, encoding='unicode') |
| |
| |
| content_bytes = xml_content.encode('utf-8') |
| success = self._write_raw(file_path, content_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"XML file saved to {file_path}", |
| "file_path": file_path |
| } |
| else: |
| return {"success": False, "error": "Failed to write file", "file_path": file_path} |
| except Exception as e: |
| logger.error(f"Error saving XML file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _read_xml(self, file_path: str, **kwargs) -> Dict[str, Any]: |
| """Read XML content from a file""" |
| try: |
| |
| content_bytes = self._read_raw(file_path, **kwargs) |
| content_str = content_bytes.decode('utf-8') |
| |
| |
| root = ET.fromstring(content_str) |
| |
| def xml_to_dict(element): |
| result = {} |
| for child in element: |
| if len(child) == 0: |
| result[child.tag] = child.text |
| else: |
| result[child.tag] = xml_to_dict(child) |
| return result |
| |
| content = xml_to_dict(root) |
| |
| return { |
| "success": True, |
| "content": content, |
| "file_path": file_path |
| } |
| except Exception as e: |
| logger.error(f"Error reading XML file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| |
def _save_excel(self, file_path: str, content: List[List[Any]], sheet_name: str = "Sheet1", **kwargs) -> Dict[str, Any]:
    """
    Write rows into a new single-sheet workbook and store it.

    Args:
        file_path (str): Resolved path to write to
        content (List[List[Any]]): Rows to write, one inner list per row
        sheet_name (str): Title given to the worksheet (default: "Sheet1")
        **kwargs: Forwarded to ``_write_raw``

    Returns:
        Dict[str, Any]: ``success`` flag, and on success ``message``,
        ``file_path`` and ``rows``; otherwise an ``error`` (including when
        openpyxl is not installed).
    """
    if not EXCEL_AVAILABLE:
        return {"success": False, "error": "openpyxl library not available"}

    try:
        from io import BytesIO

        book = Workbook()
        sheet = book.active
        sheet.title = sheet_name
        for row in content:
            sheet.append(row)

        # Serialize in memory so the bytes can go through the storage backend.
        stream = BytesIO()
        book.save(stream)

        if not self._write_raw(file_path, stream.getvalue(), **kwargs):
            return {"success": False, "error": "Failed to write file", "file_path": file_path}

        return {
            "success": True,
            "message": f"Excel file saved to {file_path}",
            "file_path": file_path,
            "rows": len(content)
        }
    except Exception as e:
        logger.error(f"Error saving Excel file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
def _read_excel(self, file_path: str, sheet_name: str = None, **kwargs) -> Dict[str, Any]:
    """
    Read one worksheet of an Excel workbook as a list of rows.

    The workbook is loaded with ``data_only=True``. Rows whose cells are all
    ``None`` are skipped.

    Args:
        file_path (str): Resolved path to read
        sheet_name (str): Sheet to read; defaults to the first sheet
        **kwargs: Forwarded to ``_read_raw``

    Returns:
        Dict[str, Any]: ``success``, ``content`` (list of row lists),
        ``file_path``, ``sheet_name`` and ``rows``; or an ``error`` (including
        when openpyxl is not installed or the sheet does not exist).
    """
    if not EXCEL_AVAILABLE:
        return {"success": False, "error": "openpyxl library not available"}

    try:
        from io import BytesIO

        raw = self._read_raw(file_path, **kwargs)
        book = load_workbook(BytesIO(raw), data_only=True)
        available = book.sheetnames

        if sheet_name is None:
            sheet_name = available[0]
        if sheet_name not in available:
            return {"success": False, "error": f"Sheet '{sheet_name}' not found"}

        content = [
            list(row)
            for row in book[sheet_name].iter_rows(values_only=True)
            if any(cell is not None for cell in row)
        ]

        return {
            "success": True,
            "content": content,
            "file_path": file_path,
            "sheet_name": sheet_name,
            "rows": len(content)
        }
    except Exception as e:
        logger.error(f"Error reading Excel file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
def _append_excel(self, file_path: str, content: List[List[Any]], sheet_name: str = None, **kwargs) -> Dict[str, Any]:
    """
    Append rows to a worksheet of an existing workbook.

    When the file does not exist yet it is created through ``_save_excel``
    (using ``sheet_name`` or "Sheet1"). Otherwise the workbook is loaded, the
    rows are appended to the chosen sheet and the whole workbook is rewritten.

    Args:
        file_path (str): Resolved path of the workbook
        content (List[List[Any]]): Rows to append, one inner list per row
        sheet_name (str): Target sheet; defaults to the first sheet
        **kwargs: Forwarded to ``_read_raw`` / ``_write_raw``

    Returns:
        Dict[str, Any]: ``success`` flag, and on success ``message``,
        ``file_path`` and ``appended_rows``; otherwise an ``error`` (including
        when openpyxl is not installed or the sheet does not exist).
    """
    if not EXCEL_AVAILABLE:
        return {"success": False, "error": "openpyxl library not available"}

    try:
        from io import BytesIO

        if not self._exists_raw(file_path):
            return self._save_excel(file_path, content, sheet_name or "Sheet1", **kwargs)

        book = load_workbook(BytesIO(self._read_raw(file_path, **kwargs)))
        available = book.sheetnames

        if sheet_name is None:
            sheet_name = available[0]
        if sheet_name not in available:
            return {"success": False, "error": f"Sheet '{sheet_name}' not found"}

        sheet = book[sheet_name]
        for row in content:
            sheet.append(row)

        # Serialize in memory so the bytes can go through the storage backend.
        stream = BytesIO()
        book.save(stream)

        if not self._write_raw(file_path, stream.getvalue(), **kwargs):
            return {"success": False, "error": "Failed to append to file", "file_path": file_path}

        return {
            "success": True,
            "message": f"Content appended to Excel file {file_path}",
            "file_path": file_path,
            "appended_rows": len(content)
        }
    except Exception as e:
        logger.error(f"Error appending to Excel file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
| |
| def _save_pickle(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]: |
| """Save pickle content to a file""" |
| try: |
| |
| content_bytes = pickle.dumps(content) |
| |
| |
| success = self._write_raw(file_path, content_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"Pickle file saved to {file_path}", |
| "file_path": file_path |
| } |
| else: |
| return {"success": False, "error": "Failed to write file", "file_path": file_path} |
| except Exception as e: |
| logger.error(f"Error saving pickle file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _read_pickle(self, file_path: str, **kwargs) -> Dict[str, Any]: |
| """Read pickle content from a file""" |
| try: |
| |
| content_bytes = self._read_raw(file_path, **kwargs) |
| |
| |
| content = pickle.loads(content_bytes) |
| |
| return { |
| "success": True, |
| "content": content, |
| "file_path": file_path |
| } |
| except Exception as e: |
| logger.error(f"Error reading pickle file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| def _append_pickle(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]: |
| """Append content to pickle file (for lists)""" |
| try: |
| |
| existing_content = [] |
| if self._exists_raw(file_path): |
| existing_bytes = self._read_raw(file_path, **kwargs) |
| existing_content = pickle.loads(existing_bytes) |
| |
| |
| if isinstance(existing_content, list): |
| if isinstance(content, list): |
| existing_content.extend(content) |
| else: |
| existing_content.append(content) |
| elif isinstance(existing_content, dict): |
| if isinstance(content, dict): |
| existing_content.update(content) |
| elif isinstance(content, list): |
| existing_content["appended_list"] = content |
| else: |
| existing_content["appended_value"] = content |
| else: |
| existing_content = [existing_content] |
| if isinstance(content, list): |
| existing_content.extend(content) |
| else: |
| existing_content.append(content) |
| |
| |
| content_bytes = pickle.dumps(existing_content) |
| success = self._write_raw(file_path, content_bytes, **kwargs) |
| |
| if success: |
| return { |
| "success": True, |
| "message": f"Content appended to pickle file {file_path}", |
| "file_path": file_path |
| } |
| else: |
| return {"success": False, "error": "Failed to append to file", "file_path": file_path} |
| except Exception as e: |
| logger.error(f"Error appending to pickle file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
| |
| def _save_pdf(self, file_path: str, content: str, **kwargs) -> Dict[str, Any]: |
| """Save content to a PDF file""" |
| try: |
| |
| from reportlab.lib.pagesizes import letter |
| from reportlab.lib.styles import getSampleStyleSheet |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer |
| |
| |
| doc = SimpleDocTemplate(file_path, pagesize=letter) |
| styles = getSampleStyleSheet() |
| story = [] |
| |
| |
| paragraphs = content.split('\n') |
| |
| for para_text in paragraphs: |
| if para_text.strip(): |
| para = Paragraph(para_text, styles['Normal']) |
| story.append(para) |
| story.append(Spacer(1, 12)) |
| else: |
| story.append(Spacer(1, 12)) |
| |
| |
| doc.build(story) |
| |
| return { |
| "success": True, |
| "message": f"PDF file saved to {file_path}", |
| "file_path": file_path |
| } |
| |
| except ImportError: |
| return {"success": False, "error": "reportlab library not available for PDF creation"} |
| except Exception as e: |
| logger.error(f"Error saving PDF file {file_path}: {str(e)}") |
| return {"success": False, "error": str(e), "file_path": file_path} |
| |
def _read_pdf(self, file_path: str, **kwargs) -> Dict[str, Any]:
    """Extract the text of every page of a PDF.

    The file is fetched through ``_read_raw`` (like the other readers in
    this class) and parsed from memory with pymupdf. The text of each page
    is joined with a blank line between pages.

    Args:
        file_path: Location passed unchanged to ``_read_raw``.
        **kwargs: Forwarded to ``_read_raw``.

    Returns:
        On success, a dict with ``success``, the extracted text as
        ``content`` and ``file_path``; otherwise a dict with ``success`` set
        to False, the ``error`` text and ``file_path``.
    """
    if not PDF_AVAILABLE:
        # PDF_AVAILABLE reflects whether ``import pymupdf`` succeeded.
        return {"success": False, "error": "pymupdf library not available", "file_path": file_path}

    try:
        content_bytes = self._read_raw(file_path, **kwargs)

        # The context manager closes the document once the text is extracted.
        with pymupdf.open(stream=content_bytes, filetype="pdf") as doc:
            text = "\n\n".join(page.get_text() for page in doc)

        return {
            "success": True,
            "content": text,
            "file_path": file_path
        }
    except Exception as e:
        logger.error(f"Error reading PDF file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
| |
def _save_image(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
    """Save an image given as an image object, encoded bytes, or a source path.

    Args:
        file_path: Destination passed unchanged to ``_write_raw``.
        content: One of:
            * an object with a callable ``save`` (e.g. a PIL image); it is
              encoded in ``content.format``, or PNG when that is unset;
            * ``bytes``, ``bytearray`` or ``memoryview`` holding already
              encoded image data, written as-is;
            * a ``str`` path to an existing file whose bytes are copied.
        **kwargs: Forwarded to ``_write_raw``.

    Returns:
        A result dict containing ``success``, plus ``message`` and
        ``file_path`` on success or ``error`` on failure. When an image
        object was saved, the dict also reports the ``format`` actually
        used for encoding and the image ``size``.
    """
    if not PILLOW_AVAILABLE:
        return {"success": False, "error": "Pillow library not available"}

    try:
        from io import BytesIO

        message = f"Image saved to {file_path}"
        extra = {}

        if callable(getattr(content, 'save', None)):
            # Report the format that is really written, not a possibly
            # unset ``content.format``.
            image_format = content.format or 'PNG'
            buffer = BytesIO()
            content.save(buffer, format=image_format)
            content_bytes = buffer.getvalue()
            extra = {"format": image_format, "size": content.size}
        elif isinstance(content, (bytes, bytearray, memoryview)):
            content_bytes = bytes(content)
        elif isinstance(content, str) and Path(content).exists():
            with open(content, 'rb') as f:
                content_bytes = f.read()
            message = f"Image copied from {content} to {file_path}"
        else:
            return {
                "success": False,
                "error": "Content must be a PIL Image object, binary data, or valid file path",
                "file_path": file_path
            }

        # Single write/report path shared by all accepted content kinds.
        success = self._write_raw(file_path, content_bytes, **kwargs)
        if not success:
            return {"success": False, "error": "Failed to write file", "file_path": file_path}

        result = {
            "success": True,
            "message": message,
            "file_path": file_path
        }
        result.update(extra)
        return result

    except Exception as e:
        logger.error(f"Error saving image file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
def _read_image(self, file_path: str, **kwargs) -> Dict[str, Any]:
    """Read an image via ``_read_raw`` and return it as a usable PIL image.

    Images in mode ``RGBA``, ``LA`` or ``P`` are converted to ``RGB``; all
    other images are returned in their own mode. Pixel data is loaded
    before the opened image is closed, so the returned image stays usable
    after this method returns.

    Args:
        file_path: Location passed unchanged to ``_read_raw``.
        **kwargs: Forwarded to ``_read_raw``.

    Returns:
        On success, a dict with ``success``, the image as ``content``,
        ``metadata`` (``format`` of the source file, and ``mode``, ``size``,
        ``width``, ``height`` of the returned image) and ``file_path``;
        otherwise a dict with ``success`` set to False and the ``error``
        text.
    """
    if not PILLOW_AVAILABLE:
        return {"success": False, "error": "Pillow library not available"}

    try:
        from io import BytesIO

        content_bytes = self._read_raw(file_path, **kwargs)

        with Image.open(BytesIO(content_bytes)) as opened:
            # Capture the format now: a converted image no longer carries it.
            source_format = opened.format

            if opened.mode in ('RGBA', 'LA', 'P'):
                img = opened.convert('RGB')
            else:
                # Image.open is lazy; force decoding while the source is
                # still open so the image survives leaving the ``with``.
                opened.load()
                img = opened

        metadata = {
            "format": source_format,
            "mode": img.mode,
            "size": img.size,
            "width": img.width,
            "height": img.height
        }

        return {
            "success": True,
            "content": img,
            "metadata": metadata,
            "file_path": file_path
        }

    except Exception as e:
        logger.error(f"Error reading image file {file_path}: {str(e)}")
        return {"success": False, "error": str(e), "file_path": file_path}
| |
| |
| def _get_database_connection(self, db_type: str, connection_string: str) -> Any: |
| """Placeholder for future database integration""" |
| |
| raise NotImplementedError("Database integration not yet implemented") |