| """ |
| Card 3: Shared State Loop (Deterministic Event Scheduler) |
| |
| Defines deterministic event ordering for multi-character interactions in one port. |
| - Synchronized time with per-character state containers |
| - Deterministic conflict resolution (same seed → same outcome) |
| - Support for ≥3 active characters |
| """ |
|
|
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Dict, List, Optional, Any |
| import hashlib |
| import json |
| import logging |
|
|
|
|
| LOGGER = logging.getLogger(__name__) |
|
|
|
|
| class CharacterState(Enum): |
| """Character lifecycle state.""" |
| IDLE = "idle" |
| BUSY = "busy" |
| TRANSITIONING = "transitioning" |
| INTERACTING = "interacting" |
|
|
|
|
| class ConflictResolutionPolicy(str, Enum): |
| """How to resolve conflicting interactions.""" |
| PRIORITY_BASED = "priority_based" |
| FIFO = "fifo" |
| COOLDOWN = "cooldown" |
| NEGOTIATION = "negotiation" |
|
|
|
|
| @dataclass |
| class CharacterSegmentState: |
| """Current state of a character's motion segment execution.""" |
| |
| character_id: str |
| segment_index: int |
| frames_elapsed: int |
| total_frames: int |
| is_complete: bool = False |
| |
| def progress(self) -> float: |
| """Return 0-1 progress through current segment.""" |
| if self.total_frames == 0: |
| return 1.0 |
| return min(1.0, self.frames_elapsed / self.total_frames) |
|
|
|
|
| @dataclass |
| class CharacterSlot: |
| """Per-character state container in shared loop.""" |
| |
| character_id: str |
| skeleton_type: str |
| current_state: CharacterState = CharacterState.IDLE |
| segment_state: Optional[CharacterSegmentState] = None |
| |
| |
| interaction_target: Optional[str] = None |
| last_interaction_time_ms: int = 0 |
| interaction_cooldown_ms: int = 500 |
| |
| |
| priority: int = 0 |
| cycle_count: int = 0 |
| |
| def is_busy(self) -> bool: |
| """Check if character is currently executing motion.""" |
| return self.current_state in [ |
| CharacterState.BUSY, |
| CharacterState.TRANSITIONING, |
| CharacterState.INTERACTING |
| ] |
| |
| def can_interact(self, current_time_ms: int) -> bool: |
| """Check if character can start new interaction.""" |
| time_since_last = current_time_ms - self.last_interaction_time_ms |
| return time_since_last >= self.interaction_cooldown_ms |
|
|
|
|
| @dataclass |
| class LoopTick: |
| """Single tick in the deterministic event loop.""" |
| |
| tick_number: int |
| frame_number: int |
| time_ms: float |
| fps: int = 30 |
| |
| |
| character_updates: Dict[str, CharacterSlot] = field(default_factory=dict) |
| completed_segments: List[str] = field(default_factory=list) |
| interactions: List[tuple] = field(default_factory=list) |
| |
| def get_timestamp(self) -> dict: |
| """Return tick metadata for auditing.""" |
| return { |
| "tick_number": self.tick_number, |
| "frame_number": self.frame_number, |
| "time_ms": self.time_ms, |
| "fps": self.fps, |
| } |
|
|
|
|
| class DeterministicLoop: |
| """ |
| Deterministic multi-character event loop. |
| |
| Ensures: |
| - Same seed → same outputs (for testing replay) |
| - No race conditions (total determinism within single process) |
| - Clear conflict resolution (priority/FIFO/cooldown) |
| - Synchronized timeline for all characters |
| """ |
| |
| def __init__( |
| self, |
| fps: int = 30, |
| seed: int = 42, |
| conflict_policy: ConflictResolutionPolicy = ConflictResolutionPolicy.COOLDOWN, |
| ): |
| LOGGER.info( |
| "scheduler.loop.init.start fps=%s seed=%s conflict_policy=%s", |
| fps, |
| seed, |
| conflict_policy.value, |
| ) |
| self.fps = fps |
| self.seed = seed |
| self.conflict_policy = conflict_policy |
| |
| |
| self._rng_state = seed |
| |
| |
| self.tick_number = 0 |
| self.frame_number = 0 |
| self.time_ms = 0.0 |
| self.ms_per_frame = 1000.0 / fps |
| |
| |
| self.characters: Dict[str, CharacterSlot] = {} |
| |
| |
| self.tick_history: List[LoopTick] = [] |
| LOGGER.info("scheduler.loop.init.exit") |
| |
| def register_character( |
| self, |
| character_id: str, |
| skeleton_type: str, |
| priority: int = 0, |
| ) -> None: |
| """Register a character for this loop.""" |
| LOGGER.info( |
| "scheduler.register_character.start character_id=%s skeleton=%s priority=%s", |
| character_id, |
| skeleton_type, |
| priority, |
| ) |
| if character_id in self.characters: |
| raise ValueError(f"Character {character_id} already registered") |
| |
| self.characters[character_id] = CharacterSlot( |
| character_id=character_id, |
| skeleton_type=skeleton_type, |
| priority=priority, |
| ) |
| LOGGER.info("scheduler.register_character.exit character_id=%s", character_id) |
| |
| def _deterministic_rng(self) -> float: |
| """Generate deterministic pseudo-random number (0-1).""" |
| |
| self._rng_state = (self._rng_state * 1103515245 + 12345) & 0x7fffffff |
| return (self._rng_state / 0x7fffffff) |
| |
| def _resolve_conflict( |
| self, |
| char1_id: str, |
| char2_id: str, |
| ) -> str: |
| """ |
| Deterministically resolve conflict between two characters. |
| |
| Returns: character_id that wins the interaction. |
| """ |
| char1 = self.characters[char1_id] |
| char2 = self.characters[char2_id] |
| |
| if self.conflict_policy == ConflictResolutionPolicy.PRIORITY_BASED: |
| |
| if char1.priority > char2.priority: |
| return char1_id |
| elif char2.priority > char1.priority: |
| return char2_id |
| |
| return min(char1_id, char2_id) |
| |
| elif self.conflict_policy == ConflictResolutionPolicy.FIFO: |
| |
| if char1.last_interaction_time_ms < char2.last_interaction_time_ms: |
| return char1_id |
| else: |
| return char2_id |
| |
| elif self.conflict_policy == ConflictResolutionPolicy.COOLDOWN: |
| |
| char1_ready = char1.can_interact(int(self.time_ms)) |
| char2_ready = char2.can_interact(int(self.time_ms)) |
| |
| if char1_ready and not char2_ready: |
| return char1_id |
| elif char2_ready and not char1_ready: |
| return char2_id |
| |
| if char1.priority > char2.priority: |
| return char1_id |
| else: |
| return char2_id |
| |
| else: |
| return min(char1_id, char2_id) |
| |
| def advance_tick( |
| self, |
| character_motions: Dict[str, Dict[str, Any]], |
| ) -> LoopTick: |
| """ |
| Advance one tick forward with deterministic character updates. |
| |
| Args: |
| character_motions: Dict[character_id] → motion data for this frame |
| |
| Returns: |
| LoopTick with event history for this frame |
| """ |
| LOGGER.info( |
| "scheduler.advance_tick.start tick=%s frame=%s chars=%s motions=%s", |
| self.tick_number, |
| self.frame_number, |
| len(self.characters), |
| len(character_motions), |
| ) |
| tick = LoopTick( |
| tick_number=self.tick_number, |
| frame_number=self.frame_number, |
| time_ms=self.time_ms, |
| fps=self.fps, |
| ) |
| |
| |
| for char_id, char_slot in self.characters.items(): |
| if char_slot.segment_state is None: |
| continue |
| |
| |
| char_slot.segment_state.frames_elapsed += 1 |
| |
| |
| if char_slot.segment_state.frames_elapsed >= char_slot.segment_state.total_frames: |
| char_slot.segment_state.is_complete = True |
| tick.completed_segments.append(char_id) |
| char_slot.current_state = CharacterState.IDLE |
| else: |
| char_slot.current_state = CharacterState.BUSY |
| |
| tick.character_updates[char_id] = char_slot |
| |
| |
| pending_interactions = [] |
| for char_id, char_slot in self.characters.items(): |
| if char_slot.interaction_target: |
| pending_interactions.append((char_id, char_slot.interaction_target)) |
| |
| |
| for char1_id, char2_id in pending_interactions: |
| winner_id = self._resolve_conflict(char1_id, char2_id) |
| tick.interactions.append((winner_id, char2_id if winner_id == char1_id else char1_id)) |
| |
| |
| self.characters[winner_id].last_interaction_time_ms = int(self.time_ms) |
| |
| |
| self.tick_number += 1 |
| self.frame_number += 1 |
| self.time_ms += self.ms_per_frame |
| |
| |
| self.tick_history.append(tick) |
|
|
| LOGGER.info( |
| "scheduler.advance_tick.exit tick=%s completed=%s interactions=%s", |
| tick.tick_number, |
| len(tick.completed_segments), |
| len(tick.interactions), |
| ) |
| |
| return tick |
| |
| def get_state_hash(self) -> str: |
| """ |
| Compute deterministic hash of current loop state. |
| |
| Used for seeded replay verification: |
| Same seed → same state hash at corresponding tick. |
| """ |
| state_dict = { |
| "tick_number": self.tick_number, |
| "frame_number": self.frame_number, |
| "time_ms": self.time_ms, |
| "rng_state": self._rng_state, |
| "characters": { |
| char_id: { |
| "state": char_slot.current_state.value, |
| "frames_elapsed": char_slot.segment_state.frames_elapsed if char_slot.segment_state else 0, |
| } |
| for char_id, char_slot in self.characters.items() |
| } |
| } |
| |
| state_json = json.dumps(state_dict, sort_keys=True) |
| return hashlib.sha256(state_json.encode()).hexdigest()[:16] |
| |
| def reset(self) -> None: |
| """Reset loop to initial state (for replay).""" |
| LOGGER.info( |
| "scheduler.reset.start tick=%s frame=%s registered_chars=%s", |
| self.tick_number, |
| self.frame_number, |
| len(self.characters), |
| ) |
| self.tick_number = 0 |
| self.frame_number = 0 |
| self.time_ms = 0.0 |
| self._rng_state = self.seed |
| self.tick_history = [] |
| |
| for char_slot in self.characters.values(): |
| char_slot.current_state = CharacterState.IDLE |
| char_slot.segment_state = None |
| LOGGER.info("scheduler.reset.exit") |
|
|
|
|
| |
| |
| |
|
|
| def two_character_interaction_scenario() -> tuple[DeterministicLoop, List[dict]]: |
| """ |
| Test scenario: Two characters dancing with synchronized transitions. |
| |
| Returns: |
| (loop, motion_frames_per_char) |
| """ |
| loop = DeterministicLoop(fps=30, seed=42) |
| |
| |
| loop.register_character("dancer1", "soma", priority=1) |
| loop.register_character("dancer2", "soma", priority=1) |
| |
| |
| motion_sequence = [ |
| { |
| "dancer1": {"action": "walk_forward", "frame": i} for i in range(30) |
| }, |
| { |
| "dancer2": {"action": "follow", "frame": i} for i in range(30) |
| }, |
| ] |
| |
| return loop, motion_sequence |
|
|
|
|
| def three_character_scenario() -> tuple[DeterministicLoop, List[dict]]: |
| """ |
| Test scenario: Three characters with controlled interactions. |
| |
| Returns: |
| (loop, motion_frames) |
| """ |
| loop = DeterministicLoop(fps=30, seed=43, conflict_policy=ConflictResolutionPolicy.PRIORITY_BASED) |
| |
| |
| loop.register_character("leader", "soma", priority=3) |
| loop.register_character("follower1", "soma", priority=2) |
| loop.register_character("follower2", "soma", priority=1) |
| |
| motion_sequence = [ |
| { |
| "leader": {"action": "lead", "frame": i}, |
| "follower1": {"action": "follow", "frame": i}, |
| "follower2": {"action": "match", "frame": i}, |
| } |
| for i in range(60) |
| ] |
| |
| return loop, motion_sequence |
|
|
|
|
| def test_deterministic_replay(): |
| """ |
| Verify deterministic replay: same seed produces identical state hashes. |
| """ |
| print("=== Card 3: Deterministic Loop Test ===\n") |
| |
| |
| print("Test 1: Two-character deterministic replay") |
| |
| loop1, motions1 = two_character_interaction_scenario() |
| loop2, motions2 = two_character_interaction_scenario() |
| |
| hashes1 = [] |
| hashes2 = [] |
| |
| for tick_num in range(60): |
| loop1.advance_tick({}) |
| loop2.advance_tick({}) |
| |
| hash1 = loop1.get_state_hash() |
| hash2 = loop2.get_state_hash() |
| |
| hashes1.append(hash1) |
| hashes2.append(hash2) |
| |
| if hashes1 == hashes2: |
| print("✓ Deterministic replay (2-char): PASS") |
| else: |
| print(f"✗ Deterministic replay (2-char): FAIL") |
| print(f" Mismatch at frame: {[i for i, (h1, h2) in enumerate(zip(hashes1, hashes2)) if h1 != h2]}") |
| |
| print() |
| |
| |
| print("Test 2: Three-character priority-based conflict resolution") |
| |
| loop3, motions3 = three_character_scenario() |
| loop4, motions4 = three_character_scenario() |
| |
| hashes3 = [] |
| hashes4 = [] |
| |
| for tick_num in range(60): |
| loop3.advance_tick({}) |
| loop4.advance_tick({}) |
| |
| hash3 = loop3.get_state_hash() |
| hash4 = loop4.get_state_hash() |
| |
| hashes3.append(hash3) |
| hashes4.append(hash4) |
| |
| if hashes3 == hashes4: |
| print("✓ Deterministic replay (3-char): PASS") |
| else: |
| print(f"✗ Deterministic replay (3-char): FAIL") |
| |
| print() |
| |
| |
| print("Test 3: Different seed produces different outcome") |
| |
| loop_seed42, _ = two_character_interaction_scenario() |
| loop_seed99 = DeterministicLoop(fps=30, seed=99) |
| loop_seed99.register_character("dancer1", "soma", priority=1) |
| loop_seed99.register_character("dancer2", "soma", priority=1) |
| |
| hashes42 = [] |
| hashes99 = [] |
| |
| for tick_num in range(30): |
| loop_seed42.advance_tick({}) |
| loop_seed99.advance_tick({}) |
| |
| hashes42.append(loop_seed42.get_state_hash()) |
| hashes99.append(loop_seed99.get_state_hash()) |
| |
| if hashes42 != hashes99: |
| print("✓ Different seeds produce different outcomes: PASS") |
| else: |
| print("✗ Different seeds should differ: FAIL") |
| |
| print() |
| print("=== All Deterministic Tests Complete ===") |
|
|
|
|
| if __name__ == "__main__": |
| test_deterministic_replay() |
|
|