"""Sentinal cloud FinOps auditor — configuration, enums, and data models."""

import json
import os
import re
from datetime import datetime
from enum import Enum
from typing import Dict, List, Literal, Optional

import pandas as pd
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing_extensions import TypedDict

# Smolagents imports
from smolagents import CodeAgent, tool, LiteLLMModel

# OpenAI for direct API calls
from openai import OpenAI
import google.generativeai as genai

# LlamaIndex for RAG
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

# Load environment variables once, before any key lookups.
# (The original file imported dotenv and called load_dotenv() twice.)
load_dotenv()

# --- Configuration ---
DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4o-mini")
REASONING_EFFORT = os.getenv("REASONING_EFFORT", "low")

if "OPENAI_API_KEY" not in os.environ:
    print("WARNING: OPENAI_API_KEY not found in environment. Agent may fail.")
if "GOOGLE_API_KEY" not in os.environ:
    print("WARNING: GOOGLE_API_KEY not found in environment. Gemini will fail.")

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

# Gemini Model Configuration
GEMINI_MODEL = "gemini-2.5-flash"  # alternatives: "gemini-2.0-flash", "gemini-2.0-flash-lite", "gemini-2.0-flash-exp"


# --- 1. ENUMS & PYDANTIC MODELS ---
class TeamName(str, Enum):
    """Canonical team names used for ownership attribution."""
    FINANCE = "Finance"
    DATA = "Data"
    QA = "QA"
    DEVOPS = "DevOps"
    SECURITY = "Security"
    FRONTEND = "Frontend"
    BACKEND = "Backend"
    INFRA = "Infrastructure"
    PLATFORM = "Platform"
    ML = "ML"
    UNKNOWN = "Unknown"


class QueryIntent(BaseModel):
    """Translates natural language into strict data filters."""
    target_services: Optional[List[str]] = Field(
        default_factory=list,
        description="List of services to filter (e.g. ['RDS', 'EBS', 'EC2'])")
    target_env: Optional[str] = Field(
        None, description="Environment to focus on (e.g. 'prod', 'dev')")
    target_team: Optional[str] = Field(
        None, description="Team to focus on (e.g. 'DevOps', 'ML', 'Data')")
    min_cost: Optional[float] = Field(
        0.0, description="Minimum monthly cost to filter for")
    only_orphans: bool = Field(
        False, description="True if user is looking for untagged/unowned resources")
    focus_area: Literal["SAVINGS", "SAFETY", "GENERAL", "TEAM_RANKING"] = Field(
        "GENERAL", description="The user's primary goal")


# TypedDict for Gemini Schema (Avoids 'default' error)
class QueryIntentSchema(TypedDict):
    target_services: List[str]
    target_env: Optional[str]
    target_team: Optional[str]
    min_cost: float
    only_orphans: bool
    focus_area: Literal["SAVINGS", "SAFETY", "GENERAL", "TEAM_RANKING"]


class ResourceClassification(BaseModel):
    """Classification for a single resource."""
    resource_id: str
    action: Literal["KEEP", "DELETE", "HIBERNATE", "AUDIT"] = Field(
        "AUDIT", description="Recommended action")
    reason: str = Field("No reason provided", description="Justification for the action")
    policy_rule: str = Field("Manual Review", description="Policy rule applied")
    inferred_team: Optional[TeamName] = Field(
        None, description="Inferred team ownership based on tags or resource name")


class ClassificationResponse(BaseModel):
    """Complete classification response for all resources."""
    classifications: list[ResourceClassification]


# TypedDict for Gemini Schema
class ResourceClassificationSchema(TypedDict):
    resource_id: str
    action: Literal["KEEP", "DELETE", "HIBERNATE", "AUDIT"]
    reason: str
    policy_rule: str
    inferred_team: Optional[TeamName]


class ClassificationResponseSchema(TypedDict):
    classifications: List[ResourceClassificationSchema]
