# sentinel-finops / agent.py
# akshay4's picture
# update agent
# 25c3a9a verified
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
import json
import pandas as pd
from datetime import datetime
from typing import Dict, List, Literal, Optional
from typing_extensions import TypedDict
from enum import Enum
from pydantic import BaseModel, Field
import re
# Smolagents imports
from smolagents import CodeAgent, tool, LiteLLMModel
# OpenAI for direct API calls
# OpenAI for direct API calls
from openai import OpenAI
import google.generativeai as genai
# LlamaIndex for RAG
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from dotenv import load_dotenv
load_dotenv()
# --- Configuration ---
# Model name, overridable through the DEFAULT_MODEL environment variable.
# NOTE(review): not referenced elsewhere in the visible part of this file.
DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4o-mini")
# Reasoning-effort setting, overridable through REASONING_EFFORT.
# NOTE(review): not referenced elsewhere in the visible part of this file.
REASONING_EFFORT = os.getenv("REASONING_EFFORT", "low")
# Missing keys only produce a console warning here; the clients below are
# still constructed/configured with whatever os.getenv returns.
if "OPENAI_API_KEY" not in os.environ:
    print("WARNING: OPENAI_API_KEY not found in environment. Agent may fail.")
if "GOOGLE_API_KEY" not in os.environ:
    print("WARNING: GOOGLE_API_KEY not found in environment. Gemini will fail.")
# Module-level API clients, created once at import time.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
# Gemini model used by extract_query_intent and classify_resources.
GEMINI_MODEL = "gemini-2.5-flash"  # other names previously listed here: "gemini-2.0-flash", "gemini-2.0-flash-lite", "gemini-2.0-flash-exp"
# --- 1. ENUMS & PYDANTIC MODELS ---
# Canonical team names used for ownership attribution.
# Members subclass ``str``, so each one compares equal to its plain string value.
class TeamName(str, Enum):
    FINANCE = "Finance"
    DATA = "Data"
    QA = "QA"
    DEVOPS = "DevOps"
    SECURITY = "Security"
    FRONTEND = "Frontend"
    BACKEND = "Backend"
    INFRA = "Infrastructure"
    PLATFORM = "Platform"
    ML = "ML"
    # Returned by infer_team_from_tags when no keyword matches.
    UNKNOWN = "Unknown"
# Validated filter set produced by extract_query_intent and consumed by
# filter_dataframe. Every field has a default, so ``QueryIntent()`` means
# "no filtering".
class QueryIntent(BaseModel):
    """Translates natural language into strict data filters."""
    # Matched against the 'Service' column with ``isin``.
    target_services: Optional[List[str]] = Field(default_factory=list, description="List of services to filter (e.g. ['RDS', 'EBS', 'EC2'])")
    # Substring-matched against the env/environment tag.
    target_env: Optional[str] = Field(None, description="Environment to focus on (e.g. 'prod', 'dev')")
    # Substring-matched (case-insensitively) against the team/owner_team tag.
    target_team: Optional[str] = Field(None, description="Team to focus on (e.g. 'DevOps', 'ML', 'Data')")
    # Only applied when greater than zero.
    min_cost: Optional[float] = Field(0.0, description="Minimum monthly cost to filter for")
    only_orphans: bool = Field(False, description="True if user is looking for untagged/unowned resources")
    focus_area: Literal["SAVINGS", "SAFETY", "GENERAL", "TEAM_RANKING"] = Field("GENERAL", description="The user's primary goal")
# TypedDict for Gemini Schema (Avoids 'default' error)
# Default-free mirror of QueryIntent, passed as ``response_schema`` to Gemini
# in extract_query_intent. Field names must stay in sync with QueryIntent
# because the parsed JSON is expanded into ``QueryIntent(**data)``.
class QueryIntentSchema(TypedDict):
    target_services: List[str]
    target_env: Optional[str]
    target_team: Optional[str]
    min_cost: float
    only_orphans: bool
    focus_area: Literal["SAVINGS", "SAFETY", "GENERAL", "TEAM_RANKING"]
# One classified resource as returned by classify_resources.
# Only ``resource_id`` is required; the remaining fields fall back to
# review-oriented defaults ("AUDIT" / "Manual Review").
class ResourceClassification(BaseModel):
    """Classification for a single resource."""
    # Used by run_audit to match the answer back to the input row's 'ResourceID'.
    resource_id: str
    action: Literal["KEEP", "DELETE", "HIBERNATE", "AUDIT"] = Field("AUDIT", description="Recommended action")
    reason: str = Field("No reason provided", description="Justification for the action")
    policy_rule: str = Field("Manual Review", description="Policy rule applied")
    inferred_team: Optional[TeamName] = Field(None, description="Inferred team ownership based on tags or resource name")
# Wrapper returned by classify_resources; an empty list is used as the
# fallback when the model call fails.
class ClassificationResponse(BaseModel):
    """Complete classification response for all resources."""
    classifications: list[ResourceClassification]
# TypedDict for Gemini Schema
# Default-free mirror of ResourceClassification, used (via
# ClassificationResponseSchema) as the Gemini ``response_schema``.
# Field names must stay in sync with ResourceClassification.
class ResourceClassificationSchema(TypedDict):
    resource_id: str
    action: Literal["KEEP", "DELETE", "HIBERNATE", "AUDIT"]
    reason: str
    policy_rule: str
    inferred_team: Optional[TeamName]
# Top-level Gemini response schema passed by classify_resources; mirrors
# ClassificationResponse.
class ClassificationResponseSchema(TypedDict):
    classifications: List[ResourceClassificationSchema]
