File size: 6,554 Bytes
e4fd6e0
 
 
 
 
5105d0e
e4fd6e0
 
 
 
 
 
5105d0e
 
 
 
e4fd6e0
 
 
 
 
 
 
 
 
 
 
5105d0e
 
e4fd6e0
8e35842
 
 
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5105d0e
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1499a5e
e4fd6e0
 
 
 
 
 
 
e3566c9
e4fd6e0
 
5105d0e
 
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
1499a5e
e4fd6e0
 
5105d0e
e4fd6e0
 
 
 
8e35842
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
Database models for ICH Screening Application with user authentication and privacy
"""
import os
from datetime import datetime
from zoneinfo import ZoneInfo
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
import secrets

db = SQLAlchemy()
IST = ZoneInfo("Asia/Kolkata")

def now_ist() -> datetime:
    return datetime.now(IST).replace(tzinfo=None)


class User(UserMixin, db.Model):
    """User account model for authentication"""
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255), nullable=False)
    full_name = db.Column(db.String(120))
    created_at = db.Column(db.DateTime, default=now_ist, nullable=False)
    updated_at = db.Column(db.DateTime, default=now_ist, onupdate=now_ist)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    # Avatar (Cloudinary)
    avatar_url = db.Column(db.String(500), nullable=True)
    avatar_public_id = db.Column(db.String(255), nullable=True)
    
    # Relationships
    screening_uploads = db.relationship('ScreeningUpload', backref='user', lazy=True, cascade='all, delete-orphan')
    screening_reports = db.relationship('ScreeningReport', backref='user', lazy=True, cascade='all, delete-orphan')
    
    def set_password(self, password):
        """Hash and set the user's password"""
        self.password_hash = generate_password_hash(password, method='pbkdf2:sha256')
    
    def check_password(self, password):
        """Verify password against stored hash"""
        return check_password_hash(self.password_hash, password)
    
    def __repr__(self):
        return f'<User {self.username}>'


class ScreeningUpload(db.Model):
    """Track uploaded DICOM files with user ownership"""
    __tablename__ = 'screening_uploads'
    
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
    file_name = db.Column(db.String(255), nullable=False)
    original_filename = db.Column(db.String(255), nullable=False)
    file_size = db.Column(db.Integer)  # bytes
    file_path = db.Column(db.String(500), nullable=False)  # Relative to user's upload dir
    upload_timestamp = db.Column(db.DateTime, default=now_ist, nullable=False, index=True)
    processing_status = db.Column(db.String(20), default='pending')  # pending, processing, completed, failed
    processing_error = db.Column(db.Text)  # Error message if failed
    
    # Relationships
    reports = db.relationship('ScreeningReport', backref='upload', lazy=True, cascade='all, delete-orphan')
    
    def __repr__(self):
        return f'<ScreeningUpload {self.id} - user {self.user_id}>'


class ScreeningReport(db.Model):
    """Store screening results with full user isolation"""
    __tablename__ = 'screening_reports'
    
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
    upload_id = db.Column(db.Integer, db.ForeignKey('screening_uploads.id'), nullable=False, index=True)
    image_id = db.Column(db.String(100), nullable=False)
    
    # Prediction results
    screening_outcome = db.Column(db.String(100))
    raw_probability = db.Column(db.Float)
    calibrated_probability = db.Column(db.Float)
    confidence_band = db.Column(db.String(50))
    decision_threshold = db.Column(db.Float)
    
    # Triage information
    triage_action = db.Column(db.String(100))
    urgency = db.Column(db.String(50))
    llm_summary = db.Column(db.Text)
    
    # Ground truth (for validation only)
    true_label = db.Column(db.String(100))
    
    # File paths (relative to user's data dir)
    report_json_path = db.Column(db.String(500))
    gradcam_image_path = db.Column(db.String(500))
    report_payload = db.Column(db.Text)
    
    # Generated timestamp
    generated_at = db.Column(db.DateTime, default=now_ist, nullable=False, index=True)
    created_at = db.Column(db.DateTime, default=now_ist, nullable=False)
    
    def __repr__(self):
        return f'<ScreeningReport {self.id} - user {self.user_id} - {self.image_id}>'


class AuditLog(db.Model):
    """Audit trail for security and compliance"""
    __tablename__ = 'audit_logs'
    
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True, index=True)
    action = db.Column(db.String(100), nullable=False)  # login, logout, upload, delete, download, etc.
    resource_type = db.Column(db.String(50))  # upload, report, etc.
    resource_id = db.Column(db.String(255))
    details = db.Column(db.Text)  # JSON or plain text with additional info
    ip_address = db.Column(db.String(45))  # IPv4 or IPv6
    timestamp = db.Column(db.DateTime, default=now_ist, nullable=False, index=True)
    status = db.Column(db.String(20), default='success')  # success, failure
    
    def __repr__(self):
        return f'<AuditLog {self.action} - user {self.user_id} - {self.timestamp}>'


class PendingOtp(db.Model):
    """Server-side OTP storage — avoids relying on session cookies (broken in cross-origin iframes)."""
    __tablename__ = 'pending_otps'

    id         = db.Column(db.Integer, primary_key=True)
    # Opaque lookup token sent to the browser as a URL param (never the raw code)
    token      = db.Column(db.String(64), unique=True, nullable=False, index=True)
    email      = db.Column(db.String(120), nullable=False, index=True)
    purpose    = db.Column(db.String(50), nullable=False)   # verify_email | change_username | change_email
    otp_hash   = db.Column(db.String(64), nullable=False)   # SHA-256 of the 6-digit code
    expires_at = db.Column(db.DateTime, nullable=False)
    attempts   = db.Column(db.Integer, default=0, nullable=False)
    # Optional: store pending new value (e.g. new username / new email)
    pending_value = db.Column(db.String(255), nullable=True)
    # Optional FK — may be NULL for pre-registration flows
    user_id    = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=True, index=True)

    def is_expired(self) -> bool:
        return now_ist() > self.expires_at

    def __repr__(self):
        return f'<PendingOtp {self.purpose} for {self.email} expires {self.expires_at}>'