|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| from __future__ import annotations
|
|
|
| import argparse
|
| import os
|
| import sys
|
| from concurrent.futures import ThreadPoolExecutor, as_completed
|
| from os import scandir, stat_result
|
|
|
|
|
| def _bytes_to_kb(n: int) -> int:
|
| return (n + 1023) // 1024 if n else 0
|
|
|
|
|
| def _tree_size_bytes(root: str) -> int:
|
| """Sum st_size of all regular files under root (iterative, no recursion limit)."""
|
| total = 0
|
| stack = [root]
|
| push = stack.append
|
| pop = stack.pop
|
|
|
| while stack:
|
| d = pop()
|
| try:
|
| with scandir(d) as it:
|
| for ent in it:
|
| try:
|
| if ent.is_file(follow_symlinks=False):
|
| st: stat_result = ent.stat(follow_symlinks=False)
|
| total += st.st_size
|
| elif ent.is_dir(follow_symlinks=False):
|
| push(ent.path)
|
| except OSError:
|
| continue
|
| except OSError:
|
| continue
|
| return total
|
|
|
|
|
| def _format_line(name: str, path_for_display: str, size_kb: int) -> str:
|
| return f"{name} {path_for_display} {size_kb}"
|
|
|
|
|
| def scan_root(root: str, jobs: int) -> None:
|
| root = os.path.abspath(os.path.normpath(root))
|
| if not os.path.isdir(root):
|
| print(f"Not a directory: {root}", file=sys.stderr)
|
| sys.exit(2)
|
|
|
| sep = os.sep
|
| entries: list[tuple[str, str, bool]] = []
|
| try:
|
| with scandir(root) as it:
|
| for ent in it:
|
| try:
|
| is_dir = ent.is_dir(follow_symlinks=False)
|
| except OSError:
|
| continue
|
| entries.append((ent.name, ent.path, is_dir))
|
| except OSError as e:
|
| print(f"Cannot read directory {root}: {e}", file=sys.stderr)
|
| sys.exit(1)
|
|
|
| files_ready: list[tuple[str, str, int]] = []
|
| dir_jobs: list[tuple[str, str]] = []
|
|
|
| for name, fullpath, is_dir in entries:
|
| if is_dir:
|
| display = fullpath if fullpath.endswith(sep) else fullpath + sep
|
| dir_jobs.append((name, fullpath))
|
| else:
|
| try:
|
| st = os.stat(fullpath, follow_symlinks=False)
|
| sz = st.st_size
|
| except OSError:
|
| sz = 0
|
| display = fullpath
|
| files_ready.append((name, display, _bytes_to_kb(sz)))
|
|
|
| dirs_ready: list[tuple[str, str, int]] = []
|
| if dir_jobs:
|
| workers = max(1, min(jobs, len(dir_jobs)))
|
| with ThreadPoolExecutor(max_workers=workers) as ex:
|
| futs = {
|
| ex.submit(_tree_size_bytes, p): (n, p)
|
| for n, p in dir_jobs
|
| }
|
| for fut in as_completed(futs):
|
| name, fullpath = futs[fut]
|
| display = fullpath if fullpath.endswith(sep) else fullpath + sep
|
| try:
|
| b = fut.result()
|
| except Exception:
|
| b = 0
|
| dirs_ready.append((name, display, _bytes_to_kb(b)))
|
|
|
| out: list[tuple[str, str, int]] = [*files_ready, *dirs_ready]
|
| out.sort(key=lambda row: (-row[2], row[0].lower()))
|
|
|
| for name, path_disp, kb in out:
|
| print(_format_line(name, path_disp, kb))
|
|
|
|
|
| def main() -> None:
|
| p = argparse.ArgumentParser(
|
| description="List direct children of PATH with sizes (folders = recursive total).",
|
| )
|
| p.add_argument(
|
| "-scan",
|
| "--scan",
|
| dest="root",
|
| metavar="PATH",
|
| required=True,
|
| help="Root directory to scan",
|
| )
|
| p.add_argument(
|
| "-j",
|
| "--jobs",
|
| type=int,
|
| default=max(1, min(32, (os.cpu_count() or 4) * 4)),
|
| metavar="N",
|
| help="Thread workers for parallel folder sizing",
|
| )
|
| args = p.parse_args()
|
| scan_root(args.root, args.jobs)
|
|
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|