import os import json import shutil import re from pathlib import Path from datetime import datetime DATA_DIR = Path(os.environ.get("DATA_DIR", "/data/zones")) ZONES_META = DATA_DIR.parent / "zones_meta.json" # Đảm bảo thư mục tồn tại DATA_DIR.mkdir(parents=True, exist_ok=True) ZONE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,50}$") def _load_meta() -> dict: if ZONES_META.exists(): return json.loads(ZONES_META.read_text(encoding="utf-8")) return {} def _save_meta(meta: dict): ZONES_META.write_text(json.dumps(meta, indent=2, default=str), encoding="utf-8") def list_zones() -> list[dict]: meta = _load_meta() zones = [] for name, info in meta.items(): zone_path = DATA_DIR / name zones.append({ "name": name, "created": info.get("created", ""), "description": info.get("description", ""), "exists": zone_path.is_dir(), }) return zones def create_zone(name: str, description: str = "") -> dict: if not ZONE_NAME_PATTERN.match(name): raise ValueError("Tên zone chỉ chứa a-z, A-Z, 0-9, _, - (tối đa 50 ký tự)") zone_path = DATA_DIR / name if zone_path.exists(): raise ValueError(f"Zone '{name}' đã tồn tại") zone_path.mkdir(parents=True) # Tạo file README mặc định (zone_path / "README.md").write_text(f"# {name}\n\nZone được tạo lúc {datetime.now().isoformat()}\n") meta = _load_meta() meta[name] = { "created": datetime.now().isoformat(), "description": description, } _save_meta(meta) return {"name": name, "path": str(zone_path)} def delete_zone(name: str): if not ZONE_NAME_PATTERN.match(name): raise ValueError("Tên zone không hợp lệ") zone_path = DATA_DIR / name if not zone_path.exists(): raise ValueError(f"Zone '{name}' không tồn tại") shutil.rmtree(zone_path) meta = _load_meta() meta.pop(name, None) _save_meta(meta) def get_zone_path(name: str) -> Path: if not ZONE_NAME_PATTERN.match(name): raise ValueError("Tên zone không hợp lệ") zone_path = DATA_DIR / name if not zone_path.is_dir(): raise ValueError(f"Zone '{name}' không tồn tại") return zone_path def _safe_path(zone_path: Path, rel_path: str) -> Path: """Kiểm tra path traversal — đảm bảo không thoát ra ngoài zone.""" target = (zone_path / rel_path).resolve() zone_resolved = zone_path.resolve() # Dùng os.sep để tránh prefix attack: zone "a" không truy cập được zone "ab" if target != zone_resolved and not str(target).startswith(str(zone_resolved) + os.sep): raise ValueError("Truy cập ngoài zone không được phép") return target def list_files(zone_name: str, rel_path: str = "") -> list[dict]: zone_path = get_zone_path(zone_name) target = _safe_path(zone_path, rel_path) if not target.is_dir(): raise ValueError("Không phải thư mục") items = [] for item in sorted(target.iterdir()): stat = item.stat() items.append({ "name": item.name, "is_dir": item.is_dir(), "size": stat.st_size if item.is_file() else 0, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), }) return items def read_file(zone_name: str, rel_path: str) -> str: zone_path = get_zone_path(zone_name) target = _safe_path(zone_path, rel_path) if not target.is_file(): raise ValueError("File không tồn tại") return target.read_text(encoding="utf-8", errors="replace") def write_file(zone_name: str, rel_path: str, content: str): zone_path = get_zone_path(zone_name) target = _safe_path(zone_path, rel_path) target.parent.mkdir(parents=True, exist_ok=True) target.write_text(content, encoding="utf-8") def delete_file(zone_name: str, rel_path: str): zone_path = get_zone_path(zone_name) target = _safe_path(zone_path, rel_path) if target == zone_path.resolve(): raise ValueError("Không thể xoá thư mục gốc zone") if target.is_dir(): shutil.rmtree(target) elif target.is_file(): target.unlink() else: raise ValueError("File/thư mục không tồn tại") def create_folder(zone_name: str, rel_path: str): zone_path = get_zone_path(zone_name) target = _safe_path(zone_path, rel_path) target.mkdir(parents=True, exist_ok=True) def rename_item(zone_name: str, old_path: str, new_name: str): zone_path = get_zone_path(zone_name) source = _safe_path(zone_path, old_path) if not source.exists(): raise ValueError("File/thư mục nguồn không tồn tại") dest = _safe_path(zone_path, str(Path(old_path).parent / new_name)) source.rename(dest)