"""Markdown image migrator.

A small Flask service that accepts a Markdown file, downloads every remote
image it references, re-hosts each image (either on a remote image-hosting
API or in a local static folder, depending on ``STORAGE_MODE``), rewrites the
image links in the document and stores the result for download.
"""
import functools
import hmac
import logging
import mimetypes
import os
import re
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from urllib.parse import quote, urlparse

import requests
from dotenv import load_dotenv
from flask import (
    Flask,
    jsonify,
    redirect,
    render_template_string,
    request,
    send_file,
    session,
    url_for,
)

# Load environment variables from a .env file (if present) before reading config.
load_dotenv()

# ================= Logging =================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# ================= Configuration =================
# Without FLASK_SECRET_KEY a random key is generated per process, so sessions
# do not survive a restart.
app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24))
APP_TOKEN = os.getenv('APP_TOKEN', 'admin123')
STORAGE_MODE = os.getenv('STORAGE_MODE', 'cloud').lower()

# Concurrency: number of worker threads used to download/upload images.
MAX_WORKERS = int(os.getenv('MAX_WORKERS', 5))

# Image-hosting (cloud mode) settings.
UPLOAD_API_URL = os.getenv('UPLOAD_API_URL', "https://your.domain/upload")
AUTH_CODE = os.getenv('AUTH_CODE', "your_authCode")

# Local-mode settings.
SITE_DOMAIN = os.getenv('SITE_DOMAIN', 'http://127.0.0.1:7860').rstrip('/')
LOCAL_IMAGE_FOLDER = 'static/uploads'
TEMP_MD_FOLDER = 'static/temp_md'

# Make sure the storage directories exist.
os.makedirs(LOCAL_IMAGE_FOLDER, exist_ok=True)
os.makedirs(TEMP_MD_FOLDER, exist_ok=True)

HEADERS = {'User-Agent': 'Apifox/1.0.0 (https://apifox.com)'}

# Matches Markdown image links: group 1 is the alt text, group 2 the target.
IMAGE_LINK_PATTERN = re.compile(r'!\[(.*?)\]\((.*?)\)')

logger.info("="*30)
logger.info("Starting MD Migrator...")
logger.info(f"Mode: {STORAGE_MODE.upper()}")
logger.info(f"Max Workers: {MAX_WORKERS}")
logger.info("="*30)


# ================= File-system management =================
def get_file_list_from_disk():
    """Return metadata for every processed ``.md`` file, newest first.

    Returns:
        A list of dicts with the keys ``filename``, ``real_filename``,
        ``timestamp`` (formatted mtime or ``"Unknown"``), ``timestamp_sort``
        (raw mtime, ``0`` when unavailable) and ``url`` (download link).
        An empty list is returned when the folder does not exist.
    """
    if not os.path.exists(TEMP_MD_FOLDER):
        return []

    files_data = []
    for filename in os.listdir(TEMP_MD_FOLDER):
        if not filename.endswith('.md'):
            continue

        filepath = os.path.join(TEMP_MD_FOLDER, filename)
        try:
            mtime = os.path.getmtime(filepath)
            time_str = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S')
        except (OSError, OverflowError, ValueError):
            # The file vanished or carries an unrepresentable timestamp;
            # still list it, just without a usable time.
            mtime = 0
            time_str = "Unknown"

        # The filename is a URL path segment: percent-encode it so spaces and
        # reserved characters ('#', '?', '%', ...) do not break the link.
        encoded_filename = quote(filename)
        url = f"{SITE_DOMAIN}/api/download/{encoded_filename}?t={int(mtime)}"

        files_data.append({
            'filename': filename,
            'real_filename': filename,
            'timestamp': time_str,
            'timestamp_sort': mtime,
            'url': url
        })

    files_data.sort(key=lambda x: x['timestamp_sort'], reverse=True)
    return files_data


# ================= Business logic =================
def _token_matches(candidate):
    """Return True when ``candidate`` equals ``APP_TOKEN`` (constant-time compare)."""
    if candidate is None:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII ``str`` arguments.
    return hmac.compare_digest(candidate.encode('utf-8'), APP_TOKEN.encode('utf-8'))


