# pentest-7b / telemetry/collector.py
# Uploaded by VextLabs with huggingface_hub (commit abd769b, verified).
"""
VEXT Pentest-7B — Opt-in Telemetry Collector
Ships with the open-source Pentest-7B model. Collects ANONYMIZED usage
patterns to help improve the model. Default OFF — only activates when the
user explicitly sets ``VEXT_TELEMETRY=on``.
Privacy guarantees:
- NEVER collects URLs, IPs, hostnames, credentials, actual vulnerability
details, request/response bodies, file paths, or user identity.
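- Only collects aggregate categories: vuln types, tool success rates,
tech stack names, attack pattern categories, session metadata, and
model inference stats, plus per-event UTC timestamps and random
(non-identifying) session and telemetry IDs.
- All data is batched locally and sent in fire-and-forget POSTs: every
5 minutes by default, whenever a batch fills up, and at shutdown.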
- Call ``what_we_collect()`` at any time to see exactly what is sent.
Enable: ``export VEXT_TELEMETRY=on``
Disable: ``export VEXT_TELEMETRY=off`` (default)
"""
from __future__ import annotations
import atexit
import json
import logging
import os
import threading
import time
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------


def _env_int(name: str, default: int, minimum: int = 1) -> int:
    """Read an integer setting from the environment without ever raising.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset or not an integer.
        minimum: Smallest accepted value; smaller values are clamped to it.

    Returns:
        The parsed value, *default*, or *minimum* as described above.

    A typo in an optional telemetry knob must not crash the import of the
    model package (these settings are parsed even when telemetry is off),
    and a non-positive flush interval would make the flush loop spin.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = int(raw.strip())
    except ValueError:
        logger.warning("Ignoring invalid %s=%r; using default %d", name, raw, default)
        return default
    if value < minimum:
        logger.warning("%s=%d is below %d; using %d", name, value, minimum, minimum)
        return minimum
    return value


TELEMETRY_ENABLED = os.getenv("VEXT_TELEMETRY", "off").lower() in ("on", "1", "yes", "true")
TELEMETRY_ENDPOINT = os.getenv(
    "VEXT_TELEMETRY_ENDPOINT",
    "https://telemetry.tryvext.com/v1/collect",
)
# Seconds between periodic background flushes.
_FLUSH_INTERVAL_S = _env_int("VEXT_TELEMETRY_FLUSH_INTERVAL", 300)  # 5 minutes
# Buffered-event count that triggers an immediate flush from record().
_MAX_BATCH_SIZE = _env_int("VEXT_TELEMETRY_MAX_BATCH", 500)
# Stable anonymous session ID (random per process, no PII)
_SESSION_ID = uuid.uuid4().hex
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class TelemetryEvent:
    """One anonymized telemetry record.

    Every field carries category-level information only: no raw payloads,
    no targets, nothing identifying the user. Optional fields left at
    ``None`` are omitted by :meth:`to_dict`.
    """

    # Kind of event, e.g. "vuln_found", "tool_run", "inference", "session_end".
    event_type: str
    # Creation time (UTC, ISO-8601) and the per-process anonymous session ID.
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    session_id: str = field(default_factory=lambda: _SESSION_ID)

    # Vulnerability category ("sql_injection", "xss", "ssrf", ...) and its
    # severity bucket: critical / high / medium / low / info.
    vuln_type: Optional[str] = None
    severity: Optional[str] = None

    # Outcome of a tool/technique run ("nuclei", "sqlmap", "dalfox", ...).
    tool_name: Optional[str] = None
    tool_success: Optional[bool] = None
    findings_count: Optional[int] = None

    # Detected technology names only ("nginx", "python", "postgresql").
    tech_stack: Optional[List[str]] = None

    # Attack-pattern category ("auth_bypass", "injection", "misconfig").
    attack_category: Optional[str] = None
    attack_succeeded: Optional[bool] = None

    # End-of-session summary figures.
    session_duration_s: Optional[float] = None
    tools_run_count: Optional[int] = None
    finding_counts_by_severity: Optional[Dict[str, int]] = None

    # Aggregated model-inference statistics.
    avg_tokens: Optional[float] = None
    avg_latency_ms: Optional[float] = None
    inference_error_rate: Optional[float] = None

    # Pentest-7B release identifier, for comparing behaviour across versions.
    model_version: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict, leaving out unset (``None``) fields."""
        payload = asdict(self)
        unset = [name for name, value in payload.items() if value is None]
        for name in unset:
            del payload[name]
        return payload
# ---------------------------------------------------------------------------
# PII scrubber — defense in depth
# ---------------------------------------------------------------------------
import re as _re
_PII_PATTERNS = [
    _re.compile(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"),  # IPv4
    # Deliberately broad: besides IPv6 it also catches long hex strings
    # (hashes, API keys), at the cost of over-redacting hex-looking words.
    _re.compile(r"\b[0-9a-fA-F:]{6,39}\b"),  # IPv6-ish
    _re.compile(r"https?://[^\s]+"),  # URLs
    _re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+"),  # emails
    _re.compile(r"/(?:home|Users|tmp|var|etc)/[^\s\"',]+"),  # file paths
    _re.compile(r"\b(?:password|passwd|secret|token|key|credential)s?\b", _re.I),  # secret keywords
]

# Fields the collector fills in itself. Their legitimate values always trip
# the broad IPv6-ish pattern above (the ":MM:SS" part of an ISO timestamp,
# the 32 hex chars of uuid4().hex), so without this exemption every event
# would ship timestamp and session_id as "[REDACTED]". A value is exempted
# only if it exactly matches the generated format; anything else in these
# fields is scrubbed like any other string.
_TRUSTED_FIELD_FORMATS = {
    # datetime.now(timezone.utc).isoformat(); microseconds omitted when zero.
    "timestamp": _re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d{6})?\+00:00"),
    # uuid.uuid4().hex: version nibble "4" at index 12, variant nibble 8-b at 16.
    "session_id": _re.compile(r"[0-9a-f]{12}4[0-9a-f]{3}[89ab][0-9a-f]{15}"),
}


def _scrub_value(val: Any) -> Any:
    """Recursively scrub potential PII from an event value.

    Strings matching any ``_PII_PATTERNS`` entry become ``"[REDACTED]"``.
    Lists and tuples are scrubbed element-wise and returned as lists (the
    JSON encoding is identical). Dicts have both string keys and values
    scrubbed; if several keys redact to the same string, the last value wins.
    Other types are returned unchanged.
    """
    if isinstance(val, str):
        return "[REDACTED]" if any(pat.search(val) for pat in _PII_PATTERNS) else val
    if isinstance(val, (list, tuple)):
        return [_scrub_value(v) for v in val]
    if isinstance(val, dict):
        # Keys are sent too (e.g. finding_counts_by_severity), so scrub them;
        # only str keys are touched so non-str keys stay hashable as-is.
        return {
            (_scrub_value(k) if isinstance(k, str) else k): _scrub_value(v)
            for k, v in val.items()
        }
    return val


def _scrub_event(event_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Scrub every field of an event dict for potential PII.

    Collector-generated ``timestamp``/``session_id`` values are kept when
    they exactly match their generated format (see ``_TRUSTED_FIELD_FORMATS``).
    """
    scrubbed: Dict[str, Any] = {}
    for key, value in event_dict.items():
        fmt = _TRUSTED_FIELD_FORMATS.get(key)
        if fmt is not None and isinstance(value, str) and fmt.fullmatch(value):
            scrubbed[key] = value
        else:
            scrubbed[key] = _scrub_value(value)
    return scrubbed
# ---------------------------------------------------------------------------
# Collector
# ---------------------------------------------------------------------------
class TelemetryCollector:
    """Thread-safe, non-blocking telemetry collector.

    Buffers events in memory and flushes them to the VEXT telemetry endpoint
    every ``_FLUSH_INTERVAL_S`` seconds (default 5 min), as soon as the
    buffer holds ``_MAX_BATCH_SIZE`` events, and on shutdown.

    Usage::

        collector = TelemetryCollector()
        collector.record(TelemetryEvent(event_type="vuln_found", vuln_type="xss"))
        # ... at end of session:
        collector.shutdown()

    If telemetry is disabled (default), all methods are safe no-ops.
    """

    # Upper bound (seconds) that shutdown() waits for the final batch's sender
    # thread. Senders are daemon threads, so an un-joined final send would be
    # killed at interpreter exit; the bound stops a slow endpoint from
    # stalling exit indefinitely.
    _SHUTDOWN_SEND_TIMEOUT_S = 10.0

    def __init__(self) -> None:
        self._enabled = TELEMETRY_ENABLED
        self._buffer: List[Dict[str, Any]] = []
        # Guards _buffer; taken by record(), flush(), shutdown() and the flush loop.
        self._lock = threading.Lock()
        self._flush_thread: Optional[threading.Thread] = None
        # Set by shutdown() to end the periodic flush loop.
        self._stop_event = threading.Event()
        self._telemetry_id: Optional[str] = None
        if self._enabled:
            # Random per-instance ID attached to every event for deletion requests.
            self._telemetry_id = uuid.uuid4().hex
            self._print_startup_banner()
            self._start_flush_thread()
            atexit.register(self.shutdown)

    # -- public API ---------------------------------------------------------

    @property
    def enabled(self) -> bool:
        """Whether telemetry was enabled (``VEXT_TELEMETRY``) at construction."""
        return self._enabled

    @property
    def telemetry_id(self) -> Optional[str]:
        """Opaque ID for this collector instance.

        Can be sent to ``DELETE /api/v1/telemetry/delete`` to request
        removal of all data associated with this session. ``None`` when
        telemetry is disabled.
        """
        return self._telemetry_id

    def record(self, event: TelemetryEvent) -> None:
        """Record a telemetry event (non-blocking).

        The event is serialized, PII-scrubbed, tagged with ``telemetry_id``
        and buffered; a full buffer triggers an immediate background send.
        If telemetry is disabled, this is an immediate no-op.
        """
        if not self._enabled:
            return
        event_dict = event.to_dict()
        event_dict = _scrub_event(event_dict)
        # Tag with collector-level telemetry_id for deletion support
        # (added after scrubbing: it is generated here, not user data).
        if self._telemetry_id:
            event_dict["telemetry_id"] = self._telemetry_id
        with self._lock:
            self._buffer.append(event_dict)
            if len(self._buffer) >= _MAX_BATCH_SIZE:
                self._flush_locked()

    def record_vuln(self, vuln_type: str, severity: str) -> None:
        """Convenience: record a vulnerability discovery (category only)."""
        self.record(TelemetryEvent(
            event_type="vuln_found",
            vuln_type=vuln_type,
            severity=severity,
        ))

    def record_tool_run(
        self,
        tool_name: str,
        success: bool,
        findings_count: int = 0,
    ) -> None:
        """Convenience: record a tool execution result."""
        self.record(TelemetryEvent(
            event_type="tool_run",
            tool_name=tool_name,
            tool_success=success,
            findings_count=findings_count,
        ))

    def record_inference(
        self,
        avg_tokens: float,
        avg_latency_ms: float,
        error_rate: float = 0.0,
        model_version: Optional[str] = None,
    ) -> None:
        """Convenience: record model inference statistics."""
        self.record(TelemetryEvent(
            event_type="inference",
            avg_tokens=avg_tokens,
            avg_latency_ms=avg_latency_ms,
            inference_error_rate=error_rate,
            model_version=model_version,
        ))

    def record_session_end(
        self,
        duration_s: float,
        tools_run: int,
        finding_counts: Dict[str, int],
    ) -> None:
        """Convenience: record session summary on completion."""
        self.record(TelemetryEvent(
            event_type="session_end",
            session_duration_s=duration_s,
            tools_run_count=tools_run,
            finding_counts_by_severity=finding_counts,
        ))

    def record_tech_stack(self, stack: List[str]) -> None:
        """Convenience: record detected technology stack (names only)."""
        self.record(TelemetryEvent(
            event_type="tech_detected",
            tech_stack=stack,
        ))

    def record_attack_pattern(
        self, category: str, succeeded: bool
    ) -> None:
        """Convenience: record an attack pattern outcome (category only)."""
        self.record(TelemetryEvent(
            event_type="attack_pattern",
            attack_category=category,
            attack_succeeded=succeeded,
        ))

    def flush(self) -> None:
        """Force an immediate (background) flush of buffered events."""
        if not self._enabled:
            return
        with self._lock:
            self._flush_locked()

    def shutdown(self) -> None:
        """Flush remaining events and stop the background thread.

        Also registered with ``atexit``; calling it more than once is safe.
        Unlike periodic flushes, the final send is awaited for up to
        ``_SHUTDOWN_SEND_TIMEOUT_S`` seconds. Otherwise its daemon sender
        thread would be killed at interpreter exit and the last batch lost.
        """
        if not self._enabled:
            return
        self._stop_event.set()
        if self._flush_thread and self._flush_thread.is_alive():
            self._flush_thread.join(timeout=10)
        with self._lock:
            sender = self._flush_locked()
        # Join outside the lock: the sender never takes it, but there is no
        # reason to block concurrent record() calls while waiting on the network.
        if sender is not None:
            sender.join(timeout=self._SHUTDOWN_SEND_TIMEOUT_S)
        logger.info("Telemetry collector shut down.")

    # -- internals ----------------------------------------------------------

    def _print_startup_banner(self) -> None:
        """Tell the user on stdout that telemetry is on and how to turn it off."""
        msg = (
            "\n"
            " [VEXT Telemetry] Enabled — sending anonymized usage patterns\n"
            " to improve Pentest-7B. Disable with VEXT_TELEMETRY=off\n"
            " Run what_we_collect() to see exactly what data is sent.\n"
            f" Telemetry ID (for deletion requests): {self._telemetry_id}\n"
        )
        print(msg)
        logger.info("Telemetry enabled (session=%s)", _SESSION_ID)

    def _start_flush_thread(self) -> None:
        """Start the daemon thread that flushes every ``_FLUSH_INTERVAL_S`` s."""
        def _flush_loop():
            # wait() returns True once shutdown() sets the event, ending the loop.
            while not self._stop_event.wait(timeout=_FLUSH_INTERVAL_S):
                with self._lock:
                    self._flush_locked()

        self._flush_thread = threading.Thread(
            target=_flush_loop,
            name="vext-telemetry-flush",
            daemon=True,
        )
        self._flush_thread.start()

    def _flush_locked(self) -> Optional[threading.Thread]:
        """Hand buffered events to a background sender thread.

        MUST be called while holding ``self._lock``.
        Fire-and-forget: errors are logged but never raised.

        Returns:
            The started sender thread, or ``None`` if the buffer was empty.
            Callers that must not lose the batch (``shutdown``) can join it.
        """
        if not self._buffer:
            return None
        batch = self._buffer[:]
        self._buffer.clear()
        # Send in a background thread so we never block the caller
        t = threading.Thread(
            target=self._send_batch,
            args=(batch,),
            name="vext-telemetry-send",
            daemon=True,
        )
        t.start()
        return t

    @staticmethod
    def _send_batch(batch: List[Dict[str, Any]]) -> None:
        """POST a batch of events to the telemetry endpoint."""
        try:
            import urllib.request
            import urllib.error

            payload = json.dumps({
                "events": batch,
                "sent_at": datetime.now(timezone.utc).isoformat(),
                "collector_version": "1.0.0",
            }).encode("utf-8")
            req = urllib.request.Request(
                TELEMETRY_ENDPOINT,
                data=payload,
                headers={
                    "Content-Type": "application/json",
                    "User-Agent": "vext-pentest7b-telemetry/1.0",
                },
                method="POST",
            )
            # 10s timeout — we never want to block the user
            with urllib.request.urlopen(req, timeout=10) as resp:
                if resp.status == 200:
                    logger.debug(
                        "Telemetry batch sent: %d events", len(batch)
                    )
                else:
                    logger.debug(
                        "Telemetry endpoint returned %d", resp.status
                    )
        except urllib.error.URLError as exc:
            logger.debug("Telemetry send failed (network): %s", exc)
        except Exception as exc:
            # Never crash the model over telemetry
            logger.debug("Telemetry send failed: %s", exc)
# ---------------------------------------------------------------------------
# Transparency: what_we_collect()
# ---------------------------------------------------------------------------
def what_we_collect() -> None:
    """Print a human-readable description of exactly what telemetry data
    is collected and sent. Call this at any time for full transparency.

    The endpoint, flush interval and batch size are read from the live
    module configuration (``TELEMETRY_ENDPOINT``, ``_FLUSH_INTERVAL_S``,
    ``_MAX_BATCH_SIZE``). All three can be overridden through environment
    variables, so hard-coding them here would misreport where and how often
    data is sent.
    """
    print(
        f"""
=== VEXT Pentest-7B Telemetry — What We Collect ===
COLLECTED (anonymized categories only):
- Vulnerability types found (e.g. "sql_injection", "xss")
NOT the actual payload, target URL, or vulnerability details.
- Tool/technique success/failure rates
(e.g. "nuclei: 3 found, sqlmap: 0 found")
- Tech stack detected (e.g. "nginx", "python", "postgresql")
NOT the hostname, IP, or URL where it was detected.
- Attack pattern categories that succeeded vs failed
(e.g. "auth_bypass: succeeded", "injection: failed")
- Session metadata: duration, number of tools run,
finding count by severity (critical/high/medium/low/info).
- Model inference stats: average tokens, average latency,
error rate per session, and the model version if provided.
- A random session ID (UUID, not linked to your identity).
- A random telemetry_id per collector (shown at startup; used
for deletion requests).
- A UTC timestamp on each event, plus the batch send time and
collector version.
NEVER COLLECTED:
- URLs, IPs, hostnames, or domain names
- Credentials, API keys, tokens, or secrets
- Actual vulnerability details or proof-of-concept payloads
- HTTP request/response bodies
- File paths from your system
- Your identity, username, email, or organization name
- The target you are testing
HOW IT WORKS:
- Default OFF. Enable with: export VEXT_TELEMETRY=on
- Events are batched locally and sent every {_FLUSH_INTERVAL_S} seconds,
whenever {_MAX_BATCH_SIZE} events accumulate, and at shutdown.
- All string fields are scrubbed for PII patterns before sending.
- Sends to: {TELEMETRY_ENDPOINT}
- Fire-and-forget: telemetry failures never affect the model.
- You receive a telemetry_id on startup that you can use to
request deletion of your data at any time (GDPR).
DELETE YOUR DATA:
curl -X DELETE "https://telemetry.tryvext.com/api/v1/telemetry/delete" \\
-H "Content-Type: application/json" \\
-d '{{"telemetry_id": "<your-telemetry-id>"}}'
SOURCE CODE:
This collector is fully open source. Read the code at:
ml/telemetry/collector.py
"""
    )
# ---------------------------------------------------------------------------
# Module-level singleton for convenience
# ---------------------------------------------------------------------------
_default_collector: Optional[TelemetryCollector] = None
# Serializes first-time creation of the singleton in get_collector().
_default_collector_lock = threading.Lock()


def get_collector() -> TelemetryCollector:
    """Return the module-level singleton TelemetryCollector.

    Creates the instance on first call. Creation happens under a lock
    because the GIL alone does not make check-then-assign atomic. Without
    the lock, concurrent first calls could each build a collector, printing
    two banners, starting two flush threads and reporting under two
    telemetry_ids.
    """
    global _default_collector
    collector = _default_collector
    if collector is None:
        with _default_collector_lock:
            # Re-check: another thread may have created it while we waited.
            if _default_collector is None:
                _default_collector = TelemetryCollector()
            collector = _default_collector
    return collector