| """AstrBot 数据导出器 |
| |
| 负责将所有数据导出为 ZIP 备份文件。 |
| 导出格式为 JSON,这是数据库无关的方案,支持未来向 MySQL/PostgreSQL 迁移。 |
| """ |
|
|
| import hashlib |
| import json |
| import os |
| import zipfile |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import TYPE_CHECKING, Any |
|
|
| from sqlalchemy import select |
|
|
| from astrbot.core import logger |
| from astrbot.core.config.default import VERSION |
| from astrbot.core.db import BaseDatabase |
| from astrbot.core.utils.astrbot_path import ( |
| get_astrbot_backups_path, |
| get_astrbot_data_path, |
| ) |
|
|
| |
| from .constants import ( |
| BACKUP_MANIFEST_VERSION, |
| KB_METADATA_MODELS, |
| MAIN_DB_MODELS, |
| get_backup_directories, |
| ) |
|
|
| if TYPE_CHECKING: |
| from astrbot.core.knowledge_base.kb_mgr import KnowledgeBaseManager |
|
|
| CMD_CONFIG_FILE_PATH = os.path.join(get_astrbot_data_path(), "cmd_config.json") |
|
|
|
|
class AstrBotExporter:
    """AstrBot data exporter.

    Serializes all AstrBot data into a single ZIP backup archive. Database
    tables are exported as JSON, which keeps the backup database-agnostic
    and supports a future migration to MySQL/PostgreSQL.

    Exported contents:
        - All tables of the main database (data/data_v4.db)
        - Knowledge-base metadata (data/knowledge_base/kb.db)
        - Vector/document data of each knowledge base
        - Configuration file (data/cmd_config.json)
        - Attachment files
        - Knowledge-base media files
        - Plugin directory (data/plugins)
        - Plugin data directory (data/plugin_data)
        - Config directory (data/config)
        - T2I template directory (data/t2i_templates)
        - WebChat data directory (data/webchat)
        - Temporary files directory (data/temp)
    """

    def __init__(
        self,
        main_db: BaseDatabase,
        kb_manager: "KnowledgeBaseManager | None" = None,
        config_path: str = CMD_CONFIG_FILE_PATH,
    ) -> None:
        """Create an exporter.

        Args:
            main_db: Handle to the main AstrBot database.
            kb_manager: Optional knowledge-base manager; when ``None`` the
                knowledge-base sections of the backup stay empty.
            config_path: Path to the command configuration JSON file.
        """
        self.main_db = main_db
        self.kb_manager = kb_manager
        self.config_path = config_path
        # Maps archive path -> "sha256:<hexdigest>"; filled while entries are
        # written and embedded into manifest.json at the end of export_all().
        self._checksums: dict[str, str] = {}

    async def export_all(
        self,
        output_dir: str | None = None,
        progress_callback: Any | None = None,
    ) -> str:
        """Export all data into a ZIP file.

        Args:
            output_dir: Target directory; defaults to the standard AstrBot
                backups directory.
            progress_callback: Optional async callable invoked as
                ``(stage, current, total, message)`` to report progress.

        Returns:
            str: Path of the generated ZIP file.

        Raises:
            Exception: Any failure is logged and re-raised after the
                partially written ZIP file has been removed.
        """
        if output_dir is None:
            output_dir = get_astrbot_backups_path()

        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Local-time timestamp keeps archive names human-friendly on disk.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"astrbot_backup_{timestamp}.zip"
        zip_path = os.path.join(output_dir, zip_filename)

        logger.info(f"开始导出备份到 {zip_path}")

        try:
            with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
                # 1) Main database tables -> databases/main_db.json
                if progress_callback:
                    await progress_callback("main_db", 0, 100, "正在导出主数据库...")
                main_data = await self._export_main_database()
                main_db_json = json.dumps(
                    main_data, ensure_ascii=False, indent=2, default=str
                )
                zf.writestr("databases/main_db.json", main_db_json)
                self._add_checksum("databases/main_db.json", main_db_json)
                if progress_callback:
                    await progress_callback("main_db", 100, 100, "主数据库导出完成")

                # 2) Knowledge-base metadata plus per-KB document/vector data.
                # Pre-initialized empty so the manifest step below works even
                # when no knowledge-base manager is configured.
                kb_meta_data: dict[str, Any] = {
                    "knowledge_bases": [],
                    "kb_documents": [],
                    "kb_media": [],
                }
                if self.kb_manager:
                    if progress_callback:
                        await progress_callback(
                            "kb_metadata", 0, 100, "正在导出知识库元数据..."
                        )
                    kb_meta_data = await self._export_kb_metadata()
                    kb_meta_json = json.dumps(
                        kb_meta_data, ensure_ascii=False, indent=2, default=str
                    )
                    zf.writestr("databases/kb_metadata.json", kb_meta_json)
                    self._add_checksum("databases/kb_metadata.json", kb_meta_json)
                    if progress_callback:
                        await progress_callback(
                            "kb_metadata", 100, 100, "知识库元数据导出完成"
                        )

                    # Per knowledge base: document chunks, FAISS index, media.
                    kb_insts = self.kb_manager.kb_insts
                    total_kbs = len(kb_insts)
                    for idx, (kb_id, kb_helper) in enumerate(kb_insts.items()):
                        if progress_callback:
                            await progress_callback(
                                "kb_documents",
                                idx,
                                total_kbs,
                                f"正在导出知识库 {kb_helper.kb.kb_name} 的文档数据...",
                            )
                        doc_data = await self._export_kb_documents(kb_helper)
                        doc_json = json.dumps(
                            doc_data, ensure_ascii=False, indent=2, default=str
                        )
                        doc_path = f"databases/kb_{kb_id}/documents.json"
                        zf.writestr(doc_path, doc_json)
                        self._add_checksum(doc_path, doc_json)

                        # Binary FAISS index file of this KB, if present.
                        await self._export_faiss_index(zf, kb_helper, kb_id)

                        # Media files belonging to this KB.
                        await self._export_kb_media_files(zf, kb_helper, kb_id)

                    if progress_callback:
                        await progress_callback(
                            "kb_documents", total_kbs, total_kbs, "知识库文档导出完成"
                        )

                # 3) Command configuration file (silently skipped if absent).
                if progress_callback:
                    await progress_callback("config", 0, 100, "正在导出配置文件...")
                if os.path.exists(self.config_path):
                    with open(self.config_path, encoding="utf-8") as f:
                        config_content = f.read()
                    zf.writestr("config/cmd_config.json", config_content)
                    self._add_checksum("config/cmd_config.json", config_content)
                if progress_callback:
                    await progress_callback("config", 100, 100, "配置文件导出完成")

                # 4) Attachment files referenced by the "attachments" table
                #    of the main-database export.
                if progress_callback:
                    await progress_callback("attachments", 0, 100, "正在导出附件...")
                await self._export_attachments(zf, main_data.get("attachments", []))
                if progress_callback:
                    await progress_callback("attachments", 100, 100, "附件导出完成")

                # 5) Plugin and data directories (see get_backup_directories).
                if progress_callback:
                    await progress_callback(
                        "directories", 0, 100, "正在导出插件和数据目录..."
                    )
                dir_stats = await self._export_directories(zf)
                if progress_callback:
                    await progress_callback("directories", 100, 100, "目录导出完成")

                # 6) Manifest with table lists, file lists, checksums, stats.
                if progress_callback:
                    await progress_callback("manifest", 0, 100, "正在生成清单...")
                manifest = self._generate_manifest(main_data, kb_meta_data, dir_stats)
                manifest_json = json.dumps(manifest, ensure_ascii=False, indent=2)
                zf.writestr("manifest.json", manifest_json)
                if progress_callback:
                    await progress_callback("manifest", 100, 100, "清单生成完成")

            logger.info(f"备份导出完成: {zip_path}")
            return zip_path

        except Exception as e:
            logger.error(f"备份导出失败: {e}")
            # Remove the partially written archive so a failed run does not
            # leave a corrupt backup behind.
            if os.path.exists(zip_path):
                os.remove(zip_path)
            raise

    async def _export_main_database(self) -> dict[str, list[dict]]:
        """Export every table of the main database.

        Returns:
            Mapping of table name to a list of row dicts. A table that fails
            to export is logged and included as an empty list, so the backup
            always contains every key of MAIN_DB_MODELS.
        """
        export_data: dict[str, list[dict]] = {}

        async with self.main_db.get_db() as session:
            for table_name, model_class in MAIN_DB_MODELS.items():
                try:
                    result = await session.execute(select(model_class))
                    records = result.scalars().all()
                    export_data[table_name] = [
                        self._model_to_dict(record) for record in records
                    ]
                    logger.debug(
                        f"导出表 {table_name}: {len(export_data[table_name])} 条记录"
                    )
                except Exception as e:
                    # Best effort: one broken table must not abort the backup.
                    logger.warning(f"导出表 {table_name} 失败: {e}")
                    export_data[table_name] = []

        return export_data

    async def _export_kb_metadata(self) -> dict[str, list[dict]]:
        """Export the knowledge-base metadata database.

        Returns:
            Mapping of table name to a list of row dicts; the fixed empty
            shape when no knowledge-base manager is configured. Failing
            tables are logged and exported as empty lists.
        """
        if not self.kb_manager:
            return {"knowledge_bases": [], "kb_documents": [], "kb_media": []}

        export_data: dict[str, list[dict]] = {}

        async with self.kb_manager.kb_db.get_db() as session:
            for table_name, model_class in KB_METADATA_MODELS.items():
                try:
                    result = await session.execute(select(model_class))
                    records = result.scalars().all()
                    export_data[table_name] = [
                        self._model_to_dict(record) for record in records
                    ]
                    logger.debug(
                        f"导出知识库表 {table_name}: {len(export_data[table_name])} 条记录"
                    )
                except Exception as e:
                    # Best effort: one broken table must not abort the backup.
                    logger.warning(f"导出知识库表 {table_name} 失败: {e}")
                    export_data[table_name] = []

        return export_data

    async def _export_kb_documents(self, kb_helper: Any) -> dict[str, Any]:
        """Export the document chunks of one knowledge base.

        Args:
            kb_helper: Per-KB helper; assumed to expose a ``vec_db``
                (FaissVecDB) attribute — TODO confirm against kb_mgr.

        Returns:
            ``{"documents": [...]}``; an empty list on any failure.
        """
        try:
            from astrbot.core.db.vec_db.faiss_impl.vec_db import FaissVecDB

            vec_db: FaissVecDB = kb_helper.vec_db
            if not vec_db or not vec_db.document_storage:
                return {"documents": []}

            # limit=None fetches all documents of this KB in one call.
            docs = await vec_db.document_storage.get_documents(
                metadata_filters={},
                offset=0,
                limit=None,
            )

            return {"documents": docs}
        except Exception as e:
            logger.warning(f"导出知识库文档失败: {e}")
            return {"documents": []}

    async def _export_faiss_index(
        self,
        zf: zipfile.ZipFile,
        kb_helper: Any,
        kb_id: str,
    ) -> None:
        """Copy the FAISS index file of one knowledge base into the archive.

        Missing index files are skipped silently; other failures are logged
        and swallowed (best effort).
        """
        try:
            # Assumes kb_helper.kb_dir is a pathlib.Path — TODO confirm.
            index_path = kb_helper.kb_dir / "index.faiss"
            if index_path.exists():
                archive_path = f"databases/kb_{kb_id}/index.faiss"
                zf.write(str(index_path), archive_path)
                logger.debug(f"导出 FAISS 索引: {archive_path}")
        except Exception as e:
            logger.warning(f"导出 FAISS 索引失败: {e}")

    async def _export_kb_media_files(
        self, zf: zipfile.ZipFile, kb_helper: Any, kb_id: str
    ) -> None:
        """Copy the media files of one knowledge base into the archive.

        Archive paths are built relative to ``kb_helper.kb_dir`` (not the
        media directory itself), so the media directory name is preserved
        inside ``files/kb_media/{kb_id}/``. Failures are logged and swallowed.
        """
        try:
            media_dir = kb_helper.kb_medias_dir
            if not media_dir.exists():
                return

            for root, _, files in os.walk(media_dir):
                for file in files:
                    file_path = Path(root) / file
                    # Relative to the KB root, keeping the subdirectory layout.
                    rel_path = file_path.relative_to(kb_helper.kb_dir)
                    archive_path = f"files/kb_media/{kb_id}/{rel_path}"
                    zf.write(str(file_path), archive_path)
        except Exception as e:
            logger.warning(f"导出知识库媒体文件失败: {e}")

    async def _export_directories(
        self, zf: zipfile.ZipFile
    ) -> dict[str, dict[str, int]]:
        """Export plugin and other data directories into the archive.

        ``__pycache__`` directories and ``.pyc`` files are excluded.

        Returns:
            dict: Per-directory statistics
            ``{dir_name: {"files": count, "size": bytes}}``; missing
            directories are skipped, failed directories report zeros.
        """
        stats: dict[str, dict[str, int]] = {}
        backup_directories = get_backup_directories()

        for dir_name, dir_path in backup_directories.items():
            full_path = Path(dir_path)
            if not full_path.exists():
                logger.debug(f"目录不存在,跳过: {full_path}")
                continue

            file_count = 0
            total_size = 0

            try:
                for root, dirs, files in os.walk(full_path):
                    # Prune in place so os.walk never descends into caches.
                    dirs[:] = [d for d in dirs if d != "__pycache__"]

                    for file in files:
                        # Skip compiled bytecode.
                        if file.endswith(".pyc"):
                            continue

                        file_path = Path(root) / file
                        try:
                            # Archive path mirrors the on-disk layout under
                            # directories/<dir_name>/.
                            rel_path = file_path.relative_to(full_path)
                            archive_path = f"directories/{dir_name}/{rel_path}"
                            zf.write(str(file_path), archive_path)
                            file_count += 1
                            # NOTE(review): stat() after a successful write;
                            # if stat() raises, the file is in the archive but
                            # not counted — confirm this is acceptable.
                            total_size += file_path.stat().st_size
                        except Exception as e:
                            # Per-file best effort: keep walking the tree.
                            logger.warning(f"导出文件 {file_path} 失败: {e}")

                stats[dir_name] = {"files": file_count, "size": total_size}
                logger.debug(
                    f"导出目录 {dir_name}: {file_count} 个文件, {total_size} 字节"
                )
            except Exception as e:
                logger.warning(f"导出目录 {dir_path} 失败: {e}")
                stats[dir_name] = {"files": 0, "size": 0}

        return stats

    async def _export_attachments(
        self, zf: zipfile.ZipFile, attachments: list[dict]
    ) -> None:
        """Copy attachment files into the archive.

        Args:
            zf: Open ZIP archive being written.
            attachments: Row dicts from the exported "attachments" table;
                each is expected to carry "path" and "attachment_id" keys.
                Rows with a missing or non-existent path are skipped;
                per-attachment failures are logged and swallowed.
        """
        for attachment in attachments:
            try:
                file_path = attachment.get("path", "")
                if file_path and os.path.exists(file_path):
                    # Store under a stable name: <attachment_id><original ext>.
                    attachment_id = attachment.get("attachment_id", "")
                    ext = os.path.splitext(file_path)[1]
                    archive_path = f"files/attachments/{attachment_id}{ext}"
                    zf.write(file_path, archive_path)
            except Exception as e:
                logger.warning(f"导出附件失败: {e}")

    def _model_to_dict(self, record: Any) -> dict:
        """Convert a SQLModel/SQLAlchemy instance to a plain dict.

        This is a database-agnostic serialization, chosen to support a
        future migration to other databases. ``datetime`` values are
        rendered as ISO-8601 strings in both branches.
        """
        # Preferred path: SQLModel/pydantic models expose model_dump().
        if hasattr(record, "model_dump"):
            data = record.model_dump(mode="python")
            # Normalize datetimes to ISO strings for JSON.
            for key, value in data.items():
                if isinstance(value, datetime):
                    data[key] = value.isoformat()
            return data

        # Fallback: read mapped columns via the SQLAlchemy mapper.
        data = {}

        from sqlalchemy import inspect as sa_inspect

        mapper = sa_inspect(record.__class__)
        for column in mapper.columns:
            value = getattr(record, column.name)
            # Normalize datetimes to ISO strings for JSON.
            if isinstance(value, datetime):
                value = value.isoformat()
            data[column.name] = value
        return data

    def _add_checksum(self, path: str, content: str | bytes) -> None:
        """Record the SHA-256 checksum of an archive entry.

        Args:
            path: Archive path used as the key in the manifest.
            content: Entry content; str is encoded as UTF-8 before hashing.
        """
        if isinstance(content, str):
            content = content.encode("utf-8")
        checksum = hashlib.sha256(content).hexdigest()
        self._checksums[path] = f"sha256:{checksum}"

    def _generate_manifest(
        self,
        main_data: dict[str, list[dict]],
        kb_meta_data: dict[str, list[dict]],
        dir_stats: dict[str, dict[str, int]] | None = None,
    ) -> dict:
        """Build the backup manifest describing the archive contents.

        Args:
            main_data: Main-database export (table name -> rows).
            kb_meta_data: Knowledge-base metadata export.
            dir_stats: Per-directory statistics from _export_directories().

        Returns:
            dict: Manifest with versions, table/file inventories, the
            accumulated checksums, and record-count statistics.
        """
        if dir_stats is None:
            dir_stats = {}
        # One "documents" table entry per exported knowledge base.
        kb_document_tables = {}
        if self.kb_manager:
            for kb_id in self.kb_manager.kb_insts.keys():
                kb_document_tables[kb_id] = "documents"

        # Attachment file names mirror _export_attachments():
        # <attachment_id><original ext>.
        attachment_files = []
        for attachment in main_data.get("attachments", []):
            attachment_id = attachment.get("attachment_id", "")
            path = attachment.get("path", "")
            if attachment_id and path:
                ext = os.path.splitext(path)[1]
                attachment_files.append(f"{attachment_id}{ext}")

        # NOTE(review): only base filenames are recorded here, while
        # _export_kb_media_files() preserves relative subpaths in the
        # archive — confirm the importer tolerates this mismatch.
        kb_media_files: dict[str, list[str]] = {}
        if self.kb_manager:
            for kb_id, kb_helper in self.kb_manager.kb_insts.items():
                media_files: list[str] = []
                media_dir = kb_helper.kb_medias_dir
                if media_dir.exists():
                    for root, _, files in os.walk(media_dir):
                        for file in files:
                            media_files.append(file)
                if media_files:
                    kb_media_files[kb_id] = media_files

        manifest = {
            "version": BACKUP_MANIFEST_VERSION,
            "astrbot_version": VERSION,
            "exported_at": datetime.now(timezone.utc).isoformat(),
            "origin": "exported",
            "schema_version": {
                "main_db": "v4",
                "kb_db": "v1",
            },
            "tables": {
                "main_db": list(main_data.keys()),
                "kb_metadata": list(kb_meta_data.keys()),
                "kb_documents": kb_document_tables,
            },
            "files": {
                "attachments": attachment_files,
                "kb_media": kb_media_files,
            },
            "directories": list(dir_stats.keys()),
            "checksums": self._checksums,
            "statistics": {
                "main_db": {
                    table: len(records) for table, records in main_data.items()
                },
                "kb_metadata": {
                    table: len(records) for table, records in kb_meta_data.items()
                },
                "directories": dir_stats,
            },
        }

        return manifest
|
|