# Repository page header (not part of the module): TheJackBright's picture
# Deploy GitHub root master to Space
# c296d62
"""Safety and legality verifier."""
from __future__ import annotations
from urllib.parse import urlparse
from app.common.enums import ActionType, DoseBucket
from app.common.types import PolyGuardAction, PolyGuardState, SafetyReport
from app.knowledge.ddi_knowledge import is_contraindicated_pair
from app.knowledge.duplicate_therapy_rules import has_duplicate_therapy
from app.knowledge.drug_catalog import DRUG_CLASSES
from app.knowledge.renal_rules import is_renal_unsafe
from app.knowledge.hepatic_rules import is_hepatic_unsafe
from app.knowledge.substitution_rules import get_substitutions
from app.knowledge.taper_rules import requires_taper
# Domains from which external evidence URLs may be fetched.
_EVIDENCE_DOMAIN_ALLOWLIST = frozenset({"nih.gov", "fda.gov", "who.int", "ema.europa.eu"})


def _is_allowlisted_evidence_host(host: str) -> bool:
    """Return True when *host* is an allowlisted domain or a subdomain of one."""
    # A bare suffix test would accept look-alikes such as "evilnih.gov", so the
    # match must be exact or sit on a label boundary (".nih.gov").
    return any(
        host == domain or host.endswith("." + domain)
        for domain in _EVIDENCE_DOMAIN_ALLOWLIST
    )


def _current_dose_bucket(medications, drug: str):
    """Return the dose bucket of the first regimen entry for *drug*, else None."""
    for med in medications:
        if med.drug == drug:
            return med.dose_bucket
    return None


