#!/usr/bin/env python3
"""
Stress-test: Catastrophic Failure Injection
===========================================

Intentionally triggers failures to verify self-healing recovery.

Failures injected:
1. NaN injection in loss → should trigger rollback + halve LR
2. Simulated OOM → should trigger batch halving + grad checkpointing
3. API error → should trigger exponential backoff

This requires a GPU. Run with:
    python tests/stress_test_recovery.py
"""

import gc
import json
import math
import os
import sys
import tempfile
import time
# NOTE(fix): MagicMock was previously imported only under __main__, so
# test_postmortem_written() raised NameError when run through pytest.
from unittest.mock import MagicMock

import torch
import torch.nn as nn
from datasets import Dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Trainer,
    TrainerCallback,
    TrainerControl,
    TrainerState,
    TrainingArguments,
)

from self_healing import (
    FAILURE_RECIPES,
    FailureType,
    HealingActions,
    HealingConfig,
    SelfHealingCallback,
    SelfHealingTrainer,
)


class NaNInjectorCallback(TrainerCallback):
    """Intentionally inject NaN into loss at a specific step."""

    def __init__(self, inject_at_step: int = 10):
        self.inject_at_step = inject_at_step
        self.original_forward = None
        # Guard so the forward patch is applied at most once
        # (previously a lazily-created hasattr sentinel).
        self._injected = False

    def on_step_begin(self, args, state, control, **kwargs):
        """Patch model.forward to emit a NaN loss once we reach the target step."""
        if state.global_step == self.inject_at_step and not self._injected:
            self._injected = True
            print(f"\n [INJECT] Forcing NaN at step {state.global_step}\n")
            # Override the model's forward to return NaN
            model = kwargs.get("model")
            if model is not None:
                self.original_forward = model.forward

                def nan_forward(*a, **kw):
                    result = self.original_forward(*a, **kw)
                    result.loss = torch.tensor(float('nan'))
                    return result

                model.forward = nan_forward


def test_nan_recovery():
    """
    Test: Inject NaN → verify SelfHealingTrainer detects and recovers.
    """
    print("\n" + "=" * 60)
    print(" STRESS TEST 1: NaN Recovery")
    print("=" * 60)

    # Tiny model
    model_id = "HuggingFaceTB/SmolLM2-135M"
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.float32,  # float32 for NaN safety
        device_map="auto" if torch.cuda.is_available() else None,
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Create dummy dataset
    texts = ["The quick brown fox jumps over the lazy dog."] * 100
    ds = Dataset.from_dict({
        "text": texts,
        "input_ids": [
            tokenizer.encode(t, truncation=True, max_length=32) for t in texts
        ],
        "attention_mask": [
            [1] * len(tokenizer.encode(t, truncation=True, max_length=32))
            for t in texts
        ],
    })

    training_args = TrainingArguments(
        output_dir="./stress-nan-output",
        per_device_train_batch_size=2,
        learning_rate=1e-4,
        max_steps=30,
        logging_steps=1,
        logging_strategy="steps",
        logging_first_step=True,
        save_steps=100,
        report_to="none",
        disable_tqdm=True,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=ds,
        tokenizer=tokenizer,
        callbacks=[NaNInjectorCallback(inject_at_step=10)],
    )

    healing_config = HealingConfig(
        nan_patience=1,  # React immediately
        max_recovery_attempts=3,
        max_lr_reductions=3,
        zclip_enabled=False,
        postmortem_path="./stress-nan-postmortem.json",
    )
    sh = SelfHealingTrainer(trainer, healing_config)

    print("Training with NaN injection at step 10...")
    result = sh.train()

    print(f"\nResults:")
    print(f" Converged: {sh.converged}")
    print(f" Attempts: {sh.attempt}")
    print(f" Recoveries: {len(sh.recovery_history)}")
    if sh.recovery_history:
        for rec in sh.recovery_history:
            print(f" → {rec['failure']}: {rec['actions']}")

    # Verify: should have at least one recovery for NaN
    assert len(sh.recovery_history) >= 1, "Expected NaN recovery!"
    assert any(r["failure"] == "nan_loss" for r in sh.recovery_history), \
        "Expected nan_loss failure type!"

    # Verify LR was reduced
    assert sh.healing_callback.lr_reductions >= 1, \
        "Expected LR to be reduced!"

    print(" ✓ NaN recovery test PASSED")

    if os.path.exists(healing_config.postmortem_path):
        with open(healing_config.postmortem_path) as f:
            pm = json.load(f)
        print(f" Postmortem: {pm.get('exit_reason')} at step {pm.get('last_step')}")


