| import os |
| import requests |
| import shutil |
| import tempfile |
| import uuid |
| from pathlib import Path |
| from typing import Optional, List |
| from urllib.parse import urlparse |
|
|
|
|
class FileDownloader:
    """
    Download files from URLs and manage them in a temporary directory.

    Provides functionality to:
    1. Download files from URLs and save to tmp directory
    2. Delete specific files from tmp directory
    3. Clear all files from tmp directory

    File names that come from the caller or from the server are always
    reduced to their final path component, so a download can never be written
    outside the tmp directory.
    """

    # Ordered (substring, extension) pairs used to guess an extension from a
    # Content-Type header when the chosen file name has none. Order matters:
    # the first substring found in the header wins.
    _CONTENT_TYPE_EXTENSIONS = (
        ('pdf', '.pdf'),
        ('image/jpeg', '.jpg'),
        ('image/jpg', '.jpg'),
        ('image/png', '.png'),
        ('text/plain', '.txt'),
        ('application/json', '.json'),
        ('text/html', '.html'),
    )

    def __init__(self, tmp_dir_name: str = "tmp"):
        """
        Initialize the FileDownloader and create its tmp directory.

        Args:
            tmp_dir_name (str): Name (or path) of the temporary directory to use
        """
        self.tmp_dir_name = tmp_dir_name
        self.tmp_dir_path = Path(tmp_dir_name)
        self._ensure_tmp_directory()

    def _ensure_tmp_directory(self) -> None:
        """Ensure the temporary directory exists, creating parents if needed."""
        # parents=True lets tmp_dir_name be a nested path such as "cache/tmp".
        self.tmp_dir_path.mkdir(parents=True, exist_ok=True)

    def _get_filename_from_url(self, url: str) -> str:
        """
        Extract filename from URL, with fallback to generated name.

        Args:
            url (str): The URL to extract filename from

        Returns:
            str: The last path segment of the URL when it contains a dot,
                otherwise ``downloaded_file_<8 hex chars>``
        """
        candidate = os.path.basename(urlparse(url).path)
        if candidate and '.' in candidate:
            return candidate
        # No usable name in the URL path: generate one (extension is added
        # later from the Content-Type header, if recognised).
        return f"downloaded_file_{uuid.uuid4().hex[:8]}"

    @staticmethod
    def _sanitize_filename(name: str) -> str:
        """
        Reduce a file name to a single safe path component.

        Directory parts (using either ``/`` or ``\\`` as separator) and NUL
        bytes are dropped so the result cannot escape the tmp directory.

        Args:
            name (str): Untrusted file name

        Returns:
            str: The final path component, or ``""`` if nothing usable remains
        """
        flattened = name.replace("\x00", "").replace("\\", "/")
        component = flattened.rsplit("/", 1)[-1].strip()
        if component in ("", ".", ".."):
            return ""
        return component

    @staticmethod
    def _filename_from_content_disposition(header: str) -> str:
        """
        Extract the file name from a Content-Disposition header value.

        The extended ``filename*=charset'lang'value`` form is preferred and
        percent-decoded; otherwise the plain ``filename=`` parameter is used
        with surrounding whitespace and quotes removed.

        Args:
            header (str): Raw Content-Disposition header value

        Returns:
            str: The (unsanitised) file name, or ``""`` if none is present
        """
        import re
        from urllib.parse import unquote

        extended = re.search(r"filename\*\s*=\s*([^;]+)", header, flags=re.IGNORECASE)
        if extended:
            charset, _, remainder = extended.group(1).strip().partition("'")
            _language, quote, encoded = remainder.partition("'")
            if quote and encoded:
                try:
                    return unquote(encoded, encoding=charset or "utf-8", errors="replace")
                except LookupError:
                    # Unknown charset label: fall through to the plain form.
                    pass

        plain = re.search(
            r'filename\s*=\s*(?:"([^"]*)"|([^;]+))', header, flags=re.IGNORECASE
        )
        if plain:
            quoted, bare = plain.group(1), plain.group(2)
            value = quoted if quoted is not None else bare
            return value.strip().strip("\"'")
        return ""

    @classmethod
    def _extension_for_content_type(cls, content_type: str) -> str:
        """
        Map a Content-Type header value to a file extension.

        Args:
            content_type (str): Raw Content-Type header value (may be empty)

        Returns:
            str: Extension including the leading dot, or ``""`` if unrecognised
        """
        lowered = content_type.lower()
        for needle, extension in cls._CONTENT_TYPE_EXTENSIONS:
            if needle in lowered:
                return extension
        return ''

    def _resolve_filename(self, url: str, filename: Optional[str], headers) -> str:
        """
        Decide the file name for a download.

        Priority: explicit ``filename`` argument, then the Content-Disposition
        header, then the URL path. If the chosen name has no dot, an extension
        guessed from the Content-Type header is appended.

        Args:
            url (str): URL being downloaded
            filename (str, optional): Caller-supplied file name
            headers: Mapping of response headers, looked up with lowercase keys

        Returns:
            str: Sanitised file name (not yet made unique)
        """
        chosen = self._sanitize_filename(filename) if filename else ""

        if not chosen:
            disposition = headers.get('content-disposition')
            if disposition:
                chosen = self._sanitize_filename(
                    self._filename_from_content_disposition(disposition)
                )

        if not chosen:
            chosen = self._get_filename_from_url(url)

        if '.' not in chosen:
            chosen += self._extension_for_content_type(headers.get('content-type', ''))
        return chosen

    def _get_unique_filename(self, filename: str) -> str:
        """
        Ensure filename is unique in the tmp directory.

        Args:
            filename (str): Original filename

        Returns:
            str: ``filename`` if unused, otherwise ``<stem>_<n><suffix>`` for
                the smallest ``n >= 1`` that does not exist yet
        """
        original = self.tmp_dir_path / filename
        if not original.exists():
            return filename

        stem, suffix = original.stem, original.suffix
        counter = 1
        while True:
            candidate = f"{stem}_{counter}{suffix}"
            if not (self.tmp_dir_path / candidate).exists():
                return candidate
            counter += 1

    def download(self, url: str, filename: Optional[str] = None,
                 timeout: int = 30, chunk_size: int = 8192) -> str:
        """
        Download a file from URL and save to tmp directory.

        Args:
            url (str): URL to download from
            filename (str, optional): Custom filename. If None, the name is
                taken from the Content-Disposition header or, failing that,
                from the URL. Any directory part is discarded.
            timeout (int): Request timeout in seconds
            chunk_size (int): Size of chunks for streaming download

        Returns:
            str: Full path to the downloaded file

        Raises:
            requests.RequestException: If download fails
            IOError: If file writing fails
        """
        try:
            # The context manager releases the streamed connection even when
            # the transfer fails part-way through.
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()

                filename = self._resolve_filename(url, filename, response.headers)

                # The directory may have been removed since construction.
                self._ensure_tmp_directory()
                filename = self._get_unique_filename(filename)
                file_path = self.tmp_dir_path / filename

                try:
                    with open(file_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=chunk_size):
                            if chunk:  # skip keep-alive chunks
                                f.write(chunk)
                except BaseException:
                    # Do not leave a truncated file behind; best-effort cleanup,
                    # the original error is what the caller needs to see.
                    try:
                        file_path.unlink()
                    except OSError:
                        pass
                    raise

            print(f"Successfully downloaded: {url} -> {file_path}")
            return str(file_path)

        except requests.exceptions.RequestException as e:
            raise requests.RequestException(f"Failed to download {url}: {str(e)}") from e
        except IOError as e:
            raise IOError(f"Failed to save file {filename}: {str(e)}") from e

    def delete_file(self, file_path: str) -> bool:
        """
        Delete a specific file from the tmp directory.

        Args:
            file_path (str): Path to the file to delete (can be full path or just filename)

        Returns:
            bool: True if file was deleted, False if file didn't exist

        Raises:
            ValueError: If the path is invalid or is not inside the tmp directory
            OSError: If deletion fails
        """
        path = Path(file_path)

        # A bare file name is interpreted relative to the tmp directory.
        if not path.is_absolute() and len(path.parts) == 1:
            path = self.tmp_dir_path / path

        try:
            resolved_path = path.resolve()
            tmp_resolved = self.tmp_dir_path.resolve()
        except (OSError, ValueError, RuntimeError) as e:
            # RuntimeError: symlink loops on older Python versions.
            raise ValueError(f"Invalid file path {file_path}: {str(e)}") from e

        # Compare path components, not string prefixes: "tmp_other/x" must not
        # be accepted just because it starts with "tmp".
        if tmp_resolved not in resolved_path.parents:
            raise ValueError(
                f"File {file_path} is not in the tmp directory {self.tmp_dir_path}"
            )

        if not path.exists():
            print(f"File not found: {path}")
            return False

        try:
            path.unlink()
        except OSError as e:
            raise OSError(f"Failed to delete {path}: {str(e)}") from e
        print(f"Successfully deleted: {path}")
        return True

    def clear_tmp_directory(self) -> int:
        """
        Clear all files and sub-directories from the tmp directory.

        The tmp directory itself is kept. Symbolic links are removed without
        touching their targets.

        Returns:
            int: Number of top-level items deleted

        Raises:
            OSError: If the directory cannot be listed or any item could not
                be deleted (remaining items are still attempted first)
        """
        if not self.tmp_dir_path.exists():
            print(f"Tmp directory {self.tmp_dir_path} does not exist")
            return 0

        deleted_count = 0
        errors = []

        try:
            for item in self.tmp_dir_path.iterdir():
                try:
                    if item.is_dir() and not item.is_symlink():
                        shutil.rmtree(item)
                        print(f"Deleted directory: {item}")
                    else:
                        # Regular files, plus symlinks (including broken ones
                        # and links to directories, which rmtree refuses).
                        item.unlink()
                        print(f"Deleted file: {item}")
                    deleted_count += 1
                except OSError as e:
                    errors.append(f"Failed to delete {item}: {str(e)}")
        except OSError as e:
            raise OSError(f"Failed to access tmp directory: {str(e)}") from e

        if errors:
            error_msg = "; ".join(errors)
            raise OSError(f"Some files could not be deleted: {error_msg}")

        print(f"Successfully cleared tmp directory. Deleted {deleted_count} items.")
        return deleted_count

    def list_files(self) -> List[str]:
        """
        List all files directly inside the tmp directory.

        Returns:
            List[str]: List of file paths in the tmp directory (sub-directories
                are not included). Best-effort: a listing error ends the scan
                and whatever was collected so far is returned.
        """
        if not self.tmp_dir_path.exists():
            return []

        files = []
        try:
            for item in self.tmp_dir_path.iterdir():
                if item.is_file():
                    files.append(str(item))
        except OSError:
            # Deliberately best-effort; this also backs __repr__.
            pass

        return files

    def get_tmp_dir_size(self) -> int:
        """
        Get the total size of all files in the tmp directory, recursively.

        Returns:
            int: Total size in bytes. Best-effort: a filesystem error ends the
                scan and the size accumulated so far is returned.
        """
        if not self.tmp_dir_path.exists():
            return 0

        total_size = 0
        try:
            for item in self.tmp_dir_path.rglob('*'):
                if item.is_file():
                    total_size += item.stat().st_size
        except OSError:
            # Deliberately best-effort; this also backs __repr__.
            pass

        return total_size

    def is_url(self, path_or_url: str) -> bool:
        """
        Check if the given string is an HTTP(S) URL or a file path.

        Args:
            path_or_url (str): String to check

        Returns:
            bool: True if it starts with ``http://`` or ``https://`` (the
                scheme is matched case-insensitively), False otherwise
        """
        return path_or_url.lower().startswith(('http://', 'https://'))

    def get_file_path(self, path_or_url: str, filename: Optional[str] = None) -> str:
        """
        Get file path - download if URL, return as-is if file path.

        Args:
            path_or_url (str): URL to download or file path to use
            filename (str, optional): Custom filename for downloads

        Returns:
            str: File path to use

        Raises:
            FileNotFoundError: If file path doesn't exist
            requests.RequestException: If URL download fails
        """
        if self.is_url(path_or_url):
            return self.download(path_or_url, filename)

        if not os.path.exists(path_or_url):
            raise FileNotFoundError(f"File not found: {path_or_url}")
        return path_or_url

    def __str__(self) -> str:
        """String representation of the FileDownloader."""
        return f"FileDownloader(tmp_dir='{self.tmp_dir_path}')"

    def __repr__(self) -> str:
        """Detailed string representation (scans the tmp directory)."""
        file_count = len(self.list_files())
        size = self.get_tmp_dir_size()
        return f"FileDownloader(tmp_dir='{self.tmp_dir_path}', files={file_count}, size={size} bytes)"