"""Parser for structured policy decisions.

Provides strict XML parsing, soft parsing fallback, and repair-backed parsing.
"""

from __future__ import annotations

import json
import xml.etree.ElementTree as ET

from app.common.exceptions import ParserError
from app.models.policy.output_schema import DecisionSchema
from app.models.policy.repair import repair_partial_decision

# Child elements that must all be present under the <decision> root.
_REQUIRED_XML_FIELDS = {
    "mode",
    "action_type",
    "target_drug",
    "replacement_drug",
    "dose_bucket",
    "taper_days",
    "candidate_id",
    "confidence",
}
# Every child element the strict parser accepts; anything else is rejected.
_ALLOWED_XML_FIELDS = _REQUIRED_XML_FIELDS | {"monitoring_plan", "rationale_brief"}

# Case-insensitive string spellings that are treated as "no value".
_NULL_TOKENS = {"", "none", "null", "na"}

# Literal tag markers used when slicing a decision out of surrounding text.
_DECISION_OPEN = "<decision"
_DECISION_CLOSE = "</decision>"


def _normalize_scalar(value: object) -> object:
    """Map null-like values to ``None`` and return anything else unchanged.

    A value is null-like when it is ``None`` or a string whose stripped,
    lower-cased form is one of ``""``, ``"none"``, ``"null"`` or ``"na"``.
    """
    if value is None:
        return None
    if isinstance(value, str) and value.strip().lower() in _NULL_TOKENS:
        return None
    return value


def _coerce_payload_types(payload: dict[str, object]) -> dict[str, object]:
    """Return a shallow copy of *payload* with nullable and numeric fields coerced.

    The nullable fields are always present in the result (``None`` when absent
    or null-like in the input). ``taper_days`` is converted with ``int`` when it
    is not ``None``; ``confidence`` is converted with ``float`` when the key is
    present. Conversion errors (``ValueError``/``TypeError``) are not caught
    here and propagate to the caller.
    """
    coerced = dict(payload)
    for key in ["target_drug", "replacement_drug", "monitoring_plan", "taper_days", "rationale_brief"]:
        coerced[key] = _normalize_scalar(coerced.get(key))
    if coerced.get("taper_days") is not None:
        coerced["taper_days"] = int(coerced["taper_days"])
    if "confidence" in coerced:
        coerced["confidence"] = float(coerced["confidence"])
    return coerced


def parse_decision_strict_xml(raw: str) -> DecisionSchema:
    """Strictly parse the required XML decision schema.

    Args:
        raw: Text containing exactly one ``<decision>`` document (surrounding
            whitespace is stripped before parsing).

    Returns:
        The result of ``DecisionSchema.model_validate`` on the coerced payload.

    Raises:
        ParserError: If the text is not well-formed XML, the root element is
            not ``decision``, an unknown child element is present, or a
            required child element is missing.

    Errors raised by type coercion or by schema validation are not wrapped and
    propagate unchanged.
    """
    try:
        root = ET.fromstring(raw.strip())
    except Exception as exc:  # noqa: BLE001
        raise ParserError(f"Invalid XML: {exc}") from exc
    if root.tag != "decision":
        raise ParserError("XML decision root must be <decision>.")

    payload: dict[str, object] = {}
    for child in root:
        if child.tag not in _ALLOWED_XML_FIELDS:
            raise ParserError(f"Unknown XML field: {child.tag}")
        # A repeated tag overwrites the earlier value; empty elements yield None.
        payload[child.tag] = child.text

    missing = sorted(_REQUIRED_XML_FIELDS - payload.keys())
    if missing:
        raise ParserError(f"Missing required XML fields: {missing}")
    return DecisionSchema.model_validate(_coerce_payload_types(payload))


def parse_decision_soft(raw: str) -> DecisionSchema:
    """Best-effort parsing across XML or JSON decision payloads.

    If the text contains a ``<decision`` marker it is parsed as strict XML;
    when that fails, the span from the first ``<decision`` to the last
    ``</decision>`` is extracted and parsed once more. Otherwise the text is
    parsed as a JSON object.

    Raises:
        ParserError: If the payload is empty, the XML cannot be recovered, the
            JSON cannot be decoded, or the JSON value is not an object.
    """
    stripped = raw.strip()
    if not stripped:
        raise ParserError("Empty decision payload.")

    if _DECISION_OPEN in stripped:
        try:
            return parse_decision_strict_xml(stripped)
        # NOTE(review): the except clause was reconstructed from damaged
        # source; confirm ParserError is the intended exception type.
        except ParserError:
            # Retry on just the decision element, dropping any text the
            # producer emitted before or after it.
            start = stripped.find(_DECISION_OPEN)
            end = stripped.rfind(_DECISION_CLOSE)
            if start >= 0 and end > start:
                end += len(_DECISION_CLOSE)
                return parse_decision_strict_xml(stripped[start:end])
            raise

    try:
        payload = json.loads(stripped)
    except Exception as exc:  # noqa: BLE001
        raise ParserError(f"Unable to parse JSON decision: {exc}") from exc
    # Valid JSON that is not an object (list, string, number) cannot be a
    # decision; report it as a parse failure instead of leaking a TypeError.
    if not isinstance(payload, dict):
        raise ParserError("JSON decision must be an object.")
    return DecisionSchema.model_validate(_coerce_payload_types(payload))


def parse_decision_with_repair(raw: str) -> DecisionSchema:
    """Soft-parse and deterministically repair malformed decision payloads.

    The payload is soft-parsed, dumped, passed through
    ``repair_partial_decision`` and re-validated. If any of that fails, the
    repair is attempted on the raw JSON object (or on an empty dict when the
    text does not start with ``{`` or is not decodable).

    Raises:
        ParserError: If the repaired payload still fails schema validation.
    """
    try:
        parsed = parse_decision_soft(raw)
        repaired = repair_partial_decision(parsed.model_dump(mode="json"))
        return DecisionSchema.model_validate(repaired)
    except Exception:
        # Deliberately broad: any failure above switches to repair-from-raw.
        stripped = raw.strip()
        payload: dict[str, object] = {}
        if stripped.startswith("{"):
            try:
                payload = json.loads(stripped)
            except Exception:  # noqa: BLE001
                payload = {}
        repaired = repair_partial_decision(payload)
        try:
            return DecisionSchema.model_validate(repaired)
        except Exception as exc:  # noqa: BLE001
            raise ParserError(f"Unable to parse decision after repair: {exc}") from exc


def parse_decision(raw: str) -> DecisionSchema:
    """Primary parse entrypoint (strict XML -> soft JSON/XML recovery)."""
    # NOTE(review): the original body was truncated after the startswith
    # check; the fallback below follows the docstring and must be confirmed
    # against the upstream implementation.
    if raw.strip().startswith(_DECISION_OPEN):
        try:
            return parse_decision_strict_xml(raw)
        except ParserError:
            # Strict parsing failed; fall through to best-effort recovery.
            pass
    return parse_decision_soft(raw)