cc3m / zones.py
kokokoasd's picture
Upload 9 files
914be63 verified
import os
import json
import shutil
import re
from pathlib import Path
from datetime import datetime
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data/zones"))
ZONES_META = DATA_DIR.parent / "zones_meta.json"
# Đảm bảo thư mục tồn tại
DATA_DIR.mkdir(parents=True, exist_ok=True)
ZONE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,50}$")
def _load_meta() -> dict:
if ZONES_META.exists():
return json.loads(ZONES_META.read_text(encoding="utf-8"))
return {}
def _save_meta(meta: dict):
ZONES_META.write_text(json.dumps(meta, indent=2, default=str), encoding="utf-8")
def list_zones() -> list[dict]:
meta = _load_meta()
zones = []
for name, info in meta.items():
zone_path = DATA_DIR / name
zones.append({
"name": name,
"created": info.get("created", ""),
"description": info.get("description", ""),
"exists": zone_path.is_dir(),
})
return zones
def create_zone(name: str, description: str = "") -> dict:
if not ZONE_NAME_PATTERN.match(name):
raise ValueError("Tên zone chỉ chứa a-z, A-Z, 0-9, _, - (tối đa 50 ký tự)")
zone_path = DATA_DIR / name
if zone_path.exists():
raise ValueError(f"Zone '{name}' đã tồn tại")
zone_path.mkdir(parents=True)
# Tạo file README mặc định
(zone_path / "README.md").write_text(f"# {name}\n\nZone được tạo lúc {datetime.now().isoformat()}\n")
meta = _load_meta()
meta[name] = {
"created": datetime.now().isoformat(),
"description": description,
}
_save_meta(meta)
return {"name": name, "path": str(zone_path)}
def delete_zone(name: str):
if not ZONE_NAME_PATTERN.match(name):
raise ValueError("Tên zone không hợp lệ")
zone_path = DATA_DIR / name
if not zone_path.exists():
raise ValueError(f"Zone '{name}' không tồn tại")
shutil.rmtree(zone_path)
meta = _load_meta()
meta.pop(name, None)
_save_meta(meta)
def get_zone_path(name: str) -> Path:
if not ZONE_NAME_PATTERN.match(name):
raise ValueError("Tên zone không hợp lệ")
zone_path = DATA_DIR / name
if not zone_path.is_dir():
raise ValueError(f"Zone '{name}' không tồn tại")
return zone_path
def _safe_path(zone_path: Path, rel_path: str) -> Path:
"""Kiểm tra path traversal — đảm bảo không thoát ra ngoài zone."""
target = (zone_path / rel_path).resolve()
zone_resolved = zone_path.resolve()
# Dùng os.sep để tránh prefix attack: zone "a" không truy cập được zone "ab"
if target != zone_resolved and not str(target).startswith(str(zone_resolved) + os.sep):
raise ValueError("Truy cập ngoài zone không được phép")
return target
def list_files(zone_name: str, rel_path: str = "") -> list[dict]:
zone_path = get_zone_path(zone_name)
target = _safe_path(zone_path, rel_path)
if not target.is_dir():
raise ValueError("Không phải thư mục")
items = []
for item in sorted(target.iterdir()):
stat = item.stat()
items.append({
"name": item.name,
"is_dir": item.is_dir(),
"size": stat.st_size if item.is_file() else 0,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
})
return items
def read_file(zone_name: str, rel_path: str) -> str:
zone_path = get_zone_path(zone_name)
target = _safe_path(zone_path, rel_path)
if not target.is_file():
raise ValueError("File không tồn tại")
return target.read_text(encoding="utf-8", errors="replace")
def write_file(zone_name: str, rel_path: str, content: str):
zone_path = get_zone_path(zone_name)
target = _safe_path(zone_path, rel_path)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(content, encoding="utf-8")
def delete_file(zone_name: str, rel_path: str):
zone_path = get_zone_path(zone_name)
target = _safe_path(zone_path, rel_path)
if target == zone_path.resolve():
raise ValueError("Không thể xoá thư mục gốc zone")
if target.is_dir():
shutil.rmtree(target)
elif target.is_file():
target.unlink()
else:
raise ValueError("File/thư mục không tồn tại")
def create_folder(zone_name: str, rel_path: str):
zone_path = get_zone_path(zone_name)
target = _safe_path(zone_path, rel_path)
target.mkdir(parents=True, exist_ok=True)
def rename_item(zone_name: str, old_path: str, new_name: str):
zone_path = get_zone_path(zone_name)
source = _safe_path(zone_path, old_path)
if not source.exists():
raise ValueError("File/thư mục nguồn không tồn tại")
dest = _safe_path(zone_path, str(Path(old_path).parent / new_name))
source.rename(dest)