action / bt-source /panel /class /datatool.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
020c337 verified
# coding: utf-8
# -------------------------------------------------------------------
# 宝塔Linux面板
# -------------------------------------------------------------------
# Copyright (c) 2015-2017 宝塔软件(http:#bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: 1249648969@qq.com
# -------------------------------------------------------------------
# ------------------------------
# 数据库工具类
# ------------------------------
import sys, os
os.chdir("/www/server/panel")
if not 'class/' in sys.path:
sys.path.insert(0,'class/')
import panelMysql
import re,json,public
class datatools:
DB_MySQL = None
# 字节单位转换
def ToSize(self, size):
ds = ['b', 'KB', 'MB', 'GB', 'TB']
for d in ds:
if size < 1024: return ('%.2f' % size) + d
size = size / 1024
return '0b'
# # 获取当前数据库信息
# def GetdataInfo(self,get):
# '''
# 传递一个数据库名称即可 get.databases
# '''
# db_name=get.db_name
# if not db_name:return False
# if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
# if not self.DB_MySQL: return self.DB_MySQL
# ret = {}
# tables = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
# if type(tables) == list:
# try:
# data = self.map_to_list(self.DB_MySQL.query("select sum(DATA_LENGTH)+sum(INDEX_LENGTH) from information_schema.tables where table_schema='%s'" % db_name))[0][0]
# except:
# data=0
# if not data: data = 0
# ret['data_size'] = self.ToSize(data)
# ret['database'] = db_name
# ret3 = []
# for i in tables:
# if i == 1049: return public.returnMsg(False,'指定数据库不存在!')
# if type(i) == int: continue
# table = self.map_to_list(self.DB_MySQL.query("show table status from `%s` where name = '%s'" % (db_name, i[0])))
# if not table: continue
# try:
# ret2 = {}
# ret2['type']=table[0][1]
# data_size = table[0][6]
# ret2['rows_count'] = self.DB_MySQL.query("select count(*) from `{}`.`{}`".format(db_name,i[0]))[0][0] #table[0][4] 实时获取行数 @authow hwliang<2021-08-05> 修改
# ret2['collation'] = table[0][14]
# ret2['data_size'] = self.ToSize(int(data_size))
# ret2['table_name'] = i[0]
# ret3.append(ret2)
# except: continue
# ret['tables'] = (ret3)
# return ret
# 获取当前数据库信息
def GetdataInfo(self, get):
'''
传递一个数据库名称即可 get.databases
'''
db_name = get.db_name
try:
table_name =get.table_name
except:
table_name = None
if not db_name: return False
if not self.DB_MySQL: self.DB_MySQL = public.get_mysql_obj(db_name)
if not self.DB_MySQL: return self.DB_MySQL
ret = {}
tables = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
if type(tables) == list:
try:
data = self.map_to_list(self.DB_MySQL.query("select sum(DATA_LENGTH)+sum(INDEX_LENGTH) from information_schema.tables where table_schema='%s'" % db_name))[0][0]
except:
data = 0
if not data: data = 0
ret['data_size'] = self.ToSize(data)
ret['database'] = db_name
if table_name:
regex = re.compile(table_name, re.IGNORECASE)
matched_tables = [table for table in tables if regex.search(table[0])]
if matched_tables:
tables=matched_tables
else:
tables=[]
ret3 = []
for i in tables:
if i == 1049: return public.returnMsg(False, '指定数据库不存在!')
if type(i) == int: continue
table = self.map_to_list(self.DB_MySQL.query("show table status from `%s` where name = '%s'" % (db_name, i[0])))
if not table: continue
try:
ret2 = {}
ret2['type'] = table[0][1]
data_size = table[0][6]
ret2['rows_count'] = self.DB_MySQL.query("select count(*) from `{}`.`{}`".format(db_name, i[0]))[0][0] # 实时获取行数
ret2['collation'] = table[0][14]
ret2['data_size'] = self.ToSize(int(data_size))
ret2['o_size'] = int(data_size)
ret2['table_name'] = i[0]
# 获取表的注释信息
comment = self.map_to_list(self.DB_MySQL.query("SELECT table_comment FROM information_schema.tables WHERE table_schema = '{}' AND table_name = '{}'".format(db_name, i[0])))[0][0]
ret2['comment'] = comment
ret3.append(ret2)
except: continue
ret['tables'] = ret3
return ret
#修复表信息
def RepairTable(self,get):
'''
POST:
db_name=web
tables=['web1','web2']
'''
db_name = get.db_name
tables = json.loads(get.tables)
if not db_name or not tables: return False
if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
m_version = self.DB_MySQL.query('select version();')[0][0]
if m_version.find('5.1.')!=-1:return public.returnMsg(False,"不支持mysql5.1!")
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table)==list:
if len(mysql_table)>0:
for i in mysql_table:
for i2 in tables:
if i2==i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('REPAIR TABLE `%s`.`%s`'%(db_name,i))
return True
return False
#map to list
def map_to_list(self,map_obj):
try:
if type(map_obj) != list and type(map_obj) != str: map_obj = list(map_obj)
return map_obj
except: return []
# 优化表
def OptimizeTable(self,get):
'''
POST:
db_name=web
tables=['web1','web2']
'''
db_name = get.db_name
tables = json.loads(get.tables)
if not db_name or not tables: return False
if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table) == list:
if len(mysql_table) > 0:
for i in mysql_table:
for i2 in tables:
if i2 == i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('OPTIMIZE table `%s`.`%s` ENGINE=MyISAM' % (db_name,i))
return True
return False
# 更改表引擎
def AlterTable(self,get):
'''
POST:
db_name=web
table_type=innodb
tables=['web1','web2']
'''
db_name = get.db_name
table_type = get.table_type
tables = json.loads(get.tables)
if not db_name or not tables: return False
if not self.DB_MySQL:self.DB_MySQL = public.get_mysql_obj(db_name)
mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
ret=[]
if type(mysql_table)==list:
if len(mysql_table)>0:
for i in mysql_table:
for i2 in tables:
if i2==i[0]:
ret.append(i2)
if len(ret)>0:
for i in ret:
self.DB_MySQL.execute('alter table `%s`.`%s` ENGINE=`%s`' % (db_name,i,table_type))
return True
return False
#检查表
def CheckTable(self,database,tables,*args,**kwargs):
pass