ZeroTsai0308 commited on
Commit
1cc2ee5
·
verified ·
1 Parent(s): 350aeeb

Add sre_agent/tools/rca_tools.py

Browse files
Files changed (1) hide show
  1. sre_agent/tools/rca_tools.py +537 -0
sre_agent/tools/rca_tools.py ADDED
@@ -0,0 +1,537 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Root Cause Analysis Tools for SRE Agent
3
+
4
+ Implements multi-signal correlation and dependency-aware RCA.
5
+ Based on:
6
+ - AMER-RCL recursive reasoning pattern (arxiv:2601.02732)
7
+ - RCACopilot statistical pre-filter + LLM interpretation (arxiv:2507.03224)
8
+ - TrioXpert multi-dimensional system status representation (arxiv:2506.10043)
9
+ """
10
+
11
import json
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone

from smolagents import Tool
14
+
15
+
16
class RootCauseCorrelatorTool(Tool):
    """Multi-signal correlation engine for root cause analysis.

    Fuses metric, log, alert, change, and topology evidence into a ranked
    set of root-cause hypotheses, a failure-propagation timeline, and a
    blast-radius estimate, returned as a JSON string.
    """

    name = "rca_correlator"
    description = """Correlates anomalies across metrics, logs, traces, and alerts to determine root cause.

This is the PRIMARY RCA tool — feed it evidence from other tools (anomaly detector, log parser, etc.)
and it will:
1. Temporally align anomalies across signals
2. Score candidate root causes by evidence strength
3. Identify the propagation path (which component failed first, what cascaded)
4. Provide a ranked list of likely root causes with confidence scores

The evidence should be a JSON object with any combination of:
- "metrics": list of metric anomalies (from timeseries_anomaly_detector)
- "logs": list of log findings (from log_parser)
- "alerts": list of active alerts (from alert_summary)
- "changes": list of recent changes (from change_correlator)
- "topology": service dependency info (from service_dependency_analyzer)

