import os
import re
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import List, Optional
from urllib.parse import unquote, urlparse

import requests


class FileDownloader:
    """
    Download files from URLs and manage them in a temporary directory.

    Provides functionality to:
    1. Download files from URLs and save to tmp directory
    2. Delete specific files from tmp directory
    3. Clear all files from tmp directory
    """

    # Ordered (substring, extension) pairs used to guess an extension from a
    # Content-Type header. The first matching substring wins.
    _CONTENT_TYPE_EXTENSIONS = (
        ('pdf', '.pdf'),
        ('image/jpeg', '.jpg'),
        ('image/jpg', '.jpg'),
        ('image/png', '.png'),
        ('text/plain', '.txt'),
        ('application/json', '.json'),
        ('text/html', '.html'),
    )

    # Matches `filename=` / `filename*=` parameters of a Content-Disposition
    # header. Group 1 is "*" for the extended form, group 2 is the raw value
    # (a quoted string, or everything up to the next ';').
    _CD_FILENAME_RE = re.compile(
        r'(?:^|;)\s*filename(\*)?\s*=\s*("[^"]*"|[^;]*)',
        re.IGNORECASE,
    )

    def __init__(self, tmp_dir_name: str = "tmp"):
        """
        Initialize the FileDownloader and create the tmp directory.

        Args:
            tmp_dir_name (str): Name (or path) of the temporary directory to use
        """
        self.tmp_dir_name = tmp_dir_name
        self.tmp_dir_path = Path(tmp_dir_name)
        self._ensure_tmp_directory()

    def _ensure_tmp_directory(self) -> None:
        """Ensure the temporary directory (and any missing parents) exists."""
        # parents=True so a nested tmp_dir_name such as "cache/tmp" works.
        self.tmp_dir_path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _sanitize_filename(candidate: str) -> str:
        """
        Reduce an untrusted name to a bare file name.

        Directory components (with either '/' or '\\' separators) and NUL
        bytes are removed so the result can never point outside the
        directory it is joined to.

        Args:
            candidate (str): Name taken from a URL or an HTTP header

        Returns:
            str: The bare file name, or "" if nothing usable remains
        """
        name = candidate.replace('\\', '/').rsplit('/', 1)[-1]
        name = name.replace('\x00', '').strip()
        if name in ('', '.', '..'):
            return ''
        return name

    @classmethod
    def _filename_from_content_disposition(cls, header: Optional[str]) -> str:
        """
        Extract a safe file name from a Content-Disposition header value.

        The extended `filename*=charset'lang'value` form is percent-decoded
        and preferred over the plain `filename=` form.

        Args:
            header (str, optional): Raw header value

        Returns:
            str: Sanitized file name, or "" if the header carries none
        """
        if not header:
            return ''

        plain_name = ''
        for match in cls._CD_FILENAME_RE.finditer(header):
            is_extended = bool(match.group(1))
            raw_value = match.group(2).strip()

            if is_extended:
                charset, first_sep, remainder = raw_value.strip('"').partition("'")
                _language, second_sep, encoded = remainder.partition("'")
                if not (first_sep and second_sep):
                    continue  # Malformed extended value; try other parameters
                try:
                    decoded = unquote(encoded, encoding=charset or 'utf-8',
                                      errors='strict')
                except (LookupError, UnicodeDecodeError):
                    continue  # Unknown charset or undecodable bytes
                # Sanitize AFTER decoding: "%2F" must not smuggle in a "/".
                extended_name = cls._sanitize_filename(decoded)
                if extended_name:
                    return extended_name
            elif not plain_name:
                plain_name = cls._sanitize_filename(raw_value.strip('"\''))

        return plain_name

    def _get_filename_from_url(self, url: str) -> str:
        """
        Extract filename from URL, with fallback to generated name.

        Args:
            url (str): The URL to extract filename from

        Returns:
            str: The filename
        """
        parsed_url = urlparse(url)
        filename = self._sanitize_filename(os.path.basename(parsed_url.path))

        # If no usable filename is found in the URL, generate one. The
        # extension may be added later from the Content-Type header.
        if not filename or '.' not in filename:
            filename = f"downloaded_file_{uuid.uuid4().hex[:8]}"

        return filename

    def _resolve_download_filename(self, url: str, headers) -> str:
        """
        Pick a file name for a download when the caller supplied none.

        Order of preference: Content-Disposition header, then the URL path,
        then a generated name. If the result has no '.', an extension is
        guessed from the Content-Type header.

        Args:
            url (str): URL being downloaded
            headers: Mapping of response headers (looked up by lowercase key)

        Returns:
            str: The chosen (not yet de-duplicated) filename
        """
        filename = self._get_filename_from_url(url)

        header_filename = self._filename_from_content_disposition(
            headers.get('content-disposition')
        )
        if header_filename:
            filename = header_filename

        if '.' not in filename:
            content_type = (headers.get('content-type') or '').lower()
            for needle, extension in self._CONTENT_TYPE_EXTENSIONS:
                if needle in content_type:
                    filename += extension
                    break

        return filename

    def _get_unique_filename(self, filename: str) -> str:
        """
        Ensure filename is unique in the tmp directory.

        If the name is taken, "_<n>" is inserted before the final suffix,
        using the smallest n >= 1 that is free.

        Args:
            filename (str): Original filename

        Returns:
            str: Unique filename
        """
        base_path = self.tmp_dir_path / filename
        if not base_path.exists():
            return filename

        name_part = base_path.stem
        ext_part = base_path.suffix
        counter = 1
        while True:
            new_filename = f"{name_part}_{counter}{ext_part}"
            if not (self.tmp_dir_path / new_filename).exists():
                return new_filename
            counter += 1

    @staticmethod
    def _discard_partial(file_path: Optional[Path]) -> None:
        """Best-effort removal of a partially written download."""
        if file_path is None:
            return
        try:
            file_path.unlink()
        except OSError:
            # Cleanup is advisory; the original error is what gets reported.
            pass

    def download(self, url: str, filename: Optional[str] = None,
                 timeout: int = 30, chunk_size: int = 8192) -> str:
        """
        Download a file from URL and save to tmp directory.

        Args:
            url (str): URL to download from
            filename (str, optional): Custom filename, used as given. If None,
                the name is taken from the Content-Disposition header or the
                URL, reduced to a bare file name
            timeout (int): Request timeout in seconds
            chunk_size (int): Size of chunks for streaming download

        Returns:
            str: Full path to the downloaded file

        Raises:
            requests.RequestException: If download fails
            IOError: If file writing fails
        """
        partial_path: Optional[Path] = None
        try:
            # The context manager releases the connection even on failure.
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()

                if filename is None:
                    filename = self._resolve_download_filename(
                        url, response.headers
                    )

                # The directory may have been removed since construction.
                self._ensure_tmp_directory()
                filename = self._get_unique_filename(filename)
                file_path = self.tmp_dir_path / filename

                with open(file_path, 'wb') as f:
                    partial_path = file_path
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        if chunk:  # Filter out keep-alive chunks
                            f.write(chunk)
        except requests.exceptions.RequestException as e:
            # Must precede the IOError handler: RequestException is an IOError.
            self._discard_partial(partial_path)
            raise requests.RequestException(
                f"Failed to download {url}: {str(e)}"
            ) from e
        except IOError as e:
            self._discard_partial(partial_path)
            raise IOError(f"Failed to save file {filename}: {str(e)}") from e

        print(f"Successfully downloaded: {url} -> {file_path}")
        return str(file_path)

    def delete_file(self, file_path: str) -> bool:
        """
        Delete a specific file from the tmp directory.

        Args:
            file_path (str): Path to the file to delete (can be full path or
                just filename)

        Returns:
            bool: True if file was deleted, False if file didn't exist

        Raises:
            ValueError: If file is not in the tmp directory
            OSError: If deletion fails
        """
        path = Path(file_path)

        # If it's just a filename, assume it's in tmp directory
        if not path.is_absolute() and len(path.parts) == 1:
            path = self.tmp_dir_path / path

        try:
            resolved_path = path.resolve()
            tmp_resolved = self.tmp_dir_path.resolve()
        except (OSError, RuntimeError) as e:
            raise ValueError(f"Invalid file path {file_path}: {str(e)}") from e

        # Compare path components, not string prefixes: a prefix test would
        # accept a sibling such as "<tmp>_other/file".
        try:
            resolved_path.relative_to(tmp_resolved)
        except ValueError:
            raise ValueError(
                f"Invalid file path {file_path}: File {file_path} is not in "
                f"the tmp directory {self.tmp_dir_path}"
            ) from None

        try:
            path.unlink()
        except FileNotFoundError:
            print(f"File not found: {path}")
            return False
        except OSError as e:
            raise OSError(f"Failed to delete {path}: {str(e)}") from e

        print(f"Successfully deleted: {path}")
        return True

    def clear_tmp_directory(self) -> int:
        """
        Clear all files and subdirectories from the tmp directory.

        Symbolic links are removed themselves; their targets are not touched.

        Returns:
            int: Number of top-level items deleted

        Raises:
            OSError: If the directory cannot be read or any item cannot be
                deleted (remaining items are still attempted first)
        """
        if not self.tmp_dir_path.exists():
            print(f"Tmp directory {self.tmp_dir_path} does not exist")
            return 0

        deleted_count = 0
        errors = []

        try:
            for item in self.tmp_dir_path.iterdir():
                try:
                    if item.is_dir() and not item.is_symlink():
                        shutil.rmtree(item)
                        print(f"Deleted directory: {item}")
                    else:
                        # Regular files, symlinks (even broken ones), etc.
                        item.unlink()
                        print(f"Deleted file: {item}")
                    deleted_count += 1
                except OSError as e:
                    errors.append(f"Failed to delete {item}: {str(e)}")
        except OSError as e:
            raise OSError(f"Failed to access tmp directory: {str(e)}") from e

        if errors:
            error_msg = "; ".join(errors)
            raise OSError(f"Some files could not be deleted: {error_msg}")

        print(f"Successfully cleared tmp directory. Deleted {deleted_count} items.")
        return deleted_count

    def list_files(self) -> List[str]:
        """
        List all files directly inside the tmp directory.

        Returns:
            List[str]: List of file paths in the tmp directory; empty if the
                directory is missing or unreadable
        """
        if not self.tmp_dir_path.exists():
            return []

        try:
            return [
                str(item)
                for item in self.tmp_dir_path.iterdir()
                if item.is_file()
            ]
        except OSError:
            # If we can't read the directory, return empty list
            return []

    def get_tmp_dir_size(self) -> int:
        """
        Get the total size of all files in the tmp directory (recursive).

        Returns:
            int: Total size in bytes; files that cannot be read are skipped
        """
        if not self.tmp_dir_path.exists():
            return 0

        total_size = 0
        try:
            for item in self.tmp_dir_path.rglob('*'):
                try:
                    if item.is_file():
                        total_size += item.stat().st_size
                except OSError:
                    # Skip just this entry so the rest is still counted.
                    continue
        except OSError:
            # Directory walk itself failed; return what was counted so far.
            pass

        return total_size

    def is_url(self, path_or_url: str) -> bool:
        """
        Check if the given string is an http(s) URL or a file path.

        Args:
            path_or_url (str): String to check

        Returns:
            bool: True if it's a URL, False if it's a file path
        """
        # URL schemes are case-insensitive.
        return path_or_url.lower().startswith(('http://', 'https://'))

    def get_file_path(self, path_or_url: str,
                      filename: Optional[str] = None) -> str:
        """
        Get file path - download if URL, return as-is if file path.

        Args:
            path_or_url (str): URL to download or file path to use
            filename (str, optional): Custom filename for downloads

        Returns:
            str: File path to use

        Raises:
            FileNotFoundError: If file path doesn't exist
            requests.RequestException: If URL download fails
        """
        if self.is_url(path_or_url):
            return self.download(path_or_url, filename)

        if not os.path.exists(path_or_url):
            raise FileNotFoundError(f"File not found: {path_or_url}")
        return path_or_url

    def __str__(self) -> str:
        """String representation of the FileDownloader."""
        return f"FileDownloader(tmp_dir='{self.tmp_dir_path}')"

    def __repr__(self) -> str:
        """Detailed string representation of the FileDownloader."""
        file_count = len(self.list_files())
        size = self.get_tmp_dir_size()
        return (
            f"FileDownloader(tmp_dir='{self.tmp_dir_path}', "
            f"files={file_count}, size={size} bytes)"
        )