# --- 2. REASONING LOGGER ---
class ReasoningLogger:
    """Captures step-by-step agent reasoning for transparency with optional streaming."""

    def __init__(self, log_callback=None):
        # log_callback: optional callable invoked with each formatted entry
        # so a UI can stream the trace live.
        self.logs = []
        self.log_callback = log_callback

    def log(self, emoji, message):
        """Record a timestamped entry, echo it to stdout, and stream it if wired."""
        stamp = datetime.now().strftime("%H:%M:%S")
        entry = f"[{stamp}] {emoji} {message}"
        self.logs.append(entry)
        print(entry)
        if self.log_callback:
            self.log_callback(entry)

    def get_logs(self):
        """Return the full transcript as a single newline-joined string."""
        return "\n".join(self.logs)


# --- 3. RAG SETUP (POLICY ENGINE) ---
def setup_policy_rag():
    """Build a LlamaIndex query engine over policy.txt; return None when unavailable."""
    try:
        # Use lightweight model for RAG
        Settings.llm = LlamaOpenAI(model="gpt-4o-mini")
        Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
        if os.path.exists("policy.txt"):
            documents = SimpleDirectoryReader(input_files=["policy.txt"]).load_data()
            return VectorStoreIndex.from_documents(documents).as_query_engine(similarity_top_k=3)
        return None
    except Exception as e:
        # Best-effort setup: a missing key or package must not abort the audit.
        print(f"Warning: Policy RAG setup failed: {e}")
        return None


policy_engine = setup_policy_rag()


@tool
def query_policy(question: str) -> str:
    """Query enterprise policy using RAG.

    Args:
        question: The policy question to ask (e.g., 'What are the deletion rules?')
    """
    if not policy_engine:
        return "Policy engine unavailable (policy.txt missing)."
    try:
        response = policy_engine.query(question)
        return str(response)
    except Exception as e:
        return f"Policy query error: {e}"
