import json
import os
import random
import time
from glob import glob
from pathlib import Path
from types import SimpleNamespace
from typing import Union

import numpy as np
import torch
import torch.nn.functional as F
import yaml
from PIL import Image
from safetensors import safe_open
from torch import Tensor
from tqdm import tqdm
from transformers import AutoConfig

from lerobot.configs.policies import PreTrainedConfig
from lingbotvla.data.vla_data.transform import Normalizer, prepare_images, prepare_language, prepare_state
from lingbotvla.models import build_processor
from lingbotvla.models.vla.pi0.modeling_lingbot_vla import LingbotVlaPolicy
from lingbotvla.models.vla.pi0.modeling_pi0 import PI0Policy


def set_seed_everywhere(seed: int):
    """Sets the random seed for Python, NumPy, and PyTorch functions."""
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ["PYTHONHASHSEED"] = str(seed)


set_seed_everywhere(42)


BASE_MODEL_PATH = {
    'pi0': os.environ.get('PALIGEMMA_PATH', './paligemma-3b-pt-224/'),
    'lingbotvla': os.environ.get('QWEN25_PATH', './Qwen2.5-VL-3B-Instruct/'),
}


def load_model_weights(policy, path_to_pi_model, strict=True):
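    """Merge all *.safetensors shards found under ``path_to_pi_model`` and load them into ``policy``."""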
    all_safetensors = glob(os.path.join(path_to_pi_model, "*.safetensors"))
    merged_weights = {}

    for file_path in tqdm(all_safetensors):
        with safe_open(file_path, framework="pt", device="cpu") as f:
            for key in f.keys():
                merged_weights[key] = f.get_tensor(key)
    policy.load_state_dict(merged_weights, strict=strict)


def center_crop_image(image: Union[np.ndarray, Image.Image]) -> Image.Image:
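    """Center-crop a window covering ~90% of the image area and resize it to 224x224 RGB."""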
    crop_scale = 0.9
    side_scale = float(np.sqrt(np.clip(crop_scale, 0.0, 1.0)))
    out_size = (224, 224)

    # Convert numpy inputs to uint8 before building a PIL image.
    if isinstance(image, np.ndarray):
        arr = image
        if arr.dtype.kind == "f":
            # Floats in [0, 1] are treated as normalized; otherwise clip to [0, 255].
            if arr.max() <= 1.0 and arr.min() >= 0.0:
                arr = (np.clip(arr, 0.0, 1.0) * 255.0).astype(np.uint8)
            else:
                arr = np.clip(arr, 0.0, 255.0).astype(np.uint8)
        elif arr.dtype == np.uint16:
            # Scale 16-bit values down to 8-bit (65535 / 257 == 255).
            arr = (arr / 257).astype(np.uint8)
        elif arr.dtype != np.uint8:
            arr = arr.astype(np.uint8)
        pil = Image.fromarray(arr)
    elif isinstance(image, Image.Image):
        pil = image
    else:
        raise TypeError("image must be a numpy array or PIL.Image.Image")

    pil = pil.convert("RGB")
    W, H = pil.size

    # Crop a centered window whose area is `crop_scale` of the original image.
    crop_w = max(1, int(round(W * side_scale)))
    crop_h = max(1, int(round(H * side_scale)))
    left = (W - crop_w) // 2
    top = (H - crop_h) // 2
    right = left + crop_w
    bottom = top + crop_h

    cropped = pil.crop((left, top, right, bottom))
    resized = cropped.resize(out_size, resample=Image.BILINEAR)
    return resized


def resize_with_pad(img, width, height, pad_value=-1):
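    """Resize a (b, c, h, w) batch to fit inside (height, width) while preserving the aspect
    ratio, then pad the left and top with ``pad_value`` to reach the target size."""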
    if img.ndim != 4:
        raise ValueError(f"expected a (b, c, h, w) tensor, got shape {img.shape}")

    # Accept channel-last inputs and convert them to channel-first.
    if img.shape[1] not in (1, 3) and img.shape[-1] in (1, 3):
        img = img.permute(0, 3, 1, 2)

    cur_height, cur_width = img.shape[2:]

    ratio = max(cur_width / width, cur_height / height)
    resized_height = int(cur_height / ratio)
    resized_width = int(cur_width / ratio)
    resized_img = F.interpolate(
        img, size=(resized_height, resized_width), mode="bilinear", align_corners=False
    )

    pad_height = max(0, int(height - resized_height))
    pad_width = max(0, int(width - resized_width))

    # Pad on the left and top of the image.
    padded_img = F.pad(resized_img, (pad_width, 0, pad_height, 0), value=pad_value)
    return padded_img


class PolicyPreprocessMixin:
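    """Inference-time mixin that wraps ``sample_actions`` with device/dtype handling,
    state/action dimension reordering, and output unnormalization."""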

    @torch.no_grad()
    def select_action(
        self, observation: dict[str, Tensor], use_bf16: bool = False, vlm_causal: bool = False, noise: Tensor | None = None
    ):
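        """Sample an action chunk for a single observation and return it unnormalized on the CPU."""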
        self.eval()
        device = 'cuda'
        if use_bf16:
            dtype = torch.bfloat16
        else:
            dtype = torch.float32
        s1 = time.time()

        if len(observation['images'].shape) == 4:
            observation['images'] = observation['images'].unsqueeze(0)
            observation['img_masks'] = observation['img_masks'].unsqueeze(0)
        # Reorder the state vector into the layout expected by the model.
        state_indices = list(range(12)) + list(range(73, 75)) + list(range(12, 14)) + list(range(14, 73))
        observation['state'] = observation['state'][state_indices]
        if 'expert_imgs' in observation:
            actions = self.model.sample_actions(
                observation['images'].to(dtype=dtype, device=device),
                observation['img_masks'].to(device=device),
                observation['lang_tokens'].unsqueeze(0).to(device=device),
                observation['lang_masks'].unsqueeze(0).to(device=device),
                observation['state'].unsqueeze(0).to(dtype=dtype, device=device),
                observation['expert_imgs'].to(dtype=dtype, device=device),
                vlm_causal=vlm_causal,
            )
        else:
            actions = self.model.sample_actions(
                observation['images'].to(dtype=dtype, device=device),
                observation['img_masks'].to(device=device),
                observation['lang_tokens'].unsqueeze(0).to(device=device),
                observation['lang_masks'].unsqueeze(0).to(device=device),
                observation['state'].unsqueeze(0).to(dtype=dtype, device=device),
                vlm_causal=vlm_causal,
            )
        # Map the predicted action dimensions back to the output ordering.
        action_indices = list(range(6)) + [14] + list(range(6, 12)) + [15]
        actions = actions[:, :, action_indices]
        delta_time = time.time() - s1
        print(f'sample_actions cost {delta_time:.3f} s')
        observation['action'] = actions.squeeze(0)[:, :14].to(dtype=torch.float32, device='cpu')
        if use_bf16:
            observation['state'] = observation['state'].to(dtype=torch.float32)
        data = self.normalizer.unnormalize(observation)
        return data


class LingBotVlaInferencePolicy(PolicyPreprocessMixin, LingbotVlaPolicy):
    pass


class PI0InferencePolicy(PolicyPreprocessMixin, PI0Policy):
    pass


def merge_qwen_config(policy_config, qwen_config):
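    """Copy the text-model fields and the ``vision_config`` from a Qwen2.5-VL config onto the policy config."""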
    if hasattr(qwen_config, 'to_dict'):
        config_dict = qwen_config.to_dict()
    else:
        config_dict = qwen_config

    text_keys = {
        "hidden_size",
        "intermediate_size",
        "num_hidden_layers",
        "num_attention_heads",
        "num_key_value_heads",
        "rms_norm_eps",
        "rope_theta",
        "vocab_size",
        "max_position_embeddings",
        "hidden_act",
        "tie_word_embeddings",
        "tokenizer_path",
    }

    for key in text_keys:
        if key in config_dict:
            setattr(policy_config, key, config_dict[key])
            print(f"✅ Merged LLM: {key} = {config_dict[key]}")

    if "vision_config" in config_dict:
        policy_config.vision_config = qwen_config.vision_config
    else:
        print("⚠️ Warning: 'vision_config' not found in qwen_config!")

    return policy_config


class QwenPiServer:
    '''
    Policy wrapper that supports action ensembling or chunked execution.
    '''
    def __init__(
        self,
        path_to_pi_model="",
        adaptive_ensemble_alpha=0.1,
        action_ensemble_horizon=8,
        use_length=1,
        chunk_ret=False,
        use_bf16=True,
        use_fp32=False,
    ) -> None:
        assert not (use_bf16 and use_fp32), 'use_bf16 and use_fp32 are mutually exclusive'
        self.adaptive_ensemble_alpha = adaptive_ensemble_alpha
        self.use_length = use_length
        self.chunk_ret = chunk_ret

        self.task_description = None

        self.vla = self.load_vla(path_to_pi_model)
        self.vla = self.vla.cuda().eval()
        if use_bf16:
            self.vla = self.vla.to(torch.bfloat16)
        elif use_fp32:
            self.vla.model.float()
        self.global_step = 0
        self.last_action_chunk = None
        self.use_bf16 = use_bf16
        self.use_fp32 = use_fp32

    def load_vla(self, path_to_pi_model) -> LingbotVlaPolicy:
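        """Build the policy from a checkpoint directory: read its training YAML, merge the base-model
        (Qwen2.5-VL or PaliGemma) config, load the safetensors weights, and attach the tokenizer,
        image processor, and dataset normalizer."""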
        print(f"loading model from: {path_to_pi_model}")
        config = PreTrainedConfig.from_pretrained(path_to_pi_model)

        training_config_path = Path(path_to_pi_model) / 'lingbotvla_cli.yaml'
        with open(training_config_path, 'r') as f:
            training_config = yaml.safe_load(f)

        # Merge the training-time model/train settings into the policy config,
        # preferring values that already exist on the config.
        training_model_config = training_config['model']
        training_model_config.update(training_config['train'])
        for k, v in training_model_config.items():
            setattr(config, k, getattr(config, k, v))

        config.attention_implementation = 'eager'

        training_base_model = os.environ.get('QWEN25_PATH', './Qwen2.5-VL-3B-Instruct/')
        if 'paligemma' in training_base_model:
            model_name = 'pi0'
            config.vocab_size = 257152
        elif 'qwen2' in training_base_model.lower():
            model_name = 'lingbotvla'
        else:
            raise ValueError(f"Unsupported base model '{training_base_model}' for checkpoint {path_to_pi_model}")
        base_model_path = BASE_MODEL_PATH[model_name]
        config.tokenizer_path = base_model_path
        self.model_name = model_name

        qwen_config = AutoConfig.from_pretrained(base_model_path)
        config = merge_qwen_config(config, qwen_config)

        if 'vocab_size' in training_config['model'] and training_config['model']['vocab_size'] != 0:
            config.vocab_size = training_config['model']['vocab_size']

        self.processor = build_processor(base_model_path)
        self.language_tokenizer = self.processor.tokenizer
        self.image_processor = self.processor.image_processor
        data_config = SimpleNamespace(**training_config['data'])

        print('Initializing model ... ')

        if 'paligemma' in training_base_model:
            policy = PI0InferencePolicy(config, tokenizer_path=base_model_path)
        else:
            policy = LingBotVlaInferencePolicy(config, tokenizer_path=base_model_path, eval=True)

        load_model_weights(policy, path_to_pi_model, strict=True)

        policy.feature_transform = None
        self.data_config = data_config
        self.config = config
        self.joint_max_dim = training_config['train']['max_action_dim']
        self.action_dim = training_config['train']['action_dim']
        self.chunk_size = training_config['train']['chunk_size']
        policy.action_dim = self.action_dim
        policy.chunk_size = self.chunk_size
        self.norm_stats_file = 'assets/norm_stats/robotwin_all_new.json'
        self.use_depth_align = 'align_params' in training_config['train']
        with open(self.norm_stats_file) as f:
            self.norm_stats = json.load(f)
        policy.normalizer = Normalizer(
            norm_stats=self.norm_stats['norm_stats'],
            from_file=True,
            data_type='robotwin_rep',
            norm_type={
                "observation.images.cam_high": "identity",
                "observation.images.cam_left_wrist": "identity",
                "observation.images.cam_right_wrist": "identity",
                "observation.state": self.data_config.norm_type,
                "action": self.data_config.norm_type,
            },
        )

        print('Model initialized ... ')

        return policy

    def reset(self, robo_name, path_to_pi_model=None) -> None:
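        """Reset the rollout state and, if ``path_to_pi_model`` is given, reload the policy from it."""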
        if path_to_pi_model is not None:
            self.vla = self.load_vla(path_to_pi_model)
            self.vla = self.vla.cuda().eval()
            if self.use_bf16:
                self.vla = self.vla.to(torch.bfloat16)
            elif self.use_fp32:
                self.vla.model.float()

        self.global_step = 0
        self.last_action_chunk = None

        if getattr(self.data_config, 'norm_type', None) is None:
            self.data_config.norm_type = 'meanstd'
        if getattr(self.config, 'vlm_causal', None) is None:
            self.config.vlm_causal = False
        if getattr(self.config, 'qwenvl_bos', None) is None:
            self.config.qwenvl_bos = False

        if path_to_pi_model is not None:
            # Reload the checkpoint weights (load_vla above has already loaded them once).
            load_model_weights(self.vla, path_to_pi_model, strict=True)

    def resize_image(self, observation):
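        """Resize the three camera images in-place to ``img_size`` and convert them to CHW floats in [0, 1]."""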
        for image_feature in ['observation.images.cam_high', 'observation.images.cam_left_wrist', 'observation.images.cam_right_wrist']:
            assert image_feature in observation
            assert len(observation[image_feature].shape) == 3 and observation[image_feature].shape[-1] == 3
            image = observation[image_feature]
            img_pil = Image.fromarray(image)
            image_size = getattr(self.data_config, 'img_size', 224)
            img_pil = img_pil.resize((image_size, image_size), Image.BILINEAR)

            # HWC uint8 -> CHW float in [0, 1].
            img_resized = np.transpose(np.array(img_pil), (2, 0, 1))
            observation[image_feature] = img_resized / 255.

    def infer(self, observation, center_crop=True):
        """Generates an action with the VLA policy."""
        if 'reset' in observation and observation['reset']:
            self.reset(
                robo_name=observation['robo_name'],
                path_to_pi_model=observation.get('path_to_pi_model'),
            )
            return dict(action=None)

        self.resize_image(observation)
        for k, v in observation.items():
            if isinstance(v, np.ndarray):
                observation[k] = torch.from_numpy(v)

        if self.use_length == -1 or self.global_step % self.use_length == 0:
            # Reorder the incoming state vector before normalization.
            indices = list(range(6)) + list(range(7, 13)) + [6] + [13]
            observation["observation.state"] = observation["observation.state"][indices]
            normalized_observation = self.vla.normalizer.normalize(observation)
            base_image = (normalized_observation["observation.images.cam_high"] * 255).to(torch.uint8)
            left_wrist_image = (normalized_observation["observation.images.cam_left_wrist"] * 255).to(
                torch.uint8
            )
            right_wrist_image = (normalized_observation["observation.images.cam_right_wrist"] * 255).to(
                torch.uint8
            )
            obs_dict = {
                "image": {"base_0_rgb": base_image, "left_wrist_0_rgb": left_wrist_image, "right_wrist_0_rgb": right_wrist_image},
                "state": normalized_observation["observation.state"].to(torch.float32),
                "prompt": [observation["task"]],
            }
            state = prepare_state(self.config, obs_dict)
            lang_tokens, lang_masks = prepare_language(self.config, self.language_tokenizer, obs_dict)
            images, img_masks, _ = prepare_images(self.config, self.image_processor, obs_dict)
            observation = {
                'images': images,
                'img_masks': img_masks,
                'state': state,
                'lang_tokens': lang_tokens,
                'lang_masks': lang_masks,
            }

            if self.use_bf16:
                observation['state'] = observation['state'].to(torch.bfloat16)

        org_actions = ['action']
        assert len(org_actions) == 1, "Only a single action feature is supported"
        if self.chunk_ret:
            action = self.vla.select_action(observation, self.use_bf16, self.config.vlm_causal)[org_actions[0]].float().cpu().numpy()
            action = action[:self.use_length, :self.action_dim]
        else:
            if self.use_length == -1 or self.global_step % self.use_length == 0:
                action = self.vla.select_action(observation, self.use_bf16, self.config.vlm_causal)[org_actions[0]]
                self.last_action_chunk = action.float().cpu().numpy()

            if self.use_length > 0:
                action = self.last_action_chunk[self.global_step % self.use_length]
            # Ellipsis indexing keeps this valid whether `action` is a single step or a chunk.
            action = action[..., :self.action_dim]
        print(f"on server step: {self.global_step}")
        self.global_step += 1

        return dict(action=action)


import argparse
from .websocket_policy_server import WebsocketPolicyServer
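# Note: the relative import above only resolves when this file is run as part of its
# package (e.g. `python -m <package>.<module>`), not when executed directly as a script.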


def main():
    parser = argparse.ArgumentParser(description="Start the QwenPi WebSocket policy server")

    parser.add_argument(
        "--model_path",
        type=str,
        required=True,
        help="path to the trained policy checkpoint directory",
    )

    parser.add_argument(
        "--use_length",
        type=int,
        default=50,
        help="number of action steps to use from each predicted chunk",
    )

    parser.add_argument(
        "--chunk_ret",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="True: return the full action chunk (the returned tensor keeps the horizon dimension). "
             "False: the server caches the chunk and returns one action per call.",
    )

    parser.add_argument(
        "--port",
        type=int,
        default=8006,
        help="WebSocket server port",
    )

    args = parser.parse_args()

    model = QwenPiServer(args.model_path, use_length=args.use_length, chunk_ret=args.chunk_ret)
    model_server = WebsocketPolicyServer(model, port=args.port)
    model_server.serve_forever()
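

# Example launch (paths and package name are illustrative):
#   python -m <package>.<this_module> --model_path /path/to/checkpoint --use_length 50 --port 8006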


if __name__ == "__main__":
    main()