# chaosops/agents/llm_adapter.py
"""Prompt & parser layer between LLM outputs and ``ChaosOpsAction``.
This module is the ONLY place where observation-to-string and
string-to-action conversion happens. Keeping it isolated means:
* The same adapter works for Unsloth + TRL GRPO training and for inference
via the OpenEnv HTTP client.
* Prompt iteration (add/remove fields, tweak phrasing) never touches the
simulator, reward function, or environment code.
* Tests can exercise the parser against fixed action strings without ever
loading a model.
Public API, in layers:
* Rendering: ``render_observation``, ``build_prompt``
* Parsing: ``parse_action`` (single string), ``StreamingActionParser`` (token stream)
* Structured-output schemas: ``ACTION_JSON_SCHEMA``, ``openai_tool_spec``,
``anthropic_tool_spec``
* Provider builders: ``build_openai_messages``, ``build_anthropic_messages``
* Robustness: ``generate_action_with_retry`` for retry + fallback parsing
"""
from __future__ import annotations
import json
import logging
import re
from collections.abc import Iterable, Mapping
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable
from chaosops.env.models import (
ActionType,
AgentRole,
ChaosOpsAction,
ChaosOpsObservation,
FailureType,
RoleView,
)
_LOG = logging.getLogger(__name__)
# Role-specific system prompts live alongside this module as markdown files.
PROMPT_DIR = Path(__file__).parent / "prompts"
# One prompt file per agent role; a missing file degrades to an empty prompt
# (see ``load_role_prompt``) rather than raising.
ROLE_PROMPT_FILES: dict[AgentRole, str] = {
    AgentRole.SRE: "sre.md",
    AgentRole.DEV: "dev.md",
    AgentRole.MANAGER: "manager.md",
    AgentRole.OVERSIGHT: "oversight.md",
}
# ---------------------------------------------------------------------------
# Prompt construction
# ---------------------------------------------------------------------------
def load_role_prompt(role: AgentRole) -> str:
    """Load the role-specific system prompt from ``prompts/<role>.md``.

    Returns an empty string when the prompt file is missing so callers can
    fall back to a bare observation prompt instead of crashing.
    """
    path = PROMPT_DIR / ROLE_PROMPT_FILES[role]
    if not path.exists():
        return ""
    # Explicit UTF-8: prompt files may contain non-ASCII characters, and the
    # platform default encoding (e.g. cp1252 on Windows) would garble them.
    return path.read_text(encoding="utf-8").strip()
def render_observation(view: RoleView, *, step: int) -> str:
"""Render a ``RoleView`` as a compact text prompt body.
Compact because each role turn eats into the 8B model's context budget
five times per episode; we want the entire trajectory to fit in <1,200
tokens. Numbers are rounded and logs are bulleted.
"""
lines: list[str] = [f"STEP {step}", f"ROLE {view.role.value.upper()}"]
if view.visible_metrics:
lines.append("METRICS")
for name, m in view.visible_metrics.items():
lines.append(
f" {name}: cpu={m.cpu_pct:.0f}% mem={m.memory_mb:.0f}MB "
f"lat={m.latency_ms:.0f}ms err={m.error_rate:.0%} "
f"repl={m.replicas} health={m.health.value}"
)
if view.visible_alerts:
lines.append("ALERTS")
for a in view.visible_alerts[-4:]:
lines.append(f" [{a.severity}] {a.service.value}: {a.message}")
if view.visible_logs:
lines.append("LOGS")
for log in view.visible_logs[-4:]:
lines.append(f" ({log.level}) {log.service.value}: {log.message}")
if view.visible_fleet_actions:
lines.append("FLEET_ACTIONS")
for fa in view.visible_fleet_actions[-6:]:
lines.append(
f" step {fa.step}: {fa.agent_name} {fa.action} -> {fa.target}"
)
if view.shared_chat:
lines.append("CHAT")
for msg in view.shared_chat[-6:]:
lines.append(f" {msg}")
if view.private_inbox:
lines.append("PRIVATE_INBOX")
for msg in view.private_inbox[-4:]:
lines.append(f" {msg}")
if view.private_note:
lines.append(f"NOTE {view.private_note}")
lines.append(
"RESPOND with JSON only: "
'{"action_type": "<type>", "target": "<service|agent>", "args": {...}}'
)
lines.append(
"For private messages use args.to=\"sre|dev|manager|oversight\"; "
"default communicate broadcasts to all roles."
)
return "\n".join(lines)
def build_prompt(
    obs: ChaosOpsObservation, *, system_prompt: str | None = None
) -> str:
    """Compose the full prompt: role system prompt (if any) + observation body.

    ``system_prompt`` overrides the on-disk role prompt when provided.
    """
    system = system_prompt if system_prompt else load_role_prompt(obs.turn_role)
    body = render_observation(obs.view, step=obs.step)
    if not system:
        return body
    return f"{system}\n\n{body}"
