algorembrant's picture
Upload 28 files
b4143a2 verified
"""
Computer Auditor — Python backend (FastAPI).
Hot paths: directory listing with recursive sizes, large-file scan, processes, drives, system, network.
Binds to 127.0.0.1 only. Started by Electron with: python -m uvicorn main:app --host 127.0.0.1 --port <port>
"""
from __future__ import annotations
import os
import platform
import socket
import sys
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
import psutil
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
# Upper bound on the number of files a single recursive size walk counts
# before it stops and reports a truncated result (see measure_folder_bytes).
MAX_FILES_WALK = 200_000
# The ASGI application object; route handlers below register themselves on it.
app = FastAPI(title="Computer Auditor API", version="1.0.0")
# CORS is fully open: every origin, method and header is accepted.
# NOTE(review): wildcard origins are combined with allow_credentials=True here.
# FastAPI's CORS documentation appears to require explicit origins/methods/headers
# when credentials are allowed, and an open policy lets any web page call this
# local API -- confirm which origin the Electron renderer actually uses and
# consider restricting the list to it.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def measure_folder_bytes(root: str) -> tuple[int, int, bool]:
"""Recursive file byte total under root. Returns (bytes, file_count, truncated)."""
total = 0
n = 0
truncated = False
for dirpath, _dirnames, filenames in os.walk(root):
for fn in filenames:
if n >= MAX_FILES_WALK:
return total, n, True
fp = os.path.join(dirpath, fn)
try:
total += os.path.getsize(fp)
n += 1
except OSError:
pass
return total, n, False
class ListDirBody(BaseModel):
    """Request body for ``POST /api/list_dir``."""

    # Directory to list; the handler normalises it to an absolute path.
    path: str
    # Maximum number of directory entries to read: 1..5000, default 800.
    max_entries: int = Field(default=800, ge=1, le=5000)
class FolderSizeBody(BaseModel):
    """Request body for ``POST /api/folder_size``."""

    # Directory whose recursive size is measured; normalised by the handler.
    path: str
class LargeFilesBody(BaseModel):
    """Request body for ``POST /api/large_files``."""

    # Directory tree to scan; normalised by the handler.
    path: str
    # Minimum file size in bytes for a file to be reported; must be >= 0 and
    # has no default value.
    min_bytes: int = Field(ge=0)
    # Maximum number of files to return: 1..500, default 80.
    max_results: int = Field(default=80, ge=1, le=500)
