Spaces:
Paused
Paused
| import logging, sys | |
| import streamlit as st | |
| import asyncio | |
| import aiohttp | |
| import yaml | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from typing import List, Dict, Optional | |
| import warnings | |
| import queue | |
# Optional dependencies. When one is missing, the matching probe falls back
# to a plain TCP reachability check (see NodeTester).
try:
    import aioquic  # noqa: F401  (imported only to detect availability)
    AIOQUIC_AVAILABLE = True
except ImportError:
    AIOQUIC_AVAILABLE = False
    print("警告: aioquic 未安装,VMess/VLESS/Trojan 将只测试端口连通性")
try:
    import socks  # noqa: F401  (PySocks)
    SHADOWSOCKS_AVAILABLE = True
except ImportError:
    SHADOWSOCKS_AVAILABLE = False
    print("警告: PySocks 未安装,Shadowsocks 将只测试端口连通性")

warnings.filterwarnings('ignore')

# Logging configuration.
# Streamlit re-executes this script on every interaction. basicConfig() is a
# no-op once the root logger has handlers, but its arguments are still
# evaluated, so an unguarded FileHandler(mode='w') would truncate the log
# file and leak a file handle on every rerun. Configure only once.
if not logging.getLogger().handlers:
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler('/tmp/streamlit_test.log', mode='w'),
            logging.StreamHandler(sys.stderr),
        ],
    )
logger = logging.getLogger(__name__)

# Must be the first Streamlit command of the script run.
st.set_page_config(page_title="节点可用性测试", layout="wide")