def test_zclip_spike_detection():
    """
    Test: Feed spike values to ZClip → verify clipping.
    """
    print("\n" + "=" * 60)
    print(" STRESS TEST 2: ZClip Spike Detection")
    print("=" * 60)

    from self_healing import ZClip

    zclip = ZClip(z_threshold=2.5, ema_decay=0.9)

    # Stabilize at norm=10.0
    for _ in range(100):
        zclip.update_and_clip(10.0)

    # Inject spike
    clipped = zclip.update_and_clip(500.0)
    print(f" Raw: 500.0, Clipped: {clipped:.1f}, Clips: {zclip.clip_count}")

    assert clipped < 500.0, "Expected spike to be clipped!"
    assert zclip.clip_count >= 1, "Expected clip counter to increment!"
    print(" ✓ ZClip spike detection PASSED")


def test_healing_config_limits():
    """
    Test: Verify that max reduction limits are enforced.
    """
    print("\n" + "=" * 60)
    print(" STRESS TEST 3: Recovery Limits")
    print("=" * 60)

    config = HealingConfig(
        max_lr_reductions=2,
        max_batch_reductions=2,
    )

    # Test LR limit
    args = TrainingArguments(
        output_dir="/tmp",
        learning_rate=1e-4,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=1,
    )
    cb = SelfHealingCallback(config)
    actions = HealingActions(config, cb)

    # Reduce twice
    actions._apply_single("halve_learning_rate", args, {})
    actions._apply_single("halve_learning_rate", args, {})
    assert cb.lr_reductions == 2

    # Third reduction should hit limit
    result = actions._apply_single("halve_learning_rate", args, {})
    assert "MAX" in result
    assert cb.lr_reductions == 2  # Should not increment

    print(f" LR after 2 reductions: {args.learning_rate:.2e}")
    print(f" Third attempt: {result}")
    print(" ✓ Recovery limits test PASSED")


def test_postmortem_written():
    """
    Test: Verify postmortem.json is written on crash.
    """
    print("\n" + "=" * 60)
    print(" STRESS TEST 4: Postmortem Generation")
    print("=" * 60)

    with tempfile.TemporaryDirectory() as tmpdir:
        config = HealingConfig(
            postmortem_path=os.path.join(tmpdir, "postmortem.json"),
        )
        cb = SelfHealingCallback(config)

        # Simulate exception
        cb.on_exception(
            MagicMock(),  # args
            MagicMock(global_step=42, log_history=[{"loss": 1.5}]),  # state
            MagicMock(),  # control
            torch.cuda.OutOfMemoryError("CUDA out of memory. Tried to allocate 2.00 GiB"),  # exception
        )

        # Check postmortem exists
        assert os.path.exists(config.postmortem_path)
        with open(config.postmortem_path) as f:
            pm = json.load(f)

        assert pm["exception_type"] == "OutOfMemoryError"
        assert pm["last_step"] == 42
        assert "loss" in pm["final_metrics"]
        assert pm["final_metrics"]["loss"] == 1.5

        print(f" Postmortem path: {config.postmortem_path}")
        print(f" Content: {json.dumps(pm, indent=2)}")
        print(" ✓ Postmortem generation PASSED")


if __name__ == "__main__":
    print("╔" + "═" * 58 + "╗")
    print("║ SELF-HEALING TRAINING SYSTEM — STRESS TEST SUITE ║")
    print("╚" + "═" * 58 + "╝")

    # Run tests (order matters: ZClip first, no GPU needed)
    test_zclip_spike_detection()
    test_healing_config_limits()
    test_postmortem_written()

    # NaN recovery test (needs model loading)
    if torch.cuda.is_available():
        test_nan_recovery()
    else:
        print("\n" + "=" * 60)
        print(" STRESS TEST 1: NaN Recovery")
        print("=" * 60)
        print(" ⚠ Skipped: No GPU available")

    print("\n" + "=" * 60)
    print(" ALL STRESS TESTS PASSED ✓")
    print("=" * 60)