import os import re import uuid import requests import mimetypes import functools import time import logging from datetime import datetime from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor, as_completed from flask import Flask, request, render_template_string, session, redirect, url_for, jsonify, send_file from dotenv import load_dotenv # 加载环境变量 load_dotenv() # ================= 日志配置 ================= logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) app = Flask(__name__) # ================= 配置区域 ================= app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24)) APP_TOKEN = os.getenv('APP_TOKEN', 'admin123') STORAGE_MODE = os.getenv('STORAGE_MODE', 'cloud').lower() # 并发控制 MAX_WORKERS = int(os.getenv('MAX_WORKERS', 5)) # 图床配置 UPLOAD_API_URL = os.getenv('UPLOAD_API_URL', "https://your.domain/upload") AUTH_CODE = os.getenv('AUTH_CODE', "your_authCode") # 本地配置 SITE_DOMAIN = os.getenv('SITE_DOMAIN', 'http://127.0.0.1:7860').rstrip('/') LOCAL_IMAGE_FOLDER = 'static/uploads' TEMP_MD_FOLDER = 'static/temp_md' # 确保目录存在 os.makedirs(LOCAL_IMAGE_FOLDER, exist_ok=True) os.makedirs(TEMP_MD_FOLDER, exist_ok=True) HEADERS = {'User-Agent': 'Apifox/1.0.0 (https://apifox.com)'} logger.info("="*30) logger.info(f"Starting MD Migrator...") logger.info(f"Mode: {STORAGE_MODE.upper()}") logger.info(f"Max Workers: {MAX_WORKERS}") logger.info("="*30) # ================= 文件系统管理逻辑 ================= def get_file_list_from_disk(): files_data = [] if not os.path.exists(TEMP_MD_FOLDER): return [] for filename in os.listdir(TEMP_MD_FOLDER): if not filename.endswith('.md'): continue filepath = os.path.join(TEMP_MD_FOLDER, filename) try: mtime = os.path.getmtime(filepath) dt_obj = datetime.fromtimestamp(mtime) time_str = dt_obj.strftime('%Y-%m-%d %H:%M:%S') except: mtime = 0 time_str = "Unknown" # URL 编码处理:文件名中可能有空格,下载链接也需要处理 # 但这里的 filename 是路径参数,通常由浏览器自动编码,这里手动替换空格更保险 encoded_filename = filename.replace(' ', '%20') url = f"{SITE_DOMAIN}/api/download/{encoded_filename}?t={int(mtime)}" files_data.append({ 'filename': filename, 'real_filename': filename, 'timestamp': time_str, 'timestamp_sort': mtime, 'url': url }) files_data.sort(key=lambda x: x['timestamp_sort'], reverse=True) return files_data # ================= 业务逻辑 ================= def auth_required(f): @functools.wraps(f) def decorated_function(*args, **kwargs): auth_header = request.headers.get('Authorization') token_query = request.args.get('token') client_ip = request.remote_addr api_token = None if auth_header and auth_header.startswith("Bearer "): api_token = auth_header.split(" ")[1] elif token_query: api_token = token_query if api_token == APP_TOKEN: return f(*args, **kwargs) if session.get('is_logged_in'): return f(*args, **kwargs) logger.warning(f"Unauthorized access from {client_ip}") if request.path.startswith('/api/'): return jsonify({"error": "Unauthorized"}), 401 return redirect(url_for('login')) return decorated_function def get_extension(url, content_type=None): path = urlparse(url).path ext = os.path.splitext(path)[1] if ext: return ext if content_type: ext = mimetypes.guess_extension(content_type) if ext: return ext return '.jpg' def download_image(url): try: start_time = time.time() resp = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=15) if resp.status_code == 200: logger.info(f"Downloaded: {url} ({len(resp.content)}b, {time.time()-start_time:.2f}s)") return resp.content, resp.headers.get('Content-Type') logger.warning(f"Download {resp.status_code}: {url}") return None, None except Exception as e: logger.error(f"Download failed {url}: {e}") return None, None def upload_to_cloud(image_data, filename, folder_name): try: params = {'authCode': AUTH_CODE, 'uploadFolder': folder_name} files = {'file': (filename, image_data, 'application/octet-stream')} resp = requests.post(UPLOAD_API_URL, params=params, headers=HEADERS, files=files, timeout=30) resp.raise_for_status() res = resp.json() if 'url' in res: return res['url'] if 'data' in res: if isinstance(res['data'], dict) and 'url' in res['data']: return res['data']['url'] return res['data'] return None except Exception as e: logger.error(f"Cloud Upload Error: {e}") return None def save_to_local(image_data, original_url, content_type, folder_name): try: safe_folder = folder_name.replace('..', '').strip('/') save_dir = os.path.join(LOCAL_IMAGE_FOLDER, safe_folder) os.makedirs(save_dir, exist_ok=True) ext = get_extension(original_url, content_type) unique_name = f"{uuid.uuid4().hex}{ext}" path = os.path.join(save_dir, unique_name) with open(path, 'wb') as f: f.write(image_data) # 注意:这里返回的 URL 可能包含中文或空格 return f"{SITE_DOMAIN}/{LOCAL_IMAGE_FOLDER}/{safe_folder}/{unique_name}" except Exception as e: logger.error(f"Local Save Error: {e}") return None # --- 单个图片处理任务 (用于线程池) --- def process_single_image_task(url, filename_no_ext): """ 下载并上传单个图片 """ img_data, c_type = download_image(url) if not img_data: return url, None fname = url.split('/')[-1].split('?')[0] or "image.jpg" new_url = None if STORAGE_MODE == 'cloud': new_url = upload_to_cloud(img_data, fname, filename_no_ext) else: new_url = save_to_local(img_data, url, c_type, filename_no_ext) return url, new_url def process_markdown_content(content, filename_no_ext): """ 并发处理 Markdown 内容 """ pattern = re.compile(r'!\[(.*?)\]\((.*?)\)') matches = pattern.findall(content) unique_urls = set() for _, url in matches: if url.startswith(('http://', 'https://')): if STORAGE_MODE == 'local' and SITE_DOMAIN in url: continue unique_urls.add(url) logger.info(f"Found {len(matches)} images, {len(unique_urls)} need processing.") url_map = {} success_count = 0 failed_count = 0 if unique_urls: with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_url = { executor.submit(process_single_image_task, url, filename_no_ext): url for url in unique_urls } for future in as_completed(future_to_url): old_url, new_url = future.result() if new_url: # ==================================================== # 【核心修改】在这里对 URL 进行编码处理 # 将空格替换为 %20,保证 Markdown 解析正常 # ==================================================== encoded_new_url = new_url.replace(' ', '%20') url_map[old_url] = encoded_new_url success_count += 1 else: failed_count += 1 def replace_callback(match): alt_text = match.group(1) original_url = match.group(2) if original_url in url_map: return f'' return match.group(0) new_content = pattern.sub(replace_callback, content) logger.info(f"Task finished. Total: {len(unique_urls)}, Success: {success_count}, Failed: {failed_count}") return new_content def save_processed_md(content, original_filename): safe_filename = os.path.basename(original_filename) save_path = os.path.join(TEMP_MD_FOLDER, safe_filename) with open(save_path, 'w', encoding='utf-8') as f: f.write(content) logger.info(f"Markdown file saved: {save_path}") # 返回下载链接时,文件名也进行编码,防止下载链接断裂 encoded_filename = safe_filename.replace(' ', '%20') return f"{SITE_DOMAIN}/api/download/{encoded_filename}" # ================= HTML 模板 (保持不变) ================= BASE_TEMPLATE = """
| 文件名 | 处理时间 | 操作 |
|---|---|---|
| {{ item.filename }} | {{ item.timestamp }} | 下载 |