| """ |
| Demo文件夹操作模块 |
| 提供文件夹和文件的列表、移动、重命名、删除等功能 |
| """ |
import os
import shutil
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
|
|
| from backend.path_utils import ( |
| normalize_path, |
| check_path_in_demo_dir, |
| validate_demo_path, |
| resolve_demo_path |
| ) |
|
|
|
|
| |
|
|
def _normalize_path(path: str) -> str:
    """Return *path* in the normalized form used by this module.

    Backward-compatible wrapper: the work is delegated unchanged to
    ``normalize_path`` from ``backend.path_utils`` (per the original note,
    that helper turns an empty string into "/").
    """
    normalized = normalize_path(path)
    return normalized
|
|
|
|
| def _build_api_path(parent_path: str, item_name: str) -> str: |
| """构建API路径格式(统一使用 "/" 开头的格式)""" |
| if parent_path and parent_path != "/": |
| return f"{parent_path}/{item_name}" |
| return f"/{item_name}" |
|
|
|
|
| def _error_response(message: str) -> Dict[str, any]: |
| """统一错误响应格式""" |
| return {"success": False, "message": message} |
|
|
|
|
| def _success_response(message: str) -> Dict[str, any]: |
| """统一成功响应格式""" |
| return {"success": True, "message": message} |
|
|
|
|
| def _get_timestamped_name(base_name: str, extension: str = "") -> str: |
| """生成带时间戳的名称""" |
| timestamp = int(time.time()) |
| return f"{base_name}_{timestamp}{extension}" |
|
|
|
|
| def _ensure_deleted_dir(demo_dir: Path) -> Path: |
| """确保.deleted目录存在并返回路径""" |
| deleted_dir = demo_dir.resolve() / '.deleted' |
| deleted_dir.mkdir(parents=True, exist_ok=True) |
| return deleted_dir |
|
|
|
|
| def _validate_json_file(file_path: Path) -> Optional[str]: |
| """验证文件存在且为JSON文件,返回错误消息或None""" |
| if not file_path.exists(): |
| return "文件不存在" |
| if not file_path.is_file(): |
| return "路径不是文件" |
| if file_path.suffix != '.json': |
| return "只能操作JSON文件" |
| return None |
|
|
|
|
| def _validate_folder(folder_path: Path) -> Optional[str]: |
| """验证文件夹存在,返回错误消息或None""" |
| if not folder_path.exists(): |
| return "文件夹不存在" |
| if not folder_path.is_dir(): |
| return "路径不是文件夹" |
| return None |
|
|
|
|
| |
| |
|
|
def list_demo_items(demo_dir: Path, path: str = "") -> Dict[str, Any]:
    """List the visible sub-folders and JSON files directly under *path*.

    Entries whose name starts with "." are skipped, and files whose suffix is
    not ``.json`` are ignored. Folders come first, then files; each group is
    sorted by its ``name`` value.

    Args:
        demo_dir: Root directory that *path* is resolved against.
        path: API path of the directory to list; it is normalized before use.

    Returns:
        A dict with ``path`` (the normalized path) and ``items``. Every item
        is a dict with ``type`` ("folder" or "file"), ``name`` (the stem for
        files, the directory name for folders) and ``path`` (the entry's API
        path). ``items`` is empty when the directory cannot be resolved, does
        not exist, or scanning raises.
    """
    normalized_path = _normalize_path(path)
    target_dir = resolve_demo_path(demo_dir, normalized_path)

    if not target_dir or not target_dir.exists():
        return {"path": normalized_path, "items": []}

    folders = []
    files = []
    try:
        for entry in target_dir.iterdir():
            if entry.name.startswith('.'):
                continue

            api_path = _build_api_path(normalized_path, entry.name)
            if entry.is_dir():
                folders.append({"type": "folder", "name": entry.name, "path": api_path})
            elif entry.is_file() and entry.suffix == '.json':
                files.append({"type": "file", "name": entry.stem, "path": api_path})
    except Exception as e:
        # Best effort: a failed scan is reported on stdout and yields an
        # empty listing rather than propagating to the caller.
        import traceback
        print(f"❌ 扫描目录失败: {e}")
        traceback.print_exc()
        return {"path": normalized_path, "items": []}

    folders.sort(key=lambda item: item["name"])
    files.sort(key=lambda item: item["name"])

    # Report the normalized path on success too, so all three return sites
    # agree and match the prefix used for the item paths above.
    return {"path": normalized_path, "items": folders + files}
