Spaces:
Sleeping
Sleeping
| """Zero-overhead AMD GPU memory monitor via PyRSMI - fixes BUG-003 / IMPROVEMENT-004. | |
| Replaces blocking subprocess.run(["rocm-smi"]) with native PyRSMI C bindings. | |
| No subprocess, no shell, no event loop blocking. <1ms overhead. | |
| Install: pip install pyrsmi | |
| Docs: https://github.com/ROCm/pyrsmi | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Optional | |
| logger = logging.getLogger(__name__) | |
| class VRAMMonitor: | |
| """ | |
| Zero-overhead AMD GPU memory monitor using PyRSMI native C bindings. | |
| MI300X specs: | |
| - 192GB HBM3 total | |
| - PyRSMI reads via ROCm SMI kernel driver (/dev/mem mapped) | |
| - Native bindings return bytes directly, no shell parsing | |
| Usage: | |
| monitor = VRAMMonitor() | |
| monitor.start() # Start background monitoring | |
| pressure = monitor.get_pressure() # 0.0-1.0 | |
| mode = monitor.get_eviction_mode() # "relaxed", "normal", "pressure", "critical", "emergency" | |
| used_gb = monitor.get_used_gb() | |
| available_gb = monitor.get_available_gb() | |
| monitor.stop() | |
| """ | |
| VRAM_CHECK_INTERVAL = 2.0 # seconds between checks | |
| def __init__(self, device_id: int = 0): | |
| self._device_id = device_id | |
| self._initialized = False | |
| self._pyrsml = None | |
| self._current_pressure = 0.0 | |
| self._monitor_task: Optional[asyncio.Task] = None | |
| self._init() | |
| def _init(self) -> None: | |
| """Initialize PyRSMI (fails gracefully if unavailable).""" | |
| try: | |
| from pyrsmi import rocml | |
| rocml.smi_initialize() | |
| self._pyrsml = rocml | |
| self._initialized = True | |
| logger.info(f"PyRSMI initialized for device {self._device_id}") | |
| except ImportError: | |
| logger.warning( | |
| "pyrsmi not available. Install with: pip install pyrsmi. " | |
| "Falling back to /sys/class/drm (read-only, ~5ms overhead)." | |
| ) | |
| except Exception as e: | |
| logger.error(f"PyRSMI initialization failed: {e}") | |
| async def start(self) -> None: | |
| """Start background VRAM monitoring loop.""" | |
| if self._monitor_task is not None: | |
| return | |
| self._monitor_task = asyncio.create_task(self._monitor_loop()) | |
| async def stop(self) -> None: | |
| """Stop background monitoring.""" | |
| if self._monitor_task: | |
| self._monitor_task.cancel() | |
| try: | |
| await self._monitor_task | |
| except asyncio.CancelledError: | |
| pass | |
| self._monitor_task = None | |
| async def _monitor_loop(self) -> None: | |
| """Background loop: updates pressure every VRAM_CHECK_INTERVAL.""" | |
| while True: | |
| try: | |
| self._current_pressure = self.get_pressure() | |
| await asyncio.sleep(self.VRAM_CHECK_INTERVAL) | |
| except asyncio.CancelledError: | |
| break | |
| except Exception as e: | |
| logger.error(f"VRAM monitor loop error: {e}") | |
| def get_used_bytes(self) -> int: | |
| """Get used VRAM in bytes.""" | |
| if self._initialized and self._pyrsml: | |
| try: | |
| return self._pyrsml.smi_get_device_memory_used(self._device_id) | |
| except Exception as e: | |
| logger.warning(f"PyRSMI get_used_bytes failed: {e}") | |
| return self._fallback_used_bytes() | |
| def get_total_bytes(self) -> int: | |
| """Get total VRAM in bytes.""" | |
| if self._initialized and self._pyrsml: | |
| try: | |
| return self._pyrsml.smi_get_device_memory_total(self._device_id) | |
| except Exception as e: | |
| logger.warning(f"PyRSMI get_total_bytes failed: {e}") | |
| return self._fallback_total_bytes() | |
| def get_available_bytes(self) -> int: | |
| """Get available VRAM in bytes.""" | |
| return self.get_total_bytes() - self.get_used_bytes() | |
| def get_used_gb(self) -> float: | |
| """Get used VRAM in gigabytes.""" | |
| return self.get_used_bytes() / (1024 ** 3) | |
| def get_total_gb(self) -> float: | |
| """Get total VRAM in gigabytes.""" | |
| return self.get_total_bytes() / (1024 ** 3) | |
| def get_available_gb(self) -> float: | |
| """Get available VRAM in gigabytes.""" | |
| return self.get_available_bytes() / (1024 ** 3) | |
| def get_pressure(self) -> float: | |
| """ | |
| Returns VRAM utilization 0.0–1.0. <1ms overhead. | |
| Returns: | |
| Pressure ratio (0.0 = free, 1.0 = saturated) | |
| """ | |
| total = self.get_total_bytes() | |
| if total == 0: | |
| return 0.0 | |
| return self.get_used_bytes() / total | |
| def get_eviction_mode(self) -> str: | |
| """ | |
| Returns eviction mode based on VRAM pressure. | |
| Returns: | |
| One of: "relaxed", "normal", "pressure", "critical", "emergency" | |
| """ | |
| p = self.get_pressure() | |
| if p < 0.70: return "relaxed" | |
| if p < 0.85: return "normal" | |
| if p < 0.92: return "pressure" | |
| if p < 0.96: return "critical" | |
| return "emergency" | |
| def _fallback_used_bytes() -> int: | |
| """ | |
| Fallback: read from Linux DRM sysfs (read-only, ~5ms overhead). | |
| Works on any Linux system with AMD GPU. | |
| """ | |
| try: | |
| with open("/sys/class/drm/card0/device/mem_info_vram_used", "r") as f: | |
| return int(f.read().strip()) | |
| except Exception: | |
| return 0 | |
| def _fallback_total_bytes() -> int: | |
| """ | |
| Fallback: read from Linux DRM sysfs. | |
| Default to 192GB MI300X if unable to read. | |
| """ | |
| try: | |
| with open("/sys/class/drm/card0/device/mem_info_vram_total", "r") as f: | |
| return int(f.read().strip()) | |
| except Exception: | |
| # MI300X has 192GB HBM3 | |
| return 192 * (1024 ** 3) | |
| def __del__(self): | |
| """Cleanup PyRSMI on destruction.""" | |
| if self._initialized and self._pyrsml: | |
| try: | |
| self._pyrsml.smi_shutdown() | |
| except Exception: | |
| pass | |
| # Module-level singleton | |
| _monitor: Optional[VRAMMonitor] = None | |
| def get_monitor() -> VRAMMonitor: | |
| """Get or create module-level VRAMMonitor singleton.""" | |
| global _monitor | |
| if _monitor is None: | |
| _monitor = VRAMMonitor() | |
| return _monitor | |
| def get_vram_pressure() -> float: | |
| """Quick VRAM pressure check.""" | |
| return get_monitor().get_pressure() | |
| def get_vram_used_gb() -> float: | |
| """Quick VRAM used GB.""" | |
| return get_monitor().get_used_gb() | |
| def get_vram_available_gb() -> float: | |
| """Quick VRAM available GB.""" | |
| return get_monitor().get_available_gb() | |
| def get_eviction_mode() -> str: | |
| """Quick eviction mode check.""" | |
| return get_monitor().get_eviction_mode() |