"""
Demo文件夹操作模块
提供文件夹和文件的列表、移动、重命名、删除等功能
"""
import os
import shutil
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from backend.path_utils import (
    normalize_path,
    check_path_in_demo_dir,
    validate_demo_path,
    resolve_demo_path,
)
# ==================== 辅助函数 ====================
def _normalize_path(path: str) -> str:
    """Return *path* in normalized form (backward-compatible wrapper).

    Delegates entirely to ``normalize_path`` from ``backend.path_utils``.
    According to the original note, an empty string is turned into "/".
    """
    normalized = normalize_path(path)
    return normalized
def _build_api_path(parent_path: str, item_name: str) -> str:
"""构建API路径格式(统一使用 "/" 开头的格式)"""
if parent_path and parent_path != "/":
return f"{parent_path}/{item_name}"
return f"/{item_name}"
def _error_response(message: str) -> Dict[str, any]:
"""统一错误响应格式"""
return {"success": False, "message": message}
def _success_response(message: str) -> Dict[str, any]:
"""统一成功响应格式"""
return {"success": True, "message": message}
def _get_timestamped_name(base_name: str, extension: str = "") -> str:
"""生成带时间戳的名称"""
timestamp = int(time.time())
return f"{base_name}_{timestamp}{extension}"
def _ensure_deleted_dir(demo_dir: Path) -> Path:
"""确保.deleted目录存在并返回路径"""
deleted_dir = demo_dir.resolve() / '.deleted'
deleted_dir.mkdir(parents=True, exist_ok=True)
return deleted_dir
def _validate_json_file(file_path: Path) -> Optional[str]:
"""验证文件存在且为JSON文件,返回错误消息或None"""
if not file_path.exists():
return "文件不存在"
if not file_path.is_file():
return "路径不是文件"
if file_path.suffix != '.json':
return "只能操作JSON文件"
return None
def _validate_folder(folder_path: Path) -> Optional[str]:
"""验证文件夹存在,返回错误消息或None"""
if not folder_path.exists():
return "文件夹不存在"
if not folder_path.is_dir():
return "路径不是文件夹"
return None
# ==================== 文件系统操作函数 ====================
# 核心路径处理函数已移至 backend/path_utils.py
def list_demo_items(demo_dir: Path, path: str = "") -> Dict[str, Any]:
    """List the visible folders and JSON files directly under *path*.

    Entries whose name starts with "." are skipped. Folders are listed
    first, then ``.json`` files (reported by stem), each group sorted by
    name.

    Args:
        demo_dir: Root directory handed to ``resolve_demo_path``.
        path: API path of the directory to list; it is normalized first.

    Returns:
        ``{"path": <normalized path>, "items": [...]}`` where each item has
        ``"type"`` ("folder" or "file"), ``"name"`` and ``"path"``. The item
        list is empty when the path cannot be resolved, does not exist, or
        scanning fails.
    """
    normalized_path = _normalize_path(path)
    target_dir = resolve_demo_path(demo_dir, normalized_path)
    if not target_dir or not target_dir.exists():
        return {"path": normalized_path, "items": []}

    folders: List[Dict[str, str]] = []
    files: List[Dict[str, str]] = []
    try:
        for item_path in target_dir.iterdir():
            if item_path.name.startswith('.'):
                continue
            if item_path.is_dir():
                folders.append({
                    "type": "folder",
                    "name": item_path.name,
                    "path": _build_api_path(normalized_path, item_path.name),
                })
            elif item_path.is_file() and item_path.suffix == '.json':
                files.append({
                    "type": "file",
                    "name": item_path.stem,
                    "path": _build_api_path(normalized_path, item_path.name),
                })
    except Exception as e:
        # Best-effort listing: report the failure and return an empty result.
        import traceback
        print(f"❌ 扫描目录失败: {e}")
        traceback.print_exc()
        return {"path": normalized_path, "items": []}

    folders.sort(key=lambda entry: entry["name"])
    files.sort(key=lambda entry: entry["name"])
    # Report the normalized path so that it agrees with the item paths and
    # with the early-return branches above.
    return {"path": normalized_path, "items": folders + files}