# --- 2. REASONING LOGGER ---
class ReasoningLogger:
    """Records timestamped reasoning steps and echoes them as they arrive.

    Each entry is stored in ``logs``, printed to stdout, and, when a
    ``log_callback`` was supplied, forwarded to it for streaming.
    """

    def __init__(self, log_callback=None):
        self.log_callback = log_callback
        self.logs = []

    def log(self, emoji, message):
        """Append one ``[HH:MM:SS] <emoji> <message>`` entry and publish it."""
        stamp = datetime.now().strftime("%H:%M:%S")
        line = "[{}] {} {}".format(stamp, emoji, message)
        self.logs.append(line)
        print(line)
        if self.log_callback:
            self.log_callback(line)

    def get_logs(self):
        """Return every recorded entry joined by newlines."""
        return "\n".join(self.logs)
# --- 3. RAG SETUP (POLICY ENGINE) ---
def setup_policy_rag():
    """Build the RAG query engine over ``policy.txt``.

    The LlamaIndex LLM and embedding settings are configured first, then the
    policy file is indexed. Returns the query engine, or ``None`` when the
    file does not exist or any step raises (a warning is printed in that case).
    """
    policy_path = "policy.txt"
    try:
        # Use lightweight model for RAG
        Settings.llm = LlamaOpenAI(model="gpt-4o-mini")
        Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
        if not os.path.exists(policy_path):
            return None
        docs = SimpleDirectoryReader(input_files=[policy_path]).load_data()
        policy_index = VectorStoreIndex.from_documents(docs)
        return policy_index.as_query_engine(similarity_top_k=3)
    except Exception as exc:
        print(f"Warning: Policy RAG setup failed: {exc}")
        return None


# Built once at import time; None means policy lookups are unavailable.
policy_engine = setup_policy_rag()
@tool
def query_policy(question: str) -> str:
    """Query enterprise policy using RAG.
    Args:
        question: The policy question to ask (e.g., 'What are the deletion rules?')
    """
    # Errors are reported as text rather than raised, so callers always get a str.
    if policy_engine:
        try:
            answer = policy_engine.query(question)
            return str(answer)
        except Exception as exc:
            return f"Policy query error: {exc}"
    return "Policy engine unavailable (policy.txt missing)."
# --- 4. HELPER FUNCTIONS ---
def get_tag(tags_dict, keys, default=''):
    """Return the value of the first tag whose name matches ``keys``, ignoring case.

    Args:
        tags_dict: Parsed tag mapping. Anything that is not a ``dict`` (for
            example ``None`` or a list produced by unexpected JSON) is treated
            as "no tags".
        keys: One tag name or an iterable of acceptable tag names.
        default: Value returned when no usable tag is found.

    Returns:
        The tag value as stored, or ``default``. Tags whose value is ``None``
        (JSON ``null``) are treated as absent so callers can safely call
        string methods on the result.
    """
    if not isinstance(tags_dict, dict):
        return default
    if isinstance(keys, str):
        keys = [keys]
    wanted = {k.lower() for k in keys}
    for name, value in tags_dict.items():
        # Non-string keys cannot match a tag name; skip them instead of crashing.
        if not isinstance(name, str) or name.lower() not in wanted:
            continue
        if value is None:
            continue
        return value
    return default
# Resource-ID fragments -> team keyword, checked in order (first hit wins).
_RESOURCE_ID_TEAM_HINTS = (
    (('ml-', 'training'), 'ml'),
    (('db-', 'rds'), 'data'),
    (('web', 'frontend'), 'frontend'),
    (('api', 'backend'), 'backend'),
    (('test', 'qa'), 'qa'),
)

# Team-keyword fragments -> canonical TeamName, checked in order (first hit wins).
# The order matters because matching is by substring.
_TEAM_KEYWORDS = (
    (('finance', 'fin'), TeamName.FINANCE),
    (('data', 'science'), TeamName.DATA),
    (('ml', 'ai'), TeamName.ML),
    (('qa', 'test', 'quality'), TeamName.QA),
    (('devops', 'ops', 'sre'), TeamName.DEVOPS),
    (('platform',), TeamName.PLATFORM),
    (('security', 'sec', 'infosec'), TeamName.SECURITY),
    (('frontend', 'web', 'ui'), TeamName.FRONTEND),
    (('backend', 'api'), TeamName.BACKEND),
    (('infra',), TeamName.INFRA),
)


def infer_team_from_tags(tags_dict, resource_id="") -> TeamName:
    """Infer the owning team from the team tag, falling back to the resource ID.

    Args:
        tags_dict: Parsed tag mapping for the resource.
        resource_id: Resource identifier; only consulted when no team tag is
            present. Non-string values (for example NaN from a CSV gap) are
            ignored.

    Returns:
        The first ``TeamName`` whose keyword appears in the team string, or
        ``TeamName.UNKNOWN`` when nothing matches.
    """
    # get_tag is already case-insensitive, so one spelling per key is enough.
    raw_team = get_tag(tags_dict, ['team', 'owner_team'])
    team_str = raw_team.strip().lower() if isinstance(raw_team, str) else ''

    # If no tag, try to infer from ResourceID (Sherlock Mode).
    # Lower-cased so 'ML-train-01' and 'ml-train-01' are treated alike.
    if not team_str and isinstance(resource_id, str) and resource_id:
        rid = resource_id.lower()
        for fragments, keyword in _RESOURCE_ID_TEAM_HINTS:
            if any(fragment in rid for fragment in fragments):
                team_str = keyword
                break

    if not team_str:
        return TeamName.UNKNOWN

    # Map common variations to canonical TeamName.
    for fragments, team in _TEAM_KEYWORDS:
        if any(fragment in team_str for fragment in fragments):
            return team
    return TeamName.UNKNOWN
