| import time |
| import requests |
| import re |
| from typing import List, Dict, Union |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
|
|
| class DockerMirrorDetector: |
| |
| DEFAULT_HEADERS = { |
| "User-Agent": "Docker-Client/19.03.8 (linux)", |
| "Accept": "application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json" |
| } |
| |
| |
| class Colors: |
| HEADER = '\033[95m' |
| BLUE = '\033[94m' |
| GREEN = '\033[92m' |
| WARNING = '\033[93m' |
| FAIL = '\033[91m' |
| ENDC = '\033[0m' |
| BOLD = '\033[1m' |
| |
| def __init__(self, mirror_url: str, image: str = "library/nginx", tag: str = "latest", debug: bool = False, |
| timeout: int = 5): |
| """ |
| 初始化测试器 |
| :param mirror_url: 镜像站地址 (例如 hub.1panel.dev) |
| :param image: 测试用的镜像 (默认 library/nginx) |
| :param tag: 镜像标签 (默认 latest) |
| :param debug: 是否打印详细调试信息 |
| :param timeout: 请求超时时间 |
| """ |
| |
| self.mirror_url = mirror_url.replace("https://", "").replace("http://", "").rstrip("/") |
| self.image = image |
| self.tag = tag |
| self.debug = debug |
| self.timeout = timeout |
| self.base_url = f"https://{self.mirror_url}" |
| |
| |
| requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) |
| |
| def _log(self, message: str, level: str = "INFO"): |
| """内部方法:格式化打印日志""" |
| if not self.debug: |
| return |
| |
| prefix = "" |
| if level == "INFO": |
| prefix = f"{self.Colors.BLUE}[INFO]{self.Colors.ENDC}" |
| elif level == "SUCCESS": |
| prefix = f"{self.Colors.GREEN}[SUCCESS]{self.Colors.ENDC}" |
| elif level == "ERROR": |
| prefix = f"{self.Colors.FAIL}[ERROR]{self.Colors.ENDC}" |
| elif level == "WARN": |
| prefix = f"{self.Colors.WARNING}[WARN]{self.Colors.ENDC}" |
| |
| print(f"{prefix} {message}") |
| |
| def _parse_auth_header(self, header_value: str): |
| """解析 WWW-Authenticate 头""" |
| realm_match = re.search(r'realm="([^"]+)"', header_value) |
| service_match = re.search(r'service="([^"]+)"', header_value) |
| |
| realm = realm_match.group(1) if realm_match else None |
| service = service_match.group(1) if service_match else None |
| return realm, service |
| |
| def check(self) -> Dict: |
| """ |
| 同步检测单个镜像站 (使用 requests) |
| 这个方法会被 test_batch 和 单次调用复用 |
| """ |
| result = { |
| "url": self.base_url, |
| "available": False, |
| "latency": 0, |
| "status_code": 0, |
| "msg": "", |
| "ts": int(time.time()) |
| } |
| |
| self._log(f"开始测试镜像站: {self.Colors.BOLD}{self.mirror_url}{self.Colors.ENDC}") |
| |
| session = requests.Session() |
| session.verify = False |
| |
| try: |
| |
| v2_url = f"{self.base_url}/v2/" |
| self._log(f"正在连接握手: {v2_url}") |
| |
| resp = session.get(v2_url, headers=self.DEFAULT_HEADERS, timeout=self.timeout) |
| result['status_code'] = resp.status_code |
| |
| auth_headers = self.DEFAULT_HEADERS.copy() |
| |
| |
| if resp.status_code == 401: |
| auth_header = resp.headers.get('WWW-Authenticate', '') |
| self._log(f"需要认证,解析 Header: {auth_header}", "WARN") |
| |
| realm, service = self._parse_auth_header(auth_header) |
| if realm and service: |
| token_url = f"{realm}?service={service}&scope=repository:{self.image}:pull" |
| self._log(f"获取 Token: {token_url}") |
| |
| token_resp = session.get(token_url, verify=False, timeout=self.timeout) |
| if token_resp.status_code == 200: |
| token = token_resp.json().get('token') |
| auth_headers['Authorization'] = f"Bearer {token}" |
| self._log("Token 获取成功", "SUCCESS") |
| else: |
| raise Exception(f"获取 Token 失败: {token_resp.status_code}") |
| else: |
| raise Exception("无法解析认证 Header") |
| |
| elif resp.status_code != 200: |
| raise Exception(f"握手失败,状态码: {resp.status_code}") |
| |
| |
| manifest_url = f"{self.base_url}/v2/{self.image}/manifests/{self.tag}" |
| self._log(f"尝试获取 Manifest: {manifest_url}") |
| |
| |
| req_start = time.time() |
| resp = session.get(manifest_url, headers=auth_headers, timeout=self.timeout) |
| req_end = time.time() |
| |
| if resp.status_code == 200: |
| latency_ms = int((req_end - req_start) * 1000) |
| result['available'] = True |
| result['latency'] = latency_ms |
| result['msg'] = "OK" |
| self._log(f"测试通过! 延迟: {latency_ms}ms", "SUCCESS") |
| else: |
| result['msg'] = f"Manifest获取失败: {resp.status_code}" |
| |
| if self.debug: |
| self._log(f"Manifest获取失败详情: {resp.status_code} \n{resp.text[:100]}", "ERROR") |
| |
| except requests.exceptions.Timeout: |
| result['msg'] = "Timeout" |
| self._log(f"请求超时 ({self.timeout}s)", "ERROR") |
| except requests.exceptions.ConnectionError: |
| result['msg'] = "Connection Error" |
| self._log("连接错误,可能域名不存在或被防火墙拦截", "ERROR") |
| except Exception as e: |
| result['msg'] = str(e) |
| self._log(f"发生异常: {str(e)}", "ERROR") |
| |
| return result |
| |
| @staticmethod |
| def test_batch(mirror_list: List[str], image: str = "library/nginx", tag: str = "latest", max_workers: int = 10, |
| timeout: int = 5) -> List[Dict]: |
| """ |
| 批量多线程检测接口 (替换了 asyncio) |
| :param mirror_list: 镜像站URL列表 |
| :param image: 测试用的镜像 |
| :param tag: 镜像标签 |
| :param max_workers: 最大并发线程数 |
| :param timeout: 请求超时时间 |
| :return: 结果列表 |
| """ |
| results = [] |
| |
| |
| with ThreadPoolExecutor(max_workers=max_workers) as executor: |
| |
| |
| future_to_url = { |
| executor.submit( |
| DockerMirrorDetector( |
| mirror_url=url, |
| image=image, |
| tag=tag, |
| debug=False, |
| timeout=timeout |
| ).check |
| ): url for url in mirror_list |
| } |
| |
| |
| for future in as_completed(future_to_url): |
| try: |
| |
| result = future.result() |
| results.append(result) |
| except Exception as exc: |
| |
| url = future_to_url[future] |
| results.append({ |
| "url": url, |
| "available": False, |
| "latency": 0, |
| "status_code": 0, |
| "msg": f"ThreadPool Exception: {exc}", |
| "ts": int(time.time()) |
| }) |
| |
| return results |
|
|
|
|
| |
| |
| |
|
|
| if __name__ == '__main__': |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| print("\n--- 2. 批量多线程测试模式 ---") |
|
|
| mirrors = [ |
| "docker.m.daocloud.io", |
| "docker.nju.edu.cn", |
| |
| |
| |
| ] |
|
|
| |
| start_t_batch = time.time() |
| results = DockerMirrorDetector.test_batch(mirrors, max_workers=5, timeout=5) |
| end_t_batch = time.time() |
| |
| |
| |
| sorted_results = sorted(results,key=lambda x: (not x['available'], x['latency'] if x['available'] else float('inf'))) |
|
|
| for r in sorted_results: |
| |
| if r['available']: |
| status_icon = f"{DockerMirrorDetector.Colors.GREEN}✅ Available{DockerMirrorDetector.Colors.ENDC}" |
| latency_str = f"{r['latency']} ms" |
| msg = r['msg'] |
| else: |
| status_icon = f"{DockerMirrorDetector.Colors.FAIL}❌ Failed{DockerMirrorDetector.Colors.ENDC}" |
| latency_str = "-" |
| msg = f"{DockerMirrorDetector.Colors.FAIL}{r['msg']}{DockerMirrorDetector.Colors.ENDC}" |
|
|
| print(f"{r['url']:<35} | {status_icon:<20} | {latency_str:<10} | {msg}") |