| """ |
| 运行时配置管理模块 |
| |
| 负责管理不同模型在不同平台下的运行时参数配置,包括: |
| - max_token_length: 文本分析的最大 token 数限制(信息密度分析) |
| - chunk_size: 推理时的分块大小 |
| - 语义分析有独立的 SEMANTIC_RUNTIME_CONFIGS,仅含 max_token_length |
| |
| 平台 ID 说明: |
| - local_mps: 本地 Apple Silicon(M1/M2/M3) |
| - cloud_cuda: 云端 CUDA GPU |
| - cloud_cpu_16g: 云端大内存 CPU(如 HF Space 免费层,16G RAM) |
| - cloud_cpu_32g: 云端大内存 CPU(如 HF Space CPU upgrade,32G RAM) |
| - default_cpu_machine: 默认 CPU 机器(未知或未识别的 CPU 环境) |
| - 未来可扩展: cloud_cuda_a100, cloud_cuda_24g 等 |
| """ |
|
|
| import os |
| import torch |
| import sys |
| from typing import Dict, Optional |
|
|
|
|
| |
|
|
| |
| |
# Default top-k value — presumably consumed by inference/analysis code
# outside this chunk; no usage is visible here (TODO confirm against callers).
DEFAULT_TOPK = 10
|
|
| |
| |
# Hard ceiling for chunk_size on Apple MPS platforms; enforced by
# validate_platform_config(). NOTE(review): named after a PyTorch MPS top-k
# issue — confirm the exact upstream bug this works around.
MPS_TOPK_BUG_THRESHOLD = 2048
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
# Per-model, per-platform runtime parameters for information-density analysis.
# Outer key: model identifier ("default_model" is the fallback layer for any
# model); inner key: platform ID (see module docstring). Values:
#   max_token_length - max token budget for analysis
#   chunk_size       - inference chunk size
# Layers are merged by merge_runtime_config(), so a model-specific entry may
# override only some of the keys.
RUNTIME_CONFIGS = {

    "default_model": {
        # Unknown / unidentified CPU environment — the global fallback layer.
        "default_cpu_machine": {
            "max_token_length": 2000,
            "chunk_size": 256
        },
        # 16 GB cloud CPU (e.g. HF Space free tier).
        "cloud_cpu_16g": {
            "max_token_length": 2000,
            "chunk_size": 256
        },
        # 32 GB cloud CPU — larger budgets.
        "cloud_cpu_32g": {
            "max_token_length": 5000,
            "chunk_size": 512
        },
        # Cloud CUDA GPU — largest chunk size.
        "cloud_cuda": {

            "max_token_length": 5000,
            "chunk_size": 1024
        },
        # Local Apple Silicon; chunk_size must stay <= MPS_TOPK_BUG_THRESHOLD
        # (checked by validate_platform_config).
        "local_mps": {
            "max_token_length": 2000,
            "chunk_size": 512
        }
    },

    # Model-specific entries (e.g. "qwen3-1.7b") can be added here to
    # override the defaults for individual platforms.

}
|
|
|
|
| |
| |
|
|
# Per-platform max_token_length for semantic analysis. Deliberately has no
# chunk_size — semantic analysis only needs the token budget. Read via
# get_semantic_max_token_length().
SEMANTIC_RUNTIME_CONFIGS = {
    "default_cpu_machine": {"max_token_length": 300},
    "cloud_cpu_16g": {"max_token_length": 300},
    "cloud_cpu_32g": {"max_token_length": 1000},
    "cloud_cuda": {"max_token_length": 1000},
    "local_mps": {"max_token_length": 300},
}
|
|
|
|
| |
|
|
def detect_platform(verbose: bool = True) -> str:
    """
    Auto-detect the current runtime platform.

    Priority:
    1. FORCE_CPU environment variable (explicitly force CPU mode)
    2. Hardware probing (cuda / mps / cpu)
    3. CPU sub-variant detection (e.g. cloud_cpu_16g)

    Args:
        verbose: whether to print detection messages. Fix: this flag was
            previously accepted but ignored (all prints were unconditional);
            it is now honored so callers passing verbose=False get a silent
            detection.

    Returns:
        Platform ID string ('local_mps', 'cloud_cuda', 'cloud_cpu_16g',
        'cloud_cpu_32g', or 'default_cpu_machine')
    """
    # Explicit override takes precedence over any hardware probing.
    if os.environ.get("FORCE_CPU") == "1":
        if verbose:
            print("🔧 强制 CPU 模式")
        return _detect_cpu_variant()

    if torch.cuda.is_available():
        platform = "cloud_cuda"
    elif torch.backends.mps.is_available():
        platform = "local_mps"
    else:
        # No accelerator available: distinguish CPU environments by memory.
        platform = _detect_cpu_variant()

    if verbose:
        print(f"🔍 自动检测平台配置: {platform}")
    return platform