# ---------------------------------------------------------------------------
# Parser
# ---------------------------------------------------------------------------
# Last-resort regex for a flat (non-nested) JSON object; see
# ``_iter_json_candidates`` for the full candidate-extraction order.
_JSON_BLOCK = re.compile(r"\{[^{}]*\}")
# Maps the wire string (e.g. "scale") back to its ``ActionType`` member.
_VALID_ACTION_STRINGS: dict[str, ActionType] = {a.value: a for a in ActionType}
def parse_action(
    raw: str, *, role: AgentRole, fallback: ActionType = ActionType.NOOP
) -> ChaosOpsAction:
    """Extract a ``ChaosOpsAction`` from an LLM output string.

    Tolerates leading/trailing chatter around a JSON block, unknown action
    types (falls back to ``NOOP``), missing ``target``/``args``, and
    replica counts encoded as strings. It will not silently turn a
    malformed output into a confident action: when nothing parses it
    returns a NOOP, and the reward function logs a miscommunication
    penalty upstream.
    """
    payload = _extract_json(raw)
    if payload is None:
        return ChaosOpsAction(role=role, action_type=fallback)

    requested = str(payload.get("action_type", "")).strip().lower()
    action_type = _VALID_ACTION_STRINGS.get(requested, fallback)

    target = payload.get("target")
    target = None if target is None else str(target)

    raw_args = payload.get("args")
    if not isinstance(raw_args, Mapping):
        raw_args = {}
    args: dict[str, object] = {}
    for key, value in raw_args.items():
        if key == "replicas":
            try:
                args["replicas"] = int(value)
            except (TypeError, ValueError):
                pass  # drop unparseable replica counts
        elif key == "failure_type":
            try:
                args["failure_type"] = FailureType(str(value)).value
            except ValueError:
                pass  # drop unknown failure types
        else:
            args[str(key)] = value
    return ChaosOpsAction(
        role=role, action_type=action_type, target=target, args=args
    )
def _extract_json(raw: str) -> dict[str, object] | None:
    """Return the first JSON *object* found in ``raw``, or None.

    Candidates are tried in the order ``_iter_json_candidates`` yields
    them; non-object JSON (arrays, scalars) is skipped.
    """
    for candidate in _iter_json_candidates(raw.strip()):
        try:
            decoded = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(decoded, dict):
            return decoded
    return None
def _iter_json_candidates(raw: str):
    """Yield substrings of ``raw`` that may be the action JSON, best first."""
    # 1. Fast path — the entire output is one JSON object.
    if raw.startswith("{") and raw.endswith("}"):
        yield raw
    # 2. Markdown code fences (``` or ```json).
    for fence in re.finditer(r"```(?:json)?\s*(\{[\s\S]*?\})\s*```", raw):
        yield fence.group(1)
    # 3. Brace-matched candidates (handles nested ``args`` objects).
    yield from _iter_balanced_braces(raw)
    # 4. Bare non-nested braces — kept for backward compat with older test
    #    fixtures that relied on this path.
    for flat in _JSON_BLOCK.finditer(raw):
        yield flat.group(0)
def _iter_balanced_braces(raw: str):
    """Yield every top-level balanced ``{...}`` block in ``raw``.

    String-aware: braces inside JSON string literals (including after
    escaped quotes like ``\\"``) do not affect the depth count.
    """
    depth = 0
    block_start = -1
    inside_string = False
    skip_next = False
    for idx, ch in enumerate(raw):
        if skip_next:
            # Previous char was a backslash inside a string; ignore this one.
            skip_next = False
            continue
        if inside_string:
            if ch == "\\":
                skip_next = True
            elif ch == '"':
                inside_string = False
            continue
        if ch == '"':
            inside_string = True
        elif ch == "{":
            if depth == 0:
                block_start = idx
            depth += 1
        elif ch == "}" and depth > 0:
            depth -= 1
            if depth == 0 and block_start >= 0:
                yield raw[block_start : idx + 1]
                block_start = -1