Returns a ranked list of root cause hypotheses with evidence and confidence.
"""
    inputs = {
        "evidence_json": {
            "type": "string",
            "description": "JSON object containing evidence from various signals. Keys can include: metrics, logs, alerts, changes, topology. Or 'auto' for a simulated incident.",
        },
        "service_name": {
            "type": "string",
            "description": "Primary service under investigation.",
            "nullable": True,
        },
        "incident_description": {
            "type": "string",
            "description": "Brief description of the incident symptoms, e.g. 'High latency on checkout endpoint for last 30 minutes'.",
            "nullable": True,
        },
    }
    output_type = "string"

    def _generate_sample_evidence(self, service_name: str) -> dict:
        """Generate realistic multi-signal evidence for a simulated incident."""
        return {
            "metrics": [
                {"metric": "cpu_utilization", "service": service_name, "anomaly_type": "spike", "value": 94.5, "threshold": 80, "timestamp": "2024-01-15T10:32:00Z", "severity": "critical"},
                {"metric": "p99_latency_ms", "service": service_name, "anomaly_type": "spike", "value": 2500, "threshold": 500, "timestamp": "2024-01-15T10:33:00Z", "severity": "critical"},
                {"metric": "error_rate", "service": service_name, "anomaly_type": "spike", "value": 12.5, "threshold": 1.0, "timestamp": "2024-01-15T10:33:30Z", "severity": "critical"},
                {"metric": "memory_usage_pct", "service": "database-primary", "anomaly_type": "gradual_increase", "value": 91, "threshold": 85, "timestamp": "2024-01-15T10:25:00Z", "severity": "warning"},
                {"metric": "connection_pool_active", "service": "database-primary", "anomaly_type": "spike", "value": 495, "threshold": 200, "timestamp": "2024-01-15T10:30:00Z", "severity": "critical"},
                {"metric": "gc_pause_ms", "service": service_name, "anomaly_type": "spike", "value": 450, "threshold": 50, "timestamp": "2024-01-15T10:31:00Z", "severity": "warning"},
            ],
            "logs": [
                {"service": "database-primary", "level": "ERROR", "message": "Connection pool exhausted: 500/500 connections active", "timestamp": "2024-01-15T10:30:15Z", "count": 47},
                {"service": service_name, "level": "ERROR", "message": "Connection timeout to database after 30000ms", "timestamp": "2024-01-15T10:31:00Z", "count": 230},
                {"service": service_name, "level": "CRITICAL", "message": "Circuit breaker OPEN for database-primary", "timestamp": "2024-01-15T10:32:00Z", "count": 15},
                {"service": "api-gateway", "level": "ERROR", "message": f"Upstream {service_name} returned 503", "timestamp": "2024-01-15T10:32:30Z", "count": 180},
                {"service": service_name, "level": "WARN", "message": "GC pause exceeded 200ms threshold", "timestamp": "2024-01-15T10:31:30Z", "count": 8},
            ],
            "alerts": [
                {"name": f"{service_name}-high-error-rate", "severity": "critical", "fired_at": "2024-01-15T10:33:00Z", "status": "firing"},
                {"name": "database-primary-connection-saturation", "severity": "critical", "fired_at": "2024-01-15T10:30:00Z", "status": "firing"},
                {"name": f"{service_name}-high-latency", "severity": "warning", "fired_at": "2024-01-15T10:33:00Z", "status": "firing"},
            ],
            "changes": [
                {"type": "deployment", "service": service_name, "version": "v2.4.1 → v2.5.0", "timestamp": "2024-01-15T10:00:00Z", "author": "ci-pipeline"},
                {"type": "config_change", "service": "database-primary", "description": "max_connections reduced from 1000 to 500", "timestamp": "2024-01-15T09:45:00Z", "author": "dba-team"},
            ],
            "topology": {
                "service": service_name,
                "dependencies": [
                    {"name": "database-primary", "type": "database", "protocol": "tcp", "critical": True},
                    {"name": "cache-redis", "type": "cache", "protocol": "tcp", "critical": False},
                    {"name": "auth-service", "type": "service", "protocol": "grpc", "critical": True},
                ],
                "dependents": [
                    {"name": "api-gateway", "type": "gateway", "protocol": "http"},
                    {"name": "web-frontend", "type": "frontend", "protocol": "http"},
                ],
            },
        }

    def forward(
        self,
        evidence_json: str,
        service_name: str = "unknown-service",
        incident_description: str = "",
    ) -> str:
        """Correlate the supplied evidence and return a JSON RCA report.

        Args:
            evidence_json: JSON evidence object, or 'auto' for simulated data.
            service_name: Primary service under investigation.
            incident_description: Free-text symptom summary (echoed in output).

        Returns:
            Pretty-printed JSON string with ranked hypotheses, propagation
            timeline, blast radius, and recommended actions. Malformed
            evidence_json yields a JSON object with an "error" key instead
            of raising.
        """
        # Inputs are declared nullable, so the agent runtime may pass None.
        service_name = service_name or "unknown-service"
        incident_description = incident_description or ""

        if evidence_json.strip().lower() == "auto":
            evidence = self._generate_sample_evidence(service_name)
            print(f"[RCACorrelator] Using simulated incident evidence for '{service_name}'")
        else:
            try:
                evidence = json.loads(evidence_json)
            except json.JSONDecodeError as exc:
                # Robustness fix: surface a structured error to the agent
                # instead of crashing the tool call on malformed input.
                return json.dumps({"error": f"evidence_json is not valid JSON: {exc}"})

        print(f"[RCACorrelator] Analyzing evidence for service '{service_name}': {incident_description}")

        # ── 1. Temporal alignment ──
        # Normalize every signal into a common event record. All field access
        # uses .get() so partially-populated caller evidence cannot raise
        # KeyError (the original indexed l["message"], a["name"], etc.).
        all_events = []
        for m in evidence.get("metrics", []):
            all_events.append({
                "timestamp": m.get("timestamp", ""),
                "type": "metric",
                "source": m.get("service", ""),
                "detail": f"{m.get('metric', 'unknown')}={m.get('value', 'N/A')}",
                "severity": m.get("severity", "info"),
            })
        for l in evidence.get("logs", []):
            all_events.append({
                "timestamp": l.get("timestamp", ""),
                "type": "log",
                "source": l.get("service", ""),
                "detail": str(l.get("message", "")),
                "severity": l.get("level", "INFO").lower(),
            })
        for a in evidence.get("alerts", []):
            all_events.append({
                "timestamp": a.get("fired_at", ""),
                "type": "alert",
                "source": a.get("name", ""),
                "detail": a.get("name", ""),
                "severity": a.get("severity", "info"),
            })
        for c in evidence.get("changes", []):
            all_events.append({
                "timestamp": c.get("timestamp", ""),
                "type": "change",
                "source": c.get("service", ""),
                "detail": c.get("description", c.get("version", "")),
                "severity": "info",
            })

        # ISO-8601 timestamps sort chronologically as plain strings.
        all_events.sort(key=lambda x: x.get("timestamp", ""))

        # ── 2. Root cause hypothesis generation ──
        hypotheses = []

        # Hypothesis: a recent deployment or config change caused the issue.
        changes = evidence.get("changes", [])
        deployments = [c for c in changes if c.get("type") == "deployment"]
        config_changes = [c for c in changes if c.get("type") == "config_change"]

        for deploy in deployments:
            hypotheses.append({
                "hypothesis": f"Recent deployment of {deploy.get('service', 'unknown')} ({deploy.get('version', 'unknown')}) introduced a regression",
                "category": "deployment",
                "evidence_for": [
                    f"Deployment at {deploy.get('timestamp', 'unknown')} preceded incident",
                    f"Service {deploy.get('service', 'unknown')} showing anomalies",
                ],
                "evidence_against": [],
                "confidence": 0.0,
                "remediation": f"Rollback {deploy.get('service', 'unknown')} to previous version",
            })

        for change in config_changes:
            hypotheses.append({
                "hypothesis": f"Configuration change on {change.get('service', 'unknown')}: {change.get('description', '')}",
                "category": "config_change",
                "evidence_for": [
                    f"Config changed at {change.get('timestamp', 'unknown')} preceded incident",
                    f"{change.get('description', 'Unknown change')}",
                ],
                "evidence_against": [],
                "confidence": 0.0,
                "remediation": f"Revert configuration change on {change.get('service', 'unknown')}",
            })

        # Hypothesis: resource exhaustion.
        metrics = evidence.get("metrics", [])
        resource_metrics = [m for m in metrics if m.get("metric") in ("cpu_utilization", "memory_usage_pct", "connection_pool_active", "disk_io_mbps")]
        if resource_metrics:
            # The earliest resource anomaly is the most likely origin point.
            resource_metrics.sort(key=lambda x: x.get("timestamp", ""))
            first_resource = resource_metrics[0]
            hypotheses.append({
                "hypothesis": f"Resource exhaustion on {first_resource.get('service', 'unknown')}: {first_resource.get('metric')} at {first_resource.get('value', 'N/A')}",
                "category": "resource_exhaustion",
                "evidence_for": [
                    f"{m.get('metric')} on {m.get('service', 'unknown')} at {m.get('value', 'N/A')} (threshold: {m.get('threshold', 'N/A')})"
                    for m in resource_metrics
                ],
                "evidence_against": [],
                "confidence": 0.0,
                "remediation": f"Scale up {first_resource.get('service', 'unknown')} or optimize resource usage",
            })

        # Hypothesis: a dependency failed and cascaded into this service.
        topology = evidence.get("topology", {})
        deps = topology.get("dependencies", [])
        for dep in deps:
            dep_name = dep.get("name", "")
            if not dep_name:
                # An empty name would substring-match every log; skip it.
                continue
            dep_logs = [l for l in evidence.get("logs", []) if dep_name in l.get("service", "") or dep_name in l.get("message", "")]
            dep_metrics = [m for m in metrics if dep_name in m.get("service", "")]
            if dep_logs or dep_metrics:
                hypotheses.append({
                    "hypothesis": f"Dependency failure: {dep_name} ({dep.get('type', 'unknown')}) is degraded, causing cascading failure",
                    "category": "dependency_failure",
                    "evidence_for": [
                        *[f"Log: {str(l.get('message', ''))[:100]}" for l in dep_logs[:3]],
                        *[f"Metric: {m.get('metric')}={m.get('value', 'N/A')} on {dep_name}" for m in dep_metrics[:3]],
                    ],
                    "evidence_against": [],
                    "confidence": 0.0,
                    "remediation": f"Investigate {dep_name}, check circuit breakers, consider failover",
                    "critical_dependency": dep.get("critical", False),
                })

        # ── 3. Score hypotheses ──
        for h in hypotheses:
            score = 0.0
            n_evidence = len(h["evidence_for"])

            # Base score from evidence count, capped so evidence alone cannot
            # dominate the categorical boosts below.
            score += min(n_evidence * 0.15, 0.6)

            # Boost for temporal precedence (changes happened before anomalies).
            if h["category"] in ("deployment", "config_change"):
                score += 0.2

            # Boost for critical dependencies.
            if h.get("critical_dependency"):
                score += 0.15

            # Boost for resource exhaustion backed by multiple signals.
            if h["category"] == "resource_exhaustion" and n_evidence >= 3:
                score += 0.2

            h["confidence"] = round(min(score, 0.95), 2)

        # Most likely cause first.
        hypotheses.sort(key=lambda x: x["confidence"], reverse=True)

        # ── 4. Determine propagation path ──
        # Keep only high-severity events, in chronological order.
        propagation_path = []
        for event in all_events:
            if event["severity"] in ("critical", "error"):
                propagation_path.append({
                    "timestamp": event["timestamp"],
                    "component": event["source"],
                    "signal_type": event["type"],
                    "detail": event["detail"][:100],
                })

        result = {
            "incident": {
                "service": service_name,
                "description": incident_description,
                "signals_analyzed": {
                    "metrics": len(evidence.get("metrics", [])),
                    "logs": len(evidence.get("logs", [])),
                    "alerts": len(evidence.get("alerts", [])),
                    "changes": len(evidence.get("changes", [])),
                },
            },
            "root_cause_hypotheses": hypotheses,
            "primary_hypothesis": hypotheses[0] if hypotheses else None,
            "propagation_timeline": propagation_path[:20],
            "blast_radius": {
                "directly_affected": [service_name],
                "indirectly_affected": [d.get("name", "") for d in topology.get("dependents", [])],
                "total_services_impacted": 1 + len(topology.get("dependents", [])),
            },
            "recommended_actions": [
                {"priority": i + 1, "action": h["remediation"], "confidence": h["confidence"]}
                for i, h in enumerate(hypotheses[:5])
            ],
        }

        # Explicit branch instead of a conditional expression buried in the
        # print() argument list.
        if hypotheses:
            print(f"[RCACorrelator] Generated {len(hypotheses)} hypotheses. Primary: {hypotheses[0]['hypothesis'][:80]}...")
        else:
            print("[RCACorrelator] No hypotheses generated")
        return json.dumps(result, indent=2)
260
+
261
+
262
class ServiceDependencyAnalyzerTool(Tool):
    """Analyze service dependency topology for impact assessment."""

    name = "service_dependency_analyzer"
    description = """Analyzes service dependency topology to understand blast radius and failure propagation.

