import torch
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.policies.utils import build_inference_frame, make_robot_action
from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
# Load policy
device = torch.device("cuda")
policy = ACTPolicy.from_pretrained("your_username/my_robot_policy")
policy.to(device)
policy.eval()
# Load dataset metadata for normalization stats
dataset_metadata = LeRobotDatasetMetadata("your_username/training_dataset")
preprocessor, postprocessor = make_pre_post_processors(
policy.config,
dataset_stats=dataset_metadata.stats
)
# Configure robot
camera_config = {
"side": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=30),
"wrist": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=30),
}
robot_cfg = SO100FollowerConfig(
port="/dev/ttyUSB0",
id="follower_so100",
cameras=camera_config
)
robot = SO100Follower(robot_cfg)
robot.connect()
# Run evaluation episodes
num_episodes = 5
max_steps = 100
for episode in range(num_episodes):
print(f"\nEpisode {episode + 1}/{num_episodes}")
# Reset robot to initial state
input("Position robot at starting configuration and press Enter...")
for step in range(max_steps):
# Get observation from robot
obs = robot.get_observation()
# Build policy input
obs_frame = build_inference_frame(
observation=obs,
ds_features=dataset_metadata.features,
device=device
)
obs_frame = preprocessor(obs_frame)
# Get action from policy
action = policy.select_action(obs_frame)
action = postprocessor(action)
# Convert to robot action format
robot_action = make_robot_action(action, dataset_metadata.features)
# Execute action
robot.send_action(robot_action)
success = input("Was the episode successful? (y/n): ")
if success.lower() == 'y':
print("Episode marked as success!")
robot.disconnect()