import numpy as np
import serial
import cv2
from lerobot.robots.robot import Robot
from lerobot.robots.config import RobotConfig
from lerobot.processor import RobotObservation, RobotAction
from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from dataclasses import dataclass
@dataclass
class SerialRobotConfig(RobotConfig):
"""Configuration for serial-controlled robot."""
port: str = "/dev/ttyUSB0"
baud_rate: int = 115200
timeout: float = 1.0
num_joints: int = 6
cameras: dict[str, OpenCVCameraConfig] | None = None
class SerialRobot(Robot):
"""Robot controlled via serial communication."""
name = "serial_robot"
config_class = SerialRobotConfig
def __init__(self, config: SerialRobotConfig):
super().__init__(config)
self.port = config.port
self.baud_rate = config.baud_rate
self.timeout = config.timeout
self.num_joints = config.num_joints
self.serial_connection = None
self.cameras = {}
# Initialize cameras if configured
if config.cameras:
for name, cam_config in config.cameras.items():
self.cameras[name] = OpenCVCamera(cam_config)
@property
def observation_features(self) -> dict:
features = {
"state": (self.num_joints,),
}
# Add camera features
if self.cameras:
features["pixels"] = {}
for name, camera in self.cameras.items():
features["pixels"][name] = (
camera.config.height,
camera.config.width,
3 # RGB
)
return features
@property
def action_features(self) -> dict:
return {
"action": (self.num_joints,),
}
@property
def is_connected(self) -> bool:
return self.serial_connection is not None and self.serial_connection.is_open
def connect(self) -> None:
"""Connect to robot via serial port."""
print(f"Connecting to {self.port} at {self.baud_rate} baud...")
try:
self.serial_connection = serial.Serial(
port=self.port,
baudrate=self.baud_rate,
timeout=self.timeout
)
print("Serial connection established.")
# Connect cameras
for name, camera in self.cameras.items():
camera.connect()
print(f"Camera '{name}' connected.")
# Wait for robot to initialize
time.sleep(2)
# Send initialization command
self._send_command("INIT")
except serial.SerialException as e:
raise RuntimeError(f"Failed to connect: {e}")
def disconnect(self) -> None:
"""Disconnect from robot."""
if self.serial_connection:
self._send_command("STOP")
self.serial_connection.close()
print("Serial connection closed.")
# Disconnect cameras
for camera in self.cameras.values():
camera.disconnect()
def get_observation(self) -> RobotObservation:
"""Read current state and images."""
if not self.is_connected:
raise RuntimeError("Robot not connected")
# Read joint positions
self._send_command("GET_STATE")
response = self._read_response()
joint_positions = np.array([float(x) for x in response.split(",")])
# Capture images from all cameras
images = {}
for name, camera in self.cameras.items():
frame = camera.read()
images[name] = frame
return RobotObservation(
state=joint_positions,
pixels=images if images else None
)
def send_action(self, action: RobotAction) -> None:
"""Send action command to robot."""
if not self.is_connected:
raise RuntimeError("Robot not connected")
# Format action as comma-separated values
action_str = ",".join([f"{x:.6f}" for x in action.action])
command = f"SET_ACTION,{action_str}"
self._send_command(command)
def _send_command(self, command: str) -> None:
"""Send command over serial."""
self.serial_connection.write(f"{command}\n".encode())
self.serial_connection.flush()
def _read_response(self) -> str:
"""Read response from serial."""
response = self.serial_connection.readline().decode().strip()
return response