"""
Log generator for LogTriageEnv.
Produces realistic-looking log lines for the simulated microservice cluster.
"""
from __future__ import annotations
import random
from datetime import datetime, timedelta
from server.models import LogLine, ServiceStatus

# ─── SERVICES ─────────────────────────────────────────────────────────────────

SERVICES = [
    "api-gateway",
    "auth-service",
    "user-db",
    "payment-service",
    "payment-db",
    "notification-service",
    "email-queue",
]

# ─── LOG TEMPLATES ────────────────────────────────────────────────────────────

# Noise logs - realistic but irrelevant to the incident
NOISE_TEMPLATES = {
    "api-gateway": [
        ("INFO",  "health check passed β€” all upstream services reachable"),
        ("INFO",  "request completed: GET /api/v1/users/profile [200] 45ms"),
        ("INFO",  "rate limiter: 1240/5000 requests this minute"),
        ("DEBUG", "connection pool: 12/100 active connections"),
        ("INFO",  "TLS certificate valid for 87 more days"),
    ],
    "auth-service": [
        ("INFO",  "JWT token issued for user_id=88142 [expires: 3600s]"),
        ("INFO",  "OAuth2 flow completed successfully"),
        ("DEBUG", "session cache hit ratio: 94.2%"),
        ("INFO",  "password reset email queued for user_id=23019"),
    ],
    "user-db": [
        ("INFO",  "daily vacuum completed: 0 dead tuples removed"),
        ("INFO",  "checkpoint complete: wrote 142 buffers"),
        ("DEBUG", "autovacuum: processing table 'sessions'"),
        ("INFO",  "replication lag: 12ms (within threshold)"),
    ],
    "payment-service": [
        ("INFO",  "payment processed: txn_id=TXN-8812 amount=299.00 INR [success]"),
        ("INFO",  "webhook delivered: stripe event=payment.succeeded"),
        ("DEBUG", "idempotency key cache: 2341 keys active"),
    ],
    "payment-db": [
        ("INFO",  "connection pool: 8/50 active"),
        ("DEBUG", "query plan cache: 88% hit ratio"),
        ("INFO",  "index usage: 99.1% queries using indexed scans"),
    ],
    "notification-service": [
        ("INFO",  "email dispatched: template=welcome_email to=user@example.com"),
        ("INFO",  "SMS delivered: +91XXXXXXXXXX [provider=twilio]"),
        ("WARN",  "email bounce rate: 1.2% (threshold: 5%)"),
        ("INFO",  "push notification sent: device_tokens=1240"),
    ],
    "email-queue": [
        ("INFO",  "queue depth: 42 messages pending"),
        ("INFO",  "consumer lag: 0.3s (healthy)"),
        ("DEBUG", "partition rebalance completed in 120ms"),
    ],
}

# Signal logs - actual incident indicators
SIGNAL_TEMPLATES = {
    # Single service crash signals (Task 1 - payment-service crash)
    "single_crash_payment": [
        ("ERROR", "NullPointerException: Cannot invoke method processPayment() on null object - PaymentProcessor.java:142"),
        ("ERROR", "HTTP 500 Internal Server Error: payment gateway returned null response"),
        ("ERROR", "NullPointerException in PaymentService.execute() - retrying (attempt 1/3)"),
        ("ERROR", "NullPointerException in PaymentService.execute() - retrying (attempt 2/3)"),
        ("FATAL", "NullPointerException in PaymentService.execute() - all retries exhausted, request failed"),
        ("ERROR", "health check FAILED: payment-service returned 500 (was 200)"),
        ("ERROR", "circuit breaker OPEN: payment-service error rate 98.2% (threshold: 10%)"),
    ],
    # Cascading failure signals (Task 2 - user-db -> auth-service -> api-gateway)
    "cascading_userdb": [
        ("WARN",  "slow query detected: SELECT * FROM sessions WHERE user_id=? [latency: 2847ms, threshold: 200ms]"),
        ("ERROR", "slow query detected: SELECT * FROM sessions WHERE user_id=? [latency: 4120ms]"),
        ("ERROR", "query timeout: SELECT * FROM active_sessions [timeout after 5000ms]"),
    ],
    "cascading_auth": [
        ("WARN",  "db connection pool: 42/50 active connections (84% utilization)"),
        ("ERROR", "db connection pool exhausted: 50/50 connections in use β€” requests queuing"),
        ("ERROR", "authentication request timed out waiting for db connection [5200ms]"),
    ],
    "cascading_gateway": [
        ("ERROR", "upstream timeout: auth-service failed to respond within 5000ms [req-id: {req_id}]"),
        ("ERROR", "upstream timeout: auth-service [req-id: {req_id}] β€” returning 504 to client"),
        ("WARN",  "error rate spike: 34.2% of requests failing (threshold: 5%)"),
    ],
    # Silent degradation signals (Task 3 - payment-db slow)
    "silent_paymentdb": [
        ("WARN",  "query latency elevated: avg=450ms (normal: 80ms) - monitoring"),
        ("WARN",  "query latency elevated: avg=620ms - possible memory pressure"),
        ("WARN",  "query latency elevated: avg=890ms - recommend investigation"),
        ("WARN",  "query latency elevated: avg=1200ms - approaching timeout threshold"),
        ("WARN",  "buffer cache hit ratio degraded: 87% (normal: 98%) - possible memory issue"),
    ],
}


def _make_timestamp(base_time: datetime, offset_seconds: int = 0) -> str:
    t = base_time + timedelta(seconds=offset_seconds)
    return t.strftime("%Y-%m-%dT%H:%M:%SZ")


def _noise_log(service: str, base_time: datetime, offset: int, rng: random.Random | None = None) -> LogLine:
    """Build a single noise log line; uses the supplied rng (if any) so seeded batches stay reproducible."""
    templates = NOISE_TEMPLATES.get(service, [("INFO", "routine operation completed")])
    level, message = (rng or random).choice(templates)
    return LogLine(
        timestamp=_make_timestamp(base_time, offset),
        level=level,
        service=service,
        request_id=None,
        message=message,
        latency_ms=None,
    )


def generate_log_batch(
    scenario_signals: list[tuple[str, str, str]],  # [(service, level, message), ...]
    step: int,
    base_time: datetime,
    noise_ratio: float = 0.3,
    batch_size: int = 8,
    rng: random.Random | None = None,
) -> list[LogLine]:
    """
    Generate a mixed batch of signal + noise log lines.

    Args:
        scenario_signals: List of (service, level, message) tuples - the actual signals for this step
        step: Current step number (used for timestamp offset)
        base_time: Episode start time (used for timestamps)
        noise_ratio: Fraction of logs that are noise (0.0 = all signal, 1.0 = all noise)
        batch_size: Total number of log lines to return
        rng: Optional seeded Random for reproducibility

    Returns:
        List of LogLine objects, shuffled (signal mixed into noise)
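
    Example (illustrative usage only; field names come from server.models.LogLine):
        >>> rng = random.Random(7)
        >>> batch = generate_log_batch(
        ...     scenario_signals=[("payment-service", "ERROR", "health check FAILED")],
        ...     step=2,
        ...     base_time=datetime(2024, 1, 1, 12, 0, 0),
        ...     rng=rng,
        ... )
        >>> len(batch)  # 1 signal + 7 noise fillers, capped at batch_size
        8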
    """
    if rng is None:
        rng = random.Random()

    logs = []
    base_offset = step * 30  # 30 simulated seconds per step

    # Add signal logs
    for i, (service, level, message) in enumerate(scenario_signals):
        req_id = f"req-{rng.randint(1000, 9999)}" if level in ("ERROR", "WARN") else None
        logs.append(LogLine(
            timestamp=_make_timestamp(base_time, base_offset + i),
            level=level,
            service=service,
            request_id=req_id,
            message=message,
            latency_ms=rng.randint(200, 5000) if "timeout" in message.lower() or "latency" in message.lower() else None,
        ))

    # Fill remaining slots with noise logs
    noise_count = max(0, batch_size - len(logs))
    noise_services = rng.choices(SERVICES, k=noise_count)
    for i, svc in enumerate(noise_services):
        logs.append(_noise_log(svc, base_time, base_offset + len(scenario_signals) + i, rng))

    # Shuffle - signal should not always be first
    rng.shuffle(logs)
    return logs[:batch_size]


def generate_healthy_system_state(base_time: datetime) -> dict[str, ServiceStatus]:
    """Generate a fully healthy system state snapshot."""
    now = _make_timestamp(base_time)
    return {
        svc: ServiceStatus(
            name=svc,
            status="up",
            error_rate=round(random.uniform(0.001, 0.01), 4),
            latency_p99_ms=random.randint(20, 80),
            last_updated=now,
        )
        for svc in SERVICES
    }
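

# ─── QUICK DEMO ───────────────────────────────────────────────────────────────
# Minimal smoke-test sketch (not part of the environment API): prints one mixed
# batch and a healthy-state snapshot so the templates can be eyeballed.
# Assumes LogLine and ServiceStatus expose the fields constructed above
# (timestamp, level, service, message, status, error_rate, latency_p99_ms).
if __name__ == "__main__":
    demo_rng = random.Random(0)
    start = datetime(2024, 1, 1, 12, 0, 0)

    demo_signals = [
        ("payment-service", "ERROR", "health check FAILED: payment-service returned 500 (was 200)"),
        ("payment-service", "FATAL", "NullPointerException in PaymentService.execute() - all retries exhausted, request failed"),
    ]
    for line in generate_log_batch(demo_signals, step=1, base_time=start, rng=demo_rng):
        print(f"{line.timestamp} [{line.level:5}] {line.service}: {line.message}")

    for name, status in generate_healthy_system_state(start).items():
        print(f"{name}: status={status.status} error_rate={status.error_rate} p99={status.latency_p99_ms}ms")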