Spaces:
Sleeping
Sleeping
| import os | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| import json | |
| import pandas as pd | |
| from datetime import datetime | |
| from typing import Dict, List, Literal, Optional | |
| from typing_extensions import TypedDict | |
| from enum import Enum | |
| from pydantic import BaseModel, Field | |
| import re | |
| # Smolagents imports | |
| from smolagents import CodeAgent, tool, LiteLLMModel | |
| # OpenAI for direct API calls | |
| # OpenAI for direct API calls | |
| from openai import OpenAI | |
| import google.generativeai as genai | |
| # LlamaIndex for RAG | |
| from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings | |
| from llama_index.llms.openai import OpenAI as LlamaOpenAI | |
| from llama_index.embeddings.openai import OpenAIEmbedding | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# --- Configuration ---
# Model names are environment-driven so deployments can override them.
DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4o-mini")
REASONING_EFFORT = os.getenv("REASONING_EFFORT", "low")

# Warn (without aborting) about every provider key that is absent.
for _required_key, _warning in (
    ("OPENAI_API_KEY", "WARNING: OPENAI_API_KEY not found in environment. Agent may fail."),
    ("GOOGLE_API_KEY", "WARNING: GOOGLE_API_KEY not found in environment. Gemini will fail."),
):
    if _required_key not in os.environ:
        print(_warning)
del _required_key, _warning

# Provider clients are configured at import time from the same keys.
# NOTE(review): the OpenAI constructor may itself raise when the key is
# missing, in which case the warning above is followed by an import error;
# confirm against the installed openai version.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

# Gemini Model Configuration
# Previously tried: "gemini-2.0-flash", "gemini-2.0-flash-lite", "gemini-2.0-flash-exp"
GEMINI_MODEL = "gemini-2.5-flash"
| # --- 1. ENUMS & PYDANTIC MODELS --- | |
class TeamName(str, Enum):
    """Canonical team names used for ownership attribution.

    Members are also ``str`` instances, so they compare equal to their plain
    string value (e.g. ``TeamName.ML == "ML"``).
    """
    FINANCE = "Finance"
    DATA = "Data"
    QA = "QA"
    DEVOPS = "DevOps"
    SECURITY = "Security"
    FRONTEND = "Frontend"
    BACKEND = "Backend"
    INFRA = "Infrastructure"
    PLATFORM = "Platform"
    ML = "ML"
    # Fallback when neither tags nor the resource id identify an owner.
    UNKNOWN = "Unknown"
class QueryIntent(BaseModel):
    """Translates natural language into strict data filters."""
    # Every field has a default, so ``QueryIntent()`` means "no filtering";
    # it is what extract_query_intent returns for an empty query or on failure.
    target_services: Optional[List[str]] = Field(default_factory=list, description="List of services to filter (e.g. ['RDS', 'EBS', 'EC2'])")
    target_env: Optional[str] = Field(None, description="Environment to focus on (e.g. 'prod', 'dev')")
    target_team: Optional[str] = Field(None, description="Team to focus on (e.g. 'DevOps', 'ML', 'Data')")
    min_cost: Optional[float] = Field(0.0, description="Minimum monthly cost to filter for")
    only_orphans: bool = Field(False, description="True if user is looking for untagged/unowned resources")
    focus_area: Literal["SAVINGS", "SAFETY", "GENERAL", "TEAM_RANKING"] = Field("GENERAL", description="The user's primary goal")
# TypedDict for Gemini Schema (Avoids 'default' error)
class QueryIntentSchema(TypedDict):
    """Default-free mirror of ``QueryIntent`` passed to Gemini as ``response_schema``."""
    # Field names and types parallel QueryIntent so the JSON reply can be
    # unpacked straight into that model.
    target_services: List[str]
    target_env: Optional[str]
    target_team: Optional[str]
    min_cost: float
    only_orphans: bool
    focus_area: Literal["SAVINGS", "SAFETY", "GENERAL", "TEAM_RANKING"]
class ResourceClassification(BaseModel):
    """Classification for a single resource."""
    # Only resource_id is required; the remaining fields fall back to a
    # conservative "AUDIT / Manual Review" default when omitted.
    resource_id: str
    action: Literal["KEEP", "DELETE", "HIBERNATE", "AUDIT"] = Field("AUDIT", description="Recommended action")
    reason: str = Field("No reason provided", description="Justification for the action")
    policy_rule: str = Field("Manual Review", description="Policy rule applied")
    inferred_team: Optional[TeamName] = Field(None, description="Inferred team ownership based on tags or resource name")
class ClassificationResponse(BaseModel):
    """Complete classification response for all resources."""
    # One entry per classified resource; classify_resources returns an empty
    # list here when the model call fails.
    classifications: list[ResourceClassification]
# TypedDict for Gemini Schema
class ResourceClassificationSchema(TypedDict):
    """Default-free mirror of ``ResourceClassification`` for Gemini's ``response_schema``."""
    resource_id: str
    action: Literal["KEEP", "DELETE", "HIBERNATE", "AUDIT"]
    reason: str
    policy_rule: str
    inferred_team: Optional[TeamName]
class ClassificationResponseSchema(TypedDict):
    """Top-level Gemini response schema: a list of per-resource classifications."""
    classifications: List[ResourceClassificationSchema]
| # --- 2. REASONING LOGGER --- | |
class ReasoningLogger:
    """Collects timestamped reasoning lines and optionally streams each one.

    Every entry is stored in ``self.logs``, echoed to stdout and, when a
    ``log_callback`` was supplied, forwarded to it as a single string.
    """

    def __init__(self, log_callback=None):
        self.log_callback = log_callback
        self.logs = []

    def log(self, emoji, message):
        """Record ``message`` prefixed with the current HH:MM:SS time and ``emoji``."""
        line = "[{}] {} {}".format(datetime.now().strftime("%H:%M:%S"), emoji, message)
        self.logs.append(line)
        print(line)
        if not self.log_callback:
            return
        self.log_callback(line)

    def get_logs(self):
        """Return all recorded entries as one newline-separated string."""
        return "\n".join(self.logs)
