| """ |
| CompI Data Processing Utilities |
| |
| This module provides utilities for Phase 2.B: Data/Logic Input Integration |
| - CSV data analysis and processing |
| - Mathematical formula evaluation |
| - Data-to-text conversion (poetic descriptions) |
| - Data visualization generation |
| - Statistical analysis and pattern detection |
| """ |
|
|
import os
import io
import ast
import math
import random
import logging
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Tuple, Union, Any

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')  # headless backend; must be set before importing pyplot
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
|
|
| logger = logging.getLogger(__name__) |
|
|
@dataclass
class DataFeatures:
    """Container for extracted data features and statistics.

    Holds the output of DataProcessor.analyze_csv_data: basic structure,
    per-column descriptive statistics (numeric columns only), detected
    patterns, and derived scores in the range [0, 1].
    """

    # Structure
    shape: Tuple[int, int]            # (rows, columns) of the source DataFrame
    columns: List[str]                # all column names
    numeric_columns: List[str]        # subset of columns with numeric dtypes
    data_types: Dict[str, str]        # column name -> dtype string

    # Descriptive statistics, keyed by numeric column name
    means: Dict[str, float]
    medians: Dict[str, float]
    stds: Dict[str, float]
    mins: Dict[str, float]
    maxs: Dict[str, float]
    ranges: Dict[str, float]          # max - min per column

    # Detected patterns
    trends: Dict[str, str]            # e.g. 'increasing', 'stable', 'volatile'
    correlations: Dict[str, float]    # strongest pairwise correlations
    seasonality: Dict[str, bool]      # lag-1 autocorrelation heuristic

    # Derived scores, each in [0, 1]
    complexity_score: float
    variability_score: float
    pattern_strength: float

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        # dataclasses.asdict reproduces the previous hand-written mapping
        # field-for-field and stays in sync if fields are added or renamed.
        return asdict(self)