|
|
|
|
| def _detect_cpu_variant() -> str: |
| """ |
| 检测具体的 CPU 环境变体(内部函数) |
| 根据内存大小识别不同的 CPU 环境: |
| - >= 30GB: cloud_cpu_32g(32G 内存环境) |
| - >= 15GB: cloud_cpu_16g(16G 内存环境) |
| - 其他: default_cpu_machine(默认配置) |
| |
| 优先检测容器内存限制(cgroup),如果不可用则回退到系统内存检测。 |
| """ |
| total_memory = 0 |
| |
| try: |
| |
| |
| |
| cgroup_paths = [ |
| "/sys/fs/cgroup/memory.max", |
| "/sys/fs/cgroup/memory/memory.limit_in_bytes", |
| ] |
| |
| for cgroup_path in cgroup_paths: |
| try: |
| if os.path.exists(cgroup_path): |
| with open(cgroup_path, 'r') as f: |
| limit_str = f.read().strip() |
| |
| if limit_str == "max": |
| break |
| limit_bytes = int(limit_str) |
| if limit_bytes > 0 and limit_bytes < (2 ** 63): |
| total_memory = limit_bytes |
| print(f"🔍 从 cgroup 检测到容器内存限制: {total_memory / (1024 ** 3):.2f} GB") |
| break |
| except (ValueError, IOError, OSError): |
| continue |
| |
| |
| if total_memory == 0 and sys.platform != "win32": |
| try: |
| page_size = os.sysconf('SC_PAGE_SIZE') |
| phys_pages = os.sysconf('SC_PHYS_PAGES') |
| total_memory = page_size * phys_pages |
| print(f"🔍 从系统配置检测到内存: {total_memory / (1024 ** 3):.2f} GB") |
| except (ValueError, AttributeError): |
| pass |
| |
| |
| total_memory_gb = total_memory / (1024 ** 3) |
| |
| |
| |
| |
| if total_memory_gb >= 30.0: |
| return "cloud_cpu_32g" |
| elif total_memory_gb >= 15.0: |
| return "cloud_cpu_16g" |
| |
| except Exception as e: |
| print(f"⚠️ CPU 环境检测失败,回退到默认配置: {e}") |
| |
| return "default_cpu_machine" |
|
|
|
|
def merge_runtime_config(model_name: str, platform: str, verbose: bool = True) -> Dict[str, int]:
    """
    Four-layer config merge: supports partial overrides and tracks which
    layer each key came from.

    Priority (high to low):
    1. (model_name, platform)                   - model-specific config for this platform
    2. (model_name, "default_cpu_machine")      - model-wide default
    3. ("default_model", platform)              - platform-wide default
    4. ("default_model", "default_cpu_machine") - global fallback

    Args:
        model_name: model identifier (e.g. 'qwen3-1.7b')
        platform: platform ID (e.g. 'local_mps')
        verbose: whether to print each key's source layer. Fix: this flag was
            previously accepted but ignored (the source dump always printed);
            it is now honored.

    Returns:
        Merged config dict {"max_token_length": int, "chunk_size": int}

    Raises:
        ValueError: when the merged config is missing a required key
    """
    # Layers are listed lowest-priority first; later entries overwrite
    # earlier ones key-by-key during the merge below.
    layers = [
        {
            "name": "default_model.default_cpu_machine",
            "config": RUNTIME_CONFIGS.get("default_model", {}).get("default_cpu_machine", {})
        },
        {
            "name": f"default_model.{platform}",
            "config": RUNTIME_CONFIGS.get("default_model", {}).get(platform, {})
        },
        {
            "name": f"{model_name}.default_cpu_machine",
            "config": RUNTIME_CONFIGS.get(model_name, {}).get("default_cpu_machine", {})
        },
        {
            "name": f"{model_name}.{platform}",
            "config": RUNTIME_CONFIGS.get(model_name, {}).get(platform, {})
        }
    ]

    config_sources = {}  # key -> name of the layer that supplied its value
    merged = {}

    for layer in layers:
        for key, value in layer["config"].items():
            merged[key] = value
            config_sources[key] = layer["name"]

    # Both fields are mandatory for downstream consumers; fail loudly rather
    # than letting a KeyError surface later.
    if "max_token_length" not in merged or "chunk_size" not in merged:
        raise ValueError(
            f"配置不完整: model={model_name}, platform={platform}, "
            f"merged={merged}. 缺少必需字段!"
        )

    if verbose:
        for key, source in config_sources.items():
            actual_value = merged[key]
            print(f"\t{key}={actual_value} ( {source})")

    return merged
