| import asyncio |
| from collections.abc import Callable |
| from dataclasses import dataclass |
| from pathlib import Path |
|
|
| from astrbot import logger |
| from astrbot.core.utils.astrbot_path import get_astrbot_temp_path |
|
|
|
|
| def parse_size_to_bytes(value: str | int | float | None) -> int: |
| """Parse size in MB to bytes.""" |
| if value is None: |
| return 0 |
|
|
| try: |
| size_mb = float(str(value).strip()) |
| except (TypeError, ValueError): |
| return 0 |
|
|
| if size_mb <= 0: |
| return 0 |
|
|
| return int(size_mb * 1024**2) |
|
|
|
|
@dataclass
class TempFileInfo:
    """Snapshot of one temp file, taken from a single ``stat()`` result."""

    # Location of the file on disk.
    path: Path
    # File size in bytes (``st_size``).
    size: int
    # Last modification time as a POSIX timestamp (``st_mtime``).
    mtime: float
|
|
|
|
class TempDirCleaner:
    """Keep the temp directory from growing past a configured size.

    ``run()`` performs a cleanup pass, waits ``CHECK_INTERVAL_SECONDS`` and
    repeats until ``stop()`` is called. A pass that finds the directory above
    the limit deletes files, least recently modified first, until roughly
    ``CLEANUP_RATIO`` of the measured total has been released.
    """

    # Setting name quoted in the warning logged for an unusable limit value.
    CONFIG_KEY = "temp_dir_max_size"
    # Fallback limit, in MB, used when the configured value is unusable.
    DEFAULT_MAX_SIZE = 1024
    # Seconds to wait between two cleanup passes in run().
    CHECK_INTERVAL_SECONDS = 10 * 60
    # Share of the measured total size that one over-limit pass tries to free.
    CLEANUP_RATIO = 0.30

    def __init__(
        self,
        max_size_getter: Callable[[], str | int | float | None],
        temp_dir: Path | None = None,
    ) -> None:
        """Create a cleaner.

        Args:
            max_size_getter: Called on every pass to obtain the current size
                limit in MB, so a changed setting is picked up without
                recreating the cleaner.
            temp_dir: Directory to watch. When omitted, the path returned by
                ``get_astrbot_temp_path()`` is used.
        """
        self._max_size_getter = max_size_getter
        if temp_dir:
            self._temp_dir = temp_dir
        else:
            self._temp_dir = Path(get_astrbot_temp_path())
        self._stop_event = asyncio.Event()

    def _limit_bytes(self) -> int:
        """Return the size limit in bytes, using the default if the setting is unusable."""
        raw_value = self._max_size_getter()
        limit = parse_size_to_bytes(raw_value)
        if limit > 0:
            return limit

        default_limit = parse_size_to_bytes(self.DEFAULT_MAX_SIZE)
        logger.warning(
            f"Invalid {self.CONFIG_KEY}={raw_value!r}, fallback to {self.DEFAULT_MAX_SIZE}MB.",
        )
        return default_limit

    def _scan_temp_files(self) -> tuple[int, list[TempFileInfo]]:
        """Walk the temp directory recursively.

        Returns:
            ``(total_size, files)`` where ``files`` holds one entry per file
            that could be stat'ed and ``total_size`` is the sum of their sizes
            in bytes. A missing temp directory yields ``(0, [])``.
        """
        entries: list[TempFileInfo] = []
        if not self._temp_dir.exists():
            return 0, entries

        for candidate in self._temp_dir.rglob("*"):
            if candidate.is_file():
                try:
                    details = candidate.stat()
                except OSError as e:
                    # Could not stat the file; leave it out of the totals.
                    logger.debug(f"Skip temp file {candidate} due to stat error: {e}")
                else:
                    entries.append(
                        TempFileInfo(
                            path=candidate,
                            size=details.st_size,
                            mtime=details.st_mtime,
                        )
                    )

        return sum(entry.size for entry in entries), entries

    def _cleanup_empty_dirs(self) -> None:
        """Remove empty sub-directories of the temp directory, deepest first."""
        if not self._temp_dir.exists():
            return

        # Visiting the deepest paths first lets a parent become empty, and
        # therefore removable, once its children are gone.
        nested = list(self._temp_dir.rglob("*"))
        nested.sort(key=lambda p: len(p.parts), reverse=True)
        for entry in nested:
            if entry.is_dir():
                try:
                    entry.rmdir()
                except OSError:
                    # rmdir() failed (e.g. the directory is not empty): keep it.
                    pass

    def cleanup_once(self) -> None:
        """Run a single cleanup pass.

        Does nothing while the directory is within the limit. Otherwise
        deletes files in ascending mtime order until the released bytes reach
        ``CLEANUP_RATIO`` of the measured total (at least one byte), prunes
        directories left empty, and logs a summary. Files that cannot be
        deleted are logged and skipped.
        """
        max_bytes = self._limit_bytes()
        if max_bytes <= 0:
            return

        used_bytes, candidates = self._scan_temp_files()
        if used_bytes <= max_bytes:
            return

        goal = max(int(used_bytes * self.CLEANUP_RATIO), 1)
        freed = 0
        deleted = 0

        oldest_first = sorted(candidates, key=lambda item: item.mtime)
        for victim in oldest_first:
            try:
                victim.path.unlink()
            except OSError as e:
                logger.warning(f"Failed to delete temp file {victim.path}: {e}")
            else:
                freed += victim.size
                deleted += 1
                if freed >= goal:
                    break

        self._cleanup_empty_dirs()

        logger.warning(
            f"Temp dir exceeded limit ({used_bytes} > {max_bytes}). "
            f"Removed {deleted} files, released {freed} bytes "
            f"(target {goal} bytes).",
        )

    async def run(self) -> None:
        """Run cleanup passes until ``stop()`` is called.

        Each pass runs ``cleanup_once`` via ``asyncio.to_thread``. An
        exception raised by a pass is logged and the loop carries on.
        """
        logger.info(
            f"TempDirCleaner started. interval={self.CHECK_INTERVAL_SECONDS}s "
            f"cleanup_ratio={self.CLEANUP_RATIO}",
        )
        while True:
            if self._stop_event.is_set():
                break

            try:
                await asyncio.to_thread(self.cleanup_once)
            except Exception as e:
                logger.error(f"TempDirCleaner run failed: {e}", exc_info=True)

            # Waiting on the stop event instead of sleeping lets stop() end
            # the pause early; a timeout just means the next pass is due.
            try:
                await asyncio.wait_for(
                    self._stop_event.wait(),
                    timeout=self.CHECK_INTERVAL_SECONDS,
                )
            except asyncio.TimeoutError:
                pass

        logger.info("TempDirCleaner stopped.")

    async def stop(self) -> None:
        """Signal ``run()`` to exit."""
        self._stop_event.set()
|
|