| """ |
| Computer Auditor — Python backend (FastAPI). |
| Hot paths: directory listing with recursive sizes, large-file scan, processes, drives, system, network. |
| Binds to 127.0.0.1 only. Started by Electron with: python -m uvicorn main:app --host 127.0.0.1 --port <port> |
| """ |
| from __future__ import annotations |
|
|
| import os |
| import platform |
| import socket |
| import sys |
| import tempfile |
| import time |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from typing import Any |
|
|
| import psutil |
| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, Field |
|
|
| MAX_FILES_WALK = 200_000 |
|
|
| app = FastAPI(title="Computer Auditor API", version="1.0.0") |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| def measure_folder_bytes(root: str) -> tuple[int, int, bool]: |
| """Recursive file byte total under root. Returns (bytes, file_count, truncated).""" |
| total = 0 |
| n = 0 |
| truncated = False |
| for dirpath, _dirnames, filenames in os.walk(root): |
| for fn in filenames: |
| if n >= MAX_FILES_WALK: |
| return total, n, True |
| fp = os.path.join(dirpath, fn) |
| try: |
| total += os.path.getsize(fp) |
| n += 1 |
| except OSError: |
| pass |
| return total, n, False |
|
|
|
|
| class ListDirBody(BaseModel): |
| path: str |
| max_entries: int = Field(default=800, ge=1, le=5000) |
|
|
|
|
| class FolderSizeBody(BaseModel): |
| path: str |
|
|
|
|
| class LargeFilesBody(BaseModel): |
| path: str |
| min_bytes: int = Field(ge=0) |
| max_results: int = Field(default=80, ge=1, le=500) |
|
|
|
|
| @app.get("/health") |
| def health() -> dict[str, str]: |
| return {"status": "ok"} |
|
|
|
|
| @app.post("/api/list_dir") |
| def api_list_dir(body: ListDirBody) -> list[dict[str, Any]]: |
| try: |
| root = os.path.abspath(os.path.normpath(body.path)) |
| except Exception as e: |
| raise HTTPException(400, str(e)) from e |
| if not os.path.isdir(root): |
| raise HTTPException(400, "not a directory") |
|
|
| max_e = min(body.max_entries, 5000) |
| try: |
| with os.scandir(root) as it: |
| raw = list(it)[:max_e] |
| except OSError as e: |
| return [ |
| { |
| "name": os.path.basename(root), |
| "fullPath": root, |
| "isDirectory": False, |
| "sizeBytes": 0, |
| "mtimeMs": 0, |
| "error": str(e), |
| } |
| ] |
|
|
| dirs = [e for e in raw if e.is_dir(follow_symlinks=False)] |
| file_entries = [e for e in raw if not e.is_dir(follow_symlinks=False)] |
|
|
| out: list[dict[str, Any]] = [] |
|
|
| for e in file_entries: |
| try: |
| st = e.stat(follow_symlinks=False) |
| out.append( |
| { |
| "name": e.name, |
| "fullPath": e.path, |
| "isDirectory": False, |
| "sizeBytes": st.st_size, |
| "mtimeMs": int(st.st_mtime * 1000), |
| } |
| ) |
| except OSError as ex: |
| out.append( |
| { |
| "name": e.name, |
| "fullPath": e.path, |
| "isDirectory": False, |
| "sizeBytes": 0, |
| "mtimeMs": 0, |
| "error": str(ex), |
| } |
| ) |
|
|
| workers = min(8, max(1, len(dirs))) |
| if dirs: |
| with ThreadPoolExecutor(max_workers=workers) as ex: |
| future_to_ent = {ex.submit(measure_folder_bytes, d.path): d for d in dirs} |
| for fut in as_completed(future_to_ent): |
| d = future_to_ent[fut] |
| try: |
| bytes_total, _n, truncated = fut.result() |
| try: |
| st = os.stat(d.path) |
| mtime_ms = int(st.st_mtime * 1000) |
| except OSError: |
| mtime_ms = 0 |
| out.append( |
| { |
| "name": d.name, |
| "fullPath": d.path, |
| "isDirectory": True, |
| "sizeBytes": bytes_total, |
| "mtimeMs": mtime_ms, |
| "sizeTruncated": truncated, |
| } |
| ) |
| except Exception as ex: |
| out.append( |
| { |
| "name": d.name, |
| "fullPath": d.path, |
| "isDirectory": True, |
| "sizeBytes": 0, |
| "mtimeMs": 0, |
| "error": str(ex), |
| } |
| ) |
|
|
| out.sort( |
| key=lambda x: ( |
| 0 if x.get("isDirectory") else 1, |
| -int(x.get("sizeBytes") or 0), |
| str(x.get("name") or ""), |
| ) |
| ) |
| return out |
|
|
|
|
| @app.post("/api/folder_size") |
| def api_folder_size(body: FolderSizeBody) -> dict[str, Any]: |
| try: |
| root = os.path.abspath(os.path.normpath(body.path)) |
| except Exception as e: |
| raise HTTPException(400, str(e)) from e |
| if not os.path.isdir(root): |
| raise HTTPException(400, "not a directory") |
| b, n, t = measure_folder_bytes(root) |
| return {"bytes": b, "files": n, "truncated": t} |
|
|
|
|
| @app.post("/api/large_files") |
| def api_large_files(body: LargeFilesBody) -> list[dict[str, Any]]: |
| try: |
| root = os.path.abspath(os.path.normpath(body.path)) |
| except Exception as e: |
| raise HTTPException(400, str(e)) from e |
| if not os.path.isdir(root): |
| raise HTTPException(400, "not a directory") |
|
|
| cap = min(body.max_results, 500) |
| results: list[dict[str, Any]] = [] |
| for dirpath, _dn, filenames in os.walk(root): |
| for fn in filenames: |
| if len(results) >= cap: |
| break |
| fp = os.path.join(dirpath, fn) |
| try: |
| sz = os.path.getsize(fp) |
| if sz >= body.min_bytes: |
| results.append({"path": fp, "sizeBytes": sz}) |
| except OSError: |
| pass |
| if len(results) >= cap: |
| break |
| results.sort(key=lambda x: -x["sizeBytes"]) |
| return results[:cap] |
|
|
|
|
| @app.get("/api/processes") |
| def api_processes() -> list[dict[str, Any]]: |
| rows: list[dict[str, Any]] = [] |
| for p in psutil.process_iter( |
| ["pid", "name", "memory_info", "cpu_times", "cmdline"] |
| ): |
| try: |
| info = p.info |
| mi = info.get("memory_info") |
| rss = mi.rss if mi else 0 |
| ct = info.get("cpu_times") |
| cpu_s = None |
| if ct: |
| cpu_s = round(ct.user + ct.system, 2) |
| cmd = info.get("cmdline") or [] |
| cmdline = " ".join(cmd) if isinstance(cmd, list) else str(cmd) |
| if len(cmdline) > 8000: |
| cmdline = cmdline[:8000] + "…" |
| rows.append( |
| { |
| "pid": int(info["pid"]), |
| "name": str(info.get("name") or ""), |
| "memoryBytes": int(rss), |
| "cpuSeconds": cpu_s, |
| "commandLine": cmdline, |
| } |
| ) |
| except (psutil.NoSuchProcess, psutil.AccessDenied): |
| continue |
| return rows |
|
|
|
|
| @app.get("/api/drives") |
| def api_drives() -> list[dict[str, Any]]: |
| out: list[dict[str, Any]] = [] |
| for part in psutil.disk_partitions(all=False): |
| if os.name == "nt" and "cdrom" in (part.opts or "").lower(): |
| continue |
| try: |
| u = psutil.disk_usage(part.mountpoint) |
| dev = part.device.rstrip("\\/") |
| out.append( |
| { |
| "letter": dev if dev else part.mountpoint, |
| "mount": part.mountpoint, |
| "label": part.fstype or "", |
| "totalBytes": int(u.total), |
| "freeBytes": int(u.free), |
| "usedBytes": int(u.used), |
| } |
| ) |
| except OSError: |
| continue |
| return out |
|
|
|
|
| @app.get("/api/system") |
| def api_system() -> dict[str, Any]: |
| vm = psutil.virtual_memory() |
| boot = psutil.boot_time() |
| try: |
| user = os.getlogin() |
| except OSError: |
| user = os.environ.get("USERNAME", os.environ.get("USER", "")) |
| cpus = os.cpu_count() or 1 |
| model = "" |
| if platform.system() == "Windows": |
| try: |
| import ctypes |
|
|
| buf = ctypes.create_unicode_buffer(256) |
| if ctypes.windll.kernel32.GetEnvironmentVariableW("PROCESSOR_IDENTIFIER", buf, 256): |
| model = buf.value |
| except Exception: |
| model = platform.processor() or "" |
| else: |
| model = platform.processor() or "" |
|
|
| return { |
| "hostname": socket.gethostname(), |
| "platform": sys.platform, |
| "release": platform.release(), |
| "arch": platform.machine(), |
| "uptimeSec": time.time() - boot, |
| "totalMem": int(vm.total), |
| "freeMem": int(vm.available), |
| "cpuModel": model, |
| "cpuCount": cpus, |
| "load1": 0.0, |
| "load5": 0.0, |
| "load15": 0.0, |
| "userInfo": user, |
| "homedir": str(os.path.expanduser("~")), |
| "tmpdir": tempfile.gettempdir(), |
| } |
|
|
|
|
| @app.get("/api/network") |
| def api_network() -> list[dict[str, Any]]: |
| rows: list[dict[str, Any]] = [] |
| link_fam = getattr(psutil, "AF_LINK", None) |
| for name, addrs in psutil.net_if_addrs().items(): |
| for a in addrs: |
| if a.family == socket.AF_INET: |
| fam_s = "IPv4" |
| elif a.family == socket.AF_INET6: |
| fam_s = "IPv6" |
| elif link_fam is not None and a.family == link_fam: |
| fam_s = "MAC" |
| else: |
| fam_s = str(a.family) |
| addr = a.address |
| internal = addr.startswith("127.") or addr == "::1" |
| mac = addr if fam_s == "MAC" else None |
| rows.append( |
| { |
| "name": name, |
| "address": addr, |
| "family": fam_s, |
| "internal": internal, |
| "mac": mac, |
| } |
| ) |
| return rows |
|
|
|
|
| @app.get("/api/env") |
| def api_env() -> dict[str, str]: |
| return dict(os.environ) |
|
|
|
|
| if __name__ == "__main__": |
| import uvicorn |
|
|
| port = int(os.environ.get("AUDITOR_PY_PORT", "54789")) |
| uvicorn.run(app, host="127.0.0.1", port=port, log_level="warning") |
|
|