"""
fractal_json/decoder.py
Recursive Pattern Reconstruction and Fractal Decoding Engine
"""


import copy
import json
from typing import Any, Dict, List, Optional, Union


class FractalDecoder:
    """Decode fractal.json structures back into plain JSON-compatible data.

    A *fractal node* is a dict whose keys carry the symbolic prefixes listed
    in ``SYMBOLIC_MARKERS``; it is recognised by having at least one key that
    starts with the compression marker and one that starts with the root
    marker. Marker prefixes are stripped from keys in the output and the
    node's bookkeeping keys (marker + ``depth`` / ``pattern`` / ``anchor``)
    are dropped.

    Attributes:
        pattern_registry: Pattern id -> pattern definition, filled from the
            ``interpretability_map`` entry of the ``$fractal`` metadata.
        expansion_cache: Anchor string -> expanded pattern, filled lazily by
            ``_resolve_anchor``.
        recursion_depth: Current nesting depth of ``_decode_recursive``;
            0 whenever no decode is in progress.
        peak_recursion_depth: Deepest nesting reached during the most recent
            ``decode`` call.
        max_recursion: Nesting depth beyond which ``RecursionError`` is
            raised.
    """

    SYMBOLIC_MARKERS = {
        '🜏': 'root',
        '∴': 'seed',
        '⇌': 'bidirectional',
        '⧖': 'compression',
        '☍': 'anchor'
    }

    # Key names (after the marker prefix) that hold node bookkeeping rather
    # than user data.
    _METADATA_NAMES = frozenset(('depth', 'pattern', 'anchor'))

    def __init__(self):
        self.pattern_registry = {}
        self.expansion_cache = {}
        self.recursion_depth = 0
        self.peak_recursion_depth = 0
        self.max_recursion = 100

    def decode(self, fractal_data: Union[Dict, List, Any]) -> Any:
        """Convert fractal-format data to standard JSON-compatible data.

        Args:
            fractal_data: A dict, list or scalar. A dict containing a
                ``"$fractal"`` key is treated as an envelope: its metadata is
                registered and only its ``"content"`` value (``{}`` when
                absent) is decoded.

        Returns:
            The decoded structure. Scalars are returned unchanged.

        Raises:
            RecursionError: If nesting exceeds ``max_recursion``.
        """
        # Statistics describe the most recent decode call.
        self.peak_recursion_depth = 0

        if not isinstance(fractal_data, (dict, list)):
            return fractal_data

        if isinstance(fractal_data, dict) and "$fractal" in fractal_data:
            self._process_metadata(fractal_data["$fractal"])
            fractal_data = fractal_data.get("content", {})

        return self._decode_recursive(fractal_data)

    def _decode_recursive(self, data: Any) -> Any:
        """Decode ``data``, dispatching on its type and tracking depth.

        Raises:
            RecursionError: If this call would exceed ``max_recursion``.
        """
        # Check *before* incrementing so that a refused call leaves the
        # counter untouched; otherwise every overflow would permanently
        # shift the depth of a reused decoder by one.
        if self.recursion_depth >= self.max_recursion:
            raise RecursionError("Maximum recursion depth exceeded in fractal decoding")

        self.recursion_depth += 1
        self.peak_recursion_depth = max(self.peak_recursion_depth, self.recursion_depth)
        try:
            if isinstance(data, dict):
                return self._decode_dict(data)
            if isinstance(data, list):
                return self._decode_list(data)
            return data
        finally:
            self.recursion_depth -= 1

    def _decode_dict(self, data: Dict) -> Union[Dict, Any]:
        """Decode a dict, expanding it first if it is a fractal node.

        Resolution order for a fractal node: anchor reference, then seed
        expansion, then plain key-by-key decoding.

        Returns:
            The resolved anchor target, the seed expansion, or a new dict
            with marker prefixes stripped and bookkeeping keys removed.
        """
        if self._is_fractal_node(data):
            anchor_key = f"{self._get_marker('anchor')}anchor"
            if anchor_key in data:
                resolved = self._resolve_anchor(data[anchor_key], data)
                # _resolve_anchor hands back the node itself when it cannot
                # resolve the reference; in that case decode the node's own
                # content instead of leaking the raw, still-marked dict.
                if resolved is not data:
                    return resolved

            pattern_id = data.get(f"{self._get_marker('root')}pattern")
            seed = data.get(f"{self._get_marker('seed')}seed")

            if pattern_id and seed:
                expanded = self._expand_from_seed(pattern_id, seed, data)
                if expanded is not None:
                    return expanded

        # Explicit loop (rather than a comprehension) keeps the interpreter
        # call stack as shallow as possible for deeply nested input.
        decoded = {}
        for key, value in data.items():
            if not self._is_metadata_key(key):
                decoded[self._clean_key(key)] = self._decode_recursive(value)
        return decoded

    def _decode_list(self, data: List) -> List:
        """Return a new list with every element decoded."""
        decoded = []
        for item in data:
            decoded.append(self._decode_recursive(item))
        return decoded

    def _is_fractal_node(self, data: Dict) -> bool:
        """Return True if ``data`` is a dict with both a compression-marked
        key and a root-marked key."""
        if not isinstance(data, dict):
            return False

        # Look the markers up once instead of once per key.
        compression = self._get_marker('compression')
        root = self._get_marker('root')
        keys = [key for key in data if isinstance(key, str)]

        has_depth = any(key.startswith(compression) for key in keys)
        has_pattern = any(key.startswith(root) for key in keys)
        return has_depth and has_pattern

    def _get_marker(self, marker_name: str) -> str:
        """Return the symbol registered for ``marker_name``, or ``''`` if
        the name is unknown."""
        return next(
            (symbol for symbol, name in self.SYMBOLIC_MARKERS.items() if name == marker_name),
            '',
        )

    def _clean_key(self, key: str) -> str:
        """Return ``key`` without its leading symbolic marker, if it has one.

        Non-string keys are returned unchanged.
        """
        if not isinstance(key, str):
            return key
        for marker in self.SYMBOLIC_MARKERS:
            if key.startswith(marker):
                return key[len(marker):]
        return key

    def _is_metadata_key(self, key: str) -> bool:
        """Return True if ``key`` is a marker-prefixed bookkeeping key.

        Only keys that actually carry a symbolic marker qualify, so ordinary
        user keys that happen to be called ``depth``, ``pattern`` or
        ``anchor`` are preserved in the decoded output.
        """
        clean_key = self._clean_key(key)
        # The key differs from its cleaned form only if a marker was stripped.
        return clean_key != key and clean_key in self._METADATA_NAMES

    def _resolve_anchor(self, anchor: str, context: Dict) -> Any:
        """Resolve an anchor reference to its expanded pattern.

        Args:
            anchor: Reference of the form ``"#/patterns/<pattern_id>"``.
            context: The node that carries the anchor.

        Returns:
            An independent deep copy of the expanded pattern, so that
            callers mutating decoded output cannot corrupt the registry or
            the cache. If the anchor is not a string, has another form, or
            names an unknown pattern, ``context`` itself is returned
            unchanged as the "unresolved" signal.
        """
        if not isinstance(anchor, str):
            return context

        if anchor in self.expansion_cache:
            return copy.deepcopy(self.expansion_cache[anchor])

        if anchor.startswith("#/patterns/"):
            pattern_id = anchor.split("/")[-1]
            if pattern_id in self.pattern_registry:
                expanded = self._expand_pattern(self.pattern_registry[pattern_id], context)
                self.expansion_cache[anchor] = expanded
                return copy.deepcopy(expanded)

        return context

    def _expand_from_seed(self, pattern_id: str, seed: Any, context: Dict) -> Optional[Any]:
        """Rebuild a node's content from its seed and children map.

        Args:
            pattern_id: Pattern id of the node (currently unused).
            seed: Mapping of output key -> literal value or placeholder.
            context: The node; its bidirectional-marked ``children`` entry
                supplies the values for placeholders.

        Returns:
            ``None`` if ``seed`` is not a dict. Otherwise a new dict in which
            each placeholder is replaced by the decoded child stored under
            the bidirectional-marked key, or by ``None`` when no such child
            exists; all other seed values are copied as-is.
        """
        if not isinstance(seed, dict):
            return None

        marker = self._get_marker('bidirectional')
        children = context.get(f"{marker}children")
        if not isinstance(children, dict):
            # A missing or malformed children map is treated as empty, so
            # placeholders consistently resolve to None.
            children = {}

        expanded = {}
        for key, value in seed.items():
            # NOTE(review): any string value ending in "expand" is treated
            # as a placeholder, not only marker-prefixed ones.
            if isinstance(value, str) and value.endswith("expand"):
                child_key = f"{marker}{key}"
                if child_key in children:
                    expanded[key] = self._decode_recursive(children[child_key])
                else:
                    expanded[key] = None
            else:
                expanded[key] = value

        return expanded

    def _expand_pattern(self, pattern: Dict, context: Dict) -> Any:
        """Expand ``pattern`` for ``context``.

        Currently returns ``pattern`` unchanged; ``context`` is not used.
        """
        return pattern

    def _process_metadata(self, metadata: Dict) -> None:
        """Register the patterns found in ``$fractal`` metadata.

        Non-dict metadata, or a missing / non-dict ``interpretability_map``,
        is ignored. Registering patterns clears ``expansion_cache`` so that
        redefined pattern ids are not served from stale expansions.
        """
        if not isinstance(metadata, dict):
            return

        patterns = metadata.get("interpretability_map")
        if isinstance(patterns, dict) and patterns:
            self.pattern_registry.update(patterns)
            self.expansion_cache.clear()

    def get_decoding_stats(self) -> Dict:
        """Return decoding statistics.

        Returns:
            Dict with ``patterns_resolved`` (current size of the expansion
            cache), ``max_recursion_depth`` (deepest nesting reached by the
            most recent ``decode`` call) and ``pattern_registry_size``.
        """
        return {
            "patterns_resolved": len(self.expansion_cache),
            "max_recursion_depth": self.peak_recursion_depth,
            "pattern_registry_size": len(self.pattern_registry)
        }