# Commit metadata (page residue, kept as comments so the module parses):
#   Rohan03's picture
#   Sprint 1: canonical event schema (PAEvent, EventKind, Visibility)
#   213ef24 verified
"""
events.py — Canonical event schema for Purpose Agent v3.0.

Every action in the framework emits a PAEvent. Events are:

- Typed (EventKind enum)
- Ordered (per-lane sequence numbers)
- Scoped (visibility: public/internal/debug)
- Immutable (frozen dataclass; treat the payload dict as read-only as well)
- Serializable (to_dict() yields a plain dict for JSON/JSONL output)

Safety: reasoning.summary is allowed. Raw hidden chain-of-thought is NOT.
"""
from __future__ import annotations
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class Visibility(str, Enum):
    """Audience scope attached to an event.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase wire value (e.g. ``Visibility.PUBLIC == "public"``).
    """

    # Safe for UI, logs, and external consumers.
    PUBLIC = "public"
    # Framework internals (memory ops, optimization).
    INTERNAL = "internal"
    # Verbose debug info; never in production streams.
    DEBUG = "debug"
class EventKind(str, Enum):
    """Closed set of event types emitted by the Purpose Agent runtime.

    Each value is a dotted, lowercase string (``"<area>.<what>"``). Members
    are also ``str`` instances, so they compare equal to that string.
    """

    # --- Run lifecycle ---
    RUN_STARTED = "run.started"
    RUN_FINISHED = "run.finished"
    RUN_ERROR = "run.error"

    # --- Agent lifecycle ---
    AGENT_STARTED = "agent.started"
    AGENT_PROGRESS = "agent.progress"
    AGENT_FINISHED = "agent.finished"
    AGENT_ERROR = "agent.error"

    # --- Reasoning: safe summaries only, NO raw chain-of-thought ---
    REASONING_SUMMARY = "reasoning.summary"

    # --- Model calls ---
    MODEL_REQUEST = "model.request"
    MODEL_RESPONSE = "model.response"

    # --- Text streaming ---
    TEXT_DELTA = "text.delta"
    TEXT_DONE = "text.done"

    # --- Tool execution ---
    TOOL_STARTED = "tool.started"
    TOOL_ARGS = "tool.args"
    TOOL_RESULT = "tool.result"
    TOOL_ERROR = "tool.error"

    # --- State management ---
    STATE_SNAPSHOT = "state.snapshot"
    STATE_DELTA = "state.delta"

    # --- Checkpointing ---
    CHECKPOINT_SAVED = "checkpoint.saved"

    # --- Human-in-the-loop ---
    HUMAN_APPROVAL_REQUESTED = "human.approval.requested"
    HUMAN_APPROVAL_RECEIVED = "human.approval.received"

    # --- Memory lifecycle ---
    MEMORY_CANDIDATE = "memory.candidate"
    MEMORY_PROMOTED = "memory.promoted"
    MEMORY_ARCHIVED = "memory.archived"

    # --- Skill lifecycle ---
    SKILL_SELECTED = "skill.selected"
    SKILL_UPDATED = "skill.updated"
    SKILL_ROLLBACK = "skill.rollback"
# Kinds that must never carry raw chain-of-thought. Left empty on purpose:
# no kind is inherently private; per-event visibility controls this instead.
_PRIVATE_KINDS = frozenset()

