page / bt-source /panel /api /terminal_monitor.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
a757bd3 verified
#!/usr/bin/env python3
# coding: utf-8
"""
SSH Terminal Connection Monitoring API
终端连接监控API
Provides REST API endpoints to monitor SSH/Terminal connection status
提供REST API端点来监控SSH/终端连接状态
"""
from flask import Flask, jsonify, request
import os
import time
import json
from pathlib import Path
# Flask application object; the @app.route decorators in this module register
# their handlers on it, and the __main__ guard at the bottom runs it.
app = Flask(__name__)
class TerminalMonitor:
    """Collect SSH/terminal session information for the monitoring API.

    Session records are read from the ``ssh_login_record`` table of the
    SQLite database at ``record_db``; logs and recordings are read from
    ``log_file`` and ``video_dir``. The ``get_*`` methods are deliberately
    best-effort: a failure is printed and an empty or partial result is
    returned instead of raising, so one broken data source does not take
    the whole API down.
    """

    def __init__(self):
        """Derive the log, recording and database paths from the panel root."""
        self.panel_path = '/www/server/panel'
        self.log_file = os.path.join(self.panel_path, 'logs/terminal.log')
        self.video_dir = os.path.join(self.panel_path, 'data/jumpserver_video')
        self.record_db = os.path.join(self.panel_path, 'data/db/default.db')

    def _connect(self):
        """Open the record database without creating it as a side effect.

        ``sqlite3.connect`` silently creates an empty database when the path
        does not exist, which would make a missing database look healthy and
        leave a stray file behind. Callers must close the returned connection.

        Raises:
            FileNotFoundError: if ``record_db`` is not an existing file.
        """
        import sqlite3

        if not os.path.isfile(self.record_db):
            raise FileNotFoundError(f'database file not found: {self.record_db}')
        return sqlite3.connect(self.record_db)

    def get_active_connections(self):
        """Return up to 100 sessions that have no close time, newest first.

        Returns:
            A list of dicts with the keys ``id``, ``client_addr``,
            ``server_ip``, ``ssh_user``, ``login_time``, ``login_time_str``,
            ``duration`` (seconds since login) and ``video_addr``. The list
            is empty when the database cannot be read.
        """
        connections = []
        try:
            conn = self._connect()
            try:
                # A session counts as open while close_time is 0 or NULL.
                rows = conn.execute("""
                    SELECT id, addr, server_ip, ssh_user, login_time, video_addr
                    FROM ssh_login_record
                    WHERE close_time = 0 OR close_time IS NULL
                    ORDER BY login_time DESC
                    LIMIT 100
                """).fetchall()
            finally:
                # Always release the handle, even when the query fails.
                conn.close()
        except Exception as e:
            print(f"Error getting active connections: {e}")
            return connections

        now = int(time.time())
        for record_id, addr, server_ip, ssh_user, login_time, video_addr in rows:
            # login_time may be NULL or non-numeric in a damaged row; such a
            # row is still reported, just without a time string or duration,
            # instead of aborting the whole listing.
            if isinstance(login_time, (int, float)):
                try:
                    login_time_str = time.strftime(
                        '%Y-%m-%d %H:%M:%S', time.localtime(login_time)
                    )
                except (OverflowError, OSError, ValueError):
                    login_time_str = ''
                duration = now - login_time if login_time > 0 else 0
            else:
                login_time_str = ''
                duration = 0
            connections.append({
                'id': record_id,
                'client_addr': addr,
                'server_ip': server_ip,
                'ssh_user': ssh_user,
                'login_time': login_time,
                'login_time_str': login_time_str,
                'duration': duration,
                'video_addr': video_addr,
            })
        return connections

    def get_connection_stats(self):
        """Return aggregate counters over all recorded sessions.

        Returns:
            A dict with ``total_connections``, ``active_connections``,
            ``today_connections`` (logins since local midnight) and
            ``total_duration`` (summed seconds of closed sessions). Counters
            that could not be computed stay at 0.
        """
        stats = {
            'total_connections': 0,
            'active_connections': 0,
            'today_connections': 0,
            'total_duration': 0,
        }
        try:
            conn = self._connect()
            try:
                cursor = conn.cursor()

                cursor.execute("SELECT COUNT(*) FROM ssh_login_record")
                stats['total_connections'] = cursor.fetchone()[0]

                cursor.execute(
                    "SELECT COUNT(*) FROM ssh_login_record "
                    "WHERE close_time = 0 OR close_time IS NULL"
                )
                stats['active_connections'] = cursor.fetchone()[0]

                # Local midnight. tm_isdst=-1 lets mktime work out whether
                # DST applies; forcing 0 shifts the boundary by an hour
                # while DST is in effect.
                today = time.localtime()
                today_start = int(time.mktime(
                    (today.tm_year, today.tm_mon, today.tm_mday, 0, 0, 0, 0, 1, -1)
                ))
                cursor.execute(
                    "SELECT COUNT(*) FROM ssh_login_record WHERE login_time >= ?",
                    (today_start,),
                )
                stats['today_connections'] = cursor.fetchone()[0]

                # Only closed sessions have a meaningful duration.
                cursor.execute(
                    "SELECT SUM(close_time - login_time) FROM ssh_login_record "
                    "WHERE close_time > 0"
                )
                total = cursor.fetchone()[0]
                stats['total_duration'] = total if total else 0  # SUM is NULL on no rows
            finally:
                conn.close()
        except Exception as e:
            print(f"Error getting connection stats: {e}")
        return stats

    def get_recent_logs(self, lines=100):
        """Return the last ``lines`` lines of the terminal log, stripped.

        Args:
            lines: Maximum number of trailing lines to return. Zero or a
                negative value yields an empty list.

        Returns:
            A list of log lines, oldest first; empty when the log file is
            missing or unreadable.
        """
        logs = []
        try:
            # Slicing with -0 would return the whole file and negative counts
            # an arbitrary slice, so non-positive requests return nothing.
            if lines <= 0:
                return logs
            from collections import deque

            with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
                # A bounded deque keeps only the tail while streaming the
                # file, so a large log is never held in memory at once.
                logs = [line.strip() for line in deque(f, maxlen=lines)]
        except FileNotFoundError:
            pass  # no log yet: same silent empty result as before
        except Exception as e:
            print(f"Error reading logs: {e}")
        return logs

    def get_video_recordings(self):
        """Return metadata for the ``.json`` recordings in ``video_dir``.

        Returns:
            A list of dicts with ``filename``, ``size``, ``created``
            (``st_ctime``) and ``modified`` (``st_mtime``), sorted by
            ``created`` descending. Files that cannot be stat'ed are skipped.
        """
        videos = []
        try:
            if os.path.exists(self.video_dir):
                for filename in os.listdir(self.video_dir):
                    if not filename.endswith('.json'):
                        continue
                    try:
                        stat = os.stat(os.path.join(self.video_dir, filename))
                    except OSError:
                        # The file vanished or is unreadable; skip it rather
                        # than aborting the whole listing.
                        continue
                    videos.append({
                        'filename': filename,
                        'size': stat.st_size,
                        'created': int(stat.st_ctime),
                        'modified': int(stat.st_mtime),
                    })
        except Exception as e:
            print(f"Error getting video recordings: {e}")
        return sorted(videos, key=lambda x: x['created'], reverse=True)

    def check_terminal_health(self):
        """Check that the log file, recording directory and database exist.

        Returns:
            A dict with ``status`` (``'healthy'``, ``'warning'`` when the log
            file or recording directory is missing, ``'error'`` when the
            database is missing or cannot be queried) and ``issues``, a list
            of human-readable problem descriptions.
        """
        health = {
            'status': 'healthy',
            'issues': [],
        }
        if not os.path.exists(self.log_file):
            health['status'] = 'warning'
            health['issues'].append('Terminal log file not found')
        if not os.path.exists(self.video_dir):
            health['status'] = 'warning'
            health['issues'].append('Video directory not found')
        # The database is checked last so that 'error' overrides 'warning'.
        try:
            conn = self._connect()
            try:
                conn.execute("SELECT COUNT(*) FROM sqlite_master")
            finally:
                conn.close()
        except Exception as e:
            health['status'] = 'error'
            health['issues'].append(f'Database connection failed: {e}')
        return health