| # --- 3. RAG SETUP (POLICY ENGINE) --- | |
def setup_policy_rag():
    """Build a query engine over ``policy.txt``.

    Configures the global LlamaIndex ``Settings`` (LLM and embedding model)
    first, then indexes the policy file if it exists.

    Returns:
        A query engine retrieving the top 3 matches, or ``None`` when
        ``policy.txt`` is absent or any step raises (the failure is printed).
    """
    try:
        # Use lightweight model for RAG
        Settings.llm = LlamaOpenAI(model="gpt-4o-mini")
        Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
        if not os.path.exists("policy.txt"):
            return None
        policy_docs = SimpleDirectoryReader(input_files=["policy.txt"]).load_data()
        return VectorStoreIndex.from_documents(policy_docs).as_query_engine(similarity_top_k=3)
    except Exception as e:
        print(f"Warning: Policy RAG setup failed: {e}")
        return None


# Built once at import time; query_policy reads this module-level engine.
policy_engine = setup_policy_rag()
def query_policy(question: str) -> str:
    """Query enterprise policy using RAG.

    Never raises: an unavailable engine or a failing query is reported in the
    returned text instead.

    Args:
        question: The policy question to ask (e.g., 'What are the deletion rules?')
    """
    if policy_engine:
        try:
            return str(policy_engine.query(question))
        except Exception as e:
            return f"Policy query error: {e}"
    return "Policy engine unavailable (policy.txt missing)."
| # --- 4. HELPER FUNCTIONS --- | |
def get_tag(tags_dict, keys, default=''):
    """Return the value of the first tag whose name matches ``keys``, ignoring case.

    Tags are scanned in the dictionary's own order, so when several candidate
    keys are present the one stored first wins (not the first one in ``keys``).

    Args:
        tags_dict: Parsed tag mapping. Anything that is not a dict (``None``,
            a list produced by unexpected JSON, ...) is treated as "no tags".
        keys: A single tag name or an iterable of acceptable tag names.
        default: Value returned when no usable tag is found.

    Returns:
        The matching tag value, or ``default``.
    """
    if not isinstance(tags_dict, dict):
        return default
    if isinstance(keys, str):
        keys = [keys]
    wanted = {k.lower() for k in keys}
    for name, value in tags_dict.items():
        # Non-string keys cannot match a tag name; skip instead of crashing.
        if not isinstance(name, str) or name.lower() not in wanted:
            continue
        # A JSON null carries no information; keep looking so callers that
        # immediately call .lower() on the result never receive None.
        if value is None:
            continue
        return value
    return default
# Substrings of a resource id that hint at an owning team, checked in order.
_RESOURCE_ID_HINTS = (
    (('ml-', 'training'), 'ml'),
    (('db-', 'rds'), 'data'),
    (('web', 'frontend'), 'frontend'),
    (('api', 'backend'), 'backend'),
    (('test', 'qa'), 'qa'),
)

# Substrings of a team label mapped to the canonical team. Order matters:
# the first row containing a matching substring wins.
_TEAM_KEYWORDS = (
    (('finance', 'fin'), TeamName.FINANCE),
    (('data', 'science'), TeamName.DATA),
    (('ml', 'ai'), TeamName.ML),
    (('qa', 'test', 'quality'), TeamName.QA),
    (('devops', 'ops', 'sre'), TeamName.DEVOPS),
    (('platform',), TeamName.PLATFORM),
    (('security', 'sec', 'infosec'), TeamName.SECURITY),
    (('frontend', 'web', 'ui'), TeamName.FRONTEND),
    (('backend', 'api'), TeamName.BACKEND),
    (('infra',), TeamName.INFRA),
)


def infer_team_from_tags(tags_dict, resource_id="") -> TeamName:
    """Simple Python-based team inference from tags OR resource ID.

    The team tag is used when present; otherwise the resource id is scanned
    for well-known substrings ("Sherlock Mode"). Matching is by substring and
    is case-insensitive for both the tag value and the resource id.

    Args:
        tags_dict: Parsed tag mapping; non-dict input is treated as no tags.
        resource_id: Resource identifier used as a fallback clue. Non-string
            values (e.g. a NaN from a CSV) are ignored.

    Returns:
        The canonical ``TeamName``, or ``TeamName.UNKNOWN`` when nothing matches.
    """
    tags = tags_dict if isinstance(tags_dict, dict) else {}
    raw_team = get_tag(tags, ['team', 'Team', 'TEAM', 'owner_team'])
    # A numeric or null tag value cannot name a team; treat it as missing
    # instead of failing on .lower().
    team_str = raw_team.lower() if isinstance(raw_team, str) else ''

    # If no tag, try to infer from ResourceID (Sherlock Mode)
    if not team_str and isinstance(resource_id, str) and resource_id:
        rid = resource_id.lower()
        for needles, guess in _RESOURCE_ID_HINTS:
            if any(needle in rid for needle in needles):
                team_str = guess
                break

    if not team_str:
        return TeamName.UNKNOWN

    # Map common variations to canonical TeamName
    for needles, team in _TEAM_KEYWORDS:
        if any(needle in team_str for needle in needles):
            return team
    return TeamName.UNKNOWN
