| """ |
| fractal_json/encoder.py |
| Recursive Pattern Detection and Fractal Encoding Engine |
| """ |
|
|
import hashlib
import json
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
|
|
class FractalEncoder:
    """
    Encodes standard JSON into fractal.json format using recursive pattern detection.

    The encoder walks a JSON-compatible structure, annotates every container
    with symbolic marker keys, and replaces dicts that have been seen (exactly)
    at least twice before with a compact anchor + seed reference.
    """

    # Unicode markers used as key prefixes in the encoded output.
    SYMBOLIC_MARKERS = {
        'root': '🜏',
        'seed': '∴',
        'bidirectional': '⇌',
        'compression': '⧖',
        'anchor': '☍'
    }

    def __init__(self, compression_threshold: float = 0.8):
        """
        Args:
            compression_threshold: minimum self-similarity score (0..1) above
                which a dict is tagged with a ``fractal_`` pattern id instead
                of a ``standard_`` one.
        """
        self.compression_threshold = compression_threshold
        # pattern_id -> {canonical JSON of a dict -> number of times seen}
        self.pattern_cache = defaultdict(lambda: defaultdict(int))
        self.symbolic_residue = {}
        # Multiplicative estimate of achieved compression (1.0 = none yet).
        self.compression_ratio = 1.0

    def encode(self, data: Any, depth: int = 0) -> Any:
        """
        Convert standard JSON data to fractal format.

        Primitives pass through unchanged; dicts and lists become
        marker-annotated fractal nodes (or plain lists when no pattern is
        found). ``depth`` tracks nesting and is recorded in each node.
        """
        # bool is an int subclass, but listing it keeps the intent explicit.
        if isinstance(data, (str, int, float, bool)) or data is None:
            return data

        if isinstance(data, dict):
            return self._encode_dict(data, depth)
        if isinstance(data, list):
            return self._encode_list(data, depth)
        # Non-JSON types are passed through untouched.
        return data

    def _encode_dict(self, data: Dict, depth: int) -> Dict:
        """
        Encode a dictionary with fractal pattern detection.

        Emits an anchor/seed reference instead of the full subtree once the
        exact same dict has been recorded at least twice for its pattern id.
        """
        pattern_id = self._detect_pattern(data)
        fractal_node = {
            f"{self.SYMBOLIC_MARKERS['compression']}depth": depth,
            f"{self.SYMBOLIC_MARKERS['root']}pattern": pattern_id
        }

        if pattern_id in self.pattern_cache:
            similar_patterns = self.pattern_cache[pattern_id]
            if self._can_compress(data, similar_patterns):
                # Replace the repeated subtree with a reference + minimal seed.
                fractal_node[f"{self.SYMBOLIC_MARKERS['anchor']}anchor"] = self._create_anchor(pattern_id)
                fractal_node[f"{self.SYMBOLIC_MARKERS['seed']}seed"] = self._extract_seed(data)
                self.compression_ratio *= 0.85  # heuristic bookkeeping only
                return fractal_node

        # Recurse into values; keys are prefixed with the bidirectional marker.
        children = {
            f"{self.SYMBOLIC_MARKERS['bidirectional']}{key}": self.encode(value, depth + 1)
            for key, value in data.items()
        }
        if children:
            fractal_node[f"{self.SYMBOLIC_MARKERS['bidirectional']}children"] = children

        # Record this exact dict so future repeats can be compressed.
        self.pattern_cache[pattern_id][json.dumps(data, sort_keys=True)] += 1

        return fractal_node

    def _encode_list(self, data: List, depth: int) -> Any:
        """
        Encode a list with fractal pattern detection.

        Returns a fractal node carrying the dominant repeating pattern as a
        seed when one exists, otherwise a plain list of encoded items.
        """
        pattern_groups = self._detect_list_patterns(data)

        if pattern_groups:
            return {
                f"{self.SYMBOLIC_MARKERS['compression']}depth": depth,
                f"{self.SYMBOLIC_MARKERS['root']}pattern": "list_fractal",
                f"{self.SYMBOLIC_MARKERS['seed']}seed": self._extract_list_seed(pattern_groups),
                f"{self.SYMBOLIC_MARKERS['bidirectional']}expansions": [
                    self.encode(item, depth + 1) for item in data
                ]
            }
        return [self.encode(item, depth + 1) for item in data]

    def _detect_pattern(self, data: Dict) -> str:
        """
        Derive a pattern id from the dict's key -> value-type structure.

        Uses an MD5 digest of the canonical structure JSON rather than the
        builtin hash(), whose string hashing is randomized per process
        (PYTHONHASHSEED) and would make pattern ids — and the anchors embedded
        in the encoded output — unstable across runs.
        """
        structure = {k: type(v).__name__ for k, v in data.items()}
        canonical = json.dumps(structure, sort_keys=True)
        structure_hash = hashlib.md5(canonical.encode("utf-8")).hexdigest()[:16]

        similarity_score = self._calculate_self_similarity(data)

        prefix = "fractal" if similarity_score > self.compression_threshold else "standard"
        return f"{prefix}_{structure_hash}"

    def _calculate_self_similarity(self, data: Any, parent_structure: Optional[Dict] = None) -> float:
        """
        Score (0..1) how closely nested dicts mirror their parent's structure.

        The top-level call (parent_structure is None) averages the scores of
        all direct dict children; recursive calls return the fraction of keys
        shared with the parent whose value types also match.
        """
        if not isinstance(data, dict):
            return 0.0

        current_structure = {k: type(v).__name__ for k, v in data.items()}

        if parent_structure is None:
            child_scores = [
                self._calculate_self_similarity(value, current_structure)
                for value in data.values()
                if isinstance(value, dict)
            ]
            return sum(child_scores) / len(child_scores) if child_scores else 0.0

        common_keys = set(current_structure) & set(parent_structure)
        if not common_keys:
            return 0.0
        matching_types = sum(1 for k in common_keys if current_structure[k] == parent_structure[k])
        return matching_types / len(common_keys)

    def _detect_list_patterns(self, data: List) -> List[Tuple[List[Any], int]]:
        """
        Find repeating sub-sequences in a list.

        Returns up to three ``(pattern, occurrences)`` pairs ranked by
        coverage (pattern length * occurrence count), or an empty list when
        nothing repeats. The occurrence count is kept so that callers can
        report how many times the winning pattern actually repeats.
        """
        if len(data) < 2:
            return []

        patterns = []
        for pattern_length in range(1, len(data) // 2 + 1):
            for i in range(len(data) - pattern_length + 1):
                pattern = data[i:i + pattern_length]

                # Count non-overlapping repeats from position i onward.
                occurrences = 0
                for j in range(i, len(data) - pattern_length + 1, pattern_length):
                    if data[j:j + pattern_length] == pattern:
                        occurrences += 1

                if occurrences >= 2:
                    patterns.append((pattern, occurrences))

        if patterns:
            patterns.sort(key=lambda x: len(x[0]) * x[1], reverse=True)
            return patterns[:3]

        return []

    def _can_compress(self, data: Dict, similar_patterns: Dict) -> bool:
        """
        Return True when this exact dict has been recorded at least twice
        under its pattern id, making an anchor reference worthwhile.
        """
        data_str = json.dumps(data, sort_keys=True)
        return similar_patterns.get(data_str, 0) >= 2

    def _create_anchor(self, pattern_id: str) -> str:
        """
        Create a JSON-pointer-style anchor reference for pattern compression.
        """
        return f"#/patterns/{pattern_id}"

    def _extract_seed(self, data: Dict) -> Dict:
        """
        Extract a minimal seed from a dict: primitives are kept verbatim,
        nested containers are replaced by an expansion marker.
        """
        seed = {}
        for key, value in data.items():
            if isinstance(value, (str, int, float, bool)) or value is None:
                seed[key] = value
            else:
                seed[key] = f"{self.SYMBOLIC_MARKERS['bidirectional']}expand"
        return seed

    def _extract_list_seed(self, pattern_groups: List[Tuple[List[Any], int]]) -> Dict:
        """
        Build a seed from the highest-ranked repeating pattern.

        BUG FIX: ``repetitions`` previously reported ``len(pattern_groups)``
        — the number of candidate patterns (capped at 3) — instead of how
        many times the winning pattern actually occurs in the list.
        """
        best_pattern, occurrences = pattern_groups[0]
        return {
            "pattern": best_pattern,
            "repetitions": occurrences
        }

    def get_compression_stats(self) -> Dict:
        """
        Return compression statistics accumulated over all encode() calls.
        """
        return {
            "compression_ratio": self.compression_ratio,
            "pattern_count": len(self.pattern_cache),
            "symbolic_residue": self.symbolic_residue
        }
|
|