Given a service name, returns:
- Upstream and downstream dependencies
- Critical path analysis (which dependencies are on the critical path)
- Single points of failure
- Recommended investigation order for incidents

Use this EARLY in incident investigation to understand which services to check.
"""
    inputs = {
        "service_name": {
            "type": "string",
            "description": "Service to analyze, e.g. 'payment-service'.",
        },
        "topology_json": {
            "type": "string",
            "description": "Optional: JSON service topology. If 'auto', uses a simulated microservice topology.",
            "nullable": True,
        },
    }
    output_type = "string"

    # Simulated microservice topology used when topology_json is 'auto'.
    # forward() only reads it, never mutates it.
    _SIMULATED_TOPOLOGY = {
        "api-gateway": {"deps": ["auth-service", "user-service", "order-service", "payment-service"], "type": "gateway"},
        "web-frontend": {"deps": ["api-gateway"], "type": "frontend"},
        "mobile-bff": {"deps": ["api-gateway"], "type": "frontend"},
        "auth-service": {"deps": ["user-db", "cache-redis", "jwt-signer"], "type": "service"},
        "user-service": {"deps": ["user-db", "cache-redis"], "type": "service"},
        "order-service": {"deps": ["order-db", "payment-service", "inventory-service", "notification-service"], "type": "service"},
        "payment-service": {"deps": ["payment-db", "payment-gateway-ext", "fraud-detection"], "type": "service"},
        "inventory-service": {"deps": ["inventory-db", "cache-redis"], "type": "service"},
        "notification-service": {"deps": ["email-provider-ext", "sms-provider-ext", "kafka"], "type": "service"},
        "fraud-detection": {"deps": ["ml-model-service", "fraud-db"], "type": "service"},
        "ml-model-service": {"deps": ["model-store-s3"], "type": "service"},
        "user-db": {"deps": [], "type": "database"},
        "order-db": {"deps": [], "type": "database"},
        "payment-db": {"deps": [], "type": "database"},
        "inventory-db": {"deps": [], "type": "database"},
        "fraud-db": {"deps": [], "type": "database"},
        "cache-redis": {"deps": [], "type": "cache"},
        "kafka": {"deps": [], "type": "queue"},
        "jwt-signer": {"deps": [], "type": "infrastructure"},
        "payment-gateway-ext": {"deps": [], "type": "external"},
        "email-provider-ext": {"deps": [], "type": "external"},
        "sms-provider-ext": {"deps": [], "type": "external"},
        "model-store-s3": {"deps": [], "type": "storage"},
    }

    def forward(self, service_name: str, topology_json: str = "auto") -> str:
        """Return a JSON dependency/impact report for service_name.

        Args:
            service_name: Service to analyze.
            topology_json: JSON topology map, or 'auto' (or None) for the
                built-in simulated topology.

        Returns:
            Pretty-printed JSON string with direct/transitive dependencies,
            dependents, blast radius, SPOFs, critical dependencies, and an
            investigation order. Malformed topology_json yields a JSON
            object with an "error" key instead of raising.
        """
        # topology_json is nullable; treat None the same as the default.
        if not topology_json or topology_json.strip().lower() == "auto":
            topology = self._SIMULATED_TOPOLOGY
        else:
            try:
                topology = json.loads(topology_json)
            except json.JSONDecodeError as exc:
                return json.dumps({"error": f"topology_json is not valid JSON: {exc}"})

        print(f"[DependencyAnalyzer] Analyzing topology for '{service_name}' ({len(topology)} services)")

        # Direct dependencies; unknown services get an empty record.
        service_info = topology.get(service_name, {"deps": [], "type": "unknown"})
        direct_deps = service_info.get("deps", [])

        # Transitive dependencies via BFS. deque.popleft() is O(1) whereas
        # the original list.pop(0) is O(n) per pop; the visited set doubles
        # as the result set (they were always identical).
        transitive_deps = set()
        frontier = deque(direct_deps)
        while frontier:
            current = frontier.popleft()
            if current in transitive_deps:
                continue
            transitive_deps.add(current)
            for dep in topology.get(current, {}).get("deps", []):
                if dep not in transitive_deps:
                    frontier.append(dep)

        # Build the reverse-edge map once (O(V+E)) instead of rescanning the
        # whole topology on every reverse-BFS step (O(V*E) in the original).
        reverse_edges = defaultdict(list)
        for svc, info in topology.items():
            for dep in info.get("deps", []):
                reverse_edges[dep].append(svc)

        # Direct dependents preserve topology iteration order, matching the
        # original full-scan behavior.
        dependents = list(reverse_edges.get(service_name, []))

        # Transitive dependents via reverse BFS.
        transitive_dependents = set()
        frontier = deque(dependents)
        while frontier:
            current = frontier.popleft()
            if current in transitive_dependents:
                continue
            transitive_dependents.add(current)
            for svc in reverse_edges.get(current, []):
                if svc not in transitive_dependents:
                    frontier.append(svc)

        # Single points of failure: database/external direct dependencies.
        spofs = []
        for dep in direct_deps:
            dep_info = topology.get(dep, {})
            if dep_info.get("type") in ("database", "external"):
                spofs.append({"service": dep, "type": dep_info.get("type"), "reason": f"Single {dep_info.get('type')} dependency with no failover"})

        # Critical path: stateful/infrastructure direct dependencies.
        critical_deps = [dep for dep in direct_deps if topology.get(dep, {}).get("type") in ("database", "infrastructure")]

        result = {
            "service": service_name,
            "service_type": service_info.get("type", "unknown"),
            "direct_dependencies": [
                {"name": d, "type": topology.get(d, {}).get("type", "unknown")}
                for d in direct_deps
            ],
            "transitive_dependencies": {
                "count": len(transitive_deps),
                "services": sorted(transitive_deps),
            },
            "direct_dependents": [
                {"name": d, "type": topology.get(d, {}).get("type", "unknown")}
                for d in dependents
            ],
            "blast_radius": {
                "direct_impact": len(dependents),
                "transitive_impact": len(transitive_dependents),
                "affected_services": sorted(transitive_dependents),
            },
            "single_points_of_failure": spofs,
            "critical_dependencies": critical_deps,
            "investigation_order": [
                {"step": 1, "action": f"Check {service_name} health, metrics, and recent deployments"},
                *[{"step": i + 2, "action": f"Check dependency: {dep} ({topology.get(dep, {}).get('type', 'unknown')})"} for i, dep in enumerate(direct_deps)],
                {"step": len(direct_deps) + 2, "action": f"Check impact on dependents: {', '.join(dependents[:5])}"},
            ],
        }

        print(f"[DependencyAnalyzer] {service_name} has {len(direct_deps)} deps, {len(dependents)} dependents, blast radius: {len(transitive_dependents)} services")
        return json.dumps(result, indent=2)
401
+
402
+
403
class ChangeCorrelationTool(Tool):
    """Correlate recent changes with incident timing."""

    name = "change_correlator"
    description = """Correlates recent infrastructure/code changes with incident timing to identify change-induced failures.

