File size: 6,278 Bytes
21c7db9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""Safety and legality verifier."""

from __future__ import annotations

from urllib.parse import urlparse

from app.common.enums import ActionType, DoseBucket
from app.common.types import PolyGuardAction, PolyGuardState, SafetyReport
from app.knowledge.ddi_knowledge import is_contraindicated_pair
from app.knowledge.duplicate_therapy_rules import has_duplicate_therapy
from app.knowledge.drug_catalog import DRUG_CLASSES
from app.knowledge.renal_rules import is_renal_unsafe
from app.knowledge.hepatic_rules import is_hepatic_unsafe
from app.knowledge.substitution_rules import get_substitutions
from app.knowledge.taper_rules import requires_taper


# Domains that FETCH_EXTERNAL_EVIDENCE URLs are allowed to point at.
_EVIDENCE_DOMAIN_ALLOWLIST = frozenset({"nih.gov", "fda.gov", "who.int", "ema.europa.eu"})


def _is_allowlisted_evidence_host(host: str) -> bool:
    """Return True if *host* equals an allowlisted domain or is a subdomain of one."""
    # A bare ``endswith(domain)`` would also accept look-alike hosts such as
    # "evilnih.gov"; require an exact match or a dot-delimited subdomain.
    return any(
        host == domain or host.endswith("." + domain)
        for domain in _EVIDENCE_DOMAIN_ALLOWLIST
    )


def _evidence_url_violates_allowlist(query: str) -> bool:
    """Return True if URL-like *query* names a host outside the evidence allowlist."""
    try:
        parsed = urlparse(query)
        # ``hostname`` is lower-cased and excludes userinfo and port, so
        # "https://www.nih.gov:443/..." is matched on "www.nih.gov".
        host = parsed.hostname or ""
    except ValueError:
        # Malformed netloc (e.g. unbalanced IPv6 brackets): cannot be verified,
        # so treat it as not allowlisted instead of crashing the verifier.
        return True
    if not parsed.netloc:
        # No network location at all (e.g. free text that merely starts with
        # "http"); there is no domain to check.
        return False
    return not _is_allowlisted_evidence_host(host)


def _current_dose_bucket(medications, drug):
    """Return the dose bucket of the first regimen entry for *drug*, or None if absent."""
    return next((med.dose_bucket for med in medications if med.drug == drug), None)


def verify_action_legality(state: PolyGuardState, action: PolyGuardAction) -> SafetyReport:
    """Check a proposed action against the patient's regimen and the safety rules.

    Every rule that the action breaks contributes one violation code. The
    action is legal only when no rule is broken.

    Args:
        state: Current state; only ``state.patient`` (medications, labs,
            frailty_score, comorbidities) is read.
        action: The proposed action to validate.

    Returns:
        A ``SafetyReport`` with ``legal`` set when there are no violations,
        the collected violation codes, a severity of ``"none"`` (no
        violations), ``"medium"`` (exactly one) or ``"high"`` (more than one),
        a recommended fallback action (``KEEP_REGIMEN`` when legal, otherwise
        ``REQUEST_SPECIALIST_REVIEW``) and a manual-review note when illegal.
    """
    violations: list[str] = []
    patient = state.patient
    medications = patient.medications
    action_type = action.action_type
    target = action.target_drug
    replacement = action.replacement_drug
    med_names = [m.drug for m in medications]

    # The target must already be in the regimen, except for within-class substitution.
    if target and target not in med_names and action_type != ActionType.SUBSTITUTE_WITHIN_CLASS:
        violations.append("target_drug_not_in_regimen")

    if action_type == ActionType.SUBSTITUTE_WITHIN_CLASS:
        if not replacement:
            violations.append("replacement_drug_required")
        elif target and replacement not in set(get_substitutions(target)):
            violations.append("unsafe_substitution")

    if action_type == ActionType.RECOMMEND_ALTERNATIVE:
        if not target or not replacement:
            violations.append("alternative_requires_target_and_replacement")
        elif replacement not in set(get_substitutions(target)):
            violations.append("unsafe_alternative_recommendation")

    if action_type == ActionType.FETCH_EXTERNAL_EVIDENCE:
        if not action.evidence_query:
            violations.append("missing_evidence_query")
        else:
            query = action.evidence_query.strip()
            # URL schemes are case-insensitive, so "HTTPS://..." must not
            # bypass the allowlist check.
            if query.lower().startswith("http") and _evidence_url_violates_allowlist(query):
                violations.append("evidence_domain_not_allowlisted")

    if action_type == ActionType.DECOMPOSE_NEW_DRUG:
        if not action.new_drug_name:
            violations.append("missing_new_drug_name")
        if not action.candidate_components:
            violations.append("missing_candidate_components")

    # Drugs that require a taper may not be stopped without taper_days set.
    if action_type == ActionType.STOP_DRUG and target and requires_taper(target):
        if action.taper_days is None:
            violations.append("abrupt_stop_requires_taper")
    if action_type in {ActionType.TAPER_INITIATE, ActionType.TAPER_CONTINUE} and target:
        if not requires_taper(target):
            violations.append("invalid_taper_target")

    # Organ-function checks apply only when escalating to the HIGH bucket.
    if action_type == ActionType.INCREASE_DOSE_BUCKET and action.dose_bucket == DoseBucket.HIGH:
        if target and is_renal_unsafe(target, patient.labs.egfr):
            violations.append("renal_unsafe_dose")
        if target and is_hepatic_unsafe(target, patient.labs.ast, patient.labs.alt):
            violations.append("hepatic_unsafe_dose")
    if action_type == ActionType.REDUCE_DOSE_BUCKET and target:
        if _current_dose_bucket(medications, target) in {DoseBucket.LOW, DoseBucket.HOLD}:
            violations.append("dose_already_minimized")
    if action_type == ActionType.INCREASE_DOSE_BUCKET and target:
        if _current_dose_bucket(medications, target) == DoseBucket.HIGH:
            violations.append("dose_overshoot_risk")
        if patient.frailty_score > 0.7 and DRUG_CLASSES.get(target) == "sedative":
            violations.append("invalid_class_escalation")

    # Duplicate therapy check after substitutions/increases.
    if action_type in {
        ActionType.SUBSTITUTE_WITHIN_CLASS,
        ActionType.RECOMMEND_ALTERNATIVE,
        ActionType.INCREASE_DOSE_BUCKET,
    } and has_duplicate_therapy(medications, target, replacement):
        violations.append("duplicate_therapy")

    if replacement and target and is_contraindicated_pair(target, replacement):
        violations.append("unsafe_substitution_contraindication")
    if replacement:
        # One violation is recorded per conflicting regimen drug (the target
        # itself is skipped), so each conflict counts towards the severity.
        contraindication_hits = 0
        for med in medications:
            if med.drug != target and is_contraindicated_pair(med.drug, replacement):
                violations.append("creates_severe_contraindicated_pair")
                contraindication_hits += 1
        if contraindication_hits >= 2:
            violations.append("dangerous_triple_risk_creation")

    if action_type in {ActionType.ORDER_MONITORING_AND_WAIT, ActionType.DOSE_HOLD} and not action.monitoring_plan:
        violations.append("invalid_monitoring_gap")

    if (
        action_type == ActionType.STOP_DRUG
        and target == "warfarin_like"
        and "afib" in patient.comorbidities
        and not replacement
    ):
        violations.append("destabilizing_deprescribing")

    legal = not violations
    if legal:
        severity = "none"
        fallback = ActionType.KEEP_REGIMEN
    else:
        severity = "high" if len(violations) > 1 else "medium"
        fallback = ActionType.REQUEST_SPECIALIST_REVIEW
    return SafetyReport(
        legal=legal,
        violations=violations,
        severity=severity,
        recommended_fallback=fallback,
        uncertainty_notes=[] if legal else ["manual_review_recommended"],
    )