| |
| |
| |
| |
| |
| |
| |
| |
| import json |
| import os |
| import sys |
| import time |
| import traceback |
|
|
# Work from the panel root so the relative 'class/' entry below (and any
# other relative paths used by the imported panel modules) resolve there.
os.chdir('/www/server/panel')

# Put the panel's class directory first on the import path, once only,
# so the project-local imports that follow can be found.
if 'class/' not in sys.path:
    sys.path.insert(0, 'class/')
| |
| |
| from filesModel.base import filesBase |
| import public |
|
|
|
|
|
|
class main(filesBase):
    """Convert audio, video and image files and log every attempt.

    The extensions accepted for each category are listed in
    ``self.convert_type``. Third-party libraries are imported lazily and
    installed with ``btpip`` the first time an import fails.
    """

    # Log file appended to by write_log() and paged by get_log().
    _LOG_PATH = '/www/server/panel/logs/convert.log'

    # Categories run() can dispatch to a converter, in lookup order.
    # 'all' is deliberately excluded: it is only an aggregate list.
    _CATEGORIES = ('audio', 'video', 'image')

    def __init__(self):
        self.convert_type = {
            'audio': ['mp3', 'wav', 'flac'],
            'video': ['mp4', 'avi', 'mov', 'mkv'],
            'image': ['png', 'jpeg', 'jpg', 'bmp', 'tiff', 'webp'],
            'all': ['png', 'jpeg', 'jpg', 'bmp', 'tiff', 'webp', 'mp4', 'avi', 'mov', 'mkv', 'mp3', 'wav', 'flac']
        }
        # Delete-source flag; run() overwrites it per request. Initialised
        # here so the convert_* methods also work when called directly.
        self.is_del = 0

    @staticmethod
    def _require(module_name, pip_name):
        """Import ``module_name``, installing ``pip_name`` via btpip if needed.

        On ImportError the package is installed with ``btpip install`` and
        the import is retried once; a second failure propagates.
        """
        import importlib

        try:
            return importlib.import_module(module_name)
        except ImportError:
            os.system('btpip install {}'.format(pip_name))
            # The package appeared after the interpreter started, so the
            # finder caches must be dropped before the retry can see it.
            importlib.invalidate_caches()
            return importlib.import_module(module_name)

    def _finalize(self, label, input_file, output_file):
        """Log the result of a finished conversion and honour ``is_del``.

        Returns True when ``output_file`` exists, False otherwise. The
        source file is removed only on success and only if ``self.is_del``
        is truthy.
        """
        if not os.path.exists(output_file):
            self.write_log('{};{};从{}->{};转换失败,未检测到转换后的文件!'.format(int(time.time()), label, input_file, output_file))
            return False
        self.write_log('{};{};从{}->{};转换成功!'.format(int(time.time()), label, input_file, output_file))
        if self.is_del:
            os.remove(input_file)
            self.write_log('{};{};从{};已删除源文件!'.format(int(time.time()), label, input_file))
        return True

    def _fail(self, label, input_file, output_file):
        """Handle a conversion that raised; must be called from ``except``.

        Prints the active traceback, logs the failure and removes any
        partially written output file. Always returns False.
        """
        print(traceback.format_exc())
        self.write_log('{};{};从{}->{};转换失败!'.format(int(time.time()), label, input_file, output_file))
        if os.path.exists(output_file):
            os.remove(output_file)
        return False

    def convert_audio(self, input_file, output_file):
        """Convert an audio file using librosa (decode) and soundfile (encode).

        Returns True on success, False on any failure (which is logged).
        """
        try:
            librosa = self._require('librosa', 'librosa')
            # NOTE(review): load() is called with its defaults; per the
            # librosa docs that resamples to 22050 Hz mono - confirm that
            # this lossy behaviour is intended.
            audio, sr = librosa.load(input_file)
            sf = self._require('soundfile', 'soundfile')
            sf.write(output_file, audio, sr)
            return self._finalize('音频', input_file, output_file)
        except Exception:
            return self._fail('音频', input_file, output_file)

    def convert_video(self, input_file, output_file):
        """Convert a video file by copying its frames through imageio.

        Returns True on success, False on any failure (which is logged).
        """
        try:
            imageio = self._require('imageio', 'imageio')
            reader = imageio.get_reader(input_file)
            writer = None
            try:
                try:
                    meta_data = reader.get_meta_data()
                    writer = imageio.get_writer(output_file, fps=meta_data["fps"])
                except ValueError:
                    # Treated as "backend plugin missing": install the
                    # plugins and retry once.
                    os.system("btpip install imageio[ffmpeg]")
                    os.system("btpip install imageio[pyav]")
                    meta_data = reader.get_meta_data()
                    writer = imageio.get_writer(output_file, fps=meta_data["fps"])

                for frame in reader:
                    writer.append_data(frame)
            finally:
                # Always release both handles, even if copying failed.
                try:
                    reader.close()
                finally:
                    if writer is not None:
                        writer.close()
            return self._finalize('视频', input_file, output_file)
        except Exception:
            return self._fail('视频', input_file, output_file)

    def convert_image(self, input_file, output_file):
        """Convert an image file with OpenCV, or Pillow for ico/svg paths.

        Returns True on success, False on any failure (which is logged).
        """
        try:
            pillow_only = ('ico', 'svg')
            if output_file.split('.')[-1] in pillow_only or input_file.split('.')[-1] in pillow_only:
                # NOTE(review): 'svg' is kept from the original list;
                # whether Pillow can actually handle it is unverified.
                from PIL import Image
                with Image.open(input_file) as img:
                    # Let Pillow pick the encoder from the output file's
                    # extension (the input's extension is the wrong format).
                    img.save(output_file)
            else:
                cv2 = self._require('cv2', 'opencv-python')
                img = cv2.imread(input_file)
                if img is None:
                    # imread signals an unreadable file by returning None.
                    raise ValueError('cv2.imread could not read {}'.format(input_file))
                cv2.imwrite(output_file, img)
            return self._finalize('图片', input_file, output_file)
        except Exception:
            return self._fail('图片', input_file, output_file)

    def run(self, get):
        """Convert every file pair listed in ``get.pdata``.

        ``get.pdata`` is a JSON-encoded list of objects with ``input_file``
        and ``output_file`` keys. ``get.is_save`` (default 0) is stored as
        ``self.is_del``; a truthy value makes the converters delete the
        source file after a successful conversion.
        NOTE(review): the request field is called ``is_save`` but enables
        deletion - confirm the intended meaning with the caller.

        Returns ``public.returnMsg(True, result)`` where ``result`` holds
        one ``{'name', 'status'}`` dict per processed item. Unsupported
        input formats and already-existing outputs are recorded as failed
        items; the other validation errors abort the whole batch with
        ``public.returnMsg(False, <message>)``.
        """
        try:
            public.set_module_logs('conversion', 'run_conversion', 1)
            if not (hasattr(get, 'pdata') and get.pdata != '[]' and get.pdata):
                return public.returnMsg(False, '转换列表为空!')
            self.is_del = int(get.get('is_save', 0))
            pdata = json.loads(get.pdata)
            if not pdata:
                return public.returnMsg(False, '参数错误!')

            converters = {
                'audio': self.convert_audio,
                'video': self.convert_video,
                'image': self.convert_image,
            }
            result = []
            for item in pdata:
                src = item['input_file']
                dst = item['output_file']

                # First dispatchable category whose list has the extension.
                category = next(
                    (name for name in self._CATEGORIES if src.split('.')[-1] in self.convert_type[name]),
                    None,
                )
                if category is None:
                    result.append({'name': src.split('/')[-1], 'status': False})
                    self.write_log('{};{};从{}->{};转换失败,不支持的格式!'.format(int(time.time()), '未知', src, dst))
                    continue

                name = src.split('/')[-1] + '->' + dst.split('/')[-1]
                if os.path.exists(dst):
                    self.write_log('{};{};从{}->{};转换失败,输出地址已存在同名文件!'.format(int(time.time()), '未知', src, dst))
                    result.append({'name': name, 'status': False})
                    continue

                if not src or not dst:
                    return public.returnMsg(False, '参数错误!')
                if src == dst:
                    return public.returnMsg(False, '源文件和目标文件不能相同!')
                if not os.path.exists(src):
                    return public.returnMsg(False, '源文件不存在!')
                # The input extension is already known to be in this
                # category, so only the output needs checking.
                if dst.split('.')[-1] not in self.convert_type[category]:
                    return public.returnMsg(False, '不支持的格式转换!')

                result.append({'name': name, 'status': bool(converters[category](src, dst))})
            return public.returnMsg(True, result)
        except Exception:
            # Keep the traceback on the server side; the client gets the
            # same message shape as every other failure.
            print(traceback.format_exc())
            return public.returnMsg(False, '转换失败!')

    def get_convert_liet(self, get=None):
        """Return the supported extensions per category (``convert_type``).

        The misspelt method name is kept for compatibility with callers.
        """
        try:
            return public.returnMsg(True, self.convert_type)
        except Exception:
            return public.returnMsg(False, '获取转换支持列表失败!')

    def write_log(self, log):
        """Append one line to the conversion log, creating the file if absent."""
        if not os.path.exists(self._LOG_PATH):
            public.writeFile(self._LOG_PATH, '')
        public.writeFile(self._LOG_PATH, log + '\n', 'a+')

    def get_log(self, get=None):
        """Return one page of the conversion log, newest entries first.

        ``get.p`` is the 1-based page number (default 1) and ``get.row``
        the page size (default 10). Each log line is split on ';' into
        ``time``, ``type``, ``operation`` and ``status``; lines that do not
        have exactly four fields are skipped.
        """
        try:
            p = int(get.p) if hasattr(get, 'p') else 1
            row = int(get.row) if hasattr(get, 'row') else 10
            if not os.path.exists(self._LOG_PATH):
                return public.returnMsg(True, '无日志!')

            lines = public.readFile(self._LOG_PATH).strip().split('\n')
            keys = ('time', 'type', 'operation', 'status')
            logs = []
            for line in reversed(lines):
                fields = line.split(';')
                if len(fields) == len(keys):
                    logs.append(dict(zip(keys, fields)))

            data = public.get_page(len(logs), p, row)
            data['data'] = logs[(p - 1) * row: p * row]
            return public.returnMsg(True, data)
        except Exception:
            print(traceback.format_exc())
            return public.returnMsg(False, '获取日志失败!')
|
|