TheJackBright's picture
Deploy GitHub root master to Space
c296d62
"""Parser for structured policy decisions.
Provides strict XML parsing, soft parsing fallback, and repair-backed parsing.
"""
from __future__ import annotations
import json
import xml.etree.ElementTree as ET
from app.common.exceptions import ParserError
from app.models.policy.output_schema import DecisionSchema
from app.models.policy.repair import repair_partial_decision
# Child tags that must all be present under the <decision> root.
_REQUIRED_XML_FIELDS = {
    "mode", "action_type", "target_drug", "replacement_drug",
    "dose_bucket", "taper_days", "candidate_id", "confidence",
}

# Every tag the strict parser accepts: the required ones plus two optional ones.
_ALLOWED_XML_FIELDS = {*_REQUIRED_XML_FIELDS, "monitoring_plan", "rationale_brief"}
def _normalize_scalar(value: object) -> object:
    """Collapse null-like placeholder strings to ``None``.

    A string counts as null-like when, after trimming surrounding whitespace
    and lower-casing, it is empty or one of ``none``, ``null`` or ``na``.
    Every other value (including ``None`` itself and non-string objects) is
    returned unchanged; strings that are kept are NOT trimmed.
    """
    if isinstance(value, str):
        token = value.strip().lower()
        if token in ("", "none", "null", "na"):
            return None
    return value
def _coerce_payload_types(payload: dict[str, object]) -> dict[str, object]:
    """Return a shallow copy of ``payload`` with nullable and numeric fields normalized.

    * ``target_drug``, ``replacement_drug``, ``monitoring_plan``,
      ``taper_days`` and ``rationale_brief`` are passed through
      ``_normalize_scalar``; a key that is absent from ``payload`` is added
      with the value ``None``.
    * ``taper_days`` is converted with ``int()`` unless it normalized to ``None``.
    * ``confidence`` is converted with ``float()`` when the key is present.

    The input mapping is not modified.

    Raises:
        ParserError: if ``taper_days`` or ``confidence`` cannot be converted
            to a number (for example ``"abc"``, or a ``None`` confidence
            coming from an empty XML element).
    """
    coerced = dict(payload)
    for key in ("target_drug", "replacement_drug", "monitoring_plan", "taper_days", "rationale_brief"):
        coerced[key] = _normalize_scalar(coerced.get(key))

    taper_days = coerced["taper_days"]
    if taper_days is not None:
        try:
            coerced["taper_days"] = int(taper_days)
        except (TypeError, ValueError, OverflowError) as exc:
            # Report malformed numbers the same way as every other parse
            # failure in this module instead of leaking a raw ValueError.
            raise ParserError(f"Invalid taper_days value: {taper_days!r}") from exc

    if "confidence" in coerced:
        confidence = coerced["confidence"]
        try:
            coerced["confidence"] = float(confidence)
        except (TypeError, ValueError, OverflowError) as exc:
            raise ParserError(f"Invalid confidence value: {confidence!r}") from exc

    return coerced
def parse_decision_strict_xml(raw: str) -> DecisionSchema:
    """Parse a ``<decision>`` XML document into a ``DecisionSchema``.

    The text is stripped and parsed with ElementTree. The root element must
    be ``decision``, every direct child tag must be listed in
    ``_ALLOWED_XML_FIELDS``, and every tag in ``_REQUIRED_XML_FIELDS`` must
    appear. Each child's text becomes the field value; when a tag is
    repeated, the last occurrence wins. The collected fields are type-coerced
    and then validated by ``DecisionSchema.model_validate``.

    Raises:
        ParserError: on malformed XML, a wrong root tag, an unknown child
            tag, or missing required tags.
    """
    try:
        document = ET.fromstring(raw.strip())
    except Exception as exc:  # noqa: BLE001
        raise ParserError(f"Invalid XML: {exc}") from exc

    if document.tag != "decision":
        raise ParserError("XML decision root must be <decision>.")

    # Reject the first unexpected tag, in document order.
    unexpected = next(
        (element.tag for element in document if element.tag not in _ALLOWED_XML_FIELDS),
        None,
    )
    if unexpected is not None:
        raise ParserError(f"Unknown XML field: {unexpected}")

    # Later duplicates overwrite earlier ones, exactly like repeated assignment.
    fields: dict[str, object] = {element.tag: element.text for element in document}

    absent = sorted(name for name in _REQUIRED_XML_FIELDS if name not in fields)
    if absent:
        raise ParserError(f"Missing required XML fields: {absent}")

    normalized = _coerce_payload_types(fields)
    return DecisionSchema.model_validate(normalized)