Checks:
- Recent deployments (code releases, container updates)
- Configuration changes (env vars, feature flags, resource limits)
- Infrastructure changes (scaling events, DNS changes, cert rotations)
- Dependency updates (library versions, API version changes)

Ranks changes by temporal proximity to incident and likelihood of causing the observed symptoms.
"""
    inputs = {
        "incident_time": {
            "type": "string",
            "description": "When the incident started (ISO8601), e.g. '2024-01-15T10:30:00Z'.",
        },
        "service_name": {
            "type": "string",
            "description": "Service experiencing the incident.",
            "nullable": True,
        },
        "lookback_hours": {
            "type": "integer",
            "description": "How many hours before incident to check for changes. Default: 24.",
            "nullable": True,
        },
        "changes_json": {
            "type": "string",
            "description": "Optional: JSON array of recent changes. Or 'auto' for simulated change log.",
            "nullable": True,
        },
    }
    output_type = "string"

    # Baseline risk contributed by each change category. Hoisted to a class
    # constant: it was rebuilt inside the scoring loop on every iteration.
    _TYPE_RISK = {
        "deployment": 0.3,
        "config_change": 0.25,
        "dependency_update": 0.25,
        "scaling": 0.1,
        "cert_rotation": 0.05,
        "feature_flag": 0.2,
    }

    # Keywords in a change description that suggest breaking behavior.
    _RISK_KEYWORDS = ("breaking", "major", "migration", "reduced", "limit", "pooling", "connection", "auth", "security")

    @staticmethod
    def _parse_ts(value):
        """Parse an ISO-8601 timestamp ('Z' suffix allowed) into an aware UTC
        datetime; return None when the value is missing or unparseable."""
        try:
            dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            return None
        # Normalize naive timestamps to UTC so aware/naive subtraction can
        # never raise TypeError.
        return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)

    def forward(
        self,
        incident_time: str,
        service_name: str = "unknown-service",
        lookback_hours: int = 24,
        changes_json: str = "auto",
    ) -> str:
        """Score recent changes by likelihood of having caused the incident.

        Args:
            incident_time: Incident start time (ISO8601).
            service_name: Service experiencing the incident.
            lookback_hours: Window before the incident to consider.
            changes_json: JSON array of changes, or 'auto' for simulated data.

        Returns:
            Pretty-printed JSON string with changes ranked by risk score and
            an investigation recommendation. Malformed changes_json yields a
            JSON object with an "error" key instead of raising.
        """
        # Nullable inputs may arrive as None from the agent runtime.
        service_name = service_name or "unknown-service"
        lookback_hours = 24 if lookback_hours is None else lookback_hours
        changes_json = changes_json or "auto"

        if changes_json.strip().lower() == "auto":
            changes = [
                {"type": "deployment", "service": service_name, "version": "v2.4.1 → v2.5.0", "timestamp": "2024-01-15T10:00:00Z", "author": "ci-pipeline", "description": "Added new payment processing endpoint with connection pooling changes"},
                {"type": "config_change", "service": "database-primary", "timestamp": "2024-01-15T09:45:00Z", "author": "dba-team", "description": "Reduced max_connections from 1000 to 500 for memory optimization"},
                {"type": "scaling", "service": service_name, "timestamp": "2024-01-15T08:00:00Z", "author": "hpa", "description": "Auto-scaled from 3 to 5 replicas due to morning traffic ramp"},
                {"type": "deployment", "service": "auth-service", "version": "v1.8.0 → v1.8.1", "timestamp": "2024-01-15T07:00:00Z", "author": "ci-pipeline", "description": "Security patch for JWT validation"},
                {"type": "config_change", "service": "api-gateway", "timestamp": "2024-01-14T22:00:00Z", "author": "platform-team", "description": "Updated rate limit from 1000 to 2000 req/s"},
                {"type": "cert_rotation", "service": "tls-ingress", "timestamp": "2024-01-14T20:00:00Z", "author": "cert-manager", "description": "Automated TLS certificate rotation"},
                {"type": "dependency_update", "service": service_name, "timestamp": "2024-01-14T16:00:00Z", "author": "dependabot", "description": "Updated database driver from 4.2.1 to 5.0.0 (breaking API changes)"},
            ]
        else:
            try:
                changes = json.loads(changes_json)
            except json.JSONDecodeError as exc:
                return json.dumps({"error": f"changes_json is not valid JSON: {exc}"})

        incident_dt = self._parse_ts(incident_time)
        if incident_dt is None:
            # BUG FIX: the original fallback used naive datetime.utcnow(),
            # which raises TypeError when subtracted against the tz-aware
            # change timestamps below (and utcnow() is deprecated in 3.12+).
            incident_dt = datetime.now(timezone.utc)

        print(f"[ChangeCorrelator] Correlating changes within {lookback_hours}h before incident at {incident_time}")

        # Score each change inside the lookback window.
        scored_changes = []
        for change in changes:
            change_dt = self._parse_ts(change.get("timestamp"))
            if change_dt is None:
                # Skip entries with missing/unparseable timestamps.
                continue

            time_diff = (incident_dt - change_dt).total_seconds() / 3600  # hours

            # Only changes that happened before the incident and within the
            # lookback window are candidates.
            if time_diff < 0 or time_diff > lookback_hours:
                continue

            score = 0.0

            # Temporal proximity (closer = higher score).
            if time_diff < 1:
                score += 0.4
            elif time_diff < 4:
                score += 0.3
            elif time_diff < 12:
                score += 0.15
            else:
                score += 0.05

            # Inherent risk of the change category.
            score += self._TYPE_RISK.get(change.get("type", ""), 0.1)

            # Changes to the affected service itself are more suspicious.
            if change.get("service", "") == service_name:
                score += 0.2

            # Breaking-change keyword detection in the description.
            desc = change.get("description", "").lower()
            keyword_hits = sum(1 for kw in self._RISK_KEYWORDS if kw in desc)
            score += min(keyword_hits * 0.05, 0.15)

            scored_changes.append({
                **change,
                "hours_before_incident": round(time_diff, 2),
                "risk_score": round(min(score, 0.95), 2),
                "risk_level": "high" if score > 0.6 else "medium" if score > 0.3 else "low",
            })

        # Highest risk first.
        scored_changes.sort(key=lambda x: x["risk_score"], reverse=True)

        result = {
            "incident_time": incident_time,
            "service": service_name,
            "lookback_hours": lookback_hours,
            "changes_found": len(scored_changes),
            "changes_ranked_by_risk": scored_changes,
            "highest_risk_change": scored_changes[0] if scored_changes else None,
            "recommendation": (
                f"INVESTIGATE: {scored_changes[0]['type']} on {scored_changes[0].get('service', 'unknown')} at {scored_changes[0]['timestamp']} (risk score: {scored_changes[0]['risk_score']})"
                if scored_changes and scored_changes[0]["risk_score"] > 0.5
                else "No high-risk changes found in the lookback window."
            ),
        }

        print(f"[ChangeCorrelator] Found {len(scored_changes)} changes, highest risk: {scored_changes[0]['risk_score'] if scored_changes else 'N/A'}")
        return json.dumps(result, indent=2)