Spaces:
Running
Running
| """ | |
| Log generator for LogTriageEnv. | |
| Produces realistic-looking log lines for the simulated microservice cluster. | |
| """ | |
| from __future__ import annotations | |
| import random | |
| from datetime import datetime, timedelta | |
| from server.models import LogLine, ServiceStatus | |
# ─── SERVICES ────────────────────────────────────────────────────────────────
# Every simulated microservice in the cluster. Order is fixed so that seeded
# RNG draws over this list stay reproducible across runs.
SERVICES = (
    "api-gateway auth-service user-db payment-service "
    "payment-db notification-service email-queue"
).split()
# ─── LOG TEMPLATES ───────────────────────────────────────────────────────────
# Noise logs — realistic-looking but irrelevant to any incident. Keyed by
# service name; each entry is a (level, message) pair drawn at random when a
# batch needs filler lines. Messages are fixed strings (no placeholders).
NOISE_TEMPLATES: dict[str, list[tuple[str, str]]] = {
    "api-gateway": [
        ("INFO", "health check passed β all upstream services reachable"),
        ("INFO", "request completed: GET /api/v1/users/profile [200] 45ms"),
        ("INFO", "rate limiter: 1240/5000 requests this minute"),
        ("DEBUG", "connection pool: 12/100 active connections"),
        ("INFO", "TLS certificate valid for 87 more days"),
    ],
    "auth-service": [
        ("INFO", "JWT token issued for user_id=88142 [expires: 3600s]"),
        ("INFO", "OAuth2 flow completed successfully"),
        ("DEBUG", "session cache hit ratio: 94.2%"),
        ("INFO", "password reset email queued for user_id=23019"),
    ],
    "user-db": [
        ("INFO", "daily vacuum completed: 0 dead tuples removed"),
        ("INFO", "checkpoint complete: wrote 142 buffers"),
        ("DEBUG", "autovacuum: processing table 'sessions'"),
        ("INFO", "replication lag: 12ms (within threshold)"),
    ],
    "payment-service": [
        ("INFO", "payment processed: txn_id=TXN-8812 amount=299.00 INR [success]"),
        ("INFO", "webhook delivered: stripe event=payment.succeeded"),
        ("DEBUG", "idempotency key cache: 2341 keys active"),
    ],
    "payment-db": [
        ("INFO", "connection pool: 8/50 active"),
        ("DEBUG", "query plan cache: 88% hit ratio"),
        ("INFO", "index usage: 99.1% queries using indexed scans"),
    ],
    "notification-service": [
        ("INFO", "email dispatched: template=welcome_email to=user@example.com"),
        ("INFO", "SMS delivered: +91XXXXXXXXXX [provider=twilio]"),
        # WARN-level noise is intentional: realistic clusters emit benign warnings.
        ("WARN", "email bounce rate: 1.2% (threshold: 5%)"),
        ("INFO", "push notification sent: device_tokens=1240"),
    ],
    "email-queue": [
        ("INFO", "queue depth: 42 messages pending"),
        ("INFO", "consumer lag: 0.3s (healthy)"),
        ("DEBUG", "partition rebalance completed in 120ms"),
    ],
}
# Signal logs — actual incident indicators. Keyed by scenario name (not by
# service); each entry is a (level, message) pair. Messages may contain a
# literal "{req_id}" placeholder to be filled in by the log generator.
SIGNAL_TEMPLATES: dict[str, list[tuple[str, str]]] = {
    # Single-service crash signals (Task 1 — payment-service crash)
    "single_crash_payment": [
        ("ERROR", "NullPointerException: Cannot invoke method processPayment() on null object β PaymentProcessor.java:142"),
        ("ERROR", "HTTP 500 Internal Server Error: payment gateway returned null response"),
        ("ERROR", "NullPointerException in PaymentService.execute() β retrying (attempt 1/3)"),
        ("ERROR", "NullPointerException in PaymentService.execute() β retrying (attempt 2/3)"),
        ("FATAL", "NullPointerException in PaymentService.execute() β all retries exhausted, request failed"),
        ("ERROR", "health check FAILED: payment-service returned 500 (was 200)"),
        ("ERROR", "circuit breaker OPEN: payment-service error rate 98.2% (threshold: 10%)"),
    ],
    # Cascading failure signals (Task 2 — user-db → auth-service → api-gateway)
    "cascading_userdb": [
        ("WARN", "slow query detected: SELECT * FROM sessions WHERE user_id=? [latency: 2847ms, threshold: 200ms]"),
        ("ERROR", "slow query detected: SELECT * FROM sessions WHERE user_id=? [latency: 4120ms]"),
        ("ERROR", "query timeout: SELECT * FROM active_sessions [timeout after 5000ms]"),
    ],
    "cascading_auth": [
        ("WARN", "db connection pool: 42/50 active connections (84% utilization)"),
        ("ERROR", "db connection pool exhausted: 50/50 connections in use β requests queuing"),
        ("ERROR", "authentication request timed out waiting for db connection [5200ms]"),
    ],
    "cascading_gateway": [
        ("ERROR", "upstream timeout: auth-service failed to respond within 5000ms [req-id: {req_id}]"),
        ("ERROR", "upstream timeout: auth-service [req-id: {req_id}] β returning 504 to client"),
        ("WARN", "error rate spike: 34.2% of requests failing (threshold: 5%)"),
    ],
    # Silent degradation signals (Task 3 — payment-db slow). All WARN level:
    # the point of this scenario is that nothing ever reaches ERROR.
    "silent_paymentdb": [
        ("WARN", "query latency elevated: avg=450ms (normal: 80ms) β monitoring"),
        ("WARN", "query latency elevated: avg=620ms β possible memory pressure"),
        ("WARN", "query latency elevated: avg=890ms β recommend investigation"),
        ("WARN", "query latency elevated: avg=1200ms β approaching timeout threshold"),
        ("WARN", "buffer cache hit ratio degraded: 87% (normal: 98%) β possible memory issue"),
    ],
}
| def _make_timestamp(base_time: datetime, offset_seconds: int = 0) -> str: | |
| t = base_time + timedelta(seconds=offset_seconds) | |
| return t.strftime("%Y-%m-%dT%H:%M:%SZ") | |
def _noise_log(
    service: str,
    base_time: datetime,
    offset: int,
    rng: random.Random | None = None,
) -> LogLine:
    """Build one routine (non-incident) log line for *service*.

    Args:
        service: Service name; unknown services fall back to a generic
            "routine operation completed" INFO line.
        base_time: Episode start time used for the timestamp.
        offset: Seconds after *base_time* for this line's timestamp.
        rng: Optional seeded Random for reproducible template selection.
            Defaults to the module-level ``random`` (the previous,
            non-reproducible behavior), so existing callers are unaffected.

    Returns:
        A LogLine with no request_id and no latency_ms (noise lines carry
        neither).
    """
    # Fix: the original always used the global, unseeded `random.choice`,
    # which silently broke reproducibility even when callers held a seeded rng.
    chooser = rng if rng is not None else random
    templates = NOISE_TEMPLATES.get(service, [("INFO", "routine operation completed")])
    level, message = chooser.choice(templates)
    return LogLine(
        timestamp=_make_timestamp(base_time, offset),
        level=level,
        service=service,
        request_id=None,
        message=message,
        latency_ms=None,
    )