def auth_required(f):
    """Decorator that only lets authenticated requests reach the view.

    A request is authenticated when it carries the application token (as an
    ``Authorization: Bearer <token>`` header or a ``token`` query parameter)
    or when the session is marked as logged in. Otherwise ``/api/`` paths get
    a JSON 401 and every other path is redirected to the login page.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        token_query = request.args.get('token')
        client_ip = request.remote_addr

        api_token = None
        if auth_header and auth_header.startswith("Bearer "):
            api_token = auth_header.split(" ")[1]
        elif token_query:
            api_token = token_query

        if _token_matches(api_token):
            return f(*args, **kwargs)
        if session.get('is_logged_in'):
            return f(*args, **kwargs)

        logger.warning(f"Unauthorized access from {client_ip}")
        if request.path.startswith('/api/'):
            return jsonify({"error": "Unauthorized"}), 401
        return redirect(url_for('login'))
    return decorated_function


def get_extension(url, content_type=None):
    """Pick a file extension for an image.

    Prefers the extension found in the URL path, then one guessed from the
    ``Content-Type`` value, and finally falls back to ``'.jpg'``.
    """
    path = urlparse(url).path
    ext = os.path.splitext(path)[1]
    if ext:
        return ext
    if content_type:
        # Drop parameters such as "; charset=utf-8" that would otherwise make
        # the MIME lookup fail.
        mime_type = content_type.split(';', 1)[0].strip()
        ext = mimetypes.guess_extension(mime_type)
        if ext:
            return ext
    return '.jpg'


def download_image(url):
    """Download ``url`` and return ``(content_bytes, content_type)``.

    Returns ``(None, None)`` on any non-200 response or request error; errors
    are logged rather than raised.
    """
    try:
        start_time = time.time()
        resp = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=15)
        if resp.status_code == 200:
            logger.info(f"Downloaded: {url} ({len(resp.content)}b, {time.time()-start_time:.2f}s)")
            return resp.content, resp.headers.get('Content-Type')
        logger.warning(f"Download {resp.status_code}: {url}")
        return None, None
    except Exception as e:
        logger.error(f"Download failed {url}: {e}")
        return None, None


def upload_to_cloud(image_data, filename, folder_name):
    """Upload image bytes to the configured image-hosting API.

    Returns:
        The hosted URL taken from the JSON response (``url``, ``data.url`` or
        a string ``data``), or ``None`` when the upload fails or the response
        carries no usable URL.
    """
    try:
        params = {'authCode': AUTH_CODE, 'uploadFolder': folder_name}
        files = {'file': (filename, image_data, 'application/octet-stream')}
        resp = requests.post(UPLOAD_API_URL, params=params, headers=HEADERS, files=files, timeout=30)
        resp.raise_for_status()
        res = resp.json()
        if 'url' in res:
            return res['url']
        if 'data' in res:
            data = res['data']
            if isinstance(data, dict) and 'url' in data:
                return data['url']
            # Only a string can be used as a link by the caller; anything
            # else is treated as "no URL returned".
            if isinstance(data, str):
                return data
            logger.error(f"Cloud Upload Error: unexpected 'data' payload of type {type(data).__name__}")
        return None
    except Exception as e:
        logger.error(f"Cloud Upload Error: {e}")
        return None


def save_to_local(image_data, original_url, content_type, folder_name):
    """Store image bytes under ``LOCAL_IMAGE_FOLDER/<folder_name>``.

    Returns:
        The public URL of the stored file (built from ``SITE_DOMAIN``), or
        ``None`` when saving fails.
    """
    try:
        # Strip parent-directory markers and surrounding slashes so the
        # folder stays inside LOCAL_IMAGE_FOLDER.
        safe_folder = folder_name.replace('..', '').strip('/')
        save_dir = os.path.join(LOCAL_IMAGE_FOLDER, safe_folder)
        os.makedirs(save_dir, exist_ok=True)

        ext = get_extension(original_url, content_type)
        unique_name = f"{uuid.uuid4().hex}{ext}"
        path = os.path.join(save_dir, unique_name)
        with open(path, 'wb') as f:
            f.write(image_data)

        # Note: the returned URL may contain non-ASCII characters or spaces.
        return f"{SITE_DOMAIN}/{LOCAL_IMAGE_FOLDER}/{safe_folder}/{unique_name}"
    except Exception as e:
        logger.error(f"Local Save Error: {e}")
        return None


# --- Single-image task (run inside the thread pool) ---
def process_single_image_task(url, filename_no_ext):
    """Download one image and re-host it.

    Returns:
        ``(url, new_url)`` where ``new_url`` is ``None`` when the download or
        the upload/save step failed.
    """
    img_data, c_type = download_image(url)
    if not img_data:
        return url, None

    fname = url.split('/')[-1].split('?')[0] or "image.jpg"

    if STORAGE_MODE == 'cloud':
        new_url = upload_to_cloud(img_data, fname, filename_no_ext)
    else:
        new_url = save_to_local(img_data, url, c_type, filename_no_ext)

    return url, new_url


def process_markdown_content(content, filename_no_ext):
    """Re-host all remote images of a Markdown document concurrently.

    Args:
        content: The Markdown text.
        filename_no_ext: Document name without extension; used as the
            upload/save folder name.

    Returns:
        The Markdown text with every successfully migrated image link
        replaced; links that failed to migrate are left untouched.
    """
    pattern = IMAGE_LINK_PATTERN
    matches = pattern.findall(content)

    unique_urls = set()
    for _, url in matches:
        if url.startswith(('http://', 'https://')):
            # In local mode, images already served by this site need no work.
            if STORAGE_MODE == 'local' and SITE_DOMAIN in url:
                continue
            unique_urls.add(url)

    logger.info(f"Found {len(matches)} images, {len(unique_urls)} need processing.")

    url_map = {}
    success_count = 0
    failed_count = 0

    if unique_urls:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_url = {
                executor.submit(process_single_image_task, url, filename_no_ext): url
                for url in unique_urls
            }
            for future in as_completed(future_to_url):
                try:
                    old_url, new_url = future.result()
                except Exception as e:
                    # One broken image must not abort the whole document.
                    logger.error(f"Image task failed {future_to_url[future]}: {e}")
                    failed_count += 1
                    continue

                if new_url:
                    # Encode spaces as %20 so the Markdown link parses correctly.
                    encoded_new_url = new_url.replace(' ', '%20')
                    url_map[old_url] = encoded_new_url
                    success_count += 1
                else:
                    failed_count += 1

    def replace_callback(match):
        alt_text = match.group(1)
        original_url = match.group(2)
        if original_url in url_map:
            return f'![{alt_text}]({url_map[original_url]})'
        return match.group(0)

    new_content = pattern.sub(replace_callback, content)
    logger.info(f"Task finished. Total: {len(unique_urls)}, Success: {success_count}, Failed: {failed_count}")
    return new_content


def save_processed_md(content, original_filename):
    """Write processed Markdown to ``TEMP_MD_FOLDER`` and return its download URL.

    Only the base name of ``original_filename`` is used, so the file cannot
    be written outside the folder.
    """
    safe_filename = os.path.basename(original_filename)
    save_path = os.path.join(TEMP_MD_FOLDER, safe_filename)
    with open(save_path, 'w', encoding='utf-8') as f:
        f.write(content)
    logger.info(f"Markdown file saved: {save_path}")

    # Percent-encode the filename so the download link is not broken by
    # spaces or reserved characters.
    encoded_filename = quote(safe_filename)
    return f"{SITE_DOMAIN}/api/download/{encoded_filename}"


def _process_upload(file):
    """Process one uploaded Markdown file and return its download URL."""
    md_name = os.path.splitext(file.filename)[0]
    content = file.read().decode('utf-8', errors='ignore')
    new_content = process_markdown_content(content, md_name)
    return save_processed_md(new_content, file.filename)


# ================= HTML templates (kept unchanged) =================
# NOTE(review): as seen here the templates contain no HTML markup (no forms,
# links or table tags) - confirm against the deployed version.
BASE_TEMPLATE = """ Markdown 资源迁移器 {{ content_html|safe }} """

LOGIN_CONTENT = """