# --- 5. INTENT EXTRACTION & FILTERING (NEW) ---
def extract_query_intent(query: str, logger) -> QueryIntent:
    """Translate a natural-language query into a ``QueryIntent`` via Gemini.

    Args:
        query: The user's free-text request; an empty value short-circuits to
            a default (unfiltered) intent.
        logger: Object exposing ``log(emoji, message)``.

    Returns:
        The parsed intent, or a default ``QueryIntent()`` when the model call
        or the response parsing fails (the failure is logged).
    """
    if not query:
        return QueryIntent()
    logger.log("🧠", f"Translating query: '{query}'...")
    prompt = f"""
You are a Data Query Translator for a Cloud FinOps Tool.
Convert the user's natural language query into structured filters.
User Query: "{query}"
---------------------------------------------------------
STRICT VOCABULARY MAPPING (DO NOT INVENT TERMS)
---------------------------------------------------------
1. SERVICES (Map keywords to EXACTLY one of these 5 options):
- "database", "db", "sql", "rds", "postgres" -> ["RDS"]
- "server", "vm", "instance", "ec2", "compute", "host" -> ["EC2"]
- "volume", "disk", "storage", "ebs", "block store" -> ["EBS"]
- "bucket", "object", "s3", "blob" -> ["S3"]
- "function", "serverless", "lambda" -> ["Lambda"]
* IF NO SPECIFIC SERVICE IS MENTIONED, RETURN [] (Empty List).
2. ENVIRONMENTS (Map keywords to EXACTLY one of these 2 options):
- "production", "prod", "live" -> "prod"
- "development", "dev", "test", "staging", "qa" -> "dev"
3. TEAMS (Map keywords to canonical team names):
- "devops", "ops", "sre" -> "DevOps"
- "ml", "ai", "machine learning" -> "ML"
- "data", "analytics", "science" -> "Data"
- "frontend", "web", "ui" -> "Frontend"
- "backend", "api" -> "Backend"
- "qa", "test", "quality" -> "QA"
- "security", "infosec" -> "Security"
- "platform", "infrastructure", "infra" -> "Platform"
4. INTENT RULES:
- "kill", "delete", "prune", "remove" -> focus_area="SAVINGS", min_cost=0.0
- "waste", "zombie", "unused", "idle" -> focus_area="SAVINGS", min_cost=0.0
- "expensive", "costly" -> focus_area="GENERAL", min_cost=100.0
- "orphan", "untagged" -> only_orphans=True
- "audit", "safety", "check" -> focus_area="SAFETY"
- "biggest offender", "worst team", "team ranking", "team leaderboard" -> focus_area="TEAM_RANKING"
OUTPUT FORMAT:
Return valid JSON matching the schema.
"""
    try:
        llm = genai.GenerativeModel(GEMINI_MODEL)
        gen_config = genai.GenerationConfig(
            response_mime_type="application/json",
            response_schema=QueryIntentSchema
        )
        reply = llm.generate_content(prompt, generation_config=gen_config)
        parsed = QueryIntent(**json.loads(reply.text))
        # Unset filters are shown as "All" in the log line.
        team_label = parsed.target_team if parsed.target_team else "All"
        env_label = parsed.target_env if parsed.target_env else "All"
        svc_label = str(parsed.target_services) if parsed.target_services else "All"
        logger.log("🎯", f"Intent: {parsed.focus_area} | Team: {team_label} | Env: {env_label} | Svc: {svc_label} | Cost > ${parsed.min_cost} | Orphans: {parsed.only_orphans}")
        return parsed
    except Exception as exc:
        logger.log("⚠️", f"Intent extraction failed: {exc}. Running full audit.")
        return QueryIntent()
def filter_dataframe(df, intent: "QueryIntent", logger):
    """Apply the LLM-derived intent to the billing DataFrame.

    Filters are applied in order: service, environment, minimum cost, team,
    orphan. Tag-based filters parse each row's JSON ``Tags`` string; rows whose
    tags cannot be parsed are excluded by the environment and team filters and
    counted as orphans by the orphan filter.

    Args:
        df: DataFrame with 'Service', 'Tags' and 'Cost_Monthly' columns.
        intent: Filter set (``QueryIntent`` or any object with the same attributes).
        logger: Object exposing ``log(emoji, message)``.

    Returns:
        The filtered DataFrame, always with the original columns.
    """
    initial_count = len(df)

    def keep_by_tags(frame, predicate, on_error):
        """Keep rows whose parsed tags satisfy ``predicate``."""
        def check(tags_str):
            try:
                return bool(predicate(json.loads(tags_str)))
            except (TypeError, ValueError, AttributeError):
                # Malformed/missing tag JSON or non-string tag values.
                return on_error
        # astype(bool) matters when `frame` is already empty: apply() then
        # yields an object-dtype Series, which pandas would treat as a column
        # selection (dropping every column) instead of a row mask.
        return frame[frame['Tags'].apply(check).astype(bool)]

    # 1. Service Filter
    if intent.target_services:
        df = df[df['Service'].isin(intent.target_services)]

    # 2. Environment Filter (case-insensitive substring match)
    if intent.target_env:
        wanted_env = intent.target_env.lower()
        df = keep_by_tags(
            df,
            lambda tags: wanted_env in get_tag(tags, ['env', 'environment'], '').lower(),
            False,
        )

    # 3. Cost Filter
    if intent.min_cost and intent.min_cost > 0:
        df = df[df['Cost_Monthly'] >= intent.min_cost]

    # 4. Team Filter (case-insensitive substring match)
    if intent.target_team:
        wanted_team = intent.target_team.lower()
        df = keep_by_tags(
            df,
            lambda tags: wanted_team in get_tag(tags, ['team', 'owner_team'], '').lower(),
            False,
        )

    # 5. Orphan Filter: orphan = no team tag (unparseable tags count as orphan)
    if intent.only_orphans:
        df = keep_by_tags(
            df,
            lambda tags: not get_tag(tags, ['team', 'owner_team']),
            True,
        )

    logger.log("🔍", f"Scoped analysis from {initial_count} to {len(df)} resources.")
    return df
