""" Budget audit pipeline — logs every slot spend and provides rollup reports. Appends one row per control tick to a parquet file. The weekly rollup verifies cumulative sacrifice stays within the 5% annual ceiling. Usage: # In control_loop.py tick(): from src.budget_audit import BudgetAuditLog audit = BudgetAuditLog() audit.log_slot(tick_result) # Weekly report: python -m src.budget_audit --report """ from __future__ import annotations import logging from dataclasses import dataclass from datetime import date, datetime, timezone from pathlib import Path from typing import Optional import pandas as pd from config.settings import ( DATA_DIR, MAX_ENERGY_REDUCTION_PCT, SYSTEM_CAPACITY_KW, ) logger = logging.getLogger(__name__) AUDIT_DIR = DATA_DIR / "budget_audit" AUDIT_PATH = AUDIT_DIR / "slot_log.parquet" @dataclass class SlotRecord: """One row in the audit log.""" timestamp: datetime date: date slot_index: int planned_offset_deg: float actual_offset_deg: float energy_cost_kwh: float budget_spent_kwh: float budget_remaining_kwh: float gate_passed: bool source: str stage_id: str class BudgetAuditLog: """Append-only parquet log for budget slot spends.""" def __init__(self, path: Path = AUDIT_PATH): self.path = path self.path.parent.mkdir(parents=True, exist_ok=True) def log_slot(self, tick_result) -> None: """Append a tick result to the audit log.""" try: record = { "timestamp": getattr(tick_result, "timestamp", datetime.now(timezone.utc)), "date": str(getattr(tick_result, "timestamp", datetime.now(timezone.utc)).date() if hasattr(getattr(tick_result, "timestamp", None), "date") else date.today()), "slot_index": getattr(tick_result, "slot_index", -1), "planned_offset_deg": getattr(tick_result, "plan_offset_deg", 0.0), "actual_offset_deg": getattr(tick_result, "target_angle", 0.0), "energy_cost_kwh": getattr(tick_result, "energy_cost_kwh", 0.0), "budget_spent_kwh": getattr(tick_result, "budget_spent_kwh", 0.0), "budget_remaining_kwh": getattr(tick_result, "budget_remaining_kwh", 0.0), "gate_passed": getattr(tick_result, "live_gate_passed", False), "source": getattr(tick_result, "source", ""), "stage_id": getattr(tick_result, "stage_id", "unknown"), } new_row = pd.DataFrame([record]) if self.path.exists(): existing = pd.read_parquet(self.path) combined = pd.concat([existing, new_row], ignore_index=True) else: combined = new_row combined.to_parquet(self.path, index=False) logger.debug("Audit log: slot %d, cost=%.4f kWh", record["slot_index"], record["energy_cost_kwh"]) except Exception as exc: logger.warning("Budget audit log failed: %s", exc) def load(self) -> pd.DataFrame: """Load the full audit log.""" if self.path.exists(): return pd.read_parquet(self.path) return pd.DataFrame() def daily_summary(self, target_date: Optional[date] = None) -> dict: """Summarize a single day's budget usage.""" df = self.load() if df.empty: return {"error": "No audit data"} if target_date is None: target_date = date.today() day = df[df["date"] == str(target_date)] if day.empty: return {"date": str(target_date), "slots": 0, "total_cost_kwh": 0.0} return { "date": str(target_date), "slots": len(day), "total_cost_kwh": round(float(day["energy_cost_kwh"].sum()), 4), "interventions": int(day["gate_passed"].sum()), "max_offset_deg": round(float(day["actual_offset_deg"].abs().max()), 1), "budget_remaining_kwh": round(float(day["budget_remaining_kwh"].iloc[-1]), 4), } def weekly_report(self) -> dict: """Generate a weekly rollup report for budget compliance.""" df = self.load() if df.empty: return {"error": "No audit data"} total_cost = float(df["energy_cost_kwh"].sum()) days = df["date"].nunique() daily_potential_kwh = SYSTEM_CAPACITY_KW * 6.0 # ~6 peak sun hours annual_potential_kwh = daily_potential_kwh * 365 ceiling_kwh = annual_potential_kwh * MAX_ENERGY_REDUCTION_PCT / 100.0 # Project annual rate from observed data if days > 0: daily_rate = total_cost / days projected_annual = daily_rate * 365 else: daily_rate = 0 projected_annual = 0 compliant = projected_annual <= ceiling_kwh return { "period_days": days, "total_cost_kwh": round(total_cost, 3), "daily_avg_kwh": round(daily_rate, 4), "projected_annual_kwh": round(projected_annual, 1), "ceiling_kwh": round(ceiling_kwh, 1), "ceiling_pct": MAX_ENERGY_REDUCTION_PCT, "utilization_pct": round(projected_annual / ceiling_kwh * 100, 1) if ceiling_kwh > 0 else 0, "compliant": compliant, "total_interventions": int(df["gate_passed"].sum()), "intervention_rate_pct": round(float(df["gate_passed"].mean()) * 100, 1), } # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": import argparse import json parser = argparse.ArgumentParser(description="Budget audit report") parser.add_argument("--report", action="store_true", help="Weekly rollup report") parser.add_argument("--daily", type=str, help="Daily summary for YYYY-MM-DD") args = parser.parse_args() audit = BudgetAuditLog() if args.report: print(json.dumps(audit.weekly_report(), indent=2)) elif args.daily: print(json.dumps(audit.daily_summary(date.fromisoformat(args.daily)), indent=2)) else: df = audit.load() if df.empty: print("No audit data yet.") else: print(f"Audit log: {len(df)} slots, {df['date'].nunique()} days") print(json.dumps(audit.weekly_report(), indent=2))