# JM-EH-Downloader / eh_logic.py
# Author: jscmp4 — commit 2402aa6 ("Update eh_logic.py")
import os
import shutil
import subprocess
import gradio as gr
import json
from utils import BASE_DIR, PDF_FINAL_DIR, sanitize_filename, manual_merge_pdf
# Shared User-Agent string, sent with every gallery-dl invocation below.
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
def extract_title_from_json(json_data):
    """
    Recursively search a nested JSON structure (dicts and/or lists) for a
    gallery title.

    Prefers the 'title' key, falling back to 'gallery_title'; descends into
    nested dict values and list elements in order.

    Args:
        json_data: Arbitrary parsed-JSON value (dict, list, or scalar).

    Returns:
        The first truthy title found, or None.
    """
    if isinstance(json_data, dict):
        # Prefer 'title', then 'gallery_title'. Only accept truthy values:
        # an explicit null/empty 'title' must not mask a usable
        # 'gallery_title' in the same dict or deeper in the structure.
        for key in ('title', 'gallery_title'):
            value = json_data.get(key)
            if value:
                return value
        # Nothing at this level — recurse into the values.
        for v in json_data.values():
            res = extract_title_from_json(v)
            if res:
                return res
    elif isinstance(json_data, list):
        # Lists: check each element in turn.
        for v in json_data:
            res = extract_title_from_json(v)
            if res:
                return res
    return None
