Diffusers
Safetensors
zeyuren2002's picture
Add files using upload-large-folder tool
ea3c0ad verified
# Copyright (c) 2025 ByteDance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Adapted from [VGGT-Long](https://github.com/DengKaiCQ/VGGT-Long)
import time
from typing import List, Tuple
import numpy as np
import pypose as pp
import torch
from fastloop.solve_python import solve_system_py
from scipy.spatial.transform import Rotation as R
cpp_version = False
try:
import sim3solve
cpp_version = True
except Exception:
print("Sim3solve of C++ Version failed, Will using Python Version.")
class Sim3LoopOptimizer:
"""
Loop closure optimizer for sequences of Sim3 transformations
Input:
- sequential_transforms: List[Tuple[float, np.ndarray, np.ndarray]]
Each element is (s, R, t), where s is scalar scale, R is [3,3] rotation matrix,
t is [3,] translation vector
- loop_constraints: List[Tuple[int, int, Tuple[float, np.ndarray, np.ndarray]]]
Each element is (i, j, (s, R, t)), representing a loop closure constraint
from frame i to frame j
Output:
- Optimized sequential_transforms
"""
def __init__(self, config, device="cpu"):
self.device = device
self.config = config
self.solve_system_version = self.config["Loop"]["SIM3_Optimizer"][
"lang_version"
] # choose between 'python' and 'cpp'
if not cpp_version:
self.solve_system_version = "python"
def numpy_to_pypose_sim3(self, s: float, R_mat: np.ndarray, t_vec: np.ndarray) -> pp.Sim3:
"""Convert numpy s,R,t to pypose Sim3"""
q = R.from_matrix(R_mat).as_quat() # [x,y,z,w]
# pypose requires [t, q, s] format
data = np.concatenate([t_vec, q, np.array([s])])
return pp.Sim3(torch.from_numpy(data).float().to(self.device))
def pypose_sim3_to_numpy(self, sim3: pp.Sim3) -> Tuple[float, np.ndarray, np.ndarray]:
"""Convert pypose Sim3 to numpy s,R,t"""
data = sim3.data.cpu().numpy()
t = data[:3]
q = data[3:7] # [x,y,z,w]
s = data[7]
R_mat = R.from_quat(q).as_matrix()
return s, R_mat, t
def sequential_to_absolute_poses(
self, sequential_transforms: List[Tuple[float, np.ndarray, np.ndarray]]
) -> torch.Tensor:
"""
Convert sequential relative transforms to absolute pose sequence
S_01, S_12, S_23, ... -> T_0, T_1, T_2, T_3, ...
Where T_i is the transform from world coordinate to frame i
"""
len(sequential_transforms) + 1
poses = []
identity = pp.Sim3(
torch.tensor([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0], device=self.device)
)
poses.append(identity)
current_pose = identity
for s, R_mat, t_vec in sequential_transforms:
rel_transform = self.numpy_to_pypose_sim3(s, R_mat, t_vec)
current_pose = current_pose @ rel_transform
poses.append(current_pose)
return torch.stack(poses)
def absolute_to_sequential_transforms(
self, absolute_poses: pp.Sim3
) -> List[Tuple[float, np.ndarray, np.ndarray]]:
"""
Convert absolute pose sequence back to sequential relative transforms
T_0, T_1, T_2, ... -> S_01, S_12, S_23, ...
"""
sequential_transforms = []
n = absolute_poses.shape[0]
for i in range(n - 1):
rel_transform = absolute_poses[i].Inv() @ absolute_poses[i + 1]
s, R_mat, t_vec = self.pypose_sim3_to_numpy(rel_transform)
sequential_transforms.append((s, R_mat, t_vec))
return sequential_transforms
def SE3_to_Sim3(self, x: torch.Tensor) -> pp.Sim3:
"""Convert SE3 to Sim3 (add unit scale)"""
ones = torch.ones_like(x[..., :1])
out = torch.cat((x, ones), dim=-1)
return pp.Sim3(out)
def build_loop_constraints(
self, loop_constraints: List[Tuple[int, int, Tuple[float, np.ndarray, np.ndarray]]]
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""Build loop closure constraints"""
if not loop_constraints:
return (
torch.empty(0, 8, device=self.device),
torch.empty(0, dtype=torch.long),
torch.empty(0, dtype=torch.long),
)
loop_transforms = []
ii_loop = []
jj_loop = []
for i, j, (s, R_mat, t_vec) in loop_constraints:
loop_sim3 = self.numpy_to_pypose_sim3(s, R_mat, t_vec)
loop_transforms.append(loop_sim3.data)
ii_loop.append(i)
jj_loop.append(j)
dSloop = pp.Sim3(torch.stack(loop_transforms))
ii_loop = torch.tensor(ii_loop, dtype=torch.long, device=self.device)
jj_loop = torch.tensor(jj_loop, dtype=torch.long, device=self.device)
return dSloop, ii_loop, jj_loop
def residual(self, Ginv, input_poses, dSloop, ii, jj, jacobian=False):
"""Compute residuals (modified from original code)"""
def _residual(C, Gi, Gj):
out = C @ pp.Exp(Gi) @ pp.Exp(Gj).Inv()
return out.Log().tensor()
pred_inv_poses = pp.Sim3(input_poses).Inv()
n, _ = pred_inv_poses.shape
if n > 1:
kk = torch.arange(1, n, device=self.device)
ll = kk - 1
Ti = pred_inv_poses[kk]
Tj = pred_inv_poses[ll]
dSij = Tj @ Ti.Inv()
else:
kk = torch.empty(0, dtype=torch.long, device=self.device)
ll = torch.empty(0, dtype=torch.long, device=self.device)
dSij = pp.Sim3(torch.empty(0, 8, device=self.device))
constants = (
torch.cat((dSij.data, dSloop.data), dim=0) if dSloop.shape[0] > 0 else dSij.data
)
if constants.shape[0] > 0:
constants = pp.Sim3(constants)
iii = torch.cat((kk, ii))
jjj = torch.cat((ll, jj))
resid = _residual(constants, Ginv[iii], Ginv[jjj])
else:
iii = torch.empty(0, dtype=torch.long, device=self.device)
jjj = torch.empty(0, dtype=torch.long, device=self.device)
resid = torch.empty(0, device=self.device)
if not jacobian:
return resid
if constants.shape[0] > 0:
def batch_jacobian(func, x):
def _func_sum(*x):
return func(*x).sum(dim=0)
_, b, c = torch.autograd.functional.jacobian(_func_sum, x, vectorize=True)
from einops import rearrange
return rearrange(torch.stack((b, c)), "N O B I -> N B O I", N=2)
J_Ginv_i, J_Ginv_j = batch_jacobian(_residual, (constants, Ginv[iii], Ginv[jjj]))
else:
J_Ginv_i = torch.empty(0, device=self.device)
J_Ginv_j = torch.empty(0, device=self.device)
return resid, (J_Ginv_i, J_Ginv_j, iii, jjj)
def optimize(
self,
sequential_transforms: List[Tuple[float, np.ndarray, np.ndarray]],
loop_constraints: List[Tuple[int, int, Tuple[float, np.ndarray, np.ndarray]]],
max_iterations: int = None,
lambda_init: float = None,
) -> List[Tuple[float, np.ndarray, np.ndarray]]:
"""
Main optimization function
Args:
sequential_transforms: Input sequence of transforms
loop_constraints: List of loop closure constraints
max_iterations: Maximum iterations
lambda_init: Initial lambda for L-M algorithm
Returns:
Optimized sequence of transforms
"""
if max_iterations is None:
max_iterations = self.config["Loop"]["SIM3_Optimizer"]["max_iterations"]
if lambda_init is None:
lambda_init = eval(self.config["Loop"]["SIM3_Optimizer"]["lambda_init"])
input_poses = self.sequential_to_absolute_poses(sequential_transforms)
dSloop, ii_loop, jj_loop = self.build_loop_constraints(loop_constraints)
if len(loop_constraints) == 0:
print("Warning: No loop constraints provided, returning original transforms")
return sequential_transforms
Ginv = pp.Sim3(input_poses).Inv().Log()
lmbda = lambda_init
residual_history = []
print(
f"Starting optimization with {len(sequential_transforms)} poses \
and {len(loop_constraints)} loop constraints"
)
# L-M loop
for itr in range(max_iterations):
resid, (J_Ginv_i, J_Ginv_j, iii, jjj) = self.residual(
Ginv, input_poses, dSloop, ii_loop, jj_loop, jacobian=True
)
if resid.numel() == 0:
print("No residuals to optimize")
break
current_cost = resid.square().mean().item()
residual_history.append(current_cost)
try: # Solve linear system
begin_time = time.time()
if self.solve_system_version == "cpp":
(delta_pose,) = sim3solve.solve_system(
J_Ginv_i, J_Ginv_j, iii, jjj, resid, 0.0, lmbda, -1
)
elif self.solve_system_version == "python":
delta_pose = solve_system_py(
J_Ginv_i, J_Ginv_j, iii, jjj, resid, 0.0, lmbda, -1
)
else:
print("Solver version has not been chosen! ('python' or 'cpp')")
end_time = time.time()
except Exception as e:
print(f"Solver failed at iteration {itr}: {e}")
break
Ginv_tmp = Ginv + delta_pose
new_resid = self.residual(Ginv_tmp, input_poses, dSloop, ii_loop, jj_loop)
new_cost = new_resid.square().mean().item() if new_resid.numel() > 0 else float("inf")
# L-M
if new_cost < current_cost:
Ginv = Ginv_tmp
lmbda /= 2
print(
f"Iteration {itr}: cost {current_cost:.14f} -> {new_cost:.14f} (accepted)",
end=" | ",
)
else:
lmbda *= 2
print(
f"Iteration {itr}: cost {current_cost:.14f} -> {new_cost:.14f} (rej) ",
end=" | ",
) # more readible to accepted
print(
f"Time of solver ({self.solve_system_version}): \
{(end_time - begin_time)*1000:.4f} ms"
)
if (current_cost < 1e-5) and (itr >= 4):
if len(residual_history) >= 5:
improvement_ratio = residual_history[-5] / residual_history[-1]
if improvement_ratio < 1.5:
print(f"Converged at iteration {itr}")
break
optimized_absolute_poses = pp.Exp(Ginv).Inv()
optimized_sequential = self.absolute_to_sequential_transforms(optimized_absolute_poses)
print(
f"Optimization completed. Final cost: \
{residual_history[-1] if residual_history else 'N/A'}"
)
return optimized_sequential
# ======== TEST CODE ========
def create_ring_transforms(num_poses=6, radius=5.0, rot_noise_deg=2.0):
"""Generate a ring of Sim3 transforms with rotation, adding slight rotational noise"""
transforms = []
angle_step = 2 * np.pi / num_poses
for i in range(num_poses):
angle = angle_step
# Main rotation (around Z-axis)
R_z = R.from_euler("z", angle, degrees=False)
# Add slight rotational noise (Gaussian noise in degrees)
noise_angles_deg = np.random.normal(loc=0.0, scale=rot_noise_deg, size=3)
R_noise = R.from_euler("xyz", noise_angles_deg, degrees=True)
# Combine rotations
R_mat = (R_noise * R_z).as_matrix()
# Translation: simulate a circular trajectory
t = np.array([radius * np.sin(angle), radius * (1 - np.cos(angle)), 0.0])
s = np.random.uniform(0.8, 1.2)
transforms.append((s, R_mat, t))
return transforms
def example_usage():
optimizer = Sim3LoopOptimizer(solve_system_version="cpp")
# Build rotating ring
sequential_transforms = create_ring_transforms(num_poses=20, radius=3.0)
# Add loop closure constraint: from frame 5 back to frame 0
loop_constraints = [
(20, 0, (1.0, np.eye(3), np.zeros(3))) # Temporary unit loop for simulation
]
# Trajectory before/after optimization
input_abs_poses = optimizer.sequential_to_absolute_poses(sequential_transforms)
optimized_transforms = optimizer.optimize(sequential_transforms, loop_constraints)
optimized_abs_poses = optimizer.sequential_to_absolute_poses(optimized_transforms)
def extract_xyz(pose_tensor):
poses = pose_tensor.cpu().numpy()
return poses[:, 0], poses[:, 1], poses[:, 2]
x0, y0, z0 = extract_xyz(input_abs_poses)
x1, y1, z1 = extract_xyz(optimized_abs_poses)
# Visualize trajectory
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use("Agg")
plt.figure(figsize=(8, 6))
plt.plot(x0, y0, "o--", label="Before Optimization")
plt.plot(x1, y1, "o-", label="After Optimization")
for i, j, _ in loop_constraints:
plt.plot([x0[i], x0[j]], [y0[i], y0[j]], "r--", label="Loop (Before)" if i == 5 else "")
plt.plot([x1[i], x1[j]], [y1[i], y1[j]], "g-", label="Loop (After)" if i == 5 else "")
plt.gca().set_aspect("equal")
plt.title("Sim3 Loop Closure Optimization (Rotating Ring)")
plt.xlabel("x")
plt.ylabel("y")
plt.legend()
plt.grid(True)
plt.axis("equal")
plt.show()
return optimized_transforms
if __name__ == "__main__":
example_usage()