| # --- 5. INTENT EXTRACTION & FILTERING (NEW) --- | |
def extract_query_intent(query: str, logger) -> QueryIntent:
    """Translate a free-text request into a ``QueryIntent`` using Gemini.

    Illustrative mapping: 'Find expensive databases' ->
    {'target_services': ['RDS'], 'min_cost': ...}.

    Args:
        query: The user's natural-language request; empty means "no filter".
        logger: Object exposing ``log(emoji, message)``.

    Returns:
        The parsed intent. A default (unfiltered) ``QueryIntent`` is returned
        for an empty query or when the model call / parsing fails.
    """
    if not query:
        return QueryIntent()

    logger.log("🧠", f"Translating query: '{query}'...")
    translation_prompt = f"""
You are a Data Query Translator for a Cloud FinOps Tool.
Convert the user's natural language query into structured filters.
User Query: "{query}"
---------------------------------------------------------
STRICT VOCABULARY MAPPING (DO NOT INVENT TERMS)
---------------------------------------------------------
1. SERVICES (Map keywords to EXACTLY one of these 5 options):
- "database", "db", "sql", "rds", "postgres" -> ["RDS"]
- "server", "vm", "instance", "ec2", "compute", "host" -> ["EC2"]
- "volume", "disk", "storage", "ebs", "block store" -> ["EBS"]
- "bucket", "object", "s3", "blob" -> ["S3"]
- "function", "serverless", "lambda" -> ["Lambda"]
* IF NO SPECIFIC SERVICE IS MENTIONED, RETURN [] (Empty List).
2. ENVIRONMENTS (Map keywords to EXACTLY one of these 2 options):
- "production", "prod", "live" -> "prod"
- "development", "dev", "test", "staging", "qa" -> "dev"
3. TEAMS (Map keywords to canonical team names):
- "devops", "ops", "sre" -> "DevOps"
- "ml", "ai", "machine learning" -> "ML"
- "data", "analytics", "science" -> "Data"
- "frontend", "web", "ui" -> "Frontend"
- "backend", "api" -> "Backend"
- "qa", "test", "quality" -> "QA"
- "security", "infosec" -> "Security"
- "platform", "infrastructure", "infra" -> "Platform"
4. INTENT RULES:
- "kill", "delete", "prune", "remove" -> focus_area="SAVINGS", min_cost=0.0
- "waste", "zombie", "unused", "idle" -> focus_area="SAVINGS", min_cost=0.0
- "expensive", "costly" -> focus_area="GENERAL", min_cost=100.0
- "orphan", "untagged" -> only_orphans=True
- "audit", "safety", "check" -> focus_area="SAFETY"
- "biggest offender", "worst team", "team ranking", "team leaderboard" -> focus_area="TEAM_RANKING"
OUTPUT FORMAT:
Return valid JSON matching the schema.
"""
    try:
        # Ask for JSON constrained to the default-free schema, then validate
        # it through the pydantic model.
        json_config = genai.GenerationConfig(
            response_mime_type="application/json",
            response_schema=QueryIntentSchema,
        )
        reply = genai.GenerativeModel(GEMINI_MODEL).generate_content(
            translation_prompt,
            generation_config=json_config,
        )
        intent = QueryIntent(**json.loads(reply.text))

        # Unset filters are shown as "All" in the reasoning log.
        env_str = intent.target_env or "All"
        team_str = intent.target_team or "All"
        if intent.target_services:
            svc_str = str(intent.target_services)
        else:
            svc_str = "All"
        logger.log("🎯", f"Intent: {intent.focus_area} | Team: {team_str} | Env: {env_str} | Svc: {svc_str} | Cost > ${intent.min_cost} | Orphans: {intent.only_orphans}")
        return intent
    except Exception as e:
        logger.log("⚠️", f"Intent extraction failed: {e}. Running full audit.")
        return QueryIntent()
def filter_dataframe(df, intent: QueryIntent, logger):
    """Applies the LLM-derived intent to the Pandas DataFrame.

    Filters are applied in sequence (service, environment, minimum cost, team,
    orphans); each one is skipped when the corresponding intent field is unset.

    Args:
        df: Billing frame; reads the 'Service', 'Tags' (JSON text) and
            'Cost_Monthly' columns for whichever filters are active.
        intent: Filters extracted from the user's query.
        logger: Object exposing ``log(emoji, message)``.

    Returns:
        The filtered DataFrame.
    """
    # Failures expected from unusable tag cells: NaN / non-text (TypeError),
    # malformed JSON (ValueError), JSON that is not an object or a non-string
    # tag value (AttributeError).
    tag_errors = (TypeError, ValueError, AttributeError)

    def tag_contains(keys, needle):
        """Build a row predicate: does the tag under ``keys`` contain ``needle``?"""
        def check(tags_str):
            try:
                value = get_tag(json.loads(tags_str), keys, '').lower()
            except tag_errors:
                return False
            return needle in value
        return check

    def is_orphan(tags_str):
        """True when no team tag exists; unreadable tags count as orphaned."""
        try:
            # Orphan = No Team tag
            return not get_tag(json.loads(tags_str), ['team', 'owner_team'])
        except tag_errors:
            return True

    def keep_rows(frame, predicate):
        # astype(bool) keeps the mask boolean even for an empty frame, where
        # apply() would otherwise yield an object-dtype Series.
        return frame[frame['Tags'].apply(predicate).astype(bool)]

    initial_count = len(df)

    # 1. Service Filter
    if intent.target_services:
        df = df[df['Service'].isin(intent.target_services)]

    # 2. Environment Filter (case-insensitive, like the team filter)
    if intent.target_env:
        df = keep_rows(df, tag_contains(['env', 'environment'], intent.target_env.lower()))

    # 3. Cost Filter
    if intent.min_cost and intent.min_cost > 0:
        df = df[df['Cost_Monthly'] >= intent.min_cost]

    # 4. Team Filter
    if intent.target_team:
        df = keep_rows(df, tag_contains(['team', 'owner_team'], intent.target_team.lower()))

    # 5. Orphan Filter
    if intent.only_orphans:
        df = keep_rows(df, is_orphan)

    logger.log("🔍", f"Scoped analysis from {initial_count} to {len(df)} resources.")
    return df