def get_gallery_title(eh_url, cookie_file_path):
    """
    Fetch a gallery's title via `gallery-dl --dump-json`.

    Args:
        eh_url: Gallery URL, passed straight to gallery-dl.
        cookie_file_path: Path to a cookies file for authenticated access.

    Returns:
        The title as a str, or None if the subprocess fails or no title
        can be parsed out of its output.
    """
    try:
        cmd = [
            "gallery-dl",
            "--cookies", cookie_file_path,
            "--user-agent", USER_AGENT,
            "--dump-json",
            "--range", "1",  # metadata for the first entry is enough for a title
            eh_url
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore')
        if result.returncode == 0:
            try:
                # Parse the entire stdout as one JSON document (gallery-dl
                # emits a single JSON array), rather than line by line.
                data = json.loads(result.stdout)
                title = extract_title_from_json(data)
                if title:
                    return str(title)
            except json.JSONDecodeError:
                # Mixed output stream (e.g. log lines interleaved with JSON):
                # fall back to parsing each plausible-looking line on its own.
                for line in result.stdout.splitlines():
                    line = line.strip()
                    if not line or line[0] not in '[{"':
                        continue
                    try:
                        title = extract_title_from_json(json.loads(line))
                    except json.JSONDecodeError:
                        continue
                    if title:
                        return str(title)
    except Exception as e:
        print(f"[-] 获取标题失败: {e}")
    return None
def debug_eh_info(eh_url, cookies_str):
    """
    Diagnostic helper: probe a gallery URL with gallery-dl and report the
    process return code plus whether the dumped metadata parses as JSON.

    Returns:
        The whole report as a single newline-joined string.
    """
    report = [f"🔍 [Debug] 开始分析链接: {eh_url}"]

    # Scratch directory, created for parity with the downloader's layout.
    os.makedirs(os.path.join(BASE_DIR, "eh_debug_temp"), exist_ok=True)

    # Persist the cookies so gallery-dl can pick them up via --cookies.
    cookie_path = os.path.join(BASE_DIR, "eh_cookies.txt")
    with open(cookie_path, "w", encoding="utf-8") as fh:
        fh.write(cookies_str)
    report.append(f"🍪 [Debug] Cookies 已写入")

    probe_cmd = [
        "gallery-dl",
        "--cookies", cookie_path,
        "--user-agent", USER_AGENT,
        "--dump-json",
        "--range", "1",
        "--verbose",
        eh_url,
    ]
    try:
        proc = subprocess.run(
            probe_cmd,
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='ignore',
        )
        report.append(f"🏷️ [Return Code]: {proc.returncode}")
        report.append("\n🧐 [Debug] 尝试解析 JSON:")
        try:
            # Parse stdout as one JSON document, then hunt for a title in it.
            parsed_title = extract_title_from_json(json.loads(proc.stdout))
            if parsed_title:
                report.append(f"✅ 成功解析! 标题是: {parsed_title}")
            else:
                report.append("⚠️ JSON 有效,但没找到 title 字段。")
        except Exception as e:
            report.append(f"❌ JSON 解析失败: {e}")
            report.append("可能是返回的数据不是纯 JSON,以下是原始输出:")
            report.append(proc.stdout[:1000] + "...")
    except Exception as e:
        report.append(f"💥 执行出错: {e}")
    return "\n".join(report)
def run_eh_download(eh_url, cookies_str, quality, progress=gr.Progress()):
    """
    Main E-Hentai download pipeline (generator; "title fix" revision).

    Streams (files, log_text, status_text) tuples for a Gradio UI: downloads
    a gallery via gallery-dl, then merges the downloaded images into one PDF
    under PDF_FINAL_DIR.

    Args:
        eh_url: Gallery URL; aborts immediately if falsy.
        cookies_str: Raw cookies text, written verbatim to a cookies file.
        quality: Image quality for the merged PDF; >= 100 is tagged "Original".
        progress: Gradio progress tracker. NOTE: gr.Progress() as a default
            is Gradio's documented injection pattern, not a mutable-default bug.

    Yields:
        (files_or_None, message, status) tuples consumed by the UI; the final
        yield carries [final_pdf_path] on success.
    """
    print(f"[-] Debug:收到任务 URL: {eh_url}, 画质: {quality}")
    if not eh_url:
        yield None, "❌ 请输入画廊链接", "参数缺失"
        return
    progress(0, desc="正在初始化...")
    # Start from a clean temp directory so files from a previous run can't leak in.
    eh_base_dir = os.path.join(BASE_DIR, "eh_temp")
    if os.path.exists(eh_base_dir):
        try: shutil.rmtree(eh_base_dir)
        except: pass  # best-effort cleanup; NOTE(review): bare except also swallows KeyboardInterrupt — consider `except OSError`
    os.makedirs(eh_base_dir, exist_ok=True)
    # 1. Write cookies to disk for gallery-dl's --cookies option.
    cookie_file_path = os.path.join(BASE_DIR, "eh_cookies.txt")
    with open(cookie_file_path, "w", encoding="utf-8") as f:
        f.write(cookies_str)
    # 2. Resolve the gallery title (used to name the output PDF).
    progress(0.1, desc="正在解析标题...")
    real_title = get_gallery_title(eh_url, cookie_file_path)
    if real_title:
        clean_title = sanitize_filename(real_title)
        yield None, f"📖 识别到标题: {real_title}", "标题获取成功"
    else:
        clean_title = "Unknown_Gallery"
        yield None, "⚠️ 标题解析失败,将使用默认名称", "标题未知"
    # 3. Launch the actual download.
    cmd = [
        "gallery-dl",
        "--verbose",
        "--user-agent", USER_AGENT,
        "--directory", eh_base_dir,
        "--cookies", cookie_file_path,
        eh_url
    ]
    yield None, f"🚀 正在启动下载...", "启动中"
    progress(0.2, desc=f"正在下载: {clean_title[:15]}...")
    try:
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout: one stream carries all logs
            text=True,
            bufsize=1  # line-buffered, so logs stream to the UI live
        )
        log_buffer = []
        img_count = 0
        for line in process.stdout:
            line = line.strip()
            if not line: continue
            log_buffer.append(line)
            # Only the last 20 log lines are shown in the UI.
            display_text = "\n".join(log_buffer[-20:])
            # Heuristic image counter: assumes gallery-dl's verbose output
            # contains '#' and a URL per downloaded image — TODO confirm.
            if "#" in line and "http" in line:
                img_count += 1
                progress(None, desc=f"下载中 ({img_count}张)...")
            if "403 Forbidden" in line:
                progress(None, desc="❌ 403 拒绝访问")
            yield None, display_text, f"已下载: {img_count}"
        process.wait()
        if process.returncode != 0:
            yield None, f"❌ 下载失败 (Code {process.returncode})", "失败"
            return
    except Exception as e:
        yield None, f"❌ 调用出错: {e}", "系统错误"
        return
    # 4. Locate the first directory that actually contains images.
    progress(0.9, desc="打包中...")
    target_img_dir = None
    for root, dirs, files in os.walk(eh_base_dir):
        if any(f.lower().endswith(('.jpg', '.png', '.jpeg', '.webp')) for f in files):
            target_img_dir = root
            break
    if not target_img_dir:
        yield None, "❌ 未找到图片", "空目录"
        return
    # 5. Merge the images into the final PDF.
    if len(clean_title) > 100: clean_title = clean_title[:100]  # keep the file name filesystem-safe
    quality_tag = "Original" if quality >= 100 else f"Q{quality}"
    final_pdf_name = f"[EH][{quality_tag}] {clean_title}.pdf"
    final_pdf_path = os.path.join(PDF_FINAL_DIR, final_pdf_name)
    yield None, f"🔨 正在生成: {final_pdf_name}...", "合并中"
    try:
        manual_merge_pdf(target_img_dir, final_pdf_path, quality=quality)
    except Exception as e:
        yield None, f"❌ 合并失败: {e}", "合并失败"
        return
    progress(1.0, desc="完成!")
    yield [final_pdf_path], f"✅ 处理完成!\n文件名: {final_pdf_name}", "完成"