|
|
|
|
def get_all_folders(demo_dir: Path, exclude_path: Optional[str] = None) -> List[str]:
    """Recursively collect the API paths of all visible folders.

    Used to offer move targets. Directories whose name starts with "." are
    skipped together with everything below them.

    Args:
        demo_dir: Root directory to scan.
        exclude_path: Optional API path; that folder and all of its
            descendants are left out of the result. A trailing "/" is
            tolerated.

    Returns:
        API paths starting with the root "/", followed by the folders in
        depth-first order with the children of each folder sorted by name.
        A directory that cannot be listed is reported on stdout and
        contributes no children.
    """
    # A trailing "/" would defeat both the equality and the prefix test below.
    excluded = exclude_path.rstrip("/") if exclude_path else None
    folders = ["/"]

    def _scan_directory(current_dir: Path, current_path: str) -> None:
        """Append the visible sub-folders of *current_dir*, depth first."""
        try:
            # iterdir() order is filesystem-dependent; sort for a stable result.
            children = sorted(
                (
                    child
                    for child in current_dir.iterdir()
                    if not child.name.startswith('.') and child.is_dir()
                ),
                key=lambda child: child.name,
            )
        except Exception as e:
            import traceback
            print(f"❌ 扫描文件夹失败: {e}")
            traceback.print_exc()
            return

        for child in children:
            folder_path = _build_api_path(current_path, child.name)
            if excluded and (
                folder_path == excluded or folder_path.startswith(excluded + "/")
            ):
                continue
            folders.append(folder_path)
            _scan_directory(child, folder_path)

    _scan_directory(demo_dir, "/")
    return folders
|
|
|
|
def move_demo_file(demo_dir: Path, source_path: str, target_path: str) -> Dict[str, Any]:
    """Move a demo JSON file into another directory.

    Args:
        demo_dir: Root directory both paths are resolved against.
        source_path: API path of the JSON file to move.
        target_path: API path of the destination directory; it is created
            when missing.

    Returns:
        A success or error response dict. Errors cover an unresolvable or
        invalid source, an unresolvable target, an existing file with the
        same name at the destination, and any failure while creating the
        destination or moving the file.
    """
    source_file = resolve_demo_path(demo_dir, source_path)
    if not source_file:
        return _error_response(f"源文件不存在: {source_path}")

    error_msg = _validate_json_file(source_file)
    if error_msg:
        if "不存在" in error_msg:
            return _error_response(error_msg)
        return _error_response(f"源文件{error_msg}: {source_path}")

    target_dir = resolve_demo_path(demo_dir, target_path)
    if not target_dir:
        return _error_response(f"无效的目标路径: {target_path}")

    target_file = target_dir / source_file.name
    # Moving a file onto itself is allowed; only a different existing file
    # counts as a conflict.
    if target_file.exists() and target_file != source_file:
        return _error_response(f"目标位置已存在同名文件: {source_file.name}")

    try:
        # Creating the destination is part of the guarded operation so that a
        # failure here (for example the target path is an existing file)
        # becomes an error response instead of an unhandled exception.
        target_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source_file), str(target_file))
    except Exception as e:
        return _error_response(f"移动失败: {str(e)}")
    return _success_response(f"文件已移动到 {target_path}")
|
|
|
|
def rename_demo_file(demo_dir: Path, file_path: str, new_name: str) -> Dict[str, Any]:
    """Rename a demo JSON file in place.

    Args:
        demo_dir: Root directory *file_path* is resolved against.
        file_path: API path of the JSON file to rename.
        new_name: Requested name without extension; it is passed through
            ``sanitize_demo_name`` and ".json" is appended to the result.

    Returns:
        A success or error response dict. Errors cover an unresolvable or
        invalid source, a name that sanitizes to a falsy value, a different
        existing file with the new name, and a failing rename.
    """
    # Imported here as in the original code (presumably to avoid an import
    # cycle with backend.data_utils; confirm before hoisting).
    from backend.data_utils import sanitize_demo_name

    source_file = resolve_demo_path(demo_dir, file_path)
    if not source_file:
        return _error_response(f"文件不存在: {file_path}")

    error_msg = _validate_json_file(source_file)
    if error_msg:
        return _error_response(error_msg)

    safe_name = sanitize_demo_name(new_name)
    if not safe_name:
        return _error_response("新名称无效")

    target_file = source_file.parent / f"{safe_name}.json"

    # Renaming a file to its current name is allowed and falls through.
    if target_file.exists() and target_file != source_file:
        return _error_response(f"文件 '{safe_name}.json' 已存在")

    try:
        source_file.rename(target_file)
    except Exception as e:
        return _error_response(f"重命名失败: {str(e)}")
    return _success_response(f"文件已重命名为 '{safe_name}.json'")