# --- 6. CORE ANALYSIS LOGIC ---
def classify_simple_cases(df, logger):
    """Classify obvious resources with fixed rules; defer the rest to the AI step.

    Rules, evaluated per row in this order:
      1. EBS volume whose 'state' tag is 'available' and whose CPU_avg is 0 -> DELETE.
      2. CPU_avg above 75 -> KEEP.
    Anything else is returned untouched as a "complex" case.

    Args:
        df: DataFrame of resources; each row is converted to a dict.
        logger: Object exposing ``log(emoji, message)``.

    Returns:
        ``(simple_results, complex_cases)`` where ``simple_results`` is a list
        of dicts with 'resource', 'action', 'reason', 'policy_rule' and
        'inferred_team', and ``complex_cases`` is a list of resource dicts.
    """
    logger.log("⚡", "Applying Python rules for simple cases...")
    simple_results = []
    complex_cases = []
    for _, row in df.iterrows():
        resource = row.to_dict()
        # Missing (NaN), malformed or non-object tag JSON is treated as "no tags".
        try:
            tags = json.loads(resource.get('Tags', '{}'))
        except (TypeError, ValueError):
            tags = {}
        if not isinstance(tags, dict):
            tags = {}
        cpu_avg = resource.get('CPU_avg', 0)

        # Rule 1: Unattached EBS
        if (resource.get('Service') == 'EBS' and
                get_tag(tags, 'state') == 'available' and
                cpu_avg == 0):
            simple_results.append({
                'resource': resource,
                'action': 'DELETE',
                'reason': 'Unattached EBS volume (no activity)',
                'policy_rule': 'Rule: Unattached volumes should be deleted',
                'inferred_team': infer_team_from_tags(tags, resource.get('ResourceID', ''))
            })
            continue

        # Rule 2: Obvious Keep (High CPU)
        if cpu_avg > 75:
            simple_results.append({
                'resource': resource,
                'action': 'KEEP',
                'reason': f'High CPU utilization ({cpu_avg}%)',
                'policy_rule': 'High utilization indicates active use',
                'inferred_team': infer_team_from_tags(tags, resource.get('ResourceID', ''))
            })
            continue

        # Everything else is complex
        complex_cases.append(resource)

    logger.log("✅", f"Python rules: {len(simple_results)} simple, {len(complex_cases)} complex (need AI)")
    return simple_results, complex_cases
@tool
def classify_resources(resources_json: str, policy_context: str) -> ClassificationResponse:
    """Classify cloud resources based on policy using strong AI.
    Args:
        resources_json: JSON string containing a list of resource dictionaries to classify.
        policy_context: Relevant policy text retrieved from the RAG engine.
    """
    prompt = f"""You are Sentinal, an expert Cloud FinOps Auditor.
Analyze the following resources against the policy.
**Policy Context:**
{policy_context}
**Input Resources:**
{resources_json}
**YOUR TASKS:**
1. **DECIDE ACTION:** Determine if the resource should be KEEP, DELETE, HIBERNATE, or AUDIT.
- DELETE: Low CPU (<10%) + idle (>30 days) + dev/test environment
- HIBERNATE: Low CPU (<10%) + idle (>30 days) + prod environment
- KEEP: High CPU (>50%) OR active (<30 days) OR critical tags
- AUDIT: Missing required tags (team, environment) OR ambiguous state
2. **SHERLOCK MODE (Team Inference):**
- PRIMARY: Check the 'Team' tag first. If present and valid, use it.
- FALLBACK: If 'Team' tag is missing, empty, or generic, use your world knowledge to infer ownership.
- Look at 'ResourceID', 'Name', 'Service', or any other clues.
**Inference Examples:**
- 'kafka', 'hadoop', 'spark', 'airflow', 'ml-', 'training', 'gpu' → ML or Data
- 'jenkins', 'terraform', 'k8s', 'docker', 'cicd' → DevOps or Platform
- 'web', 'frontend', 'react', 'vue', 'ui' → Frontend
- 'api', 'backend', 'service', 'payment', 'catalogue' → Backend
- 'test', 'qa', 'selenium' → QA
- 'security', 'vault', 'secrets' → Security
- 'tableau', 'looker', 'analytics' → Data
**Canonical Teams:** Finance, Data, QA, DevOps, Security, Frontend, Backend, Infrastructure, Platform, ML, Unknown
- Set 'inferred_team' to the most likely team based on your analysis.
- If absolutely no clues exist, set 'inferred_team' to "Unknown".
3. **CRITICAL SAFETY RULES:**
- Production resources (env=prod) must NEVER be DELETE. Use HIBERNATE instead.
- If unsure, choose AUDIT to flag for human review.
**OUTPUT:**
For EACH resource in the input, you MUST return:
- resource_id: Copy the 'ResourceID' field from the input resource exactly as-is
- action: Your decision (KEEP/DELETE/HIBERNATE/AUDIT)
- reason: Brief justification
- policy_rule: The policy rule that applies
- inferred_team: The team name you inferred
CRITICAL: The 'resource_id' field is MANDATORY. Copy it from the input 'ResourceID' field.
Return valid JSON matching the ClassificationResponse schema with ALL fields populated."""
    try:
        llm = genai.GenerativeModel(GEMINI_MODEL)
        gen_config = genai.GenerationConfig(
            response_mime_type="application/json",
            response_schema=ClassificationResponseSchema
        )
        reply = llm.generate_content(prompt, generation_config=gen_config)
        # Validate the raw JSON through the Pydantic model.
        payload = json.loads(reply.text)
        return ClassificationResponse(**payload)
    except Exception as exc:
        print(f"Gemini Error: {exc}")
        # Any failure (API, JSON, validation) yields an empty classification list.
        return ClassificationResponse(classifications=[])
