ai / bt-source /panel /script /init_db.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
17e971c verified
import re
import sys
import os
import time
import json
import datetime
import shutil
panelPath = '/www/server/panel/'
os.chdir(panelPath)
sys.path.insert(0, panelPath + "class/")
import public
import db
# 数据库配置文件
DATABASES_PATH = os.path.join(public.get_panel_path(), "config/databases.json")
ERR_INFO = None
# 获取数据库对象
def get_db_obj(sfile):
db_obj = db.Sql()
db_obj.dbfile(sfile)
return db_obj
default_db_obj = get_db_obj(os.path.join(panelPath, "data/default.db"))
# default_db_obj = get_db_obj("/www/default.db")
dst_dir = '{}/data/db/'.format(panelPath)
dst_dir_f = dst_dir[:-1] # 去掉最后的斜杠,否则无法判断文件是否存在
if not os.path.exists(dst_dir_f):
os.makedirs(dst_dir_f, 384)
else:
# 处理存在./data/db文件的情况
if os.path.isfile(dst_dir_f):
os.remove(dst_dir_f)
os.makedirs(dst_dir_f, 384)
def print_x(msg):
if public.is_debug():
print(msg)
# 获取表结构
def get_table_byjson(table, dst_db_name):
nkey = '{}@{}'.format(table, dst_db_name)
alltables = get_database_json(False)
if not nkey in alltables:
return None
db_info = alltables[nkey]
if not 'fields' in db_info or not 'sql' in db_info:
return None
field_keys = []
field_info = {}
for field in db_info['fields']:
field_info[field[1]] = field
field_keys.append('`{}`'.format(field[1]))
data = {}
data['field'] = field_info
data['sql'] = alltables[nkey]['sql']
data['field_keys'] = field_keys
data['wheres'] = []
return data
# 获取表结构
def get_table_info(db_obj, table):
field_info = {}
wheres = []
field_keys = []
tb_list = db_obj.query("PRAGMA table_info({});".format(table))
if not isinstance(tb_list, list):
if 'unable to open database file' in tb_list:
return None
print(table, tb_list)
print_x('{} 表结构读取失败,可能原因表已损坏'.format(table))
return None
for tb in tb_list:
field_info[tb[1]] = tb
field_keys.append('`{}`'.format(tb[1]))
wheres.append('`{}`=?'.format(tb[1]))
res = db_obj.query("SELECT * FROM sqlite_master WHERE type='table' AND name='{}';".format(table))
if not isinstance(res, list):
return None
if len(res) == 0:
return None
data = {}
data['field'] = field_info
data['sql'] = res[0][4]
data['field_keys'] = field_keys
data['wheres'] = wheres
return data
# 检查表是否存在
def check_exists_table(db_obj, table):
res = db_obj.query("SELECT * FROM sqlite_master WHERE type='table' AND name='{}';".format(table))
if 'unable to open database file' in res:
return True
if not isinstance(res, list):
print_x('{} 读取失败,可能原因表已损坏'.format(table))
return False
if len(res) > 0:
return True
return False
# 同步单个表
def sync_db_table(table, dst_db_name, src_db_obj):
try:
if src_db_obj == None:
src_field = get_table_byjson(table, dst_db_name)
else:
src_field = get_table_info(src_db_obj, table)
if not src_field:
return False
# 检查表存在
db_file = '{}/{}'.format(dst_dir, dst_db_name)
dst_obj = get_db_obj(db_file)
if not check_exists_table(dst_obj, table):
print_x('{}数据库 {} 表不存在,准备创建数据库'.format(dst_db_name, table))
res = dst_obj.execute(src_field['sql'])
try:
# 数据库文件损坏
if str(res).find('file is not a database') != -1:
print_x('数据库文件损坏,尝试重新创建!')
if os.path.exists(db_file):
print_x('备份已损坏的数据库文件:{}'.format(db_file))
os.rename(db_file, '{}.{}.bak'.format(db_file, time.strftime('%Y%m%d%H%M%S')))
return sync_db_all(table,dst_db_name,src_db_obj)
except:pass
if str(res).find("error") != -1:
print_x("创建表 {} 错误:{}".format(table, res))
return False
# 修复字段
dst_field = get_table_info(dst_obj, table)
if not dst_field:
return False
is_alert = False
for field in src_field['field']:
if not field in dst_field['field']:
continue
dst_fs = dst_field['field'][field]
src_fs = src_field['field'][field]
d4 = dst_fs[4]
s4 = src_fs[4]
try:
d4 = int(d4)
except:pass
try:
s4 = int(s4)
except:pass
if d4 == '0': d4 = 0
if s4 == '0': s4 = 0
try:
if not d4: d4 = ''
d4 = d4.strip("'").strip('"')
if not d4: d4 = ''
except:pass
try:
if not s4: s4 = ''
s4 = s4.strip("'").strip('"')
except: pass
if d4 == s4: continue
is_alert = True
break
#修改字段类型
if is_alert:
print_x('dst : {} src :{}'.format(dst_fs, src_fs))
print_x('{}数据库 {} 表 {} 字段默认值不一致,准备修改字段值'.format(dst_db_name, table, field))
new_table = 'wz_{}_{}'.format(table,public.md5(str(time.time())))
sql_list = [
"ALTER TABLE {table} RENAME TO {new_table};".format(table=table, new_table=new_table),
"{};".format(src_field['sql'])
]
# 获取目标表字段信息
dst_field_info = get_table_info(dst_obj, table)
# 确保 dst_field_info 不是 None
if dst_field_info is None:
print_x("目标数据库中表 {} 的信息无法获取,可能表不存在。".format(table))
# 在这里处理表不存在的情况,例如创建表或返回错误信息
return False
# 假设 src_field_keys 和 dst_field_keys 分别存储了源表和目标表的字段名
src_field_keys = src_field['field_keys']
dst_field_keys = dst_field_info['field_keys']
# 找到两个列表的交集,即两个表都有的字段
common_fields = list(set(src_field_keys) & set(dst_field_keys))
# 如果有共同的字段,则进行数据迁移
if common_fields:
insert_sql = "INSERT INTO {table}({fields}) SELECT {fields} FROM {new_table};".format(
table=table,
new_table=new_table,
fields=','.join(common_fields) # 使用交集中的字段
)
# print(insert_sql)
sql_list.append(insert_sql)
else:
print_x("没有共同的字段可以迁移。")
for sql_a in sql_list:
res = dst_obj.execute(sql_a)
print_x('sfile - {}'.format(db_file))
if str(res).find("error") != -1:
#还原同步失败
print_x("修改字段类型 {} 错误:{}".format(field, res))
dst_obj.execute("ALTER TABLE {table} RENAME TO {new_table};".format(table=new_table, new_table=table))
return False
#检查缺少字段
dst_field = get_table_info(dst_obj, table)
if dst_field == None:
return False
for field in src_field['field']:
if field in dst_field['field']:
continue
print_x('{}数据库 {} 表 {} 字段不存在,准备创建字段'.format(dst_db_name, table, field))
field_type = src_field['field'][field][2]
defult_val = src_field['field'][field][4]
if field_type.lower() in ['text'] and not defult_val:
defult_val = "''"
res = dst_obj.execute('ALTER TABLE {} ADD COLUMN {} {} DEFAULT {} ;'.format(table, field, field_type, defult_val))
if str(res).find("error") != -1:
print_x("创建字段 {} 错误:{}".format(field, res))
return False
if src_db_obj == None:
return True
# 检查数据是否存在
resp = src_db_obj.query("select count(*) from {};".format(table))
if 'unable to open database file' in resp:
return True
if not isinstance(resp, list):
#print_x('{} -- {}'.format(table,resp))
return False
limit = 1000
total = resp[0][0]
sql_field = ','.join(src_field['field_keys']).strip(',')
insert_sql = "INSERT INTO '{tb_name}' ({field_sql}) VALUES ({create_sql});".format(tb_name=table, field_sql=sql_field, create_sql=",".join(["?"] * len(src_field['field_keys'])))
dst_resp = dst_obj.query("select count(*) from {};".format(table))
if not isinstance(dst_resp, list):
print_x(dst_resp)
return False
# 存在数据跳过迁移
dst_total = dst_resp[0][0]
if dst_total > 0:
return False
for idx in range(0, total, limit):
data_list = src_db_obj.query("SELECT * FROM {tb_name} LIMIT {idx},{limit};".format(tb_name=table, idx=idx, limit=limit))
resp = dst_obj.executemany(insert_sql, data_list)
print_x("{} row:{}".format(table, resp))
except:
print(public.get_error_info())
"""
@name 获取数据库json
@param is_talbe 是否按表名索引,default.db按表名返回,其他数据库防止重复,按表名_数据库名返回
"""
def get_database_json(is_talbe=True):
all_tabs = {}
init_db_dict = json.loads(public.readFile(DATABASES_PATH))
for db_keys in init_db_dict.keys():
for table in init_db_dict[db_keys].keys():
nkey = '{}@{}'.format(table, db_keys)
if is_talbe: nkey = table
if not nkey in all_tabs:
all_tabs[nkey] = {}
else:
#表名重复处理
nkey = '{}@{}'.format(table, db_keys)
if not nkey in all_tabs:
all_tabs[nkey] = {}
all_tabs[nkey]['db'] = db_keys
all_tabs[nkey]['table'] = table
all_tabs[nkey]['sql'] = init_db_dict[db_keys][table]['sql']
all_tabs[nkey]['fields'] = init_db_dict[db_keys][table]['fields']
return all_tabs
"""
@name 数据未迁移成功,修复数据库
"""
def repair_db():
try:
total = 0
check_json = {
'sites': 'site.db',
'users': 'panel.db',
'databases': 'database.db',
'ftps': 'ftp.db',
'config': 'panel.db',
'crontab':'crontab.db'
}
for skey in check_json.keys():
dst_obj = get_db_obj('{}/{}'.format(dst_dir, check_json[skey]))
query_sql = 'SELECT count(*) FROM {} ;'.format(skey)
query_res = dst_obj.query(query_sql)
if 'unable to open database file' in query_res:
continue
try:
if query_res[0][0] > 0:
continue
total += 1
except:
total += 10
if total <= 2:
return False
flag_path = '{}/update'.format(dst_dir)
if os.path.exists(flag_path):
os.remove(flag_path)
sync_db_all()
except:
pass
"""
@name 迁移面板数据库
"""
def get_data_obj(db_file):
"""
@name 获取数据库对象
"""
return get_db_obj(db_file)
def sync_db_all():
num = 0
flag_path = '{}/update'.format(dst_dir)
if os.path.exists(flag_path):
try:
num = int(public.readFile(flag_path))
except:
pass
if num > 3:
return False
nlist = []
all_tabs = get_database_json(True)
n_json = {
'default.db': default_db_obj,
'docker.db': get_data_obj(os.path.join(panelPath, "data/docker.db")),
'script.db':get_data_obj(os.path.join(panelPath, "data/crontab.db"))
}
for key in n_json.keys():
try:
src_db_obj = n_json[key]
resp = src_db_obj.query("SELECT name FROM sqlite_master WHERE type='table' AND name not in ('sqlite_sequence','sqlite_master');")
if 'unable to open database file' in resp or len(resp) == 0:
continue
for table in resp:
try:
table = table[0]
db_info = {'db': key}
skey = '{}@{}'.format(table, key)
if skey in all_tabs:
db_info = all_tabs[skey]
elif table in all_tabs:
db_info = all_tabs[table]
print_x('正在同步表到 {} 数据库 - 1:{}'.format(table, db_info['db']))
sync_db_table(table, db_info['db'], src_db_obj)
nkey = '{}_{}'.format(table, db_info['db'])
nlist.append(nkey)
except:
pass
except:
print_x(public.get_error_info())
# 第二次检测,防止数据库损坏
all_tabs = get_database_json(False)
for nkey in all_tabs.keys():
db_info = all_tabs[nkey]
if not nkey in nlist:
print_x('正在同步表到 {} 数据库:{}'.format(db_info['table'], db_info['db']))
sync_db_table(db_info['table'], db_info['db'], None)
public.writeFile(flag_path, str(num + 1))
def check_db():
public.check_field('backup', 'pid', 'INTEGER DEFAULT 0')
public.check_field('backup', 'type', 'INTEGER DEFAULT 0')
"""
@name 修复字段默认值,重试3次
"""
def repair_column():
num = 0
flag_path = '{}/repair_column'.format(dst_dir)
if os.path.exists(flag_path):
try:
num = int(public.readFile(flag_path))
except:
pass
if num > 3:
return False
data = get_database_json(True)
for table in data:
try:
fields = data[table]['fields']
except:
continue
for field in fields:
if field[4] == None:
continue
sql_shell = "UPDATE {} SET {}={} WHERE {} is null;".format(table, field[1], field[4], field[1])
public.M(table).execute(sql_shell)
public.writeFile(flag_path, str(num + 1))
def get_sql_shell(dfile, table = None):
db_obj = get_db_obj(dfile)
try:
fname = os.path.basename(dfile)
data = {}
data[fname] = {}
resp = db_obj.query("SELECT name FROM sqlite_master WHERE type='table' AND name not in ('sqlite_sequence','sqlite_master');")
if len(resp) == 0:
raise Exception('读取失败,数据库损坏,跳过!')
if table:
info = get_table_info(db_obj, table)
if not info: return False
fields = []
for _,i in info['field'].items():
if i[4] in ['""', None, 'NULL', '0']:
if i[2].lower() in ['integer', 'int', 'real', 'numeric', 'decimal', 'BOOLEAN']:
i[4] = 0
elif i[2].lower() in ['text', 'varchar', 'char']:
i[4] = "''"
fields.append(i)
data[fname][table] = {
'sql': info['sql'],
'fields': fields
}
else:
for table in resp:
table = table[0]
info = get_table_info(db_obj, table)
if not info: continue
fields = []
for _, i in info['field'].items():
if i[4] in ['""', None, 'NULL', '0' , 0]:
if i[2].lower() in ['integer', 'int', 'real', 'numeric', 'decimal', 'BOOLEAN']:
i[4] = 0
elif i[2].lower() in ['text', 'varchar', 'char']:
i[4] = "''"
fields.append(i)
data[fname][table] = {
'sql': info['sql'],
'fields': fields
}
print(json.dumps(data))
return True
except:
print_x('不是有效的数据库文件,跳过!')
return False
#测试sql是否能执行
def test_sql():
dst_dir = '/www/test/db/'
if os.path.exists(dst_dir): shutil.rmtree(dst_dir)
os.makedirs(dst_dir, 755)
all_tabs = get_database_json(False)
for nkey in all_tabs.keys():
db_info = all_tabs[nkey]
print_x('正在同步表到 {} 数据库:{}'.format(db_info['table'], db_info['db']))
src_field = get_table_byjson(db_info['table'], db_info['db'])
dst_obj = get_db_obj('{}/{}'.format(dst_dir, db_info['db']))
if not check_exists_table(dst_obj, db_info['table']):
res = dst_obj.execute(src_field['sql'])
if str(res).find("error") != -1:
print_x("创建表 {} 错误:{}".format(db_info['table'], res))
break
dst_field = get_table_info(dst_obj, db_info['table'])
n_list = []
for field in src_field['field']:
if not field in n_list:
n_list.append(field)
for field in dst_field['field']:
if not field in n_list:
n_list.append(field)
for field in n_list:
if not field in src_field['field']:
print('databases.json文件 {} - {} fields 数组里缺少字段:{}'.format(db_info['db'],db_info['table'],field))
if not field in dst_field['field']:
print('databases.json文件 {} - {} sql语句里缺少字段:{}'.format(db_info['db'],db_info['table'],field))
"""
@name 检查数据库字段
"""
def repair_lack_field():
all_tabs = get_database_json(False)
for nkey in all_tabs.keys():
db_info = all_tabs[nkey]
dst_obj = get_db_obj('{}/{}'.format(dst_dir, db_info['db']))
if not check_exists_table(dst_obj, db_info['table']):
continue
src_field = get_table_byjson(db_info['table'], db_info['db'])
dst_field = get_table_info(dst_obj, db_info['table'])
if not dst_field:
continue
for field in src_field['field']:
if field in dst_field['field']:
continue
res = dst_obj.execute('ALTER TABLE {} ADD COLUMN {} {} DEFAULT {} ;'.format(db_info['table'], field, src_field['field'][field][2], src_field['field'][field][4]))
def check_default_error(sfile):
"""
@name 检查默认数据库是否损坏
"""
try:
default_db_obj = get_db_obj(sfile)
res = default_db_obj.query("SELECT name FROM sqlite_master WHERE type='table' AND name not in ('sqlite_sequence','sqlite_master');")
if type(res) == str:
if res.find('unable to open database file') >= 0:
#磁盘满了,非数据库损耗
return False
if res.find('error:') >= 0:
return True
except:pass
return False
def repair_db_byfile(dst_file):
"""
@name 修复数据库损坏的问题(从历史备份里恢复)
"""
try:
if not check_default_error(dst_file):
return False
print('{} 数据库已损坏'.format(dst_file))
mtime = os.path.getmtime(dst_file)
is_repair = False
for i in range(0, 180):
try:
bak_date = public.format_date(times= mtime - i * 86400, format='%Y-%m-%d')
zfile = '/www/backup/panel/{}.zip'.format(bak_date)
if not os.path.exists(zfile):
continue
public.ExecShell('unzip -o {} -d /www/backup/panel/'.format(zfile))
dst_path = '/www/backup/panel/{}'.format(bak_date)
if not os.path.exists(dst_path):
print('备份文件 {} 解压失败,请检查磁盘空间或者Inode是否正常'.format(zfile))
break
nfile = dst_file.replace('/www/server/panel/data/', '')
db_file = '/www/backup/panel/{}/data/{}'.format(bak_date,nfile)
if not os.path.exists(db_file):
print_x('备份文件 {} 不存在'.format(db_file))
continue
if check_default_error(db_file):
print('备份文件 {} 数据库已损坏'.format(db_file))
continue
if os.path.exists(dst_file):
os.rename(dst_file, '{}.{}.bak'.format(dst_file,time.strftime('%Y%m%d%H%M%S')))
print('正在恢复数据库文件 - {}'.format(bak_date))
shutil.copyfile(db_file, dst_file)
is_repair = True
break
except:
print(public.get_error_info())
if is_repair:
flag_path = '{}/update'.format(dst_dir)
if os.path.exists(flag_path): os.remove(flag_path)
print('数据库损坏,已修复成功')
else:
print('面板数据库恢复失败,已尝试重建,如果面板仍报错,请联系宝塔客服恢复')
nfile = dst_file.replace('/www/server/panel/data/', '')
db_file = '/www/backup/panel/{}/data/{}'.format(bak_date,nfile)
if os.path.exists(dst_file):
os.rename(dst_file, '{}.{}.bak'.format(dst_file,time.strftime('%Y%m%d%H%M%S')))
except: pass
def repair_all_db():
"""
@name 修复所有数据库
"""
db_files = [os.path.join(panelPath, "data/default.db")]
all_tabs = get_database_json(False)
for nkey in all_tabs.keys():
db_info = all_tabs[nkey]
nfile = '{}/{}'.format(dst_dir, db_info['db']).replace('//','/')
if nfile in db_files: continue
db_files.append(nfile)
for db_file in db_files:
repair_db_byfile(db_file)
def check_disk_used():
"""
@name 检测磁盘是否可用
"""
try:
total = 0
db_path = '{}/data/db'.format(panelPath)
for filename in os.listdir(db_path):
if total >= 3: break
if filename.find('.db') == -1:
continue
sfile = '{}/{}'.format(db_path, filename)
db_obj = get_data_obj(sfile)
res = db_obj.query('PRAGMA table_info(sqlite_master);')
if 'unable to open database file' in res:
total += 1
continue
if total >= 3:
return True
except:pass
return False
if __name__ == '__main__':
ERR_INFO = None
if len(sys.argv) > 1:
nkey = sys.argv[1]
if nkey == 'repair':
flag_path = '{}/update'.format(dst_dir)
if os.path.exists(flag_path): os.remove(flag_path)
elif nkey == 'get_sql':
try:
get_sql_shell(sys.argv[2], sys.argv[3])
except:
get_sql_shell(sys.argv[2])
exit()
elif nkey == 'test_sql':
test_sql()
exit()
if check_disk_used():
print('警告:磁盘已满,面板无法显示网站、数据库、计划任务等数据,请先清理磁盘空间')
exit()
repair_all_db()
try:
sync_db_all()
repair_db()
repair_column()
except:pass
repair_lack_field()