| """ |
| Integration tests for the AlphaFactory pipeline. |
| Tests both proven template mode and LLM mode (with mocked LLM). |
| """ |
| import pytest |
| import asyncio |
| from pathlib import Path |
| import tempfile |
| import shutil |
|
|
| from alpha_factory.config import Config, load_config |
| from alpha_factory.schemas import Blueprint, Component, Neutralization, AnomalyTag, BrainMetrics, Verdict |
| from alpha_factory.data.brain_fields import FIELD_INDEX, BrainField, SignConvention, DatasetTier |
| from alpha_factory.deterministic.proven_templates import generate_batch_from_proven_templates, generate_alpha15_variant |
| from alpha_factory.deterministic.expression_mutator import generate_mutations, mutate_decay, mutate_horizon, mutate_neutralization |
| from alpha_factory.deterministic.lint import lint, quick_dedup_hash |
| from alpha_factory.deterministic.fitness import compute_fitness, would_pass_brain |
| from alpha_factory.infra.winner_memory import WinnerMemory |
| from alpha_factory.data.brain_groups import PRODUCTION_GROUPS, get_group_for_expression |
|
|
|
|
class TestProvenTemplates:
    """Tests for the proven-template expression generators."""

    @staticmethod
    def _field(sign=SignConvention.LONG_HIGH):
        # Minimal synthetic field reused across these tests.
        return BrainField(
            "test_field", "test", 1.0, 0, "Test field", "Test",
            sign, DatasetTier.TIER1,
        )

    def test_alpha15_generates_valid_expression(self):
        expr = generate_alpha15_variant(self._field(), group_key="subindustry", decay=5)
        for fragment in ("ts_decay_linear(", "group_neutralize(", "ts_rank(", "test_field"):
            assert fragment in expr
        assert expr.startswith("ts_decay_linear(")

    def test_alpha15_long_low_sign(self):
        expr = generate_alpha15_variant(
            self._field(SignConvention.LONG_LOW), group_key="subindustry", decay=5
        )
        # LONG_LOW fields must be sign-flipped via a negated zscore.
        assert "-zscore" in expr

    def test_alpha15_custom_decay(self):
        expr = generate_alpha15_variant(self._field(), group_key="subindustry", decay=10)
        assert "ts_decay_linear(" in expr
        assert ", 10)" in expr
        assert ", 5)" not in expr

    def test_batch_generation(self):
        batch = generate_batch_from_proven_templates(count=3)
        assert len(batch) <= 3
        for alpha in batch:
            assert "expression" in alpha
            assert "field_id" in alpha

    def test_all_generated_expressions_pass_lint(self):
        for alpha in generate_batch_from_proven_templates(count=10):
            report = lint(alpha["expression"])
            assert report.passed, f"Lint failed for {alpha['template']}: {report.errors}"

    def test_batch_no_duplicate_fields(self):
        seen = [a["field_id"] for a in generate_batch_from_proven_templates(count=20)]
        assert len(seen) == len(set(seen)), "Duplicate fields in batch"
|
|
|
|
class TestExpressionMutator:
    """Tests for the deterministic expression mutators."""

    def test_mutate_decay(self):
        base = "ts_decay_linear(group_neutralize(rank(close), subindustry), 5)"
        produced = mutate_decay(base, 5)
        assert len(produced) > 0
        for variant in produced:
            # Every variant keeps the decay wrapper but changes the decay value.
            assert "ts_decay_linear(" in variant["expression"]
            assert variant["decay"] != 5

    def test_mutate_horizon(self):
        base = "ts_decay_linear(group_neutralize(zscore(ts_rank(volume, 252)), subindustry), 5)"
        assert len(mutate_horizon(base)) > 0

    def test_mutate_neutralization(self):
        base = "ts_decay_linear(group_neutralize(rank(close), subindustry), 5)"
        # The mutator may produce nothing; any variant it does produce must
        # have swapped the subindustry group out.
        for variant in mutate_neutralization(base):
            assert "subindustry" not in variant["expression"]

    def test_generate_mutations_comprehensive(self):
        base = "ts_decay_linear(group_neutralize(zscore(ts_rank(volume, 252)), subindustry), 5)"
        assert len(generate_mutations(base, decay=5)) >= 3