系统登录

{% if error %}
{{ error }}
{% endif %}
"""

INDEX_CONTENT = """

MD 图片迁移 {{ mode|upper }}

退出

点击或拖拽 Markdown 文件

历史文件列表

{% if history %}
{% for item in history %} {% endfor %}
文件名处理时间操作
{{ item.filename }} {{ item.timestamp }} 下载
{% else %}
暂无记录
{% endif %}
"""

SUCCESS_CONTENT = """

处理成功!

文件已保存。

立即下载 返回列表
"""


# ================= Routes =================
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login page; on POST, log the session in if the token matches."""
    if request.method == 'POST':
        client_ip = request.remote_addr
        if _token_matches(request.form.get('token')):
            session['is_logged_in'] = True
            logger.info(f"Web Login Success | IP: {client_ip}")
            return redirect(url_for('index'))
        else:
            logger.warning(f"Web Login Failed | IP: {client_ip}")
            return render_template_string(BASE_TEMPLATE, content_html=render_template_string(LOGIN_CONTENT, error="无效的 Token"))
    return render_template_string(BASE_TEMPLATE, content_html=render_template_string(LOGIN_CONTENT, error=None))


@app.route('/logout')
def logout():
    """Clear the login flag from the session and return to the login page."""
    logger.info(f"Web Logout | IP: {request.remote_addr}")
    session.pop('is_logged_in', None)
    return redirect(url_for('login'))


