"""
Stress-test: Catastrophic Failure Injection
===========================================
Intentionally triggers failures to verify self-healing recovery.

Failures injected:
1. NaN injection in loss → should trigger rollback + halve LR
2. Simulated OOM → should trigger batch halving + grad checkpointing
3. API error → should trigger exponential backoff

This requires a GPU. Run with:
    python tests/stress_test_recovery.py
"""
import gc
import json
import math
import os
import sys
import time
from unittest.mock import MagicMock

import torch
import torch.nn as nn
from datasets import Dataset
from transformers import (
    AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments,
    TrainerCallback, TrainerControl, TrainerState,
)

from self_healing import (
    SelfHealingTrainer, HealingConfig, SelfHealingCallback,
    HealingActions, FailureType, FAILURE_RECIPES,
)
|
|
|
|
class NaNInjectorCallback(TrainerCallback):
    """Intentionally inject NaN into loss at a specific step.

    Patches ``model.forward`` just before the target step so the returned
    loss becomes NaN, then restores the original forward at the end of
    that step so recovery logic can resume with a healthy model.
    """

    def __init__(self, inject_at_step: int = 10):
        # Step at which the single NaN injection fires.
        self.inject_at_step = inject_at_step
        # Saved unpatched forward, so it can be restored after injection.
        self.original_forward = None
        # Guard so the injection happens exactly once.
        self._injected = False

    def on_step_begin(self, args, state, control, **kwargs):
        if state.global_step == self.inject_at_step and not self._injected:
            self._injected = True
            print(f"\n [INJECT] Forcing NaN at step {state.global_step}\n")

            model = kwargs.get("model")
            if model is not None:
                self.original_forward = model.forward

                def nan_forward(*a, **kw):
                    result = self.original_forward(*a, **kw)
                    # Multiply rather than replace: keeps the loss on the
                    # correct device/dtype and attached to the autograd
                    # graph, so backward() still runs and the trainer sees
                    # a genuine NaN loss instead of a detached-tensor error.
                    result.loss = result.loss * float("nan")
                    return result

                model.forward = nan_forward

    def on_step_end(self, args, state, control, **kwargs):
        # Restore the clean forward after the poisoned step; without this
        # every subsequent step is also NaN and recovery can never succeed.
        if self._injected and self.original_forward is not None:
            model = kwargs.get("model")
            if model is not None:
                model.forward = self.original_forward
            self.original_forward = None
|
|
|
|
def test_nan_recovery():
    """
    Test: Inject NaN → verify SelfHealingTrainer detects and recovers.
    """
    print("\n" + "=" * 60)
    print(" STRESS TEST 1: NaN Recovery")
    print("=" * 60)

    # Tiny model keeps the stress test fast; fp32 avoids spurious
    # precision-related NaNs that would confound the injected one.
    model_id = "HuggingFaceTB/SmolLM2-135M"
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.float32,
        device_map="auto" if torch.cuda.is_available() else None,
    )
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Trivial repeated-sentence dataset: the content is irrelevant, only
    # the training-loop mechanics matter here.
    texts = ["The quick brown fox jumps over the lazy dog."] * 100
    ds = Dataset.from_dict({
        "text": texts,
        "input_ids": [tokenizer.encode(t, truncation=True, max_length=32) for t in texts],
        "attention_mask": [[1]*len(tokenizer.encode(t, truncation=True, max_length=32)) for t in texts],
    })

    training_args = TrainingArguments(
        output_dir="./stress-nan-output",
        per_device_train_batch_size=2,
        learning_rate=1e-4,
        max_steps=30,
        logging_steps=1,            # log every step so the NaN is observed immediately
        logging_strategy="steps",
        logging_first_step=True,
        save_steps=100,             # > max_steps: no periodic checkpoints during the run
        report_to="none",
        disable_tqdm=True,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=ds,
        tokenizer=tokenizer,
        callbacks=[NaNInjectorCallback(inject_at_step=10)],
    )

    # nan_patience=1: presumably react on the first NaN seen; zclip disabled
    # so gradient clipping cannot mask the injected spike — TODO confirm
    # against HealingConfig semantics.
    healing_config = HealingConfig(
        nan_patience=1,
        max_recovery_attempts=3,
        max_lr_reductions=3,
        zclip_enabled=False,
        postmortem_path="./stress-nan-postmortem.json",
    )

    sh = SelfHealingTrainer(trainer, healing_config)

    print("Training with NaN injection at step 10...")
    result = sh.train()

    print(f"\nResults:")
    print(f" Converged: {sh.converged}")
    print(f" Attempts: {sh.attempt}")
    print(f" Recoveries: {len(sh.recovery_history)}")

    if sh.recovery_history:
        for rec in sh.recovery_history:
            print(f" β {rec['failure']}: {rec['actions']}")

    # At least one recovery must have fired, and it must be typed nan_loss.
    assert len(sh.recovery_history) >= 1, "Expected NaN recovery!"
    assert any(r["failure"] == "nan_loss" for r in sh.recovery_history), \
        "Expected nan_loss failure type!"

    # The nan_loss recipe is expected to halve the LR at least once.
    assert sh.healing_callback.lr_reductions >= 1, \
        "Expected LR to be reduced!"

    print(" β NaN recovery test PASSED")

    # NOTE(review): the postmortem presumably only exists if the run ended
    # in failure; report it when present rather than asserting on it.
    if os.path.exists(healing_config.postmortem_path):
        with open(healing_config.postmortem_path) as f:
            pm = json.load(f)
        print(f" Postmortem: {pm.get('exit_reason')} at step {pm.get('last_step')}")