|
|
|
|
class TestLint:
    """Lint checks plus field-registry hygiene tests."""

    def test_alpha15_expression_passes_lint(self):
        sue_field = BrainField(
            "standardized_unexpected_earnings_2", "model77", 0.92, 0,
            "SUE", "Model", SignConvention.LONG_HIGH, DatasetTier.TIER1,
        )
        report = lint(generate_alpha15_variant(sue_field))
        assert report.passed, f"Lint errors: {report.errors}"
        assert len(report.warnings) <= 2

    def test_pv13_field_id_in_expression(self):
        """Test that corrected pv13_customer field IDs are accepted."""
        from alpha_factory.data.brain_fields import FIELD_INDEX
        for fid in ("pv13_customergraphrank_auth_rank",
                    "pv13_customergraphrank_page_rank"):
            assert fid in FIELD_INDEX

    def test_mutate_vol_scale_with_uppercase_fields(self):
        """Test vol_scale mutation handles uppercase field IDs (e.g., mdl77_2GlobalDev...)."""
        from alpha_factory.deterministic.expression_mutator import mutate_vol_scale
        base = "ts_decay_linear(group_neutralize(rank(ts_rank(mdl77_2GlobalDevField, 252)), subindustry), 5)"
        produced = mutate_vol_scale(base)
        assert len(produced) > 0, "vol_scale should match uppercase field IDs"
        assert "ts_std(" in produced[0]["expression"], "vol_scale should wrap field with ts_std"

    def test_no_typos_in_field_registry(self):
        """Verify no 'ustomer' typos remain (missing 'c' in customer)."""
        for fid in FIELD_INDEX:
            assert "_ustomergraphrank" not in fid, f"Typo found in field: {fid}"

    def test_valid_brain_fields_not_in_fake_list(self):
        """Verify common BRAIN fields are not in FAKE_FIELDS."""
        from alpha_factory.cleanup import FAKE_FIELDS
        for name in ("close", "high", "low", "volume", "vwap", "open",
                     "returns", "ts_returns", "bid_ask_spread", "volatility"):
            assert name not in FAKE_FIELDS, f"Valid BRAIN field '{name}' is in FAKE_FIELDS"
|
|
|
|
class TestConfig:
    """Smoke tests for configuration loading."""

    def test_config_loads(self):
        cfg = load_config()
        assert cfg.batch_size >= 1
        assert cfg.kill.daily_llm_token_budget > 0

    def test_config_paths_created(self):
        cfg = load_config()
        # Each configured path (or at least its parent) should already exist.
        for path in (cfg.paths.data, cfg.paths.factor_store):
            assert path.exists() or path.parent.exists()

    def test_config_proven_templates(self):
        cfg = load_config()
        cfg.use_proven_templates = True
        assert cfg.use_proven_templates