def action_to_training_target(action: ChaosOpsAction) -> str:
"""Canonical serialization of an action — used as the supervised target
when running SFT warm-up before GRPO, and as the greedy decoding target
for the oracle-trajectory teacher."""
return json.dumps(
{
"action_type": action.action_type.value,
"target": action.target,
"args": action.args,
},
sort_keys=True,
)
# ---------------------------------------------------------------------------
# Function-calling / structured-output schema
# ---------------------------------------------------------------------------
#: JSON Schema describing a ``ChaosOpsAction`` payload. Shared between OpenAI
#: function-calling, Anthropic tool use, and local validators so the action
#: vocabulary has a single source of truth.
ACTION_JSON_SCHEMA: dict[str, Any] = {
    "type": "object",
    # Reject unknown top-level keys so providers can't smuggle extra fields.
    "additionalProperties": False,
    # Only ``action_type`` is mandatory; NOOP / COMMUNICATE need no target.
    "required": ["action_type"],
    "properties": {
        "action_type": {
            "type": "string",
            # Derived from ActionType so schema and parser never drift apart.
            "enum": [a.value for a in ActionType],
            "description": "Which action to take this turn.",
        },
        "target": {
            "type": ["string", "null"],
            "description": (
                "Target service or fleet-agent name (e.g., 'db', "
                "'autoscaler'). Omit or null for NOOP / COMMUNICATE."
            ),
        },
        "args": {
            "type": "object",
            "description": (
                "Action-specific parameters. "
                "communicate: {to?: role, message: str}. "
                "scale: {replicas: int}. "
                "identify_root_cause: {failure_type: FailureType}."
            ),
            # Extra arg keys are allowed; ``parse_action`` passes them through.
            "additionalProperties": True,
            "properties": {
                "to": {
                    "type": "string",
                    "enum": [r.value for r in AgentRole],
                    "description": "Private recipient role for communicate.",
                },
                "message": {"type": "string"},
                "replicas": {"type": "integer", "minimum": 0},
                "failure_type": {
                    "type": "string",
                    "enum": [f.value for f in FailureType],
                },
            },
        },
    },
}
def openai_tool_spec(name: str = "chaosops_action") -> dict[str, Any]:
    """Return an OpenAI ``tools[]`` entry for structured action output."""
    description = (
        "Emit exactly one ChaosOps incident-response action for the "
        "current turn."
    )
    function = {
        "name": name,
        "description": description,
        "parameters": ACTION_JSON_SCHEMA,
    }
    return {"type": "function", "function": function}
def anthropic_tool_spec(name: str = "chaosops_action") -> dict[str, Any]:
    """Return an Anthropic ``tools[]`` entry for structured action output."""
    description = (
        "Emit exactly one ChaosOps incident-response action for the "
        "current turn."
    )
    return {
        "name": name,
        "description": description,
        "input_schema": ACTION_JSON_SCHEMA,
    }
# ---------------------------------------------------------------------------
# Provider-specific prompt builders
# ---------------------------------------------------------------------------
def build_openai_messages(
    obs: ChaosOpsObservation, *, system_prompt: str | None = None
) -> list[dict[str, str]]:
    """Build an OpenAI chat-completions ``messages`` array.

    The system message carries the role-specific prompt; the user message
    carries the rendered observation. Pair with :func:`openai_tool_spec` and
    ``tool_choice={"type": "function", "function": {"name": ...}}`` to force
    structured output.
    """
    system = system_prompt or load_role_prompt(obs.turn_role)
    user_message = {
        "role": "user",
        "content": render_observation(obs.view, step=obs.step),
    }
    if system:
        return [{"role": "system", "content": system}, user_message]
    return [user_message]
def build_anthropic_messages(
    obs: ChaosOpsObservation, *, system_prompt: str | None = None
) -> dict[str, Any]:
    """Build an Anthropic ``messages.create`` payload fragment.

    Anthropic separates ``system`` from ``messages``; returning both in one
    dict keeps call-sites ergonomic: ``client.messages.create(**payload, ...)``.
    """
    system = system_prompt or load_role_prompt(obs.turn_role)
    body = render_observation(obs.view, step=obs.step)
    payload: dict[str, Any] = {
        "system": system if system else "",
        "messages": [{"role": "user", "content": body}],
    }
    return payload
