"""
routing.py — SLM-native LLM call router with cost homeostasis.

Routes tasks to the smallest capable model. Local-first by default.
The policy carries cost, latency, and token budgets; within this module the
router itself only checks the accumulated cost budget (see
``LLMCallRouter.route``).

Complexity classification:
  simple   → single SLM call (summarize, answer simple Q)
  moderate → sequential chain (plan → execute)
  complex  → parallel specialists (research + code + review)
  critical → specialists + critic ensemble + optional HITL

Router decisions are logged and reproducible.
"""

from __future__ import annotations

import logging
import re
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

from purpose_agent.llm_backend import LLMBackend

logger = logging.getLogger(__name__)


class TaskComplexity(str, Enum):
    """Complexity tiers a task can be classified into."""

    SIMPLE = "simple"
    MODERATE = "moderate"
    COMPLEX = "complex"
    CRITICAL = "critical"


@dataclass
class RoutingPolicy:
    """Policy governing model selection and cost control."""

    prefer_local: bool = True
    max_cost_per_task_usd: float = 0.10
    max_latency_per_call_s: float = 30.0
    max_tokens_per_task: int = 10000
    allow_cloud_fallback: bool = True
    fallback_model: str = ""
    local_model: str = "ollama:qwen3:1.7b"
    cloud_model: str = "openrouter:meta-llama/llama-3.3-70b-instruct"


@dataclass
class ModelOption:
    """A model available for routing."""

    spec: str  # e.g. "ollama:qwen3:1.7b"
    is_local: bool = True
    cost_per_1k_tokens: float = 0.0  # $0 for local
    avg_latency_s: float = 1.0
    max_context: int = 32768
    capabilities: list[str] = field(default_factory=list)  # ["code","reasoning","general"]


@dataclass
class RoutingDecision:
    """Recorded decision from the router."""

    task_summary: str
    complexity: TaskComplexity
    selected_model: str
    reason: str
    timestamp: float = field(default_factory=time.time)
    estimated_cost: float = 0.0


# Keyword-based complexity heuristics
_COMPLEX_KEYWORDS = {"research", "analyze", "compare", "design", "architect", "security", "audit"}
_CRITICAL_KEYWORDS = {"deploy", "production", "delete", "admin", "payment", "credential", "secret"}
_SIMPLE_KEYWORDS = {"summarize", "translate", "hello", "what is", "define", "explain"}

# Tokenizer: runs of word characters. Splitting on whitespace alone leaves
# punctuation glued to words ("production.", "delete!"), which hides keywords.
_WORD_RE = re.compile(r"\w+")


def _mentions_any(words: set[str], padded_text: str, keywords: set[str]) -> bool:
    """Return True when the task text contains at least one of *keywords*.

    Args:
        words: Distinct lowercase word tokens of the task text.
        padded_text: The same tokens joined by single spaces, with one leading
            and one trailing space, so phrases can be matched on word
            boundaries with a plain substring test.
        keywords: Lowercase keywords; entries may be multi-word phrases.
    """
    if words & keywords:
        return True
    # Multi-word phrases (e.g. "what is") can never equal a single token, so
    # they are matched against the normalized text instead.
    return any(" " in kw and f" {kw} " in padded_text for kw in keywords)


class TaskComplexityClassifier:
    """Classifies task complexity from the purpose description."""

    def classify(self, purpose: str) -> TaskComplexity:
        """Return the complexity tier for *purpose*.

        Keyword sets are checked in priority order: critical, then complex,
        then simple. Matching is case-insensitive and ignores punctuation
        around words. When no keyword matches, the task is MODERATE if it is
        longer than 100 characters or mentions "code" / "function" (substring
        match), and SIMPLE otherwise.
        """
        lowered = purpose.lower()
        tokens = _WORD_RE.findall(lowered)
        words = set(tokens)
        padded = f" {' '.join(tokens)} "

        if _mentions_any(words, padded, _CRITICAL_KEYWORDS):
            return TaskComplexity.CRITICAL
        if _mentions_any(words, padded, _COMPLEX_KEYWORDS):
            return TaskComplexity.COMPLEX
        if _mentions_any(words, padded, _SIMPLE_KEYWORDS):
            return TaskComplexity.SIMPLE

        # Default: moderate for anything with multiple sentences or code-related
        if len(purpose) > 100 or "code" in lowered or "function" in lowered:
            return TaskComplexity.MODERATE
        return TaskComplexity.SIMPLE