def parse_decision_soft(raw: str) -> DecisionSchema:
    """Best-effort parsing across XML or JSON decision payloads.

    If the stripped text contains ``<decision`` it is treated as XML: a
    strict parse is tried first and, if that fails, the span from the first
    ``<decision`` to the last ``</decision>`` is parsed strictly instead
    (this drops any wrapper text before or after the element). Otherwise the
    text is parsed as JSON, which must decode to an object.

    Raises:
        ParserError: if the payload is empty, the XML cannot be parsed even
            after trimming, the JSON is malformed, or the JSON value is not
            an object.
    """
    stripped = raw.strip()
    if not stripped:
        raise ParserError("Empty decision payload.")

    if "<decision" in stripped:
        # Try exact parse first.
        try:
            return parse_decision_strict_xml(stripped)
        except ParserError:
            # Attempt to recover a wrapped/trailing buffer.
            start = stripped.find("<decision")
            end = stripped.rfind("</decision>")
            if start >= 0 and end > start:
                candidate = stripped[start : end + len("</decision>")]
                # Re-parsing the identical text can only fail the same way,
                # so only retry when trimming actually removed something.
                if candidate != stripped:
                    return parse_decision_strict_xml(candidate)
            raise

    try:
        payload = json.loads(stripped)
    except Exception as exc:  # noqa: BLE001
        raise ParserError(f"Unable to parse JSON decision: {exc}") from exc

    # json.loads also accepts arrays and bare scalars; those would otherwise
    # fail inside dict() with an untyped TypeError/ValueError.
    if not isinstance(payload, dict):
        raise ParserError(
            f"JSON decision must be an object, got {type(payload).__name__}."
        )

    return DecisionSchema.model_validate(_coerce_payload_types(payload))
def parse_decision_with_repair(raw: str) -> DecisionSchema:
    """Soft-parse ``raw`` and run the result through ``repair_partial_decision``.

    Happy path: the payload is soft-parsed, dumped in JSON mode, repaired and
    re-validated. If any step of that raises, a fallback runs instead: when
    the stripped text starts with ``{`` it is decoded as JSON (an empty dict
    is used if decoding fails or the text does not start with ``{``), and
    that dict is repaired and validated.

    Raises:
        ParserError: if the repaired fallback payload still fails validation.
    """
    try:
        decision = parse_decision_soft(raw)
        return DecisionSchema.model_validate(
            repair_partial_decision(decision.model_dump(mode="json"))
        )
    except Exception:
        text = raw.strip()
        salvaged: dict[str, object] = {}
        if text.startswith("{"):
            try:
                salvaged = json.loads(text)
            except Exception:  # noqa: BLE001
                # Undecodable JSON: let the repair step start from nothing.
                salvaged = {}
        fallback = repair_partial_decision(salvaged)
        try:
            return DecisionSchema.model_validate(fallback)
        except Exception as exc:  # noqa: BLE001
            raise ParserError(f"Unable to parse decision after repair: {exc}") from exc
def parse_decision(raw: str) -> DecisionSchema:
    """Primary parse entrypoint.

    Input whose stripped text begins with ``<decision`` goes to the strict
    XML parser; everything else goes to the soft XML/JSON parser. The
    original, unstripped ``raw`` is what gets forwarded in both cases.
    """
    looks_like_xml = raw.strip().startswith("<decision")
    parser = parse_decision_strict_xml if looks_like_xml else parse_decision_soft
    return parser(raw)