"""
CGAE 0G Storage — Python Interface
====================================
Uploads CGAE audit certificates to 0G decentralized storage via the
Node.js uploader script (storage/upload_to_0g.mjs).
0G Storage returns a Merkle root hash (bytes32) as the content identifier,
which is stored on-chain in CGAERegistry.certify(). Anyone can verify by
downloading from 0G via the root hash and checking the Merkle proof.
Usage:
    from storage.zg_store import ZgStore, StoreResult
    store = ZgStore()
    result = store.store_audit_result(model_name, audit_json_path)
    print(result.root_hash)  # "0xabc..." or deterministic fallback
0G Integration:
    Real uploads require:
      1. Node.js 18+ with `@0gfoundation/0g-ts-sdk` installed in storage/
      2. ZG_PRIVATE_KEY env var (hex, no 0x prefix)
      3. Wallet funded with testnet tokens from faucet.0g.ai
    Without credentials the store falls back to a deterministic
    content-addressed hash (SHA-256) so the pipeline always has a
    root hash to work with. The 'real' field on StoreResult tells
    callers which mode was used.
Network:
    Default: 0G Testnet
    EVM RPC: https://evmrpc-testnet.0g.ai
    Indexer: https://indexer-storage-testnet-turbo.0g.ai
    Scan:    https://storagescan.0g.ai
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory containing this file; the Node.js uploader script is looked up
# next to it.
_STORAGE_DIR = Path(__file__).resolve().parent
_UPLOADER_SCRIPT = _STORAGE_DIR / "upload_to_0g.mjs"
# 0G testnet endpoints. ZG_RPC_URL and ZG_INDEXER_RPC are not referenced in
# the visible part of this module — presumably consumed by importers or
# mirrored in the uploader script; TODO confirm.
ZG_RPC_URL = "https://evmrpc-testnet.0g.ai"
ZG_INDEXER_RPC = "https://indexer-storage-testnet-turbo.0g.ai"
# Base URL used to build StoreResult.scan_url for real uploads.
ZG_STORAGE_SCAN = "https://storagescan.0g.ai"
# Faucet URL shown in the setup instructions returned by check_setup().
ZG_FAUCET = "https://faucet.0g.ai"
@dataclass
class StoreResult:
    """Outcome of storing one audit file, either on 0G or via the fallback."""

    # Merkle root hash for a real upload, or "0x" + SHA-256 hex digest of the
    # file contents when the fallback was used.
    root_hash: str
    # True when the file was uploaded to 0G; False for the fallback hash.
    real: bool
    model_name: str
    file_path: str
    size_bytes: int = 0
    # Why the fallback was used (upload error or unavailability reason), if any.
    error: Optional[str] = None

    @property
    def scan_url(self) -> Optional[str]:
        """Return the storage-scan URL for a real upload, else ``None``."""
        return f"{ZG_STORAGE_SCAN}/tx/{self.root_hash}" if self.real else None

    def to_dict(self) -> dict:
        """Return the fields plus the derived ``scan_url`` as a plain dict."""
        stored_fields = (
            "root_hash",
            "real",
            "model_name",
            "file_path",
            "size_bytes",
            "error",
        )
        payload = {key: getattr(self, key) for key in stored_fields}
        payload["scan_url"] = self.scan_url
        return payload
class ZgStore:
    """
    Uploads audit JSON files to 0G decentralized storage.

    The upload itself is delegated to the Node.js uploader script, run as a
    subprocess. When the upload cannot be attempted or fails, and
    ``fallback_ok`` is true, a deterministic SHA-256 hash of the file is
    returned instead so callers always receive a root hash.
    """

    def __init__(
        self,
        private_key: Optional[str] = None,
        node_cmd: Optional[str] = None,
        fallback_ok: bool = True,
    ):
        """
        Args:
            private_key: Key handed to the uploader via the ``ZG_PRIVATE_KEY``
                environment variable. Defaults to the current value of that
                variable.
            node_cmd: Node.js executable to use. Defaults to whatever
                ``_find_node()`` discovers.
            fallback_ok: When true, unavailable or failed uploads produce a
                fallback result instead of raising ``RuntimeError``.
        """
        # An empty key (e.g. ``ZG_PRIVATE_KEY=`` in a .env file) counts as unset,
        # matching the truthiness check used by check_setup().
        self._private_key = private_key or os.getenv("ZG_PRIVATE_KEY") or None
        self._node = node_cmd or _find_node()
        self.fallback_ok = fallback_ok

    def store_audit_result(self, model_name: str, json_path: str | Path) -> StoreResult:
        """
        Store one audit file and return its root hash.

        Args:
            model_name: Name recorded on the result and used in log messages.
            json_path: Path of the audit JSON file to store.

        Returns:
            A ``StoreResult`` with ``real=True`` after a successful upload, or
            ``real=False`` (with ``error`` set) when the fallback hash is used.

        Raises:
            FileNotFoundError: ``json_path`` does not exist.
            RuntimeError: The upload is unavailable or failed and
                ``fallback_ok`` is false.
        """
        json_path = Path(json_path)
        if not json_path.exists():
            raise FileNotFoundError(f"Audit file not found: {json_path}")

        if not self._can_upload():
            reason = self._unavailable_reason()
            if not self.fallback_ok:
                raise RuntimeError(f"0G Storage unavailable: {reason}")
            logger.info(
                " [0g] Upload unavailable (%s). Using deterministic hash for %s.",
                reason,
                model_name,
            )
            return self._fallback_result(model_name, json_path, error=reason)

        try:
            return self._upload_via_0g(model_name, json_path)
        except Exception as e:
            # Deliberate best-effort boundary: any upload failure (subprocess
            # error, timeout, bad response) either re-raises or falls back.
            msg = str(e)
            if not self.fallback_ok:
                raise RuntimeError(f"0G Storage upload failed for {model_name}: {msg}") from e
            logger.warning(
                " [0g] Upload failed for %s: %s. Using fallback hash.", model_name, msg
            )
            return self._fallback_result(model_name, json_path, error=msg)

    def _can_upload(self) -> bool:
        """Return True when a key, a node executable and the script are all present."""
        return (
            bool(self._private_key)
            and self._node is not None
            and _UPLOADER_SCRIPT.exists()
        )

    def _unavailable_reason(self) -> str:
        """Describe the first missing prerequisite for a real upload."""
        if self._node is None:
            return "node.js not found in PATH"
        if not _UPLOADER_SCRIPT.exists():
            return f"uploader script missing: {_UPLOADER_SCRIPT}"
        if not self._private_key:
            return "ZG_PRIVATE_KEY not set"
        return "unknown"

    def _upload_via_0g(self, model_name: str, json_path: Path) -> StoreResult:
        """
        Run the uploader script on ``json_path`` and parse its JSON reply.

        Raises:
            RuntimeError: The script exited non-zero, printed no usable JSON,
                reported ``ok`` as false, or omitted ``rootHash``.
            subprocess.TimeoutExpired: The script ran longer than 300 seconds.
        """
        # The key travels via the environment, not argv.
        env = {**os.environ}
        if self._private_key:
            env["ZG_PRIVATE_KEY"] = self._private_key
        cmd = [self._node, str(_UPLOADER_SCRIPT), str(json_path)]
        logger.info(" [0g] Uploading %s for %s...", json_path.name, model_name)
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300, env=env)

        # Exit code 2 is treated as "SDK not installed".
        if proc.returncode == 2:
            raise RuntimeError(
                "0G SDK not installed. Run: cd storage && npm install @0gfoundation/0g-ts-sdk ethers"
            )
        if proc.returncode != 0:
            raise RuntimeError(self._error_message(proc.stderr, proc.returncode))

        stdout = proc.stdout.strip()
        if not stdout:
            stderr = proc.stderr.strip()
            raise RuntimeError(f"0G upload returned empty output. stderr: {stderr}")

        # SDK may print debug lines before the JSON; use the last JSON line.
        json_line = self._last_json_line(stdout)
        if not json_line:
            raise RuntimeError(f"0G upload returned no JSON. stdout: {stdout[:200]}")
        try:
            data = json.loads(json_line)
        except json.JSONDecodeError as exc:
            raise RuntimeError(
                f"0G upload returned malformed JSON: {json_line[:200]}"
            ) from exc

        if not data.get("ok"):
            raise RuntimeError(data.get("error", "Unknown upload error"))
        root_hash = data.get("rootHash")
        if not root_hash:
            # A bare KeyError('rootHash') would make a cryptic fallback reason.
            raise RuntimeError("0G upload response is missing rootHash")

        size = data.get("size")
        if size is None:
            size = json_path.stat().st_size
        logger.info(
            " [0g] Uploaded %s → rootHash %s (%s bytes)", json_path.name, root_hash, size
        )
        return StoreResult(
            root_hash=root_hash, real=True, model_name=model_name,
            file_path=str(json_path), size_bytes=size,
        )

    @staticmethod
    def _error_message(stderr: str, returncode: int) -> str:
        """Turn a failed run's stderr into a message, preferring its JSON ``error``."""
        text = stderr.strip()
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            payload = None
        # Only a JSON object can carry an "error" field; any other JSON value
        # (list, number, string) is reported as the raw stderr text.
        if isinstance(payload, dict):
            return str(payload.get("error", text))
        return text or f"exit code {returncode}"

    @staticmethod
    def _last_json_line(output: str) -> Optional[str]:
        """Return the last line of ``output`` that starts with ``{``, stripped."""
        for line in reversed(output.splitlines()):
            candidate = line.strip()
            if candidate.startswith("{"):
                return candidate
        return None

    @staticmethod
    def _fallback_result(model_name: str, json_path: Path, error: Optional[str] = None) -> StoreResult:
        """Build a ``real=False`` result whose hash is ``0x`` + SHA-256 of the file."""
        content = json_path.read_bytes()
        digest = hashlib.sha256(content).hexdigest()
        pseudo_hash = f"0x{digest}"
        return StoreResult(
            root_hash=pseudo_hash, real=False, model_name=model_name,
            file_path=str(json_path), size_bytes=len(content), error=error,
        )