|
|
| class DataProcessor: |
| """Core data processing and analysis functionality""" |
| |
| def __init__(self): |
| """Initialize the data processor""" |
| self.safe_functions = { |
| |
| 'abs': abs, 'round': round, 'min': min, 'max': max, |
| 'sum': sum, 'len': len, 'pow': pow, |
| |
| |
| 'np': np, 'numpy': np, |
| 'sin': np.sin, 'cos': np.cos, 'tan': np.tan, |
| 'exp': np.exp, 'log': np.log, 'sqrt': np.sqrt, |
| 'pi': np.pi, 'e': np.e, |
| |
| |
| 'math': math, |
| |
| |
| '__builtins__': {} |
| } |
| |
| def analyze_csv_data(self, df: pd.DataFrame) -> DataFeatures: |
| """ |
| Comprehensive analysis of CSV data |
| |
| Args: |
| df: Input DataFrame |
| |
| Returns: |
| DataFeatures object with extracted insights |
| """ |
| logger.info(f"Analyzing CSV data with shape {df.shape}") |
| |
| |
| shape = df.shape |
| columns = df.columns.tolist() |
| numeric_df = df.select_dtypes(include=[np.number]) |
| numeric_columns = numeric_df.columns.tolist() |
| data_types = {col: str(df[col].dtype) for col in columns} |
| |
| |
| means = {col: float(numeric_df[col].mean()) for col in numeric_columns} |
| medians = {col: float(numeric_df[col].median()) for col in numeric_columns} |
| stds = {col: float(numeric_df[col].std()) for col in numeric_columns} |
| mins = {col: float(numeric_df[col].min()) for col in numeric_columns} |
| maxs = {col: float(numeric_df[col].max()) for col in numeric_columns} |
| ranges = {col: maxs[col] - mins[col] for col in numeric_columns} |
| |
| |
| trends = self._analyze_trends(numeric_df) |
| correlations = self._find_strongest_correlations(numeric_df) |
| seasonality = self._detect_seasonality(numeric_df) |
| |
| |
| complexity_score = self._calculate_complexity_score(numeric_df) |
| variability_score = self._calculate_variability_score(stds, ranges) |
| pattern_strength = self._calculate_pattern_strength(trends, correlations) |
| |
| return DataFeatures( |
| shape=shape, |
| columns=columns, |
| numeric_columns=numeric_columns, |
| data_types=data_types, |
| means=means, |
| medians=medians, |
| stds=stds, |
| mins=mins, |
| maxs=maxs, |
| ranges=ranges, |
| trends=trends, |
| correlations=correlations, |
| seasonality=seasonality, |
| complexity_score=complexity_score, |
| variability_score=variability_score, |
| pattern_strength=pattern_strength |
| ) |
| |
| def evaluate_formula(self, formula: str, num_points: int = 100) -> Tuple[np.ndarray, Dict[str, Any]]: |
| """ |
| Safely evaluate mathematical formula |
| |
| Args: |
| formula: Mathematical expression (Python/NumPy syntax) |
| num_points: Number of points to generate |
| |
| Returns: |
| Tuple of (result_array, metadata) |
| """ |
| logger.info(f"Evaluating formula: {formula}") |
| |
| try: |
| |
| if 'x' in formula and 'linspace' not in formula and 'arange' not in formula: |
| |
| x = np.linspace(0, 10, num_points) |
| self.safe_functions['x'] = x |
| |
| |
| result = eval(formula, self.safe_functions) |
| |
| |
| if not isinstance(result, np.ndarray): |
| if isinstance(result, (list, tuple)): |
| result = np.array(result) |
| else: |
| |
| result = np.full(num_points, result) |
| |
| |
| metadata = { |
| 'length': len(result), |
| 'min': float(np.min(result)), |
| 'max': float(np.max(result)), |
| 'mean': float(np.mean(result)), |
| 'std': float(np.std(result)), |
| 'range': float(np.max(result) - np.min(result)), |
| 'formula': formula, |
| 'has_pattern': self._detect_mathematical_pattern(result) |
| } |
| |
| return result, metadata |
| |
| except Exception as e: |
| logger.error(f"Formula evaluation failed: {e}") |
| raise ValueError(f"Invalid formula: {e}") |
|
|
| def _analyze_trends(self, df: pd.DataFrame) -> Dict[str, str]: |
| """Analyze trends in numeric columns""" |
| trends = {} |
| for col in df.columns: |
| values = df[col].dropna() |
| if len(values) < 3: |
| trends[col] = 'insufficient_data' |
| continue |
|
|
| |
| x = np.arange(len(values)) |
| slope = np.polyfit(x, values, 1)[0] |
| std_val = values.std() |
|
|
| if abs(slope) < std_val * 0.1: |
| trends[col] = 'stable' |
| elif std_val > values.mean() * 0.5: |
| trends[col] = 'volatile' |
| elif slope > 0: |
| trends[col] = 'increasing' |
| else: |
| trends[col] = 'decreasing' |
|
|
| return trends |
|
|
| def _find_strongest_correlations(self, df: pd.DataFrame) -> Dict[str, float]: |
| """Find strongest correlations between columns""" |
| if len(df.columns) < 2: |
| return {} |
|
|
| corr_matrix = df.corr() |
| correlations = {} |
|
|
| for i, col1 in enumerate(df.columns): |
| for j, col2 in enumerate(df.columns): |
| if i < j: |
| corr_val = corr_matrix.loc[col1, col2] |
| if not np.isnan(corr_val): |
| correlations[f"{col1}_vs_{col2}"] = float(corr_val) |
|
|
| |
| sorted_corr = sorted(correlations.items(), key=lambda x: abs(x[1]), reverse=True) |
| return dict(sorted_corr[:3]) |
|
|
| def _detect_seasonality(self, df: pd.DataFrame) -> Dict[str, bool]: |
| """Simple seasonality detection""" |
| seasonality = {} |
| for col in df.columns: |
| values = df[col].dropna() |
| if len(values) < 12: |
| seasonality[col] = False |
| continue |
|
|
| |
| try: |
| autocorr = np.corrcoef(values[:-1], values[1:])[0, 1] |
| seasonality[col] = not np.isnan(autocorr) and abs(autocorr) > 0.3 |
| except: |
| seasonality[col] = False |
|
|
| return seasonality |
|
|
| def _calculate_complexity_score(self, df: pd.DataFrame) -> float: |
| """Calculate data complexity score (0-1)""" |
| if df.empty: |
| return 0.0 |
|
|
| |
| num_cols = len(df.columns) |
| col_score = min(num_cols / 10, 1.0) |
|
|
| |
| missing_ratio = df.isnull().sum().sum() / (df.shape[0] * df.shape[1]) |
| missing_score = min(missing_ratio * 2, 1.0) |
|
|
| return (col_score + missing_score) / 2 |
|
|
| def _calculate_variability_score(self, stds: Dict[str, float], ranges: Dict[str, float]) -> float: |
| """Calculate data variability score (0-1)""" |
| if not stds: |
| return 0.0 |
|
|
| |
| normalized_vars = [] |
| for col in stds: |
| if ranges[col] > 0: |
| normalized_vars.append(stds[col] / ranges[col]) |
|
|
| if not normalized_vars: |
| return 0.0 |
|
|
| return min(np.mean(normalized_vars) * 2, 1.0) |
|
|
| def _calculate_pattern_strength(self, trends: Dict[str, str], correlations: Dict[str, float]) -> float: |
| """Calculate pattern strength score (0-1)""" |
| pattern_score = 0.0 |
|
|
| |
| trend_patterns = sum(1 for trend in trends.values() if trend in ['increasing', 'decreasing']) |
| trend_score = min(trend_patterns / max(len(trends), 1), 1.0) |
|
|
| |
| if correlations: |
| max_corr = max(abs(corr) for corr in correlations.values()) |
| corr_score = max_corr |
| else: |
| corr_score = 0.0 |
|
|
| return (trend_score + corr_score) / 2 |
|
|
| def _detect_mathematical_pattern(self, data: np.ndarray) -> bool: |
| """Detect if mathematical data has recognizable patterns""" |
| if len(data) < 10: |
| return False |
|
|
| |
| try: |
| |
| autocorr = np.corrcoef(data[:-1], data[1:])[0, 1] |
| return not np.isnan(autocorr) and abs(autocorr) > 0.5 |
| except: |
| return False |
|
|
|
|
class DataToTextConverter:
    """Convert data patterns into poetic/narrative text descriptions.

    Output is intentionally non-deterministic: descriptive adjectives and
    metaphors are picked with the module-level `random` (the original
    re-imported `random` inside five separate methods; hoisted to the
    module's import block).
    """

    def __init__(self):
        """Initialize the converter with descriptive vocabularies"""
        # Adjectives per trend label produced by DataProcessor._analyze_trends
        self.trend_descriptions = {
            'increasing': ['ascending', 'rising', 'climbing', 'growing', 'soaring'],
            'decreasing': ['descending', 'falling', 'declining', 'diminishing', 'fading'],
            'stable': ['steady', 'constant', 'balanced', 'harmonious', 'peaceful'],
            'volatile': ['chaotic', 'turbulent', 'dynamic', 'energetic', 'wild']
        }

        # Adjectives keyed by score band
        self.pattern_adjectives = {
            'high_complexity': ['intricate', 'complex', 'sophisticated', 'elaborate'],
            'low_complexity': ['simple', 'pure', 'minimal', 'clean'],
            'high_variability': ['diverse', 'varied', 'rich', 'multifaceted'],
            'low_variability': ['consistent', 'uniform', 'regular', 'predictable'],
            'strong_patterns': ['rhythmic', 'structured', 'organized', 'patterned'],
            'weak_patterns': ['random', 'scattered', 'free-flowing', 'organic']
        }

        # Closing metaphors appended to every description
        self.artistic_metaphors = [
            'like brushstrokes on a canvas',
            'resembling musical notes in harmony',
            'flowing like water through landscapes',
            'dancing with mathematical precision',
            'weaving patterns of light and shadow',
            'creating symphonies of numbers',
            'painting stories with data points',
            'sculpting meaning from statistics'
        ]

    def generate_poetic_description(self, features: DataFeatures) -> str:
        """
        Generate poetic description from data features

        Args:
            features: DataFeatures object

        Returns:
            Poetic text description
        """
        descriptions = []

        # Opening sentence derived from the data's shape
        descriptions.append(f"A tapestry woven from {features.shape[0]} data points across {features.shape[1]} dimensions")

        # Dominant trend, if describable
        trend_desc = self._describe_trends(features.trends)
        if trend_desc:
            descriptions.append(trend_desc)

        # Variability band
        var_desc = self._describe_variability(features.variability_score)
        if var_desc:
            descriptions.append(var_desc)

        # Pattern strength / correlations
        pattern_desc = self._describe_patterns(features.pattern_strength, features.correlations)
        if pattern_desc:
            descriptions.append(pattern_desc)

        # Closing metaphor (random choice)
        metaphor = random.choice(self.artistic_metaphors)
        descriptions.append(f"The data flows {metaphor}")

        return '. '.join(descriptions) + '.'

    def generate_formula_description(self, formula: str, metadata: Dict[str, Any]) -> str:
        """
        Generate poetic description for mathematical formula

        Args:
            formula: Original formula
            metadata: Formula evaluation metadata (needs 'range' and
                'has_pattern' keys, as produced by evaluate_formula)

        Returns:
            Poetic text description
        """
        descriptions = []

        # Opening sentence quoting the formula
        descriptions.append(f"Mathematical harmony emerges from the expression: {formula}")

        # Sentence chosen by the magnitude of the value range
        range_val = metadata['range']
        if range_val > 10:
            descriptions.append("The function soars across vast numerical landscapes")
        elif range_val > 1:
            descriptions.append("Values dance within moderate bounds")
        else:
            descriptions.append("Numbers whisper in gentle, subtle variations")

        # Pattern flag from the formula metadata
        if metadata['has_pattern']:
            descriptions.append("Revealing intricate patterns that speak to the soul")
        else:
            descriptions.append("Creating unique, unrepeatable mathematical poetry")

        # Closing metaphor (random choice)
        metaphor = random.choice(self.artistic_metaphors)
        descriptions.append(f"Each calculation {metaphor}")

        return '. '.join(descriptions) + '.'

    def _describe_trends(self, trends: Dict[str, str]) -> str:
        """Describe the most common trend label; empty string if none apply."""
        if not trends:
            return ""

        # Count occurrences of each trend label
        trend_counts = {}
        for trend in trends.values():
            trend_counts[trend] = trend_counts.get(trend, 0) + 1

        dominant_trend = max(trend_counts, key=trend_counts.get)

        # 'insufficient_data' has no vocabulary entry and yields ""
        if dominant_trend in self.trend_descriptions:
            adj = random.choice(self.trend_descriptions[dominant_trend])
            return f"The data reveals {adj} patterns throughout its structure"

        return ""

    def _describe_variability(self, variability_score: float) -> str:
        """Describe data variability band (>0.7 high, <0.3 low, else middle)."""
        if variability_score > 0.7:
            adj = random.choice(self.pattern_adjectives['high_variability'])
            return f"With {adj} expressions of numerical diversity"
        elif variability_score < 0.3:
            adj = random.choice(self.pattern_adjectives['low_variability'])
            return f"Maintaining {adj} elegance in its values"
        else:
            return "Balancing consistency with creative variation"

    def _describe_patterns(self, pattern_strength: float, correlations: Dict[str, float]) -> str:
        """Describe pattern strength band (>0.6 strong, <0.3 weak, else middle)."""
        if pattern_strength > 0.6:
            adj = random.choice(self.pattern_adjectives['strong_patterns'])
            return f"Displaying {adj} relationships between its elements"
        elif pattern_strength < 0.3:
            adj = random.choice(self.pattern_adjectives['weak_patterns'])
            return f"Embracing {adj} freedom in its numerical expression"
        else:
            return "Weaving subtle connections throughout its numerical fabric"
