File size: 7,822 Bytes
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
Data isolation and file management for user-specific screening data
Ensures users can only access their own files and data
"""
import os
import logging
from pathlib import Path
from flask_login import current_user
from models import db, ScreeningUpload, ScreeningReport

logger = logging.getLogger(__name__)


class UserDataManager:
    """Manage per-user storage directories and ownership checks.

    Every user gets a ``user_<id>`` directory under ``base_upload_dir`` with
    ``uploads`` and ``reports`` sub-directories. The static helpers query the
    ``ScreeningUpload`` / ``ScreeningReport`` models filtered by ``user_id``.
    """

    def __init__(self, base_upload_dir: str = "uploads"):
        """Remember the storage root and create it if it does not exist.

        Args:
            base_upload_dir: Directory under which all user folders live.
        """
        self.base_upload_dir = Path(base_upload_dir)
        self.base_upload_dir.mkdir(parents=True, exist_ok=True)

    def get_user_upload_dir(self, user_id: int) -> Path:
        """Return ``<base>/user_<id>/uploads``, creating it when missing."""
        user_dir = self.base_upload_dir / f"user_{user_id}" / "uploads"
        user_dir.mkdir(parents=True, exist_ok=True)
        return user_dir

    def get_user_reports_dir(self, user_id: int) -> Path:
        """Return ``<base>/user_<id>/reports``, creating it when missing."""
        reports_dir = self.base_upload_dir / f"user_{user_id}" / "reports"
        reports_dir.mkdir(parents=True, exist_ok=True)
        return reports_dir

    def get_user_data_dir(self, user_id: int) -> Path:
        """Return ``<base>/user_<id>``, creating it when missing."""
        data_dir = self.base_upload_dir / f"user_{user_id}"
        data_dir.mkdir(parents=True, exist_ok=True)
        return data_dir

    def get_current_user_dir(self) -> Path:
        """Return the uploads directory of the logged-in user.

        Raises:
            PermissionError: If ``current_user`` is not authenticated.
        """
        if not current_user.is_authenticated:
            raise PermissionError("User not authenticated")
        return self.get_user_upload_dir(current_user.id)

    def get_current_user_reports_dir(self) -> Path:
        """Return the reports directory of the logged-in user.

        Raises:
            PermissionError: If ``current_user`` is not authenticated.
        """
        if not current_user.is_authenticated:
            raise PermissionError("User not authenticated")
        return self.get_user_reports_dir(current_user.id)

    @staticmethod
    def verify_file_ownership(user_id: int, file_path: str) -> bool:
        """Check that ``file_path`` stays inside the user's data directory.

        The path is interpreted relative to ``uploads/user_<id>`` (the default
        storage root; a static method cannot see a custom ``base_upload_dir``).

        Args:
            user_id: Owner whose directory must contain the file.
            file_path: Path relative to the user's data directory.

        Returns:
            True when the fully resolved path lies within the user's
            directory; False otherwise, including when the path cannot be
            resolved at all.
        """
        user_data_dir = Path("uploads") / f"user_{user_id}"
        try:
            resolved_root = user_data_dir.resolve()
            # Resolve AFTER joining so that ".." segments and symlinks are
            # collapsed before the containment check.
            candidate = (resolved_root / file_path).resolve()
            # relative_to compares whole path components, so "user_1" can
            # never match "user_10" the way a string-prefix test would.
            candidate.relative_to(resolved_root)
        except (OSError, RuntimeError, TypeError, ValueError):
            return False
        return True

    @staticmethod
    def verify_upload_ownership(user_id: int, upload_id: int) -> bool:
        """Return True if an upload with ``upload_id`` exists for ``user_id``."""
        upload = ScreeningUpload.query.filter_by(id=upload_id, user_id=user_id).first()
        return upload is not None

    @staticmethod
    def verify_report_ownership(user_id: int, report_id: int) -> bool:
        """Return True if a report with ``report_id`` exists for ``user_id``."""
        report = ScreeningReport.query.filter_by(id=report_id, user_id=user_id).first()
        return report is not None

    @staticmethod
    def get_user_uploads(user_id: int, limit: int = None):
        """Return the user's uploads, newest ``upload_timestamp`` first.

        Args:
            user_id: Owner of the uploads.
            limit: Maximum number of rows; a falsy value (``None`` or ``0``)
                means no limit is applied.
        """
        query = ScreeningUpload.query.filter_by(user_id=user_id).order_by(
            ScreeningUpload.upload_timestamp.desc()
        )
        if limit:
            query = query.limit(limit)
        return query.all()

    @staticmethod
    def get_user_reports(user_id: int, limit: int = None):
        """Return the user's reports, newest ``generated_at`` first.

        Args:
            user_id: Owner of the reports.
            limit: Maximum number of rows; a falsy value (``None`` or ``0``)
                means no limit is applied.
        """
        query = ScreeningReport.query.filter_by(user_id=user_id).order_by(
            ScreeningReport.generated_at.desc()
        )
        if limit:
            query = query.limit(limit)
        return query.all()

    @staticmethod
    def get_report_statistics(user_id: int) -> dict:
        """Summarise a user's reports.

        Returns:
            A dict with the keys ``total``, ``positive``, ``negative``,
            ``avg_cal_prob`` and ``pos_rate`` (a percentage). All values are
            0 when the user has no reports.
        """
        reports = ScreeningReport.query.filter_by(user_id=user_id).all()

        total = len(reports)
        # A report counts as positive when its urgency text contains "urgent".
        # NOTE(review): a label such as "non-urgent" would also match this
        # substring test; confirm the set of urgency values.
        positive = sum(
            1 for r in reports if r.urgency and 'urgent' in r.urgency.lower()
        )
        negative = total - positive

        avg_cal_prob = 0
        if total > 0:
            # Missing probabilities count as 0 but still weigh in the average.
            avg_cal_prob = sum(r.calibrated_probability or 0 for r in reports) / total

        return {
            'total': total,
            'positive': positive,
            'negative': negative,
            'avg_cal_prob': avg_cal_prob,
            'pos_rate': (positive / total * 100) if total > 0 else 0
        }


class SecureFileAccess:
    """Handles secure file access with permission checks."""

    @staticmethod
    def is_path_safe(base_dir: Path, requested_path: Path) -> bool:
        """Check that ``requested_path`` stays inside ``base_dir``.

        Args:
            base_dir: Directory the request must remain within.
            requested_path: Path interpreted relative to ``base_dir``.

        Returns:
            True when the resolved target lies within the resolved base
            directory; False when it escapes it or cannot be resolved.
        """
        try:
            # Resolve both sides so ".." segments and symlinks are collapsed
            # before the containment check.
            base_resolved = base_dir.resolve()
            path_resolved = (base_dir / requested_path).resolve()

            # relative_to raises ValueError when path_resolved is outside.
            path_resolved.relative_to(base_resolved)
            return True
        except (OSError, RuntimeError, ValueError):
            # An unresolvable path (e.g. symlink loop, embedded NUL) is
            # treated as unsafe rather than propagated.
            return False

    @staticmethod
    def get_user_file(user_id: int, file_path: str):
        """Return the resolved path of a file inside the user's directory.

        Args:
            user_id: Owner whose ``uploads/user_<id>`` directory is searched.
            file_path: Path relative to that directory.

        Returns:
            The resolved ``Path`` when the file exists, is a regular file and
            lies within the user's directory; otherwise ``None``.
        """
        user_data_dir = Path("uploads") / f"user_{user_id}"

        # Both checks must pass: the ownership helper plus this class's own
        # containment test on the fully resolved path, so a "../" segment can
        # never lead outside the user's directory.
        allowed = UserDataManager.verify_file_ownership(user_id, file_path)
        if not allowed or not SecureFileAccess.is_path_safe(user_data_dir, Path(file_path)):
            logger.warning(
                "Unauthorized file access attempt by user %s: %s", user_id, file_path
            )
            return None

        full_path = (user_data_dir / file_path).resolve()

        if not full_path.exists() or not full_path.is_file():
            return None

        return full_path

    @staticmethod
    def delete_user_file(user_id: int, file_path: str) -> bool:
        """Delete a file inside the user's directory.

        Returns:
            True when the file was removed; False when it is missing, not
            accessible to the user, or the removal fails.
        """
        file_to_delete = SecureFileAccess.get_user_file(user_id, file_path)
        if not file_to_delete:
            return False

        try:
            file_to_delete.unlink()
        except OSError as e:
            logger.error("Failed to delete file for user %s: %s", user_id, e)
            return False

        logger.info("Deleted file for user %s: %s", user_id, file_path)
        return True


def require_user_ownership(resource_type: str):
    """
    Build a view decorator that rejects requests for resources the
    logged-in user does not own.

    The wrapped view is only called after these checks, in order:
    unauthenticated user -> 401; missing or non-integer ``id`` route
    argument -> 400; unknown ``resource_type`` -> 400; resource not owned
    by the current user -> 403.

    Args:
        resource_type: 'upload' or 'report'
    """
    from functools import wraps
    from flask import request, abort

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not current_user.is_authenticated:
                abort(401)

            # The id comes from the matched URL rule, not from the call args.
            raw_id = request.view_args.get('id')
            if not raw_id:
                abort(400)

            try:
                resource_id = int(raw_id)
            except (ValueError, TypeError):
                abort(400)

            # Map each supported resource type to its ownership check.
            ownership_checks = {
                'upload': UserDataManager.verify_upload_ownership,
                'report': UserDataManager.verify_report_ownership,
            }
            is_owner = ownership_checks.get(resource_type)
            if is_owner is None:
                abort(400)

            if not is_owner(current_user.id, resource_id):
                abort(403)

            return f(*args, **kwargs)

        return decorated_function
    return decorator