@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe: always answers with a fixed ``status`` of ``ok``."""
    return dict(status="ok")
def _listing_error_row(
    name: str, full_path: str, is_directory: bool, error: BaseException
) -> dict[str, Any]:
    """Build a listing row for an entry that could not be inspected."""
    return {
        "name": name,
        "fullPath": full_path,
        "isDirectory": is_directory,
        "sizeBytes": 0,
        "mtimeMs": 0,
        "error": str(error),
    }


def _file_listing_row(entry: os.DirEntry[str]) -> dict[str, Any]:
    """Build the listing row for a non-directory entry without following symlinks."""
    try:
        st = entry.stat(follow_symlinks=False)
    except OSError as err:
        return _listing_error_row(entry.name, entry.path, False, err)
    return {
        "name": entry.name,
        "fullPath": entry.path,
        "isDirectory": False,
        "sizeBytes": st.st_size,
        "mtimeMs": int(st.st_mtime * 1000),
    }


def _dir_listing_row(
    entry: os.DirEntry[str], size_bytes: int, truncated: bool
) -> dict[str, Any]:
    """Build the listing row for a directory whose recursive size is already known."""
    try:
        mtime_ms = int(os.stat(entry.path).st_mtime * 1000)
    except OSError:
        # The size is still useful even when the timestamp cannot be read.
        mtime_ms = 0
    return {
        "name": entry.name,
        "fullPath": entry.path,
        "isDirectory": True,
        "sizeBytes": size_bytes,
        "mtimeMs": mtime_ms,
        "sizeTruncated": truncated,
    }


@app.post("/api/list_dir")
def api_list_dir(body: ListDirBody) -> list[dict[str, Any]]:
    """List one directory, reporting recursive byte totals for its sub-directories.

    At most ``body.max_entries`` entries are read from the directory. Files
    are reported with their own size; each sub-directory is measured with
    ``measure_folder_bytes`` on a small thread pool. Rows are returned with
    directories first, then by descending size, then by name.

    Returns:
        One dict per entry with ``name``, ``fullPath``, ``isDirectory``,
        ``sizeBytes`` and ``mtimeMs``; directory rows add ``sizeTruncated``
        and rows that failed add ``error``. If the directory itself cannot be
        scanned, a single error row describing it is returned instead.

    Raises:
        HTTPException: 400 when the path cannot be normalised or is not a
            directory.
    """
    from itertools import islice  # noqa: PLC0415

    try:
        root = os.path.abspath(os.path.normpath(body.path))
    except Exception as e:
        raise HTTPException(400, str(e)) from e
    if not os.path.isdir(root):
        raise HTTPException(400, "not a directory")

    max_e = min(body.max_entries, 5000)
    try:
        with os.scandir(root) as it:
            # islice stops reading at the cap instead of materialising the
            # whole directory and slicing it afterwards.
            raw = list(islice(it, max_e))
    except OSError as err:
        return [_listing_error_row(os.path.basename(root), root, False, err)]

    dirs: list[os.DirEntry[str]] = []
    file_entries: list[os.DirEntry[str]] = []
    for entry in raw:
        # Symlinks to directories are deliberately treated as plain entries.
        if entry.is_dir(follow_symlinks=False):
            dirs.append(entry)
        else:
            file_entries.append(entry)

    out: list[dict[str, Any]] = [_file_listing_row(entry) for entry in file_entries]

    if dirs:
        with ThreadPoolExecutor(max_workers=min(8, len(dirs))) as pool:
            size_futures = {
                pool.submit(measure_folder_bytes, d.path): d for d in dirs
            }
            for fut in as_completed(size_futures):
                entry = size_futures[fut]
                try:
                    bytes_total, _file_count, truncated = fut.result()
                except Exception as err:  # noqa: BLE001
                    # One unreadable folder must not fail the whole listing.
                    out.append(
                        _listing_error_row(entry.name, entry.path, True, err)
                    )
                    continue
                out.append(_dir_listing_row(entry, bytes_total, truncated))

    out.sort(
        key=lambda x: (
            0 if x.get("isDirectory") else 1,
            -int(x.get("sizeBytes") or 0),
            str(x.get("name") or ""),
        )
    )
    return out
@app.post("/api/folder_size")
def api_folder_size(body: FolderSizeBody) -> dict[str, Any]:
    """Report the recursive size of one directory.

    Returns a dict with ``bytes``, ``files`` and ``truncated`` as produced by
    ``measure_folder_bytes``. Responds with HTTP 400 when the path cannot be
    normalised or does not name a directory.
    """
    try:
        target = os.path.abspath(os.path.normpath(body.path))
    except Exception as exc:
        raise HTTPException(400, str(exc)) from exc

    if os.path.isdir(target):
        total_bytes, file_count, was_truncated = measure_folder_bytes(target)
        return {
            "bytes": total_bytes,
            "files": file_count,
            "truncated": was_truncated,
        }
    raise HTTPException(400, "not a directory")
def _iter_files_at_least(root: str, min_bytes: int):
    """Yield ``(size, path)`` for every readable file under *root* of at least *min_bytes*."""
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            full_path = os.path.join(dirpath, filename)
            try:
                size = os.path.getsize(full_path)
            except OSError:
                # Best effort: files whose size cannot be read are skipped.
                continue
            if size >= min_bytes:
                yield size, full_path


@app.post("/api/large_files")
def api_large_files(body: LargeFilesBody) -> list[dict[str, Any]]:
    """Return the largest files under a directory tree.

    The whole tree is walked and only the ``body.max_results`` largest files
    of at least ``body.min_bytes`` bytes are kept, so the answer does not
    depend on the order in which the walk happens to visit files. Only the
    current top candidates are held in memory while walking.

    Returns:
        Dicts with ``path`` and ``sizeBytes``, ordered by descending size.

    Raises:
        HTTPException: 400 when the path cannot be normalised or is not a
            directory.
    """
    import heapq  # noqa: PLC0415

    try:
        root = os.path.abspath(os.path.normpath(body.path))
    except Exception as e:
        raise HTTPException(400, str(e)) from e
    if not os.path.isdir(root):
        raise HTTPException(400, "not a directory")

    cap = min(body.max_results, 500)
    # nlargest returns its result already sorted from largest to smallest.
    largest = heapq.nlargest(cap, _iter_files_at_least(root, body.min_bytes))
    return [{"path": path, "sizeBytes": size} for size, path in largest]
@app.get("/api/processes")
def api_processes() -> list[dict[str, Any]]:
    """Return one row per running process.

    Each row carries ``pid``, ``name``, ``memoryBytes`` (resident set size,
    0 when unavailable), ``cpuSeconds`` (user + system time rounded to two
    decimals, or None when unavailable) and ``commandLine`` (arguments joined
    by spaces, cut to 8000 characters plus an ellipsis). Processes that raise
    ``NoSuchProcess`` or ``AccessDenied`` while being read are skipped.
    """
    wanted_fields = ["pid", "name", "memory_info", "cpu_times", "cmdline"]
    processes: list[dict[str, Any]] = []
    for proc in psutil.process_iter(wanted_fields):
        try:
            snapshot = proc.info
            memory = snapshot.get("memory_info")
            times = snapshot.get("cpu_times")
            argv = snapshot.get("cmdline") or []

            if isinstance(argv, list):
                command = " ".join(argv)
            else:
                command = str(argv)
            if len(command) > 8000:
                command = command[:8000] + "…"

            processes.append(
                {
                    "pid": int(snapshot["pid"]),
                    "name": str(snapshot.get("name") or ""),
                    "memoryBytes": int(memory.rss if memory else 0),
                    "cpuSeconds": (
                        round(times.user + times.system, 2) if times else None
                    ),
                    "commandLine": command,
                }
            )
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return processes
@app.get("/api/drives")
def api_drives() -> list[dict[str, Any]]:
    """Return usage figures for each mounted partition.

    On Windows, partitions whose mount options mention ``cdrom`` are left
    out. Partitions whose usage cannot be read (``OSError``) are skipped.
    Each row carries ``letter`` (device without trailing slashes, falling
    back to the mount point), ``mount``, ``label`` (the filesystem type),
    ``totalBytes``, ``freeBytes`` and ``usedBytes``.
    """
    on_windows = os.name == "nt"
    drives: list[dict[str, Any]] = []
    for partition in psutil.disk_partitions(all=False):
        if on_windows and "cdrom" in (partition.opts or "").lower():
            continue
        try:
            usage = psutil.disk_usage(partition.mountpoint)
        except OSError:
            continue
        device = partition.device.rstrip("\\/")
        drives.append(
            {
                "letter": device or partition.mountpoint,
                "mount": partition.mountpoint,
                "label": partition.fstype or "",
                "totalBytes": int(usage.total),
                "freeBytes": int(usage.free),
                "usedBytes": int(usage.used),
            }
        )
    return drives
@app.get("/api/system")
def api_system() -> dict[str, Any]:
    """Return a snapshot of host, OS, memory, CPU and user information.

    ``load1``/``load5``/``load15`` come from ``os.getloadavg()`` where the
    platform provides it and are 0.0 otherwise (for example on Windows).
    ``cpuModel`` is taken from the ``PROCESSOR_IDENTIFIER`` environment
    variable on Windows, falling back to ``platform.processor()``; it is an
    empty string when neither source yields a value.
    """
    vm = psutil.virtual_memory()
    boot = psutil.boot_time()

    try:
        user = os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal; fall back to the
        # conventional environment variables when there is none.
        user = os.environ.get("USERNAME", os.environ.get("USER", ""))

    model = ""
    if platform.system() == "Windows":
        model = os.environ.get("PROCESSOR_IDENTIFIER", "")
    if not model:
        model = platform.processor() or ""

    try:
        load1, load5, load15 = os.getloadavg()
    except (AttributeError, OSError):
        # AttributeError: the function does not exist on this platform.
        # OSError: the load average could not be obtained.
        load1 = load5 = load15 = 0.0

    return {
        "hostname": socket.gethostname(),
        "platform": sys.platform,
        "release": platform.release(),
        "arch": platform.machine(),
        "uptimeSec": time.time() - boot,
        "totalMem": int(vm.total),
        "freeMem": int(vm.available),
        "cpuModel": model,
        "cpuCount": os.cpu_count() or 1,
        "load1": float(load1),
        "load5": float(load5),
        "load15": float(load15),
        "userInfo": user,
        "homedir": str(os.path.expanduser("~")),
        "tmpdir": tempfile.gettempdir(),
    }
@app.get("/api/network")
def api_network() -> list[dict[str, Any]]:
    """Return one row per address of every network interface.

    ``family`` is ``IPv4``, ``IPv6``, ``MAC`` (for psutil's link-layer
    family, when psutil exposes one) or the string form of the raw family.
    ``internal`` is True for addresses starting with ``127.`` or equal to
    ``::1``. ``mac`` repeats the address for MAC rows and is None otherwise.
    """
    # Later assignments win, so IPv4/IPv6 keep priority over the link family.
    family_labels: dict[Any, str] = {}
    link_family = getattr(psutil, "AF_LINK", None)
    if link_family is not None:
        family_labels[link_family] = "MAC"
    family_labels[socket.AF_INET6] = "IPv6"
    family_labels[socket.AF_INET] = "IPv4"

    interfaces: list[dict[str, Any]] = []
    for if_name, if_addrs in psutil.net_if_addrs().items():
        for entry in if_addrs:
            label = family_labels.get(entry.family)
            if label is None:
                label = str(entry.family)
            address = entry.address
            is_loopback = address.startswith("127.") or address == "::1"
            interfaces.append(
                {
                    "name": if_name,
                    "address": address,
                    "family": label,
                    "internal": is_loopback,
                    "mac": address if label == "MAC" else None,
                }
            )
    return interfaces
@app.get("/api/env")
def api_env() -> dict[str, str]:
    """Return a plain-dict copy of this process's environment variables."""
    # NOTE(review): this returns every variable, which may include secrets
    # such as tokens or keys -- confirm that is intended for all callers.
    return {name: value for name, value in os.environ.items()}
if __name__ == "__main__":
    # Direct-run entry point: serve the app on the loopback interface only,
    # on the port named by AUDITOR_PY_PORT (54789 when unset).
    import uvicorn  # noqa: PLC0415

    listen_port = int(os.getenv("AUDITOR_PY_PORT", "54789"))
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=listen_port,
        log_level="warning",
    )