"""
Data isolation and file management for user-specific screening data.

Ensures users can only access their own files and data.
"""
import logging
import os
from pathlib import Path
from typing import Optional

from flask_login import current_user

from models import db, ScreeningUpload, ScreeningReport
# Module-level logger named after this module; SecureFileAccess uses it to
# record rejected access attempts, deletions, and deletion failures.
logger = logging.getLogger(__name__)
class UserDataManager:
    """Manages user-specific data storage and access control.

    Each user gets a directory ``<base_upload_dir>/user_<id>`` with
    ``uploads`` and ``reports`` sub-directories. The directory getters create
    the requested directory on demand. The static ``verify_*`` / ``get_user_*``
    helpers query ``ScreeningUpload`` / ``ScreeningReport`` filtered by
    ``user_id``.
    """

    def __init__(self, base_upload_dir: str = "uploads"):
        """Store the base directory and create it if it does not exist.

        Args:
            base_upload_dir: Root directory under which all per-user
                directories live.
        """
        self.base_upload_dir = Path(base_upload_dir)
        self.base_upload_dir.mkdir(parents=True, exist_ok=True)

    def get_user_upload_dir(self, user_id: int) -> Path:
        """Return ``<base>/user_<id>/uploads``, creating it if needed."""
        user_dir = self.base_upload_dir / f"user_{user_id}" / "uploads"
        user_dir.mkdir(parents=True, exist_ok=True)
        return user_dir

    def get_user_reports_dir(self, user_id: int) -> Path:
        """Return ``<base>/user_<id>/reports``, creating it if needed."""
        reports_dir = self.base_upload_dir / f"user_{user_id}" / "reports"
        reports_dir.mkdir(parents=True, exist_ok=True)
        return reports_dir

    def get_user_data_dir(self, user_id: int) -> Path:
        """Return the user's root directory ``<base>/user_<id>``, creating it if needed."""
        data_dir = self.base_upload_dir / f"user_{user_id}"
        data_dir.mkdir(parents=True, exist_ok=True)
        return data_dir

    def get_current_user_dir(self) -> Path:
        """Return the upload directory of the currently authenticated user.

        Raises:
            PermissionError: If ``current_user`` is not authenticated.
        """
        if not current_user.is_authenticated:
            raise PermissionError("User not authenticated")
        return self.get_user_upload_dir(current_user.id)

    def get_current_user_reports_dir(self) -> Path:
        """Return the reports directory of the currently authenticated user.

        Raises:
            PermissionError: If ``current_user`` is not authenticated.
        """
        if not current_user.is_authenticated:
            raise PermissionError("User not authenticated")
        return self.get_user_reports_dir(current_user.id)

    @staticmethod
    def verify_file_ownership(
        user_id: int, file_path: str, base_upload_dir: str = "uploads"
    ) -> bool:
        """Check that ``file_path`` stays inside the user's data directory.

        The path is joined onto ``<base_upload_dir>/user_<id>`` and fully
        resolved (collapsing ``..`` segments and following symlinks) before
        the containment check, so directory-traversal input is rejected.
        The file does not have to exist.

        Args:
            user_id: Owner whose directory the path must stay within.
            file_path: Path relative to the user's data directory.
            base_upload_dir: Root of all per-user directories. Defaults to
                ``"uploads"``, the same default the constructor uses.

        Returns:
            True if the resolved path is the user's directory or lies below
            it; False otherwise, including when the path cannot be processed.
        """
        try:
            user_root = (Path(base_upload_dir) / f"user_{user_id}").resolve()
            candidate = (user_root / file_path).resolve()
            # Component-wise containment: a plain string prefix test would
            # accept ``user_10`` as being inside ``user_1``.
            candidate.relative_to(user_root)
        except (TypeError, ValueError, OSError, RuntimeError):
            # TypeError: non-path input; ValueError: outside the root (or an
            # embedded NUL); OSError/RuntimeError: resolution failure.
            return False
        return True

    @staticmethod
    def verify_upload_ownership(user_id: int, upload_id: int) -> bool:
        """Return True if an upload with ``upload_id`` exists for ``user_id``."""
        upload = ScreeningUpload.query.filter_by(id=upload_id, user_id=user_id).first()
        return upload is not None

    @staticmethod
    def verify_report_ownership(user_id: int, report_id: int) -> bool:
        """Return True if a report with ``report_id`` exists for ``user_id``."""
        report = ScreeningReport.query.filter_by(id=report_id, user_id=user_id).first()
        return report is not None

    @staticmethod
    def get_user_uploads(user_id: int, limit: Optional[int] = None):
        """Return the user's uploads, newest ``upload_timestamp`` first.

        Args:
            user_id: Owner of the uploads.
            limit: Maximum number of rows. ``None`` (and, because the check
                is a truthiness test, ``0``) means no limit.
        """
        query = ScreeningUpload.query.filter_by(user_id=user_id).order_by(
            ScreeningUpload.upload_timestamp.desc()
        )
        if limit:
            query = query.limit(limit)
        return query.all()

    @staticmethod
    def get_user_reports(user_id: int, limit: Optional[int] = None):
        """Return the user's reports, newest ``generated_at`` first.

        Args:
            user_id: Owner of the reports.
            limit: Maximum number of rows. ``None`` (and, because the check
                is a truthiness test, ``0``) means no limit.
        """
        query = ScreeningReport.query.filter_by(user_id=user_id).order_by(
            ScreeningReport.generated_at.desc()
        )
        if limit:
            query = query.limit(limit)
        return query.all()

    @staticmethod
    def get_report_statistics(user_id: int) -> dict:
        """Summarise a user's reports.

        Returns:
            A dict with keys ``total``, ``positive`` (reports whose
            ``urgency`` contains "urgent", case-insensitively), ``negative``
            (``total - positive``), ``avg_cal_prob`` (mean of
            ``calibrated_probability``, missing values counted as 0) and
            ``pos_rate`` (percentage of positive reports). The average and
            the rate are 0 when the user has no reports.
        """
        reports = ScreeningReport.query.filter_by(user_id=user_id).all()
        total = len(reports)
        # NOTE(review): this is a substring match, so a value such as
        # "non-urgent" would also count as positive - confirm against the
        # values actually stored in ScreeningReport.urgency.
        positive = sum(
            1 for r in reports if r.urgency and 'urgent' in r.urgency.lower()
        )
        if total > 0:
            avg_cal_prob = sum(r.calibrated_probability or 0 for r in reports) / total
            pos_rate = positive / total * 100
        else:
            avg_cal_prob = 0
            pos_rate = 0
        return {
            'total': total,
            'positive': positive,
            'negative': total - positive,
            'avg_cal_prob': avg_cal_prob,
            'pos_rate': pos_rate,
        }