def apply_safety_rules(resource, classification, logger):
    """Apply hardcoded safety overrides to a proposed action.

    Only DELETE is ever overridden:
      * environment tag contains 'prod' -> HIBERNATE;
      * tags are missing/unparseable/not a JSON object -> AUDIT, because the
        environment cannot be verified and a guardrail should fail safe
        rather than crash or let the deletion through.

    Args:
        resource: Resource dict; its 'Tags' entry is a JSON string.
        classification: Dict with at least an 'action' key.
        logger: Unused; kept for interface compatibility.

    Returns:
        The (possibly downgraded) action string.
    """
    action = classification["action"]
    if action != "DELETE":
        return action

    try:
        tags = json.loads(resource.get("Tags", "{}"))
    except (TypeError, ValueError):
        return "AUDIT"
    if not isinstance(tags, dict):
        return "AUDIT"

    # str() guards against non-string tag values such as numbers.
    env = str(get_tag(tags, ['env', 'environment']) or '').lower()

    # CRITICAL: Prod Safety
    if "prod" in env:
        return "HIBERNATE"
    return action
# --- 7. REPORT GENERATORS (HTML & EMAILS) ---
def _report_team_label(team):
    """Return a printable team name for an Enum member, a string, or None."""
    if not team:
        return "Unknown"
    # Enum members are rendered by value so the label reads "ML", not an enum repr.
    return str(getattr(team, "value", team))


def _report_tags(raw):
    """Parse a JSON tag string; anything unusable becomes an empty dict."""
    try:
        tags = json.loads(raw)
    except (TypeError, ValueError):
        return {}
    return tags if isinstance(tags, dict) else {}


def _report_bars(rows, peak):
    """Render ``(label, amount, color)`` rows as horizontal bars scaled to ``peak``.

    ``label`` must already be HTML-escaped.
    """
    parts = []
    for label, amount, color in rows:
        pct = (amount / peak * 100) if peak > 0 else 0
        parts.append(f"""
<div style="margin-bottom: 12px;">
<div style="display: flex; justify-content: space-between; margin-bottom: 4px;">
<span style="font-size: 13px; font-weight: 600; color: #1f2937;">{label}</span>
<span style="font-size: 13px; font-weight: 700; color: {color};">${amount:,.2f}</span>
</div>
<div style="background: #f3f4f6; border-radius: 4px; height: 24px; overflow: hidden;">
<div style="background: {color}; height: 100%; width: {pct}%; transition: width 0.3s;"></div>
</div>
</div>
""")
    return "".join(parts)