class ModelSelector:
    """
    Selects a model spec for a task given its complexity and the policy.

    Behaviour implemented by ``select``:
      1. With ``policy.prefer_local``, registered local models (if any) are
         the only candidates; otherwise all registered models are.
      2. SIMPLE tasks get the candidate with the lowest ``cost_per_1k_tokens``.
      3. COMPLEX / CRITICAL tasks get ``policy.cloud_model`` when
         ``policy.allow_cloud_fallback`` is set; otherwise the first candidate
         advertising the "reasoning" or "code" capability.
      4. Everything else (including MODERATE) gets ``policy.local_model``.
    """

    def __init__(self, models: list[ModelOption] | None = None, policy: RoutingPolicy | None = None):
        self.models = models or []
        self.policy = policy or RoutingPolicy()

    def select(self, complexity: TaskComplexity) -> str:
        """Select the best model spec for given complexity."""
        # Work on a copy so the registered model list is never reordered.
        candidates = list(self.models)
        if self.policy.prefer_local:
            local = [m for m in candidates if m.is_local]
            if local:
                candidates = local

        # For simple tasks, prefer smallest/cheapest. min() keeps the first
        # candidate on ties, same as a stable sort followed by [0].
        if complexity == TaskComplexity.SIMPLE and candidates:
            return min(candidates, key=lambda m: m.cost_per_1k_tokens).spec

        # For complex/critical, prefer most capable
        if complexity in (TaskComplexity.COMPLEX, TaskComplexity.CRITICAL):
            # Prefer cloud models with more capability
            if self.policy.allow_cloud_fallback:
                return self.policy.cloud_model
            capable = [
                m for m in candidates
                if "reasoning" in m.capabilities or "code" in m.capabilities
            ]
            if capable:
                return capable[0].spec

        # Default: local model
        return self.policy.local_model


class LLMCallRouter:
    """
    Main router: classifies task → selects model → logs decision.

    Usage:
        router = LLMCallRouter(policy=RoutingPolicy(prefer_local=True))

        model_spec = router.route("Write a fibonacci function")
        # → "ollama:qwen3:1.7b" (local, code task, moderate complexity)

        model_spec = router.route("Audit production deployment for security vulnerabilities")
        # → cloud model (critical task, needs strong reasoning)
    """

    def __init__(self, policy: RoutingPolicy | None = None, models: list[ModelOption] | None = None):
        self.policy = policy or RoutingPolicy()
        self.classifier = TaskComplexityClassifier()
        self.selector = ModelSelector(models or [], self.policy)
        self._decisions: list[RoutingDecision] = []
        self._total_cost = 0.0

    def route(self, task: str) -> str:
        """Route a task to the best model. Returns model spec string.

        The decision (task truncated to 80 characters, complexity, model and
        reason) is appended to ``decisions`` and logged at INFO level. Once
        the cost recorded via ``record_cost`` reaches
        ``policy.max_cost_per_task_usd``, ``policy.local_model`` is returned
        regardless of complexity.
        """
        complexity = self.classifier.classify(task)
        selected = self.selector.select(complexity)

        # Budget check
        if self._total_cost >= self.policy.max_cost_per_task_usd:
            # Over budget: force local
            selected = self.policy.local_model
            reason = "budget_exceeded: forced local"
        else:
            reason = f"complexity={complexity.value}"

        decision = RoutingDecision(
            task_summary=task[:80],
            complexity=complexity,
            selected_model=selected,
            reason=reason,
        )
        self._decisions.append(decision)
        # Lazy %-formatting: the message is only built if INFO is enabled.
        logger.info("Router: %s → %s (%s)", complexity.value, selected, reason)
        return selected

    def record_cost(self, cost_usd: float) -> None:
        """Record cost of a completed call for budget tracking."""
        self._total_cost += cost_usd

    @property
    def total_cost(self) -> float:
        """Cost accumulated through ``record_cost`` since the last reset."""
        return self._total_cost

    @property
    def decisions(self) -> list[RoutingDecision]:
        """Routing decisions recorded so far, oldest first (the live list)."""
        return self._decisions

    def reset_budget(self) -> None:
        """Reset the accumulated cost to zero; recorded decisions are kept."""
        self._total_cost = 0.0