|
|
|
|
class TestWinnerMemory:
    """Integration tests for the DuckDB-backed winner/failure memory.

    The WinnerMemory handle is closed in a ``finally`` so that a failing
    assertion cannot leak an open database file inside the
    TemporaryDirectory (which would break cleanup, notably on Windows).
    """

    def test_winner_memory_basic(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.duckdb"
            wm = WinnerMemory(db_path)
            try:
                # A winner above the sharpe cutoff must be retrievable.
                wm.record_winner("test_field", "alpha15", "subindustry", 5, 1.5, "momentum")
                winners = wm.get_winning_fields(min_sharpe=1.0)
                assert "test_field" in winners

                # Three recorded failures should mark the field as failed.
                wm.record_failure("bad_field", "alpha6", "low_sharpe", "hash1")
                wm.record_failure("bad_field", "alpha6", "high_turnover", "hash2")
                wm.record_failure("bad_field", "alpha6", "flat_line", "hash3")
                failed = wm.get_failed_fields()
                assert "bad_field" in failed
            finally:
                wm.close()

    def test_iteration_queue(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.duckdb"
            wm = WinnerMemory(db_path)
            try:
                wm.queue_for_iteration("alpha_1", "rank(close)", 1.0, 0.3, "increase_decay")
                queue = wm.get_iteration_queue(limit=10)
                assert len(queue) == 1
                assert queue[0]["alpha_id"] == "alpha_1"

                # Marking an alpha as iterated removes it from the queue.
                wm.mark_iterated("alpha_1")
                queue = wm.get_iteration_queue(limit=10)
                assert len(queue) == 0
            finally:
                wm.close()
|
|
|
|
class TestBrainGroups:
    """Tests for the production grouping definitions."""

    def test_novel_groups_exist(self):
        assert len(PRODUCTION_GROUPS) > 0

    def test_group_coverage(self):
        # Every production group needs broad coverage and a low alpha count.
        for group in PRODUCTION_GROUPS:
            assert group.coverage >= 0.90, f"Group {group.id} coverage too low: {group.coverage}"
            assert group.alpha_count <= 30, f"Group {group.id} AC too high: {group.alpha_count}"

    def test_get_group_returns_string(self):
        chosen = get_group_for_expression(prefer_novel=True)
        assert isinstance(chosen, str)
        assert len(chosen) > 0
|
|
|
|
class TestFitness:
    """Tests for the fitness scoring and BRAIN pre-submission checks."""

    def test_fitness_penalizes_high_correlation(self):
        metrics = BrainMetrics(
            alpha_id="test", sharpe_full=1.5, sharpe_is=1.6, sharpe_os=1.4,
            fitness=1.2, turnover=0.25, returns=0.1, max_drawdown=0.04,
            yearly_sharpe=[1.2, 1.5, 1.3, 1.4, 1.6], yearly_returns=[0.02]*5,
        )
        low_corr = compute_fitness(metrics, max_corr_to_library=0.1, theme_novelty_score=0.5)
        high_corr = compute_fitness(metrics, max_corr_to_library=0.8, theme_novelty_score=0.5)
        assert low_corr > high_corr, "Lower correlation should give higher fitness"

    def test_would_pass_brain_comprehensive(self):
        # Strong metrics across the board should clear every gate.
        good = BrainMetrics(
            alpha_id="test", sharpe_full=1.5, sharpe_is=1.6, sharpe_os=1.4,
            fitness=1.2, turnover=0.25, returns=0.1, max_drawdown=0.04,
            yearly_sharpe=[1.2, 1.5, 1.3, 1.4, 1.6], yearly_returns=[0.02]*5,
        )
        result = would_pass_brain(good)
        # Direct truthiness instead of `== True` (PEP 8 / E712).
        assert result["overall_pass"]

        # Weak sharpe, high turnover, negative returns should fail.
        bad = BrainMetrics(
            alpha_id="test", sharpe_full=0.5, sharpe_is=0.6, sharpe_os=0.4,
            fitness=0.3, turnover=0.8, returns=-0.05, max_drawdown=0.15,
            yearly_sharpe=[-0.5, 0.1, -0.3, 0.2, 0.0], yearly_returns=[-0.01]*5,
        )
        result = would_pass_brain(bad)
        assert not result["overall_pass"]
|
|
|
|
class TestFieldRegistry:
    """Consistency checks on the static field registry."""

    def test_goldmine_fields_have_zero_ac(self):
        from alpha_factory.data.brain_fields import GOLDMINE_FIELDS
        for field in GOLDMINE_FIELDS:
            assert field.alpha_count == 0, f"Goldmine field {field.id} has AC={field.alpha_count}"

    def test_all_fields_unique(self):
        from alpha_factory.data.brain_fields import ALL_FIELDS
        field_ids = [field.id for field in ALL_FIELDS]
        assert len(field_ids) == len(set(field_ids)), "Duplicate field IDs"

    def test_field_index_complete(self):
        from alpha_factory.data.brain_fields import ALL_FIELDS, FIELD_INDEX
        # The index must contain exactly one entry per registered field.
        assert len(FIELD_INDEX) == len(ALL_FIELDS)

    def test_coverage_in_range(self):
        from alpha_factory.data.brain_fields import ALL_FIELDS
        for field in ALL_FIELDS:
            assert 0.0 <= field.coverage <= 1.0, f"Field {field.id} coverage out of range"
|
|
|
|
class TestOperatorsCSV:
    def test_operator_arity_consistency(self):
        """Verify operators.csv contains all operators in lint.py OPERATOR_ARITY."""
        from alpha_factory.deterministic.lint import OPERATOR_ARITY
        import csv
        from pathlib import Path

        # Relative to the working directory; skip rather than fail when the
        # data file isn't present in this checkout.
        csv_path = Path("data/operators.csv")
        if not csv_path.exists():
            pytest.skip("operators.csv not found")

        # newline="" is required by the csv module for correct cross-platform
        # newline/quoting handling.
        with open(csv_path, newline="") as f:
            csv_ops = {row["name"].strip().lower() for row in csv.DictReader(f)}

        # Every operator the linter knows about must be declared in the CSV.
        for op_name in OPERATOR_ARITY:
            assert op_name in csv_ops, f"Operator '{op_name}' in lint.py but not in operators.csv"
|
|
|
|
class TestAsyncPipeline:
    def test_proven_template_pipeline(self):
        """Test running a batch in proven template mode."""
        from alpha_factory.orchestration.pipeline import AlphaPipeline
        config = load_config()
        config.batch_size = 3
        config.use_proven_templates = True
        config.enable_brain_client = False

        pipeline = AlphaPipeline(config)
        try:
            # asyncio.run drives the coroutine directly — no wrapper coroutine
            # needed; asyncio is already imported at module level.
            result = asyncio.run(pipeline.run_batch(3))

            # Every alpha in the batch must land in exactly one bucket.
            assert "promoted" in result or "iterated" in result or "killed" in result
            total = result.get("promoted", 0) + result.get("iterated", 0) + result.get("killed", 0)
            assert total == 3, f"Expected 3 results, got {total}"
        finally:
            # Close even when an assertion fails so resources don't leak
            # into subsequent tests.
            pipeline.close()
|
|
|
|
class TestModelManager:
    """Tests for LLM model selection and endpoint resolution."""

    def test_model_manager_defaults(self):
        from alpha_factory.infra.model_manager import ModelManager, DEFAULTS
        # Constructing the manager is itself a smoke check; the instance
        # is intentionally unused afterwards.
        mm = ModelManager()
        for role in ("microfish", "mediumfish"):
            assert role in DEFAULTS

    def test_model_manager_get_endpoint(self):
        from alpha_factory.infra.model_manager import ModelManager, ModelProvider
        mm = ModelManager()
        mm.selected["mediumfish"] = mm.get_selected("mediumfish")
        url, name, headers = mm.get_endpoint("mediumfish")
        assert url.endswith("/v1")
        assert len(name) > 0
|
|
|
|
class TestThemeSampler:
    """Tests for the theme-sampling heuristics."""

    def test_pick_theme_returns_string(self):
        from alpha_factory.deterministic.theme_sampler import pick_theme
        chosen = pick_theme([], [], [])
        assert isinstance(chosen, str)
        assert len(chosen) > 0

    def test_pick_theme_penalizes_existing(self):
        from alpha_factory.deterministic.theme_sampler import pick_theme
        chosen = pick_theme(["momentum", "value", "quality"], [], [])
        # With the generic themes taken, the sampler must pick one of the
        # remaining specialized themes.
        assert chosen in [
            "earnings_surprise_momentum", "earnings_quality_signaling",
            "asset_growth_anomaly", "forward_value_composite",
            "liquidity_risk_premium", "multi_factor_momentum",
            "news_reaction_drift", "analyst_guidance_revision",
            "options_sentiment_pcr", "supply_chain_network",
            "social_contrarian", "geographic_exposure",
        ]

    def test_pick_theme_all_dead_returns_alive(self):
        """If all themes are dead, pick_theme must still return a valid theme."""
        from alpha_factory.deterministic.theme_sampler import pick_theme, THEME_FIELDS
        chosen = pick_theme([], [], dead_themes=list(THEME_FIELDS.keys()))
        assert chosen in THEME_FIELDS, "Should return a valid theme even when all are dead"
|
|
|
|
class TestWQClient:
    """Tests for the WorldQuant BRAIN client helpers."""

    def test_brain_client_error_hierarchy(self):
        from alpha_factory.infra.wq_client import BrainClientError, BrainAuthError, BrainRateLimitError
        # Both specific errors must be catchable via the base class.
        for exc in (BrainAuthError, BrainRateLimitError):
            assert issubclass(exc, BrainClientError)

    def test_alpha_hash_deterministic(self):
        from alpha_factory.infra.wq_client import BrainClient
        same_a = BrainClient.alpha_hash("rank(close)", "sector", 5)
        same_b = BrainClient.alpha_hash("rank(close)", "sector", 5)
        other = BrainClient.alpha_hash("rank(close)", "industry", 5)
        assert same_a == same_b
        assert same_a != other
        # Hash is a fixed-width 16-character digest.
        assert len(same_a) == 16
|
|
|
|
class TestBrainSim:
    """Tests for the local BRAIN simulator."""

    def test_rank_normalize_basic(self):
        import numpy as np
        from alpha_factory.local.brain_sim import _rank_normalize
        normalized = _rank_normalize(np.array([3.0, 1.0, 2.0]))
        assert len(normalized) == 3
        # Output is bounded to [-1, 1] and preserves ordering.
        assert np.all((normalized >= -1) & (normalized <= 1))
        assert normalized[2] > normalized[1]

    def test_simulate_alpha_local_with_random(self):
        import numpy as np
        from alpha_factory.local.brain_sim import simulate_alpha_local
        rng_seed = 42
        np.random.seed(rng_seed)
        signal = np.random.randn(100, 50)
        returns = np.random.randn(100, 50) * 0.02
        sim = simulate_alpha_local(signal, returns, min_sharpe=0.0, min_fitness=0.0)
        assert isinstance(sim.sharpe, float)
        assert isinstance(sim.turnover, float)
        # Turnover is a fraction of the book traded per period, capped at 2x.
        assert 0 <= sim.turnover <= 2.0
|
|
|
|
class TestCleanup:
    """Tests around the orphan-cleanup token heuristics."""

    def test_cleanup_orphans_skips_common_words(self):
        """Verify cleanup_orphans doesn't delete alphas with common words like 'backfill'."""
        # Import acts as an existence check for the cleanup entry point.
        from alpha_factory.cleanup import cleanup_orphans

        import re
        skip = {
            "subindustry", "industry", "sector", "market",
            "close", "high", "low", "open", "volume", "vwap",
            "backfill", "neutralize", "expression",
        }
        expr = "ts_decay_linear(group_neutralize(rank(ts_backfill(close, 30)), subindustry), 5)"
        # Long lowercase identifiers (11+ chars) are the ones cleanup inspects.
        for token in re.findall(r"\b([a-z][a-z0-9_]{10,})\b", expr.lower()):
            allowed = token in skip or token.startswith("ts_") or token.startswith("group_")
            assert allowed, f"Unexpected token: {token}"
|
|
|
|
class TestDedupHash:
    """Tests that the dedup hash keys on expression, group, and decay."""

    def test_same_expression_same_hash(self):
        first = quick_dedup_hash("rank(close)", "sector", 5)
        second = quick_dedup_hash("rank(close)", "sector", 5)
        assert first == second

    def test_different_neutralization_different_hash(self):
        sector_hash = quick_dedup_hash("rank(close)", "sector", 5)
        industry_hash = quick_dedup_hash("rank(close)", "industry", 5)
        assert sector_hash != industry_hash

    def test_different_decay_different_hash(self):
        short_decay = quick_dedup_hash("rank(close)", "sector", 5)
        long_decay = quick_dedup_hash("rank(close)", "sector", 10)
        assert short_decay != long_decay
|
|