|
|
|
|
def test_zclip_spike_detection():
    """Verify that ZClip detects and clips an anomalous value spike."""
    print("\n" + "=" * 60)
    print(" STRESS TEST 2: ZClip Spike Detection")
    print("=" * 60)

    from self_healing import ZClip

    clipper = ZClip(z_threshold=2.5, ema_decay=0.9)

    # Warm the EMA statistics with a long run of steady values.
    steady_values = [10.0] * 100
    for value in steady_values:
        clipper.update_and_clip(value)

    # A 50x outlier should land far beyond the z-threshold and be clipped.
    clipped = clipper.update_and_clip(500.0)

    print(f" Raw: 500.0, Clipped: {clipped:.1f}, Clips: {clipper.clip_count}")
    assert clipped < 500.0, "Expected spike to be clipped!"
    assert clipper.clip_count >= 1, "Expected clip counter to increment!"
    print(" β ZClip spike detection PASSED")
|
|
|
|
def test_healing_config_limits():
    """Ensure halve_learning_rate refuses to act beyond max_lr_reductions."""
    print("\n" + "=" * 60)
    print(" STRESS TEST 3: Recovery Limits")
    print("=" * 60)

    from transformers import TrainingArguments
    from self_healing import HealingActions, SelfHealingCallback, HealingConfig

    cfg = HealingConfig(
        max_lr_reductions=2,
        max_batch_reductions=2,
    )

    train_args = TrainingArguments(
        output_dir="/tmp",
        learning_rate=1e-4,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=1,
    )
    callback = SelfHealingCallback(cfg)
    actions = HealingActions(cfg, callback)

    # Two reductions are within the configured budget.
    for _ in range(2):
        actions._apply_single("halve_learning_rate", train_args, {})
    assert callback.lr_reductions == 2

    # A third attempt must be refused and leave the counter untouched.
    result = actions._apply_single("halve_learning_rate", train_args, {})
    assert "MAX" in result
    assert callback.lr_reductions == 2

    print(f" LR after 2 reductions: {train_args.learning_rate:.2e}")
    print(f" Third attempt: {result}")
    print(" β Recovery limits test PASSED")
|
|
|
|
def test_postmortem_written():
    """Verify that a postmortem JSON report is written on crash.

    Drives ``SelfHealingCallback.on_exception`` directly with mocked
    trainer state and a simulated CUDA OOM, then checks the report content.
    """
    print("\n" + "=" * 60)
    print(" STRESS TEST 4: Postmortem Generation")
    print("=" * 60)

    import tempfile
    # Imported here (not only under __main__) so this test also resolves
    # MagicMock when the module is collected by pytest instead of run as
    # a script.
    from unittest.mock import MagicMock

    with tempfile.TemporaryDirectory() as tmpdir:
        config = HealingConfig(
            postmortem_path=os.path.join(tmpdir, "postmortem.json"),
        )
        cb = SelfHealingCallback(config)

        # Simulate the crash path: args/state/control are mocked; the state
        # carries a step counter and a log history for the final metrics.
        cb.on_exception(
            MagicMock(),
            MagicMock(global_step=42, log_history=[{"loss": 1.5}]),
            MagicMock(),
            torch.cuda.OutOfMemoryError("CUDA out of memory. Tried to allocate 2.00 GiB"),
        )

        # The report must exist and reflect both the exception and the state.
        assert os.path.exists(config.postmortem_path)

        with open(config.postmortem_path) as f:
            pm = json.load(f)

        assert pm["exception_type"] == "OutOfMemoryError"
        assert pm["last_step"] == 42
        assert "loss" in pm["final_metrics"]
        assert pm["final_metrics"]["loss"] == 1.5

        print(f" Postmortem path: {config.postmortem_path}")
        print(f" Content: {json.dumps(pm, indent=2)}")
        print(" β Postmortem generation PASSED")
|
|
|
|
if __name__ == "__main__":

    # NOTE(review): test_postmortem_written() relies on MagicMock being in
    # scope; importing it only here means that test resolves it solely when
    # this file runs as a script — confirm whether pytest collection is
    # also an intended entry point.
    from unittest.mock import MagicMock

    print("β" + "β" * 58 + "β")
    print("β SELF-HEALING TRAINING SYSTEM β STRESS TEST SUITE β")
    print("β" + "β" * 58 + "β")

    # CPU-safe tests run unconditionally.
    test_zclip_spike_detection()
    test_healing_config_limits()
    test_postmortem_written()

    # The end-to-end NaN recovery test performs a real training run and
    # requires a GPU; otherwise print the skip banner.
    if torch.cuda.is_available():
        test_nan_recovery()
    else:
        print("\n" + "=" * 60)
        print(" STRESS TEST 1: NaN Recovery")
        print("=" * 60)
        print(" β Skipped: No GPU available")

    print("\n" + "=" * 60)
    print(" ALL STRESS TESTS PASSED β")
    print("=" * 60)