#!/usr/bin/env python3
# coding: utf-8
"""
SSH Terminal Connection Monitoring API

Provides REST API endpoints to monitor SSH/Terminal connection status.
"""

import json
import os
import sqlite3
import time
from collections import deque
from contextlib import closing
from pathlib import Path

from flask import Flask, jsonify, request

app = Flask(__name__)


class TerminalMonitor:
    """Monitor for terminal connections.

    Reads the ``ssh_login_record`` table of the panel database, the terminal
    log file and the session recording directory. Every query method is
    best-effort: on failure it prints the error and returns whatever default
    or partial result it has, so the HTTP layer never sees an exception.
    """

    def __init__(self):
        self.panel_path = '/www/server/panel'
        self.log_file = os.path.join(self.panel_path, 'logs/terminal.log')
        self.video_dir = os.path.join(self.panel_path, 'data/jumpserver_video')
        self.record_db = os.path.join(self.panel_path, 'data/db/default.db')

    def _connect(self):
        """Return a context manager that opens the record DB and always closes it."""
        return closing(sqlite3.connect(self.record_db))

    @staticmethod
    def _format_timestamp(timestamp):
        """Format a Unix timestamp as local 'YYYY-MM-DD HH:MM:SS', or '' if unusable."""
        if not isinstance(timestamp, (int, float)):
            return ''
        try:
            return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
        except (OverflowError, OSError, ValueError):
            # Timestamp outside the range the platform can represent.
            return ''

    def get_active_connections(self):
        """Return the currently active connections, newest first (at most 100).

        A connection counts as active when its ``close_time`` is 0 or NULL.

        Returns:
            A list of dicts with the keys ``id``, ``client_addr``,
            ``server_ip``, ``ssh_user``, ``login_time``, ``login_time_str``,
            ``duration`` (seconds since login) and ``video_addr``. The list
            is empty when the database cannot be read.
        """
        connections = []
        try:
            with self._connect() as conn:
                rows = conn.execute("""
                    SELECT id, addr, server_ip, ssh_user, login_time, video_addr
                    FROM ssh_login_record
                    WHERE close_time = 0 OR close_time IS NULL
                    ORDER BY login_time DESC
                    LIMIT 100
                """).fetchall()

            now = int(time.time())
            for conn_id, addr, server_ip, ssh_user, login_time, video_addr in rows:
                # login_time may be NULL (or otherwise non-numeric) in the
                # table; such a row must not abort the whole listing.
                has_login = isinstance(login_time, (int, float))
                connections.append({
                    'id': conn_id,
                    'client_addr': addr,
                    'server_ip': server_ip,
                    'ssh_user': ssh_user,
                    'login_time': login_time,
                    'login_time_str': self._format_timestamp(login_time),
                    'duration': now - login_time if has_login and login_time > 0 else 0,
                    'video_addr': video_addr,
                })
        except Exception as e:  # best-effort: report and return what we have
            print(f"Error getting active connections: {e}")

        return connections

    def get_connection_stats(self):
        """Return aggregate connection statistics.

        Returns:
            A dict with ``total_connections``, ``active_connections``,
            ``today_connections`` (logins since local midnight) and
            ``total_duration`` (summed seconds of closed sessions). Values
            that could not be computed stay at 0.
        """
        stats = {
            'total_connections': 0,
            'active_connections': 0,
            'today_connections': 0,
            'total_duration': 0,
        }

        try:
            with self._connect() as conn:
                cursor = conn.cursor()

                # Total number of recorded connections.
                cursor.execute("SELECT COUNT(*) FROM ssh_login_record")
                stats['total_connections'] = cursor.fetchone()[0]

                # Connections that have not been closed yet.
                cursor.execute(
                    "SELECT COUNT(*) FROM ssh_login_record "
                    "WHERE close_time = 0 OR close_time IS NULL"
                )
                stats['active_connections'] = cursor.fetchone()[0]

                # Connections opened since local midnight. tm_isdst=-1 lets
                # mktime decide whether DST applies; forcing 0 would shift
                # midnight by an hour while DST is in effect.
                now = time.localtime()
                today_start = int(time.mktime(
                    (now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0, 0, 0, -1)
                ))
                cursor.execute(
                    "SELECT COUNT(*) FROM ssh_login_record WHERE login_time >= ?",
                    (today_start,),
                )
                stats['today_connections'] = cursor.fetchone()[0]

                # Total session time of closed connections; SUM() yields NULL
                # when no row matches.
                cursor.execute(
                    "SELECT SUM(close_time - login_time) FROM ssh_login_record "
                    "WHERE close_time > 0"
                )
                result = cursor.fetchone()[0]
                stats['total_duration'] = result if result else 0
        except Exception as e:  # best-effort: report and return what we have
            print(f"Error getting connection stats: {e}")

        return stats

    def get_recent_logs(self, lines=100):
        """Return the last ``lines`` lines of the terminal log, whitespace-stripped.

        Args:
            lines: Maximum number of trailing lines to return. Zero or a
                negative value yields an empty list.

        Returns:
            A list of strings, oldest first; empty when the log file is
            missing or unreadable.
        """
        logs = []
        try:
            if lines > 0 and os.path.exists(self.log_file):
                with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
                    # A bounded deque keeps only the tail while streaming, so
                    # a large log file is never held in memory as a whole.
                    logs = [line.strip() for line in deque(f, maxlen=lines)]
        except Exception as e:  # best-effort: report and return what we have
            print(f"Error reading logs: {e}")

        return logs

    def get_video_recordings(self):
        """Return metadata for the ``*.json`` recordings, newest first.

        Returns:
            A list of dicts with ``filename``, ``size``, ``created`` and
            ``modified``, sorted by ``created`` descending. ``created`` is
            taken from ``st_ctime``, which on Unix is the time of the last
            metadata change rather than the creation time.
        """
        videos = []
        try:
            if os.path.exists(self.video_dir):
                with os.scandir(self.video_dir) as entries:
                    for entry in entries:
                        if not entry.name.endswith('.json'):
                            continue
                        try:
                            stat = entry.stat()
                        except OSError:
                            # File vanished between listing and stat; skip it
                            # instead of aborting the whole listing.
                            continue
                        videos.append({
                            'filename': entry.name,
                            'size': stat.st_size,
                            'created': int(stat.st_ctime),
                            'modified': int(stat.st_mtime),
                        })
        except Exception as e:  # best-effort: report and return what we have
            print(f"Error getting video recordings: {e}")

        return sorted(videos, key=lambda video: video['created'], reverse=True)

    def check_terminal_health(self):
        """Check the health of the terminal service.

        Returns:
            A dict with ``status`` (``'healthy'``, ``'warning'`` when the log
            file or video directory is missing, ``'error'`` when the database
            cannot be queried) and ``issues``, a list of messages.
        """
        health = {
            'status': 'healthy',
            'issues': [],
        }

        # Log file present?
        if not os.path.exists(self.log_file):
            health['status'] = 'warning'
            health['issues'].append('Terminal log file not found')

        # Recording directory present?
        if not os.path.exists(self.video_dir):
            health['status'] = 'warning'
            health['issues'].append('Video directory not found')

        # Database reachable? A failure here overrides any earlier warning.
        try:
            with self._connect() as conn:
                conn.execute("SELECT COUNT(*) FROM sqlite_master")
        except Exception as e:
            health['status'] = 'error'
            health['issues'].append(f'Database connection failed: {e}')

        return health


