fe / bt-source /panel /mod /project /docker /config /mirrorDetector.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
3a5cf48 verified
import time
import requests
import re
from typing import List, Dict, Union
from concurrent.futures import ThreadPoolExecutor, as_completed # 引入多线程库
class DockerMirrorDetector:
# Docker V2 协议需要的 Header
DEFAULT_HEADERS = {
"User-Agent": "Docker-Client/19.03.8 (linux)",
"Accept": "application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json"
}
# ANSI 颜色代码用于美化输出
class Colors:
HEADER = '\033[95m'
BLUE = '\033[94m'
GREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
def __init__(self, mirror_url: str, image: str = "library/nginx", tag: str = "latest", debug: bool = False,
timeout: int = 5):
"""
初始化测试器
:param mirror_url: 镜像站地址 (例如 hub.1panel.dev)
:param image: 测试用的镜像 (默认 library/nginx)
:param tag: 镜像标签 (默认 latest)
:param debug: 是否打印详细调试信息
:param timeout: 请求超时时间
"""
# 自动去除 URL 末尾的斜杠和 http/https 前缀,保持纯净域名
self.mirror_url = mirror_url.replace("https://", "").replace("http://", "").rstrip("/")
self.image = image
self.tag = tag
self.debug = debug
self.timeout = timeout
self.base_url = f"https://{self.mirror_url}"
# 禁止 urllib3 打印 InsecureRequestWarning,仅在 __init__ 执行一次
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
def _log(self, message: str, level: str = "INFO"):
"""内部方法:格式化打印日志"""
if not self.debug:
return
prefix = ""
if level == "INFO":
prefix = f"{self.Colors.BLUE}[INFO]{self.Colors.ENDC}"
elif level == "SUCCESS":
prefix = f"{self.Colors.GREEN}[SUCCESS]{self.Colors.ENDC}"
elif level == "ERROR":
prefix = f"{self.Colors.FAIL}[ERROR]{self.Colors.ENDC}"
elif level == "WARN":
prefix = f"{self.Colors.WARNING}[WARN]{self.Colors.ENDC}"
print(f"{prefix} {message}")
def _parse_auth_header(self, header_value: str):
"""解析 WWW-Authenticate 头"""
realm_match = re.search(r'realm="([^"]+)"', header_value)
service_match = re.search(r'service="([^"]+)"', header_value)
realm = realm_match.group(1) if realm_match else None
service = service_match.group(1) if service_match else None
return realm, service
def check(self) -> Dict:
"""
同步检测单个镜像站 (使用 requests)
这个方法会被 test_batch 和 单次调用复用
"""
result = {
"url": self.base_url,
"available": False,
"latency": 0,
"status_code": 0,
"msg": "",
"ts": int(time.time())
}
self._log(f"开始测试镜像站: {self.Colors.BOLD}{self.mirror_url}{self.Colors.ENDC}")
# 在多线程环境中,每个线程应该有自己的 Session,避免竞争
session = requests.Session()
session.verify = False # 忽略 SSL 证书验证
try:
# 1. 握手检查 /v2/
v2_url = f"{self.base_url}/v2/"
self._log(f"正在连接握手: {v2_url}")
resp = session.get(v2_url, headers=self.DEFAULT_HEADERS, timeout=self.timeout)
result['status_code'] = resp.status_code
auth_headers = self.DEFAULT_HEADERS.copy()
# 2. 处理认证 (如果返回 401)
if resp.status_code == 401:
auth_header = resp.headers.get('WWW-Authenticate', '')
self._log(f"需要认证,解析 Header: {auth_header}", "WARN")
realm, service = self._parse_auth_header(auth_header)
if realm and service:
token_url = f"{realm}?service={service}&scope=repository:{self.image}:pull"
self._log(f"获取 Token: {token_url}")
token_resp = session.get(token_url, verify=False, timeout=self.timeout)
if token_resp.status_code == 200:
token = token_resp.json().get('token')
auth_headers['Authorization'] = f"Bearer {token}"
self._log("Token 获取成功", "SUCCESS")
else:
raise Exception(f"获取 Token 失败: {token_resp.status_code}")
else:
raise Exception("无法解析认证 Header")
elif resp.status_code != 200:
raise Exception(f"握手失败,状态码: {resp.status_code}")
# 3. 拉取 Manifest 清单 (真正的可用性测试)
manifest_url = f"{self.base_url}/v2/{self.image}/manifests/{self.tag}"
self._log(f"尝试获取 Manifest: {manifest_url}")
# 这里是主要的测速点
req_start = time.time()
resp = session.get(manifest_url, headers=auth_headers, timeout=self.timeout)
req_end = time.time()
if resp.status_code == 200:
latency_ms = int((req_end - req_start) * 1000)
result['available'] = True
result['latency'] = latency_ms
result['msg'] = "OK"
self._log(f"测试通过! 延迟: {latency_ms}ms", "SUCCESS")
else:
result['msg'] = f"Manifest获取失败: {resp.status_code}"
# 仅在 debug 模式下打印失败的详情
if self.debug:
self._log(f"Manifest获取失败详情: {resp.status_code} \n{resp.text[:100]}", "ERROR")
except requests.exceptions.Timeout:
result['msg'] = "Timeout"
self._log(f"请求超时 ({self.timeout}s)", "ERROR")
except requests.exceptions.ConnectionError:
result['msg'] = "Connection Error"
self._log("连接错误,可能域名不存在或被防火墙拦截", "ERROR")
except Exception as e:
result['msg'] = str(e)
self._log(f"发生异常: {str(e)}", "ERROR")
return result
@staticmethod
def test_batch(mirror_list: List[str], image: str = "library/nginx", tag: str = "latest", max_workers: int = 10,
timeout: int = 5) -> List[Dict]:
"""
批量多线程检测接口 (替换了 asyncio)
:param mirror_list: 镜像站URL列表
:param image: 测试用的镜像
:param tag: 镜像标签
:param max_workers: 最大并发线程数
:param timeout: 请求超时时间
:return: 结果列表
"""
results = []
# 使用 ThreadPoolExecutor 来管理并发
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交任务,为每个镜像站创建一个 DockerMirrorDetector 实例并调用其 check 方法
# 注意:这里传入 debug=False,以避免多线程下日志交错打印
future_to_url = {
executor.submit(
DockerMirrorDetector(
mirror_url=url,
image=image,
tag=tag,
debug=False, # 批量模式下通常关闭 debug
timeout=timeout
).check
): url for url in mirror_list
}
# 收集结果
for future in as_completed(future_to_url):
try:
# 获取线程执行的结果(即 check 方法返回的字典)
result = future.result()
results.append(result)
except Exception as exc:
# 捕获线程内部的意外异常
url = future_to_url[future]
results.append({
"url": url,
"available": False,
"latency": 0,
"status_code": 0,
"msg": f"ThreadPool Exception: {exc}",
"ts": int(time.time())
})
return results
# ==========================================
# 使用示例
# ==========================================
if __name__ == '__main__':
# print("--- 1. 单个测试模式 (Debug开启) ---")
# 实例化:只需传入域名,image可选
# 注意:单次调用需要自行调用 check()
# tester = DockerMirrorDetector(mirror_url="https://docker.1ms.run", debug=False, timeout=10)
# res = tester.check()
#
# # 打印最终结果结构
# print("\n单次测试结果:", res)
# print("=" * 75)
print("\n--- 2. 批量多线程测试模式 ---")
mirrors = [
"docker.m.daocloud.io", # 可用
"docker.nju.edu.cn", # 可用
# "registry-1.docker.io", # 官方镜像站
# "this-domain-not-exist-123.com", # 模拟一个域名无法解析
# "registry.aliyuncs.com" # 阿里云等可能需要登录
]
# 调用静态方法进行多线程批量测试
start_t_batch = time.time()
results = DockerMirrorDetector.test_batch(mirrors, max_workers=5, timeout=5)
end_t_batch = time.time()
# # 按延迟排序(可用在前,延迟低的在前)
sorted_results = sorted(results,key=lambda x: (not x['available'], x['latency'] if x['available'] else float('inf')))
for r in sorted_results:
# 使用颜色和图标来美化输出
if r['available']:
status_icon = f"{DockerMirrorDetector.Colors.GREEN}✅ Available{DockerMirrorDetector.Colors.ENDC}"
latency_str = f"{r['latency']} ms"
msg = r['msg']
else:
status_icon = f"{DockerMirrorDetector.Colors.FAIL}❌ Failed{DockerMirrorDetector.Colors.ENDC}"
latency_str = "-"
msg = f"{DockerMirrorDetector.Colors.FAIL}{r['msg']}{DockerMirrorDetector.Colors.ENDC}"
print(f"{r['url']:<35} | {status_icon:<20} | {latency_str:<10} | {msg}")