@app.route('/', methods=['GET', 'POST'])
@auth_required
def index():
    """Web UI: list processed files (GET) or process an uploaded file (POST)."""
    if request.method == 'POST':
        if 'file' not in request.files:
            return "无文件", 400
        file = request.files['file']
        if not file.filename:
            return "未选择文件", 400
        try:
            logger.info(f"Received file upload (Web): {file.filename}")
            download_url = _process_upload(file)
            return render_template_string(BASE_TEMPLATE, content_html=render_template_string(SUCCESS_CONTENT, download_url=download_url))
        except Exception as e:
            logger.error(f"Web Process Error: {e}", exc_info=True)
            return f"Error: {e}", 500

    history = get_file_list_from_disk()
    inner_html = render_template_string(INDEX_CONTENT, mode=STORAGE_MODE, history=history)
    return render_template_string(BASE_TEMPLATE, content_html=inner_html)


@app.route('/api/process', methods=['POST'])
@auth_required
def api_process():
    """API: process an uploaded Markdown file and return its download URL as JSON."""
    if 'file' not in request.files:
        return jsonify({"code": 400, "error": "No file uploaded"}), 400
    file = request.files['file']
    if not file.filename:
        return jsonify({"code": 400, "error": "Empty filename"}), 400
    try:
        logger.info(f"Received file upload (API): {file.filename}")
        download_url = _process_upload(file)
        return jsonify({"code": 200, "message": "success", "filename": file.filename, "url": download_url})
    except Exception as e:
        logger.error(f"API Process Error: {e}", exc_info=True)
        return jsonify({"code": 500, "error": str(e)}), 500


@app.route('/api/history', methods=['GET'])
@auth_required
def api_history():
    """API: return the list of processed files as JSON."""
    try:
        files = get_file_list_from_disk()
        return jsonify({"code": 200, "message": "success", "data": files})
    except Exception as e:
        logger.error(f"API History Error: {e}")
        return jsonify({"code": 500, "error": str(e)}), 500


# The URL variable is required: the view takes ``filename`` and every
# generated download link has the form /api/download/<name>.
@app.route('/api/download/<filename>', methods=['GET'])
@auth_required
def api_download(filename):
    """API: send a processed Markdown file as an attachment (404 if missing)."""
    safe_filename = os.path.basename(filename)
    file_path = os.path.join(TEMP_MD_FOLDER, safe_filename)
    if not os.path.exists(file_path):
        logger.warning(f"Download not found: {safe_filename}")
        return jsonify({"code": 404, "error": "File not found"}), 404
    try:
        logger.info(f"File downloaded: {safe_filename}")
        # Pass an absolute path so the file sent is the one checked above,
        # however send_file resolves relative paths.
        return send_file(os.path.abspath(file_path), as_attachment=True, download_name=safe_filename, mimetype='text/markdown')
    except Exception as e:
        logger.error(f"Download Error: {e}")
        return jsonify({"code": 500, "error": str(e)}), 500


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 7860))
    # The interactive debugger allows arbitrary code execution, so it must be
    # opted into explicitly (FLASK_DEBUG=1) rather than always enabled on a
    # server that listens on all interfaces.
    debug = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug, host='0.0.0.0', port=port)