HELPER FUNCTIONS --- def get_tag(tags_dict, keys, default=''): """Robust case-insensitive tag retrieval.""" if isinstance(keys, str): keys = [keys] keys_lower = [k.lower() for k in keys] for k, v in tags_dict.items(): if k.lower() in keys_lower: return v return default def infer_team_from_tags(tags_dict, resource_id="") -> TeamName: """Simple Python-based team inference from tags OR resource ID.""" team_str = get_tag(tags_dict, ['team', 'Team', 'TEAM', 'owner_team']).lower() # If no tag, try to infer from ResourceID (Sherlock Mode) if not team_str and resource_id: if 'ml-' in resource_id or 'training' in resource_id: team_str = 'ml' elif 'db-' in resource_id or 'rds' in resource_id: team_str = 'data' elif 'web' in resource_id or 'frontend' in resource_id: team_str = 'frontend' elif 'api' in resource_id or 'backend' in resource_id: team_str = 'backend' elif 'test' in resource_id or 'qa' in resource_id: team_str = 'qa' # Map common variations to canonical TeamName if not team_str: return TeamName.UNKNOWN if 'finance' in team_str or 'fin' in team_str: return TeamName.FINANCE elif 'data' in team_str or 'science' in team_str: return TeamName.DATA elif 'ml' in team_str or 'ai' in team_str: return TeamName.ML elif 'qa' in team_str or 'test' in team_str or 'quality' in team_str: return TeamName.QA elif 'devops' in team_str or 'ops' in team_str or 'sre' in team_str: return TeamName.DEVOPS elif 'platform' in team_str: return TeamName.PLATFORM elif 'security' in team_str or 'sec' in team_str or 'infosec' in team_str: return TeamName.SECURITY elif 'frontend' in team_str or 'web' in team_str or 'ui' in team_str: return TeamName.FRONTEND elif 'backend' in team_str or 'api' in team_str: return TeamName.BACKEND elif 'infra' in team_str: return TeamName.INFRA else: return TeamName.UNKNOWN # --- 5. 
# --- 5. INTENT EXTRACTION & FILTERING (NEW) ---
def extract_query_intent(query: str, logger) -> QueryIntent:
    """Uses LLM to convert 'Find expensive databases' -> {'target_services': ['RDS'], 'min_cost': 500}

    Args:
        query: Free-text user query; empty string yields a default (no-filter) intent.
        logger: ReasoningLogger used for progress/trace output.

    Returns:
        A QueryIntent; falls back to an unfiltered QueryIntent on any error.
    """
    if not query:
        return QueryIntent()

    logger.log("🧠", f"Translating query: '{query}'...")

    prompt = f"""
You are a Data Query Translator for a Cloud FinOps Tool.
Convert the user's natural language query into structured filters.

User Query: "{query}"

---------------------------------------------------------
STRICT VOCABULARY MAPPING (DO NOT INVENT TERMS)
---------------------------------------------------------
1. SERVICES (Map keywords to EXACTLY one of these 5 options):
   - "database", "db", "sql", "rds", "postgres" -> ["RDS"]
   - "server", "vm", "instance", "ec2", "compute", "host" -> ["EC2"]
   - "volume", "disk", "storage", "ebs", "block store" -> ["EBS"]
   - "bucket", "object", "s3", "blob" -> ["S3"]
   - "function", "serverless", "lambda" -> ["Lambda"]
   * IF NO SPECIFIC SERVICE IS MENTIONED, RETURN [] (Empty List).

2. ENVIRONMENTS (Map keywords to EXACTLY one of these 2 options):
   - "production", "prod", "live" -> "prod"
   - "development", "dev", "test", "staging", "qa" -> "dev"

3. TEAMS (Map keywords to canonical team names):
   - "devops", "ops", "sre" -> "DevOps"
   - "ml", "ai", "machine learning" -> "ML"
   - "data", "analytics", "science" -> "Data"
   - "frontend", "web", "ui" -> "Frontend"
   - "backend", "api" -> "Backend"
   - "qa", "test", "quality" -> "QA"
   - "security", "infosec" -> "Security"
   - "platform", "infrastructure", "infra" -> "Platform"

4. INTENT RULES:
   - "kill", "delete", "prune", "remove" -> focus_area="SAVINGS", min_cost=0.0
   - "waste", "zombie", "unused", "idle" -> focus_area="SAVINGS", min_cost=0.0
   - "expensive", "costly" -> focus_area="GENERAL", min_cost=100.0
   - "orphan", "untagged" -> only_orphans=True
   - "audit", "safety", "check" -> focus_area="SAFETY"
   - "biggest offender", "worst team", "team ranking", "team leaderboard" -> focus_area="TEAM_RANKING"

OUTPUT FORMAT: Return valid JSON matching the schema.
"""

    try:
        model = genai.GenerativeModel(GEMINI_MODEL)
        response = model.generate_content(
            prompt,
            generation_config=genai.GenerationConfig(
                response_mime_type="application/json",
                response_schema=QueryIntentSchema,
            ),
        )
        data = json.loads(response.text)
        intent = QueryIntent(**data)

        # Visualize the full parsed intent (env and orphan flags included).
        env_str = intent.target_env if intent.target_env else "All"
        svc_str = str(intent.target_services) if intent.target_services else "All"
        team_str = intent.target_team if intent.target_team else "All"
        logger.log(
            "🎯",
            f"Intent: {intent.focus_area} | Team: {team_str} | Env: {env_str} | "
            f"Svc: {svc_str} | Cost > ${intent.min_cost} | Orphans: {intent.only_orphans}",
        )
        return intent
    except Exception as e:
        # Fail open: an unparseable query simply means "audit everything".
        logger.log("⚠️", f"Intent extraction failed: {e}. Running full audit.")
        return QueryIntent()