# API routes
# Module-level TerminalMonitor instance used by the route handlers below.
monitor = TerminalMonitor()
@app.route('/api/terminal/monitor/active', methods=['GET'])
def get_active_connections():
    """Return the active connections reported by the monitor as JSON.

    The response carries ``status``, the connection list under ``data`` and
    its length under ``count``.
    """
    active = monitor.get_active_connections()
    payload = {
        'status': 'success',
        'data': active,
        'count': len(active),
    }
    return jsonify(payload)
@app.route('/api/terminal/monitor/stats', methods=['GET'])
def get_stats():
    """Return the monitor's connection statistics as JSON under ``data``."""
    payload = {
        'status': 'success',
        'data': monitor.get_connection_stats(),
    }
    return jsonify(payload)
@app.route('/api/terminal/monitor/logs', methods=['GET'])
def get_logs():
    """Return the most recent terminal log lines as JSON.

    The optional ``lines`` query parameter (parsed as an int, default 100)
    is passed to the monitor as the number of lines requested.
    """
    line_count = request.args.get('lines', default=100, type=int)
    recent = monitor.get_recent_logs(line_count)
    payload = {
        'status': 'success',
        'data': recent,
        'count': len(recent),
    }
    return jsonify(payload)
@app.route('/api/terminal/monitor/videos', methods=['GET'])
def get_videos():
    """Return the recording list reported by the monitor as JSON.

    The response carries ``status``, the recordings under ``data`` and their
    number under ``count``.
    """
    recordings = monitor.get_video_recordings()
    payload = {
        'status': 'success',
        'data': recordings,
        'count': len(recordings),
    }
    return jsonify(payload)