# Custom CSS: green progress bar and extra top padding for the main area.
st.markdown("""
<style>
.stProgress > div > div > div > div {
background-color: #4CAF50;
}
.main {
padding-top: 2rem;
}
</style>
""", unsafe_allow_html=True)
class NodeTester:
    """Parse proxy nodes from YAML text and probe them concurrently.

    HTTP-type nodes are tested by issuing a real request to ``test_url``
    through the proxy. Shadowsocks nodes are probed through PySocks when it
    is installed and a password is configured; otherwise, and for
    VMess/VLESS/Trojan nodes, only TCP reachability of ``server:port`` is
    checked.

    Progress is exposed in three ways: the ``_progress`` / ``_total``
    counters, an optional callback, and a queue drained via
    :meth:`get_progress`.
    """

    # Clash short type names -> internal names used by the test dispatch.
    _CLASH_TYPE_MAP = {
        'ss': 'shadowsocks',
        'vmess': 'vmess',
        'vless': 'vless',
        'trojan': 'trojan',
        'http': 'http',
        'socks5': 'socks5',
    }
    # Internal names that must be translated back when exporting for Clash,
    # which only recognizes ``ss`` for Shadowsocks.
    _EXPORT_TYPE_MAP = {'shadowsocks': 'ss'}

    def __init__(self, yaml_content: str, test_url: str, max_concurrent: int, timeout: int):
        """Store the configuration and initialise empty result state.

        Args:
            yaml_content: Raw text of the uploaded configuration.
            test_url: URL requested through HTTP-type proxies.
            max_concurrent: Upper bound on concurrently running workers.
            timeout: Total timeout in seconds for HTTP requests.
        """
        logger.info(f"NodeTester 初始化 - yaml长度={len(yaml_content)}, max_concurrent={max_concurrent}, timeout={timeout}")
        self.yaml_content = yaml_content
        self.test_url = test_url
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.available_nodes: List[Dict] = []
        self.results: List[tuple] = []
        # Cooperative cancellation flag checked by every worker iteration.
        self.running = True
        self._lock = threading.Lock()
        self._progress = 0
        self._total = 0
        self._session: Optional[aiohttp.ClientSession] = None
        # Progress queue (lets a UI poll for per-node updates).
        self._progress_queue: queue.Queue = queue.Queue()
        # Callback kept for backwards compatibility.
        self._progress_callback = None

    def set_progress_callback(self, callback):
        """Register ``callback(progress, total)``, invoked after each node."""
        self._progress_callback = callback

    def get_progress(self):
        """Return the next queued progress tuple, or ``None`` if none is pending."""
        try:
            return self._progress_queue.get_nowait()
        except queue.Empty:
            return None

    def clear_queue(self):
        """Discard every pending item of the progress queue."""
        while True:
            try:
                self._progress_queue.get_nowait()
            except queue.Empty:
                break

    @staticmethod
    def _to_int(value, default: int = 0) -> int:
        """Convert ``value`` to int, returning ``default`` when it is not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def parse_yaml(self) -> List[Dict]:
        """Parse the YAML text and return every node that has a ``server``.

        Supported layouts: Clash ``proxies``, inline ``proxy-providers``
        entries, a list of V2Ray-style mappings, or a single V2Ray-style
        mapping. When the document loads but yields no node, a line-based
        fallback parser is tried. A YAML syntax error is reported through
        ``st.error`` and produces an empty list.
        """
        try:
            data = yaml.safe_load(self.yaml_content)
        except yaml.YAMLError as e:
            st.error(f"YAML 解析失败: {e}")
            return []
        if not data:
            return []

        nodes: List[Dict] = []
        if isinstance(data, dict) and 'proxies' in data:
            # Clash format. Guard against ``proxies:`` being null or
            # containing non-mapping entries.
            proxies = data['proxies']
            if isinstance(proxies, list):
                for proxy in proxies:
                    if isinstance(proxy, dict):
                        nodes.append(self._normalize_clash_node(proxy))
        elif isinstance(data, dict) and 'proxy-providers' in data:
            providers = data['proxy-providers']
            if isinstance(providers, dict):
                for provider in providers.values():
                    if not isinstance(provider, dict):
                        continue
                    if 'url' in provider:
                        # Remote subscription: would have to be fetched
                        # separately, which is not implemented here.
                        continue
                    inline = provider.get('proxy')
                    if isinstance(inline, list):
                        for proxy in inline:
                            if isinstance(proxy, dict):
                                nodes.append(self._normalize_clash_node(proxy))
        elif isinstance(data, list):
            for item in data:
                if isinstance(item, dict):
                    nodes.append(self._normalize_v2ray_node(item))
        elif isinstance(data, dict):
            # A single node.
            nodes.append(self._normalize_v2ray_node(data))

        # Nothing recognised in the structured document: try the line parser.
        if not nodes:
            nodes = self._parse_manual()
        return [n for n in nodes if n.get('server')]

    def _normalize_clash_node(self, proxy: Dict) -> Dict:
        """Map a Clash proxy mapping onto the internal node dictionary."""
        raw_type = proxy.get('type', 'http')
        return {
            'name': proxy.get('name', ''),
            'type': self._CLASH_TYPE_MAP.get(raw_type, raw_type),
            'server': proxy.get('server', ''),
            'port': proxy.get('port', 0),
            'username': proxy.get('username', ''),
            'password': proxy.get('password', ''),
            'cipher': proxy.get('cipher', 'aes-256-gcm'),
            'uuid': proxy.get('uuid', ''),
            'alterId': proxy.get('alterId', 0),
            'network': proxy.get('network', 'tcp'),
            'tls': proxy.get('tls', False),
            'skip-cert-verify': proxy.get('skip-cert-verify', False),
        }

    def _normalize_v2ray_node(self, node: Dict) -> Dict:
        """Map a V2Ray-style mapping (``ps``/``add``/``id``/``aid``/``net``) onto the internal form.

        Non-numeric ``port`` / ``aid`` values become 0 instead of raising, so
        one malformed entry cannot abort parsing of the whole file.
        """
        return {
            'name': node.get('name', node.get('ps', '')),
            'type': node.get('type', 'vmess'),
            'server': node.get('add', node.get('server', '')),
            'port': self._to_int(node.get('port', 0)),
            'uuid': node.get('id', node.get('uuid', '')),
            'alterId': self._to_int(node.get('aid', node.get('alterId', 0))),
            'network': node.get('net', 'tcp'),
            'tls': node.get('tls', ''),
        }

    def _parse_manual(self) -> List[Dict]:
        """Line-based fallback parser.

        Every ``name:`` / ``- name:`` line starts a node; the following,
        more deeply indented lines are scanned for the exact keys ``server``,
        ``port`` and ``type``. Nodes without a server are dropped.

        NOTE(review): a ``name:`` key that is not the first key of its list
        item has siblings at the same indent and therefore yields no server.
        """
        nodes = []
        lines = self.yaml_content.splitlines()
        i = 0
        while i < len(lines):
            stripped = lines[i].strip()
            if not stripped.startswith(('name:', '- name:')):
                i += 1
                continue
            node = {'name': stripped.partition('name:')[2].strip().strip('"\'')}
            indent = len(lines[i]) - len(lines[i].lstrip())
            i += 1
            # Consume the lines that belong to this node.
            while i < len(lines):
                raw = lines[i]
                body = raw.strip()
                if body and len(raw) - len(raw.lstrip()) <= indent:
                    break
                # Match on the exact key so that e.g. ``transport:`` is not
                # mistaken for ``port:``.
                key, _, value = body.partition(':')
                key = key.strip()
                value = value.strip().strip('"\'')
                if key == 'server':
                    node['server'] = value
                elif key == 'port':
                    try:
                        node['port'] = int(value)
                    except ValueError:
                        pass  # leave the port unset; the node is reported as misconfigured later
                elif key == 'type':
                    node['type'] = value
                i += 1
            if node.get('server'):
                nodes.append(node)
        return nodes

    async def _test_http_node(self, session: "aiohttp.ClientSession", node: Dict) -> tuple:
        """Request ``test_url`` through an HTTP/SOCKS5 proxy node.

        Returns:
            ``(ok, message)``; ``ok`` is True only for an HTTP 200 answer.

        NOTE(review): the ``socks5://`` URL is handed to aiohttp's ``proxy``
        argument unchanged; aiohttp's own proxy support is documented for
        HTTP proxies, so SOCKS5 nodes may always fail here - confirm.
        """
        name = node.get('name', node.get('server', 'Unknown'))
        server = node.get('server')
        port = node.get('port', 0)
        if not server or not port:
            return False, f"⏭️ {name}: 配置错误"
        proxy_type = node.get('type', 'http')
        # Build the proxy URL.
        if proxy_type in ['socks5', 'socks']:
            proxy_url = f"socks5://{server}:{port}"
        else:
            proxy_url = f"http://{server}:{port}"
        # Proxy credentials, when configured.
        auth = None
        if node.get('username') and node.get('password'):
            auth = aiohttp.BasicAuth(node['username'], node['password'])
        try:
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with session.get(
                self.test_url,
                proxy=proxy_url,
                proxy_auth=auth,  # previously built but never sent
                timeout=timeout,
                ssl=not node.get('skip-cert-verify'),
            ) as resp:
                if resp.status == 200:
                    return True, f"✅ {name}"
                return False, f"❌ {name}: HTTP {resp.status}"
        except asyncio.TimeoutError:
            return False, f"⏱️ {name}: 超时"
        except aiohttp.ClientError as e:
            err_msg = str(e)[:30]
            return False, f"❌ {name}: {err_msg}"
        except Exception as e:
            return False, f"❌ {name}: {str(e)[:30]}"

    async def _test_shadowsocks_node(self, session: "aiohttp.ClientSession", node: Dict) -> tuple:
        """Test a Shadowsocks node, via PySocks when available, else by TCP port."""
        name = node.get('name', 'SS节点')
        server = node.get('server')
        port = node.get('port', 0)
        if not server or not port:
            return False, f"⏭️ {name}: 配置错误"
        if SHADOWSOCKS_AVAILABLE:
            logger.info(f"[SS] {name} 尝试真实代理测试: {server}:{port}")
            return await self._test_shadowsocks_real(server, port, name, node)
        # Fall back to the TCP port test.
        logger.info(f"[SS] {name} 回退到端口测试: {server}:{port}")
        return await self._test_tcp_port(server, port, name)

    async def _test_shadowsocks_real(self, server: str, port: int, name: str, node: Dict) -> tuple:
        """Run the blocking PySocks probe in a worker thread.

        Nodes without a password are only checked for TCP reachability.
        """
        try:
            if not node.get('password', ''):
                return await self._test_tcp_port(server, port, name)
            # PySocks is synchronous; keep it off the event loop.
            return await asyncio.to_thread(
                self._test_shadowsocks_sync, server, port, name, node
            )
        except Exception as e:
            err_msg = str(e)[:50]
            return False, f"❌ {name}: {err_msg}"

    def _test_shadowsocks_sync(self, server: str, port: int, name: str, node: Dict) -> tuple:
        """Blocking probe: open a SOCKS5 connection through the node and send one HTTP request.

        Each target host gets its own socket (a socket cannot be reused after
        a failed connect or after ``close()``), and every socket is closed.

        NOTE(review): this speaks SOCKS5 to ``server:port``. A Shadowsocks
        server normally expects the Shadowsocks wire protocol rather than
        SOCKS5, so this probe may report working nodes as unresponsive -
        confirm against a real node.
        """
        try:
            import socks
        except ImportError:
            return False, f"⏭️ {name}: PySocks未安装"
        import socket

        # Literal IPs so the probe does not depend on DNS resolution.
        test_hosts = [
            ('142.250.185.206', 80),  # Google
            ('172.64.155.4', 80),  # Cloudflare
        ]
        try:
            for host, test_port in test_hosts:
                s = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.set_proxy(socks.SOCKS5, server, port, username=None, password=None, rdns=True)
                    s.settimeout(10)
                    s.connect((host, test_port))
                    s.send(b'GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n')
                    data = s.recv(1024)
                except Exception:
                    continue  # try the next target host
                finally:
                    s.close()
                if data:
                    return True, f"✅ {name} (SS代理正常)"
            return False, f"⏭️ {name}: 代理无响应"
        except Exception as e:
            return False, f"❌ {name}: {str(e)[:30]}"

    async def _test_tcp_port(self, server: str, port: int, name: str) -> tuple:
        """Check that a TCP connection to ``server:port`` opens within 3 seconds."""
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(server, port),
                timeout=3
            )
            writer.close()
            await writer.wait_closed()
            return True, f"✅ {name} (端口可达)"
        except asyncio.TimeoutError:
            return False, f"⏱️ {name}: 端口超时"
        except Exception:
            return False, f"❌ {name}: 端口不可达"

    async def _test_vmess_node(self, session: "aiohttp.ClientSession", node: Dict) -> tuple:
        """Test a VMess/VLESS/Trojan node; both code paths only open a TCP connection."""
        name = node.get('name', 'VMess节点')
        server = node.get('server')
        port = node.get('port', 0)
        if not server or not port:
            return False, f"⏭️ {name}: 配置错误"
        if AIOQUIC_AVAILABLE:
            return await self._test_quic_protocol(server, port, name, node)
        # Fall back to the TCP port test.
        return await self._test_tcp_port(server, port, name)

    async def _test_quic_protocol(self, server: str, port: int, name: str, node: Dict) -> tuple:
        """Open a TCP connection to the node with a 5 second timeout.

        Despite the name, no QUIC/TLS/VMess handshake is performed; success
        only means that the port accepted the connection.
        """
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(server, port),
                timeout=5
            )
            # TODO: perform a TLS handshake for tls / ws nodes.
            writer.close()
            await writer.wait_closed()
            return True, f"✅ {name} (端口可达)"
        except asyncio.TimeoutError:
            return False, f"⏱️ {name}: 连接超时"
        except Exception as e:
            err_msg = str(e)[:30]
            return False, f"❌ {name}: {err_msg}"

    async def test_node(self, session: "aiohttp.ClientSession", node: Dict) -> tuple:
        """Dispatch to the tester matching the node type; unknown types are treated as HTTP."""
        # ``type`` may be missing, null or a non-string in hand-written files.
        node_type = str(node.get('type') or 'http').lower()
        if node_type == 'shadowsocks':
            return await self._test_shadowsocks_node(session, node)
        if node_type in ['vmess', 'vless', 'trojan']:
            return await self._test_vmess_node(session, node)
        return await self._test_http_node(session, node)

    async def _worker(self, session: "aiohttp.ClientSession", nodes: List[Dict], start_idx: int, step: int):
        """Test ``nodes[start_idx::step]`` sequentially and record each outcome.

        A failure inside one node test is recorded as a failed node instead
        of terminating the worker, so the remaining nodes of this worker's
        share are still tested and the progress counter reaches the total.
        """
        for i in range(start_idx, len(nodes), step):
            if not self.running:
                break
            node = nodes[i]
            try:
                success, message = await self.test_node(session, node)
            except Exception as e:
                logger.exception("节点测试异常")
                name = node.get('name', node.get('server', 'Unknown'))
                success, message = False, f"❌ {name}: {str(e)[:30]}"
            with self._lock:
                self.results.append((success, message))
                if success:
                    self.available_nodes.append(node)
                self._progress += 1
                progress = self._progress
            if progress % 10 == 0 or progress == self._total:
                logger.debug(f"进度: {progress}/{self._total}")
            if self._progress_callback:
                self._progress_callback(progress, self._total)
            # Queue entry for polling consumers.
            self._progress_queue.put((progress, self._total, success, message))

    async def test_direct_connection(self) -> tuple:
        """Request ``test_url`` without any proxy (certificate checks disabled)."""
        try:
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession() as session:
                async with session.get(self.test_url, timeout=timeout, ssl=False) as resp:
                    if resp.status == 200:
                        return True, f"✅ 直连成功 (HTTP {resp.status})"
                    return False, f"⚠️ 直连返回: HTTP {resp.status}"
        except asyncio.TimeoutError:
            return False, "⏱️ 直连超时"
        except Exception as e:
            return False, f"❌ 直连失败: {str(e)[:50]}"

    async def test_all_nodes_async(self):
        """Parse the configuration and test every node concurrently.

        Returns:
            ``(total, available)`` node counts.
        """
        logger.info("开始 parse_yaml...")
        nodes = self.parse_yaml()
        # Start from a clean slate so that calling this twice on the same
        # instance cannot accumulate results or push progress past the total.
        with self._lock:
            self.results = []
            self.available_nodes = []
            self._progress = 0
        self._total = len(nodes)
        logger.info(f"解析到 {len(nodes)} 个节点")
        if not nodes:
            logger.warning("没有解析到任何节点!")
            return 0, 0
        # One shared session so connections are pooled.
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=self.max_concurrent,
            ttl_dns_cache=300,
        )
        # No point in starting more workers than there are nodes.
        worker_count = max(1, min(int(self.max_concurrent), len(nodes)))
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            tasks = [
                asyncio.create_task(self._worker(session, nodes, i, worker_count))
                for i in range(worker_count)
            ]
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                logger.error("worker 异常退出: %r", outcome)
        return self._total, len(self.available_nodes)

    def test_all_nodes(self):
        """Synchronous wrapper around :meth:`test_all_nodes_async`."""
        return asyncio.run(self.test_all_nodes_async())

    def get_available_yaml(self) -> Optional[str]:
        """Render the available nodes as a Clash ``proxies:`` document.

        Values are serialised with ``yaml.safe_dump`` so that names or
        passwords containing YAML-significant characters stay valid, and the
        internal ``shadowsocks`` type is written back as Clash's ``ss``.

        NOTE(review): only the normalised fields are exported; transport
        options not kept by the normalisers (e.g. ws-opts) are absent.

        Returns:
            The YAML text, or ``None`` when no node is available.
        """
        if not self.available_nodes:
            return None
        proxies = []
        for node in self.available_nodes:
            node_type = node.get('type', 'http')
            entry = {
                'name': node.get('name', 'Unknown'),
                'type': self._EXPORT_TYPE_MAP.get(node_type, node_type),
                'server': node.get('server', ''),
                'port': node.get('port', 0),
            }
            # Type-specific fields are only written when they carry a value.
            for key in ('username', 'password', 'cipher', 'uuid', 'alterId', 'network'):
                if node.get(key):
                    entry[key] = node[key]
            if node.get('tls'):
                tls_text = str(node['tls']).lower()
                entry['tls'] = True if tls_text == 'true' else tls_text
            if node.get('skip-cert-verify'):
                entry['skip-cert-verify'] = True
            proxies.append(entry)

        header = "# 可用节点列表 - 自动生成\n"
        header += f"# 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        header += "# 节点数量: " + str(len(self.available_nodes)) + "\n\n"
        body = yaml.safe_dump(
            {'proxies': proxies},
            allow_unicode=True,
            sort_keys=False,
            default_flow_style=False,
        )
        return header + body
# Main page header
st.title("🔍 节点可用性测试工具")
logger.info("Streamlit 应用启动")

# Offer the export of the previous run again when the session still holds it.
if 'last_available_yaml' in st.session_state:
    previous_run = st.session_state['last_available_yaml']
    expander_label = f"📦 上次测试结果 ({previous_run['count']} 个节点 - {previous_run['time']})"
    with st.expander(expander_label, expanded=False):
        st.info(f"📁 上次测试保存了 {previous_run['count']} 个可用节点")
        download_col, caption_col = st.columns(2)
        download_col.download_button(
            label="📥 下载 available.yaml",
            data=previous_run['content'],
            file_name="available.yaml",
            mime="text/yaml",
        )
        caption_col.caption(f"保存时间: {previous_run['time']}")
    st.markdown("---")
# Sidebar: test parameters, optional-dependency status and file upload.
sidebar = st.sidebar
sidebar.header("⚙️ 配置")
test_url = sidebar.text_input("测试网址", value="https://www.google.com")
max_concurrent = sidebar.number_input("并发数量", min_value=1, value=50, step=1)
timeout = sidebar.number_input("超时时间(秒)", min_value=1, value=5, step=1)
sidebar.markdown("---")

aioquic_status = '✅ 已安装' if AIOQUIC_AVAILABLE else '❌ 未安装'
sidebar.markdown(f"**aioquic**: {aioquic_status}")
if not AIOQUIC_AVAILABLE:
    sidebar.caption("运行 `pip install aioquic` 启用真实代理测试")
sidebar.markdown("---")

# Configuration file upload
sidebar.header("📁 上传配置文件")
uploaded_file = sidebar.file_uploader(
    "选择YAML文件",
    type=['yaml', 'yml', 'conf'],
    help="上传包含代理节点配置的YAML文件",
)
# Main area: usage hints until a file is uploaded, then file summary,
# test run with live progress, and result downloads.
if uploaded_file is None:
    st.info("👈 请在侧边栏上传YAML配置文件")
    st.markdown("""
