"""Zero-overhead AMD GPU memory monitor via PyRSMI - fixes BUG-003 / IMPROVEMENT-004.

Replaces blocking subprocess.run(["rocm-smi"]) with native PyRSMI C bindings.
No subprocess, no shell, no event loop blocking. Under 1 ms overhead (design goal).

Install: pip install pyrsmi
Docs: https://github.com/ROCm/pyrsmi
"""

import asyncio
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class VRAMMonitor:
    """AMD GPU memory monitor backed by PyRSMI, with a Linux sysfs fallback.

    Readings come from ``pyrsmi.rocml`` when it can be imported and
    initialized; otherwise (or when a PyRSMI call fails) they are read from
    ``/sys/class/drm/card0/device/mem_info_vram_*``.  If the sysfs total
    cannot be read either, the total defaults to 192 GiB (MI300X HBM3).

    Usage (``start``/``stop`` are coroutines and need a running event loop):
        monitor = VRAMMonitor()
        await monitor.start()                 # start background sampling
        pressure = monitor.get_pressure()     # 0.0-1.0, sampled now
        cached = monitor.get_cached_pressure()  # last background sample
        mode = monitor.get_eviction_mode()    # "relaxed", "normal",
                                              # "pressure", "critical",
                                              # "emergency"
        used_gb = monitor.get_used_gb()
        available_gb = monitor.get_available_gb()
        await monitor.stop()
    """

    VRAM_CHECK_INTERVAL = 2.0  # seconds between background samples

    # (exclusive upper bound on pressure, mode) pairs, checked in order.
    # Anything at or above the last bound maps to EMERGENCY_MODE.
    EVICTION_THRESHOLDS = (
        (0.70, "relaxed"),
        (0.85, "normal"),
        (0.92, "pressure"),
        (0.96, "critical"),
    )
    EMERGENCY_MODE = "emergency"

    def __init__(self, device_id: int = 0):
        """Create a monitor for ``device_id`` and try to initialize PyRSMI.

        Args:
            device_id: Device index passed to the PyRSMI query functions.
        """
        self._device_id = device_id
        self._initialized = False
        self._pyrsml = None  # the pyrsmi.rocml module once initialized
        self._current_pressure = 0.0  # last value written by the loop
        self._monitor_task: Optional[asyncio.Task] = None
        self._init()

    def _init(self) -> None:
        """Initialize PyRSMI (fails gracefully if unavailable)."""
        try:
            from pyrsmi import rocml

            rocml.smi_initialize()
            self._pyrsml = rocml
            self._initialized = True
            logger.info("PyRSMI initialized for device %s", self._device_id)
        except ImportError:
            logger.warning(
                "pyrsmi not available. Install with: pip install pyrsmi. "
                "Falling back to /sys/class/drm (read-only, ~5ms overhead)."
            )
        except Exception as e:
            # Any other failure (missing driver, no device, ...) also leaves
            # the monitor on the sysfs fallback instead of crashing.
            logger.error("PyRSMI initialization failed: %s", e)

    async def start(self) -> None:
        """Start the background VRAM monitoring loop.

        A no-op while a loop task is still running.  A task that has already
        finished (e.g. it was cancelled externally) is replaced by a new one.
        """
        if self._monitor_task is not None and not self._monitor_task.done():
            return
        self._monitor_task = asyncio.create_task(self._monitor_loop())

    async def stop(self) -> None:
        """Cancel the background monitoring task and wait for it to finish."""
        if self._monitor_task:
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass
            self._monitor_task = None

    async def _monitor_loop(self) -> None:
        """Background loop: refresh the cached pressure every interval.

        The sleep is deliberately outside the sampling ``try`` so that a
        failing sample is still followed by a sleep; otherwise a persistent
        error would retry without ever yielding to the event loop.
        """
        while True:
            try:
                self._current_pressure = self.get_pressure()
            except Exception as e:
                logger.error("VRAM monitor loop error: %s", e)
            try:
                await asyncio.sleep(self.VRAM_CHECK_INTERVAL)
            except asyncio.CancelledError:
                # Cancellation (see stop()) ends the loop quietly.
                break

    def _query_pyrsmi(self, getter_name: str, label: str) -> Optional[int]:
        """Call one PyRSMI memory getter and validate its result.

        Args:
            getter_name: Name of the ``rocml`` function to call with the
                device id.
            label: Name used in log messages.

        Returns:
            The non-negative integer reported by PyRSMI, or ``None`` when
            PyRSMI is not initialized, the call raised, or the value is not
            a non-negative integer.
        """
        if not (self._initialized and self._pyrsml):
            return None
        try:
            value = int(getattr(self._pyrsml, getter_name)(self._device_id))
        except Exception as e:
            logger.warning("PyRSMI %s failed: %s", label, e)
            return None
        if value < 0:
            # NOTE(review): pyrsmi appears to signal some failures with a
            # negative sentinel instead of raising - confirm against the
            # installed version.  A negative byte count is invalid either
            # way, so treat it as a failed read.
            logger.warning("PyRSMI %s failed: invalid value %d", label, value)
            return None
        return value

    def get_used_bytes(self) -> int:
        """Get used VRAM in bytes (PyRSMI first, sysfs fallback)."""
        value = self._query_pyrsmi(
            "smi_get_device_memory_used", "get_used_bytes"
        )
        if value is not None:
            return value
        return self._fallback_used_bytes()

    def get_total_bytes(self) -> int:
        """Get total VRAM in bytes (PyRSMI first, sysfs fallback)."""
        value = self._query_pyrsmi(
            "smi_get_device_memory_total", "get_total_bytes"
        )
        if value is not None:
            return value
        return self._fallback_total_bytes()

    def get_available_bytes(self) -> int:
        """Get available VRAM in bytes (total minus used, never negative)."""
        # Total and used may come from different sources when one PyRSMI call
        # fails, so guard against a negative difference.
        return max(0, self.get_total_bytes() - self.get_used_bytes())

    def get_used_gb(self) -> float:
        """Get used VRAM in gigabytes (bytes / 1024**3)."""
        return self.get_used_bytes() / (1024 ** 3)

    def get_total_gb(self) -> float:
        """Get total VRAM in gigabytes (bytes / 1024**3)."""
        return self.get_total_bytes() / (1024 ** 3)

    def get_available_gb(self) -> float:
        """Get available VRAM in gigabytes (bytes / 1024**3)."""
        return self.get_available_bytes() / (1024 ** 3)

    def get_pressure(self) -> float:
        """Sample VRAM utilization now.

        Returns:
            Pressure ratio clamped to 0.0-1.0 (0.0 = free, 1.0 = saturated).
            Returns 0.0 when the reported total is not positive.
        """
        total = self.get_total_bytes()
        if total <= 0:
            return 0.0
        ratio = self.get_used_bytes() / total
        return min(1.0, max(0.0, ratio))

    def get_cached_pressure(self) -> float:
        """Return the pressure last stored by the background loop.

        This is 0.0 until the loop started by ``start()`` has completed its
        first sample; it performs no device query itself.
        """
        return self._current_pressure

    def get_eviction_mode(self) -> str:
        """Map the current VRAM pressure to an eviction mode.

        Returns:
            One of: "relaxed", "normal", "pressure", "critical", "emergency"
        """
        p = self.get_pressure()
        for upper_bound, mode in self.EVICTION_THRESHOLDS:
            if p < upper_bound:
                return mode
        return self.EMERGENCY_MODE

    # NOTE(review): both fallbacks always read card0 regardless of device_id;
    # the DRM card index is not guaranteed to match the PyRSMI device index,
    # so this is left unchanged - confirm the intended mapping.
    @staticmethod
    def _fallback_used_bytes() -> int:
        """Fallback: read used VRAM from Linux DRM sysfs.

        Returns:
            The integer in ``mem_info_vram_used`` for card0, or 0 when the
            file cannot be read or parsed.
        """
        try:
            with open("/sys/class/drm/card0/device/mem_info_vram_used", "r") as f:
                return int(f.read().strip())
        except (OSError, ValueError):
            return 0

    @staticmethod
    def _fallback_total_bytes() -> int:
        """Fallback: read total VRAM from Linux DRM sysfs.

        Returns:
            The integer in ``mem_info_vram_total`` for card0, or the MI300X
            default of 192 GiB when the file cannot be read or parsed.
        """
        try:
            with open("/sys/class/drm/card0/device/mem_info_vram_total", "r") as f:
                return int(f.read().strip())
        except (OSError, ValueError):
            # MI300X has 192GB HBM3
            return 192 * (1024 ** 3)

    def __del__(self):
        """Best-effort PyRSMI shutdown on destruction."""
        # getattr defaults keep this safe for instances whose __init__ never
        # ran to completion.
        # NOTE(review): smi_shutdown() is called per instance; if it is
        # process-wide this affects other live monitors - confirm.
        pyrsml = getattr(self, "_pyrsml", None)
        if getattr(self, "_initialized", False) and pyrsml:
            try:
                pyrsml.smi_shutdown()
            except Exception:
                # Nothing useful can be done during finalization.
                pass
            self._initialized = False


# Module-level singleton
_monitor: Optional[VRAMMonitor] = None


def get_monitor() -> VRAMMonitor:
    """Get or create the module-level VRAMMonitor singleton (device 0)."""
    global _monitor
    if _monitor is None:
        _monitor = VRAMMonitor()
    return _monitor


def get_vram_pressure() -> float:
    """Quick VRAM pressure check via the singleton monitor."""
    return get_monitor().get_pressure()


def get_vram_used_gb() -> float:
    """Quick VRAM used GB via the singleton monitor."""
    return get_monitor().get_used_gb()


def get_vram_available_gb() -> float:
    """Quick VRAM available GB via the singleton monitor."""
    return get_monitor().get_available_gb()


def get_eviction_mode() -> str:
    """Quick eviction mode check via the singleton monitor."""
    return get_monitor().get_eviction_mode()