File size: 5,122 Bytes
e4fd6e0
 
 
 
 
5105d0e
e4fd6e0
 
 
 
 
 
5105d0e
 
 
 
e4fd6e0
 
 
 
 
 
 
 
 
 
 
5105d0e
 
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5105d0e
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1499a5e
e4fd6e0
 
 
 
 
 
 
e3566c9
e4fd6e0
 
5105d0e
 
e4fd6e0
 
 
 
 
 
 
 
 
 
 
 
 
1499a5e
e4fd6e0
 
5105d0e
e4fd6e0
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
Database models for ICH Screening Application with user authentication and privacy
"""
import os
from datetime import datetime
from zoneinfo import ZoneInfo
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
import secrets

db = SQLAlchemy()
IST = ZoneInfo("Asia/Kolkata")

def now_ist() -> datetime:
    return datetime.now(IST).replace(tzinfo=None)


class User(UserMixin, db.Model):
    """User account model for authentication"""
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255), nullable=False)
    full_name = db.Column(db.String(120))
    created_at = db.Column(db.DateTime, default=now_ist, nullable=False)
    updated_at = db.Column(db.DateTime, default=now_ist, onupdate=now_ist)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    
    # Relationships
    screening_uploads = db.relationship('ScreeningUpload', backref='user', lazy=True, cascade='all, delete-orphan')
    screening_reports = db.relationship('ScreeningReport', backref='user', lazy=True, cascade='all, delete-orphan')
    
    def set_password(self, password):
        """Hash and set the user's password"""
        self.password_hash = generate_password_hash(password, method='pbkdf2:sha256')
    
    def check_password(self, password):
        """Verify password against stored hash"""
        return check_password_hash(self.password_hash, password)
    
    def __repr__(self):
        return f'<User {self.username}>'


class ScreeningUpload(db.Model):
    """Track uploaded DICOM files with user ownership"""
    __tablename__ = 'screening_uploads'
    
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
    file_name = db.Column(db.String(255), nullable=False)
    original_filename = db.Column(db.String(255), nullable=False)
    file_size = db.Column(db.Integer)  # bytes
    file_path = db.Column(db.String(500), nullable=False)  # Relative to user's upload dir
    upload_timestamp = db.Column(db.DateTime, default=now_ist, nullable=False, index=True)
    processing_status = db.Column(db.String(20), default='pending')  # pending, processing, completed, failed
    processing_error = db.Column(db.Text)  # Error message if failed
    
    # Relationships
    reports = db.relationship('ScreeningReport', backref='upload', lazy=True, cascade='all, delete-orphan')
    
    def __repr__(self):
        return f'<ScreeningUpload {self.id} - user {self.user_id}>'


class ScreeningReport(db.Model):
    """Store screening results with full user isolation"""
    __tablename__ = 'screening_reports'
    
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
    upload_id = db.Column(db.Integer, db.ForeignKey('screening_uploads.id'), nullable=False, index=True)
    image_id = db.Column(db.String(100), nullable=False)
    
    # Prediction results
    screening_outcome = db.Column(db.String(100))
    raw_probability = db.Column(db.Float)
    calibrated_probability = db.Column(db.Float)
    confidence_band = db.Column(db.String(50))
    decision_threshold = db.Column(db.Float)
    
    # Triage information
    triage_action = db.Column(db.String(100))
    urgency = db.Column(db.String(50))
    llm_summary = db.Column(db.Text)
    
    # Ground truth (for validation only)
    true_label = db.Column(db.String(100))
    
    # File paths (relative to user's data dir)
    report_json_path = db.Column(db.String(500))
    gradcam_image_path = db.Column(db.String(500))
    report_payload = db.Column(db.Text)
    
    # Generated timestamp
    generated_at = db.Column(db.DateTime, default=now_ist, nullable=False, index=True)
    created_at = db.Column(db.DateTime, default=now_ist, nullable=False)
    
    def __repr__(self):
        return f'<ScreeningReport {self.id} - user {self.user_id} - {self.image_id}>'


class AuditLog(db.Model):
    """Audit trail for security and compliance"""
    __tablename__ = 'audit_logs'
    
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True, index=True)
    action = db.Column(db.String(100), nullable=False)  # login, logout, upload, delete, download, etc.
    resource_type = db.Column(db.String(50))  # upload, report, etc.
    resource_id = db.Column(db.String(255))
    details = db.Column(db.Text)  # JSON or plain text with additional info
    ip_address = db.Column(db.String(45))  # IPv4 or IPv6
    timestamp = db.Column(db.DateTime, default=now_ist, nullable=False, index=True)
    status = db.Column(db.String(20), default='success')  # success, failure
    
    def __repr__(self):
        return f'<AuditLog {self.action} - user {self.user_id} - {self.timestamp}>'