| import os |
| import sys |
| from tqdm import tqdm |
| from core.text_processor import TextProcessor |
| from core.character_extractor import CharacterExtractor |
| from core.character_analyzer import CharacterAnalyzer |
| from core.character_agent import CharacterAgent |
| from utils.text_utils import TextUtils |
| from utils.cache_manager import CacheManager |
| from config import Config |
|
|
| def print_banner(): |
| """打印欢迎横幅""" |
| banner = """ |
| ╔══════════════════════════════════════════════════════════════════╗ |
| ║ ║ |
| ║ 🎭 小说角色 Agent 系统 (大规模文本版) ║ |
| ║ ║ |
| ║ 基于 AI 的角色性格分析与对话系统 ║ |
| ║ ║ |
| ╚══════════════════════════════════════════════════════════════════╝ |
| """ |
| print(banner) |
|
|
| def load_novel(file_path: str) -> str: |
| """加载小说文本""" |
| try: |
| |
| encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1', 'utf-16'] |
| |
| for encoding in encodings: |
| try: |
| with open(file_path, 'r', encoding=encoding) as f: |
| text = f.read() |
| print(f"✓ 成功使用 {encoding} 编码加载文件") |
| return text |
| except UnicodeDecodeError: |
| continue |
| |
| print("✗ 所有编码尝试失败") |
| return "" |
| |
| except FileNotFoundError: |
| print(f"✗ 文件不存在: {file_path}") |
| return "" |
| except Exception as e: |
| print(f"✗ 加载小说失败: {e}") |
| return "" |
|
|
| def display_statistics(stats: dict): |
| """显示文本统计信息""" |
| print("\n" + "="*70) |
| print("📊 文本统计信息") |
| print("="*70) |
| print(f"总字符数: {stats['total_length']:,}") |
| print(f"Token 数量: {stats['total_tokens']:,}") |
| print(f"段落数: {stats['paragraphs']:,}") |
| print(f"句子数: {stats['sentences']:,}") |
| |
| lang_map = {'zh': '中文', 'en': '英文', 'mixed': '中英混合', 'unknown': '未知'} |
| print(f"检测语言: {lang_map.get(stats['language'], stats['language'])}") |
| |
| |
| text_utils = TextUtils() |
| reading_time = text_utils.estimate_reading_time(" " * stats['total_length']) |
| print(f"预计阅读: 约 {reading_time} 分钟") |
| print("="*70) |
|
|
| def select_character_interactive(characters: list) -> dict: |
| """交互式选择角色""" |
| print("\n" + "="*70) |
| print("📋 检测到的主要角色") |
| print("="*70) |
| print(f"{'序号':<6}{'角色名':<25}{'出现次数':<12}{'分布章节':<12}") |
| print("-"*70) |
| |
| for i, char in enumerate(characters[:15], 1): |
| name = char['name'] |
| count = char['info']['count'] |
| chunks = len(char['info']['chunks']) |
| print(f"{i:<6}{name:<25}{count:<12}{chunks:<12}") |
| |
| print("="*70) |
| |
| while True: |
| try: |
| choice = input(f"\n请选择角色编号 (1-{min(15, len(characters))}): ").strip() |
| |
| if choice.isdigit(): |
| idx = int(choice) - 1 |
| if 0 <= idx < len(characters): |
| return characters[idx] |
| |
| print("❌ 无效选择,请重试") |
| except KeyboardInterrupt: |
| print("\n\n👋 程序已退出") |
| sys.exit(0) |
| except: |
| print("❌ 输入错误,请重试") |
|
|
| def interactive_chat(agent: CharacterAgent): |
| """交互式对话界面""" |
| |
| print("\n" + "="*70) |
| print(agent.get_character_info()) |
| print("\n💬 对话开始!") |
| print("-"*70) |
| print("💡 提示:") |
| print(" • 输入 'quit' 或 'exit' - 退出对话") |
| print(" • 输入 'reset' - 重置对话历史") |
| print(" • 输入 'save' - 保存对话") |
| print(" • 输入 'info' - 查看角色信息") |
| print(" • 输入 'help' - 显示帮助") |
| print("="*70 + "\n") |
| |
| char_name = agent.character_profile['name'] |
| |
| while True: |
| try: |
| |
| user_input = input("🧑 你: ").strip() |
| |
| if not user_input: |
| continue |
| |
| |
| if user_input.lower() in ['quit', 'exit', '退出', 'q']: |
| print(f"\n👋 {char_name}: 再见,很高兴和你聊天!") |
| |
| |
| if len(agent.conversation_history) > 0: |
| save = input("\n是否保存对话记录?(y/n): ").strip().lower() |
| if save in ['y', 'yes', '是']: |
| filename = f"conversation_{char_name}_{len(agent.conversation_history)}.json" |
| agent.save_conversation(filename) |
| break |
| |
| if user_input.lower() in ['reset', '重置']: |
| agent.reset_conversation() |
| continue |
| |
| if user_input.lower() in ['save', '保存']: |
| filename = f"conversation_{char_name}_{len(agent.conversation_history)}.json" |
| agent.save_conversation(filename) |
| continue |
| |
| if user_input.lower() in ['info', '信息']: |
| print(agent.get_character_info()) |
| continue |
| |
| if user_input.lower() in ['help', '帮助']: |
| print("\n可用命令:") |
| print(" quit/exit - 退出对话") |
| print(" reset - 重置对话历史") |
| print(" save - 保存对话") |
| print(" info - 查看角色信息") |
| print(" help - 显示此帮助\n") |
| continue |
| |
| |
| print(f"\n{'⏳ ' + char_name + ' 正在思考...':<70}", end='\r') |
| response = agent.chat(user_input) |
| print(" " * 70, end='\r') |
| print(f"🎭 {char_name}: {response}\n") |
| |
| except KeyboardInterrupt: |
| print(f"\n\n👋 {char_name}: 再见!") |
| break |
| except Exception as e: |
| print(f"\n❌ 错误: {e}\n") |
|
|
| def check_environment(): |
| """检查运行环境""" |
| issues = [] |
| |
| |
| if not Config.OPENAI_API_KEY or Config.OPENAI_API_KEY == "": |
| issues.append("未设置 OPENAI_API_KEY") |
| |
| |
| if not os.path.exists(Config.CACHE_DIR): |
| try: |
| os.makedirs(Config.CACHE_DIR) |
| except: |
| issues.append(f"无法创建缓存目录: {Config.CACHE_DIR}") |
| |
| |
| try: |
| import openai |
| import chromadb |
| import tiktoken |
| except ImportError as e: |
| issues.append(f"缺少必要的包: {e}") |
| |
| if issues: |
| print("\n⚠️ 环境检查发现问题:") |
| for issue in issues: |
| print(f" • {issue}") |
| print("\n请检查配置文件 .env 和依赖安装\n") |
| return False |
| |
| return True |
|
|
| def main(): |
| """主函数 - 完整流程""" |
| |
| |
| print_banner() |
| |
| |
| if not check_environment(): |
| return |
| |
| |
| cache = CacheManager() |
| cache_info = cache.get_cache_info() |
| if cache_info['count'] > 0: |
| print(f"📦 缓存: {cache_info['count']} 个文件, {cache_info['size_mb']} MB") |
| |
| |
| print("\n" + "="*70) |
| print("📖 步骤 1/5: 加载小说") |
| print("="*70) |
| |
| default_path = "sample_novels/harry_potter_sample.txt" |
| novel_path = input(f"\n请输入小说文件路径 (默认: {default_path})\n> ").strip() |
| |
| if not novel_path: |
| novel_path = default_path |
| |
| if not os.path.exists(novel_path): |
| print(f"❌ 文件不存在: {novel_path}") |
| |
| |
| alt_path = os.path.join("sample_novels", os.path.basename(novel_path)) |
| if os.path.exists(alt_path): |
| print(f"✓ 找到文件: {alt_path}") |
| novel_path = alt_path |
| else: |
| print("程序退出") |
| return |
| |
| print(f"\n正在加载: {novel_path}") |
| novel_text = load_novel(novel_path) |
| |
| if not novel_text: |
| print("❌ 无法加载小说,程序退出") |
| return |
| |
| |
| processor = TextProcessor() |
| stats = processor.get_statistics(novel_text) |
| display_statistics(stats) |
| |
| |
| if stats['total_length'] < 1000: |
| print("⚠️ 警告: 文本过短 (< 1000字符),可能影响分析效果") |
| proceed = input("是否继续?(y/n): ").strip().lower() |
| if proceed not in ['y', 'yes', '是']: |
| return |
| |
| |
| print("\n" + "="*70) |
| print("📄 步骤 2/5: 文本分块处理") |
| print("="*70) |
| |
| chunks = processor.chunk_text(novel_text) |
| print(f"✓ 文本已分为 {len(chunks)} 个块") |
| print(f" 平均每块: {stats['total_length'] // len(chunks)} 字符") |
| |
| |
| print("\n" + "="*70) |
| print("👥 步骤 3/5: 提取主要角色") |
| print("="*70) |
| |
| extractor = CharacterExtractor() |
| characters = extractor.extract_main_characters( |
| chunks, |
| text_sample=novel_text[:3000], |
| language=stats['language'] |
| ) |
| |
| if not characters: |
| print("❌ 未能提取到角色,程序退出") |
| return |
| |
| |
| print("\n" + "="*70) |
| print("🎯 步骤 4/5: 选择要对话的角色") |
| print("="*70) |
| |
| selected = select_character_interactive(characters) |
| character_name = selected['name'] |
| character_info = selected['info'] |
| |
| print(f"\n✓ 已选择: {character_name}") |
| print(f" 出现次数: {character_info['count']}") |
| print(f" 分布章节: {len(character_info['chunks'])}") |
| |
| |
| print(f"\n" + "="*70) |
| print(f"🧠 步骤 5/5: 分析角色性格") |
| print("="*70) |
| print(f"正在深度分析 {character_name} 的性格特征...") |
| print("这可能需要几分钟,请耐心等待...\n") |
| |
| analyzer = CharacterAnalyzer() |
| |
| |
| representative_chunks = analyzer.select_representative_chunks( |
| chunks, |
| character_info['chunks'] |
| ) |
| |
| print(f"✓ 选取了 {len(representative_chunks)} 个代表性片段进行分析") |
| |
| |
| character_profile = analyzer.analyze_character_batch( |
| character_name, |
| representative_chunks |
| ) |
| |
| |
| character_profile = analyzer.enhance_profile_with_examples( |
| character_profile, |
| chunks, |
| character_info['chunks'] |
| ) |
| |
| print(f"✓ 角色分析完成!") |
| |
| |
| print("\n" + "="*70) |
| print("🤖 创建对话代理") |
| print("="*70) |
| |
| use_memory = input("\n是否启用记忆系统?(y/n, 默认: y): ").strip().lower() |
| |
| if use_memory in ['', 'y', 'yes', '是']: |
| print("正在初始化记忆系统...") |
| agent = CharacterAgent( |
| character_profile, |
| chunks=chunks, |
| character_chunks=character_info['chunks'] |
| ) |
| print("✓ Agent 创建成功,记忆系统已初始化") |
| else: |
| agent = CharacterAgent(character_profile) |
| print("✓ Agent 创建成功(未启用记忆系统)") |
| |
| |
| interactive_chat(agent) |
| |
| |
| print("\n" + "="*70) |
| print("感谢使用小说角色 Agent 系统!") |
| print("="*70) |
|
|
| if __name__ == "__main__": |
| try: |
| main() |
| except KeyboardInterrupt: |
| print("\n\n👋 程序已被用户中断") |
| except Exception as e: |
| print(f"\n❌ 程序错误: {e}") |
| import traceback |
| traceback.print_exc() |