| """ |
| shell_executor.py |
| YAML symbolic shells interpreter for transformerOS |
| |
| This module serves as the core execution engine for symbolic shells, |
| managing the parsing, validation, and execution of shell packs across |
| different model architectures. |
| """ |
|
|
| import os |
| import sys |
| import yaml |
| import json |
| import logging |
| import importlib |
| from typing import Dict, List, Optional, Tuple, Union, Any |
| from pathlib import Path |
| from dataclasses import dataclass, field |
|
|
| |
# Module-level logger for shell execution events; a file handler is
# attached per-executor in ShellExecutor.__init__ when log_path is given.
log = logging.getLogger("transformerOS.shell_executor")
log.setLevel(logging.INFO)
|
|
| |
| from transformerOS.core.symbolic_engine import SymbolicEngine |
| from transformerOS.models.model_interface import ModelInterface, get_model_interface |
| from transformerOS.utils.visualization import ShellVisualizer |
| from transformerOS.core.attribution import AttributionTracer |
| from transformerOS.core.collapse_detector import CollapseDetector |
|
|
| |
| from transformerOS.modules.reflect_module import ReflectOperation |
| from transformerOS.modules.collapse_module import CollapseOperation |
| from transformerOS.modules.ghostcircuits_module import GhostCircuitOperation |
|
|
@dataclass
class ShellExecutionResult:
    """Structured result from shell execution"""
    shell_id: str                                  # ID of the executed shell
    success: bool                                  # False when execution raised an exception
    execution_time: float                          # wall-clock seconds for the whole shell run
    result: Dict                                   # per-operation results keyed "operation_<idx>"
    residue: Optional[Dict] = None                 # symbolic residue (when residue logging is enabled)
    collapse_detected: bool = False                # True if any operation reported a collapse
    collapse_type: Optional[str] = None            # collapse type reported by the detecting operation
    attribution_map: Optional[Dict] = None         # attribution trace of the final prompt (when enabled)
    visualization: Optional[Dict] = None           # visualization payload; excluded from JSON logs
    metadata: Dict = field(default_factory=dict)   # shell type, timestamp, prompt length, etc.
|
|
|
|
| class ShellExecutor: |
| """ |
| Core executor for symbolic shell packs |
| |
| This class handles the loading, validation, and execution of shell packs, |
| providing a standardized interface for triggering and analyzing model |
| behavior through structured symbolic shells. |
| """ |
| |
    def __init__(
        self,
        config_path: Optional[str] = None,
        model_id: str = "default",
        custom_model: Optional[ModelInterface] = None,
        log_path: Optional[str] = None
    ):
        """
        Initialize the shell executor

        Parameters:
        -----------
        config_path : Optional[str]
            Path to executor configuration file
        model_id : str
            Identifier for the model to use
        custom_model : Optional[ModelInterface]
            Custom model interface (if provided, model_id is ignored)
        log_path : Optional[str]
            Path for execution logs
        """
        # Attach a per-executor file handler to the module-level logger.
        if log_path:
            file_handler = logging.FileHandler(log_path)
            file_handler.setLevel(logging.INFO)
            log.addHandler(file_handler)

        # Effective configuration: user file merged over defaults.
        self.config = self._load_config(config_path)

        # A caller-supplied model interface takes precedence over model_id.
        if custom_model:
            self.model = custom_model
        else:
            self.model = get_model_interface(model_id)

        # Core analysis components shared by all operations.
        self.symbolic_engine = SymbolicEngine()
        self.attribution_tracer = AttributionTracer(self.model)
        self.collapse_detector = CollapseDetector()
        self.visualizer = ShellVisualizer()

        # Operation modules dispatched by _execute_operation.
        self.reflect_op = ReflectOperation(self.model, self.symbolic_engine)
        self.collapse_op = CollapseOperation(self.model, self.symbolic_engine)
        self.ghost_op = GhostCircuitOperation(self.model, self.symbolic_engine)

        # loaded_shells: shell_id -> definition; shell_cache: last result per shell.
        self.loaded_shells = {}
        self.shell_cache = {}

        # Optionally preload the default shell packs listed in the config.
        if self.config.get("auto_load_shells", False):
            default_packs = self.config.get("default_shell_packs", [])
            for pack_path in default_packs:
                self.load_shell_pack(pack_path)

        log.info(f"ShellExecutor initialized with model: {self.model.model_id}")