# Kinds that should always be delivered, even under backpressure.
TERMINAL_KINDS = frozenset(
    (
        EventKind.RUN_FINISHED,
        EventKind.RUN_ERROR,
        EventKind.AGENT_FINISHED,
        EventKind.AGENT_ERROR,
    )
)
@dataclass(frozen=True)
class PAEvent:
    """Canonical event emitted by the Purpose Agent runtime.

    Every observable action produces one or more PAEvents. Instances are
    frozen (attributes cannot be rebound), typed via ``EventKind``, and
    hashable. The hash ignores ``payload`` (a dict cannot be hashed), while
    equality still compares every field including ``payload``.

    Attributes:
        run_id: Unique ID for this execution run (required).
        session_id: Groups related runs (e.g., a conversation).
        lane_id: Identifies the parallel execution lane (swarm/parallel).
        span_id: ID for this specific event; defaults to the first 12 hex
            characters of a fresh UUID4.
        parent_span_id: Parent event (for nesting: run → agent → tool),
            or None.
        seq: Sequence number within a lane. Intended to increase
            monotonically; this class does not enforce that.
        ts: Unix timestamp; defaults to ``time.time()`` at construction.
        kind: What happened (``EventKind``).
        visibility: Who should see this (public/internal/debug).
        payload: Event-specific data. The dict itself is not frozen, so
            treat it as read-only.
    """

    run_id: str
    session_id: str = ""
    lane_id: str = "main"
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    parent_span_id: str | None = None
    seq: int = 0
    ts: float = field(default_factory=time.time)
    kind: EventKind = EventKind.AGENT_PROGRESS
    visibility: Visibility = Visibility.PUBLIC
    # hash=False keeps the generated __hash__ usable: including a dict would
    # make hash(event) raise TypeError. The field still takes part in __eq__.
    payload: dict[str, Any] = field(default_factory=dict, hash=False)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dictionary.

        Enum fields are emitted as their string values. The result is
        JSON-safe provided the payload values are themselves
        JSON-serializable.

        Returns:
            A new dict. ``payload`` is a shallow copy, so adding or removing
            keys on the result does not alter this event.
        """
        return {
            "run_id": self.run_id,
            "session_id": self.session_id,
            "lane_id": self.lane_id,
            "span_id": self.span_id,
            "parent_span_id": self.parent_span_id,
            "seq": self.seq,
            "ts": self.ts,
            "kind": self.kind.value,
            "visibility": self.visibility.value,
            "payload": dict(self.payload),
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "PAEvent":
        """Deserialize from a dictionary such as one produced by ``to_dict``.

        Args:
            d: Mapping with at least ``"run_id"`` and ``"kind"``. Other keys
                are optional and fall back to the same defaults as the
                constructor.

        Returns:
            A new PAEvent. The payload is shallow-copied so the event does
            not share its top-level dict with ``d``; a missing or ``None``
            payload becomes ``{}``.

        Raises:
            KeyError: If ``"run_id"`` or ``"kind"`` is missing.
            ValueError: If ``"kind"`` or ``"visibility"`` is not a valid
                enum value.
        """
        return cls(
            run_id=d["run_id"],
            session_id=d.get("session_id", ""),
            lane_id=d.get("lane_id", "main"),
            # Generate a span id / timestamp only when the input lacks one.
            span_id=d["span_id"] if "span_id" in d else uuid.uuid4().hex[:12],
            parent_span_id=d.get("parent_span_id"),
            seq=d.get("seq", 0),
            ts=d["ts"] if "ts" in d else time.time(),
            kind=EventKind(d["kind"]),
            visibility=Visibility(d.get("visibility", "public")),
            payload=dict(d.get("payload") or {}),
        )

    @property
    def is_terminal(self) -> bool:
        """Whether this event's kind is in ``TERMINAL_KINDS`` (must always be delivered)."""
        return self.kind in TERMINAL_KINDS

    @property
    def is_error(self) -> bool:
        """Whether this event is a run, agent, or tool error."""
        return self.kind in (
            EventKind.RUN_ERROR,
            EventKind.AGENT_ERROR,
            EventKind.TOOL_ERROR,
        )

    def has_hidden_cot(self) -> bool:
        """Check if payload contains raw chain-of-thought (safety violation).

        Only top-level payload keys are inspected; nested dicts are not
        searched.
        """
        forbidden_keys = {
            "hidden_chain_of_thought",
            "raw_cot",
            "internal_reasoning",
            "think_content",
        }
        # Iterating a dict yields its keys, so this is a key-overlap test.
        return not forbidden_keys.isdisjoint(self.payload)
def create_event(
    run_id: str,
    kind: EventKind,
    lane_id: str = "main",
    seq: int = 0,
    parent_span_id: str | None = None,
    visibility: Visibility = Visibility.PUBLIC,
    **payload,
) -> PAEvent:
    """Build a PAEvent, collecting extra keyword arguments as its payload.

    Args:
        run_id: ID of the execution run the event belongs to.
        kind: The event type.
        lane_id: Execution lane; defaults to ``"main"``.
        seq: Sequence number within the lane.
        parent_span_id: Span ID of the parent event, if any.
        visibility: Audience for the event; defaults to public.
        **payload: Any further keyword arguments become the event payload.

    Returns:
        A new PAEvent with a freshly generated 12-character span ID.
        ``session_id`` and ``ts`` are not set here and are left to
        PAEvent's defaults.
    """
    fresh_span_id = uuid.uuid4().hex[:12]
    event = PAEvent(
        run_id=run_id,
        kind=kind,
        visibility=visibility,
        lane_id=lane_id,
        seq=seq,
        span_id=fresh_span_id,
        parent_span_id=parent_span_id,
        payload=payload,
    )
    return event