def filter_dataframe(df, intent: QueryIntent, logger):
    """Applies the LLM-derived intent to the Pandas DataFrame.

    Assumes df has 'Service', 'Cost_Monthly', and JSON-string 'Tags' columns
    (matches billing_export.csv as loaded in run_audit).
    """
    initial_count = len(df)

    # 1. Service Filter
    if intent.target_services:
        df = df[df['Service'].isin(intent.target_services)]

    # 2. Environment Filter
    if intent.target_env:
        def check_env(tags_str):
            try:
                tags = json.loads(tags_str)
                env = get_tag(tags, ['env', 'environment'], '').lower()
                return intent.target_env in env
            except Exception:  # malformed Tags JSON -> treat as non-matching
                return False
        df = df[df['Tags'].apply(check_env)]

    # 3. Cost Filter
    if intent.min_cost and intent.min_cost > 0:
        df = df[df['Cost_Monthly'] >= intent.min_cost]

    # 4. Team Filter
    if intent.target_team:
        def check_team(tags_str):
            try:
                tags = json.loads(tags_str)
                team = get_tag(tags, ['team', 'owner_team'], '').lower()
                return intent.target_team.lower() in team
            except Exception:  # malformed Tags JSON -> treat as non-matching
                return False
        df = df[df['Tags'].apply(check_team)]

    # 5. Orphan Filter
    if intent.only_orphans:
        def check_orphan(tags_str):
            try:
                tags = json.loads(tags_str)
                # Orphan = No Team tag
                return not get_tag(tags, ['team', 'owner_team'])
            except Exception:  # unparsable tags count as orphaned
                return True
        df = df[df['Tags'].apply(check_orphan)]

    logger.log("🔍", f"Scoped analysis from {initial_count} to {len(df)} resources.")
    return df


# --- 6. CORE ANALYSIS LOGIC ---
def classify_simple_cases(df, logger):
    """Fast Python rules for obvious cases (90% of resources).

    Returns:
        (simple_results, complex_cases): rule-classified result dicts, and the
        raw resource dicts that still need AI analysis.
    """
    logger.log("⚡", "Applying Python rules for simple cases...")
    simple_results = []
    complex_cases = []

    for _, row in df.iterrows():
        resource = row.to_dict()
        try:
            tags = json.loads(resource.get('Tags', '{}'))
        except Exception:
            tags = {}

        # Rule 1: Unattached EBS
        if (resource.get('Service') == 'EBS'
                and get_tag(tags, 'state') == 'available'
                and resource.get('CPU_avg', 0) == 0):
            simple_results.append({
                'resource': resource,
                'action': 'DELETE',
                'reason': 'Unattached EBS volume (no activity)',
                'policy_rule': 'Rule: Unattached volumes should be deleted',
                'inferred_team': infer_team_from_tags(tags, resource.get('ResourceID', '')),
            })
            continue

        # Rule 2: Obvious Keep (High CPU)
        if resource.get('CPU_avg', 0) > 75:
            simple_results.append({
                'resource': resource,
                'action': 'KEEP',
                'reason': f'High CPU utilization ({resource.get("CPU_avg")}%)',
                'policy_rule': 'High utilization indicates active use',
                'inferred_team': infer_team_from_tags(tags, resource.get('ResourceID', '')),
            })
            continue

        # Everything else is complex
        complex_cases.append(resource)

    logger.log("✅", f"Python rules: {len(simple_results)} simple, {len(complex_cases)} complex (need AI)")
    return simple_results, complex_cases