| # --- 6. CORE ANALYSIS LOGIC --- | |
def classify_simple_cases(df, logger):
    """Fast Python rules for obvious cases (90% of resources).

    Two rules are applied per row; everything else is deferred to the AI:
      1. EBS volume whose 'state' tag is 'available' and whose CPU_avg is 0
         -> DELETE.
      2. CPU_avg above 75 -> KEEP.

    Args:
        df: Billing frame; each row is converted to a plain dict.
        logger: Object exposing ``log(emoji, message)``.

    Returns:
        ``(simple_results, complex_cases)``: a list of result dicts with keys
        'resource', 'action', 'reason', 'policy_rule', 'inferred_team', and a
        list of the remaining resource dicts.
    """
    logger.log("⚡", "Applying Python rules for simple cases...")
    simple_results = []
    complex_cases = []

    for _, row in df.iterrows():
        resource = row.to_dict()
        try:
            tags = json.loads(resource.get('Tags', '{}'))
        except (TypeError, ValueError):
            # Missing (NaN) or malformed tag JSON: treat as untagged.
            tags = {}
        if not isinstance(tags, dict):
            # Valid JSON that is not an object carries no tags.
            tags = {}

        cpu_avg = resource.get('CPU_avg', 0)

        # Rule 1: Unattached EBS
        if (resource.get('Service') == 'EBS' and
                get_tag(tags, 'state') == 'available' and
                cpu_avg == 0):
            verdict = (
                'DELETE',
                'Unattached EBS volume (no activity)',
                'Rule: Unattached volumes should be deleted',
            )
        # Rule 2: Obvious Keep (High CPU)
        elif cpu_avg > 75:
            verdict = (
                'KEEP',
                f'High CPU utilization ({resource.get("CPU_avg")}%)',
                'High utilization indicates active use',
            )
        else:
            # Everything else is complex
            complex_cases.append(resource)
            continue

        action, reason, policy_rule = verdict
        simple_results.append({
            'resource': resource,
            'action': action,
            'reason': reason,
            'policy_rule': policy_rule,
            'inferred_team': infer_team_from_tags(tags, resource.get('ResourceID', '')),
        })

    logger.log("✅", f"Python rules: {len(simple_results)} simple, {len(complex_cases)} complex (need AI)")
    return simple_results, complex_cases
def classify_resources(resources_json: str, policy_context: str) -> ClassificationResponse:
    """Classify cloud resources based on policy using strong AI.

    Sends one prompt containing the policy text and the resources to Gemini,
    requesting JSON constrained to ``ClassificationResponseSchema``. Any
    failure is printed and yields an empty classification list.

    Args:
        resources_json: JSON string containing a list of resource dictionaries to classify.
        policy_context: Relevant policy text retrieved from the RAG engine.
    """
    audit_prompt = f"""You are Sentinal, an expert Cloud FinOps Auditor.
Analyze the following resources against the policy.
**Policy Context:**
{policy_context}
**Input Resources:**
{resources_json}
**YOUR TASKS:**
1. **DECIDE ACTION:** Determine if the resource should be KEEP, DELETE, HIBERNATE, or AUDIT.
- DELETE: Low CPU (<10%) + idle (>30 days) + dev/test environment
- HIBERNATE: Low CPU (<10%) + idle (>30 days) + prod environment
- KEEP: High CPU (>50%) OR active (<30 days) OR critical tags
- AUDIT: Missing required tags (team, environment) OR ambiguous state
2. **SHERLOCK MODE (Team Inference):**
- PRIMARY: Check the 'Team' tag first. If present and valid, use it.
- FALLBACK: If 'Team' tag is missing, empty, or generic, use your world knowledge to infer ownership.
- Look at 'ResourceID', 'Name', 'Service', or any other clues.
**Inference Examples:**
- 'kafka', 'hadoop', 'spark', 'airflow', 'ml-', 'training', 'gpu' → ML or Data
- 'jenkins', 'terraform', 'k8s', 'docker', 'cicd' → DevOps or Platform
- 'web', 'frontend', 'react', 'vue', 'ui' → Frontend
- 'api', 'backend', 'service', 'payment', 'catalogue' → Backend
- 'test', 'qa', 'selenium' → QA
- 'security', 'vault', 'secrets' → Security
- 'tableau', 'looker', 'analytics' → Data
**Canonical Teams:** Finance, Data, QA, DevOps, Security, Frontend, Backend, Infrastructure, Platform, ML, Unknown
- Set 'inferred_team' to the most likely team based on your analysis.
- If absolutely no clues exist, set 'inferred_team' to "Unknown".
3. **CRITICAL SAFETY RULES:**
- Production resources (env=prod) must NEVER be DELETE. Use HIBERNATE instead.
- If unsure, choose AUDIT to flag for human review.
**OUTPUT:**
For EACH resource in the input, you MUST return:
- resource_id: Copy the 'ResourceID' field from the input resource exactly as-is
- action: Your decision (KEEP/DELETE/HIBERNATE/AUDIT)
- reason: Brief justification
- policy_rule: The policy rule that applies
- inferred_team: The team name you inferred
CRITICAL: The 'resource_id' field is MANDATORY. Copy it from the input 'ResourceID' field.
Return valid JSON matching the ClassificationResponse schema with ALL fields populated."""
    try:
        json_config = genai.GenerationConfig(
            response_mime_type="application/json",
            response_schema=ClassificationResponseSchema,
        )
        reply = genai.GenerativeModel(GEMINI_MODEL).generate_content(
            audit_prompt,
            generation_config=json_config,
        )
        # Convert to Pydantic
        return ClassificationResponse(**json.loads(reply.text))
    except Exception as e:
        print(f"Gemini Error: {e}")
        # Fallback to empty list if fails
        return ClassificationResponse(classifications=[])
def apply_safety_rules(resource, classification, logger):
    """Hardcoded safety overrides.

    A DELETE on a resource whose env/environment tag contains "prod" is
    downgraded to HIBERNATE; every other action is returned unchanged.

    Args:
        resource: Resource dict; its 'Tags' entry is expected to be JSON text.
        classification: Dict holding at least an 'action' key.
        logger: Accepted for interface compatibility; currently unused.

    Returns:
        The (possibly downgraded) action string.
    """
    # Missing (NaN) or malformed tags are treated as "no tags" rather than
    # aborting the audit, matching how classify_simple_cases reads them.
    # NOTE(review): with unreadable tags the environment is unknown, so a
    # DELETE passes through unchanged; confirm that is the desired policy.
    try:
        tags = json.loads(resource.get("Tags", "{}"))
    except (TypeError, ValueError):
        tags = {}
    if not isinstance(tags, dict):
        tags = {}

    env_value = get_tag(tags, ['env', 'environment'])
    env = env_value.lower() if isinstance(env_value, str) else ""
    action = classification["action"]

    # CRITICAL: Prod Safety
    if "prod" in env and action == "DELETE":
        return "HIBERNATE"
    return action