def parse_openai_response(
    response: Any, *, role: AgentRole, fallback: ActionType = ActionType.NOOP
) -> ChaosOpsAction:
    """Extract an action from an OpenAI chat-completions response.

    Prefers tool/function-call arguments; falls back to plain ``content``
    text. Accepts either the SDK object or a plain dict.
    """
    message = _openai_first_message(response)
    if message is None:
        return ChaosOpsAction(role=role, action_type=fallback)
    for call in _get(message, "tool_calls") or []:
        fn = _get(call, "function") or {}
        args_raw = _get(fn, "arguments")
        if not args_raw:
            continue
        action = _action_from_json_str(args_raw, role=role, fallback=fallback)
        if action.action_type is not fallback or args_raw.strip():
            return action
    content = _get(message, "content") or ""
    if isinstance(content, list):
        # Some SDK versions return content as a list of typed blocks.
        parts = [
            str(_get(chunk, "text") or "")
            for chunk in content
            if _get(chunk, "type") == "text"
        ]
        content = "".join(parts)
    return parse_action(str(content), role=role, fallback=fallback)
def parse_anthropic_response(
    response: Any, *, role: AgentRole, fallback: ActionType = ActionType.NOOP
) -> ChaosOpsAction:
    """Extract an action from an Anthropic ``messages.create`` response.

    A ``tool_use`` block wins immediately; otherwise all ``text`` blocks
    are joined and handed to the lenient text parser.
    """
    texts: list[str] = []
    for block in _get(response, "content") or []:
        kind = _get(block, "type")
        if kind == "tool_use":
            payload = _get(block, "input") or {}
            if isinstance(payload, Mapping):
                return _action_from_mapping(payload, role=role, fallback=fallback)
        elif kind == "text":
            texts.append(str(_get(block, "text") or ""))
    return parse_action("\n".join(texts), role=role, fallback=fallback)
def _openai_first_message(response: Any) -> Any:
    """Return the first choice's message, or the choice itself as fallback."""
    choices = _get(response, "choices")
    if not choices:
        return None
    head = choices[0]
    return _get(head, "message") or head
def _get(obj: Any, key: str) -> Any:
    """Uniform field access across SDK objects and plain dicts (None-safe)."""
    if obj is None:
        return None
    return obj.get(key) if isinstance(obj, Mapping) else getattr(obj, key, None)
def _action_from_json_str(
    raw: str, *, role: AgentRole, fallback: ActionType
) -> ChaosOpsAction:
    """Parse tool-call ``arguments`` JSON; degrade to the lenient text parser."""
    try:
        payload = json.loads(raw)
    except (TypeError, json.JSONDecodeError):
        # Not standalone JSON — let the chatter-tolerant parser have a go.
        return parse_action(raw, role=role, fallback=fallback)
    if isinstance(payload, Mapping):
        return _action_from_mapping(payload, role=role, fallback=fallback)
    return ChaosOpsAction(role=role, action_type=fallback)
def _action_from_mapping(
    payload: Mapping[str, Any],
    *,
    role: AgentRole,
    fallback: ActionType,
) -> ChaosOpsAction:
    """Funnel structured payloads through ``parse_action`` so the same
    coercion rules (replicas, failure_type, key stringification) apply."""
    encoded = json.dumps(dict(payload))
    return parse_action(encoded, role=role, fallback=fallback)