@tool
def classify_resources(resources_json: str, policy_context: str) -> ClassificationResponse:
    """Classify cloud resources based on policy using strong AI.

    Args:
        resources_json: JSON string containing a list of resource dictionaries to classify.
        policy_context: Relevant policy text retrieved from the RAG engine.
    """
    prompt = f"""You are Sentinal, an expert Cloud FinOps Auditor. Analyze the following resources against the policy.

**Policy Context:**
{policy_context}

**Input Resources:**
{resources_json}

**YOUR TASKS:**

1. **DECIDE ACTION:** Determine if the resource should be KEEP, DELETE, HIBERNATE, or AUDIT.
   - DELETE: Low CPU (<10%) + idle (>30 days) + dev/test environment
   - HIBERNATE: Low CPU (<10%) + idle (>30 days) + prod environment
   - KEEP: High CPU (>50%) OR active (<30 days) OR critical tags
   - AUDIT: Missing required tags (team, environment) OR ambiguous state

2. **SHERLOCK MODE (Team Inference):**
   - PRIMARY: Check the 'Team' tag first. If present and valid, use it.
   - FALLBACK: If 'Team' tag is missing, empty, or generic, use your world knowledge to infer ownership.
   - Look at 'ResourceID', 'Name', 'Service', or any other clues.

   **Inference Examples:**
   - 'kafka', 'hadoop', 'spark', 'airflow', 'ml-', 'training', 'gpu' → ML or Data
   - 'jenkins', 'terraform', 'k8s', 'docker', 'cicd' → DevOps or Platform
   - 'web', 'frontend', 'react', 'vue', 'ui' → Frontend
   - 'api', 'backend', 'service', 'payment', 'catalogue' → Backend
   - 'test', 'qa', 'selenium' → QA
   - 'security', 'vault', 'secrets' → Security
   - 'tableau', 'looker', 'analytics' → Data

   **Canonical Teams:** Finance, Data, QA, DevOps, Security, Frontend, Backend, Infrastructure, Platform, ML, Unknown

   - Set 'inferred_team' to the most likely team based on your analysis.
   - If absolutely no clues exist, set 'inferred_team' to "Unknown".

3. **CRITICAL SAFETY RULES:**
   - Production resources (env=prod) must NEVER be DELETE. Use HIBERNATE instead.
   - If unsure, choose AUDIT to flag for human review.

**OUTPUT:**
For EACH resource in the input, you MUST return:
- resource_id: Copy the 'ResourceID' field from the input resource exactly as-is
- action: Your decision (KEEP/DELETE/HIBERNATE/AUDIT)
- reason: Brief justification
- policy_rule: The policy rule that applies
- inferred_team: The team name you inferred

CRITICAL: The 'resource_id' field is MANDATORY. Copy it from the input 'ResourceID' field.

Return valid JSON matching the ClassificationResponse schema with ALL fields populated."""

    try:
        model = genai.GenerativeModel(GEMINI_MODEL)
        response = model.generate_content(
            prompt,
            generation_config=genai.GenerationConfig(
                response_mime_type="application/json",
                response_schema=ClassificationResponseSchema,
            ),
        )
        # Convert raw JSON into the validated Pydantic model.
        data = json.loads(response.text)
        return ClassificationResponse(**data)
    except Exception as e:
        print(f"Gemini Error: {e}")
        # Fallback to empty list if the call or parsing fails.
        return ClassificationResponse(classifications=[])


def apply_safety_rules(resource, classification, logger):
    """Hardcoded safety overrides applied after AI/rule classification.

    Args:
        resource: Raw resource dict (Tags is a JSON string).
        classification: Classification dict with at least an 'action' key.
        logger: Kept for interface compatibility; currently unused.

    Returns:
        The (possibly downgraded) action string.
    """
    tags = json.loads(resource.get("Tags", "{}"))
    env = get_tag(tags, ['env', 'environment']).lower()
    action = classification["action"]

    # CRITICAL: never hard-delete anything running in a prod environment.
    if "prod" in env and action == "DELETE":
        return "HIBERNATE"
    return action