### 支持的节点格式
| 格式 | 支持状态 |
|------|----------|
| Clash proxies[] | ✅ 完全支持 |
| V2Ray JSON | ✅ 完全支持 |
| HTTP 代理 | ✅ 完全支持 |
| SOCKS5 | ✅ 完全支持 |
| Shadowsocks | ✅ TCP端口检测 |
| VMess/VLESS/Trojan | ✅ TCP端口检测 |
### 测试方式
- HTTP/SOCKS5: 实际代理请求测试
- SS/VMess/VLESS/Trojan: TCP端口连通性
""")
else:
    yaml_content = uploaded_file.getvalue().decode('utf-8')
    col1, col2 = st.columns([2, 1])
    with col1:
        st.subheader("📄 文件信息")
        st.text(f"文件名: {uploaded_file.name}")
        st.text(f"文件大小: {uploaded_file.size / 1024:.2f} KB")
    with col2:
        st.subheader("📊 节点统计")
        try:
            temp_tester = NodeTester(yaml_content, test_url, 1, 1)
            nodes = temp_tester.parse_yaml()
            st.metric("检测到节点数", len(nodes))
        except Exception:
            # Parsing is only a preview here; fall back to a rough estimate
            # but keep the reason in the log.
            logger.exception("节点统计解析失败")
            st.metric("检测到节点数", yaml_content.count('name:'))
    st.markdown("---")

    if st.button("🚀 开始测试", type="primary", use_container_width=True):
        logger.info("用户点击了开始测试按钮")
        # Progress widgets
        st.subheader("⏳ 测试进度")
        progress_bar = st.progress(0)
        progress_text = st.empty()

        # A fresh tester for every run. Reusing an instance kept in
        # session_state would test the previously uploaded YAML again and
        # accumulate results and progress across runs.
        logger.info("正在初始化 NodeTester...")
        tester = NodeTester(yaml_content, test_url, max_concurrent, timeout)
        logger.info("NodeTester 初始化完成")

        logger.info("开始直连测试...")
        with st.expander("🌐 直连测试", expanded=True):
            direct_ok, direct_msg = asyncio.run(tester.test_direct_connection())
            logger.info(f"直连测试结果: {direct_ok} - {direct_msg}")
            if direct_ok:
                st.success(direct_msg)
            else:
                st.warning(direct_msg)
                st.info("直连失败可能是因为目标网址被墙,但这不影响通过代理测试")
        progress_text.markdown("⏳ 准备中...")
        logger.info("开始异步测试...")

        # State shared with the worker thread; also published in
        # session_state under the same key as before.
        test_state = {
            'total': 0,
            'available': 0,
            'complete': False,
            'results': [],
            'tester': tester,
            'error': None,
        }
        st.session_state.test_state = test_state

        def run_test():
            """Run the async test suite on a private event loop in this thread."""
            logger.info("run_test 线程启动")
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                logger.info("准备执行 test_all_nodes_async")
                total, available = loop.run_until_complete(tester.test_all_nodes_async())
                logger.info(f"test_all_nodes_async 执行完成: total={total}, available={available}")
                test_state['total'] = total
                test_state['available'] = available
            except Exception as exc:
                # Surface the failure to the UI thread and keep whatever was
                # finished before it happened.
                logger.exception("测试线程异常")
                test_state['error'] = str(exc)
                test_state['total'] = tester._total
                test_state['available'] = len(tester.available_nodes)
            finally:
                test_state['results'] = tester.results.copy()
                test_state['complete'] = True
                loop.close()
                logger.info("run_test 线程结束")

        logger.info("启动测试线程...")
        test_thread = threading.Thread(target=run_test)
        test_thread.start()

        # Poll the tester's counters and refresh the progress widgets.
        logger.info("进入轮询循环...")
        poll_interval = 1.0
        poll_count = 0
        try:
            while test_thread.is_alive():
                current = tester._progress
                if tester._total > 0:
                    pct = current / tester._total
                    progress_bar.progress(min(pct, 1.0))
                    progress_text.markdown(f"🔄 测试中... {current}/{tester._total} 节点 ({pct*100:.1f}%)")
                else:
                    progress_text.markdown(f"🔄 解析配置中... ({poll_count}s)")
                logger.info(f"轮询 #{poll_count}: progress={current}, total={tester._total}")
                poll_count += 1
                time.sleep(poll_interval)
        finally:
            # If this script run is interrupted while the workers are still
            # busy, ask them to stop instead of leaving them running.
            if test_thread.is_alive():
                tester.running = False
        test_thread.join()

        if test_state['error']:
            st.error(f"测试过程中出现异常: {test_state['error']}")

        # Export of the available nodes
        available_yaml = tester.get_available_yaml()
        total = test_state['total']
        available = test_state['available']
        results_so_far = test_state['results']

        # Final progress
        progress_bar.progress(1.0)
        progress_text.markdown(f"✅ 测试完成! {available}/{total} 可用")
        st.markdown("---")
        st.subheader("📋 测试结果")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("总节点数", total)
        with col2:
            st.metric("可用节点", available, delta=f"{available/total*100:.1f}%" if total > 0 else "0%")
        with col3:
            st.metric("失败节点", total - available)

        # Detailed log is offered as a download only; rendering it inline
        # could freeze the page for large files.
        log_lines = []
        success_count = 0
        fail_count = 0
        for success, message in results_so_far:
            status = "✅ 可用" if success else "❌ 不可用"
            log_lines.append(f"{status} | {message}")
            if success:
                success_count += 1
            else:
                fail_count += 1
        # Guard the ratio: total is 0 when no node could be parsed.
        pass_rate = success_count / total * 100 if total > 0 else 0.0
        log_content = f"""节点测试报告
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
========================================
总节点数: {total}
可用节点: {success_count}
不可用节点: {fail_count}
通过率: {pass_rate:.1f}% (测试成功)
========================================
详细日志:
----------------------------------------
"""
        log_content += "\n".join(log_lines)
        st.download_button(
            label="📥 下载完整日志",
            data=log_content,
            file_name=f"test_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
            mime="text/plain",
            use_container_width=True
        )

        if available > 0:
            st.markdown("---")
            st.subheader("💾 下载可用节点")
            # Keep the export in session_state so it survives reruns.
            st.session_state['last_available_yaml'] = {
                'content': available_yaml,
                'count': available,
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            }
            # Fixed file name, convenient for direct import.
            st.download_button(
                label=f"📥 下载可用节点 ({available} 个) - 固定文件名可用.yaml",
                data=available_yaml,
                file_name="available.yaml",
                mime="text/yaml",
                use_container_width=True
            )
            # Time-stamped backup copy.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            st.download_button(
                label=f"📥 下载备份 ({timestamp})",
                data=available_yaml,
                file_name=f"available_{timestamp}.yaml",
                mime="text/yaml",
                use_container_width=True
            )
            # Short preview only, to keep the page responsive.
            preview_size = min(500, len(available_yaml))
            st.caption(f"📄 预览 (前 {preview_size} 字符)")
            st.code(available_yaml[:500], language="yaml")

            # Minimal proxy list for the first ten nodes.
            with st.expander("📋 代理链配置 (Clash Meta)"):
                chain_entries = []
                for node in tester.available_nodes[:10]:
                    node_type = node.get('type', 'http')
                    chain_entries.append({
                        'name': node.get('name', 'Unknown'),
                        # Clash names the Shadowsocks type "ss".
                        'type': 'ss' if node_type == 'shadowsocks' else node_type,
                        'server': node.get('server', ''),
                        'port': node.get('port', 0),
                    })
                # safe_dump quotes names that contain YAML-significant characters.
                proxy_chain = yaml.safe_dump(
                    {'proxies': chain_entries},
                    allow_unicode=True,
                    sort_keys=False,
                    default_flow_style=False,
                )
                st.code(proxy_chain, language="yaml")
        else:
            st.warning("❌ 没有可用的节点")
# Page footer: separator followed by a centred, grey capability note.
st.markdown("---")
footer_html = """
<div style='text-align: center; color: gray;'>
<p>🚀 支持 Shadowsocks / VMess / VLESS / Trojan TCP端口检测</p>
</div>
"""
st.markdown(footer_html, unsafe_allow_html=True)