def get_all_folders(demo_dir: Path, exclude_path: Optional[str] = None) -> List[str]:
    """Collect the API paths of every visible folder below *demo_dir*.

    Used to offer move targets. Directories whose name starts with "." are
    skipped. When *exclude_path* is given, that folder and everything below
    it are left out. The root "/" is always the first element.
    """
    collected: List[str] = []

    def _walk(directory: Path, api_prefix: str) -> None:
        """Append sub-folders of *directory* depth-first, parent before children."""
        try:
            for entry in directory.iterdir():
                if entry.name.startswith('.') or not entry.is_dir():
                    continue
                api_path = _build_api_path(api_prefix, entry.name)
                if exclude_path:
                    if api_path == exclude_path or api_path.startswith(exclude_path + "/"):
                        continue
                collected.append(api_path)
                _walk(entry, api_path)
        except Exception as e:
            # A failing directory is reported; the scan continues elsewhere.
            import traceback
            print(f"❌ 扫描文件夹失败: {e}")
            traceback.print_exc()

    _walk(demo_dir, "/")
    return ["/"] + collected
def move_demo_file(demo_dir: Path, source_path: str, target_path: str) -> Dict[str, Any]:
    """Move a demo JSON file into another folder.

    Args:
        demo_dir: Root directory handed to ``resolve_demo_path``.
        source_path: API path of the JSON file to move.
        target_path: API path of the destination folder; it is created
            (including parents) if it does not exist yet.

    Returns:
        A success/error response dict. Filesystem failures are reported as
        an error response instead of being raised.
    """
    source_file = resolve_demo_path(demo_dir, source_path)
    if not source_file:
        return _error_response(f"源文件不存在: {source_path}")

    error_msg = _validate_json_file(source_file)
    if error_msg:
        # The "does not exist" message is passed through unchanged; other
        # validation messages get the source prefix and the offending path.
        if "不存在" in error_msg:
            return _error_response(error_msg)
        return _error_response(f"源文件{error_msg}: {source_path}")

    target_dir = resolve_demo_path(demo_dir, target_path)
    if not target_dir:
        return _error_response(f"无效的目标路径: {target_path}")

    target_file = target_dir / source_file.name
    if target_file.exists() and target_file != source_file:
        return _error_response(f"目标位置已存在同名文件: {source_file.name}")

    try:
        # Create the destination only once the move is known to be allowed,
        # and inside the try so that an OSError (e.g. the target path is an
        # existing file) becomes an error response rather than a crash.
        target_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source_file), str(target_file))
    except Exception as e:
        return _error_response(f"移动失败: {str(e)}")
    return _success_response(f"文件已移动到 {target_path}")
def rename_demo_file(demo_dir: Path, file_path: str, new_name: str) -> Dict[str, Any]:
    """Rename a demo JSON file within its current folder.

    Args:
        demo_dir: Root directory handed to ``resolve_demo_path``.
        file_path: API path of the JSON file to rename.
        new_name: Requested new name; it is passed through
            ``sanitize_demo_name`` and ``.json`` is appended to the result.

    Returns:
        A success/error response dict. Renaming a file to its own current
        name is not treated as a conflict.
    """
    from backend.data_utils import sanitize_demo_name

    source_file = resolve_demo_path(demo_dir, file_path)
    if not source_file:
        return _error_response(f"文件不存在: {file_path}")

    error_msg = _validate_json_file(source_file)
    if error_msg:
        return _error_response(error_msg)

    safe_name = sanitize_demo_name(new_name)
    if not safe_name:
        return _error_response("新名称无效")

    target_file = source_file.parent / f"{safe_name}.json"
    if target_file.exists() and target_file != source_file:
        return _error_response(f"文件 '{safe_name}.json' 已存在")

    try:
        source_file.rename(target_file)
    except Exception as e:
        return _error_response(f"重命名失败: {str(e)}")
    return _success_response(f"文件已重命名为 '{safe_name}.json'")
