""" events.py — Canonical event schema for Purpose Agent v3.0. Every action in the framework emits a PAEvent. Events are: - Typed (EventKind enum) - Ordered (per-lane sequence numbers) - Scoped (visibility: public/internal/debug) - Immutable (frozen dataclass) - Serializable (to_dict → JSON/JSONL) Safety: reasoning.summary is allowed. Raw hidden chain-of-thought is NOT. """ from __future__ import annotations import time import uuid from dataclasses import dataclass, field from enum import Enum from typing import Any class Visibility(str, Enum): """Who can see this event.""" PUBLIC = "public" # Safe for UI, logs, external consumers INTERNAL = "internal" # Framework internals (memory ops, optimization) DEBUG = "debug" # Verbose debug info, never in production streams class EventKind(str, Enum): """All possible event types in the Purpose Agent runtime.""" # Run lifecycle RUN_STARTED = "run.started" RUN_FINISHED = "run.finished" RUN_ERROR = "run.error" # Agent lifecycle AGENT_STARTED = "agent.started" AGENT_PROGRESS = "agent.progress" AGENT_FINISHED = "agent.finished" AGENT_ERROR = "agent.error" # Reasoning (safe summaries only — NO raw chain-of-thought) REASONING_SUMMARY = "reasoning.summary" # Model calls MODEL_REQUEST = "model.request" MODEL_RESPONSE = "model.response" # Text streaming TEXT_DELTA = "text.delta" TEXT_DONE = "text.done" # Tool execution TOOL_STARTED = "tool.started" TOOL_ARGS = "tool.args" TOOL_RESULT = "tool.result" TOOL_ERROR = "tool.error" # State management STATE_SNAPSHOT = "state.snapshot" STATE_DELTA = "state.delta" # Checkpointing CHECKPOINT_SAVED = "checkpoint.saved" # Human-in-the-loop HUMAN_APPROVAL_REQUESTED = "human.approval.requested" HUMAN_APPROVAL_RECEIVED = "human.approval.received" # Memory lifecycle MEMORY_CANDIDATE = "memory.candidate" MEMORY_PROMOTED = "memory.promoted" MEMORY_ARCHIVED = "memory.archived" # Skill lifecycle SKILL_SELECTED = "skill.selected" SKILL_UPDATED = "skill.updated" SKILL_ROLLBACK = "skill.rollback" # Events that MUST NOT contain raw chain-of-thought _PRIVATE_KINDS = frozenset() # No kind is inherently private; visibility controls this # Events that should always be delivered even under backpressure TERMINAL_KINDS = frozenset({ EventKind.RUN_FINISHED, EventKind.RUN_ERROR, EventKind.AGENT_FINISHED, EventKind.AGENT_ERROR, }) @dataclass(frozen=True) class PAEvent: """ Canonical event emitted by the Purpose Agent runtime. Every observable action produces one or more PAEvents. Events are immutable, ordered, and typed. Fields: run_id: Unique ID for this execution run session_id: Groups related runs (e.g., a conversation) lane_id: Identifies parallel execution lane (for swarm/parallel) span_id: Unique ID for this specific event parent_span_id: Parent event (for nesting: run → agent → tool) seq: Monotonically increasing sequence number within a lane ts: Unix timestamp kind: What happened (EventKind enum) visibility: Who should see this (public/internal/debug) payload: Event-specific data """ run_id: str session_id: str = "" lane_id: str = "main" span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) parent_span_id: str | None = None seq: int = 0 ts: float = field(default_factory=time.time) kind: EventKind = EventKind.AGENT_PROGRESS visibility: Visibility = Visibility.PUBLIC payload: dict[str, Any] = field(default_factory=dict) def to_dict(self) -> dict[str, Any]: """Serialize to dictionary (JSON-safe).""" return { "run_id": self.run_id, "session_id": self.session_id, "lane_id": self.lane_id, "span_id": self.span_id, "parent_span_id": self.parent_span_id, "seq": self.seq, "ts": self.ts, "kind": self.kind.value, "visibility": self.visibility.value, "payload": self.payload, } @classmethod def from_dict(cls, d: dict[str, Any]) -> "PAEvent": """Deserialize from dictionary.""" return cls( run_id=d["run_id"], session_id=d.get("session_id", ""), lane_id=d.get("lane_id", "main"), span_id=d.get("span_id", uuid.uuid4().hex[:12]), parent_span_id=d.get("parent_span_id"), seq=d.get("seq", 0), ts=d.get("ts", time.time()), kind=EventKind(d["kind"]), visibility=Visibility(d.get("visibility", "public")), payload=d.get("payload", {}), ) @property def is_terminal(self) -> bool: """Is this a terminal event that must always be delivered?""" return self.kind in TERMINAL_KINDS @property def is_error(self) -> bool: return self.kind in (EventKind.RUN_ERROR, EventKind.AGENT_ERROR, EventKind.TOOL_ERROR) def has_hidden_cot(self) -> bool: """Check if payload contains raw chain-of-thought (safety violation).""" forbidden_keys = {"hidden_chain_of_thought", "raw_cot", "internal_reasoning", "think_content"} return bool(forbidden_keys & set(self.payload.keys())) def create_event( run_id: str, kind: EventKind, lane_id: str = "main", seq: int = 0, parent_span_id: str | None = None, visibility: Visibility = Visibility.PUBLIC, **payload, ) -> PAEvent: """Factory helper for creating events with less boilerplate.""" return PAEvent( run_id=run_id, lane_id=lane_id, span_id=uuid.uuid4().hex[:12], parent_span_id=parent_span_id, seq=seq, kind=kind, visibility=visibility, payload=payload, )