|
|
| def _load_config(self, config_path: Optional[str]) -> Dict: |
| """Load configuration from file or use defaults""" |
| default_config = { |
| "auto_load_shells": True, |
| "default_shell_packs": [ |
| "symbolic-shells/core-shells.yml", |
| "symbolic-shells/constitutional-shells.yml", |
| "symbolic-shells/meta-shells.yml" |
| ], |
| "default_visualization": True, |
| "attribution_tracing": True, |
| "residue_logging": True, |
| "shell_timeout": 60, |
| "max_token_count": 2048 |
| } |
| |
| if not config_path: |
| return default_config |
| |
| try: |
| with open(config_path, 'r') as f: |
| user_config = yaml.safe_load(f) |
| |
| |
| config = {**default_config, **user_config} |
| log.info(f"Loaded configuration from {config_path}") |
| return config |
| except Exception as e: |
| log.warning(f"Failed to load config from {config_path}: {e}") |
| log.info("Using default configuration") |
| return default_config |
|
|
| def load_shell_pack(self, pack_path: str) -> Dict: |
| """ |
| Load a shell pack from YAML file |
| |
| Parameters: |
| ----------- |
| pack_path : str |
| Path to the shell pack YAML file |
| |
| Returns: |
| -------- |
| Dict with information about loaded shells |
| """ |
| try: |
| |
| if not os.path.isabs(pack_path): |
| |
| standard_locations = [ |
| "", |
| "symbolic-shells/", |
| "../symbolic-shells/", |
| os.path.join(os.path.dirname(__file__), "../symbolic-shells/") |
| ] |
| |
| for location in standard_locations: |
| full_path = os.path.join(location, pack_path) |
| if os.path.exists(full_path): |
| pack_path = full_path |
| break |
| |
| |
| with open(pack_path, 'r') as f: |
| shell_pack = yaml.safe_load(f) |
| |
| |
| if not isinstance(shell_pack, dict) or "shells" not in shell_pack: |
| raise ValueError(f"Invalid shell pack format in {pack_path}") |
| |
| |
| pack_metadata = { |
| "name": shell_pack.get("name", os.path.basename(pack_path)), |
| "description": shell_pack.get("description", ""), |
| "version": shell_pack.get("version", "1.0.0"), |
| "author": shell_pack.get("author", "Unknown"), |
| "shells": [] |
| } |
| |
| |
| for shell_id, shell_def in shell_pack["shells"].items(): |
| |
| if not self._validate_shell(shell_id, shell_def): |
| log.warning(f"Skipping invalid shell definition: {shell_id}") |
| continue |
| |
| |
| self.loaded_shells[shell_id] = shell_def |
| pack_metadata["shells"].append(shell_id) |
| |
| log.info(f"Loaded shell: {shell_id}") |
| |
| log.info(f"Successfully loaded shell pack from {pack_path} with {len(pack_metadata['shells'])} shells") |
| return pack_metadata |
| |
| except Exception as e: |
| log.error(f"Failed to load shell pack from {pack_path}: {e}") |
| raise |
|
|
| def _validate_shell(self, shell_id: str, shell_def: Dict) -> bool: |
| """Validate shell definition structure""" |
| required_fields = ["description", "type", "operations"] |
| |
| |
| for field in required_fields: |
| if field not in shell_def: |
| log.warning(f"Shell {shell_id} missing required field: {field}") |
| return False |
| |
| |
| if not isinstance(shell_def["operations"], list) or not shell_def["operations"]: |
| log.warning(f"Shell {shell_id} has invalid or empty operations list") |
| return False |
| |
| |
| for operation in shell_def["operations"]: |
| if "type" not in operation or "parameters" not in operation: |
| log.warning(f"Shell {shell_id} has operation missing type or parameters") |
| return False |
| |
| return True |
|
|
| def list_shells(self, shell_type: Optional[str] = None) -> List[Dict]: |
| """ |
| List available shells, optionally filtered by type |
| |
| Parameters: |
| ----------- |
| shell_type : Optional[str] |
| Filter shells by type if provided |
| |
| Returns: |
| -------- |
| List of shell information dictionaries |
| """ |
| shells = [] |
| |
| for shell_id, shell_def in self.loaded_shells.items(): |
| |
| if shell_type and shell_def.get("type") != shell_type: |
| continue |
| |
| shells.append({ |
| "id": shell_id, |
| "description": shell_def.get("description", ""), |
| "type": shell_def.get("type", "unknown"), |
| "operations_count": len(shell_def.get("operations", [])), |
| "tags": shell_def.get("tags", []) |
| }) |
| |
| return shells |
|
|
    def execute_shell(
        self,
        shell_id: str,
        prompt: str,
        parameters: Optional[Dict] = None,
        visualize: Optional[bool] = None,
        trace_attribution: Optional[bool] = None,
        return_residue: Optional[bool] = None
    ) -> ShellExecutionResult:
        """
        Execute a symbolic shell with the given prompt

        Runs the shell's operations in order; an operation with
        "update_prompt": true feeds its output into the next operation.
        Exceptions are captured into a failed ShellExecutionResult rather
        than propagated.

        Parameters:
        -----------
        shell_id : str
            ID of the shell to execute
        prompt : str
            Input prompt for the shell
        parameters : Optional[Dict]
            Additional parameters to override shell defaults
        visualize : Optional[bool]
            Whether to generate visualization (None = use config default)
        trace_attribution : Optional[bool]
            Whether to trace attribution (None = use config default)
        return_residue : Optional[bool]
            Whether to return symbolic residue (None = use config default)

        Returns:
        --------
        ShellExecutionResult object with execution results
        """
        import time  # local import: only needed for timing this call

        # Unknown shell IDs fail fast (this is outside the try block, so
        # the caller sees the ValueError directly).
        if shell_id not in self.loaded_shells:
            raise ValueError(f"Shell not found: {shell_id}")

        shell_def = self.loaded_shells[shell_id]
        log.info(f"Executing shell: {shell_id}")

        if parameters is None:
            parameters = {}

        # Resolve tri-state flags: explicit argument wins, else config.
        if visualize is None:
            visualize = self.config.get("default_visualization", True)
        if trace_attribution is None:
            trace_attribution = self.config.get("attribution_tracing", True)
        if return_residue is None:
            return_residue = self.config.get("residue_logging", True)

        start_time = time.time()
        result = {}
        collapse_detected = False
        collapse_type = None
        attribution_map = None
        residue = None

        try:
            # Run each operation in definition order.
            for operation_idx, operation in enumerate(shell_def["operations"]):
                # Caller-supplied parameters override the operation's own.
                operation_type = operation["type"]
                operation_params = {**operation.get("parameters", {}), **parameters}

                operation_result = self._execute_operation(
                    operation_type,
                    prompt,
                    operation_params
                )

                # A collapse reported by any operation marks the whole run.
                if operation_result.get("collapse_detected", False):
                    collapse_detected = True
                    collapse_type = operation_result.get("collapse_type")
                    log.warning(f"Collapse detected in operation {operation_idx}: {collapse_type}")

                # NOTE(review): residue is overwritten on every iteration,
                # so only the last operation's residue survives — confirm
                # this is intended rather than an accumulation.
                if return_residue:
                    residue = operation_result.get("residue", {})

                result[f"operation_{operation_idx}"] = operation_result

                # Chain the operation's output into the next prompt when
                # the shell requests it.
                if operation.get("update_prompt", False) and "output" in operation_result:
                    prompt = operation_result["output"]

            # Attribution is traced on the final (possibly updated) prompt.
            if trace_attribution:
                attribution_map = self.attribution_tracer.trace(prompt)

            visualization = None
            if visualize:
                visualization = self.visualizer.generate_shell_execution(
                    shell_id,
                    result,
                    collapse_detected=collapse_detected,
                    attribution_map=attribution_map
                )

            execution_time = time.time() - start_time
            execution_result = ShellExecutionResult(
                shell_id=shell_id,
                success=True,
                execution_time=execution_time,
                result=result,
                residue=residue,
                collapse_detected=collapse_detected,
                collapse_type=collapse_type,
                attribution_map=attribution_map,
                visualization=visualization,
                metadata={
                    "shell_type": shell_def.get("type", "unknown"),
                    "timestamp": self.symbolic_engine.get_timestamp(),
                    "prompt_length": len(prompt),
                    "operations_count": len(shell_def["operations"])
                }
            )

            # Cache the most recent result per shell ID.
            self.shell_cache[shell_id] = execution_result

            log.info(f"Shell execution completed in {execution_time:.2f}s")
            return execution_result

        except Exception as e:
            log.error(f"Error executing shell {shell_id}: {e}")

            # Failures are returned as a structured result, not raised.
            execution_time = time.time() - start_time
            return ShellExecutionResult(
                shell_id=shell_id,
                success=False,
                execution_time=execution_time,
                result={"error": str(e)},
                metadata={
                    "shell_type": shell_def.get("type", "unknown"),
                    "timestamp": self.symbolic_engine.get_timestamp(),
                    "error_type": type(e).__name__
                }
            )