class SecureFileAccess:
    """Handles secure file access with permission checks.

    Per-user paths are interpreted relative to ``uploads/user_<id>``.
    """

    @staticmethod
    def is_path_safe(base_dir: Path, requested_path: Path) -> bool:
        """Check that ``requested_path`` stays within ``base_dir``.

        Both paths are resolved (``..`` collapsed, symlinks followed) before
        the comparison, which prevents directory-traversal attacks.

        Args:
            base_dir: Directory the request must not escape.
            requested_path: Path interpreted relative to ``base_dir``.

        Returns:
            True if the resolved path is ``base_dir`` or lies below it;
            False otherwise, including when resolution fails.
        """
        try:
            base_resolved = base_dir.resolve()
            path_resolved = (base_dir / requested_path).resolve()
            # relative_to raises ValueError when the path is outside the base.
            path_resolved.relative_to(base_resolved)
        except (ValueError, OSError, RuntimeError):
            # Fail closed if the path cannot be resolved at all.
            return False
        return True

    @staticmethod
    def get_user_file(user_id: int, file_path: str) -> Optional[Path]:
        """Safely locate a file that belongs to the user.

        The requested path is resolved and must remain inside
        ``uploads/user_<id>``; the check is performed with ``is_path_safe``
        so ``..`` segments cannot reach another user's directory.

        Args:
            user_id: Owner of the file.
            file_path: Path relative to the user's data directory.

        Returns:
            The resolved path, or None if the path escapes the user's
            directory (a warning is logged) or is not an existing file.
        """
        user_data_dir = Path("uploads") / f"user_{user_id}"
        try:
            contained = SecureFileAccess.is_path_safe(user_data_dir, Path(file_path))
        except TypeError:
            # Non-path input (e.g. None) is treated as an invalid request.
            contained = False
        if not contained:
            logger.warning(
                "Unauthorized file access attempt by user %s: %s", user_id, file_path
            )
            return None

        full_path = (user_data_dir / file_path).resolve()
        # is_file() is already False for paths that do not exist.
        if not full_path.is_file():
            return None
        return full_path

    @staticmethod
    def delete_user_file(user_id: int, file_path: str) -> bool:
        """Safely delete a file that belongs to the user.

        Args:
            user_id: Owner of the file.
            file_path: Path relative to the user's data directory.

        Returns:
            True if the file was deleted; False if it could not be located
            via ``get_user_file`` or the deletion failed (the error is logged).
        """
        target = SecureFileAccess.get_user_file(user_id, file_path)
        if target is None:
            return False
        try:
            target.unlink()
        except OSError as e:
            logger.error(f"Failed to delete file for user {user_id}: {e}")
            return False
        logger.info(f"Deleted file for user {user_id}: {file_path}")
        return True
def require_user_ownership(resource_type: str):
    """
    Build a view decorator that only lets the owner of a resource through.

    The wrapped view is called only when the current user is authenticated,
    the route's ``id`` view argument is a non-empty integer value, and the
    matching ownership check on UserDataManager succeeds. Otherwise the
    request is aborted with 401 (not authenticated), 400 (missing or
    non-integer id, or unknown resource type) or 403 (not the owner).

    Args:
        resource_type: 'upload' or 'report'
    """
    from functools import wraps
    from flask import request, abort

    def wrap_view(f):
        @wraps(f)
        def guarded_view(*args, **kwargs):
            if not current_user.is_authenticated:
                abort(401)

            # The resource id comes from the URL rule's ``id`` argument.
            raw_id = request.view_args.get('id')
            if not raw_id:
                abort(400)
            try:
                owned_id = int(raw_id)
            except (ValueError, TypeError):
                abort(400)

            # Pick the ownership check that matches the resource type.
            if resource_type == 'upload':
                is_owner = UserDataManager.verify_upload_ownership(
                    current_user.id, owned_id
                )
            elif resource_type == 'report':
                is_owner = UserDataManager.verify_report_ownership(
                    current_user.id, owned_id
                )
            else:
                abort(400)

            if not is_owner:
                abort(403)
            return f(*args, **kwargs)

        return guarded_view

    return wrap_view
|