def generate_report(results, total_resources, total_spend):
    """Generate the HTML audit report (scorecards, three bar charts, triage table).

    Args:
        results: List of ``{'resource': dict, 'classification': dict}`` items.
            Each resource needs 'ResourceID', 'Service', 'Cost_Monthly' and
            'CPU_avg'; each classification needs 'action' and 'reason'.
        total_resources: Unused; kept for interface compatibility.
        total_spend: Total monthly spend shown on the "Total Spend" card.

    Returns:
        The report as an HTML string. All values taken from the data or from
        the model are HTML-escaped.
    """
    import html  # local import: only this function needs it

    default_color = "#6b7280"
    action_colors = {"KEEP": "#10b981", "DELETE": "#ef4444", "HIBERNATE": "#f59e0b", "AUDIT": "#f97316"}
    service_colors = {"EC2": "#ff9900", "RDS": "#3b82f6", "EBS": "#8b5cf6", "S3": "#10b981", "Lambda": "#f59e0b"}
    team_colors = ["#ef4444", "#f59e0b", "#f97316", "#fb923c", "#fdba74"]
    waste_actions = ('DELETE', 'HIBERNATE')

    # --- Single pass over the results for every aggregate ---
    analyzed_spend = 0
    savings = 0
    unallocated_cost = 0
    waste_cpu = []
    action_costs = {"KEEP": 0, "DELETE": 0, "HIBERNATE": 0, "AUDIT": 0}
    service_costs = {}
    team_waste = {}
    for r in results:
        res = r['resource']
        cls = r['classification']
        cost = res['Cost_Monthly']
        act = cls['action']

        analyzed_spend += cost
        action_costs[act] = action_costs.get(act, 0) + cost
        svc = res.get('Service', 'Unknown')
        service_costs[svc] = service_costs.get(svc, 0) + cost

        # Orphan = no team tag, using the same keys as the orphan query filter.
        if not get_tag(_report_tags(res.get('Tags', '{}')), ['team', 'owner_team']):
            unallocated_cost += cost

        if act in waste_actions:
            savings += cost
            waste_cpu.append(res['CPU_avg'])
            team = _report_team_label(cls.get('inferred_team', 'Unknown'))
            team_waste[team] = team_waste.get(team, 0) + cost

    avg_waste_util = (sum(waste_cpu) / len(waste_cpu)) if waste_cpu else 0

    # --- Charts ---
    action_chart_html = _report_bars(
        [(action, action_costs.get(action, 0), action_colors.get(action, default_color))
         for action in ["DELETE", "HIBERNATE", "KEEP", "AUDIT"]],  # Order by importance
        max(action_costs.values()),
    )

    sorted_services = sorted(service_costs.items(), key=lambda x: x[1], reverse=True)
    service_chart_html = _report_bars(
        [(html.escape(str(svc)), cost, service_colors.get(svc, default_color))
         for svc, cost in sorted_services],
        sorted_services[0][1] if sorted_services else 1,
    )

    # Team Waste Leaderboard: top 5 teams by wasted spend.
    sorted_teams = sorted(team_waste.items(), key=lambda x: x[1], reverse=True)
    team_chart_html = _report_bars(
        [(html.escape(team), waste, team_colors[idx])
         for idx, (team, waste) in enumerate(sorted_teams[:len(team_colors)])],
        sorted_teams[0][1] if sorted_teams else 1,
    )

    # NOTE(review): the "Model" badge text below is static and does not track
    # the configured Gemini model name.
    report = """
<div style="font-family: 'Inter', system-ui, sans-serif; color: #1f2937;">
<!-- Badges -->
<div style="display: flex; gap: 10px; margin-bottom: 20px;">
<img src="https://img.shields.io/badge/Status-Complete-success?style=flat-square" alt="Status">
<img src="https://img.shields.io/badge/Model-Gemini_2.0_Flash-blue?style=flat-square" alt="Model">
<img src="https://img.shields.io/badge/Security-Enterprise_Grade-purple?style=flat-square" alt="Security">
</div>
<div style="font-family: 'Inter', system-ui, sans-serif; color: #1f2937;">
<!-- CFO Scorecards -->
<div style="display: grid; grid-template-columns: repeat(4, 1fr); gap: 20px; margin-bottom: 30px;">
<!-- CARD 1: POTENTIAL SAVINGS (Most Important) -->
<div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); border-left: 5px solid #10b981;">
<h3 style="margin: 0 0 10px 0; color: #6b7280; font-size: 0.875rem; text-transform: uppercase; letter-spacing: 0.05em;">💰 Potential Savings</h3>
<p style="margin: 0; font-size: 1.875rem; font-weight: 800; color: #10b981;">${savings:,.2f}</p>
<p style="margin: 5px 0 0 0; font-size: 0.75rem; color: #9ca3af;">Delete + Hibernate</p>
</div>
<!-- CARD 2: TOTAL SPEND -->
<div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);">
<h3 style="margin: 0 0 10px 0; color: #6b7280; font-size: 0.875rem; text-transform: uppercase; letter-spacing: 0.05em;">Total Spend</h3>
<p style="margin: 0; font-size: 1.5rem; font-weight: 700; color: #1f2937;">${total_spend:,.2f}</p>
<p style="margin: 5px 0 0 0; font-size: 0.75rem; color: #9ca3af;">Analyzed: ${analyzed_spend:,.2f}</p>
</div>
<!-- CARD 3: ORPHAN SPEND -->
<div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); border-left: 5px solid #f59e0b;">
<h3 style="margin: 0 0 10px 0; color: #6b7280; font-size: 0.875rem; text-transform: uppercase; letter-spacing: 0.05em;">Orphan Spend</h3>
<p style="margin: 0; font-size: 1.5rem; font-weight: 700; color: #1f2937;">${unallocated_cost:,.2f}</p>
<p style="margin: 5px 0 0 0; font-size: 0.75rem; color: #9ca3af;">No Team Tag</p>
</div>
<!-- CARD 4: WASTE CPU -->
<div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); border-left: 5px solid #3b82f6;">
<h3 style="margin: 0 0 10px 0; color: #6b7280; font-size: 0.875rem; text-transform: uppercase; letter-spacing: 0.05em;">Waste CPU Avg</h3>
<p style="margin: 0; font-size: 1.5rem; font-weight: 700; color: #1f2937;">{avg_waste_util:.1f}%</p>
<p style="margin: 5px 0 0 0; font-size: 0.75rem; color: #9ca3af;">Idle Resources</p>
</div>
</div>
<!-- DATA VISUALIZATIONS -->
<div style="display: grid; grid-template-columns: repeat(3, 1fr); gap: 20px; margin-bottom: 30px;">
<!-- CHART 1: ACTION BREAKDOWN -->
<div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);">
<h3 style="margin: 0 0 15px 0; color: #1f2937; font-size: 1rem; font-weight: 700;">💼 Cost by Action</h3>
{action_chart}
</div>
<!-- CHART 2: SERVICE BREAKDOWN -->
<div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);">
<h3 style="margin: 0 0 15px 0; color: #1f2937; font-size: 1rem; font-weight: 700;">🔧 Cost by Service</h3>
{service_chart}
</div>
<!-- CHART 3: TEAM WASTE LEADERBOARD -->
<div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);">
<h3 style="margin: 0 0 15px 0; color: #1f2937; font-size: 1rem; font-weight: 700;">🏆 Team Waste Ranking</h3>
{team_chart}
</div>
</div>
<h3>🛡️ Triage Details</h3>
<div style="overflow: auto; max-height: 500px; border: 1px solid #e5e7eb; border-radius: 8px;">
<table style="width: 100%; border-collapse: collapse; font-size: 13px;">
<thead style="position: sticky; top: 0; background: #f9fafb;">
<tr style="border-bottom: 1px solid #e5e7eb;">
<th style="padding: 12px; text-align: left;">ResourceID</th>
<th style="padding: 12px; text-align: left;">Service</th>
<th style="padding: 12px; text-align: right;">Cost</th>
<th style="padding: 12px; text-align: left;">Action</th>
<th style="padding: 12px; text-align: left;">Reason</th>
</tr>
</thead>
<tbody>
""".format(
        total_spend=total_spend,
        analyzed_spend=analyzed_spend,
        savings=savings,
        unallocated_cost=unallocated_cost,
        avg_waste_util=avg_waste_util,
        action_chart=action_chart_html,
        service_chart=service_chart_html,
        team_chart=team_chart_html
    )

    # --- Triage table: one row per result ---
    rows = []
    for result in results:
        res = result["resource"]
        cls = result["classification"]
        color = action_colors.get(cls['action'], default_color)
        rows.append(f"""
<tr style="border-bottom: 1px solid #f3f4f6;">
<td style="padding: 10px; font-family: monospace;">{html.escape(str(res['ResourceID']))}</td>
<td style="padding: 10px;">{html.escape(str(res['Service']))}</td>
<td style="padding: 10px; text-align: right;">${res['Cost_Monthly']:.2f}</td>
<td style="padding: 10px;"><span style="color: {color}; font-weight: 700;">{html.escape(str(cls['action']))}</span></td>
<td style="padding: 10px;">{html.escape(str(cls['reason']))}</td>
</tr>
""")
    report += "".join(rows)
    # Closes the table, its scroll container, and both wrapper divs opened above.
    report += "</tbody></table></div></div></div>"
    return report
