| """ |
| Winner Memory — Feedback loop component. |
| Tracks what works and what doesn't, feeds patterns back to generation. |
| |
| Key principle: The system LEARNS from BRAIN results. |
| - Winning fields get higher priority in future generation |
| - Failing patterns get blacklisted |
| - Near-misses get auto-iterated |
| """ |
| import duckdb |
| from pathlib import Path |
| from dataclasses import dataclass |
| from typing import Optional |
|
|
|
|
| @dataclass |
| class WinnerPattern: |
| """A pattern extracted from successful alphas.""" |
| field_id: str |
| archetype: str |
| group_key: str |
| decay: int |
| sharpe: float |
| theme: str |
|
|
|
|
| @dataclass |
| class FailurePattern: |
| """A pattern extracted from failed alphas.""" |
| field_id: str |
| archetype: str |
| reason: str |
|
|
|
|
| class WinnerMemory: |
| """ |
| Stores and retrieves patterns from BRAIN simulation results. |
| Used by hypothesis hunter and template generator to prioritize what works. |
| """ |
| |
| def __init__(self, db_path: Path): |
| self.db_path = db_path |
| self.conn = duckdb.connect(str(db_path)) |
| self._ensure_tables() |
| |
| def _ensure_tables(self): |
| self.conn.execute(""" |
| CREATE TABLE IF NOT EXISTS winner_patterns ( |
| field_id VARCHAR, |
| archetype VARCHAR, |
| group_key VARCHAR, |
| decay INTEGER, |
| sharpe DOUBLE, |
| theme VARCHAR, |
| discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
| ) |
| """) |
| self.conn.execute(""" |
| CREATE TABLE IF NOT EXISTS failure_patterns ( |
| field_id VARCHAR, |
| archetype VARCHAR, |
| reason VARCHAR, |
| expression_hash VARCHAR, |
| recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
| ) |
| """) |
| self.conn.execute(""" |
| CREATE TABLE IF NOT EXISTS iteration_queue ( |
| alpha_id VARCHAR, |
| expression VARCHAR, |
| sharpe DOUBLE, |
| turnover DOUBLE, |
| suggestion VARCHAR, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
| iterated BOOLEAN DEFAULT FALSE |
| ) |
| """) |
| |
| def record_winner(self, field_id: str, archetype: str, group_key: str, |
| decay: int, sharpe: float, theme: str): |
| """Record a winning pattern (Sharpe > 1.25).""" |
| self.conn.execute(""" |
| INSERT INTO winner_patterns (field_id, archetype, group_key, decay, sharpe, theme) |
| VALUES (?, ?, ?, ?, ?, ?) |
| """, [field_id, archetype, group_key, decay, sharpe, theme]) |
| |
| def record_failure(self, field_id: str, archetype: str, reason: str, expr_hash: str): |
| """Record a failure pattern.""" |
| self.conn.execute(""" |
| INSERT INTO failure_patterns (field_id, archetype, reason, expression_hash) |
| VALUES (?, ?, ?, ?) |
| """, [field_id, archetype, reason, expr_hash]) |
| |
| def queue_for_iteration(self, alpha_id: str, expression: str, |
| sharpe: float, turnover: float, suggestion: str): |
| """Queue a near-miss alpha for auto-iteration.""" |
| self.conn.execute(""" |
| INSERT INTO iteration_queue (alpha_id, expression, sharpe, turnover, suggestion) |
| VALUES (?, ?, ?, ?, ?) |
| """, [alpha_id, expression, sharpe, turnover, suggestion]) |
| |
| def get_winning_fields(self, min_sharpe: float = 1.5) -> list[str]: |
| """Get field IDs that have produced winners.""" |
| rows = self.conn.execute(""" |
| SELECT DISTINCT field_id FROM winner_patterns |
| WHERE sharpe >= ? ORDER BY sharpe DESC |
| """, [min_sharpe]).fetchall() |
| return [r[0] for r in rows] |
| |
| def get_winning_archetypes(self) -> list[tuple[str, float]]: |
| """Get archetypes ranked by average Sharpe.""" |
| rows = self.conn.execute(""" |
| SELECT archetype, AVG(sharpe) as avg_sharpe, COUNT(*) as cnt |
| FROM winner_patterns |
| GROUP BY archetype |
| ORDER BY avg_sharpe DESC |
| """).fetchall() |
| return [(r[0], r[1]) for r in rows] |
| |
| def get_failed_fields(self) -> set[str]: |
| """Get fields that consistently fail (3+ failures, no wins).""" |
| fail_rows = self.conn.execute(""" |
| SELECT field_id, COUNT(*) as fail_count |
| FROM failure_patterns |
| GROUP BY field_id |
| HAVING fail_count >= 3 |
| """).fetchall() |
| |
| failed = set() |
| for field_id, _ in fail_rows: |
| |
| win = self.conn.execute(""" |
| SELECT 1 FROM winner_patterns WHERE field_id = ? LIMIT 1 |
| """, [field_id]).fetchone() |
| if not win: |
| failed.add(field_id) |
| return failed |
| |
| def get_iteration_queue(self, limit: int = 10) -> list[dict]: |
| """Get alphas queued for iteration.""" |
| rows = self.conn.execute(""" |
| SELECT alpha_id, expression, sharpe, turnover, suggestion |
| FROM iteration_queue |
| WHERE iterated = FALSE |
| ORDER BY sharpe DESC |
| LIMIT ? |
| """, [limit]).fetchall() |
| return [ |
| {"alpha_id": r[0], "expression": r[1], "sharpe": r[2], |
| "turnover": r[3], "suggestion": r[4]} |
| for r in rows |
| ] |
| |
| def mark_iterated(self, alpha_id: str): |
| """Mark an iteration queue item as processed.""" |
| self.conn.execute(""" |
| UPDATE iteration_queue SET iterated = TRUE WHERE alpha_id = ? |
| """, [alpha_id]) |
| |
| def get_best_config(self) -> Optional[dict]: |
| """Get the best-performing configuration from winners.""" |
| row = self.conn.execute(""" |
| SELECT field_id, archetype, group_key, decay, sharpe |
| FROM winner_patterns |
| ORDER BY sharpe DESC LIMIT 1 |
| """).fetchone() |
| if row: |
| return { |
| "field_id": row[0], "archetype": row[1], |
| "group_key": row[2], "decay": row[3], "sharpe": row[4], |
| } |
| return None |
| |
| def suggest_iteration(self, sharpe: float, turnover: float) -> str: |
| """Suggest what to change for a near-miss alpha.""" |
| if turnover > 0.70: |
| return "increase_decay" |
| elif sharpe < 0.5: |
| return "sign_flip" |
| elif sharpe < 1.0: |
| return "change_horizon" |
| else: |
| return "change_neutralization" |
| |
| def close(self): |
| self.conn.close() |
|
|