|
|
|
|
def move_folder(demo_dir: Path, source_path: str, target_path: str) -> Dict[str, Any]:
    """Move a folder, with its contents, into another directory.

    Args:
        demo_dir: Root directory both paths are resolved against.
        source_path: API path of the folder to move.
        target_path: API path of the destination parent directory; it is
            created when missing, but only after all checks have passed.

    Returns:
        A success or error response dict. Errors cover an unresolvable or
        invalid source, an unresolvable target, an existing entry with the
        same name at the destination, a destination inside the source
        folder, and any failure while creating the destination or moving.
    """
    source_folder = resolve_demo_path(demo_dir, source_path)
    if not source_folder:
        return _error_response(f"源文件夹不存在: {source_path}")

    error_msg = _validate_folder(source_folder)
    if error_msg:
        if "不存在" in error_msg:
            return _error_response(error_msg)
        return _error_response(f"源{error_msg}: {source_path}")

    target_dir = resolve_demo_path(demo_dir, target_path)
    if not target_dir:
        return _error_response(f"无效的目标路径: {target_path}")

    target_folder = target_dir / source_folder.name

    if target_folder.exists():
        return _error_response(f"目标位置已存在同名文件夹: {source_folder.name}")

    # NOTE(review): check_path_in_demo_dir presumably reports whether its
    # first argument lies inside the second; confirm in backend.path_utils.
    # This check runs before any directory is created, so a rejected move
    # cannot leave freshly created directories inside the source folder.
    if check_path_in_demo_dir(target_folder.resolve(), source_folder.resolve()):
        return _error_response("不能将文件夹移动到自己的子目录")

    try:
        # Creating the destination is guarded too, so a failure becomes an
        # error response instead of an unhandled exception.
        target_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source_folder), str(target_folder))
    except Exception as e:
        return _error_response(f"移动失败: {str(e)}")
    return _success_response(f"文件夹已移动到 {target_path}")
|
|
|
|
def rename_folder(demo_dir: Path, folder_path: str, new_name: str) -> Dict[str, Any]:
    """Rename a folder in place.

    Args:
        demo_dir: Root directory *folder_path* is resolved against.
        folder_path: API path of the folder to rename.
        new_name: Requested name; it is passed through
            ``sanitize_demo_name`` before use.

    Returns:
        A success or error response dict. Errors cover an unresolvable or
        invalid source, a name that sanitizes to a falsy value, an existing
        entry with the new name (including the folder's own current name),
        and a failing rename.
    """
    # Imported here as in the original code (presumably to avoid an import
    # cycle with backend.data_utils; confirm before hoisting).
    from backend.data_utils import sanitize_demo_name

    source_folder = resolve_demo_path(demo_dir, folder_path)
    if not source_folder:
        return _error_response(f"文件夹不存在: {folder_path}")

    error_msg = _validate_folder(source_folder)
    if error_msg:
        return _error_response(error_msg)

    safe_name = sanitize_demo_name(new_name)
    if not safe_name:
        return _error_response("新名称无效")

    target_folder = source_folder.parent / safe_name

    if target_folder.exists():
        return _error_response(f"文件夹 '{safe_name}' 已存在")

    try:
        source_folder.rename(target_folder)
    except Exception as e:
        return _error_response(f"重命名失败: {str(e)}")
    return _success_response(f"文件夹已重命名为 '{safe_name}'")
|
|
|
|
def create_folder(demo_dir: Path, parent_path: str, folder_name: str) -> Dict[str, Any]:
    """Create a new folder under *parent_path*.

    Args:
        demo_dir: Root directory *parent_path* is resolved against.
        parent_path: API path of the directory that will contain the folder;
            missing parents are created along with the folder.
        folder_name: Requested folder name; it is passed through
            ``sanitize_demo_name`` before use.

    Returns:
        A success or error response dict. Errors cover an unresolvable
        parent, a name that sanitizes to a falsy value, an existing entry
        with that name, and a failing directory creation.
    """
    # Imported here as in the original code (presumably to avoid an import
    # cycle with backend.data_utils; confirm before hoisting).
    from backend.data_utils import sanitize_demo_name

    parent_dir = resolve_demo_path(demo_dir, parent_path)
    if not parent_dir:
        return _error_response(f"无效的父路径: {parent_path}")

    safe_name = sanitize_demo_name(folder_name)
    if not safe_name:
        return _error_response("文件夹名称无效")

    target_folder = parent_dir / safe_name

    if target_folder.exists():
        return _error_response(f"文件夹 '{safe_name}' 已存在")

    try:
        # exist_ok=False: losing a race against another creator is an error.
        target_folder.mkdir(parents=True, exist_ok=False)
    except Exception as e:
        return _error_response(f"创建失败: {str(e)}")
    return _success_response(f"文件夹 '{safe_name}' 已创建")
