""" hardening.py — Defensive utilities for production-grade execution. Applied across the critical path to prevent crashes from: - None propagation - LLM timeouts - Malformed parser output - Environment exceptions - Type mismatches at boundaries Usage: from purpose_agent.hardening import safe_params, llm_call_with_timeout, safe_action All functions are pure — no side effects, no state. """ from __future__ import annotations import logging import signal import threading import time from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout from typing import Any, Callable, TypeVar logger = logging.getLogger("purpose_agent.hardening") T = TypeVar("T") # ═══════════════════════════════════════════════════════════════ # Null Safety # ═══════════════════════════════════════════════════════════════ def safe_params(params: Any) -> dict[str, Any]: """ Normalize action params to a guaranteed dict. Handles: None, string, list, or any non-dict garbage from parsers. """ if isinstance(params, dict): return params if params is None: return {} if isinstance(params, str): # Parser sometimes returns the raw string return {"_raw": params} return {} def safe_string(value: Any, default: str = "", max_len: int = 10000) -> str: """Guarantee a string value. Never returns None.""" if value is None: return default s = str(value) return s[:max_len] if len(s) > max_len else s def safe_float(value: Any, default: float = 0.0, min_val: float = 0.0, max_val: float = 10.0) -> float: """Guarantee a bounded float. Never raises, never returns None.""" try: f = float(str(value).rstrip('.').rstrip(',')) return max(min_val, min(max_val, f)) except (ValueError, TypeError): return default def safe_dict_get(d: Any, key: str, default: Any = "") -> Any: """Safe get from potentially-None dict.""" if not isinstance(d, dict): return default val = d.get(key, default) return val if val is not None else default # ═══════════════════════════════════════════════════════════════ # Timeout Wrapper # ═══════════════════════════════════════════════════════════════ def with_timeout(fn: Callable[..., T], timeout_s: float = 30.0, default: T = None, label: str = "") -> Callable[..., T]: """ Wrap a function with a timeout. Returns default if timeout exceeded. Uses ThreadPoolExecutor (works on all platforms, no signals needed). Usage: safe_generate = with_timeout(llm.generate, timeout_s=30.0, default="", label="llm.generate") result = safe_generate(messages, temperature=0.7) """ def wrapper(*args, **kwargs) -> T: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(fn, *args, **kwargs) try: return future.result(timeout=timeout_s) except FuturesTimeout: logger.error(f"TIMEOUT ({timeout_s}s): {label or fn.__name__}") return default except Exception as e: logger.error(f"ERROR in {label or fn.__name__}: {type(e).__name__}: {e}") return default return wrapper def llm_call_with_timeout( llm_fn: Callable, args: tuple = (), kwargs: dict | None = None, timeout_s: float = 60.0, default: str = "", label: str = "llm_call", ) -> str: """ Execute a single LLM call with timeout and error recovery. Returns default string on any failure (timeout, network error, parse error). NEVER raises to the caller. """ kwargs = kwargs or {} with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(llm_fn, *args, **kwargs) try: result = future.result(timeout=timeout_s) if result is None: return default return str(result) except FuturesTimeout: logger.error(f"LLM TIMEOUT ({timeout_s}s): {label}") return default except Exception as e: logger.error(f"LLM ERROR ({label}): {type(e).__name__}: {e}") return default # ═══════════════════════════════════════════════════════════════ # Graceful Degradation # ═══════════════════════════════════════════════════════════════ def graceful(fn: Callable[..., T], default: T, label: str = "") -> Callable[..., T]: """ Decorator: function never raises. Returns default on any exception. Usage: @graceful(default={}, label="parse_response") def parse_response(text): ... """ def wrapper(*args, **kwargs) -> T: try: result = fn(*args, **kwargs) return result if result is not None else default except Exception as e: logger.warning(f"Graceful degradation ({label or fn.__name__}): {type(e).__name__}: {e}") return default wrapper.__name__ = fn.__name__ wrapper.__doc__ = fn.__doc__ return wrapper # ═══════════════════════════════════════════════════════════════ # Input Validation # ═══════════════════════════════════════════════════════════════ class ValidationError(ValueError): """Raised when framework input validation fails. Always has actionable message.""" pass def validate_purpose(purpose: str) -> str: """Validate and normalize a purpose string.""" if not purpose or not isinstance(purpose, str): raise ValidationError( "purpose must be a non-empty string. " "Example: pa.purpose('Help me write Python code')" ) purpose = purpose.strip() if len(purpose) < 3: raise ValidationError( f"purpose too short ({len(purpose)} chars). " "Provide a meaningful description of what you want the agent to do." ) if len(purpose) > 5000: purpose = purpose[:5000] logger.warning("Purpose truncated to 5000 chars") return purpose def validate_model_spec(spec: str) -> str: """Validate a model spec string.""" if not spec or not isinstance(spec, str): raise ValidationError( "model must be a string like 'ollama:qwen3:1.7b' or 'openrouter:meta-llama/llama-3.3-70b-instruct'. " "See docs for supported providers." ) return spec.strip()