""" AegisOps AI — FastAPI backend (complete) Modes: Single Technique, APT Group, Kill Chain, Topology Lab SSE streaming + CORS + health + PDF/Sigma export Run: uvicorn server:api --reload --port 8000 """ from __future__ import annotations import asyncio import json import re import sys from pathlib import Path from typing import AsyncIterator from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from dotenv import load_dotenv load_dotenv() ROOT = Path(__file__).parent sys.path.insert(0, str(ROOT)) # Internal AegisOps imports from agents.llm import live_health, get_model_routing_status from demo_output import DEMO_INVOKE_RESULT from graph import app as pipeline from export import generate_pdf api = FastAPI(title="AegisOps AI", version="5.0") api.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) assets_dir = ROOT / "assets" if assets_dir.exists(): api.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets") @api.get("/", response_class=HTMLResponse) async def index(): f = ROOT / "index.html" return HTMLResponse(f.read_text(encoding="utf-8") if f.exists() else "

AegisOps AI API Server is Running

") # ── Health ──────────────────────────────────────────────────────────────────── @api.get("/health") async def health(): return JSONResponse(dict(live_health())) @api.get("/model-routing") async def model_routing(): return JSONResponse(get_model_routing_status()) # ── Artifact helpers ────────────────────────────────────────────────────────── def _extract_fenced(text: str, lang: str) -> str: m = re.search(rf"```{lang}\s*(.*?)\s*```", text or "", re.DOTALL | re.IGNORECASE) return m.group(1).strip() if m else "" def _splunk_spl(red: str, tid: str) -> str: try: payload = _extract_fenced(red, "json") obs = json.loads(payload).get("observables", []) if payload else [] obs = [str(o) for o in obs if o] except Exception: obs = [] if not obs: return f'index=windows | eval mitre_technique="{tid}" | stats count by host, user' clause = " OR ".join(f'"{o}"' for o in obs[:10]) return f'index=windows ({clause}) | eval mitre_technique="{tid}" | stats count by host' def _parse_verifier(verifier: str) -> dict: try: m = re.search(r'```json\s*(.*?)\s*```', verifier, re.DOTALL) d = json.loads(m.group(1) if m else verifier) return { "coverage": d.get("coverage_score", 0), "product_readiness": d.get("product_readiness_score", 0), "real_world": d.get("real_world_applicability_score", 0), "safety_verdict": d.get("safety_verdict", "PASS"), "verdict": d.get("verdict", "PASS"), "covered_observables": d.get("covered_observables", []), "missing_observables": d.get("missing_observables", []), "production_gaps": d.get("production_gaps", []), "improvement_suggestions": d.get("improvement_suggestions", []), } except Exception: return {"coverage": 0, "product_readiness": 0, "real_world": 0, "safety_verdict": "PENDING", "verdict": "PENDING", "covered_observables": [], "missing_observables": [], "production_gaps": [], "improvement_suggestions": []} def _build_response(result: dict, tid: str) -> dict: red = result.get("red_output", "") blue = result.get("blue_output", "") return { "status": "success", "technique_id": tid, "verifier_model": result.get("verifier_model", "Unknown verifier model"), "verifier_model_role": result.get("verifier_model_role", "unknown"), "outputs": { "red": red, "blue": blue, "response": result.get("response_output", ""), "verifier": result.get("verifier_output", ""), }, "artifacts": { "sigma": _extract_fenced(blue, "yaml"), "splunk": _splunk_spl(red, tid), "raw_red": red, "raw_blue": blue, }, "scores": _parse_verifier(result.get("verifier_output", "")), "metrics": result.get("metrics", {}), } # ── Mode Resolution ─────────────────────────────────────────────────────────── def _resolve_techniques(mode: str, technique_id: str) -> list[str]: """Return list of technique IDs to run based on the selected mode.""" tid = technique_id.split("·")[0].strip().upper() if mode == "single": return [tid] if mode == "apt": try: from apt import get_apt_techniques techniques = get_apt_techniques(technique_id) return [t["technique_id"] for t in techniques] or [tid] except Exception: return [tid] if mode == "chain": try: from chain import get_next_techniques chain = [tid] + [t["technique_id"] for t in get_next_techniques(tid)] return chain[:3] # limit to 3 for demo purposes except Exception: return [tid] if mode == "topology": try: from topology import generate_attack_paths paths = generate_attack_paths(tid) if paths: return paths[0]["seed_techniques"][:3] except Exception: pass return [tid] return [tid] # ── Streaming (SSE) ─────────────────────────────────────────────────────────── def _sse(event: str, data: dict) -> str: return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" async def _stream_demo(technique_id: str) -> AsyncIterator[str]: result = DEMO_INVOKE_RESULT stages = [ ("red", "red_output", "Threat Agent", 3.8), ("blue", "blue_output", "Detection Agent", 3.2), ("response", "response_output", "Response Agent", 2.4), ("verifier", "verifier_output", "Validation Agent", 1.9), ] yield _sse("start", { "demo": True, "technique_id": technique_id, "pipeline_version": "aegisops-production-hybrid-v1", }) for key, field, label, delay in stages: yield _sse("agent_start", {"agent": key, "label": label}) await asyncio.sleep(delay) yield _sse("agent_done", { "agent": key, "label": label, "output": result.get(field, ""), }) full = _build_response(result, technique_id) yield _sse("done", { "demo": True, "metrics": full["metrics"], "artifacts": full["artifacts"], "scores": full["scores"], "verifier_model": full.get("verifier_model"), "verifier_model_role": full.get("verifier_model_role"), }) def _run_node(node_name: str, state: dict) -> dict: from agents.red_agent import run_red_agent from agents.blue_agent import run_blue_agent from agents.response_agent import run_response_agent from agents.verifier_agent import run_verifier_agent return { "red_agent": run_red_agent, "blue_agent": run_blue_agent, "response_agent": run_response_agent, "verifier_agent": run_verifier_agent, }[node_name](state) async def _stream_live(technique_id: str, mode: str) -> AsyncIterator[str]: techniques = _resolve_techniques(mode, technique_id) yield _sse("start", {"demo": False, "technique_id": technique_id, "mode": mode, "techniques": techniques, "pipeline_version": "aegisops-production-hybrid-v1"}) all_results = [] loop = asyncio.get_event_loop() for i, tid in enumerate(techniques): if len(techniques) > 1: yield _sse("technique_start", {"technique_id": tid, "index": i, "total": len(techniques)}) agent_order = [ ("red_agent", "red", "red_output", "Threat Agent"), ("blue_agent", "blue", "blue_output", "Detection Agent"), ("response_agent", "response", "response_output", "Response Agent"), ("verifier_agent", "verifier", "verifier_output", "Validation Agent"), ] state: dict = {"technique_id": tid} for node_name, key, field, label in agent_order: yield _sse("agent_start", {"agent": key, "label": label, "technique_id": tid}) try: result = await loop.run_in_executor( None, lambda s=state, n=node_name: _run_node(n, s) ) state.update(result) yield _sse("agent_done", { "agent": key, "label": label, "output": state.get(field, ""), "technique_id": tid, }) except Exception as exc: yield _sse("agent_error", {"agent": key, "label": label, "error": str(exc), "technique_id": tid}) return all_results.append(state) # Yield a sub-completion for multi-technique chains full_sub = _build_response(state, tid) yield _sse("done", { "demo": False, "metrics": full_sub["metrics"], "artifacts": full_sub["artifacts"], "scores": full_sub["scores"], "verifier_model": full_sub.get("verifier_model"), "verifier_model_role": full_sub.get("verifier_model_role"), }) # ── Endpoints ───────────────────────────────────────────────────────────────── @api.post("/run") async def run_streaming(request: Request): body = await request.json() demo = body.get("demo", True) technique = body.get("technique_id", "T1059.001").strip() mode = body.get("mode", "single").lower().replace(" ", "_") mode_map = {"single_technique": "single", "apt_group": "apt", "kill_chain": "chain", "topology_lab": "topology"} mode = mode_map.get(mode, mode) async def generate(): if demo: async for chunk in _stream_demo(technique): yield chunk else: async for chunk in _stream_live(technique, mode): yield chunk return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) class DrillRequest(BaseModel): technique_id: str = "T1059.001" demo_mode: bool = True @api.post("/api/run-drill") async def run_drill(req: DrillRequest): """Legacy non-streaming endpoint for single runs.""" if req.demo_mode: result = DEMO_INVOKE_RESULT else: loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda: pipeline.invoke({"technique_id": req.technique_id}) ) return _build_response(result, req.technique_id) @api.post("/export/pdf") async def export_pdf(request: Request): body = await request.json() tid = body.get("technique_id", "T1059.001") pdf = generate_pdf(tid, body.get("red_output", ""), body.get("blue_output", "")) return StreamingResponse(iter([pdf]), media_type="application/pdf", headers={"Content-Disposition": f"attachment; filename=aegisops_{tid}.pdf"}) @api.post("/export/sigma") async def export_sigma(request: Request): body = await request.json() sigma = _extract_fenced(body.get("blue_output", ""), "yaml") tid = body.get("technique_id", "rule") return StreamingResponse(iter([sigma.encode()]), media_type="text/plain", headers={"Content-Disposition": f"attachment; filename=sigma_{tid}.yml"}) @api.get("/topology") async def get_topology(seed: str = "T1566.001"): from topology import generate_topology, generate_attack_paths topo = generate_topology(seed) paths = generate_attack_paths(seed) return JSONResponse({"topology": topo, "paths": paths}) @api.get("/intel/group") async def get_intel_group(name: str = "APT28"): from apt import get_group_info, get_apt_techniques info = get_group_info(name) techniques = get_apt_techniques(name) return JSONResponse({"group": info, "techniques": techniques})