logtriage-env / server /log_generator.py
OGrohit's picture
Day 2: environment.py, log_generator.py, single_crash scenario, real endpoints
5884d9c
"""
Log generator for LogTriageEnv.
Produces realistic-looking log lines for the simulated microservice cluster.
"""
from __future__ import annotations
import random
from datetime import datetime, timedelta
from server.models import LogLine, ServiceStatus
# ─── SERVICES ─────────────────────────────────────────────────────────────────
SERVICES = [
"api-gateway",
"auth-service",
"user-db",
"payment-service",
"payment-db",
"notification-service",
"email-queue",
]
# ─── LOG TEMPLATES ────────────────────────────────────────────────────────────
# Noise logs β€” realistic but irrelevant to the incident
NOISE_TEMPLATES = {
"api-gateway": [
("INFO", "health check passed β€” all upstream services reachable"),
("INFO", "request completed: GET /api/v1/users/profile [200] 45ms"),
("INFO", "rate limiter: 1240/5000 requests this minute"),
("DEBUG", "connection pool: 12/100 active connections"),
("INFO", "TLS certificate valid for 87 more days"),
],
"auth-service": [
("INFO", "JWT token issued for user_id=88142 [expires: 3600s]"),
("INFO", "OAuth2 flow completed successfully"),
("DEBUG", "session cache hit ratio: 94.2%"),
("INFO", "password reset email queued for user_id=23019"),
],
"user-db": [
("INFO", "daily vacuum completed: 0 dead tuples removed"),
("INFO", "checkpoint complete: wrote 142 buffers"),
("DEBUG", "autovacuum: processing table 'sessions'"),
("INFO", "replication lag: 12ms (within threshold)"),
],
"payment-service": [
("INFO", "payment processed: txn_id=TXN-8812 amount=299.00 INR [success]"),
("INFO", "webhook delivered: stripe event=payment.succeeded"),
("DEBUG", "idempotency key cache: 2341 keys active"),
],
"payment-db": [
("INFO", "connection pool: 8/50 active"),
("DEBUG", "query plan cache: 88% hit ratio"),
("INFO", "index usage: 99.1% queries using indexed scans"),
],
"notification-service": [
("INFO", "email dispatched: template=welcome_email to=user@example.com"),
("INFO", "SMS delivered: +91XXXXXXXXXX [provider=twilio]"),
("WARN", "email bounce rate: 1.2% (threshold: 5%)"),
("INFO", "push notification sent: device_tokens=1240"),
],
"email-queue": [
("INFO", "queue depth: 42 messages pending"),
("INFO", "consumer lag: 0.3s (healthy)"),
("DEBUG", "partition rebalance completed in 120ms"),
],
}
# Signal logs β€” actual incident indicators
SIGNAL_TEMPLATES = {
# Single service crash signals (Task 1 β€” payment-service crash)
"single_crash_payment": [
("ERROR", "NullPointerException: Cannot invoke method processPayment() on null object β€” PaymentProcessor.java:142"),
("ERROR", "HTTP 500 Internal Server Error: payment gateway returned null response"),
("ERROR", "NullPointerException in PaymentService.execute() β€” retrying (attempt 1/3)"),
("ERROR", "NullPointerException in PaymentService.execute() β€” retrying (attempt 2/3)"),
("FATAL", "NullPointerException in PaymentService.execute() β€” all retries exhausted, request failed"),
("ERROR", "health check FAILED: payment-service returned 500 (was 200)"),
("ERROR", "circuit breaker OPEN: payment-service error rate 98.2% (threshold: 10%)"),
],
# Cascading failure signals (Task 2 β€” user-db β†’ auth-service β†’ api-gateway)
"cascading_userdb": [
("WARN", "slow query detected: SELECT * FROM sessions WHERE user_id=? [latency: 2847ms, threshold: 200ms]"),
("ERROR", "slow query detected: SELECT * FROM sessions WHERE user_id=? [latency: 4120ms]"),
("ERROR", "query timeout: SELECT * FROM active_sessions [timeout after 5000ms]"),
],
"cascading_auth": [
("WARN", "db connection pool: 42/50 active connections (84% utilization)"),
("ERROR", "db connection pool exhausted: 50/50 connections in use β€” requests queuing"),
("ERROR", "authentication request timed out waiting for db connection [5200ms]"),
],
"cascading_gateway": [
("ERROR", "upstream timeout: auth-service failed to respond within 5000ms [req-id: {req_id}]"),
("ERROR", "upstream timeout: auth-service [req-id: {req_id}] β€” returning 504 to client"),
("WARN", "error rate spike: 34.2% of requests failing (threshold: 5%)"),
],
# Silent degradation signals (Task 3 β€” payment-db slow)
"silent_paymentdb": [
("WARN", "query latency elevated: avg=450ms (normal: 80ms) β€” monitoring"),
("WARN", "query latency elevated: avg=620ms β€” possible memory pressure"),
("WARN", "query latency elevated: avg=890ms β€” recommend investigation"),
("WARN", "query latency elevated: avg=1200ms β€” approaching timeout threshold"),
("WARN", "buffer cache hit ratio degraded: 87% (normal: 98%) β€” possible memory issue"),
],
}
def _make_timestamp(base_time: datetime, offset_seconds: int = 0) -> str:
t = base_time + timedelta(seconds=offset_seconds)
return t.strftime("%Y-%m-%dT%H:%M:%SZ")
def _noise_log(service: str, base_time: datetime, offset: int) -> LogLine:
templates = NOISE_TEMPLATES.get(service, [("INFO", "routine operation completed")])
level, message = random.choice(templates)
return LogLine(
timestamp=_make_timestamp(base_time, offset),
level=level,
service=service,
request_id=None,
message=message,
latency_ms=None,
)
def generate_log_batch(
scenario_signals: list[tuple[str, str, str]], # [(service, level, message), ...]
step: int,
base_time: datetime,
noise_ratio: float = 0.3,
batch_size: int = 8,
rng: random.Random = None,
) -> list[LogLine]:
"""
Generate a mixed batch of signal + noise log lines.
Args:
scenario_signals: List of (service, level, message) tuples β€” the actual signals for this step
step: Current step number (used for timestamp offset)
base_time: Episode start time (used for timestamps)
noise_ratio: Fraction of logs that are noise (0.0 = all signal, 1.0 = all noise)
batch_size: Total number of log lines to return
rng: Optional seeded Random for reproducibility
Returns:
List of LogLine objects, shuffled (signal mixed into noise)
"""
if rng is None:
rng = random.Random()
logs = []
base_offset = step * 30 # 30 simulated seconds per step
# Add signal logs
for i, (service, level, message) in enumerate(scenario_signals):
req_id = f"req-{rng.randint(1000, 9999)}" if level in ("ERROR", "WARN") else None
logs.append(LogLine(
timestamp=_make_timestamp(base_time, base_offset + i),
level=level,
service=service,
request_id=req_id,
message=message,
latency_ms=rng.randint(200, 5000) if "timeout" in message.lower() or "latency" in message.lower() else None,
))
# Fill remaining slots with noise logs
noise_count = max(0, batch_size - len(logs))
noise_services = rng.choices(SERVICES, k=noise_count)
for i, svc in enumerate(noise_services):
logs.append(_noise_log(svc, base_time, base_offset + len(scenario_signals) + i))
# Shuffle β€” signal should not always be first
rng.shuffle(logs)
return logs[:batch_size]
def generate_healthy_system_state(base_time: datetime) -> dict[str, ServiceStatus]:
"""Generate a fully healthy system state snapshot."""
now = _make_timestamp(base_time)
return {
svc: ServiceStatus(
name=svc,
status="up",
error_rate=round(random.uniform(0.001, 0.01), 4),
latency_p99_ms=random.randint(20, 80),
last_updated=now,
)
for svc in SERVICES
}