def move_folder(demo_dir: Path, source_path: str, target_path: str) -> Dict[str, Any]:
    """Move a folder (with all its contents) into another folder.

    Args:
        demo_dir: Root directory handed to ``resolve_demo_path``.
        source_path: API path of the folder to move.
        target_path: API path of the destination parent folder; it is
            created (including parents) if it does not exist yet.

    Returns:
        A success/error response dict. Filesystem failures are reported as
        an error response instead of being raised.
    """
    source_folder = resolve_demo_path(demo_dir, source_path)
    if not source_folder:
        return _error_response(f"源文件夹不存在: {source_path}")

    error_msg = _validate_folder(source_folder)
    if error_msg:
        # The "does not exist" message is passed through unchanged; other
        # validation messages get the source prefix and the offending path.
        if "不存在" in error_msg:
            return _error_response(error_msg)
        return _error_response(f"源{error_msg}: {source_path}")

    target_dir = resolve_demo_path(demo_dir, target_path)
    if not target_dir:
        return _error_response(f"无效的目标路径: {target_path}")

    target_folder = target_dir / source_folder.name
    if target_folder.exists():
        return _error_response(f"目标位置已存在同名文件夹: {source_folder.name}")

    # Reject moving a folder into its own subtree.
    # NOTE(review): presumably check_path_in_demo_dir(a, b) reports whether
    # a lies inside b -- confirm against backend.path_utils.
    if check_path_in_demo_dir(target_folder.resolve(), source_folder.resolve()):
        return _error_response("不能将文件夹移动到自己的子目录")

    try:
        # Create the destination only after every check has passed, so a
        # rejected move never leaves new directories behind (in particular
        # not inside the source folder), and so an OSError from mkdir is
        # reported instead of raised.
        target_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source_folder), str(target_folder))
    except Exception as e:
        return _error_response(f"移动失败: {str(e)}")
    return _success_response(f"文件夹已移动到 {target_path}")
def rename_folder(demo_dir: Path, folder_path: str, new_name: str) -> Dict[str, Any]:
    """Rename a folder within its current parent.

    Args:
        demo_dir: Root directory handed to ``resolve_demo_path``.
        folder_path: API path of the folder to rename.
        new_name: Requested new name; it is passed through
            ``sanitize_demo_name``.

    Returns:
        A success/error response dict. Renaming the demo root itself is
        refused.
    """
    from backend.data_utils import sanitize_demo_name

    source_folder = resolve_demo_path(demo_dir, folder_path)
    if not source_folder:
        return _error_response(f"文件夹不存在: {folder_path}")

    error_msg = _validate_folder(source_folder)
    if error_msg:
        return _error_response(error_msg)

    # Never rename the demo root: the new name would be a sibling of
    # demo_dir, i.e. outside the managed tree.
    if source_folder.resolve() == demo_dir.resolve():
        return _error_response("不能重命名根目录")

    safe_name = sanitize_demo_name(new_name)
    if not safe_name:
        return _error_response("新名称无效")

    target_folder = source_folder.parent / safe_name
    if target_folder.exists():
        return _error_response(f"文件夹 '{safe_name}' 已存在")

    try:
        source_folder.rename(target_folder)
    except Exception as e:
        return _error_response(f"重命名失败: {str(e)}")
    return _success_response(f"文件夹已重命名为 '{safe_name}'")