|
|
|
|
class DataVisualizer:
    """Create visualizations from data for artistic conditioning.

    Renders 2x2 Matplotlib dashboards and returns them as PIL Images.
    The buffer-to-PIL rendering, previously duplicated in every method,
    lives in the private _fig_to_image helper.
    """

    def __init__(self, style: str = 'artistic'):
        """
        Initialize visualizer

        Args:
            style: Visualization style ('artistic', 'scientific', 'minimal')
        """
        self.style = style
        # Fixed five-color palettes keyed by style name
        self.color_palettes = {
            'artistic': ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7'],
            'scientific': ['#2E86AB', '#A23B72', '#F18F01', '#C73E1D', '#592E83'],
            'minimal': ['#2C3E50', '#34495E', '#7F8C8D', '#95A5A6', '#BDC3C7']
        }

    def _fig_to_image(self, fig) -> Image.Image:
        """Render a Matplotlib figure to a PIL Image and close the figure."""
        buf = io.BytesIO()
        fig.savefig(buf, format='png', dpi=150, bbox_inches='tight')
        plt.close(fig)
        buf.seek(0)
        return Image.open(buf)

    def create_data_visualization(self, df: pd.DataFrame, features: DataFeatures) -> Image.Image:
        """
        Create artistic visualization from DataFrame

        Args:
            df: Input DataFrame
            features: DataFeatures object

        Returns:
            PIL Image of the visualization
        """
        plt.style.use('default')
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        fig.suptitle('Data Pattern Visualization', fontsize=16, fontweight='bold')

        numeric_df = df.select_dtypes(include=[np.number])
        colors = self.color_palettes[self.style]

        # Panel 1: line plot of up to three numeric columns
        ax1 = axes[0, 0]
        for i, col in enumerate(numeric_df.columns[:3]):
            ax1.plot(numeric_df[col], color=colors[i % len(colors)],
                    linewidth=2, alpha=0.8, label=col)
        ax1.set_title('Data Trends', fontweight='bold')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Panel 2: histogram of the first numeric column
        ax2 = axes[0, 1]
        if len(numeric_df.columns) > 0:
            col = numeric_df.columns[0]
            ax2.hist(numeric_df[col].dropna(), bins=20, color=colors[0],
                    alpha=0.7, edgecolor='black')
            ax2.set_title(f'Distribution: {col}', fontweight='bold')
            ax2.grid(True, alpha=0.3)

        # Panel 3: correlation heatmap (requires at least two columns)
        ax3 = axes[1, 0]
        if len(numeric_df.columns) > 1:
            corr_matrix = numeric_df.corr()
            im = ax3.imshow(corr_matrix, cmap='RdBu_r', aspect='auto', vmin=-1, vmax=1)
            ax3.set_xticks(range(len(corr_matrix.columns)))
            ax3.set_yticks(range(len(corr_matrix.columns)))
            ax3.set_xticklabels(corr_matrix.columns, rotation=45)
            ax3.set_yticklabels(corr_matrix.columns)
            ax3.set_title('Correlations', fontweight='bold')
            plt.colorbar(im, ax=ax3, shrink=0.8)
        else:
            ax3.text(0.5, 0.5, 'Single Column\nNo Correlations',
                    ha='center', va='center', transform=ax3.transAxes)
            ax3.set_title('Correlations', fontweight='bold')

        # Panel 4: bar chart of mean values for up to five columns.
        # The bar() return value is intentionally discarded (the original
        # bound it to an unused `bars` variable).
        ax4 = axes[1, 1]
        if len(numeric_df.columns) > 0:
            stats_data = [features.means[col] for col in numeric_df.columns[:5]]
            ax4.bar(range(len(stats_data)), stats_data, color=colors[:len(stats_data)])
            ax4.set_title('Mean Values', fontweight='bold')
            ax4.set_xticks(range(len(stats_data)))
            ax4.set_xticklabels([col[:8] for col in numeric_df.columns[:5]], rotation=45)
            ax4.grid(True, alpha=0.3)

        plt.tight_layout()

        return self._fig_to_image(fig)

    def create_formula_visualization(self, data: np.ndarray, formula: str, metadata: Dict[str, Any]) -> Image.Image:
        """
        Create artistic visualization from formula result

        Args:
            data: Formula result array
            formula: Original formula
            metadata: Formula metadata

        Returns:
            PIL Image of the visualization; on failure, an image that
            displays the error message instead of raising.
        """
        try:
            logger.info(f"Creating visualization for formula: {formula}")
            logger.info(f"Data shape: {data.shape}, Data range: [{np.min(data):.3f}, {np.max(data):.3f}]")

            plt.style.use('default')
            fig, axes = plt.subplots(2, 2, figsize=(12, 10))
            fig.suptitle(f'Mathematical Pattern: {formula}', fontsize=14, fontweight='bold')

            colors = self.color_palettes[self.style]
            x = np.arange(len(data))

            # Panel 1: function values with filled area
            ax1 = axes[0, 0]
            ax1.plot(x, data, color=colors[0], linewidth=3, alpha=0.8)
            ax1.fill_between(x, data, alpha=0.3, color=colors[0])
            ax1.set_title('Function Values', fontweight='bold')
            ax1.grid(True, alpha=0.3)

            # Panel 2: numerical derivative
            ax2 = axes[0, 1]
            if len(data) > 1:
                derivative = np.gradient(data)
                ax2.plot(x, derivative, color=colors[1], linewidth=2)
            ax2.set_title('Rate of Change', fontweight='bold')
            ax2.grid(True, alpha=0.3)

            # Panel 3: histogram of values
            ax3 = axes[1, 0]
            ax3.hist(data, bins=30, color=colors[2], alpha=0.7, edgecolor='black')
            ax3.set_title('Value Distribution', fontweight='bold')
            ax3.grid(True, alpha=0.3)

            # Panel 4: lag-1 phase-space scatter, colored by index
            ax4 = axes[1, 1]
            if len(data) > 1:
                ax4.scatter(data[:-1], data[1:], c=x[:-1], cmap='viridis', alpha=0.6)
                ax4.set_xlabel('f(t)')
                ax4.set_ylabel('f(t+1)')
            ax4.set_title('Phase Space', fontweight='bold')
            ax4.grid(True, alpha=0.3)

            plt.tight_layout()

            image = self._fig_to_image(fig)
            logger.info(f"Successfully created visualization image: {image.size}")
            return image

        except Exception as e:
            logger.error(f"Error creating formula visualization: {e}")
            plt.close('all')

            # Fallback: render the error text so callers always get an image
            fig, ax = plt.subplots(figsize=(8, 6))
            ax.text(0.5, 0.5, f'Visualization Error:\n{str(e)}',
                   ha='center', va='center', fontsize=12,
                   bbox=dict(boxstyle="round,pad=0.3", facecolor="lightcoral"))
            ax.set_xlim(0, 1)
            ax.set_ylim(0, 1)
            ax.axis('off')

            return self._fig_to_image(fig)
|
|