Spaces:
Running
Running
File size: 2,832 Bytes
98075af | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | from __future__ import annotations
from typing import Any
import numpy as np
def to_jsonable(value: Any) -> Any:
if isinstance(value, np.ndarray):
return value.tolist()
if isinstance(value, (np.floating, np.integer)):
return value.item()
if isinstance(value, dict):
return {str(k): to_jsonable(v) for k, v in value.items()}
if isinstance(value, tuple):
return [to_jsonable(v) for v in value]
if isinstance(value, list):
return [to_jsonable(v) for v in value]
return value
def serialize_agents(agents: list[dict[str, Any]]) -> list[dict[str, Any]]:
serialized = []
for agent in agents:
serialized.append(
{
"id": int(agent.get("id", 0)),
"type": str(agent.get("type", "unknown")),
"raw_label": agent.get("raw_label"),
"history": [
{"x": float(pt[0]), "y": float(pt[1])}
for pt in agent.get("history", [])
],
"predictions": [
[{"x": float(pt[0]), "y": float(pt[1])} for pt in mode]
for mode in agent.get("predictions", [])
],
"probabilities": [float(p) for p in agent.get("probabilities", [])],
"is_target": bool(agent.get("is_target", False)),
}
)
return serialized
def build_prediction_payload(result: dict[str, Any]) -> dict[str, Any]:
core_excludes = {
"agents",
"target_track_id",
"mode",
"camera_snapshots",
"fusion_data",
"scene_geometry",
"error",
}
payload: dict[str, Any] = {
"mode": result.get("mode", "unknown"),
"target_track_id": result.get("target_track_id"),
"agents": serialize_agents(result.get("agents", [])),
"meta": to_jsonable({k: v for k, v in result.items() if k not in core_excludes}),
}
snapshots = result.get("camera_snapshots")
if snapshots:
payload["detections"] = {
name: {
"frame_path": snap.get("frame_path"),
"detections": to_jsonable(snap.get("detections", [])),
}
for name, snap in snapshots.items()
}
fusion_data = result.get("fusion_data")
if fusion_data:
payload["sensors"] = {
"sample_token": fusion_data.get("sample_token"),
"lidar_points": int(len(fusion_data.get("lidar_xy", []))),
"radar_points": int(len(fusion_data.get("radar_xy", []))),
"radar_channel_counts": to_jsonable(fusion_data.get("radar_channel_counts", {})),
}
if result.get("scene_geometry") is not None:
payload["scene_geometry"] = to_jsonable(result.get("scene_geometry"))
return payload
|