| import json |
| import socket |
| import struct |
| import sys |
| import threading |
| import os |
| import atexit |
| from typing import Callable, Any, Union, Tuple, Optional, List |
|
|
|
|
class StatusServer:
    """Push-style status server over a Unix-domain or TCP stream socket.

    Every message on the wire is a 4-byte little-endian length followed by
    that many bytes of UTF-8 encoded JSON. A newly connected client first
    receives the "initial" status (``get_status_func(True)``); afterwards it
    receives whatever ``update_status`` broadcasts.
    """

    def __init__(self, get_status_func: Callable[[bool], Any], server_address: Union[str, Tuple[str, int]]):
        """
        Initialise the server.

        :param get_status_func: callable returning the current status (must be
            JSON-serialisable). It takes one argument ``init``: ``True`` when
            the initial status for a new client is requested, ``False`` when
            an update is requested.
        :param server_address: socket file path (Unix domain) or
            ``(host, port)`` tuple (TCP).
        """
        self.get_status_func = get_status_func
        self.server_address = server_address
        self.clients: List[socket.socket] = []
        # Guards ``self.clients`` and serialises broadcasts.
        self.lock = threading.Lock()
        self.running = False
        self.server_socket: Optional[socket.socket] = None

    @staticmethod
    def _close_socket(sock: socket.socket) -> None:
        """Shut down and close *sock*, ignoring errors from dead sockets.

        ``shutdown`` is issued before ``close`` so that a thread blocked in
        ``recv``/``accept`` on this socket is woken up promptly; ``close``
        alone does not guarantee that.
        """
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Not connected, already closed, or shutdown is unsupported for
            # this socket state on the platform: nothing more to do here.
            pass
        sock.close()

    def handle_client(self, client_socket):
        """Serve one client connection until it disconnects or the server stops.

        Sends the initial status, registers the socket for broadcasts, then
        blocks reading from the socket purely to detect disconnection
        (received data is discarded).

        :param client_socket: connected socket returned by ``accept()``.
        """
        try:
            new_status = self.get_status_func(True)
            status_bytes = json.dumps(new_status).encode()
            packed_data = len(status_bytes).to_bytes(4, 'little') + status_bytes

            try:
                client_socket.sendall(packed_data)
            except Exception as e:
                print(f"Failed to send update to client: {e}")
                return  # the ``finally`` clause closes the socket

            with self.lock:
                self.clients.append(client_socket)

            while self.running:
                try:
                    data = client_socket.recv(1024)
                except OSError:
                    # Socket closed/reset (possibly by stop() or update_status()).
                    break
                if not data:
                    break
        finally:
            # Unregister first so a concurrent broadcast never sees a socket
            # that this thread has already closed.
            with self.lock:
                if client_socket in self.clients:
                    self.clients.remove(client_socket)
            client_socket.close()

    def start_server(self):
        """Bind, listen and accept clients until ``stop()`` is called.

        Blocks the calling thread. Each accepted client is served by
        ``handle_client`` on its own thread. ``KeyboardInterrupt`` ends the
        loop gracefully; ``stop()`` is always invoked on the way out.

        :raises OSError: if the socket cannot be bound or listened on, or if
            ``accept()`` fails while the server is still running.
        """
        self.running = True

        if isinstance(self.server_address, str):
            server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket = server_socket

        try:
            if isinstance(self.server_address, str):
                # Remove a stale socket file left behind by a previous run.
                try:
                    os.unlink(self.server_address)
                except OSError:
                    if os.path.exists(self.server_address):
                        raise
            else:
                server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(self.server_address)
            server_socket.listen(5)
        except Exception:
            # Setup failed: do not leak the socket or leave a "running" flag set.
            self.running = False
            self.server_socket = None
            server_socket.close()
            raise

        print(f"Server is listening on {self.server_address}...")

        try:
            while self.running:
                try:
                    # Use the local reference: stop() resets self.server_socket
                    # to None from another thread.
                    client_socket, _ = server_socket.accept()
                except OSError:
                    if not self.running:
                        break  # stop() closed the listening socket; normal exit
                    raise
                print("Client connected")

                # Daemon thread: a handler blocked in recv() must not keep the
                # interpreter alive (which would also delay atexit cleanup).
                client_thread = threading.Thread(
                    target=self.handle_client, args=(client_socket,), daemon=True
                )
                client_thread.start()
        except KeyboardInterrupt:
            print("Shutting down server...")
        finally:
            self.stop()

    def stop(self):
        """Stop the server and release all resources.

        Closes every registered client socket and the listening socket, and
        removes the Unix socket file if one is in use. Does nothing when the
        server is not running, so it is safe to call repeatedly.
        """
        if not self.running:
            return
        self.running = False

        with self.lock:
            for client in self.clients:
                self._close_socket(client)
            self.clients.clear()

        # Swap before closing so concurrent callers never see a closed socket.
        server_socket, self.server_socket = self.server_socket, None
        if server_socket is not None:
            self._close_socket(server_socket)

        if isinstance(self.server_address, str) and os.path.exists(self.server_address):
            os.remove(self.server_address)
            print(f"Socket file removed: {self.server_address}")

    def update_status(self, update_data: Optional[dict]=None):
        """Broadcast a status message to every registered client.

        :param update_data: payload to send. When falsy (``None`` or empty),
            ``get_status_func(False)`` is called to obtain the payload.

        Clients whose send fails are closed and dropped from the registry.
        """
        if not update_data:
            new_status = self.get_status_func(False)
        else:
            new_status = update_data
        status_bytes = json.dumps(new_status).encode()
        packed_data = len(status_bytes).to_bytes(4, 'little') + status_bytes

        with self.lock:
            # Build the list of survivors instead of removing from the list
            # being iterated; removing in place skips the element that follows
            # each failed client.
            alive: List[socket.socket] = []
            for client in self.clients:
                print("Sending update to client...")
                print(len(status_bytes), status_bytes, packed_data)
                try:
                    client.sendall(packed_data)
                except Exception as e:
                    print(f"Failed to send update to client: {e}")
                    self._close_socket(client)
                else:
                    alive.append(client)
            self.clients[:] = alive