def _email_team_name(resource, classification):
    """Pick the team for an email: inferred team first, then the team tag."""
    team_name = "Unknown"
    inferred = classification.get("inferred_team")
    if inferred:
        # Handle Enum or String
        team_name = str(getattr(inferred, "value", inferred))
    if team_name != "Unknown":
        return team_name

    # Fallback to existing tag (if the AI returned Unknown / nothing).
    try:
        tags = json.loads(resource.get('Tags', '{}'))
    except (TypeError, ValueError):
        return team_name
    tag_team = get_tag(tags, ['team', 'owner_team'], "")
    if isinstance(tag_team, str) and tag_team.strip():
        return tag_team.strip().title()
    return team_name


def generate_emails(results):
    """Generate one Markdown email draft per team for every non-KEEP resource.

    Args:
        results: List of ``{'resource': dict, 'classification': dict}`` items.

    Returns:
        A Markdown document with one fenced email draft per team (teams in
        alphabetical order), or a "No Action Required" notice when every
        resource is marked KEEP. The quoted savings figure counts only
        DELETE and HIBERNATE resources, matching the HTML report; AUDIT
        resources are listed but not counted as savings.
    """
    team_data = {}
    # Group resources by team
    for result in results:
        resource = result["resource"]
        classification = result["classification"]
        # Constraint: Do NOT generate emails for resources marked "KEEP"
        if classification["action"] == "KEEP":
            continue
        team_name = _email_team_name(resource, classification)
        bucket = team_data.setdefault(team_name, {'resources': [], 'poc_emails': set()})
        bucket['resources'].append(result)
        # Extract POC email (use OwnerEmail as POC). The isinstance check
        # skips NaN cells from the CSV, which are truthy floats.
        poc_email = resource.get('OwnerEmail', '')
        if isinstance(poc_email, str) and '@' in poc_email:
            bucket['poc_emails'].add(poc_email)

    if not team_data:
        return "# No Action Required\n\nAll resources are optimally configured. No emails to send."

    action_emojis = {'DELETE': '🔴', 'HIBERNATE': '🟡', 'AUDIT': '🟠'}
    parts = [
        f"# 📧 Team Email Drafts ({len(team_data)} Teams)\n\n",
        "_Copy and paste each email below to send to the team POC_\n\n",
        "---\n\n",
    ]

    # Generate ONE email draft per Team
    for team in sorted(team_data):
        items = team_data[team]['resources']
        poc_emails = team_data[team]['poc_emails']
        # Only DELETE/HIBERNATE actually remove spend.
        savings = sum(
            r['resource']['Cost_Monthly'] for r in items
            if r['classification']['action'] in ('DELETE', 'HIBERNATE')
        )
        action_counts = {}
        for item in items:
            action = item['classification']['action']
            action_counts[action] = action_counts.get(action, 0) + 1
        action_summary = " | ".join(f"{count} {action}" for action, count in sorted(action_counts.items()))
        recipients = ', '.join(sorted(poc_emails)) if poc_emails else 'team-email@company.com'

        # Email header
        parts.append(f"## 📮 Email Draft for {team}\n\n")
        parts.append("```\n")
        parts.append(f"To: {recipients}\n")
        parts.append(f"Subject: [Action Required] Cloud FinOps Audit - ${savings:,.2f} Potential Savings\n")
        parts.append("\n")
        # Email body
        parts.append(f"Dear {team},\n\n")
        parts.append(f"Our automated FinOps audit has identified {len(items)} cloud resources under your team's ownership ")
        parts.append(f"that can be optimized to save ${savings:,.2f} per month.\n\n")
        parts.append("**Summary:**\n")
        parts.append(f"- {len(items)} resources flagged: {action_summary}\n")
        parts.append(f"- Total potential savings: ${savings:,.2f}/month\n\n")
        parts.append("**Top Resources by Cost:**\n\n")

        # Show the five most expensive resources
        top_resources = sorted(items, key=lambda x: x['resource']['Cost_Monthly'], reverse=True)[:5]
        for i, item in enumerate(top_resources, 1):
            res = item['resource']
            cls = item['classification']
            action_emoji = action_emojis.get(cls['action'], '⚪')
            reason = cls['reason']
            parts.append(f"{i}. {res['ResourceID']} ({res['Service']})\n")
            parts.append(f" Cost: ${res['Cost_Monthly']:.2f}/mo | Action: {action_emoji} {cls['action']}\n")
            parts.append(f" Reason: {reason[:80]}{'...' if len(reason) > 80 else ''}\n\n")
        if len(items) > 5:
            parts.append(f"...and {len(items) - 5} more resources (see full report)\n\n")

        parts.append("**Next Steps:**\n")
        parts.append("1. Review the flagged resources in the attached report\n")
        parts.append("2. Confirm or reject the recommended actions\n")
        parts.append("3. Contact finops@company.com with questions\n\n")
        parts.append("Best regards,\n")
        parts.append("Cloud FinOps Team\n")
        parts.append("```\n\n")
        parts.append("---\n\n")
    return "".join(parts)