@app.route('/api/terminal/monitor/health', methods=['GET'])
def get_health():
    """Return the monitor's health report as JSON under ``data``.

    The outer ``status`` is always ``'success'``; the health verdict itself
    is inside ``data``.
    """
    payload = {
        'status': 'success',
        'data': monitor.check_terminal_health(),
    }
    return jsonify(payload)
@app.route('/api/terminal/monitor/close/<int:conn_id>', methods=['POST'])
def close_connection(conn_id):
    """Mark the session record ``conn_id`` as closed in the database.

    Only the ``close_time`` column of ``ssh_login_record`` is written here;
    nothing in this handler touches the underlying SSH session itself.

    Args:
        conn_id: Primary key of the ``ssh_login_record`` row to close.

    Returns:
        A JSON success message when an open record was updated, a 404 error
        when no open record has that id, or a 500 error carrying the
        exception text when the database operation fails.
    """
    try:
        import sqlite3

        conn = sqlite3.connect(monitor.record_db)
        try:
            # Only open records are touched, so the original close_time of a
            # finished session (and the duration statistics derived from it)
            # is never overwritten.
            cursor = conn.execute(
                "UPDATE ssh_login_record SET close_time = ? "
                "WHERE id = ? AND (close_time = 0 OR close_time IS NULL)",
                (int(time.time()), conn_id),
            )
            conn.commit()
            updated = cursor.rowcount
        finally:
            # Release the handle even when the UPDATE or commit fails.
            conn.close()
    except Exception as e:
        return jsonify({
            'status': 'error',
            'msg': str(e),
        }), 500

    if not updated:
        # No row changed: report that instead of claiming success.
        return jsonify({
            'status': 'error',
            'msg': f'Connection {conn_id} not found or already closed',
        }), 404
    return jsonify({
        'status': 'success',
        'msg': f'Connection {conn_id} closed',
    })
if __name__ == '__main__':
    # Run the Flask server only when this file is executed as a script.
    # NOTE(review): host '0.0.0.0' listens on every interface and no route in
    # this file performs authentication; confirm access is restricted
    # elsewhere (reverse proxy, firewall) before exposing this port.
    app.run(
        host='0.0.0.0',
        port=5000,
        debug=False,
    )