|
|
|
|
class StatusClient:
    """Client that receives length-prefixed JSON status messages.

    Each message on the wire is a 4-byte little-endian length followed by
    that many bytes of UTF-8 encoded JSON. Messages are read on a background
    daemon thread and handed to the optional callback.
    """

    def __init__(self, server_address, callback=None):
        """
        Initialise the client.

        :param server_address: Unix-domain socket path (``str``) or TCP
            address tuple ``(host, port)``.
        :param callback: function invoked with each decoded status message;
            it runs on the receive thread.
        """
        self.server_address = server_address
        self.callback = callback
        self.sock: Optional[socket.socket] = None
        self.running = False
        self.receive_thread = None

    def connect(self):
        """Connect to the server and start the background receive thread.

        :raises OSError: if the connection cannot be established (the socket
            is closed before the error propagates).
        """
        if isinstance(self.server_address, str):
            print("Connecting to Unix socket...", self.server_address)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            sock.connect(self.server_address)
        except Exception:
            sock.close()  # do not leak the descriptor on a failed connect
            raise
        self.sock = sock

        print("Connected to server.")

        self.running = True
        self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
        self.receive_thread.start()

    def _receive_loop(self):
        """Read framed messages until disconnected; runs on the receive thread."""
        # Keep a local reference: disconnect() resets self.sock to None from
        # another thread.
        sock = self.sock
        if sock is None:
            return

        buffer = b''
        while self.running:
            try:
                # 4-byte little-endian length header.
                while len(buffer) < 4:
                    data = sock.recv(4)
                    if not data:
                        raise ConnectionResetError("Server closed the connection")
                    buffer += data
                length = int.from_bytes(buffer[:4], 'little')
                buffer = buffer[4:]

                # Message body of exactly ``length`` bytes.
                while len(buffer) < length:
                    data = sock.recv(length - len(buffer))
                    if not data:
                        raise ConnectionResetError("Server closed the connection")
                    buffer += data
                message = buffer[:length]
                buffer = buffer[length:]

                status = json.loads(message.decode())
                if self.callback:
                    self.callback(status)
            except ConnectionResetError as e:
                # When running is already False the EOF was caused by our own
                # disconnect(); that is a normal exit, not an error.
                if self.running:
                    print("连接中断:", e)
                    self.disconnect()
                break
            except json.JSONDecodeError as e:
                # Framing is intact, so skip the bad message and keep going.
                print("JSON解析失败:", e)
                continue
            except Exception as e:
                if self.running:
                    print("接收错误:", e)
                    self.disconnect()
                break

    def disconnect(self):
        """Close the connection and signal the receive loop to exit."""
        self.running = False
        # Swap first so two concurrent callers cannot both close the socket.
        sock, self.sock = self.sock, None
        if sock is not None:
            try:
                # shutdown() wakes a recv() blocked on another thread;
                # close() alone does not guarantee that.
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # not connected, or already torn down by the peer
            sock.close()
        print("Disconnected from server.")

    def stop(self):
        """Disconnect and wait for the receive thread to finish."""
        self.disconnect()
        thread = self.receive_thread
        # A thread cannot join itself (e.g. stop() called from the callback).
        if thread and thread is not threading.current_thread() and thread.is_alive():
            thread.join()
        print("Client stopped.")

    def wait_receive(self):
        """Block until the receive thread exits (no-op if it is not running)."""
        thread = self.receive_thread
        if thread and thread is not threading.current_thread() and thread.is_alive():
            thread.join()
|
|
|
|
| |
def register_cleanup(server_instance):
    """Arrange for ``server_instance.stop()`` to be called at interpreter exit.

    :param server_instance: object exposing a ``stop()`` method; the method
        is looked up when the exit hook runs, not at registration time.
    """
    def _stop_at_exit():
        server_instance.stop()

    atexit.register(_stop_at_exit)
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|