Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| import subprocess | |
| import gradio as gr | |
| import json | |
| from utils import BASE_DIR, PDF_FINAL_DIR, sanitize_filename, manual_merge_pdf | |
| # 统一的 User-Agent | |
| USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" | |
def extract_title_from_json(json_data):
    """
    Recursively search a (possibly nested) JSON structure for a gallery title.

    Looks for a 'title' key first, then 'gallery_title', at every dict level,
    descending depth-first into dict values and list elements (gallery-dl
    dumps a JSON array, so the top level is usually a list).

    Parameters:
        json_data: Parsed JSON data — dict, list, or any scalar.

    Returns:
        The first truthy title value found, or None if no title exists.
    """
    if isinstance(json_data, dict):
        # Prefer 'title'; fall back to 'gallery_title'.
        # Fix: only return a truthy value — previously a present-but-falsy
        # 'title' (e.g. None) was returned outright, aborting the search at
        # this level and shadowing a valid 'gallery_title' or deeper title.
        for key in ('title', 'gallery_title'):
            value = json_data.get(key)
            if value:
                return value
        # No title at this level: recurse into the values.
        for v in json_data.values():
            res = extract_title_from_json(v)
            if res:
                return res
    elif isinstance(json_data, list):
        # Lists: check every element in order.
        for v in json_data:
            res = extract_title_from_json(v)
            if res:
                return res
    return None
def get_gallery_title(eh_url, cookie_file_path):
    """
    Fetch the gallery title for an E-Hentai URL via gallery-dl.

    Runs ``gallery-dl --dump-json --range 1`` with the supplied cookie file
    and parses the *entire* stdout as one JSON document — gallery-dl may
    emit multi-line JSON, so line-by-line parsing is unreliable.

    Parameters:
        eh_url (str): Gallery URL to query.
        cookie_file_path (str): Path to a Netscape-format cookie file.

    Returns:
        str | None: The title if found, otherwise None on any failure.
    """
    cmd = [
        "gallery-dl",
        "--cookies", cookie_file_path,
        "--user-agent", USER_AGENT,
        "--dump-json",
        "--range", "1",
        eh_url,
    ]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='ignore',
        )
        if result.returncode == 0:
            # Parse the whole stdout at once rather than per line.
            try:
                title = extract_title_from_json(json.loads(result.stdout))
                if title:
                    return str(title)
            except json.JSONDecodeError:
                # Non-JSON / mixed output: give up and fall through to None.
                pass
    except Exception as e:
        print(f"[-] 获取标题失败: {e}")
    return None
def debug_eh_info(eh_url, cookies_str):
    """
    Diagnostic helper: probe *eh_url* with gallery-dl and return a report.

    Writes *cookies_str* to the shared cookie file, invokes
    ``gallery-dl --dump-json --range 1 --verbose`` and reports the return
    code, the parsed title (if any), and — on a parse failure — the raw
    stdout plus the stderr tail (gallery-dl logs --verbose diagnostics to
    stderr, which the previous version never surfaced).

    Parameters:
        eh_url (str): Gallery URL to analyze.
        cookies_str (str): Netscape-format cookie text to write to disk.

    Returns:
        str: The assembled multi-line debug report.
    """
    debug_report = []
    debug_report.append(f"🔍 [Debug] 开始分析链接: {eh_url}")
    # Fix: removed creation of an "eh_debug_temp" directory that was never
    # used anywhere in this function (dead side effect).
    cookie_file_path = os.path.join(BASE_DIR, "eh_cookies.txt")
    with open(cookie_file_path, "w", encoding="utf-8") as f:
        f.write(cookies_str)
    debug_report.append(f"🍪 [Debug] Cookies 已写入")
    cmd = [
        "gallery-dl",
        "--cookies", cookie_file_path,
        "--user-agent", USER_AGENT,
        "--dump-json",
        "--range", "1",
        "--verbose",
        eh_url
    ]
    try:
        process = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='ignore'
        )
        debug_report.append(f"🏷️ [Return Code]: {process.returncode}")
        # Try to parse the whole stdout as one JSON document.
        debug_report.append("\n🧐 [Debug] 尝试解析 JSON:")
        try:
            data = json.loads(process.stdout)
            title = extract_title_from_json(data)
            if title:
                debug_report.append(f"✅ 成功解析! 标题是: {title}")
            else:
                debug_report.append("⚠️ JSON 有效,但没找到 title 字段。")
        except Exception as e:
            debug_report.append(f"❌ JSON 解析失败: {e}")
            debug_report.append("可能是返回的数据不是纯 JSON,以下是原始输出:")
            debug_report.append(process.stdout[:1000] + "...")
            # Fix: --verbose output goes to stderr; include it so the debug
            # tool actually shows the failure reason (403, login, etc.).
            if process.stderr:
                debug_report.append("\n📋 [stderr]:")
                debug_report.append(process.stderr[:1000] + "...")
    except Exception as e:
        debug_report.append(f"💥 执行出错: {e}")
    return "\n".join(debug_report)
