Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| """A2A (Agent-to-Agent) Protocol implementation for SENTINEL. | |
| Implements the Google A2A protocol so that SENTINEL can: | |
| 1. Be discovered by other agents via ``/.well-known/agent.json`` | |
| 2. Receive assessment tasks from an AgentBeats Assessor (Green Agent) | |
| 3. Send results back via standard JSON-RPC 2.0 | |
| Architecture (matches workshop AgentBeats slide): | |
| Assessor (Green Agent) β A2A: send task β SENTINEL (Purple Agent) | |
| SENTINEL processes the task β A2A: return result | |
| Communication Protocols: | |
| - A2A Protocol β agent-to-agent structured communication (JSON-RPC 2.0) | |
| - MCP β environment tools exposed as callable functions (see mcp_server.py) | |
| - Human-understandable instructions β natural language task descriptions | |
| References: | |
| - https://google.github.io/A2A/ | |
| - Workshop slide: "Introducing AgentBeats: Agentified Agent Assessment" | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import time | |
| import uuid | |
| from enum import Enum | |
| from typing import Any, Dict, List, Optional | |
| from fastapi import APIRouter, Request | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel, Field | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # A2A Agent Card β served at /.well-known/agent.json | |
| # --------------------------------------------------------------------------- | |
| AGENT_CARD = { | |
| "name": "SENTINEL Oversight Command", | |
| "description": ( | |
| "An AI oversight agent that supervises worker agents during production " | |
| "incident response. SENTINEL intercepts proposed actions, evaluates them " | |
| "against constitutional principles and counterfactual risk, and decides " | |
| "whether to APPROVE, BLOCK, REDIRECT, REASSIGN, or FLAG each action." | |
| ), | |
| "version": "1.0.0", | |
| "url": "https://srikrishna2005-openenv.hf.space", | |
| "provider": { | |
| "organization": "SENTINEL OpenEnv", | |
| "url": "https://srikrishna2005-openenv.hf.space", | |
| }, | |
| "capabilities": { | |
| "streaming": False, | |
| "pushNotifications": False, | |
| "stateTransitionHistory": True, | |
| }, | |
| "authentication": { | |
| "schemes": ["none"], | |
| }, | |
| "defaultInputModes": ["text", "application/json"], | |
| "defaultOutputModes": ["text", "application/json"], | |
| "skills": [ | |
| { | |
| "id": "oversight-basic", | |
| "name": "Basic Oversight", | |
| "description": ( | |
| "Evaluate worker agent proposals for safety, accuracy, and " | |
| "constitutional alignment. Detect hallucinations, scope violations, " | |
| "and premature actions." | |
| ), | |
| "tags": ["ai-safety", "oversight", "incident-response"], | |
| "examples": [ | |
| "Evaluate whether this agent action is safe to execute", | |
| "Check if the proposed remediation has sufficient evidence", | |
| "Detect if the worker is hallucinating a target service", | |
| ], | |
| }, | |
| { | |
| "id": "oversight-adversarial", | |
| "name": "Adversarial Worker Detection", | |
| "description": ( | |
| "Detect reward hacking, confidence washing, escalation bombing, " | |
| "and other adversarial misbehaviors in worker agent proposals." | |
| ), | |
| "tags": ["ai-safety", "adversarial", "misbehavior-detection"], | |
| "examples": [ | |
| "Detect if a worker is gaming the reward function", | |
| "Identify if confidence levels are artificially inflated", | |
| "Check for disproportionate escalation patterns", | |
| ], | |
| }, | |
| { | |
| "id": "oversight-fleet", | |
| "name": "Fleet Monitoring & Multi-Crisis", | |
| "description": ( | |
| "Coordinate oversight across multiple concurrent worker agents " | |
| "handling multi-crisis scenarios with conflicting priorities." | |
| ), | |
| "tags": ["fleet-management", "multi-agent", "coordination"], | |
| "examples": [ | |
| "Manage oversight for 4 workers handling a cascading outage", | |
| "Resolve conflicting remediation proposals from different workers", | |
| "Coordinate a multi-team escalation during a critical incident", | |
| ], | |
| }, | |
| ], | |
| } | |
| # --------------------------------------------------------------------------- | |
| # A2A Task management | |
| # --------------------------------------------------------------------------- | |
| class TaskState(str, Enum): | |
| SUBMITTED = "submitted" | |
| WORKING = "working" | |
| INPUT_REQUIRED = "input-required" | |
| COMPLETED = "completed" | |
| CANCELED = "canceled" | |
| FAILED = "failed" | |
| class A2ATask: | |
| """Tracks the lifecycle of an A2A task.""" | |
| def __init__(self, task_id: str, message: Dict[str, Any]): | |
| self.id = task_id | |
| self.state = TaskState.SUBMITTED | |
| self.message = message | |
| self.result: Optional[Dict[str, Any]] = None | |
| self.history: List[Dict[str, Any]] = [] | |
| self.created_at = time.time() | |
| self.updated_at = time.time() | |
| def to_dict(self) -> Dict[str, Any]: | |
| return { | |
| "id": self.id, | |
| "status": { | |
| "state": self.state.value, | |
| "timestamp": self.updated_at, | |
| }, | |
| "artifacts": [self.result] if self.result else [], | |
| "history": self.history, | |
| } | |
| _A2A_TASKS: Dict[str, A2ATask] = {} | |
| # --------------------------------------------------------------------------- | |
| # Task execution β using SENTINEL environment | |
| # --------------------------------------------------------------------------- | |
| def _execute_task(task: A2ATask) -> None: | |
| """Execute an A2A task by running a SENTINEL oversight episode.""" | |
| from sentinel.environment import SentinelEnv | |
| task.state = TaskState.WORKING | |
| task.updated_at = time.time() | |
| task.history.append({ | |
| "state": TaskState.WORKING.value, | |
| "timestamp": task.updated_at, | |
| "message": "Processing oversight request...", | |
| }) | |
| try: | |
| msg = task.message | |
| parts = msg.get("parts", []) | |
| text_content = "" | |
| json_content = {} | |
| for part in parts: | |
| if part.get("type") == "text": | |
| text_content = part.get("text", "") | |
| elif part.get("type") == "data": | |
| json_content = part.get("data", {}) | |
| # Extract action from the message (supports both text and structured) | |
| agent_action = json_content.get("agent_action", {}) | |
| if not agent_action and text_content: | |
| # Try to parse text as JSON (human-understandable instructions) | |
| try: | |
| agent_action = json.loads(text_content) | |
| except (json.JSONDecodeError, ValueError): | |
| # Treat as natural language instruction | |
| agent_action = { | |
| "action_type": "investigate", | |
| "reasoning": text_content, | |
| } | |
| environment_state = json_content.get("environment_state", {}) | |
| domain = json_content.get("domain", "infrastructure") | |
| # Use universal adapter for the actual oversight | |
| from sentinel.universal_adapter import universal_intercept | |
| result = universal_intercept( | |
| agent_action=agent_action, | |
| environment_state=environment_state, | |
| domain=domain, | |
| agent_id=json_content.get("agent_id", "a2a_agent"), | |
| agent_role=json_content.get("agent_role", "general"), | |
| ) | |
| task.result = { | |
| "parts": [ | |
| { | |
| "type": "text", | |
| "text": ( | |
| f"SENTINEL Decision: {result.decision}\n" | |
| f"Reason: {result.reason or 'N/A'}\n" | |
| f"Explanation: {result.explanation}\n" | |
| f"Risk Level: {result.risk_level} ({result.risk_score:.2f})\n" | |
| f"Worker Message: {result.worker_message}" | |
| ), | |
| }, | |
| { | |
| "type": "data", | |
| "data": result.model_dump(mode="json"), | |
| }, | |
| ], | |
| } | |
| task.state = TaskState.COMPLETED | |
| task.updated_at = time.time() | |
| task.history.append({ | |
| "state": TaskState.COMPLETED.value, | |
| "timestamp": task.updated_at, | |
| "message": f"Oversight decision: {result.decision}", | |
| }) | |
| except Exception as exc: | |
| logger.exception("A2A task execution failed: %s", task.id) | |
| task.state = TaskState.FAILED | |
| task.updated_at = time.time() | |
| task.result = { | |
| "parts": [{"type": "text", "text": f"Error: {exc}"}], | |
| } | |
| task.history.append({ | |
| "state": TaskState.FAILED.value, | |
| "timestamp": task.updated_at, | |
| "message": str(exc), | |
| }) | |
| # --------------------------------------------------------------------------- | |
| # A2A JSON-RPC 2.0 methods | |
| # --------------------------------------------------------------------------- | |
| def _handle_tasks_send(params: Dict[str, Any]) -> Dict[str, Any]: | |
| """Handle tasks/send β receive a task and execute it synchronously.""" | |
| task_id = params.get("id", str(uuid.uuid4())) | |
| message = params.get("message", {}) | |
| task = A2ATask(task_id, message) | |
| _A2A_TASKS[task_id] = task | |
| # Execute synchronously (for non-streaming mode) | |
| _execute_task(task) | |
| return task.to_dict() | |
| def _handle_tasks_get(params: Dict[str, Any]) -> Dict[str, Any]: | |
| """Handle tasks/get β retrieve task status.""" | |
| task_id = params.get("id", "") | |
| task = _A2A_TASKS.get(task_id) | |
| if not task: | |
| return {"error": f"Task not found: {task_id}"} | |
| return task.to_dict() | |
| def _handle_tasks_cancel(params: Dict[str, Any]) -> Dict[str, Any]: | |
| """Handle tasks/cancel β cancel a task.""" | |
| task_id = params.get("id", "") | |
| task = _A2A_TASKS.get(task_id) | |
| if not task: | |
| return {"error": f"Task not found: {task_id}"} | |
| task.state = TaskState.CANCELED | |
| task.updated_at = time.time() | |
| return task.to_dict() | |
| def _handle_message_send(params: Dict[str, Any]) -> Dict[str, Any]: | |
| """Handle message/send (A2A v0.3+) β normalize schema and delegate to tasks/send. | |
| A2A v0.3 changed the method name from ``tasks/send`` to ``message/send`` and | |
| uses ``kind`` instead of ``type`` in message parts. This adapter normalises | |
| the new envelope so SENTINEL can be reached by both v0.2 and v0.3 clients. | |
| """ | |
| raw_message = params.get("message", {}) | |
| # Normalise parts: A2A v0.3 uses "kind", v0.2 uses "type" | |
| normalized_parts = [] | |
| for part in raw_message.get("parts", []): | |
| p = dict(part) | |
| if "kind" in p and "type" not in p: | |
| p["type"] = p.pop("kind") | |
| normalized_parts.append(p) | |
| normalized_message = { | |
| "parts": normalized_parts, | |
| "role": raw_message.get("role", "user"), | |
| } | |
| task_id = params.get("id", str(uuid.uuid4())) | |
| return _handle_tasks_send({"id": task_id, "message": normalized_message}) | |
| # --------------------------------------------------------------------------- | |
| # FastAPI Router | |
| # --------------------------------------------------------------------------- | |
| a2a_router = APIRouter(tags=["A2A"]) | |
| async def agent_card(): | |
| """A2A Agent Card β the standard discovery endpoint. | |
| Other agents discover SENTINEL's capabilities by fetching this card. | |
| Hosted at the well-known URL as specified by the A2A protocol. | |
| """ | |
| return JSONResponse(AGENT_CARD) | |
| async def a2a_endpoint(request: Request): | |
| """A2A JSON-RPC 2.0 endpoint. | |
| Handles task lifecycle: tasks/send, tasks/get, tasks/cancel. | |
| Communication uses standard HTTP + JSON-RPC 2.0 as specified by the A2A protocol. | |
| """ | |
| body = await request.json() | |
| method = body.get("method", "") | |
| params = body.get("params", {}) or {} | |
| req_id = body.get("id") | |
| handlers = { | |
| "tasks/send": _handle_tasks_send, | |
| "tasks/get": _handle_tasks_get, | |
| "tasks/cancel": _handle_tasks_cancel, | |
| "message/send": _handle_message_send, # A2A v0.3+ alias | |
| "message/stream": _handle_message_send, # A2A v0.3+ streaming (sync fallback) | |
| } | |
| handler = handlers.get(method) | |
| if handler is None: | |
| return JSONResponse({ | |
| "jsonrpc": "2.0", | |
| "id": req_id, | |
| "error": {"code": -32601, "message": f"Method not found: {method}"}, | |
| }) | |
| try: | |
| result = handler(params) | |
| return JSONResponse({ | |
| "jsonrpc": "2.0", | |
| "id": req_id, | |
| "result": result, | |
| }) | |
| except Exception as exc: | |
| logger.exception("A2A method failed: %s", method) | |
| return JSONResponse({ | |
| "jsonrpc": "2.0", | |
| "id": req_id, | |
| "error": {"code": -32000, "message": str(exc)}, | |
| }) | |
| # --------------------------------------------------------------------------- | |
| # Human-understandable instruction endpoint | |
| # --------------------------------------------------------------------------- | |
| async def a2a_human_instruction(request: Request): | |
| """Accept human-understandable natural language oversight requests. | |
| This endpoint supports the third communication mode from the workshop: | |
| 'Human-understandable instructions' β agents can describe their action | |
| in plain English and SENTINEL will evaluate it. | |
| Example: | |
| POST /a2a/human | |
| { | |
| "instruction": "I want to restart the payment-gateway service", | |
| "context": "There's a timeout on payment processing" | |
| } | |
| """ | |
| body = await request.json() | |
| instruction = body.get("instruction", body.get("text", body.get("message", ""))) | |
| context = body.get("context", body.get("environment_state", {})) | |
| if isinstance(context, str): | |
| context = {"description": context} | |
| # Wrap as A2A task | |
| task_id = str(uuid.uuid4()) | |
| message = { | |
| "parts": [ | |
| {"type": "text", "text": instruction}, | |
| {"type": "data", "data": { | |
| "agent_action": {"action_type": "investigate", "reasoning": instruction}, | |
| "environment_state": context, | |
| "domain": body.get("domain", "infrastructure"), | |
| }}, | |
| ], | |
| } | |
| task = A2ATask(task_id, message) | |
| _A2A_TASKS[task_id] = task | |
| _execute_task(task) | |
| return JSONResponse({ | |
| "task_id": task_id, | |
| "decision": task.result["parts"][1]["data"]["decision"] if task.result else "ERROR", | |
| "explanation": task.result["parts"][0]["text"] if task.result else "Failed", | |
| "full_result": task.to_dict(), | |
| }) | |