Pablo
feat: APOHARA: Context Forge V5 — synthesis + rebrand complete
cf0a8ed
"""Zero-overhead AMD GPU memory monitor via PyRSMI - fixes BUG-003 / IMPROVEMENT-004.
Replaces blocking subprocess.run(["rocm-smi"]) with native PyRSMI C bindings.
No subprocess, no shell, no event loop blocking. <1ms overhead.
Install: pip install pyrsmi
Docs: https://github.com/ROCm/pyrsmi
"""
import asyncio
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class VRAMMonitor:
"""
Zero-overhead AMD GPU memory monitor using PyRSMI native C bindings.
MI300X specs:
- 192GB HBM3 total
- PyRSMI reads via ROCm SMI kernel driver (/dev/mem mapped)
- Native bindings return bytes directly, no shell parsing
Usage:
monitor = VRAMMonitor()
monitor.start() # Start background monitoring
pressure = monitor.get_pressure() # 0.0-1.0
mode = monitor.get_eviction_mode() # "relaxed", "normal", "pressure", "critical", "emergency"
used_gb = monitor.get_used_gb()
available_gb = monitor.get_available_gb()
monitor.stop()
"""
VRAM_CHECK_INTERVAL = 2.0 # seconds between checks
def __init__(self, device_id: int = 0):
self._device_id = device_id
self._initialized = False
self._pyrsml = None
self._current_pressure = 0.0
self._monitor_task: Optional[asyncio.Task] = None
self._init()
def _init(self) -> None:
"""Initialize PyRSMI (fails gracefully if unavailable)."""
try:
from pyrsmi import rocml
rocml.smi_initialize()
self._pyrsml = rocml
self._initialized = True
logger.info(f"PyRSMI initialized for device {self._device_id}")
except ImportError:
logger.warning(
"pyrsmi not available. Install with: pip install pyrsmi. "
"Falling back to /sys/class/drm (read-only, ~5ms overhead)."
)
except Exception as e:
logger.error(f"PyRSMI initialization failed: {e}")
async def start(self) -> None:
"""Start background VRAM monitoring loop."""
if self._monitor_task is not None:
return
self._monitor_task = asyncio.create_task(self._monitor_loop())
async def stop(self) -> None:
"""Stop background monitoring."""
if self._monitor_task:
self._monitor_task.cancel()
try:
await self._monitor_task
except asyncio.CancelledError:
pass
self._monitor_task = None
async def _monitor_loop(self) -> None:
"""Background loop: updates pressure every VRAM_CHECK_INTERVAL."""
while True:
try:
self._current_pressure = self.get_pressure()
await asyncio.sleep(self.VRAM_CHECK_INTERVAL)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"VRAM monitor loop error: {e}")
def get_used_bytes(self) -> int:
"""Get used VRAM in bytes."""
if self._initialized and self._pyrsml:
try:
return self._pyrsml.smi_get_device_memory_used(self._device_id)
except Exception as e:
logger.warning(f"PyRSMI get_used_bytes failed: {e}")
return self._fallback_used_bytes()
def get_total_bytes(self) -> int:
"""Get total VRAM in bytes."""
if self._initialized and self._pyrsml:
try:
return self._pyrsml.smi_get_device_memory_total(self._device_id)
except Exception as e:
logger.warning(f"PyRSMI get_total_bytes failed: {e}")
return self._fallback_total_bytes()
def get_available_bytes(self) -> int:
"""Get available VRAM in bytes."""
return self.get_total_bytes() - self.get_used_bytes()
def get_used_gb(self) -> float:
"""Get used VRAM in gigabytes."""
return self.get_used_bytes() / (1024 ** 3)
def get_total_gb(self) -> float:
"""Get total VRAM in gigabytes."""
return self.get_total_bytes() / (1024 ** 3)
def get_available_gb(self) -> float:
"""Get available VRAM in gigabytes."""
return self.get_available_bytes() / (1024 ** 3)
def get_pressure(self) -> float:
"""
Returns VRAM utilization 0.0–1.0. <1ms overhead.
Returns:
Pressure ratio (0.0 = free, 1.0 = saturated)
"""
total = self.get_total_bytes()
if total == 0:
return 0.0
return self.get_used_bytes() / total
def get_eviction_mode(self) -> str:
"""
Returns eviction mode based on VRAM pressure.
Returns:
One of: "relaxed", "normal", "pressure", "critical", "emergency"
"""
p = self.get_pressure()
if p < 0.70: return "relaxed"
if p < 0.85: return "normal"
if p < 0.92: return "pressure"
if p < 0.96: return "critical"
return "emergency"
@staticmethod
def _fallback_used_bytes() -> int:
"""
Fallback: read from Linux DRM sysfs (read-only, ~5ms overhead).
Works on any Linux system with AMD GPU.
"""
try:
with open("/sys/class/drm/card0/device/mem_info_vram_used", "r") as f:
return int(f.read().strip())
except Exception:
return 0
@staticmethod
def _fallback_total_bytes() -> int:
"""
Fallback: read from Linux DRM sysfs.
Default to 192GB MI300X if unable to read.
"""
try:
with open("/sys/class/drm/card0/device/mem_info_vram_total", "r") as f:
return int(f.read().strip())
except Exception:
# MI300X has 192GB HBM3
return 192 * (1024 ** 3)
def __del__(self):
"""Cleanup PyRSMI on destruction."""
if self._initialized and self._pyrsml:
try:
self._pyrsml.smi_shutdown()
except Exception:
pass
# Module-level singleton
_monitor: Optional[VRAMMonitor] = None
def get_monitor() -> VRAMMonitor:
"""Get or create module-level VRAMMonitor singleton."""
global _monitor
if _monitor is None:
_monitor = VRAMMonitor()
return _monitor
def get_vram_pressure() -> float:
"""Quick VRAM pressure check."""
return get_monitor().get_pressure()
def get_vram_used_gb() -> float:
"""Quick VRAM used GB."""
return get_monitor().get_used_gb()
def get_vram_available_gb() -> float:
"""Quick VRAM available GB."""
return get_monitor().get_available_gb()
def get_eviction_mode() -> str:
"""Quick eviction mode check."""
return get_monitor().get_eviction_mode()