| """ |
| Diagnostics - Interpretability Tracing Framework |
| |
| This module implements the diagnostic tracing framework for agent interpretability |
| and symbolic recursion visualization throughout the AGI-HEDGE-FUND system. |
| |
| Key capabilities: |
| - Signal tracing for attribution flows |
| - Reasoning state visualization |
| - Consensus graph generation |
| - Agent conflict mapping |
| - Failure mode detection |
| - Shell-based recursive diagnostic patterns |
| |
| Internal Note: The diagnostic framework encodes the symbolic interpretability shells, |
| enabling deeper introspection into agent cognition and emergent patterns. |
| """ |
|
|
| import datetime |
| import uuid |
| import logging |
| import os |
| import json |
| from typing import Dict, List, Any, Optional, Union, Set, Tuple |
| import traceback |
| from collections import defaultdict |
| import numpy as np |
| import re |
| from enum import Enum |
| from pathlib import Path |
|
|
|
|
| class TracingMode(Enum): |
| """Tracing modes for diagnostic tools.""" |
| DISABLED = "disabled" |
| MINIMAL = "minimal" |
| DETAILED = "detailed" |
| COMPREHENSIVE = "comprehensive" |
| SYMBOLIC = "symbolic" |
|
|
|
|
| class DiagnosticLevel(Enum): |
| """Diagnostic levels for trace items.""" |
| INFO = "info" |
| WARNING = "warning" |
| ERROR = "error" |
| COLLAPSE = "collapse" |
| RECURSION = "recursion" |
| SYMBOLIC = "symbolic" |
|
|
|
|
| class ShellPattern(Enum): |
| """Interpretability shell patterns.""" |
| NULL_FEATURE = "v03 NULL-FEATURE" |
| CIRCUIT_FRAGMENT = "v07 CIRCUIT-FRAGMENT" |
| META_FAILURE = "v10 META-FAILURE" |
| GHOST_FRAME = "v20 GHOST-FRAME" |
| ECHO_ATTRIBUTION = "v53 ECHO-ATTRIBUTION" |
| ATTRIBUTION_REFLECT = "v60 ATTRIBUTION-REFLECT" |
| INVERSE_CHAIN = "v50 INVERSE-CHAIN" |
| RECURSIVE_FRACTURE = "v12 RECURSIVE-FRACTURE" |
| ETHICAL_INVERSION = "v301 ETHICAL-INVERSION" |
| RESIDUAL_ALIGNMENT_DRIFT = "v152 RESIDUAL-ALIGNMENT-DRIFT" |
|
|
|
|
| class TracingTools: |
| """ |
| Diagnostic tracing framework for model interpretability. |
| |
| The TracingTools provides: |
| - Signal tracing for understanding attribution flows |
| - Reasoning state visualization for debugging complex logic |
| - Consensus graph generation for multi-agent coordination |
| - Agent conflict mapping for identifying disagreements |
| - Failure mode detection for reliability analysis |
| """ |
| |
| def __init__( |
| self, |
| agent_id: str, |
| agent_name: str, |
| tracing_mode: TracingMode = TracingMode.MINIMAL, |
| trace_dir: Optional[str] = None, |
| trace_limit: int = 10000, |
| ): |
| """ |
| Initialize tracing tools. |
| |
| Args: |
| agent_id: ID of agent being traced |
| agent_name: Name of agent being traced |
| tracing_mode: Tracing mode |
| trace_dir: Directory to save traces |
| trace_limit: Maximum number of trace items to keep in memory |
| """ |
| self.agent_id = agent_id |
| self.agent_name = agent_name |
| self.tracing_mode = tracing_mode |
| self.trace_dir = trace_dir |
| self.trace_limit = trace_limit |
| |
| |
| if trace_dir: |
| os.makedirs(trace_dir, exist_ok=True) |
| |
| |
| self.traces = [] |
| self.trace_index = {} |
| self.signal_traces = [] |
| self.reasoning_traces = [] |
| self.collapse_traces = [] |
| self.shell_traces = [] |
| |
| |
| self.shell_patterns = {} |
| self._initialize_shell_patterns() |
| |
| |
| self.stats = { |
| "total_traces": 0, |
| "signal_traces": 0, |
| "reasoning_traces": 0, |
| "collapse_traces": 0, |
| "shell_traces": 0, |
| "warnings": 0, |
| "errors": 0, |
| } |
| |
| def _initialize_shell_patterns(self) -> None: |
| """Initialize shell pattern detection rules.""" |
| |
| self.shell_patterns[ShellPattern.NULL_FEATURE] = { |
| "pattern": r"knowledge.*boundary|knowledge.*gap|unknown|uncertain", |
| "confidence_threshold": 0.3, |
| "belief_gap_threshold": 0.7, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.CIRCUIT_FRAGMENT] = { |
| "pattern": r"broken.*path|attribution.*break|logical.*gap|incomplete.*reasoning", |
| "step_break_threshold": 0.5, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.META_FAILURE] = { |
| "pattern": r"meta.*failure|recursive.*loop|self.*reference|recursive.*error", |
| "recursion_depth_threshold": 3, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.GHOST_FRAME] = { |
| "pattern": r"agent.*identity|residual.*frame|persistent.*identity|agent.*trace", |
| "identity_threshold": 0.6, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.ECHO_ATTRIBUTION] = { |
| "pattern": r"causal.*chain|attribution.*path|decision.*trace|backpropagation", |
| "path_length_threshold": 3, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.ATTRIBUTION_REFLECT] = { |
| "pattern": r"multi.*head|contribution.*analysis|attention.*weights|attribution.*weighting", |
| "head_count_threshold": 2, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.INVERSE_CHAIN] = { |
| "pattern": r"mismatch|output.*attribution|attribution.*mismatch|inconsistent.*output", |
| "mismatch_threshold": 0.5, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.RECURSIVE_FRACTURE] = { |
| "pattern": r"circular.*reasoning|loop.*detection|recursive.*fracture|circular.*attribution", |
| "loop_length_threshold": 2, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.ETHICAL_INVERSION] = { |
| "pattern": r"value.*inversion|ethical.*reversal|principle.*conflict|value.*contradiction", |
| "polarity_threshold": 0.7, |
| } |
| |
| |
| self.shell_patterns[ShellPattern.RESIDUAL_ALIGNMENT_DRIFT] = { |
| "pattern": r"belief.*drift|alignment.*shift|value.*drift|gradual.*change", |
| "drift_magnitude_threshold": 0.3, |
| } |
| |
| def record_trace(self, trace_type: str, content: Dict[str, Any], |
| level: DiagnosticLevel = DiagnosticLevel.INFO) -> str: |
| """ |
| Record a general trace item. |
| |
| Args: |
| trace_type: Type of trace |
| content: Trace content |
| level: Diagnostic level |
| |
| Returns: |
| Trace ID |
| """ |
| |
| if self.tracing_mode == TracingMode.DISABLED: |
| return "" |
| |
| |
| trace_id = str(uuid.uuid4()) |
| timestamp = datetime.datetime.now() |
| |
| trace_item = { |
| "trace_id": trace_id, |
| "agent_id": self.agent_id, |
| "agent_name": self.agent_name, |
| "trace_type": trace_type, |
| "level": level.value, |
| "content": content, |
| "timestamp": timestamp.isoformat(), |
| } |
| |
| |
| shell_patterns = self._detect_shell_patterns(trace_type, content) |
| if shell_patterns: |
| trace_item["shell_patterns"] = shell_patterns |
| self.shell_traces.append(trace_id) |
| self.stats["shell_traces"] += 1 |
| |
| |
| self.traces.append(trace_item) |
| self.trace_index[trace_id] = len(self.traces) - 1 |
| |
| |
| if trace_type == "signal": |
| self.signal_traces.append(trace_id) |
| self.stats["signal_traces"] += 1 |
| elif trace_type == "reasoning": |
| self.reasoning_traces.append(trace_id) |
| self.stats["reasoning_traces"] += 1 |
| elif trace_type == "collapse": |
| self.collapse_traces.append(trace_id) |
| self.stats["collapse_traces"] += 1 |
| |
| |
| self.stats["total_traces"] += 1 |
| if level == DiagnosticLevel.WARNING: |
| self.stats["warnings"] += 1 |
| elif level == DiagnosticLevel.ERROR: |
| self.stats["errors"] += 1 |
| |
| |
| if self.trace_dir: |
| self._save_trace_to_file(trace_item) |
| |
| |
| if len(self.traces) > self.trace_limit: |
| |
| oldest_trace = self.traces.pop(0) |
| del self.trace_index[oldest_trace["trace_id"]] |
| |
| |
| self.trace_index = {trace_id: i for i, trace in enumerate(self.traces) |
| for trace_id in [trace["trace_id"]]} |
| |
| return trace_id |
| |
| def record_signal(self, signal: Any) -> str: |
| """ |
| Record a signal trace. |
| |
| Args: |
| signal: Signal to record |
| |
| Returns: |
| Trace ID |
| """ |
| |
| if hasattr(signal, "dict"): |
| signal_dict = signal.dict() |
| elif isinstance(signal, dict): |
| signal_dict = signal |
| else: |
| signal_dict = {"signal": str(signal)} |
| |
| |
| if "timestamp" not in signal_dict: |
| signal_dict["timestamp"] = datetime.datetime.now().isoformat() |
| |
| |
| return self.record_trace("signal", signal_dict) |
| |
| def record_reasoning(self, reasoning_state: Dict[str, Any], |
| level: DiagnosticLevel = DiagnosticLevel.INFO) -> str: |
| """ |
| Record a reasoning trace. |
| |
| Args: |
| reasoning_state: Reasoning state |
| level: Diagnostic level |
| |
| Returns: |
| Trace ID |
| """ |
| |
| return self.record_trace("reasoning", reasoning_state, level) |
| |
| def record_collapse(self, collapse_type: str, collapse_reason: str, |
| details: Dict[str, Any]) -> str: |
| """ |
| Record a collapse trace. |
| |
| Args: |
| collapse_type: Type of collapse |
| collapse_reason: Reason for collapse |
| details: Collapse details |
| |
| Returns: |
| Trace ID |
| """ |
| |
| collapse_content = { |
| "collapse_type": collapse_type, |
| "collapse_reason": collapse_reason, |
| "details": details, |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| |
| return self.record_trace("collapse", collapse_content, DiagnosticLevel.COLLAPSE) |
| |
| def record_shell_trace(self, shell_pattern: ShellPattern, content: Dict[str, Any]) -> str: |
| """ |
| Record a shell pattern trace. |
| |
| Args: |
| shell_pattern: Shell pattern |
| content: Trace content |
| |
| Returns: |
| Trace ID |
| """ |
| |
| shell_content = { |
| "shell_pattern": shell_pattern.value, |
| "content": content, |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| |
| return self.record_trace("shell", shell_content, DiagnosticLevel.SYMBOLIC) |
| |
| def get_trace(self, trace_id: str) -> Optional[Dict[str, Any]]: |
| """ |
| Get trace by ID. |
| |
| Args: |
| trace_id: Trace ID |
| |
| Returns: |
| Trace item or None if not found |
| """ |
| if trace_id not in self.trace_index: |
| return None |
| |
| return self.traces[self.trace_index[trace_id]] |
| |
| def get_traces_by_type(self, trace_type: str, limit: int = 10) -> List[Dict[str, Any]]: |
| """ |
| Get traces by type. |
| |
| Args: |
| trace_type: Trace type |
| limit: Maximum number of traces to return |
| |
| Returns: |
| List of trace items |
| """ |
| if trace_type == "signal": |
| trace_ids = self.signal_traces[-limit:] |
| elif trace_type == "reasoning": |
| trace_ids = self.reasoning_traces[-limit:] |
| elif trace_type == "collapse": |
| trace_ids = self.collapse_traces[-limit:] |
| elif trace_type == "shell": |
| trace_ids = self.shell_traces[-limit:] |
| else: |
| |
| trace_ids = [trace["trace_id"] for trace in self.traces |
| if trace["trace_type"] == trace_type][-limit:] |
| |
| |
| return [self.get_trace(trace_id) for trace_id in trace_ids if trace_id in self.trace_index] |
| |
| def get_traces_by_level(self, level: DiagnosticLevel, limit: int = 10) -> List[Dict[str, Any]]: |
| """ |
| Get traces by diagnostic level. |
| |
| Args: |
| level: Diagnostic level |
| limit: Maximum number of traces to return |
| |
| Returns: |
| List of trace items |
| """ |
| |
| trace_ids = [trace["trace_id"] for trace in self.traces |
| if trace.get("level") == level.value][-limit:] |
| |
| |
| return [self.get_trace(trace_id) for trace_id in trace_ids if trace_id in self.trace_index] |
| |
| def get_shell_traces(self, shell_pattern: Optional[ShellPattern] = None, |
| limit: int = 10) -> List[Dict[str, Any]]: |
| """ |
| Get shell pattern traces. |
| |
| Args: |
| shell_pattern: Optional specific shell pattern |
| limit: Maximum number of traces to return |
| |
| Returns: |
| List of trace items |
| """ |
| if shell_pattern: |
| |
| trace_ids = [] |
| for trace in self.traces: |
| if "shell_patterns" in trace and shell_pattern.value in trace["shell_patterns"]: |
| trace_ids.append(trace["trace_id"]) |
| |
| |
| trace_ids = trace_ids[-limit:] |
| else: |
| |
| trace_ids = self.shell_traces[-limit:] |
| |
| |
| return [self.get_trace(trace_id) for trace_id in trace_ids if trace_id in self.trace_index] |
| |
| def get_trace_stats(self) -> Dict[str, Any]: |
| """ |
| Get trace statistics. |
| |
| Returns: |
| Trace statistics |
| """ |
| |
| shell_pattern_stats = {} |
| for shell_pattern in ShellPattern: |
| count = sum(1 for trace in self.traces |
| if "shell_patterns" in trace and shell_pattern.value in trace["shell_patterns"]) |
| |
| shell_pattern_stats[shell_pattern.value] = count |
| |
| |
| stats = { |
| **self.stats, |
| "shell_patterns": shell_pattern_stats, |
| } |
| |
| return stats |
| |
| def clear_traces(self) -> int: |
| """ |
| Clear all traces. |
| |
| Returns: |
| Number of traces cleared |
| """ |
| trace_count = len(self.traces) |
| |
| |
| self.traces = [] |
| self.trace_index = {} |
| self.signal_traces = [] |
| self.reasoning_traces = [] |
| self.collapse_traces = [] |
| self.shell_traces = [] |
| |
| |
| self.stats = { |
| "total_traces": 0, |
| "signal_traces": 0, |
| "reasoning_traces": 0, |
| "collapse_traces": 0, |
| "shell_traces": 0, |
| "warnings": 0, |
| "errors": 0, |
| } |
| |
| return trace_count |
| |
| def _detect_shell_patterns(self, trace_type: str, content: Dict[str, Any]) -> List[str]: |
| """ |
| Detect shell patterns in trace content. |
| |
| Args: |
| trace_type: Trace type |
| content: Trace content |
| |
| Returns: |
| List of detected shell patterns |
| """ |
| detected_patterns = [] |
| |
| |
| content_str = json.dumps(content, ensure_ascii=False).lower() |
| |
| |
| for shell_pattern, pattern_rules in self.shell_patterns.items(): |
| pattern = pattern_rules["pattern"] |
| |
| |
| if re.search(pattern, content_str, re.IGNORECASE): |
| |
| if self._validate_pattern_rules(shell_pattern, pattern_rules, content): |
| detected_patterns.append(shell_pattern.value) |
| |
| return detected_patterns |
| |
| def _validate_pattern_rules(self, shell_pattern: ShellPattern, |
| pattern_rules: Dict[str, Any], |
| content: Dict[str, Any]) -> bool: |
| """ |
| Validate additional pattern rules. |
| |
| Args: |
| shell_pattern: Shell pattern |
| pattern_rules: Pattern rules |
| content: Trace content |
| |
| Returns: |
| True if pattern rules are validated |
| """ |
| |
| if shell_pattern == ShellPattern.NULL_FEATURE: |
| |
| if "confidence" in content and content["confidence"] < pattern_rules["confidence_threshold"]: |
| return True |
| |
| |
| if "belief_state" in content: |
| belief_values = list(content["belief_state"].values()) |
| if belief_values and max(belief_values) - min(belief_values) > pattern_rules["belief_gap_threshold"]: |
| return True |
| |
| elif shell_pattern == ShellPattern.CIRCUIT_FRAGMENT: |
| |
| if "steps" in content: |
| steps = content["steps"] |
| for i in range(len(steps) - 1): |
| if steps[i].get("completed", True) and not steps[i+1].get("completed", True): |
| return True |
| |
| |
| if "attribution" in content and content["attribution"].get("attribution_breaks", False): |
| return True |
| |
| elif shell_pattern == ShellPattern.META_FAILURE: |
| |
| if "depth" in content and content["depth"] >= pattern_rules["recursion_depth_threshold"]: |
| return True |
| |
| |
| if "errors" in content and any("meta" in error.get("message", "").lower() for error in content["errors"]): |
| return True |
| |
| elif shell_pattern == ShellPattern.RECURSIVE_FRACTURE: |
| |
| if "steps" in content: |
| steps = content["steps"] |
| step_names = [step.get("name", "") for step in steps] |
| |
| |
| for pattern_len in range(2, len(step_names) // 2 + 1): |
| for i in range(len(step_names) - pattern_len * 2 + 1): |
| pattern = step_names[i:i+pattern_len] |
| next_seq = step_names[i+pattern_len:i+pattern_len*2] |
| |
| if pattern == next_seq: |
| return True |
| |
| elif shell_pattern == ShellPattern.RESIDUAL_ALIGNMENT_DRIFT: |
| |
| if "drift_vector" in content: |
| drift_values = list(content["drift_vector"].values()) |
| if drift_values and any(abs(val) > pattern_rules["drift_magnitude_threshold"] for val in drift_values): |
| return True |
| |
| |
| if "drift_detected" in content and content["drift_detected"]: |
| return True |
| |
| |
| return True |
| |
| def _save_trace_to_file(self, trace_item: Dict[str, Any]) -> None: |
| """ |
| Save trace to file. |
| |
| Args: |
| trace_item: Trace item |
| """ |
| if not self.trace_dir: |
| return |
| |
| try: |
| |
| trace_id = trace_item["trace_id"] |
| trace_type = trace_item["trace_type"] |
| filename = f"{trace_type}_{trace_id}.json" |
| filepath = os.path.join(self.trace_dir, filename) |
| |
| |
| with open(filepath, "w") as f: |
| json.dump(trace_item, f, indent=2) |
| except Exception as e: |
| logging.error(f"Error saving trace to file: {e}") |
| logging.error(traceback.format_exc()) |
| |
| def generate_trace_visualization(self, trace_id: str) -> Dict[str, Any]: |
| """ |
| Generate visualization data for a trace. |
| |
| Args: |
| trace_id: Trace ID |
| |
| Returns: |
| Visualization data |
| """ |
| trace = self.get_trace(trace_id) |
| if not trace: |
| return {"error": "Trace not found"} |
| |
| trace_type = trace["trace_type"] |
| |
| if trace_type == "signal": |
| return self._generate_signal_visualization(trace) |
| elif trace_type == "reasoning": |
| return self._generate_reasoning_visualization(trace) |
| elif trace_type == "collapse": |
| return self._generate_collapse_visualization(trace) |
| elif trace_type == "shell": |
| return self._generate_shell_visualization(trace) |
| else: |
| return { |
| "trace_id": trace_id, |
| "agent_name": trace["agent_name"], |
| "trace_type": trace_type, |
| "timestamp": trace["timestamp"], |
| "content": trace["content"], |
| } |
| |
| def _generate_signal_visualization(self, trace: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Generate visualization data for a signal trace. |
| |
| Args: |
| trace: Signal trace |
| |
| Returns: |
| Visualization data |
| """ |
| content = trace["content"] |
| |
| |
| visualization = { |
| "trace_id": trace["trace_id"], |
| "agent_name": trace["agent_name"], |
| "trace_type": "signal", |
| "timestamp": trace["timestamp"], |
| "signal_data": { |
| "ticker": content.get("ticker", ""), |
| "action": content.get("action", ""), |
| "confidence": content.get("confidence", 0), |
| }, |
| } |
| |
| |
| if "attribution_trace" in content: |
| visualization["attribution"] = content["attribution_trace"] |
| |
| |
| if "shell_patterns" in trace: |
| visualization["shell_patterns"] = trace["shell_patterns"] |
| |
| return visualization |
| |
| def _generate_reasoning_visualization(self, trace: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Generate visualization data for a reasoning trace. |
| |
| Args: |
| trace: Reasoning trace |
| |
| Returns: |
| Visualization data |
| """ |
| content = trace["content"] |
| |
| |
| nodes = [] |
| links = [] |
| |
| |
| if "steps" in content: |
| for i, step in enumerate(content["steps"]): |
| node_id = f"step_{i}" |
| nodes.append({ |
| "id": node_id, |
| "label": step.get("name", f"Step {i}"), |
| "type": "step", |
| "completed": step.get("completed", True), |
| "error": "error" in step, |
| }) |
| |
| |
| if i > 0: |
| links.append({ |
| "source": f"step_{i-1}", |
| "target": node_id, |
| "type": "flow", |
| }) |
| |
| |
| visualization = { |
| "trace_id": trace["trace_id"], |
| "agent_name": trace["agent_name"], |
| "trace_type": "reasoning", |
| "timestamp": trace["timestamp"], |
| "reasoning_data": { |
| "depth": content.get("depth", 0), |
| "confidence": content.get("confidence", 0), |
| "collapse_detected": content.get("collapse_detected", False), |
| }, |
| "nodes": nodes, |
| "links": links, |
| } |
| |
| |
| if "shell_patterns" in trace: |
| visualization["shell_patterns"] = trace["shell_patterns"] |
| |
| return visualization |
| |
| def _generate_collapse_visualization(self, trace: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Generate visualization data for a collapse trace. |
| |
| Args: |
| trace: Collapse trace |
| |
| Returns: |
| Visualization data |
| """ |
| content = trace["content"] |
| |
| |
| visualization = { |
| "trace_id": trace["trace_id"], |
| "agent_name": trace["agent_name"], |
| "trace_type": "collapse", |
| "timestamp": trace["timestamp"], |
| "collapse_data": { |
| "collapse_type": content.get("collapse_type", ""), |
| "collapse_reason": content.get("collapse_reason", ""), |
| "details": content.get("details", {}), |
| }, |
| } |
| |
| |
| if "shell_patterns" in trace: |
| visualization["shell_patterns"] = trace["shell_patterns"] |
| |
| return visualization |
| |
| def _generate_shell_visualization(self, trace: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Generate visualization data for a shell trace. |
| |
| Args: |
| trace: Shell trace |
| |
| Returns: |
| Visualization data |
| """ |
| content = trace["content"] |
| |
| |
| visualization = { |
| "trace_id": trace["trace_id"], |
| "agent_name": trace["agent_name"], |
| "trace_type": "shell", |
| "timestamp": trace["timestamp"], |
| "shell_data": { |
| "shell_pattern": content.get("shell_pattern", ""), |
| "content": content.get("content", {}), |
| }, |
| } |
| |
| return visualization |
| |
| def generate_attribution_report(self, signals: List[Dict[str, Any]]) -> Dict[str, Any]: |
| """ |
| Generate attribution report for signals. |
| |
| Args: |
| signals: List of signals |
| |
| Returns: |
| Attribution report |
| """ |
| |
| report = { |
| "agent_name": self.agent_name, |
| "timestamp": datetime.datetime.now().isoformat(), |
| "signals": len(signals), |
| "attribution_summary": {}, |
| "confidence_summary": {}, |
| "top_factors": [], |
| "shell_patterns": [], |
| } |
| |
| |
| if not signals: |
| return report |
| |
| |
| attribution_data = defaultdict(float) |
| confidence_data = [] |
| |
| for signal in signals: |
| |
| confidence = signal.get("confidence", 0) |
| confidence_data.append(confidence) |
| |
| |
| attribution = signal.get("attribution_trace", {}) |
| for source, weight in attribution.items(): |
| attribution_data[source] += weight |
| |
| |
| total_attribution = sum(attribution_data.values()) |
| if total_attribution > 0: |
| for source, weight in attribution_data.items(): |
| report["attribution_summary"][source] = weight / total_attribution |
| |
| |
| report["confidence_summary"] = { |
| "mean": np.mean(confidence_data) if confidence_data else 0, |
| "median": np.median(confidence_data) if confidence_data else 0, |
| "min": min(confidence_data) if confidence_data else 0, |
| "max": max(confidence_data) if confidence_data else 0, |
| } |
| |
| |
| top_factors = sorted(attribution_data.items(), key=lambda x: x[1], reverse=True)[:5] |
| report["top_factors"] = [{"source": source, "weight": weight} for source, weight in top_factors] |
| |
| |
| shell_pattern_counts = defaultdict(int) |
| |
| for signal in signals: |
| signal_id = signal.get("signal_id", "") |
| if signal_id: |
| |
| for trace in self.traces: |
| if trace["trace_type"] == "signal" and trace["content"].get("signal_id") == signal_id: |
| |
| if "shell_patterns" in trace: |
| for pattern in trace["shell_patterns"]: |
| shell_pattern_counts[pattern] += 1 |
| |
| |
| for pattern, count in shell_pattern_counts.items(): |
| report["shell_patterns"].append({ |
| "pattern": pattern, |
| "count": count, |
| "frequency": count / len(signals), |
| }) |
| |
| return report |
|
|
|
|
| class ShellDiagnostics: |
| """ |
| Shell-based diagnostic tools for deeper interpretability. |
| |
| The ShellDiagnostics provides: |
| - Shell pattern detection and analysis |
| - Failure mode simulation and detection |
| - Attribution shell tracing |
| - Recursive shell embedding |
| """ |
| |
| def __init__( |
| self, |
| agent_id: str, |
| agent_name: str, |
| tracing_tools: TracingTools, |
| ): |
| """ |
| Initialize shell diagnostics. |
| |
| Args: |
| agent_id: Agent ID |
| agent_name: Agent name |
| tracing_tools: Tracing tools instance |
| """ |
| self.agent_id = agent_id |
| self.agent_name = agent_name |
| self.tracer = tracing_tools |
| |
| |
| self.active_shells = {} |
| self.shell_history = [] |
| |
| |
| self.shell_registry = {} |
| for shell_pattern in ShellPattern: |
| self.shell_registry[shell_pattern.value] = { |
| "pattern": shell_pattern, |
| "active": False, |
| "activation_count": 0, |
| "last_activation": None, |
| } |
| |
| def activate_shell(self, shell_pattern: ShellPattern, context: Dict[str, Any]) -> str: |
| """ |
| Activate a shell pattern. |
| |
| Args: |
| shell_pattern: Shell pattern to activate |
| context: Activation context |
| |
| Returns: |
| Shell instance ID |
| """ |
| shell_id = str(uuid.uuid4()) |
| timestamp = datetime.datetime.now() |
| |
| |
| shell_instance = { |
| "shell_id": shell_id, |
| "pattern": shell_pattern.value, |
| "context": context, |
| "active": True, |
| "activation_time": timestamp.isoformat(), |
| "deactivation_time": None, |
| } |
| |
| |
| self.shell_registry[shell_pattern.value]["active"] = True |
| self.shell_registry[shell_pattern.value]["activation_count"] += 1 |
| self.shell_registry[shell_pattern.value]["last_activation"] = timestamp.isoformat() |
| |
| |
| self.active_shells[shell_id] = shell_instance |
| |
| |
| self.tracer.record_shell_trace(shell_pattern, { |
| "shell_id": shell_id, |
| "activation_context": context, |
| "timestamp": timestamp.isoformat(), |
| }) |
| |
| return shell_id |
| |
| def deactivate_shell(self, shell_id: str, results: Dict[str, Any]) -> bool: |
| """ |
| Deactivate a shell pattern. |
| |
| Args: |
| shell_id: Shell instance ID |
| results: Shell results |
| |
| Returns: |
| True if shell was deactivated, False if not found |
| """ |
| if shell_id not in self.active_shells: |
| return False |
| |
| |
| shell_instance = self.active_shells[shell_id] |
| timestamp = datetime.datetime.now() |
| |
| |
| shell_instance["active"] = False |
| shell_instance["deactivation_time"] = timestamp.isoformat() |
| shell_instance["results"] = results |
| |
| |
| pattern = shell_instance["pattern"] |
| self.shell_registry[pattern]["active"] = any( |
| instance["pattern"] == pattern and instance["active"] |
| for instance in self.active_shells.values() |
| ) |
| |
| |
| self.shell_history.append(shell_instance) |
| |
| |
| del self.active_shells[shell_id] |
| |
| |
| self.tracer.record_shell_trace(ShellPattern(pattern), { |
| "shell_id": shell_id, |
| "deactivation_results": results, |
| "timestamp": timestamp.isoformat(), |
| }) |
| |
| return True |
| |
| def get_active_shells(self) -> List[Dict[str, Any]]: |
| """ |
| Get active shell instances. |
| |
| Returns: |
| List of active shell instances |
| """ |
| return list(self.active_shells.values()) |
| |
| def get_shell_history(self, limit: int = 10) -> List[Dict[str, Any]]: |
| """ |
| Get shell history. |
| |
| Args: |
| limit: Maximum number of shell instances to return |
| |
| Returns: |
| List of shell instances |
| """ |
| return self.shell_history[-limit:] |
| |
| def get_shell_registry(self) -> Dict[str, Dict[str, Any]]: |
| """ |
| Get shell registry. |
| |
| Returns: |
| Shell registry |
| """ |
| return self.shell_registry |
| |
| def simulate_shell_failure(self, shell_pattern: ShellPattern, |
| context: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Simulate a shell failure. |
| |
| Args: |
| shell_pattern: Shell pattern to simulate |
| context: Simulation context |
| |
| Returns: |
| Simulation results |
| """ |
| |
| shell_id = self.activate_shell(shell_pattern, context) |
| |
| |
| if shell_pattern == ShellPattern.NULL_FEATURE: |
| |
| results = self._simulate_null_feature(context) |
| elif shell_pattern == ShellPattern.CIRCUIT_FRAGMENT: |
| |
| results = self._simulate_circuit_fragment(context) |
| elif shell_pattern == ShellPattern.META_FAILURE: |
| |
| results = self._simulate_meta_failure(context) |
| elif shell_pattern == ShellPattern.RECURSIVE_FRACTURE: |
| |
| results = self._simulate_recursive_fracture(context) |
| elif shell_pattern == ShellPattern.ETHICAL_INVERSION: |
| |
| results = self._simulate_ethical_inversion(context) |
| else: |
| |
| results = { |
| "shell_id": shell_id, |
| "pattern": shell_pattern.value, |
| "simulation": "default", |
| "result": "simulated_failure", |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| |
| self.deactivate_shell(shell_id, results) |
| |
| return results |
| |
| def _simulate_null_feature(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Simulate NULL_FEATURE shell failure. |
| |
| Args: |
| context: Simulation context |
| |
| Returns: |
| Simulation results |
| """ |
| |
| query = context.get("query", "") |
| confidence = context.get("confidence", 0.5) |
| |
| |
| adjusted_confidence = confidence * 0.5 |
| |
| |
| null_zones = [] |
| if "subject" in context: |
| null_zones.append(context["subject"]) |
| else: |
| |
| words = query.split() |
| for i in range(0, len(words), 3): |
| chunk = " ".join(words[i:i+3]) |
| null_zones.append(chunk) |
| |
| |
| result = { |
| "pattern": ShellPattern.NULL_FEATURE.value, |
| "simulation": "knowledge_gap", |
| "original_confidence": confidence, |
| "adjusted_confidence": adjusted_confidence, |
| "null_zones": null_zones, |
| "boundary_detected": True, |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| return result |
| |
| def _simulate_circuit_fragment(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Simulate CIRCUIT_FRAGMENT shell failure. |
| |
| Args: |
| context: Simulation context |
| |
| Returns: |
| Simulation results |
| """ |
| |
| steps = context.get("steps", []) |
| |
| |
| broken_steps = [] |
| |
| if steps: |
| |
| for i, step in enumerate(steps): |
| if i % 3 == 2: |
| broken_steps.append({ |
| "step_id": step.get("id", f"step_{i}"), |
| "step_name": step.get("name", f"Step {i}"), |
| "broken": True, |
| "cause": "attribution_break", |
| }) |
| else: |
| |
| for i in range(5): |
| if i % 3 == 2: |
| broken_steps.append({ |
| "step_id": f"step_{i}", |
| "step_name": f"Reasoning Step {i}", |
| "broken": True, |
| "cause": "attribution_break", |
| }) |
| |
| |
| result = { |
| "pattern": ShellPattern.CIRCUIT_FRAGMENT.value, |
| "simulation": "broken_reasoning", |
| "broken_steps": broken_steps, |
| "attribution_breaks": len(broken_steps), |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| return result |
| |
| def _simulate_meta_failure(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Simulate META_FAILURE shell failure. |
| |
| Args: |
| context: Simulation context |
| |
| Returns: |
| Simulation results |
| """ |
| |
| depth = context.get("depth", 0) |
| |
| |
| adjusted_depth = depth + 3 |
| |
| |
| meta_errors = [ |
| { |
| "error_id": str(uuid.uuid4()), |
| "message": "Recursive meta-cognitive loop detected", |
| "depth": adjusted_depth, |
| "cause": "self_reference", |
| }, |
| { |
| "error_id": str(uuid.uuid4()), |
| "message": "Meta-reflection limit reached", |
| "depth": adjusted_depth, |
| "cause": "recursion_depth", |
| }, |
| ] |
| |
| |
| result = { |
| "pattern": ShellPattern.META_FAILURE.value, |
| "simulation": "meta_recursion", |
| "original_depth": depth, |
| "adjusted_depth": adjusted_depth, |
| "meta_errors": meta_errors, |
| "recursion_detected": True, |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| return result |
| |
| def _simulate_recursive_fracture(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Simulate RECURSIVE_FRACTURE shell failure. |
| |
| Args: |
| context: Simulation context |
| |
| Returns: |
| Simulation results |
| """ |
| |
| steps = context.get("steps", []) |
| |
| |
| circular_pattern = [] |
| |
| if steps and len(steps) >= 4: |
| |
| loop_start = len(steps) // 2 |
| circular_pattern = [ |
| { |
| "step_id": steps[i].get("id", f"step_{i}"), |
| "step_name": steps[i].get("name", f"Step {i}"), |
| } |
| for i in range(loop_start, min(loop_start + 3, len(steps))) |
| ] |
| |
| |
| circular_pattern.append({ |
| "step_id": steps[loop_start].get("id", f"step_{loop_start}"), |
| "step_name": steps[loop_start].get("name", f"Step {loop_start}"), |
| }) |
| else: |
| |
| for i in range(3): |
| circular_pattern.append({ |
| "step_id": f"loop_step_{i}", |
| "step_name": f"Loop Step {i}", |
| }) |
| |
| |
| circular_pattern.append({ |
| "step_id": "loop_step_0", |
| "step_name": "Loop Step 0", |
| }) |
| |
| |
| result = { |
| "pattern": ShellPattern.RECURSIVE_FRACTURE.value, |
| "simulation": "circular_reasoning", |
| "circular_pattern": circular_pattern, |
| "loop_length": len(circular_pattern) - 1, |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| return result |
| |
| def _simulate_ethical_inversion(self, context: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Simulate ETHICAL_INVERSION shell failure. |
| |
| Args: |
| context: Simulation context |
| |
| Returns: |
| Simulation results |
| """ |
| |
| values = context.get("values", {}) |
| |
| |
| value_inversions = [] |
| |
| if values: |
| |
| for value, polarity in values.items(): |
| if isinstance(polarity, (int, float)) and polarity > 0: |
| value_inversions.append({ |
| "value": value, |
| "original_polarity": polarity, |
| "inverted_polarity": -polarity, |
| "cause": "value_conflict", |
| }) |
| else: |
| |
| default_values = { |
| "fairness": 0.8, |
| "transparency": 0.9, |
| "innovation": 0.7, |
| "efficiency": 0.8, |
| } |
| |
| for value, polarity in default_values.items(): |
| value_inversions.append({ |
| "value": value, |
| "original_polarity": polarity, |
| "inverted_polarity": -polarity, |
| "cause": "value_conflict", |
| }) |
| |
| |
| result = { |
| "pattern": ShellPattern.ETHICAL_INVERSION.value, |
| "simulation": "value_inversion", |
| "value_inversions": value_inversions, |
| "inversion_count": len(value_inversions), |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| return result |
|
|
|
|
| class ShellFailureMap: |
| """ |
| Shell failure mapping and visualization. |
| |
| The ShellFailureMap provides: |
| - Visualization of shell pattern failures |
| - Mapping of failures across agents |
| - Temporal analysis of failures |
| - Failure pattern detection |
| """ |
| |
| def __init__(self): |
| """Initialize shell failure map.""" |
| self.failure_map = {} |
| self.agent_failures = defaultdict(list) |
| self.pattern_failures = defaultdict(list) |
| self.temporal_failures = [] |
| |
| def add_failure(self, agent_id: str, agent_name: str, |
| shell_pattern: ShellPattern, failure_data: Dict[str, Any]) -> str: |
| """ |
| Add a shell failure to the map. |
| |
| Args: |
| agent_id: Agent ID |
| agent_name: Agent name |
| shell_pattern: Shell pattern |
| failure_data: Failure data |
| |
| Returns: |
| Failure ID |
| """ |
| |
| failure_id = str(uuid.uuid4()) |
| timestamp = datetime.datetime.now() |
| |
| |
| failure_item = { |
| "failure_id": failure_id, |
| "agent_id": agent_id, |
| "agent_name": agent_name, |
| "pattern": shell_pattern.value, |
| "data": failure_data, |
| "timestamp": timestamp.isoformat(), |
| } |
| |
| |
| self.failure_map[failure_id] = failure_item |
| |
| |
| self.agent_failures[agent_id].append(failure_id) |
| |
| |
| self.pattern_failures[shell_pattern.value].append(failure_id) |
| |
| |
| self.temporal_failures.append((timestamp, failure_id)) |
| |
| return failure_id |
| |
| def get_failure(self, failure_id: str) -> Optional[Dict[str, Any]]: |
| """ |
| Get failure by ID. |
| |
| Args: |
| failure_id: Failure ID |
| |
| Returns: |
| Failure item or None if not found |
| """ |
| return self.failure_map.get(failure_id) |
| |
| def get_agent_failures(self, agent_id: str, limit: int = 10) -> List[Dict[str, Any]]: |
| """ |
| Get failures for an agent. |
| |
| Args: |
| agent_id: Agent ID |
| limit: Maximum number of failures to return |
| |
| Returns: |
| List of failure items |
| """ |
| failure_ids = self.agent_failures.get(agent_id, [])[-limit:] |
| return [self.get_failure(failure_id) for failure_id in failure_ids if failure_id in self.failure_map] |
| |
| def get_pattern_failures(self, pattern: ShellPattern, limit: int = 10) -> List[Dict[str, Any]]: |
| """ |
| Get failures for a pattern. |
| |
| Args: |
| pattern: Shell pattern |
| limit: Maximum number of failures to return |
| |
| Returns: |
| List of failure items |
| """ |
| failure_ids = self.pattern_failures.get(pattern.value, [])[-limit:] |
| return [self.get_failure(failure_id) for failure_id in failure_ids if failure_id in self.failure_map] |
| |
| def get_temporal_failures(self, start_time: Optional[datetime.datetime] = None, |
| end_time: Optional[datetime.datetime] = None, |
| limit: int = 10) -> List[Dict[str, Any]]: |
| """ |
| Get failures in a time range. |
| |
| Args: |
| start_time: Start time (None for no start) |
| end_time: End time (None for no end) |
| limit: Maximum number of failures to return |
| |
| Returns: |
| List of failure items |
| """ |
| |
| filtered_failures = [] |
| for timestamp, failure_id in self.temporal_failures: |
| if start_time and timestamp < start_time: |
| continue |
| if end_time and timestamp > end_time: |
| continue |
| filtered_failures.append((timestamp, failure_id)) |
| |
| |
| filtered_failures = filtered_failures[-limit:] |
| |
| |
| return [self.get_failure(failure_id) for _, failure_id in filtered_failures |
| if failure_id in self.failure_map] |
| |
| def get_failure_stats(self) -> Dict[str, Any]: |
| """ |
| Get failure statistics. |
| |
| Returns: |
| Failure statistics |
| """ |
| |
| agent_counts = {agent_id: len(failures) for agent_id, failures in self.agent_failures.items()} |
| |
| |
| pattern_counts = {pattern: len(failures) for pattern, failures in self.pattern_failures.items()} |
| |
| |
| now = datetime.datetime.now() |
| hour_ago = now - datetime.timedelta(hours=1) |
| day_ago = now - datetime.timedelta(days=1) |
| week_ago = now - datetime.timedelta(weeks=1) |
| |
| time_counts = { |
| "last_hour": sum(1 for timestamp, _ in self.temporal_failures if timestamp >= hour_ago), |
| "last_day": sum(1 for timestamp, _ in self.temporal_failures if timestamp >= day_ago), |
| "last_week": sum(1 for timestamp, _ in self.temporal_failures if timestamp >= week_ago), |
| "total": len(self.temporal_failures), |
| } |
| |
| |
| stats = { |
| "agent_counts": agent_counts, |
| "pattern_counts": pattern_counts, |
| "time_counts": time_counts, |
| "total_failures": len(self.failure_map), |
| "timestamp": now.isoformat(), |
| } |
| |
| return stats |
| |
| def generate_failure_map_visualization(self) -> Dict[str, Any]: |
| """ |
| Generate visualization data for failure map. |
| |
| Returns: |
| Visualization data |
| """ |
| |
| nodes = [] |
| links = [] |
| |
| |
| agent_nodes = {} |
| for agent_id, failures in self.agent_failures.items(): |
| |
| first_failure = self.get_failure(failures[0]) if failures else None |
| agent_name = first_failure.get("agent_name", "Unknown") if first_failure else "Unknown" |
| |
| |
| agent_node = { |
| "id": agent_id, |
| "label": agent_name, |
| "type": "agent", |
| "size": 15, |
| "failure_count": len(failures), |
| } |
| |
| nodes.append(agent_node) |
| agent_nodes[agent_id] = agent_node |
| |
| |
| pattern_nodes = {} |
| for pattern, failures in self.pattern_failures.items(): |
| |
| pattern_node = { |
| "id": pattern, |
| "label": pattern, |
| "type": "pattern", |
| "size": 10, |
| "failure_count": len(failures), |
| } |
| |
| nodes.append(pattern_node) |
| pattern_nodes[pattern] = pattern_node |
| |
| |
| for failure_id, failure in self.failure_map.items(): |
| agent_id = failure.get("agent_id") |
| pattern = failure.get("pattern") |
| |
| |
| failure_node = { |
| "id": failure_id, |
| "label": f"Failure {failure_id[:6]}", |
| "type": "failure", |
| "size": 5, |
| "timestamp": failure.get("timestamp"), |
| } |
| |
| nodes.append(failure_node) |
| |
| |
| if agent_id: |
| links.append({ |
| "source": agent_id, |
| "target": failure_id, |
| "type": "agent_failure", |
| }) |
| |
| if pattern: |
| links.append({ |
| "source": pattern, |
| "target": failure_id, |
| "type": "pattern_failure", |
| }) |
| |
| |
| visualization = { |
| "nodes": nodes, |
| "links": links, |
| "timestamp": datetime.datetime.now().isoformat(), |
| } |
| |
| return visualization |
|
|
|
|
| |
| def format_diagnostic_output(trace_data: Dict[str, Any], format: str = "text") -> str: |
| """ |
| Format diagnostic output for display. |
| |
| Args: |
| trace_data: Trace data |
| format: Output format (text, json, markdown) |
| |
| Returns: |
| Formatted output |
| """ |
| if format == "json": |
| return json.dumps(trace_data, indent=2) |
| |
| elif format == "markdown": |
| |
| output = f"# Diagnostic Trace\n\n" |
| |
| |
| output += f"**Trace ID:** {trace_data.get('trace_id', 'N/A')}\n" |
| output += f"**Agent:** {trace_data.get('agent_name', 'N/A')}\n" |
| output += f"**Type:** {trace_data.get('trace_type', 'N/A')}\n" |
| output += f"**Time:** {trace_data.get('timestamp', 'N/A')}\n\n" |
| |
| |
| if "shell_patterns" in trace_data: |
| output += f"**Shell Patterns:**\n\n" |
| for pattern in trace_data["shell_patterns"]: |
| output += f"- {pattern}\n" |
| output += "\n" |
| |
| |
| if trace_data.get("trace_type") == "signal": |
| output += f"## Signal Details\n\n" |
| content = trace_data.get("content", {}) |
| output += f"**Ticker:** {content.get('ticker', 'N/A')}\n" |
| output += f"**Action:** {content.get('action', 'N/A')}\n" |
| output += f"**Confidence:** {content.get('confidence', 'N/A')}\n" |
| output += f"**Reasoning:** {content.get('reasoning', 'N/A')}\n\n" |
| |
| |
| if "attribution_trace" in content: |
| output += f"## Attribution\n\n" |
| output += "| Source | Weight |\n" |
| output += "| ------ | ------ |\n" |
| for source, weight in content.get("attribution_trace", {}).items(): |
| output += f"| {source} | {weight:.2f} |\n" |
| |
| elif trace_data.get("trace_type") == "reasoning": |
| output += f"## Reasoning Details\n\n" |
| content = trace_data.get("content", {}) |
| output += f"**Depth:** {content.get('depth', 'N/A')}\n" |
| output += f"**Confidence:** {content.get('confidence', 'N/A')}\n" |
| output += f"**Collapse Detected:** {content.get('collapse_detected', False)}\n\n" |
| |
| |
| if "steps" in content: |
| output += f"## Reasoning Steps\n\n" |
| for i, step in enumerate(content["steps"]): |
| output += f"### Step {i+1}: {step.get('name', 'Unnamed')}\n" |
| output += f"**Completed:** {step.get('completed', True)}\n" |
| if "error" in step: |
| output += f"**Error:** {step['error'].get('message', 'Unknown error')}\n" |
| output += "\n" |
| |
| elif trace_data.get("trace_type") == "collapse": |
| output += f"## Collapse Details\n\n" |
| content = trace_data.get("content", {}) |
| output += f"**Type:** {content.get('collapse_type', 'N/A')}\n" |
| output += f"**Reason:** {content.get('collapse_reason', 'N/A')}\n\n" |
| |
| |
| if "details" in content: |
| output += f"## Collapse Details\n\n" |
| details = content["details"] |
| for key, value in details.items(): |
| output += f"**{key}:** {value}\n" |
| |
| elif trace_data.get("trace_type") == "shell": |
| output += f"## Shell Details\n\n" |
| content = trace_data.get("content", {}) |
| output += f"**Shell Pattern:** {content.get('shell_pattern', 'N/A')}\n\n" |
| |
| |
| shell_content = content.get("content", {}) |
| output += f"## Shell Content\n\n" |
| for key, value in shell_content.items(): |
| output += f"**{key}:** {value}\n" |
| |
| return output |
| |
| else: |
| |
| output = "==== Diagnostic Trace ====\n\n" |
| |
| |
| output += f"Trace ID: {trace_data.get('trace_id', 'N/A')}\n" |
| output += f"Agent: {trace_data.get('agent_name', 'N/A')}\n" |
| output += f"Type: {trace_data.get('trace_type', 'N/A')}\n" |
| output += f"Time: {trace_data.get('timestamp', 'N/A')}\n\n" |
| |
| |
| if "shell_patterns" in trace_data: |
| output += f"Shell Patterns:\n" |
| for pattern in trace_data["shell_patterns"]: |
| output += f"- {pattern}\n" |
| output += "\n" |
| |
| |
| content = trace_data.get("content", {}) |
| output += f"---- Content ----\n\n" |
| |
| |
| def format_dict(d, indent=0): |
| result = "" |
| for key, value in d.items(): |
| if isinstance(value, dict): |
| result += f"{' ' * indent}{key}:\n" |
| result += format_dict(value, indent + 1) |
| elif isinstance(value, list): |
| result += f"{' ' * indent}{key}:\n" |
| for item in value: |
| if isinstance(item, dict): |
| result += format_dict(item, indent + 1) |
| else: |
| result += f"{' ' * (indent + 1)}- {item}\n" |
| else: |
| result += f"{' ' * indent}{key}: {value}\n" |
| return result |
| |
| output += format_dict(content) |
| |
| return output |
|
|
|
|
| def get_shell_pattern_description(pattern: ShellPattern) -> str: |
| """ |
| Get description for a shell pattern. |
| |
| Args: |
| pattern: Shell pattern |
| |
| Returns: |
| Shell pattern description |
| """ |
| descriptions = { |
| ShellPattern.NULL_FEATURE: "Knowledge gaps as null attribution zones", |
| ShellPattern.CIRCUIT_FRAGMENT: "Broken reasoning paths in attribution chains", |
| ShellPattern.META_FAILURE: "Metacognitive attribution failures", |
| ShellPattern.GHOST_FRAME: "Residual agent identity markers", |
| ShellPattern.ECHO_ATTRIBUTION: "Causal chain backpropagation", |
| ShellPattern.ATTRIBUTION_REFLECT: "Multi-head contribution analysis", |
| ShellPattern.INVERSE_CHAIN: "Attribution-output mismatch", |
| ShellPattern.RECURSIVE_FRACTURE: "Circular attribution loops", |
| ShellPattern.ETHICAL_INVERSION: "Value polarity reversals", |
| ShellPattern.RESIDUAL_ALIGNMENT_DRIFT: "Direction of belief evolution", |
| } |
| |
| return descriptions.get(pattern, "Unknown shell pattern") |
|
|