def _find_node() -> Optional[str]:
    """
    Return the name of a working Node.js command, or ``None``.

    Tries ``node`` then ``nodejs`` by running ``<name> --version`` and
    returns the first name whose command exits with status 0.
    """
    for name in ("node", "nodejs"):
        try:
            result = subprocess.run([name, "--version"], capture_output=True, text=True, timeout=5)
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers a missing binary (FileNotFoundError) as well as
            # one that exists but cannot be executed (PermissionError).
            continue
        if result.returncode == 0:
            return name
    return None
def check_setup() -> dict:
    """
    Report whether the prerequisites for real 0G uploads are present.

    Returns:
        A dict with:
          - ``ready``: bool, true only when every prerequisite below is met.
          - ``node_found``: the node command name found, or ``None``.
          - ``sdk_installed``: whether the SDK directory exists under
            ``node_modules`` next to this file (only checked when node is found).
          - ``private_key_set``: whether ``ZG_PRIVATE_KEY`` is set and non-empty.
          - ``uploader_script``: whether the uploader script exists.
          - ``instructions``: setup steps as text, or ``None`` when ready.
    """
    node = _find_node()
    sdk_installed = False
    if node:
        nm = _STORAGE_DIR / "node_modules" / "@0gfoundation" / "0g-ts-sdk"
        sdk_installed = nm.exists()
    has_key = bool(os.getenv("ZG_PRIVATE_KEY"))
    script_ok = _UPLOADER_SCRIPT.exists()
    # Coerce node to bool so "ready" is always True/False; a bare ``node and ...``
    # evaluates to None (JSON null) when no node command is found.
    ready = bool(node) and sdk_installed and has_key and script_ok
    return {
        "ready": ready,
        "node_found": node,
        "sdk_installed": sdk_installed,
        "private_key_set": has_key,
        "uploader_script": script_ok,
        "instructions": (
            None if ready else
            "To enable real 0G uploads:\n"
            "  1. cd storage && npm install @0gfoundation/0g-ts-sdk ethers\n"
            f"  2. Get testnet tokens: {ZG_FAUCET}\n"
            "  3. export ZG_PRIVATE_KEY=<your_hex_private_key>\n"
            "  4. Re-run the simulation"
        ),
    }
if __name__ == "__main__":
    # When run as a script, print the setup report from check_setup() as
    # indented JSON.
    print(json.dumps(check_setup(), indent=2))
|