# Final_Assignment_Template / utils / file_downloader.py
# Stardust00's picture
# setup tools
# 0335261
import os
import re
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import Optional, List
from urllib.parse import unquote, urlparse

import requests
class FileDownloader:
    """
    Download files from URLs and manage them in a temporary directory.

    Provides functionality to:
    1. Download files from URLs and save to tmp directory
    2. Delete specific files from tmp directory
    3. Clear all files from tmp directory
    """

    # Ordered (content-type substring, extension) pairs; the first match wins.
    _CONTENT_TYPE_EXTENSIONS = (
        ("pdf", ".pdf"),
        ("image/jpeg", ".jpg"),
        ("image/jpg", ".jpg"),
        ("image/png", ".png"),
        ("text/plain", ".txt"),
        ("application/json", ".json"),
        ("text/html", ".html"),
    )

    # RFC 5987 extended parameter: filename*=charset'language'percent-encoded
    _CD_EXTENDED_RE = re.compile(r"filename\*\s*=\s*([^;]+)", re.IGNORECASE)
    # Plain parameter: filename="quoted; value" or filename=token
    _CD_PLAIN_RE = re.compile(r'filename\s*=\s*("[^"]*"|[^;]+)', re.IGNORECASE)

    def __init__(self, tmp_dir_name: str = "tmp"):
        """
        Initialize the FileDownloader and create its directory if needed.

        Args:
            tmp_dir_name (str): Name (or path) of the temporary directory to use
        """
        self.tmp_dir_name = tmp_dir_name
        self.tmp_dir_path = Path(tmp_dir_name)
        self._ensure_tmp_directory()

    def _ensure_tmp_directory(self) -> None:
        """Ensure the temporary directory (and any missing parents) exists."""
        self.tmp_dir_path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _generate_filename() -> str:
        """Return a random placeholder filename without an extension."""
        return f"downloaded_file_{uuid.uuid4().hex[:8]}"

    def _get_filename_from_url(self, url: str) -> str:
        """
        Extract filename from URL, with fallback to generated name.

        Args:
            url (str): The URL to extract filename from

        Returns:
            str: The last path segment of the URL if it contains a dot,
                otherwise a generated ``downloaded_file_<hex>`` name
        """
        filename = os.path.basename(urlparse(url).path)
        # A segment without a dot is treated as "no filename"; the extension
        # may be inferred later from the response's content type.
        if not filename or '.' not in filename:
            filename = self._generate_filename()
        return filename

    @staticmethod
    def _sanitize_filename(filename: str) -> str:
        """
        Reduce a possibly hostile filename to a bare name.

        Directory components (with either ``/`` or ``\\`` separators) and NUL
        bytes are removed so the result can never point outside the tmp
        directory. Names that end up empty, ``.`` or ``..`` are replaced by a
        generated name.

        Args:
            filename (str): Filename from the caller or from a response header

        Returns:
            str: A filename with no directory components
        """
        flattened = filename.replace("\\", "/").replace("\x00", "")
        candidate = os.path.basename(flattened).strip()
        if candidate in ("", ".", ".."):
            return FileDownloader._generate_filename()
        return candidate

    @staticmethod
    def _filename_from_content_disposition(header: Optional[str]) -> Optional[str]:
        """
        Extract the filename from a Content-Disposition header value.

        The extended ``filename*=charset'lang'value`` form is preferred and
        percent-decoded; otherwise the plain ``filename=`` parameter is used,
        with surrounding quotes removed.

        Args:
            header (str, optional): Raw header value

        Returns:
            Optional[str]: The filename, or None if the header carries none
        """
        if not header:
            return None

        extended = FileDownloader._CD_EXTENDED_RE.search(header)
        if extended:
            parts = extended.group(1).strip().strip('"').split("'", 2)
            if len(parts) == 3:
                charset, _language, encoded = parts
                try:
                    decoded = unquote(encoded, encoding=charset or "utf-8",
                                      errors="replace")
                except LookupError:
                    # Unknown charset label: fall back to UTF-8 decoding.
                    decoded = unquote(encoded)
                if decoded:
                    return decoded

        plain = FileDownloader._CD_PLAIN_RE.search(header)
        if plain:
            value = plain.group(1).strip().strip('"\'')
            if value:
                return value
        return None

    @staticmethod
    def _extension_for_content_type(content_type: str) -> str:
        """
        Map a Content-Type header value to a file extension.

        Args:
            content_type (str): Raw Content-Type header value

        Returns:
            str: Extension including the leading dot, or "" if unrecognized
        """
        lowered = content_type.lower()
        for marker, extension in FileDownloader._CONTENT_TYPE_EXTENSIONS:
            if marker in lowered:
                return extension
        return ""

    @staticmethod
    def _remove_partial_file(file_path: Optional[Path]) -> None:
        """Best-effort removal of a file left behind by a failed download."""
        if file_path is None:
            return
        try:
            file_path.unlink()
        except OSError:
            # Nothing was written, or it cannot be removed; the original
            # download error is the one worth reporting.
            pass

    def _get_unique_filename(self, filename: str) -> str:
        """
        Ensure filename is unique in the tmp directory.

        If the name is taken, ``_1``, ``_2``, ... is inserted before the
        extension until an unused name is found.

        Args:
            filename (str): Original filename

        Returns:
            str: Unique filename
        """
        base_path = self.tmp_dir_path / filename
        if not base_path.exists():
            return filename

        name_part = base_path.stem
        ext_part = base_path.suffix
        counter = 1
        while True:
            new_filename = f"{name_part}_{counter}{ext_part}"
            if not (self.tmp_dir_path / new_filename).exists():
                return new_filename
            counter += 1

    def download(self, url: str, filename: Optional[str] = None,
                 timeout: int = 30, chunk_size: int = 8192) -> str:
        """
        Download a file from URL and save to tmp directory.

        Args:
            url (str): URL to download from
            filename (str, optional): Custom filename. If None, it is taken
                from the Content-Disposition header or, failing that, the URL.
                Directory components are always stripped.
            timeout (int): Request timeout in seconds
            chunk_size (int): Size of chunks for streaming download

        Returns:
            str: Full path to the downloaded file

        Raises:
            requests.RequestException: If download fails
            IOError: If file writing fails
        """
        file_path: Optional[Path] = None
        try:
            # The context manager releases the streamed connection even when
            # the body is not fully consumed.
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()

                if filename is None:
                    filename = self._get_filename_from_url(url)
                    header_filename = self._filename_from_content_disposition(
                        response.headers.get('content-disposition')
                    )
                    if header_filename:
                        filename = header_filename

                # Header- and caller-supplied names are untrusted: keep only
                # the basename so the file stays inside the tmp directory.
                filename = self._sanitize_filename(filename)

                # If still no extension, try to infer from content-type
                if '.' not in filename:
                    filename += self._extension_for_content_type(
                        response.headers.get('content-type', '')
                    )

                # The directory may have been removed since __init__.
                self._ensure_tmp_directory()
                filename = self._get_unique_filename(filename)
                file_path = self.tmp_dir_path / filename

                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        if chunk:  # Filter out keep-alive chunks
                            f.write(chunk)
        # RequestException derives from IOError, so it must be handled first.
        except requests.exceptions.RequestException as e:
            self._remove_partial_file(file_path)
            raise requests.RequestException(f"Failed to download {url}: {str(e)}") from e
        except IOError as e:
            self._remove_partial_file(file_path)
            raise IOError(f"Failed to save file {filename}: {str(e)}") from e

        print(f"Successfully downloaded: {url} -> {file_path}")
        return str(file_path)

    def delete_file(self, file_path: str) -> bool:
        """
        Delete a specific file from the tmp directory.

        Args:
            file_path (str): Path to the file to delete (can be full path or just filename)

        Returns:
            bool: True if file was deleted, False if file didn't exist

        Raises:
            ValueError: If file is not in the tmp directory
            OSError: If deletion fails
        """
        path = Path(file_path)

        # If it's just a filename, assume it's in tmp directory
        if not path.is_absolute() and len(path.parts) == 1:
            path = self.tmp_dir_path / path

        try:
            resolved_path = path.resolve()
            tmp_resolved = self.tmp_dir_path.resolve()
        except (OSError, ValueError, RuntimeError) as e:
            raise ValueError(f"Invalid file path {file_path}: {str(e)}") from e

        # Compare path components, not string prefixes: "tmp_backup/x" starts
        # with "tmp" as a string but is not inside the tmp directory.
        try:
            resolved_path.relative_to(tmp_resolved)
        except ValueError:
            raise ValueError(
                f"File {file_path} is not in the tmp directory {self.tmp_dir_path}"
            ) from None

        try:
            path.unlink()
        except FileNotFoundError:
            print(f"File not found: {path}")
            return False
        except OSError as e:
            raise OSError(f"Failed to delete {path}: {str(e)}") from e

        print(f"Successfully deleted: {path}")
        return True

    def clear_tmp_directory(self) -> int:
        """
        Clear all entries from the tmp directory.

        Real subdirectories are removed recursively; everything else
        (regular files, symlinks including dangling ones) is unlinked.

        Returns:
            int: Number of top-level entries deleted

        Raises:
            OSError: If the directory cannot be read or some entries could
                not be deleted (remaining entries are still attempted)
        """
        if not self.tmp_dir_path.exists():
            print(f"Tmp directory {self.tmp_dir_path} does not exist")
            return 0

        try:
            # Snapshot the listing because entries are removed while looping.
            items = list(self.tmp_dir_path.iterdir())
        except OSError as e:
            raise OSError(f"Failed to access tmp directory: {str(e)}") from e

        deleted_count = 0
        errors = []
        for item in items:
            try:
                # rmtree refuses symlinks, so a link to a directory is
                # unlinked rather than followed.
                if item.is_dir() and not item.is_symlink():
                    shutil.rmtree(item)
                    deleted_count += 1
                    print(f"Deleted directory: {item}")
                else:
                    item.unlink()
                    deleted_count += 1
                    print(f"Deleted file: {item}")
            except OSError as e:
                errors.append(f"Failed to delete {item}: {str(e)}")

        if errors:
            error_msg = "; ".join(errors)
            raise OSError(f"Some files could not be deleted: {error_msg}")

        print(f"Successfully cleared tmp directory. Deleted {deleted_count} items.")
        return deleted_count

    def list_files(self) -> List[str]:
        """
        List all files directly inside the tmp directory.

        Returns:
            List[str]: File paths in the tmp directory (subdirectories are
                not listed); empty if the directory is missing or unreadable
        """
        if not self.tmp_dir_path.exists():
            return []

        files = []
        try:
            for item in self.tmp_dir_path.iterdir():
                if item.is_file():
                    files.append(str(item))
        except OSError:
            # If we can't read the directory, return what was collected
            pass
        return files

    def get_tmp_dir_size(self) -> int:
        """
        Get the total size of all files in the tmp directory, recursively.

        Returns:
            int: Total size in bytes; files that cannot be inspected are
                skipped, so the result may be a partial size
        """
        if not self.tmp_dir_path.exists():
            return 0

        total_size = 0
        try:
            for item in self.tmp_dir_path.rglob('*'):
                try:
                    if item.is_file():
                        total_size += item.stat().st_size
                except OSError:
                    # Skip just this entry instead of aborting the whole scan
                    continue
        except OSError:
            # Directory traversal itself failed; return the partial size
            pass
        return total_size

    def is_url(self, path_or_url: str) -> bool:
        """
        Check if the given string is a URL or a file path.

        Args:
            path_or_url (str): String to check

        Returns:
            bool: True if it starts with an http:// or https:// scheme
                (matched case-insensitively), False otherwise
        """
        return path_or_url.lower().startswith(('http://', 'https://'))

    def get_file_path(self, path_or_url: str, filename: Optional[str] = None) -> str:
        """
        Get file path - download if URL, return as-is if file path.

        Args:
            path_or_url (str): URL to download or file path to use
            filename (str, optional): Custom filename for downloads

        Returns:
            str: File path to use

        Raises:
            FileNotFoundError: If file path doesn't exist
            requests.RequestException: If URL download fails
        """
        if self.is_url(path_or_url):
            return self.download(path_or_url, filename)

        if not os.path.exists(path_or_url):
            raise FileNotFoundError(f"File not found: {path_or_url}")
        return path_or_url

    def __str__(self) -> str:
        """String representation of the FileDownloader."""
        return f"FileDownloader(tmp_dir='{self.tmp_dir_path}')"

    def __repr__(self) -> str:
        """Detailed string representation; reads the directory for counts."""
        file_count = len(self.list_files())
        size = self.get_tmp_dir_size()
        return f"FileDownloader(tmp_dir='{self.tmp_dir_path}', files={file_count}, size={size} bytes)"