"""
Log generator for LogTriageEnv.
Produces realistic-looking log lines for the simulated microservice cluster.
"""
from __future__ import annotations
import random
from datetime import datetime, timedelta
from server.models import LogLine, ServiceStatus
# ─── SERVICES ──────────────────────────────────────────────────────────────────
SERVICES = [
"api-gateway",
"auth-service",
"user-db",
"payment-service",
"payment-db",
"notification-service",
"email-queue",
]
# ─── LOG TEMPLATES ─────────────────────────────────────────────────────────────
# Noise logs – realistic but irrelevant to the incident
NOISE_TEMPLATES = {
"api-gateway": [
("INFO", "health check passed β all upstream services reachable"),
("INFO", "request completed: GET /api/v1/users/profile [200] 45ms"),
("INFO", "rate limiter: 1240/5000 requests this minute"),
("DEBUG", "connection pool: 12/100 active connections"),
("INFO", "TLS certificate valid for 87 more days"),
],
"auth-service": [
("INFO", "JWT token issued for user_id=88142 [expires: 3600s]"),
("INFO", "OAuth2 flow completed successfully"),
("DEBUG", "session cache hit ratio: 94.2%"),
("INFO", "password reset email queued for user_id=23019"),
],
"user-db": [
("INFO", "daily vacuum completed: 0 dead tuples removed"),
("INFO", "checkpoint complete: wrote 142 buffers"),
("DEBUG", "autovacuum: processing table 'sessions'"),
("INFO", "replication lag: 12ms (within threshold)"),
],
"payment-service": [
("INFO", "payment processed: txn_id=TXN-8812 amount=299.00 INR [success]"),
("INFO", "webhook delivered: stripe event=payment.succeeded"),
("DEBUG", "idempotency key cache: 2341 keys active"),
],
"payment-db": [
("INFO", "connection pool: 8/50 active"),
("DEBUG", "query plan cache: 88% hit ratio"),
("INFO", "index usage: 99.1% queries using indexed scans"),
],
"notification-service": [
("INFO", "email dispatched: template=welcome_email to=user@example.com"),
("INFO", "SMS delivered: +91XXXXXXXXXX [provider=twilio]"),
("WARN", "email bounce rate: 1.2% (threshold: 5%)"),
("INFO", "push notification sent: device_tokens=1240"),
],
"email-queue": [
("INFO", "queue depth: 42 messages pending"),
("INFO", "consumer lag: 0.3s (healthy)"),
("DEBUG", "partition rebalance completed in 120ms"),
],
}
# Signal logs – actual incident indicators
SIGNAL_TEMPLATES = {
    # Single service crash signals (Task 1 – payment-service crash)
"single_crash_payment": [
("ERROR", "NullPointerException: Cannot invoke method processPayment() on null object β PaymentProcessor.java:142"),
("ERROR", "HTTP 500 Internal Server Error: payment gateway returned null response"),
("ERROR", "NullPointerException in PaymentService.execute() β retrying (attempt 1/3)"),
("ERROR", "NullPointerException in PaymentService.execute() β retrying (attempt 2/3)"),
("FATAL", "NullPointerException in PaymentService.execute() β all retries exhausted, request failed"),
("ERROR", "health check FAILED: payment-service returned 500 (was 200)"),
("ERROR", "circuit breaker OPEN: payment-service error rate 98.2% (threshold: 10%)"),
],
    # Cascading failure signals (Task 2 – user-db → auth-service → api-gateway)
"cascading_userdb": [
("WARN", "slow query detected: SELECT * FROM sessions WHERE user_id=? [latency: 2847ms, threshold: 200ms]"),
("ERROR", "slow query detected: SELECT * FROM sessions WHERE user_id=? [latency: 4120ms]"),
("ERROR", "query timeout: SELECT * FROM active_sessions [timeout after 5000ms]"),
],
"cascading_auth": [
("WARN", "db connection pool: 42/50 active connections (84% utilization)"),
("ERROR", "db connection pool exhausted: 50/50 connections in use β requests queuing"),
("ERROR", "authentication request timed out waiting for db connection [5200ms]"),
],
"cascading_gateway": [
("ERROR", "upstream timeout: auth-service failed to respond within 5000ms [req-id: {req_id}]"),
("ERROR", "upstream timeout: auth-service [req-id: {req_id}] β returning 504 to client"),
("WARN", "error rate spike: 34.2% of requests failing (threshold: 5%)"),
],
    # Silent degradation signals (Task 3 – payment-db slow)
"silent_paymentdb": [
("WARN", "query latency elevated: avg=450ms (normal: 80ms) β monitoring"),
("WARN", "query latency elevated: avg=620ms β possible memory pressure"),
("WARN", "query latency elevated: avg=890ms β recommend investigation"),
("WARN", "query latency elevated: avg=1200ms β approaching timeout threshold"),
("WARN", "buffer cache hit ratio degraded: 87% (normal: 98%) β possible memory issue"),
],
}
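# Illustrative only (an assumption about usage; the real per-step schedule lives in
# the env's scenario definitions): signals are (service, level, message) tuples, so
# a step can draw them straight from these templates. Messages that contain a
# {req_id} placeholder are filled in by generate_log_batch below.
#
#   step_signals = [
#       ("api-gateway", level, message)
#       for level, message in SIGNAL_TEMPLATES["cascading_gateway"][:2]
#   ]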
def _make_timestamp(base_time: datetime, offset_seconds: int = 0) -> str:
t = base_time + timedelta(seconds=offset_seconds)
return t.strftime("%Y-%m-%dT%H:%M:%SZ")
def _noise_log(service: str, base_time: datetime, offset: int, rng: random.Random | None = None) -> LogLine:
    templates = NOISE_TEMPLATES.get(service, [("INFO", "routine operation completed")])
    # Use the caller's seeded RNG when provided so batches stay reproducible
    level, message = (rng or random).choice(templates)
return LogLine(
timestamp=_make_timestamp(base_time, offset),
level=level,
service=service,
request_id=None,
message=message,
latency_ms=None,
)
def generate_log_batch(
scenario_signals: list[tuple[str, str, str]], # [(service, level, message), ...]
step: int,
base_time: datetime,
noise_ratio: float = 0.3,
batch_size: int = 8,
    rng: random.Random | None = None,
) -> list[LogLine]:
"""
Generate a mixed batch of signal + noise log lines.
Args:
        scenario_signals: List of (service, level, message) tuples – the actual signals for this step
step: Current step number (used for timestamp offset)
base_time: Episode start time (used for timestamps)
        noise_ratio: Intended fraction of noise logs (note: noise currently fills whatever slots the signals leave in the batch)
batch_size: Total number of log lines to return
rng: Optional seeded Random for reproducibility
Returns:
List of LogLine objects, shuffled (signal mixed into noise)
"""
if rng is None:
rng = random.Random()
logs = []
base_offset = step * 30 # 30 simulated seconds per step
# Add signal logs
    for i, (service, level, message) in enumerate(scenario_signals):
        req_id = f"req-{rng.randint(1000, 9999)}" if level in ("ERROR", "WARN") else None
        # Some templates carry a literal {req_id} placeholder; fill it when a request id was generated
        if req_id is not None and "{req_id}" in message:
            message = message.format(req_id=req_id)
        logs.append(LogLine(
            timestamp=_make_timestamp(base_time, base_offset + i),
            level=level,
            service=service,
            request_id=req_id,
            message=message,
            latency_ms=rng.randint(200, 5000) if "timeout" in message.lower() or "latency" in message.lower() else None,
        ))
# Fill remaining slots with noise logs
noise_count = max(0, batch_size - len(logs))
noise_services = rng.choices(SERVICES, k=noise_count)
for i, svc in enumerate(noise_services):
        logs.append(_noise_log(svc, base_time, base_offset + len(scenario_signals) + i, rng=rng))
    # Shuffle – signal should not always be first
rng.shuffle(logs)
return logs[:batch_size]
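# Usage sketch (assumption: the env's step loop calls this; that loop is not in
# this module). Passing a seeded Random makes noise selection and shuffling
# reproducible:
#
#   rng = random.Random(1234)
#   signals = [("payment-db", level, message)
#              for level, message in SIGNAL_TEMPLATES["silent_paymentdb"][:1]]
#   batch = generate_log_batch(signals, step=2, base_time=datetime(2024, 1, 1), rng=rng)
#   # -> 8 shuffled LogLine objects: 1 WARN signal from payment-db + 7 noise lines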
def generate_healthy_system_state(base_time: datetime) -> dict[str, ServiceStatus]:
"""Generate a fully healthy system state snapshot."""
now = _make_timestamp(base_time)
return {
svc: ServiceStatus(
name=svc,
status="up",
error_rate=round(random.uniform(0.001, 0.01), 4),
latency_p99_ms=random.randint(20, 80),
last_updated=now,
)
for svc in SERVICES
}
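if __name__ == "__main__":
    # Illustrative smoke test (a sketch for manual runs; the env does not execute
    # this block). Assumes LogLine and ServiceStatus expose their fields as
    # attributes, as the constructors above suggest.
    start = datetime(2024, 1, 1, 9, 0, 0)
    demo_rng = random.Random(0)
    demo_signals = [
        ("payment-service", level, message)
        for level, message in SIGNAL_TEMPLATES["single_crash_payment"][:2]
    ]
    for line in generate_log_batch(demo_signals, step=0, base_time=start, rng=demo_rng):
        print(f"{line.timestamp} [{line.level}] {line.service}: {line.message}")
    for name, status in generate_healthy_system_state(start).items():
        print(f"{name}: status={status.status} p99={status.latency_p99_ms}ms")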