REPORT GENERATORS (HTML & EMAILS) --- def generate_report(results, total_resources, total_spend): """Generate HTML report with scrollable table and CFO Banner.""" analyzed_spend = sum(r['resource']['Cost_Monthly'] for r in results) # Calculate Unallocated Spend unallocated_cost = 0 savings = 0 action_counts = {"KEEP": 0, "DELETE": 0, "HIBERNATE": 0, "AUDIT": 0} for r in results: res = r['resource'] act = r['classification']['action'] tags = json.loads(res.get('Tags', '{}')) if not get_tag(tags, 'team'): unallocated_cost += res['Cost_Monthly'] if act in ['DELETE', 'HIBERNATE']: savings += res['Cost_Monthly'] action_counts[act] = action_counts.get(act, 0) + 1 # Waste CPU waste_items = [r for r in results if r['classification']['action'] in ['DELETE', 'HIBERNATE']] avg_waste_util = (sum(r['resource']['CPU_avg'] for r in waste_items) / len(waste_items)) if waste_items else 0 # --- CHART DATA CALCULATIONS --- # 1. Action Breakdown (Cost by Action) action_costs = {"KEEP": 0, "DELETE": 0, "HIBERNATE": 0, "AUDIT": 0} for r in results: act = r['classification']['action'] action_costs[act] = action_costs.get(act, 0) + r['resource']['Cost_Monthly'] # 2. Service Breakdown (Cost by Service) service_costs = {} for r in results: svc = r['resource'].get('Service', 'Unknown') service_costs[svc] = service_costs.get(svc, 0) + r['resource']['Cost_Monthly'] # Generate Action Chart HTML action_chart_html = "" action_colors = {"KEEP": "#10b981", "DELETE": "#ef4444", "HIBERNATE": "#f59e0b", "AUDIT": "#f97316"} max_action_cost = max(action_costs.values()) if action_costs.values() else 1 for action in ["DELETE", "HIBERNATE", "KEEP", "AUDIT"]: # Order by importance cost = action_costs.get(action, 0) pct = (cost / max_action_cost * 100) if max_action_cost > 0 else 0 color = action_colors.get(action, "#6b7280") action_chart_html += f"""
{action} ${cost:,.2f}
""" # Generate Service Chart HTML service_chart_html = "" service_colors = {"EC2": "#ff9900", "RDS": "#3b82f6", "EBS": "#8b5cf6", "S3": "#10b981", "Lambda": "#f59e0b"} sorted_services = sorted(service_costs.items(), key=lambda x: x[1], reverse=True) max_service_cost = sorted_services[0][1] if sorted_services else 1 for svc, cost in sorted_services: pct = (cost / max_service_cost * 100) if max_service_cost > 0 else 0 color = service_colors.get(svc, "#6b7280") service_chart_html += f"""
{svc} ${cost:,.2f}
""" # 3. Team Waste Leaderboard (Accountability Metrics) team_waste = {} for r in results: if r['classification']['action'] in ['DELETE', 'HIBERNATE']: team = r['classification'].get('inferred_team', 'Unknown') team_waste[team] = team_waste.get(team, 0) + r['resource']['Cost_Monthly'] # Generate Team Waste Chart HTML team_chart_html = "" sorted_teams = sorted(team_waste.items(), key=lambda x: x[1], reverse=True) max_team_waste = sorted_teams[0][1] if sorted_teams else 1 team_colors = ["#ef4444", "#f59e0b", "#f97316", "#fb923c", "#fdba74"] for idx, (team, waste) in enumerate(sorted_teams[:5]): # Top 5 teams pct = (waste / max_team_waste * 100) if max_team_waste > 0 else 0 color = team_colors[idx] if idx < len(team_colors) else "#6b7280" team_chart_html += f"""
{team} ${waste:,.2f}
""" report = """
Status Model Security

💰 Potential Savings

${savings:,.2f}

Delete + Hibernate

Total Spend

${total_spend:,.2f}

Analyzed: ${analyzed_spend:,.2f}

Orphan Spend

${unallocated_cost:,.2f}

No Team Tag

Waste CPU Avg

{avg_waste_util:.1f}%

Idle Resources

💼 Cost by Action

{action_chart}

🔧 Cost by Service

{service_chart}

🏆 Team Waste Ranking

{team_chart}

🛡️ Triage Details