def generate_log_batch(
    scenario_signals: list[tuple[str, str, str]],  # [(service, level, message), ...]
    step: int,
    base_time: datetime,
    noise_ratio: float = 0.3,
    batch_size: int = 8,
    rng: random.Random | None = None,
) -> list[LogLine]:
    """
    Generate a mixed batch of signal + noise log lines.

    Args:
        scenario_signals: List of (service, level, message) tuples — the actual
            signals for this step. Messages may contain a "{req_id}"
            placeholder, which is substituted with the generated request id.
        step: Current step number (used for timestamp offset).
        base_time: Episode start time (used for timestamps).
        noise_ratio: Accepted for interface compatibility; currently unused —
            noise simply fills whatever space remains after the signals.
        batch_size: Total number of log lines to return.
        rng: Optional seeded Random for reproducibility. All randomness in
            this function (request ids, latencies, noise selection, shuffle)
            goes through it.

    Returns:
        List of LogLine objects, shuffled (signal mixed into noise) and
        truncated to *batch_size*.
    """
    if rng is None:
        rng = random.Random()

    logs: list[LogLine] = []
    base_offset = step * 30  # 30 simulated seconds per step

    # Add signal logs.
    for i, (service, level, message) in enumerate(scenario_signals):
        req_id = f"req-{rng.randint(1000, 9999)}" if level in ("ERROR", "WARN") else None
        # Fix: some templates (e.g. cascading_gateway) embed "{req_id}"; the
        # original never substituted it, so clients saw the literal "{req_id}".
        # str.replace (not str.format) avoids KeyError on any other braces.
        if req_id is not None:
            message = message.replace("{req_id}", req_id)
        lowered = message.lower()
        logs.append(LogLine(
            timestamp=_make_timestamp(base_time, base_offset + i),
            level=level,
            service=service,
            request_id=req_id,
            message=message,
            latency_ms=rng.randint(200, 5000) if "timeout" in lowered or "latency" in lowered else None,
        ))

    # Fill remaining slots with noise logs. Built inline with `rng` — the
    # original delegated to _noise_log, which draws from the global unseeded
    # `random` and therefore broke reproducibility under a seeded rng.
    noise_count = max(0, batch_size - len(logs))
    for i, svc in enumerate(rng.choices(SERVICES, k=noise_count)):
        templates = NOISE_TEMPLATES.get(svc, [("INFO", "routine operation completed")])
        noise_level, noise_message = rng.choice(templates)
        logs.append(LogLine(
            timestamp=_make_timestamp(base_time, base_offset + len(scenario_signals) + i),
            level=noise_level,
            service=svc,
            request_id=None,
            message=noise_message,
            latency_ms=None,
        ))

    # Shuffle — the signal should not always appear first in the batch.
    rng.shuffle(logs)
    return logs[:batch_size]
def generate_healthy_system_state(
    base_time: datetime,
    rng: random.Random | None = None,
) -> dict[str, ServiceStatus]:
    """Generate a fully healthy system state snapshot.

    Args:
        base_time: Timestamp recorded as every service's ``last_updated``.
        rng: Optional seeded Random for reproducible error-rate/latency
            jitter, matching generate_log_batch's reproducibility contract.
            Defaults to the module-level ``random`` (previous behavior), so
            existing callers are unaffected.

    Returns:
        Mapping of service name -> ServiceStatus, one entry per SERVICES
        member, all with status "up" and small randomized baseline metrics.
    """
    chooser = rng if rng is not None else random
    now = _make_timestamp(base_time)
    return {
        svc: ServiceStatus(
            name=svc,
            status="up",
            # Baseline noise so "healthy" metrics don't look suspiciously flat.
            error_rate=round(chooser.uniform(0.001, 0.01), 4),
            latency_p99_ms=chooser.randint(20, 80),
            last_updated=now,
        )
        for svc in SERVICES
    }