# API routes
monitor = TerminalMonitor()


@app.route('/api/terminal/monitor/active', methods=['GET'])
def get_active_connections():
    """Return the active connections as JSON."""
    connections = monitor.get_active_connections()
    return jsonify({
        'status': 'success',
        'data': connections,
        'count': len(connections),
    })


@app.route('/api/terminal/monitor/stats', methods=['GET'])
def get_stats():
    """Return the connection statistics as JSON."""
    stats = monitor.get_connection_stats()
    return jsonify({
        'status': 'success',
        'data': stats,
    })


@app.route('/api/terminal/monitor/logs', methods=['GET'])
def get_logs():
    """Return the most recent log lines; ``?lines=N`` selects how many (default 100)."""
    lines = request.args.get('lines', 100, type=int)
    logs = monitor.get_recent_logs(lines)
    return jsonify({
        'status': 'success',
        'data': logs,
        'count': len(logs),
    })


@app.route('/api/terminal/monitor/videos', methods=['GET'])
def get_videos():
    """Return the list of session recordings as JSON."""
    videos = monitor.get_video_recordings()
    return jsonify({
        'status': 'success',
        'data': videos,
        'count': len(videos),
    })


@app.route('/api/terminal/monitor/health', methods=['GET'])
def get_health():
    """Return the terminal service health report as JSON."""
    health = monitor.check_terminal_health()
    return jsonify({
        'status': 'success',
        'data': health,
    })


# The URL must carry the record id: without the <int:conn_id> converter Flask
# calls the view with no arguments and every request fails with a TypeError.
@app.route('/api/terminal/monitor/close/<int:conn_id>', methods=['POST'])
def close_connection(conn_id):
    """Mark the given connection record as closed.

    Sets ``close_time`` of the ``ssh_login_record`` row to the current time.
    Only the database record is updated here.

    Args:
        conn_id: Primary key of the ``ssh_login_record`` row.

    Returns:
        A JSON success message; 404 when no record has that id; 500 with the
        error text when the database update fails.
    """
    try:
        with monitor._connect() as conn:
            cursor = conn.execute(
                "UPDATE ssh_login_record SET close_time = ? WHERE id = ?",
                (int(time.time()), conn_id),
            )
            conn.commit()
            updated = cursor.rowcount
    except Exception as e:
        return jsonify({
            'status': 'error',
            'msg': str(e),
        }), 500

    if not updated:
        # Do not report success when nothing was changed.
        return jsonify({
            'status': 'error',
            'msg': f'Connection {conn_id} not found',
        }), 404

    return jsonify({
        'status': 'success',
        'msg': f'Connection {conn_id} closed',
    })


if __name__ == '__main__':
    # NOTE(review): this binds to all interfaces and the endpoints above have
    # no authentication; confirm the service is only reachable from trusted
    # networks or sits behind an authenticating proxy.
    app.run(host='0.0.0.0', port=5000, debug=False)