File size: 8,515 Bytes
a757bd3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 | # coding: utf-8
# -------------------------------------------------------------------
# 宝塔Linux面板
# -------------------------------------------------------------------
# Copyright (c) 2015-2017 宝塔软件(http:#bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: 1249648969@qq.com
# -------------------------------------------------------------------
# ------------------------------
# 数据库工具类
# ------------------------------
import sys, os
os.chdir("/www/server/panel")
if not 'class/' in sys.path:
sys.path.insert(0,'class/')
import panelMysql
import re,json,public
class datatools:
DB_MySQL = None
# 字节单位转换
def ToSize(self, size):
ds = ['b', 'KB', 'MB', 'GB', 'TB']
for d in ds:
if size < 1024: return ('%.2f' % size) + d
size = size / 1024
return '0b'
# # 获取当前数据库信息
# def GetdataInfo(self,get):
# '''
# 传递一个数据库名称即可 get.databases
# '''
# db_name=get.db_name
# if not db_name:return False
# if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
# if not self.DB_MySQL: return self.DB_MySQL
# ret = {}
# tables = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
# if type(tables) == list:
# try:
# data = self.map_to_list(self.DB_MySQL.query("select sum(DATA_LENGTH)+sum(INDEX_LENGTH) from information_schema.tables where table_schema='%s'" % db_name))[0][0]
# except:
# data=0
# if not data: data = 0
# ret['data_size'] = self.ToSize(data)
# ret['database'] = db_name
# ret3 = []
# for i in tables:
# if i == 1049: return public.returnMsg(False,'指定数据库不存在!')
# if type(i) == int: continue
# table = self.map_to_list(self.DB_MySQL.query("show table status from `%s` where name = '%s'" % (db_name, i[0])))
# if not table: continue
# try:
# ret2 = {}
# ret2['type']=table[0][1]
# data_size = table[0][6]
# ret2['rows_count'] = self.DB_MySQL.query("select count(*) from `{}`.`{}`".format(db_name,i[0]))[0][0] #table[0][4] 实时获取行数 @authow hwliang<2021-08-05> 修改
# ret2['collation'] = table[0][14]
# ret2['data_size'] = self.ToSize(int(data_size))
# ret2['table_name'] = i[0]
# ret3.append(ret2)
# except: continue
# ret['tables'] = (ret3)
# return ret
# 获取当前数据库信息
def GetdataInfo(self, get):
'''
传递一个数据库名称即可 get.databases
'''
db_name = get.db_name
try:
table_name =get.table_name
except:
table_name = None
if not db_name: return False
if not self.DB_MySQL: self.DB_MySQL = public.get_mysql_obj(db_name)
if not self.DB_MySQL: return self.DB_MySQL
ret = {}
tables = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
if type(tables) == list:
try:
data = self.map_to_list(self.DB_MySQL.query("select sum(DATA_LENGTH)+sum(INDEX_LENGTH) from information_schema.tables where table_schema='%s'" % db_name))[0][0]
except:
data = 0
if not data: data = 0
ret['data_size'] = self.ToSize(data)
ret['database'] = db_name
if table_name:
regex = re.compile(table_name, re.IGNORECASE)
matched_tables = [table for table in tables if regex.search(table[0])]
if matched_tables:
tables=matched_tables
else:
tables=[]
ret3 = []
for i in tables:
if i == 1049: return public.returnMsg(False, '指定数据库不存在!')
if type(i) == int: continue
table = self.map_to_list(self.DB_MySQL.query("show table status from `%s` where name = '%s'" % (db_name, i[0])))
if not table: continue
try:
ret2 = {}
ret2['type'] = table[0][1]
data_size = table[0][6]
ret2['rows_count'] = self.DB_MySQL.query("select count(*) from `{}`.`{}`".format(db_name, i[0]))[0][0] # 实时获取行数
ret2['collation'] = table[0][14]
ret2['data_size'] = self.ToSize(int(data_size))
ret2['o_size'] = int(data_size)
ret2['table_name'] = i[0]
# 获取表的注释信息
comment = self.map_to_list(self.DB_MySQL.query("SELECT table_comment FROM information_schema.tables WHERE table_schema = '{}' AND table_name = '{}'".format(db_name, i[0])))[0][0]
ret2['comment'] = comment
ret3.append(ret2)
except: continue
ret['tables'] = ret3
return ret
#修复表信息
def RepairTable(self,get):
'''
POST:
db_name=web
tables=['web1','web2']
'''
db_name = get.db_name
tables = json.loads(get.tables)
if not db_name or not tables: return False
if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
m_version = self.DB_MySQL.query('select version();')[0][0]
if m_version.find('5.1.')!=-1:return public.returnMsg(False,"不支持mysql5.1!")
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table)==list:
if len(mysql_table)>0:
for i in mysql_table:
for i2 in tables:
if i2==i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('REPAIR TABLE `%s`.`%s`'%(db_name,i))
return True
return False
#map to list
def map_to_list(self,map_obj):
try:
if type(map_obj) != list and type(map_obj) != str: map_obj = list(map_obj)
return map_obj
except: return []
# 优化表
def OptimizeTable(self,get):
'''
POST:
db_name=web
tables=['web1','web2']
'''
db_name = get.db_name
tables = json.loads(get.tables)
if not db_name or not tables: return False
if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table) == list:
if len(mysql_table) > 0:
for i in mysql_table:
for i2 in tables:
if i2 == i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('OPTIMIZE table `%s`.`%s` ENGINE=MyISAM' % (db_name,i))
return True
return False
# 更改表引擎
def AlterTable(self,get):
'''
POST:
db_name=web
table_type=innodb
tables=['web1','web2']
'''
db_name = get.db_name
table_type = get.table_type
tables = json.loads(get.tables)
if not db_name or not tables: return False
if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table)==list:
if len(mysql_table)>0:
for i in mysql_table:
for i2 in tables:
if i2==i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('alter table `%s`.`%s` ENGINE=`%s`' % (db_name,i,table_type))
return True
return False
#检查表
def CheckTable(self,database,tables,*args,**kwargs):
pass
|