File size: 4,742 Bytes
020c337 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | # coding: utf-8
# -------------------------------------------------------------------
# 宝塔Linux面板
# -------------------------------------------------------------------
# Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: zouhw <zhw@bt.cn>
# -------------------------------------------------------------------
# ------------------------------
# Docker模型
# ------------------------------
import db
import public
import json
import os
def check_db():
db_path = "/www/server/panel/data/docker.db"
if not os.path.exists(db_path) or os.path.getsize(db_path) == 0:
execstr = "wget -O {} {}/install/src/docker.db".format(db_path, public.get_url())
public.ExecShell(execstr)
def sql(table):
check_db()
with db.Sql() as sql:
sql.dbfile("docker")
return sql.table(table)
# 实例化docker
def docker_client(url="unix:///var/run/docker.sock"):
import docker
"""
目前仅支持本地服务器
:param url: unix:///var/run/docker.sock
:return:
"""
try:
client = docker.DockerClient(base_url=url)
client.networks.list()
return client
except:
return False
def docker_client_low(url="unix:///var/run/docker.sock"):
"""
docker 低级接口
:param url:
:return:
"""
import docker
try:
client = docker.APIClient(base_url=url)
return client
except docker.errors.DockerException:
return False
#取CPU类型
def get_cpu_count():
import re
cpuinfo = open('/proc/cpuinfo','r').read()
rep = "processor\s*:"
tmp = re.findall(rep,cpuinfo)
if not tmp:
return 0
return len(tmp)
def set_kv(kv_str):
"""
将键值字符串转为对象
:param data:
:return:
"""
if not kv_str:
return None
res = kv_str.split('\n')
data = dict()
for i in res:
i = i.strip()
if not i:
continue
k,v = i.split('=')
data[k]= v
return data
def get_mem_info():
#取内存信息
import psutil
mem = psutil.virtual_memory()
memInfo = int(mem.total)
return memInfo
def byte_conversion(data):
if "b" in data:
return int(data.replace('b',''))
elif "KB" in data:
return int(data.replace('KB','')) * 1024
elif "MB" in data:
return int(data.replace('MB','')) * 1024 * 1024
elif "GB" in data:
return int(data.replace('GB','')) * 1024 * 1024 * 1024
else:
return False
def log_docker(generator, task_name):
__log_path = '/tmp/dockertmp.log'
while True:
try:
output = generator.__next__()
try:
output = json.loads(output)
if 'status' in output:
output_str = "{}\n".format(output['status'])
public.writeFile(__log_path, output_str, 'a+')
except:
public.writeFile(__log_path, public.get_error_info(), 'a+')
if 'stream' in output:
output_str = output['stream']
public.writeFile(__log_path,output_str,'a+')
except StopIteration:
public.writeFile(__log_path,f'{task_name} complete.','a+')
break
except ValueError:
public.writeFile(log_path,f'Error parsing output from {task_name}: {output}','a+')
def docker_conf():
"""
解析docker配置文件
KEY=VAULE
KEY1=VALUE1
:return:
"""
docker_conf = public.readFile("{}/data/docker.conf".format(public.get_panel_path()))
if not docker_conf:
return {"SAVE":30}
data = dict()
for i in docker_conf.split("\n"):
if not i:
continue
k,v = i.split("=")
if k == "SAVE":
v = int(v)
data[k] = v
return data
def get_process_id(pname,cmd_line):
import psutil
pids = psutil.pids()
for pid in pids:
try:
p = psutil.Process(pid)
if p.name() == pname and cmd_line in p.cmdline():
return pid
except:
pass
return False
def write_log(log_data):
public.WriteLog("Docker", log_data)
def check_socket(port):
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
location = ("127.0.0.1", int(port))
result_of_check = s.connect_ex(location)
s.close()
if result_of_check == 0:
return True
else:
return False
def download_file(url, filename):
'''
下载方法
@param url:
@param filename:
@return:
'''
return public.ExecShell(f"wget -O {filename} {url} --no-check-certificate")
|