def run_eh_download(eh_url, cookies_str, quality, progress=gr.Progress()):
    """
    Main E-Hentai download pipeline (Gradio streaming generator).

    Steps: write cookies → resolve the gallery title → run gallery-dl while
    streaming its log to the UI → locate the downloaded images → merge them
    into a single PDF under PDF_FINAL_DIR.

    Parameters:
        eh_url (str): Gallery URL to download.
        cookies_str (str): Netscape-format cookie text, written to disk for
            gallery-dl.
        quality (int): JPEG quality for the merged PDF; >= 100 is treated
            as "Original" (tag embedded in the output filename).
        progress: Gradio progress tracker. NOTE: the ``gr.Progress()``
            default is the Gradio dependency-injection convention, not a
            regular mutable default argument.

    Yields:
        (files, log_text, status) tuples consumed by the Gradio UI;
        *files* stays None until the final PDF path is ready.
    """
    print(f"[-] Debug:收到任务 URL: {eh_url}, 画质: {quality}")
    if not eh_url:
        yield None, "❌ 请输入画廊链接", "参数缺失"
        return
    progress(0, desc="正在初始化...")
    # Start from a clean temp dir so stale images never leak into the PDF.
    eh_base_dir = os.path.join(BASE_DIR, "eh_temp")
    if os.path.exists(eh_base_dir):
        try:
            shutil.rmtree(eh_base_dir)
        except OSError:
            # Best-effort cleanup. Fix: was a bare `except:` which also
            # swallowed KeyboardInterrupt/SystemExit.
            pass
    os.makedirs(eh_base_dir, exist_ok=True)
    # 1. Write cookies for gallery-dl.
    cookie_file_path = os.path.join(BASE_DIR, "eh_cookies.txt")
    with open(cookie_file_path, "w", encoding="utf-8") as f:
        f.write(cookies_str)
    # 2. Resolve the gallery title (placeholder on failure).
    progress(0.1, desc="正在解析标题...")
    real_title = get_gallery_title(eh_url, cookie_file_path)
    if real_title:
        clean_title = sanitize_filename(real_title)
        yield None, f"📖 识别到标题: {real_title}", "标题获取成功"
    else:
        clean_title = "Unknown_Gallery"
        yield None, "⚠️ 标题解析失败,将使用默认名称", "标题未知"
    # 3. Run the download, streaming the log tail to the UI.
    cmd = [
        "gallery-dl",
        "--verbose",
        "--user-agent", USER_AGENT,
        "--directory", eh_base_dir,
        "--cookies", cookie_file_path,
        eh_url
    ]
    yield None, f"🚀 正在启动下载...", "启动中"
    progress(0.2, desc=f"正在下载: {clean_title[:15]}...")
    try:
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr so --verbose lines show up
            text=True,
            bufsize=1  # line-buffered for live streaming
        )
        log_buffer = []
        img_count = 0
        for line in process.stdout:
            line = line.strip()
            if not line:
                continue
            log_buffer.append(line)
            # Only display the last 20 lines to keep the UI responsive.
            display_text = "\n".join(log_buffer[-20:])
            # Heuristic: gallery-dl prints "# <url>" per fetched image.
            if "#" in line and "http" in line:
                img_count += 1
                progress(None, desc=f"下载中 ({img_count}张)...")
            if "403 Forbidden" in line:
                progress(None, desc="❌ 403 拒绝访问")
            yield None, display_text, f"已下载: {img_count}"
        process.wait()
        if process.returncode != 0:
            yield None, f"❌ 下载失败 (Code {process.returncode})", "失败"
            return
    except Exception as e:
        yield None, f"❌ 调用出错: {e}", "系统错误"
        return
    # 4. Locate the directory gallery-dl actually wrote images into
    #    (it nests output by site/gallery name under eh_base_dir).
    progress(0.9, desc="打包中...")
    target_img_dir = None
    for root, dirs, files in os.walk(eh_base_dir):
        if any(f.lower().endswith(('.jpg', '.png', '.jpeg', '.webp')) for f in files):
            target_img_dir = root
            break
    if not target_img_dir:
        yield None, "❌ 未找到图片", "空目录"
        return
    # 5. Merge the images into the final PDF.
    if len(clean_title) > 100:
        clean_title = clean_title[:100]  # keep the filename filesystem-safe
    quality_tag = "Original" if quality >= 100 else f"Q{quality}"
    final_pdf_name = f"[EH][{quality_tag}] {clean_title}.pdf"
    final_pdf_path = os.path.join(PDF_FINAL_DIR, final_pdf_name)
    yield None, f"🔨 正在生成: {final_pdf_name}...", "合并中"
    try:
        manual_merge_pdf(target_img_dir, final_pdf_path, quality=quality)
    except Exception as e:
        yield None, f"❌ 合并失败: {e}", "合并失败"
        return
    progress(1.0, desc="完成!")
    yield [final_pdf_path], f"✅ 处理完成!\n文件名: {final_pdf_name}", "完成"