|
|
|
|
# Module-level cache for get_semantic_max_token_length(); stays None until
# the first call resolves the platform, then holds that platform's budget.
_semantic_max_token_length_cache: Optional[int] = None
|
|
|
|
def get_semantic_max_token_length(verbose: bool = False) -> int:
    """
    Return the semantic-analysis max_token_length for the current platform,
    looked up in SEMANTIC_RUNTIME_CONFIGS.

    The resolved value is cached at module level so repeated analyses do not
    re-run platform detection.
    """
    global _semantic_max_token_length_cache
    if _semantic_max_token_length_cache is None:
        detected = detect_platform(verbose=verbose)
        fallback = SEMANTIC_RUNTIME_CONFIGS["default_cpu_machine"]
        entry = SEMANTIC_RUNTIME_CONFIGS.get(detected, fallback)
        _semantic_max_token_length_cache = entry["max_token_length"]
    return _semantic_max_token_length_cache
|
|
|
|
def validate_platform_config(platform: str, chunk_size: int, verbose: bool = True) -> None:
    """
    Platform-level safety validation, run up-front during initialization.

    Args:
        platform: platform ID
        chunk_size: configured chunk_size
        verbose: whether to print the success message

    Raises:
        ValueError: when the config violates a platform restriction
    """
    # Only MPS platforms carry a hard chunk_size ceiling; everything else
    # passes without checks.
    if "mps" not in platform.lower():
        return

    if chunk_size > MPS_TOPK_BUG_THRESHOLD:
        raise ValueError(
            f"❌ MPS 平台配置错误: chunk_size ({chunk_size}) "
            f"超过安全上限 ({MPS_TOPK_BUG_THRESHOLD})\n"
            f"   平台: {platform}\n"
            f"   建议: 调整 RUNTIME_CONFIGS 中 {platform} 的 chunk_size"
        )
    if verbose:
        print(f"✓ MPS 平台安全检查通过: chunk_size={chunk_size} (上限={MPS_TOPK_BUG_THRESHOLD})")
|
|
|
|
| def _get_cpu_info() -> Optional[str]: |
| """ |
| 读取 CPU 型号信息(仅用于显示) |
| |
| Returns: |
| model_name, if None, return "未知" |
| """ |
| model_name = None |
| |
| try: |
| if sys.platform == 'linux': |
| with open('/proc/cpuinfo', 'r') as f: |
| for line in f: |
| |
| if model_name is None and 'model name' in line.lower(): |
| model_name = line.split(':', 1)[1].strip() |
| |
| |
| if model_name: |
| break |
| except Exception: |
| pass |
| |
| return model_name |
|
|
|
|
def _print_cpu_info() -> None:
    """
    Print the CPU model name (done on every platform).
    """
    try:
        # Fall back to the "unknown" label when detection yields nothing.
        label = _get_cpu_info() or "未知"
        print(f"💻 CPU 型号: {label}")
    except Exception as e:
        print(f"⚠️ CPU 信息获取失败: {e}")
|
|
|
|
def _print_cpu_thread_info() -> None:
    """Print the CPU thread configuration (PyTorch's current defaults)."""
    try:
        print(
            f"🧵 PyTorch 线程配置: "
            f"intra-op={torch.get_num_threads()}, "
            f"inter-op={torch.get_num_interop_threads()}"
        )
    except Exception as e:
        print(f"⚠️ CPU 线程信息获取失败: {e}")
|
|
|
|
def load_runtime_config(model_name: str, verbose: bool = False) -> tuple[str, int, int]:
    """
    Full runtime-config loading flow: detect platform -> merge config ->
    validate -> CPU debug info.

    This is the main entry point that wraps the whole configuration-loading
    logic.

    Args:
        model_name: model identifier (e.g. 'qwen3-1.7b')
        verbose: whether to print detailed configuration information

    Returns:
        tuple[platform, max_token_length, chunk_size]

    Raises:
        ValueError: when the config is incomplete or violates platform limits
    """
    platform = detect_platform(verbose=verbose)

    # Empty/None model names fall back to the generic config layer.
    merged = merge_runtime_config(
        model_name=model_name or "default_model",
        platform=platform,
        verbose=verbose,
    )
    max_token_length = merged["max_token_length"]
    chunk_size = merged["chunk_size"]

    # Fail fast on unsafe per-platform settings before any model work starts.
    validate_platform_config(platform, chunk_size, verbose=verbose)

    _print_cpu_info()

    # Thread info is only interesting when inference will run on CPU.
    if "cpu" in platform.lower():
        _print_cpu_thread_info()

    print(
        f"⚙️ 运行时配置已加载 [model={model_name}, platform={platform}]: "
        f"max_token_length={max_token_length}, chunk_size={chunk_size}"
    )

    return platform, max_token_length, chunk_size
|
|