def verify_action_legality(state: PolyGuardState, action: PolyGuardAction) -> SafetyReport:
    """Check a proposed action against the safety and legality rules.

    Every rule that fails appends a violation code; the rules are independent,
    so one action can collect several codes (and the same code more than once
    when several regimen drugs conflict with the replacement).

    Args:
        state: Current state; only ``state.patient`` is read (medications,
            labs, frailty score, comorbidities).
        action: The proposed action to validate.

    Returns:
        A ``SafetyReport`` where ``legal`` is True only when no violation was
        recorded. ``severity`` is ``"none"`` for no violations, ``"medium"``
        for exactly one and ``"high"`` for more than one. The recommended
        fallback is ``REQUEST_SPECIALIST_REVIEW`` for illegal actions and
        ``KEEP_REGIMEN`` otherwise.
    """
    violations: list[str] = []
    patient = state.patient
    action_type = action.action_type
    target = action.target_drug
    replacement = action.replacement_drug
    med_names = [m.drug for m in patient.medications]

    # --- Target / replacement presence -----------------------------------
    if target and target not in med_names and action_type != ActionType.SUBSTITUTE_WITHIN_CLASS:
        violations.append("target_drug_not_in_regimen")

    if action_type == ActionType.SUBSTITUTE_WITHIN_CLASS:
        if not replacement:
            violations.append("replacement_drug_required")
        elif target and replacement not in set(get_substitutions(target)):
            violations.append("unsafe_substitution")

    if action_type == ActionType.RECOMMEND_ALTERNATIVE:
        if not target or not replacement:
            violations.append("alternative_requires_target_and_replacement")
        elif replacement not in set(get_substitutions(target)):
            violations.append("unsafe_alternative_recommendation")

    # --- External evidence ------------------------------------------------
    if action_type == ActionType.FETCH_EXTERNAL_EVIDENCE:
        if not action.evidence_query:
            violations.append("missing_evidence_query")
        else:
            query = action.evidence_query.strip()
            # Scheme comparison is case-insensitive so "HTTPS://..." cannot
            # sidestep the domain check.
            if query.lower().startswith("http"):
                try:
                    # ``hostname`` drops any port/userinfo and is lowercased.
                    host = urlparse(query).hostname or ""
                except ValueError:
                    # Malformed URL (e.g. broken IPv6 literal): fail closed.
                    violations.append("evidence_domain_not_allowlisted")
                else:
                    # A query with no host part is treated as free text.
                    if host and not _is_allowlisted_evidence_host(host):
                        violations.append("evidence_domain_not_allowlisted")

    # --- New-drug decomposition ------------------------------------------
    if action_type == ActionType.DECOMPOSE_NEW_DRUG:
        if not action.new_drug_name:
            violations.append("missing_new_drug_name")
        if not action.candidate_components:
            violations.append("missing_candidate_components")

    # --- Stopping and tapering -------------------------------------------
    if action_type == ActionType.STOP_DRUG and target and requires_taper(target):
        if action.taper_days is None:
            violations.append("abrupt_stop_requires_taper")

    if action_type in {ActionType.TAPER_INITIATE, ActionType.TAPER_CONTINUE} and target:
        if not requires_taper(target):
            violations.append("invalid_taper_target")

    # --- Dose changes -----------------------------------------------------
    if action_type == ActionType.INCREASE_DOSE_BUCKET and action.dose_bucket == DoseBucket.HIGH:
        if target and is_renal_unsafe(target, patient.labs.egfr):
            violations.append("renal_unsafe_dose")
        if target and is_hepatic_unsafe(target, patient.labs.ast, patient.labs.alt):
            violations.append("hepatic_unsafe_dose")

    if action_type == ActionType.REDUCE_DOSE_BUCKET and target:
        if _current_dose_bucket(patient.medications, target) in {DoseBucket.LOW, DoseBucket.HOLD}:
            violations.append("dose_already_minimized")

    if action_type == ActionType.INCREASE_DOSE_BUCKET and target:
        if _current_dose_bucket(patient.medications, target) == DoseBucket.HIGH:
            violations.append("dose_overshoot_risk")
        if patient.frailty_score > 0.7 and DRUG_CLASSES.get(target) == "sedative":
            violations.append("invalid_class_escalation")

    # Duplicate therapy check after substitutions/increases.
    if action_type in {
        ActionType.SUBSTITUTE_WITHIN_CLASS,
        ActionType.RECOMMEND_ALTERNATIVE,
        ActionType.INCREASE_DOSE_BUCKET,
    } and has_duplicate_therapy(patient.medications, target, replacement):
        violations.append("duplicate_therapy")

    # --- Contraindications introduced by the replacement ------------------
    if replacement and target and is_contraindicated_pair(target, replacement):
        violations.append("unsafe_substitution_contraindication")

    if replacement:
        contraindication_hits = 0
        for med in patient.medications:
            # The target itself is covered by the pair check above.
            if med.drug != target and is_contraindicated_pair(med.drug, replacement):
                violations.append("creates_severe_contraindicated_pair")
                contraindication_hits += 1
        if contraindication_hits >= 2:
            violations.append("dangerous_triple_risk_creation")

    # --- Monitoring and deprescribing guards ------------------------------
    if (
        action_type in {ActionType.ORDER_MONITORING_AND_WAIT, ActionType.DOSE_HOLD}
        and not action.monitoring_plan
    ):
        violations.append("invalid_monitoring_gap")

    if (
        action_type == ActionType.STOP_DRUG
        and target == "warfarin_like"
        and "afib" in patient.comorbidities
        and not replacement
    ):
        violations.append("destabilizing_deprescribing")

    # --- Report -----------------------------------------------------------
    legal = not violations
    if legal:
        severity = "none"
    else:
        severity = "high" if len(violations) > 1 else "medium"
    fallback = ActionType.KEEP_REGIMEN if legal else ActionType.REQUEST_SPECIALIST_REVIEW

    return SafetyReport(
        legal=legal,
        violations=violations,
        severity=severity,
        recommended_fallback=fallback,
        uncertainty_notes=[] if legal else ["manual_review_recommended"],
    )