""".format( total_spend=total_spend, analyzed_spend=analyzed_spend, savings=savings, unallocated_cost=unallocated_cost, avg_waste_util=avg_waste_util, action_chart=action_chart_html, service_chart=service_chart_html, team_chart=team_chart_html ) action_colors = {"KEEP": "#10b981", "DELETE": "#ef4444", "HIBERNATE": "#f59e0b", "AUDIT": "#f97316"} for result in results: res = result["resource"] cls = result["classification"] color = action_colors.get(cls['action'], "#6b7280") report += f""" """ report += "
ResourceID Service Cost Action Reason
{res['ResourceID']} {res['Service']} ${res['Cost_Monthly']:.2f} {cls['action']} {cls['reason']}
" return report def generate_emails(results): """Generates individual email drafts per team.""" team_data = {} # Group resources by team for result in results: resource = result["resource"] classification = result["classification"] # Constraint: Do NOT generate emails for resources marked "KEEP" if classification["action"] == "KEEP": continue # 1. Trust AI first team_name = "Unknown" # Check if AI provided inferred_team (Pydantic model field) if classification.get("inferred_team"): val = classification["inferred_team"] # Handle Enum or String if hasattr(val, 'value'): team_name = val.value else: team_name = str(val) # 2. Fallback to existing Tag (if AI returned Unknown) if team_name == "Unknown": tags = json.loads(resource.get('Tags', '{}')) tag_team = get_tag(tags, ['team', 'owner_team'], "") if tag_team: team_name = tag_team.strip().title() # Grouping Logic if team_name not in team_data: team_data[team_name] = {'resources': [], 'poc_emails': set()} team_data[team_name]['resources'].append(result) # Extract POC email (use OwnerEmail as POC) poc_email = resource.get('OwnerEmail', '') if poc_email and '@' in poc_email: team_data[team_name]['poc_emails'].add(poc_email) # Generate individual email drafts if not team_data: return "# No Action Required\n\nAll resources are optimally configured. No emails to send." 
output = f"# 📧 Team Email Drafts ({len(team_data)} Teams)\n\n" output += "_Copy and paste each email below to send to the team POC_\n\n" output += "---\n\n" # Generate ONE email draft per Team for team in sorted(team_data.keys()): items = team_data[team]['resources'] poc_emails = team_data[team]['poc_emails'] # Calculate total savings for this team savings = sum(r['resource']['Cost_Monthly'] for r in items) # Count actions action_counts = {} for item in items: action = item['classification']['action'] action_counts[action] = action_counts.get(action, 0) + 1 # Email header output += f"## 📮 Email Draft for {team}\n\n" output += "```\n" output += f"To: {', '.join(sorted(poc_emails)) if poc_emails else 'team-email@company.com'}\n" output += f"Subject: [Action Required] Cloud FinOps Audit - ${savings:,.2f} Potential Savings\n" output += "\n" # Email body output += f"Dear {team},\n\n" output += f"Our automated FinOps audit has identified {len(items)} cloud resources under your team's ownership " output += f"that can be optimized to save ${savings:,.2f} per month.\n\n" output += "**Summary:**\n" action_summary = " | ".join([f"{count} {action}" for action, count in sorted(action_counts.items())]) output += f"- {len(items)} resources flagged: {action_summary}\n" output += f"- Total potential savings: ${savings:,.2f}/month\n\n" output += "**Top Resources by Cost:**\n\n" # Show top 5 resources in table format top_resources = sorted(items, key=lambda x: x['resource']['Cost_Monthly'], reverse=True)[:5] for i, item in enumerate(top_resources, 1): res = item['resource'] cls = item['classification'] action_emoji = { 'DELETE': '🔴', 'HIBERNATE': '🟡', 'AUDIT': '🟠' }.get(cls['action'], '⚪') output += f"{i}. {res['ResourceID']} ({res['Service']})\n" output += f" Cost: ${res['Cost_Monthly']:.2f}/mo | Action: {action_emoji} {cls['action']}\n" output += f" Reason: {cls['reason'][:80]}{'...' 
if len(cls['reason']) > 80 else ''}\n\n" if len(items) > 5: output += f"...and {len(items) - 5} more resources (see full report)\n\n" output += "**Next Steps:**\n" output += f"1. Review the flagged resources in the attached report\n" output += f"2. Confirm or reject the recommended actions\n" output += f"3. Contact finops@company.com with questions\n\n" output += "Best regards,\n" output += "Cloud FinOps Team\n" output += "```\n\n" output += "---\n\n" return output # --- 8. MAIN ORCHESTRATION --- def run_audit(custom_query="", progress_callback=None, log_callback=None): """Main function called by app.py.""" import time logger = ReasoningLogger(log_callback) try: logger.log("🚀", "Starting Sentinal Cloud Audit...") if progress_callback: progress_callback(0.1, "🚀 Initializing...") # Load Data if not os.path.exists("billing_export.csv"): return json.dumps({"report": "Error: billing_export.csv missing", "emails": ""}) df = pd.read_csv("billing_export.csv") logger.log("📊", f"Loaded {len(df)} resources.") # --- STEP 1: INTENT EXTRACTION --- if custom_query: intent = extract_query_intent(custom_query, logger) df = filter_dataframe(df, intent, logger) if len(df) == 0: return json.dumps({"report": "

No resources matched your query.

", "emails": "", "reasoning": logger.get_logs()}) # --- STEP 2: HYBRID ANALYSIS --- if progress_callback: progress_callback(0.3, "⚡ Running Python Heuristics...") simple_results, complex_cases = classify_simple_cases(df, logger) all_results = [] for sr in simple_results: all_results.append({ "resource": sr['resource'], "classification": { "action": sr['action'], "reason": sr['reason'], "policy_rule": sr['policy_rule'], "inferred_team": sr.get('inferred_team', TeamName.UNKNOWN) } }) # --- STEP 3: AI REASONING --- if complex_cases: if progress_callback: progress_callback(0.6, f"🤖 AI Analyzing {len(complex_cases)} complex cases...") logger.log("📚", "Checking Policy via RAG...") policy_ctx = query_policy("cloud resource lifecycle policy including deletion, hibernation, and retention rules") # OPTIMIZATION: Prioritize Top 150 Most Expensive Resources # Instead of batching everything (slow/expensive), we focus AI on high-impact items. complex_cases.sort(key=lambda x: x.get('Cost_Monthly', 0), reverse=True) TOP_N = 100 high_impact_batch = complex_cases[:TOP_N] skipped_count = len(complex_cases) - len(high_impact_batch) if skipped_count > 0: logger.log("⚡", f"Prioritizing Top {TOP_N} spenders. 
Skipped {skipped_count} low-cost items.") if progress_callback: progress_callback(0.7, f"🤖 AI Analyzing Top {len(high_impact_batch)} High-Impact Cases...") try: resources_json = json.dumps(high_impact_batch) ai_response = classify_resources(resources_json, policy_ctx) for cls in ai_response.classifications: res = next((r for r in high_impact_batch if r['ResourceID'] == cls.resource_id), None) if res: all_results.append({ "resource": res, "classification": { "action": cls.action, "reason": cls.reason, "policy_rule": cls.policy_rule, "inferred_team": cls.inferred_team } }) except Exception as e: logger.log("⚠️", f"AI Analysis Error: {e}") # --- STEP 4: SAFETY CHECKS --- logger.log("🛡️", "Applying final safety guardrails...") final_results = [] for res in all_results: safe_action = apply_safety_rules(res['resource'], res['classification'], logger) res['classification']['action'] = safe_action final_results.append(res) # --- STEP 5: REPORTING --- if progress_callback: progress_callback(0.9, "📝 Generatng Report...") total_spend = df['Cost_Monthly'].sum() report = generate_report(final_results, len(df), total_spend) emails = generate_emails(final_results) logger.log("✅", "Audit Complete.") return json.dumps({ "report": report, "emails": emails, "reasoning": logger.get_logs() }) except Exception as e: import traceback traceback.print_exc() logger.log("❌", f"Fatal Error: {e}") return json.dumps({"report": f"Error: {e}", "emails": "", "reasoning": logger.get_logs()}) if __name__ == "__main__": # Test run print(run_audit("find expensive databases"))