gaurv007 committed on
Commit
e07ed35
·
verified ·
1 Parent(s): 55687f7

Upload alpha_factory/orchestration/pipeline.py with huggingface_hub

Browse files
alpha_factory/orchestration/pipeline.py ADDED
@@ -0,0 +1,188 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
"""
Pipeline Orchestrator — the DAG that connects all agents.
Runs the full alpha generation → evaluation → promotion loop.
"""
5
+ import asyncio
6
+ from datetime import datetime
7
+ from rich.console import Console
8
+ from rich.panel import Panel
9
+
10
+ from ..config import Config, load_config
11
+ from ..infra import LLMClient, FactorStore, BrainClient
12
+ from ..deterministic import lint, quick_dedup_hash, pick_theme, compute_fitness
13
+ from ..personas import (
14
+ generate_hypothesis,
15
+ compile_expression,
16
+ scout_novelty,
17
+ diagnose_performance,
18
+ gate_alpha,
19
+ )
20
+ from ..schemas import Verdict
21
+
22
+ console = Console()
23
+
24
+
25
class AlphaPipeline:
    """
    End-to-end alpha generation pipeline.

    A single run pushes a batch of N candidate alphas through every
    stage: theme selection, hypothesis generation, expression
    compilation, static lint, dedup, storage, and (when a BRAIN client
    is attached) submission.
    """

    def __init__(self, config: Config):
        self.config = config
        self.llm = LLMClient(config.llm)
        self.store = FactorStore(config.paths.factor_store / "alphas.duckdb")
        # A real BrainClient is attached in run(); None means dry-run mode.
        self.brain: BrainClient | None = None

        # Rolling counters that feed the kill switches.
        self._consecutive_lint_fails = 0
        self._consecutive_kills = 0
        self._daily_submissions = 0

    async def run_batch(self, batch_size: int | None = None):
        """Run one batch of alpha generation + evaluation.

        Returns a dict with the counts of promoted / iterated / killed
        candidates for the batch.
        """
        batch_size = batch_size or self.config.batch_size

        console.print(Panel(
            f"[bold green]Alpha Factory[/] — Batch of {batch_size} candidates\n"
            f"[dim]{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}[/]",
            title="🏭 Pipeline Start"
        ))

        # Snapshot the current library state once, up front.
        themes = self.store.get_all_themes()
        tags = self.store.get_all_anomaly_tags()
        dead = self.store.get_dead_themes()
        hashes = self.store.get_expression_hashes()

        tally = {"promoted": 0, "iterated": 0, "killed": 0}

        for idx in range(batch_size):
            console.print(f"\n[bold]━━━ Candidate {idx + 1}/{batch_size} ━━━[/]")

            # Abort the whole batch as soon as any kill switch fires.
            if self._check_kill_switches():
                console.print("[red]⚠️ KILL SWITCH TRIGGERED — stopping batch[/]")
                break

            try:
                verdict = await self._run_single_candidate(themes, tags, dead, hashes)
            except Exception as e:
                console.print(f"[red]Error: {e}[/]")
                tally["killed"] += 1
                continue

            if verdict == Verdict.PROMOTE:
                tally["promoted"] += 1
            elif verdict == Verdict.ITERATE:
                tally["iterated"] += 1
            else:
                tally["killed"] += 1
                self._consecutive_kills += 1

            # Any verdict other than an explicit KILL breaks the kill streak.
            if verdict != Verdict.KILL:
                self._consecutive_kills = 0

        console.print(Panel(
            f"[green]Promoted:[/] {tally['promoted']} [yellow]Iterate:[/] {tally['iterated']} [red]Killed:[/] {tally['killed']}\n"
            f"Tokens used: {self.llm.tokens_used:,} | BRAIN submissions: {self._daily_submissions}",
            title="📊 Batch Complete"
        ))

        return tally

    async def _run_single_candidate(
        self,
        existing_themes: list[str],
        existing_tags: list[str],
        dead_themes: list[str],
        existing_hashes: set[str],
    ) -> Verdict:
        """Push one candidate through every pre-submission pipeline stage."""

        # Step 1 — deterministic theme selection.
        theme = pick_theme(existing_themes, existing_tags, dead_themes)
        console.print(f" [cyan]Theme:[/] {theme}")

        # Step 2 — hypothesis generation (LLM persona).
        # TODO: RAG retrieval from ChromaDB would go here
        papers: list = []  # placeholder until ChromaDB retrieval lands

        bp = await generate_hypothesis(self.llm, theme, papers, existing_tags)
        console.print(f" [cyan]Blueprint:[/] {bp.archetype} | {bp.anomaly_tag.value}")
        console.print(f" [dim]Novelty: {bp.novelty_claim[:80]}...[/]")

        # Step 3 — compile the blueprint into a concrete expression.
        expr = await compile_expression(bp, self.llm)
        console.print(f" [cyan]Expression:[/] {expr.expression[:80]}...")

        # Step 4 — deterministic static lint; a failure is an immediate KILL.
        report = lint(expr.expression)
        if not report.passed:
            console.print(f" [red]LINT FAIL:[/] {report.errors}")
            self._consecutive_lint_fails += 1
            return Verdict.KILL

        self._consecutive_lint_fails = 0
        if report.warnings:
            console.print(f" [yellow]Warnings:[/] {report.warnings}")

        # Step 5 — cheap dedup hash against the existing factor store.
        alpha_id = quick_dedup_hash(
            expr.expression, bp.neutralization.value, bp.decay
        )
        if alpha_id in existing_hashes:
            console.print(" [red]DEDUP:[/] Already exists in factor store")
            return Verdict.KILL

        # Step 6 — persist the candidate.
        self.store.insert_alpha(
            alpha_id=alpha_id,
            expression=expr.expression,
            neutralization=bp.neutralization.value,
            decay=bp.decay,
            fields_used=expr.fields_used,
            operators_used=expr.operators_used,
            archetype=expr.archetype_used,
            theme=theme,
            anomaly_tag=bp.anomaly_tag.value,
            academic_anchor=bp.academic_anchor,
        )

        # Step 7 — BRAIN submission; skipped entirely in dry-run mode.
        if self.brain is None:
            console.print(" [yellow]DRY RUN:[/] Skipping BRAIN submission (no client configured)")
            console.print(f" [green]✓ Candidate {alpha_id} passed all pre-submission checks[/]")
            return Verdict.ITERATE  # real metrics are required before PROMOTE

        # TODO: Submit to BRAIN, poll results, harvest metrics
        # metrics = await self._submit_and_harvest(alpha_id, expression, blueprint)
        # ... crowd scout, surgeon, gatekeeper follow

        return Verdict.ITERATE

    def _check_kill_switches(self) -> bool:
        """Return True when any configured safety limit has been hit."""
        limits = self.config.kill

        if self._consecutive_lint_fails >= limits.consecutive_lint_fail_max:
            console.print("[red]Kill switch: too many consecutive lint failures[/]")
            return True
        if self._consecutive_kills >= limits.consecutive_kill_verdict_max:
            console.print("[red]Kill switch: too many consecutive kill verdicts[/]")
            return True
        if self._daily_submissions >= limits.daily_brain_submissions_max:
            console.print("[red]Kill switch: daily submission limit reached[/]")
            return True
        return False

    def close(self):
        """Release held resources (the factor store connection)."""
        self.store.close()