|
|
|
|
def delete_folder(demo_dir: Path, folder_path: str) -> Dict[str, Any]:
    """Soft-delete a folder by moving it into the hidden ``.deleted`` directory.

    The folder is placed directly under ``.deleted`` using its own name. When
    that name is taken, a timestamped name is used, and a numeric suffix is
    appended if the timestamped name is taken as well.

    Args:
        demo_dir: Root directory *folder_path* is resolved against and under
            which ``.deleted`` is created.
        folder_path: API path of the folder to delete.

    Returns:
        A success or error response dict. Errors cover an unresolvable or
        invalid source and any failure while preparing ``.deleted`` or
        moving the folder.
    """
    source_folder = resolve_demo_path(demo_dir, folder_path)
    if not source_folder:
        return _error_response(f"文件夹不存在: {folder_path}")

    error_msg = _validate_folder(source_folder)
    if error_msg:
        return _error_response(error_msg)

    try:
        # Creating .deleted is guarded as well, so a failure becomes an error
        # response instead of an unhandled exception.
        deleted_dir = _ensure_deleted_dir(demo_dir)
        target_folder = deleted_dir / source_folder.name

        # shutil.move() into an existing directory would nest the folder
        # inside it, so keep looking until the destination name is free.
        attempt = 0
        while target_folder.exists():
            suffix = f"_{attempt}" if attempt else ""
            target_folder = deleted_dir / _get_timestamped_name(source_folder.name, suffix)
            attempt += 1

        shutil.move(str(source_folder), str(target_folder))
    except Exception as e:
        return _error_response(f"删除失败: {str(e)}")
    return _success_response("文件夹已移动到 .deleted 目录")
|
|
|
|
def delete_demo_file(demo_dir: Path, file_path: str) -> Dict[str, Any]:
    """Soft-delete a demo JSON file by moving it into ``.deleted``.

    The file keeps its path relative to the resolved *demo_dir* underneath
    ``.deleted``. When that destination is taken, a timestamped file name is
    used, and a numeric suffix is added if the timestamped name is taken too.

    Args:
        demo_dir: Root directory; it is resolved before *file_path* is
            resolved against it.
        file_path: API path of the JSON file to delete.

    Returns:
        A success or error response dict. Errors cover an unresolvable or
        invalid source, a source that is not located under the resolved
        *demo_dir*, and any failure while preparing the destination or
        moving the file.
    """
    demo_dir_resolved = demo_dir.resolve()
    source_file = resolve_demo_path(demo_dir_resolved, file_path)

    if not source_file:
        return _error_response(f"文件不存在: {file_path}")

    error_msg = _validate_json_file(source_file)
    if error_msg:
        return _error_response(error_msg)

    try:
        relative_path = source_file.relative_to(demo_dir_resolved)
    except ValueError:
        return _error_response("无效的文件路径")

    try:
        # Directory creation is guarded as well, so a failure becomes an
        # error response instead of an unhandled exception.
        deleted_dir = _ensure_deleted_dir(demo_dir_resolved)
        target_file = deleted_dir / relative_path
        target_parent = target_file.parent
        target_parent.mkdir(parents=True, exist_ok=True)

        # Moving onto an existing file could replace an earlier deleted copy,
        # so keep looking until the destination name is free.
        attempt = 0
        while target_file.exists():
            suffix = f"_{attempt}" if attempt else ""
            target_file = target_parent / _get_timestamped_name(source_file.stem, f"{suffix}.json")
            attempt += 1

        shutil.move(str(source_file), str(target_file))
    except Exception as e:
        return _error_response(f"删除失败: {str(e)}")
    return _success_response(f"文件已移动到 .deleted 目录: {relative_path.as_posix()}")
|
|
|
|