| # --- 7. REPORT GENERATORS (HTML & EMAILS) --- | |
| def generate_report(results, total_resources, total_spend): | |
| """Generate HTML report with scrollable table and CFO Banner.""" | |
| analyzed_spend = sum(r['resource']['Cost_Monthly'] for r in results) | |
| # Calculate Unallocated Spend | |
| unallocated_cost = 0 | |
| savings = 0 | |
| action_counts = {"KEEP": 0, "DELETE": 0, "HIBERNATE": 0, "AUDIT": 0} | |
| for r in results: | |
| res = r['resource'] | |
| act = r['classification']['action'] | |
| tags = json.loads(res.get('Tags', '{}')) | |
| if not get_tag(tags, 'team'): | |
| unallocated_cost += res['Cost_Monthly'] | |
| if act in ['DELETE', 'HIBERNATE']: | |
| savings += res['Cost_Monthly'] | |
| action_counts[act] = action_counts.get(act, 0) + 1 | |
| # Waste CPU | |
| waste_items = [r for r in results if r['classification']['action'] in ['DELETE', 'HIBERNATE']] | |
| avg_waste_util = (sum(r['resource']['CPU_avg'] for r in waste_items) / len(waste_items)) if waste_items else 0 | |
| # --- CHART DATA CALCULATIONS --- | |
| # 1. Action Breakdown (Cost by Action) | |
| action_costs = {"KEEP": 0, "DELETE": 0, "HIBERNATE": 0, "AUDIT": 0} | |
| for r in results: | |
| act = r['classification']['action'] | |
| action_costs[act] = action_costs.get(act, 0) + r['resource']['Cost_Monthly'] | |
| # 2. Service Breakdown (Cost by Service) | |
| service_costs = {} | |
| for r in results: | |
| svc = r['resource'].get('Service', 'Unknown') | |
| service_costs[svc] = service_costs.get(svc, 0) + r['resource']['Cost_Monthly'] | |
| # Generate Action Chart HTML | |
| action_chart_html = "" | |
| action_colors = {"KEEP": "#10b981", "DELETE": "#ef4444", "HIBERNATE": "#f59e0b", "AUDIT": "#f97316"} | |
| max_action_cost = max(action_costs.values()) if action_costs.values() else 1 | |
| for action in ["DELETE", "HIBERNATE", "KEEP", "AUDIT"]: # Order by importance | |
| cost = action_costs.get(action, 0) | |
| pct = (cost / max_action_cost * 100) if max_action_cost > 0 else 0 | |
| color = action_colors.get(action, "#6b7280") | |
| action_chart_html += f""" | |
| <div style="margin-bottom: 12px;"> | |
| <div style="display: flex; justify-content: space-between; margin-bottom: 4px;"> | |
| <span style="font-size: 13px; font-weight: 600; color: #1f2937;">{action}</span> | |
| <span style="font-size: 13px; font-weight: 700; color: {color};">${cost:,.2f}</span> | |
| </div> | |
| <div style="background: #f3f4f6; border-radius: 4px; height: 24px; overflow: hidden;"> | |
| <div style="background: {color}; height: 100%; width: {pct}%; transition: width 0.3s;"></div> | |
| </div> | |
| </div> | |
| """ | |
| # Generate Service Chart HTML | |
| service_chart_html = "" | |
| service_colors = {"EC2": "#ff9900", "RDS": "#3b82f6", "EBS": "#8b5cf6", "S3": "#10b981", "Lambda": "#f59e0b"} | |
| sorted_services = sorted(service_costs.items(), key=lambda x: x[1], reverse=True) | |
| max_service_cost = sorted_services[0][1] if sorted_services else 1 | |
| for svc, cost in sorted_services: | |
| pct = (cost / max_service_cost * 100) if max_service_cost > 0 else 0 | |
| color = service_colors.get(svc, "#6b7280") | |
| service_chart_html += f""" | |
| <div style="margin-bottom: 12px;"> | |
| <div style="display: flex; justify-content: space-between; margin-bottom: 4px;"> | |
| <span style="font-size: 13px; font-weight: 600; color: #1f2937;">{svc}</span> | |
| <span style="font-size: 13px; font-weight: 700; color: {color};">${cost:,.2f}</span> | |
| </div> | |
| <div style="background: #f3f4f6; border-radius: 4px; height: 24px; overflow: hidden;"> | |
| <div style="background: {color}; height: 100%; width: {pct}%; transition: width 0.3s;"></div> | |
| </div> | |
| </div> | |
| """ | |
| # 3. Team Waste Leaderboard (Accountability Metrics) | |
| team_waste = {} | |
| for r in results: | |
| if r['classification']['action'] in ['DELETE', 'HIBERNATE']: | |
| team = r['classification'].get('inferred_team', 'Unknown') | |
| team_waste[team] = team_waste.get(team, 0) + r['resource']['Cost_Monthly'] | |
| # Generate Team Waste Chart HTML | |
| team_chart_html = "" | |
| sorted_teams = sorted(team_waste.items(), key=lambda x: x[1], reverse=True) | |
| max_team_waste = sorted_teams[0][1] if sorted_teams else 1 | |
| team_colors = ["#ef4444", "#f59e0b", "#f97316", "#fb923c", "#fdba74"] | |
| for idx, (team, waste) in enumerate(sorted_teams[:5]): # Top 5 teams | |
| pct = (waste / max_team_waste * 100) if max_team_waste > 0 else 0 | |
| color = team_colors[idx] if idx < len(team_colors) else "#6b7280" | |
| team_chart_html += f""" | |
| <div style="margin-bottom: 12px;"> | |
| <div style="display: flex; justify-content: space-between; margin-bottom: 4px;"> | |
| <span style="font-size: 13px; font-weight: 600; color: #1f2937;">{team}</span> | |
| <span style="font-size: 13px; font-weight: 700; color: {color};">${waste:,.2f}</span> | |
| </div> | |
| <div style="background: #f3f4f6; border-radius: 4px; height: 24px; overflow: hidden;"> | |
| <div style="background: {color}; height: 100%; width: {pct}%; transition: width 0.3s;"></div> | |
| </div> | |
| </div> | |
| """ | |
| report = """ | |
| <div style="font-family: 'Inter', system-ui, sans-serif; color: #1f2937;"> | |
| <!-- Badges --> | |
| <div style="display: flex; gap: 10px; margin-bottom: 20px;"> | |
| <img src="https://img.shields.io/badge/Status-Complete-success?style=flat-square" alt="Status"> | |
| <img src="https://img.shields.io/badge/Model-Gemini_2.0_Flash-blue?style=flat-square" alt="Model"> | |
| <img src="https://img.shields.io/badge/Security-Enterprise_Grade-purple?style=flat-square" alt="Security"> | |
| </div> | |
| <div style="font-family: 'Inter', system-ui, sans-serif; color: #1f2937;"> | |
| <!-- CFO Scorecards --> | |
| <div style="display: grid; grid-template-columns: repeat(4, 1fr); gap: 20px; margin-bottom: 30px;"> | |
| <!-- CARD 1: POTENTIAL SAVINGS (Most Important) --> | |
| <div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); border-left: 5px solid #10b981;"> | |
| <h3 style="margin: 0 0 10px 0; color: #6b7280; font-size: 0.875rem; text-transform: uppercase; letter-spacing: 0.05em;">💰 Potential Savings</h3> | |
| <p style="margin: 0; font-size: 1.875rem; font-weight: 800; color: #10b981;">${savings:,.2f}</p> | |
| <p style="margin: 5px 0 0 0; font-size: 0.75rem; color: #9ca3af;">Delete + Hibernate</p> | |
| </div> | |
| <!-- CARD 2: TOTAL SPEND --> | |
| <div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);"> | |
| <h3 style="margin: 0 0 10px 0; color: #6b7280; font-size: 0.875rem; text-transform: uppercase; letter-spacing: 0.05em;">Total Spend</h3> | |
| <p style="margin: 0; font-size: 1.5rem; font-weight: 700; color: #1f2937;">${total_spend:,.2f}</p> | |
| <p style="margin: 5px 0 0 0; font-size: 0.75rem; color: #9ca3af;">Analyzed: ${analyzed_spend:,.2f}</p> | |
| </div> | |
| <!-- CARD 3: ORPHAN SPEND --> | |
| <div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); border-left: 5px solid #f59e0b;"> | |
| <h3 style="margin: 0 0 10px 0; color: #6b7280; font-size: 0.875rem; text-transform: uppercase; letter-spacing: 0.05em;">Orphan Spend</h3> | |
| <p style="margin: 0; font-size: 1.5rem; font-weight: 700; color: #1f2937;">${unallocated_cost:,.2f}</p> | |
| <p style="margin: 5px 0 0 0; font-size: 0.75rem; color: #9ca3af;">No Team Tag</p> | |
| </div> | |
| <!-- CARD 4: WASTE CPU --> | |
| <div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); border-left: 5px solid #3b82f6;"> | |
| <h3 style="margin: 0 0 10px 0; color: #6b7280; font-size: 0.875rem; text-transform: uppercase; letter-spacing: 0.05em;">Waste CPU Avg</h3> | |
| <p style="margin: 0; font-size: 1.5rem; font-weight: 700; color: #1f2937;">{avg_waste_util:.1f}%</p> | |
| <p style="margin: 5px 0 0 0; font-size: 0.75rem; color: #9ca3af;">Idle Resources</p> | |
| </div> | |
| </div> | |
| <!-- DATA VISUALIZATIONS --> | |
| <div style="display: grid; grid-template-columns: repeat(3, 1fr); gap: 20px; margin-bottom: 30px;"> | |
| <!-- CHART 1: ACTION BREAKDOWN --> | |
| <div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);"> | |
| <h3 style="margin: 0 0 15px 0; color: #1f2937; font-size: 1rem; font-weight: 700;">💼 Cost by Action</h3> | |
| {action_chart} | |
| </div> | |
| <!-- CHART 2: SERVICE BREAKDOWN --> | |
| <div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);"> | |
| <h3 style="margin: 0 0 15px 0; color: #1f2937; font-size: 1rem; font-weight: 700;">🔧 Cost by Service</h3> | |
| {service_chart} | |
| </div> | |
| <!-- CHART 3: TEAM WASTE LEADERBOARD --> | |
| <div style="background: white; padding: 20px; border-radius: 12px; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);"> | |
| <h3 style="margin: 0 0 15px 0; color: #1f2937; font-size: 1rem; font-weight: 700;">🏆 Team Waste Ranking</h3> | |
| {team_chart} | |
| </div> | |
| </div> | |
| <h3>🛡️ Triage Details</h3> | |
| <div style="overflow: auto; max-height: 500px; border: 1px solid #e5e7eb; border-radius: 8px;"> | |
| <table style="width: 100%; border-collapse: collapse; font-size: 13px;"> | |
| <thead style="position: sticky; top: 0; background: #f9fafb;"> | |
| <tr style="border-bottom: 1px solid #e5e7eb;"> | |
| <th style="padding: 12px; text-align: left;">ResourceID</th> | |
| <th style="padding: 12px; text-align: left;">Service</th> | |
| <th style="padding: 12px; text-align: right;">Cost</th> | |
| <th style="padding: 12px; text-align: left;">Action</th> | |
| <th style="padding: 12px; text-align: left;">Reason</th> | |
| </tr> | |
| </thead> | |
| <tbody> | |
| """.format( | |
| total_spend=total_spend, | |
| analyzed_spend=analyzed_spend, | |
| savings=savings, | |
| unallocated_cost=unallocated_cost, | |
| avg_waste_util=avg_waste_util, | |
| action_chart=action_chart_html, | |
| service_chart=service_chart_html, | |
| team_chart=team_chart_html | |
| ) | |
| action_colors = {"KEEP": "#10b981", "DELETE": "#ef4444", "HIBERNATE": "#f59e0b", "AUDIT": "#f97316"} | |
| for result in results: | |
| res = result["resource"] | |
| cls = result["classification"] | |
| color = action_colors.get(cls['action'], "#6b7280") | |
| report += f""" | |
| <tr style="border-bottom: 1px solid #f3f4f6;"> | |
| <td style="padding: 10px; font-family: monospace;">{res['ResourceID']}</td> | |
| <td style="padding: 10px;">{res['Service']}</td> | |
| <td style="padding: 10px; text-align: right;">${res['Cost_Monthly']:.2f}</td> | |
| <td style="padding: 10px;"><span style="color: {color}; font-weight: 700;">{cls['action']}</span></td> | |
| <td style="padding: 10px;">{cls['reason']}</td> | |
| </tr> | |
| """ | |
| report += "</tbody></table></div></div>" | |
| return report | |
def generate_emails(results):
    """Generates individual email drafts per team.

    Resources classified KEEP are ignored. The remaining ones are grouped by
    team: the classification's 'inferred_team' is trusted first, and the
    resource's team tag is used only when that yields "Unknown".

    Args:
        results: List of dicts with a 'resource' dict (reads 'ResourceID',
            'Service', 'Cost_Monthly', 'OwnerEmail', 'Tags') and a
            'classification' dict (reads 'action', 'reason', 'inferred_team').

    Returns:
        Markdown text with one fenced email draft per team (teams sorted by
        name), or a "No Action Required" note when nothing is flagged.
    """
    team_data = {}

    # Group resources by team
    for result in results:
        resource = result["resource"]
        classification = result["classification"]

        # Constraint: Do NOT generate emails for resources marked "KEEP"
        if classification["action"] == "KEEP":
            continue

        # 1. Trust AI first (value may be an Enum member or a plain string)
        team_name = "Unknown"
        inferred = classification.get("inferred_team")
        if inferred:
            team_name = inferred.value if hasattr(inferred, 'value') else str(inferred)

        # 2. Fallback to existing Tag (if AI returned Unknown)
        if team_name == "Unknown":
            try:
                tags = json.loads(resource.get('Tags', '{}'))
            except (TypeError, ValueError):
                # Missing (NaN) or malformed tags: nothing to fall back on.
                tags = {}
            tag_team = get_tag(tags if isinstance(tags, dict) else {}, ['team', 'owner_team'], "")
            if isinstance(tag_team, str) and tag_team:
                team_name = tag_team.strip().title()

        bucket = team_data.setdefault(team_name, {'resources': [], 'poc_emails': set()})
        bucket['resources'].append(result)

        # Extract POC email (use OwnerEmail as POC); a NaN from the CSV is
        # truthy but not a string, so check the type before searching it.
        poc_email = resource.get('OwnerEmail', '')
        if isinstance(poc_email, str) and '@' in poc_email:
            bucket['poc_emails'].add(poc_email)

    if not team_data:
        return "# No Action Required\n\nAll resources are optimally configured. No emails to send."

    action_emojis = {'DELETE': '🔴', 'HIBERNATE': '🟡', 'AUDIT': '🟠'}

    # Pieces are collected and joined once instead of repeated string +=.
    parts = [
        f"# 📧 Team Email Drafts ({len(team_data)} Teams)\n\n",
        "_Copy and paste each email below to send to the team POC_\n\n",
        "---\n\n",
    ]

    # Generate ONE email draft per Team
    for team in sorted(team_data):
        items = team_data[team]['resources']
        poc_emails = team_data[team]['poc_emails']

        # Total monthly cost of every flagged resource for this team
        savings = sum(r['resource']['Cost_Monthly'] for r in items)

        action_counts = {}
        for item in items:
            action = item['classification']['action']
            action_counts[action] = action_counts.get(action, 0) + 1
        action_summary = " | ".join(f"{count} {action}" for action, count in sorted(action_counts.items()))

        recipients = ', '.join(sorted(poc_emails)) if poc_emails else 'team-email@company.com'

        # Email header
        parts.append(f"## 📮 Email Draft for {team}\n\n")
        parts.append("```\n")
        parts.append(f"To: {recipients}\n")
        parts.append(f"Subject: [Action Required] Cloud FinOps Audit - ${savings:,.2f} Potential Savings\n")
        parts.append("\n")

        # Email body
        parts.append(f"Dear {team},\n\n")
        parts.append(f"Our automated FinOps audit has identified {len(items)} cloud resources under your team's ownership ")
        parts.append(f"that can be optimized to save ${savings:,.2f} per month.\n\n")
        parts.append("**Summary:**\n")
        parts.append(f"- {len(items)} resources flagged: {action_summary}\n")
        parts.append(f"- Total potential savings: ${savings:,.2f}/month\n\n")
        parts.append("**Top Resources by Cost:**\n\n")

        # Show the 5 most expensive resources
        top_resources = sorted(items, key=lambda x: x['resource']['Cost_Monthly'], reverse=True)[:5]
        for i, item in enumerate(top_resources, 1):
            res = item['resource']
            cls = item['classification']
            action_emoji = action_emojis.get(cls['action'], '⚪')
            reason = cls['reason']
            parts.append(f"{i}. {res['ResourceID']} ({res['Service']})\n")
            parts.append(f" Cost: ${res['Cost_Monthly']:.2f}/mo | Action: {action_emoji} {cls['action']}\n")
            parts.append(f" Reason: {reason[:80]}{'...' if len(reason) > 80 else ''}\n\n")

        if len(items) > 5:
            parts.append(f"...and {len(items) - 5} more resources (see full report)\n\n")

        parts.append("**Next Steps:**\n")
        parts.append("1. Review the flagged resources in the attached report\n")
        parts.append("2. Confirm or reject the recommended actions\n")
        parts.append("3. Contact finops@company.com with questions\n\n")
        parts.append("Best regards,\n")
        parts.append("Cloud FinOps Team\n")
        parts.append("```\n\n")
        parts.append("---\n\n")

    return "".join(parts)