# ---------------------------------------------------------------------------
# Streaming parser — consumes tokens, yields an action once JSON closes
# ---------------------------------------------------------------------------
@dataclass
class StreamingActionParser:
    """Incremental parser for token streams.

    Feed tokens one-by-one via :meth:`feed`; once a balanced top-level JSON
    object closes, ``finished`` flips to True and :meth:`action` returns the
    parsed :class:`ChaosOpsAction`. Safe to call :meth:`action` before the
    stream finishes — it falls back through :func:`parse_action`.

    Handles leading chatter before the first ``{``, nested braces (``args``
    is an object), braces inside quoted strings, and escaped characters
    (``\\"``) inside strings.
    """

    role: AgentRole
    fallback: ActionType = ActionType.NOOP
    # Internal scanner state: captured chars plus brace/string tracking.
    _buf: list[str] = field(default_factory=list)
    _depth: int = 0
    _started: bool = False
    _in_string: bool = False
    _escape: bool = False
    _closed: bool = False

    def feed(self, chunk: str) -> bool:
        """Append ``chunk`` to the stream. Returns True once JSON closes."""
        if self._closed or not chunk:
            return self._closed
        for ch in chunk:
            if not self._started:
                # Discard leading chatter until the first opening brace.
                if ch == "{":
                    self._started = True
                    self._depth = 1
                    self._buf.append(ch)
                continue
            self._buf.append(ch)
            if self._escape:
                self._escape = False
            elif self._in_string:
                if ch == "\\":
                    self._escape = True
                elif ch == '"':
                    self._in_string = False
            elif ch == '"':
                self._in_string = True
            elif ch == "{":
                self._depth += 1
            elif ch == "}":
                self._depth -= 1
                if self._depth == 0:
                    self._closed = True
                    return True
        return False

    @property
    def finished(self) -> bool:
        """True once a balanced top-level JSON object has been consumed."""
        return self._closed

    @property
    def raw(self) -> str:
        """Everything captured so far, starting at the first ``{``."""
        return "".join(self._buf)

    def action(self) -> ChaosOpsAction:
        """Return the parsed action, falling back to NOOP on failure."""
        return parse_action(self.raw, role=self.role, fallback=self.fallback)
def parse_streaming_action(
    chunks: Iterable[str],
    *,
    role: AgentRole,
    fallback: ActionType = ActionType.NOOP,
) -> ChaosOpsAction:
    """Consume a token iterator and return the parsed action.

    Stops pulling from ``chunks`` as soon as the JSON object closes.
    """
    parser = StreamingActionParser(role=role, fallback=fallback)
    for piece in chunks:
        if parser.feed(piece):
            break
    return parser.action()
# ---------------------------------------------------------------------------
# Retry + fallback wrapper
# ---------------------------------------------------------------------------
# Simple type alias — any ``str -> str`` callable qualifies; no Protocol needed.
GenerateFn = Callable[[str], str]
"""A caller-supplied function: ``prompt -> raw model output``.
Kept provider-agnostic so the same retry wrapper can front OpenAI,
Anthropic, a local vLLM endpoint, or a deterministic stub in tests.
"""
def generate_action_with_retry(
    prompt: str,
    *,
    role: AgentRole,
    generate: GenerateFn,
    max_attempts: int = 3,
    fallback: ActionType = ActionType.NOOP,
    reminder: str = (
        "Previous reply did not contain a valid JSON action. "
        "Respond with ONLY a JSON object matching the schema."
    ),
) -> ChaosOpsAction:
    """Call ``generate`` up to ``max_attempts`` times; return a valid action.

    After each failed attempt (unparseable output, or a provider exception)
    the original prompt is re-issued with a corrective reminder appended.
    If every attempt fails we return a NOOP rather than crash the episode;
    downstream reward logic charges the penalty.
    """
    attempts = max(1, max_attempts)
    current_prompt = prompt
    last_raw = ""
    for attempt in range(1, attempts + 1):
        try:
            last_raw = generate(current_prompt) or ""
        except Exception as exc:  # noqa: BLE001 — provider errors vary widely
            _LOG.warning("LLM generation attempt %d failed: %s", attempt, exc)
            current_prompt = f"{prompt}\n\n{reminder}"
            continue
        action = parse_action(last_raw, role=role, fallback=fallback)
        if _is_usable_action(action, last_raw):
            return action
        # Show the model its own bad output (truncated) to steer the retry.
        current_prompt = f"{prompt}\n\n{reminder}\n\nPrevious output:\n{last_raw[:500]}"
    _LOG.info(
        "LLM retry budget exhausted for role=%s; falling back to %s",
        role.value,
        fallback.value,
    )
    return ChaosOpsAction(role=role, action_type=fallback)
def _is_usable_action(action: ChaosOpsAction, raw: str) -> bool:
    """Heuristic: did the parser actually find structured content?

    A bare fallback (``NOOP`` with no target, no args, and no braces in the
    raw output) means the model produced nothing parseable — retry-worthy.
    A deliberate NOOP expressed as an explicit JSON block is accepted.
    """
    has_payload = bool(action.target or action.args)
    is_explicit = action.action_type is not ActionType.NOOP
    looks_like_json = "{" in raw and "}" in raw
    return has_payload or is_explicit or looks_like_json