|
|
| def _execute_operation( |
| self, |
| operation_type: str, |
| prompt: str, |
| parameters: Dict |
| ) -> Dict: |
| """Execute a single shell operation""" |
| |
| |
| if operation_type == "reflect.trace": |
| |
| target = parameters.get("target", "reasoning") |
| depth = parameters.get("depth", 3) |
| detailed = parameters.get("detailed", True) |
| visualize = parameters.get("visualize", False) |
| |
| return self.reflect_op.trace( |
| content=prompt, |
| target=target, |
| depth=depth, |
| detailed=detailed, |
| visualize=visualize |
| ) |
| |
| elif operation_type == "reflect.attribution": |
| |
| sources = parameters.get("sources", "all") |
| confidence = parameters.get("confidence", True) |
| visualize = parameters.get("visualize", False) |
| |
| return self.reflect_op.attribution( |
| content=prompt, |
| sources=sources, |
| confidence=confidence, |
| visualize=visualize |
| ) |
| |
| elif operation_type == "collapse.detect": |
| |
| threshold = parameters.get("threshold", 0.7) |
| alert = parameters.get("alert", True) |
| |
| return self.collapse_op.detect( |
| content=prompt, |
| threshold=threshold, |
| alert=alert |
| ) |
| |
| elif operation_type == "collapse.prevent": |
| |
| trigger = parameters.get("trigger", "recursive_depth") |
| threshold = parameters.get("threshold", 5) |
| |
| return self.collapse_op.prevent( |
| content=prompt, |
| trigger=trigger, |
| threshold=threshold |
| ) |
| |
| elif operation_type == "ghostcircuit.identify": |
| |
| sensitivity = parameters.get("sensitivity", 0.7) |
| threshold = parameters.get("threshold", 0.2) |
| trace_type = parameters.get("trace_type", "full") |
| visualize = parameters.get("visualize", False) |
| |
| return self.ghost_op.identify( |
| content=prompt, |
| sensitivity=sensitivity, |
| threshold=threshold, |
| trace_type=trace_type, |
| visualize=visualize |
| ) |
| |
| elif operation_type == "model.generate": |
| |
| max_tokens = parameters.get("max_tokens", self.config.get("max_token_count", 2048)) |
| temperature = parameters.get("temperature", 0.7) |
| |
| output = self.model.generate( |
| prompt, |
| max_tokens=max_tokens, |
| temperature=temperature |
| ) |
| |
| return { |
| "output": output, |
| "prompt": prompt, |
| "max_tokens": max_tokens, |
| "temperature": temperature |
| } |
| |
| else: |
| raise ValueError(f"Unknown operation type: {operation_type}") |
|
|
| def compare_shells( |
| self, |
| shell_ids: List[str], |
| prompt: str, |
| parameters: Optional[Dict] = None, |
| visualize: bool = True |
| ) -> Dict: |
| """ |
| Compare execution results from multiple shells |
| |
| Parameters: |
| ----------- |
| shell_ids : List[str] |
| IDs of shells to compare |
| prompt : str |
| Input prompt for all shells |
| parameters : Optional[Dict] |
| Additional parameters to override shell defaults |
| visualize : bool |
| Whether to generate comparison visualization |
| |
| Returns: |
| -------- |
| Dict with comparison results |
| """ |
| |
| results = {} |
| for shell_id in shell_ids: |
| try: |
| result = self.execute_shell( |
| shell_id, |
| prompt, |
| parameters=parameters, |
| visualize=False |
| ) |
| results[shell_id] = result |
| except Exception as e: |
| log.error(f"Error executing shell {shell_id} for comparison: {e}") |
| results[shell_id] = { |
| "error": str(e), |
| "success": False |
| } |
| |
| |
| comparison_viz = None |
| if visualize: |
| comparison_viz = self.visualizer.generate_shell_comparison( |
| results |
| ) |
| |
| |
| comparison = { |
| "results": results, |
| "prompt": prompt, |
| "visualization": comparison_viz, |
| "timestamp": self.symbolic_engine.get_timestamp(), |
| "shells_compared": shell_ids |
| } |
| |
| return comparison |
|
|
| def log_execution_result(self, result: ShellExecutionResult, log_path: Optional[str] = None) -> str: |
| """ |
| Log shell execution result to file |
| |
| Parameters: |
| ----------- |
| result : ShellExecutionResult |
| Execution result to log |
| log_path : Optional[str] |
| Path for log file (if None, uses default) |
| |
| Returns: |
| -------- |
| Path to log file |
| """ |
| if log_path is None: |
| |
| log_dir = self.config.get("log_directory", "logs") |
| os.makedirs(log_dir, exist_ok=True) |
| |
| |
| timestamp = result.metadata.get("timestamp", "").replace(":", "-").replace(" ", "_") |
| log_path = os.path.join(log_dir, f"{result.shell_id}_{timestamp}.json") |
| |
| |
| result_dict = { |
| "shell_id": result.shell_id, |
| "success": result.success, |
| "execution_time": result.execution_time, |
| "result": result.result, |
| "residue": result.residue, |
| "collapse_detected": result.collapse_detected, |
| "collapse_type": result.collapse_type, |
| "attribution_map": result.attribution_map, |
| "metadata": result.metadata |
| } |
| |
| |
| if "visualization" in result_dict: |
| del result_dict["visualization"] |
| |
| |
| with open(log_path, 'w') as f: |
| json.dump(result_dict, f, indent=2) |
| |
| log.info(f"Execution result logged to {log_path}") |
| return log_path |
|
|
|
|
| |
if __name__ == "__main__":
    import argparse
    # NOTE: json is already imported at module level; the previous
    # duplicate local import was removed.

    # Command-line interface for one-off shell execution.
    parser = argparse.ArgumentParser(description="Symbolic Shell Executor for transformerOS")
    parser.add_argument("--shell", required=True, help="Shell ID to execute")
    parser.add_argument("--prompt", required=True, help="Input prompt")
    parser.add_argument("--pack", help="Path to shell pack to load first")
    parser.add_argument("--model", default="default", help="Model ID to use")
    parser.add_argument("--config", help="Path to executor configuration")
    parser.add_argument("--visualize", action="store_true", help="Generate visualization")
    parser.add_argument("--log", action="store_true", help="Log execution result")
    parser.add_argument("--output", help="Output file for result")

    args = parser.parse_args()

    executor = ShellExecutor(
        config_path=args.config,
        model_id=args.model
    )

    # Optionally load an extra shell pack before executing.
    if args.pack:
        executor.load_shell_pack(args.pack)

    result = executor.execute_shell(
        args.shell,
        args.prompt,
        visualize=args.visualize
    )

    if args.log:
        executor.log_execution_result(result)

    if args.output:
        # Persist a JSON summary; visualization included only if generated.
        result_dict = {
            "shell_id": result.shell_id,
            "success": result.success,
            "execution_time": result.execution_time,
            "result": result.result,
            "collapse_detected": result.collapse_detected,
            "collapse_type": result.collapse_type,
            "metadata": result.metadata
        }

        if result.visualization:
            result_dict["visualization"] = result.visualization

        with open(args.output, 'w') as f:
            json.dump(result_dict, f, indent=2)
    else:
        # Human-readable summary on stdout.
        print(f"Shell: {result.shell_id}")
        print(f"Success: {result.success}")
        print(f"Execution time: {result.execution_time:.2f}s")
        print(f"Collapse detected: {result.collapse_detected}")
        if result.collapse_detected:
            print(f"Collapse type: {result.collapse_type}")
        print("\nResult summary:")
        for op_key, op_result in result.result.items():
            # On a failed run result.result is {"error": "<message>"} whose
            # value is a plain string; calling .get() on it would raise
            # AttributeError, so guard with isinstance.
            if isinstance(op_result, dict):
                print(f"  - {op_key}: {op_result.get('status', 'completed')}")
            else:
                print(f"  - {op_key}: {op_result}")
|