File size: 6,171 Bytes
3a5cf48 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | import os
import json
import shutil
import datetime
import tarfile
import zipfile
import public
class BaseCompressHandler:
"""压缩文件处理基类"""
def __init__(self):
pass
def get_files(self, sfile):
"""获取压缩包内文件列表"""
pass
def get_file_info(self, sfile,filename):
"""获取压缩包内文件信息"""
pass
def check_file_exists(self, file_path):
"""检查文件是否存在"""
if not os.path.exists(file_path):
return False
return True
class GzHandler(BaseCompressHandler):
"""tar.gz压缩文件处理类"""
def get_filename(self, item):
"""获取压缩包文件名"""
filename = item.name
try:
filename = item.name.encode('cp437').decode('gbk')
except:
pass
if item.isdir():
filename += '/'
return filename
def check_file_type(self, file_path):
"""检查文件是否为tar文件"""
if not tarfile.is_tarfile(file_path):
if file_path[-3:] == ".gz":
return False, '这不是tar.gz压缩包文件, gz压缩包文件不支持预览,仅支持解压'
return False, '不是有效的tar.gz压缩包文件'
return True, ''
def get_files(self, sfile):
"""获取压缩包内文件列表"""
if not self.check_file_exists(sfile):
return public.returnMsg(False, 'FILE_NOT_EXISTS')
is_valid, message = self.check_file_type(sfile)
if not is_valid:
return public.returnMsg(False, message)
zip_file = tarfile.open(sfile)
data = {}
for item in zip_file.getmembers():
file_name = self.get_filename(item)
temp_list = file_name.split("/")
sub_data = data
for name in temp_list:
if not name: continue
if name not in sub_data:
if file_name.endswith(name) and not ".{}".format(name) in file_name:
sub_data[name] = {
'file_size': item.size,
'filename': name,
'fullpath': file_name,
'date_time': public.format_date(times=item.mtime),
'is_dir': 1 if item.isdir() else 0
}
else:
sub_data[name] = {}
sub_data = sub_data[name]
zip_file.close()
return data
def get_file_info(self, sfile, filename):
"""获取压缩包内文件信息"""
if not self.check_file_exists(sfile):
return public.returnMsg(False, 'FILE_NOT_EXISTS')
tmp_path = '{}/tmp/{}'.format(public.get_panel_path(), public.md5(sfile + filename))
result = {}
result['status'] = True
result['data'] = ''
with tarfile.open(sfile, 'r') as zip_file:
try:
zip_file.extract(filename, tmp_path)
result['data'] = public.readFile('{}/{}'.format(tmp_path, filename))
except:
pass
public.ExecShell("rm -rf {}".format(tmp_path))
return result
class ZipHandler(BaseCompressHandler):
"""zip压缩文件处理类"""
def check_file_type(self, sfile, is_close=False):
"""检查文件是否为zip文件"""
zip_file = None
try:
zip_file = zipfile.ZipFile(sfile)
except:
pass
if is_close and zip_file:
zip_file.close()
return zip_file
def get_filename(self, item):
"""获取压缩包文件名"""
path = item.filename
try:
path_name = path.encode('cp437').decode('utf-8')
except:
try:
path_name = path.encode('cp437').decode('gbk')
path_name = path_name.encode('utf-8').decode('utf-8')
except:
path_name = path
return path_name
def get_files(self, sfile):
"""获取压缩包内文件列表"""
if not self.check_file_exists(sfile):
return public.returnMsg(False, 'FILE_NOT_EXISTS')
zip_file = self.check_file_type(sfile)
if not zip_file:
return public.returnMsg(False, 'NOT_ZIP_FILE')
data = {}
for item in zip_file.infolist():
file_name = self.get_filename(item)
temp_list = file_name.lstrip("./").split("/")
sub_data = data
for name in temp_list:
if not name: continue
if name not in sub_data:
if file_name.endswith(name):
sub_data[name] = {
'file_size': item.file_size,
'compress_size': item.compress_size,
'compress_type': item.compress_type,
'filename': name,
'fullpath': file_name,
'date_time': datetime.datetime(*item.date_time).strftime("%Y-%m-%d %H:%M:%S"),
'is_dir': 1 if item.is_dir() else 0
}
else:
sub_data[name] = {}
sub_data = sub_data[name]
zip_file.close()
return data
def get_file_info(self, sfile,filename):
"""获取压缩包内文件信息"""
if not self.check_file_exists(sfile):
return public.returnMsg(False, 'FILE_NOT_EXISTS')
result = {}
result['status'] = True
result['data'] = ''
with zipfile.ZipFile(sfile, 'r') as zip_file:
for item in zip_file.infolist():
z_filename = self.get_filename(item)
if z_filename == filename:
buff = zip_file.read(item.filename)
encoding, srcBody = public.decode_data(buff)
result['encoding'] = encoding
result['data'] = srcBody
break
return result
|