# --- 8. MAIN ORCHESTRATION ---
def run_audit(custom_query="", progress_callback=None, log_callback=None):
    """Run the full audit pipeline; main entry point called by app.py.

    Steps: load ``billing_export.csv``, optionally scope it with the user's
    query, classify obvious cases with Python rules, send the most expensive
    remaining cases to the AI classifier, apply safety overrides, then build
    the HTML report and the email drafts.

    Args:
        custom_query: Optional natural-language query used to filter resources.
        progress_callback: Optional ``callable(fraction, message)``.
        log_callback: Optional callable receiving each reasoning log line.

    Returns:
        A JSON string with the keys 'report', 'emails' and 'reasoning'.
        Errors are reported inside 'report' rather than raised.
    """
    logger = ReasoningLogger(log_callback)

    def report_progress(fraction, message):
        if progress_callback:
            progress_callback(fraction, message)

    try:
        logger.log("🚀", "Starting Sentinal Cloud Audit...")
        report_progress(0.1, "🚀 Initializing...")

        # Load Data
        if not os.path.exists("billing_export.csv"):
            logger.log("❌", "billing_export.csv missing")
            # Same shape as every other return so callers can always read 'reasoning'.
            return json.dumps({
                "report": "Error: billing_export.csv missing",
                "emails": "",
                "reasoning": logger.get_logs()
            })
        df = pd.read_csv("billing_export.csv")
        logger.log("📊", f"Loaded {len(df)} resources.")

        # --- STEP 1: INTENT EXTRACTION ---
        if custom_query:
            intent = extract_query_intent(custom_query, logger)
            df = filter_dataframe(df, intent, logger)
            if len(df) == 0:
                return json.dumps({
                    "report": "<h3>No resources matched your query.</h3>",
                    "emails": "",
                    "reasoning": logger.get_logs()
                })

        # --- STEP 2: HYBRID ANALYSIS ---
        report_progress(0.3, "⚡ Running Python Heuristics...")
        simple_results, complex_cases = classify_simple_cases(df, logger)
        all_results = [
            {
                "resource": sr['resource'],
                "classification": {
                    "action": sr['action'],
                    "reason": sr['reason'],
                    "policy_rule": sr['policy_rule'],
                    "inferred_team": sr.get('inferred_team', TeamName.UNKNOWN)
                }
            }
            for sr in simple_results
        ]

        # --- STEP 3: AI REASONING ---
        if complex_cases:
            report_progress(0.6, f"🤖 AI Analyzing {len(complex_cases)} complex cases...")
            logger.log("📚", "Checking Policy via RAG...")
            policy_ctx = query_policy("cloud resource lifecycle policy including deletion, hibernation, and retention rules")

            # OPTIMIZATION: only the TOP_N most expensive resources go to the AI.
            # Instead of batching everything (slow/expensive), we focus AI on high-impact items.
            complex_cases.sort(key=lambda x: x.get('Cost_Monthly', 0), reverse=True)
            TOP_N = 100
            high_impact_batch = complex_cases[:TOP_N]
            skipped_count = len(complex_cases) - len(high_impact_batch)
            if skipped_count > 0:
                logger.log("⚡", f"Prioritizing Top {TOP_N} spenders. Skipped {skipped_count} low-cost items.")
            report_progress(0.7, f"🤖 AI Analyzing Top {len(high_impact_batch)} High-Impact Cases...")

            try:
                resources_json = json.dumps(high_impact_batch)
                ai_response = classify_resources(resources_json, policy_ctx)

                # Index once by ID (first occurrence wins) instead of scanning
                # the batch for every answer.
                pending = {}
                for candidate in high_impact_batch:
                    pending.setdefault(candidate['ResourceID'], candidate)

                for cls in ai_response.classifications:
                    # pop() also ignores repeated answers for the same resource.
                    res = pending.pop(cls.resource_id, None)
                    if res:
                        all_results.append({
                            "resource": res,
                            "classification": {
                                "action": cls.action,
                                "reason": cls.reason,
                                "policy_rule": cls.policy_rule,
                                "inferred_team": cls.inferred_team
                            }
                        })
                if pending:
                    logger.log("⚠️", f"AI returned no classification for {len(pending)} of {len(high_impact_batch)} resources; they are omitted from the report.")
            except Exception as e:
                logger.log("⚠️", f"AI Analysis Error: {e}")

        # --- STEP 4: SAFETY CHECKS ---
        logger.log("🛡️", "Applying final safety guardrails...")
        final_results = []
        for res in all_results:
            res['classification']['action'] = apply_safety_rules(res['resource'], res['classification'], logger)
            final_results.append(res)

        # --- STEP 5: REPORTING ---
        report_progress(0.9, "📝 Generating Report...")
        total_spend = df['Cost_Monthly'].sum()
        report = generate_report(final_results, len(df), total_spend)
        emails = generate_emails(final_results)
        logger.log("✅", "Audit Complete.")
        return json.dumps({
            "report": report,
            "emails": emails,
            "reasoning": logger.get_logs()
        })
    except Exception as e:
        import traceback
        traceback.print_exc()
        logger.log("❌", f"Fatal Error: {e}")
        return json.dumps({"report": f"Error: {e}", "emails": "", "reasoning": logger.get_logs()})
if __name__ == "__main__":
    # Manual smoke test: runs one audit with a sample query and prints the
    # JSON string returned by run_audit.
    print(run_audit("find expensive databases"))