| # --- 8. MAIN ORCHESTRATION --- | |
def run_audit(custom_query="", progress_callback=None, log_callback=None):
    """Run the full audit pipeline and return the result as a JSON string.

    Main function called by app.py. The stages are: load
    ``billing_export.csv``, optionally filter it by the intent extracted from
    ``custom_query``, classify the simple cases with Python heuristics, send
    the most expensive remaining cases to the AI classifier, apply the safety
    rules to every classification, and build the report and email drafts.

    Args:
        custom_query: Optional natural-language query. When non-empty it is
            passed to ``extract_query_intent`` and the resulting intent is
            used to filter the billing data before analysis.
        progress_callback: Optional callable invoked as
            ``progress_callback(fraction, message)`` as stages start.
        log_callback: Handed to ``ReasoningLogger`` unchanged.

    Returns:
        A JSON-encoded object with the keys ``"report"``, ``"emails"`` and
        ``"reasoning"``. On failure ``"report"`` carries the error text and
        ``"emails"`` is empty; exceptions raised inside the pipeline are
        caught and reported this way rather than propagated.
    """
    logger = ReasoningLogger(log_callback)

    def notify(fraction, message):
        # Progress reporting is optional; centralise the None check.
        if progress_callback:
            progress_callback(fraction, message)

    try:
        logger.log("🚀", "Starting Sentinal Cloud Audit...")
        notify(0.1, "🚀 Initializing...")

        # Load Data
        if not os.path.exists("billing_export.csv"):
            logger.log("❌", "billing_export.csv missing")
            # Same payload shape as every other return path, so callers can
            # always read "reasoning".
            return json.dumps({
                "report": "Error: billing_export.csv missing",
                "emails": "",
                "reasoning": logger.get_logs(),
            })
        df = pd.read_csv("billing_export.csv")
        logger.log("📊", f"Loaded {len(df)} resources.")

        # --- STEP 1: INTENT EXTRACTION ---
        if custom_query:
            intent = extract_query_intent(custom_query, logger)
            df = filter_dataframe(df, intent, logger)
            if len(df) == 0:
                return json.dumps({
                    "report": "<h3>No resources matched your query.</h3>",
                    "emails": "",
                    "reasoning": logger.get_logs(),
                })

        # --- STEP 2: HYBRID ANALYSIS ---
        notify(0.3, "⚡ Running Python Heuristics...")
        simple_results, complex_cases = classify_simple_cases(df, logger)
        all_results = [
            {
                "resource": sr['resource'],
                "classification": {
                    "action": sr['action'],
                    "reason": sr['reason'],
                    "policy_rule": sr['policy_rule'],
                    "inferred_team": sr.get('inferred_team', TeamName.UNKNOWN),
                },
            }
            for sr in simple_results
        ]

        # --- STEP 3: AI REASONING ---
        if complex_cases:
            notify(0.6, f"🤖 AI Analyzing {len(complex_cases)} complex cases...")
            logger.log("📚", "Checking Policy via RAG...")
            policy_ctx = query_policy("cloud resource lifecycle policy including deletion, hibernation, and retention rules")

            # OPTIMIZATION: Prioritize the TOP_N most expensive resources.
            # Instead of batching everything (slow/expensive), we focus AI on
            # high-impact items; the remaining cases are not classified.
            complex_cases.sort(key=lambda x: x.get('Cost_Monthly', 0), reverse=True)
            TOP_N = 100
            high_impact_batch = complex_cases[:TOP_N]
            skipped_count = len(complex_cases) - len(high_impact_batch)
            if skipped_count > 0:
                logger.log("⚡", f"Prioritizing Top {TOP_N} spenders. Skipped {skipped_count} low-cost items.")
            notify(0.7, f"🤖 AI Analyzing Top {len(high_impact_batch)} High-Impact Cases...")

            # Best-effort: an AI failure is logged and the heuristic results
            # are still reported.
            try:
                resources_json = json.dumps(high_impact_batch)
                ai_response = classify_resources(resources_json, policy_ctx)

                # Index the batch once instead of scanning it for every
                # classification. setdefault keeps the first entry for a
                # duplicated ResourceID, matching a first-match scan.
                batch_by_id = {}
                for case in high_impact_batch:
                    batch_by_id.setdefault(case['ResourceID'], case)

                for cls in ai_response.classifications:
                    res = batch_by_id.get(cls.resource_id)
                    if res:
                        all_results.append({
                            "resource": res,
                            "classification": {
                                "action": cls.action,
                                "reason": cls.reason,
                                "policy_rule": cls.policy_rule,
                                "inferred_team": cls.inferred_team,
                            },
                        })
            except Exception as e:
                logger.log("⚠️", f"AI Analysis Error: {e}")

        # --- STEP 4: SAFETY CHECKS ---
        logger.log("🛡️", "Applying final safety guardrails...")
        final_results = []
        for res in all_results:
            # The safety rules have the final say on the action.
            safe_action = apply_safety_rules(res['resource'], res['classification'], logger)
            res['classification']['action'] = safe_action
            final_results.append(res)

        # --- STEP 5: REPORTING ---
        notify(0.9, "📝 Generating Report...")
        total_spend = df['Cost_Monthly'].sum()
        report = generate_report(final_results, len(df), total_spend)
        emails = generate_emails(final_results)
        logger.log("✅", "Audit Complete.")
        return json.dumps({
            "report": report,
            "emails": emails,
            "reasoning": logger.get_logs(),
        })
    except Exception as e:
        # Top-level boundary: print the traceback for debugging and hand the
        # caller a well-formed error payload instead of raising.
        import traceback
        traceback.print_exc()
        logger.log("❌", f"Fatal Error: {e}")
        return json.dumps({"report": f"Error: {e}", "emails": "", "reasoning": logger.get_logs()})
if __name__ == "__main__":
    # Manual smoke test: run one sample query and print the JSON payload.
    sample_payload = run_audit("find expensive databases")
    print(sample_payload)