def create_folder(demo_dir: Path, parent_path: str, folder_name: str) -> Dict[str, Any]:
    """Create a new folder under *parent_path*.

    Args:
        demo_dir: Root directory handed to ``resolve_demo_path``.
        parent_path: API path of the folder that will contain the new one.
        folder_name: Requested folder name; it is passed through
            ``sanitize_demo_name``.

    Returns:
        A success/error response dict. An already existing target is
        reported as an error.
    """
    from backend.data_utils import sanitize_demo_name

    parent_dir = resolve_demo_path(demo_dir, parent_path)
    if not parent_dir:
        return _error_response(f"无效的父路径: {parent_path}")

    safe_name = sanitize_demo_name(folder_name)
    if not safe_name:
        return _error_response("文件夹名称无效")

    target_folder = parent_dir / safe_name
    if target_folder.exists():
        return _error_response(f"文件夹 '{safe_name}' 已存在")

    try:
        # exist_ok=False: losing a race against another creator is an error.
        target_folder.mkdir(parents=True, exist_ok=False)
    except Exception as e:
        return _error_response(f"创建失败: {str(e)}")
    return _success_response(f"文件夹 '{safe_name}' 已创建")
def delete_folder(demo_dir: Path, folder_path: str) -> Dict[str, Any]:
    """Soft-delete a folder by moving it into the hidden ``.deleted`` directory.

    The folder is placed directly under ``<demo_dir>/.deleted`` using its
    own name; if that name is already taken there, a timestamp is appended.

    Args:
        demo_dir: Root directory handed to ``resolve_demo_path``.
        folder_path: API path of the folder to delete.

    Returns:
        A success/error response dict. Deleting the demo root itself is
        refused, and filesystem failures are reported as an error response
        instead of being raised.
    """
    source_folder = resolve_demo_path(demo_dir, folder_path)
    if not source_folder:
        return _error_response(f"文件夹不存在: {folder_path}")

    error_msg = _validate_folder(source_folder)
    if error_msg:
        return _error_response(error_msg)

    # The root cannot be moved into its own .deleted subdirectory; refuse
    # up front instead of relying on shutil.move to fail.
    if source_folder.resolve() == demo_dir.resolve():
        return _error_response("不能删除根目录")

    try:
        # Creating .deleted can itself fail, so it belongs inside the try.
        deleted_dir = _ensure_deleted_dir(demo_dir)
        target_folder = deleted_dir / source_folder.name
        if target_folder.exists():
            target_folder = deleted_dir / _get_timestamped_name(source_folder.name)
        shutil.move(str(source_folder), str(target_folder))
    except Exception as e:
        return _error_response(f"删除失败: {str(e)}")
    return _success_response("文件夹已移动到 .deleted 目录")
def delete_demo_file(demo_dir: Path, file_path: str) -> Dict[str, Any]:
    """Soft-delete a demo JSON file by moving it into ``.deleted``.

    The file keeps its path relative to the demo root underneath
    ``<demo_dir>/.deleted``; if a file already exists at that location, a
    timestamped name is used instead.

    Args:
        demo_dir: Demo root directory; it is resolved before use.
        file_path: API path of the JSON file to delete.

    Returns:
        A success/error response dict. Filesystem failures are reported as
        an error response instead of being raised.
    """
    demo_dir_resolved = demo_dir.resolve()
    source_file = resolve_demo_path(demo_dir_resolved, file_path)
    if not source_file:
        return _error_response(f"文件不存在: {file_path}")

    error_msg = _validate_json_file(source_file)
    if error_msg:
        return _error_response(error_msg)

    try:
        relative_path = source_file.relative_to(demo_dir_resolved)
    except ValueError:
        # The resolved file is not located under the demo root.
        return _error_response("无效的文件路径")

    try:
        # Directory creation can fail just like the move itself, so both
        # live inside the same try block.
        deleted_dir = _ensure_deleted_dir(demo_dir_resolved)
        target_file = deleted_dir / relative_path
        target_parent = target_file.parent
        target_parent.mkdir(parents=True, exist_ok=True)
        if target_file.exists():
            target_file = target_parent / _get_timestamped_name(source_file.stem, ".json")
        shutil.move(str(source_file), str(target_file))
    except Exception as e:
        return _error_response(f"删除失败: {str(e)}")
    return _success_response(f"文件已移动到 .deleted 目录: {relative_path.as_posix()}")
|