""" Servo motor controller
This file contains the motor control and its ROS 2 Launch entrypoint.
Its main purpose is to write speed data to the JetRacer motors.
This file can be imported and contains the following classes:
* ServoMotorController: Controls the motors of a JetRacer.
"""
from __future__ import annotations
from ctypes import c_uint8
from typing import List
import struct
import serial
import rclpy
from rclpy.node import Node
from rclpy.time import Time
from rclpy.executors import MultiThreadedExecutor
from geometry_msgs.msg import Twist
[docs]
class ServoMotorController(Node):
"""
This class is used to control the servo motors of the JetRacer.
:ivar _linear_velocity: The linear velocity of the JetRacer in m/s. Range is from -1.2 to 1.2.
:vartype _linear_velocity: float
:ivar _steering_angle: The steering angle of the JetRacer in radians. Range is from -0.6 to 0.6.
:vartype _steering_angle: float
"""
_CMD_VEL_TOPIC = "cmd_vel"
_SET_VELOCITY_INTERVAL = 0.02 # in seconds
_VELOCITY_RESET_THRESHOLD = 1 # in seconds
_SECONDS_TO_NANOSECONDS = 1e9
_MAX_LINEAR_VELOCITY = 1.2 # in m/s
_MIN_LINEAR_VELOCITY = -1.2 # in m/s
_MAX_STEERING_ANGLE = 0.6 # in radians
_MIN_STEERING_ANGLE = -0.6 # in radians
_HEAD1 = 0xAA
_HEAD2 = 0x55
_SENDTYPE_VELOCITY = 0x11
_SENDTYPE_PARAMS = 0x12
_SENDTYPE_COEFFICIENT = 0x13
_BAUDRATE = 115200
__slots__ = ("_serial_port", "_linear_velocity", "_steering_angle",
"_cmd_vel_subscription", "_last_subscription_time", "_publish_timer")
def __init__(self):
super().__init__("servo_motor_controller", namespace = "/jetracerros2")
self._serial_port: serial.Serial = serial.Serial(
'/dev/ttyACM0', self._BAUDRATE, timeout = 1, parity = serial.PARITY_NONE,
stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS
)
self._cmd_vel_subscription = self.create_subscription(
Twist, self._CMD_VEL_TOPIC, self._cmd_vel_subscription_callback, 10)
self._linear_velocity: float = 0.0
self._steering_angle: float = 0.0
self._last_subscription_time = self.get_clock().now()
self._publish_timer = self.create_timer(
self._SET_VELOCITY_INTERVAL, self._publish_timer_callback)
@property
def last_subscription_time(self) -> Time:
"""
Get the last time the cmd_vel topic received new data.
:return: The last time the cmd_vel topic received new data.
:rtype: Time
"""
return self._last_subscription_time
[docs]
def set_velocity(self) -> None:
"""
Set the velocity of the JetRacer.
"""
self._serial_port.write(bytes(self._generate_velocity_data()))
[docs]
def zero_velocity(self) -> None:
"""
Set the velocity of the JetRacer to zero.
"""
self._linear_velocity = 0.0
self._steering_angle = 0.0
self.set_velocity()
[docs]
def set_params(self,
proportional_gain: int = 350,
integral_gain: int = 120,
derivative_gain: int = 0,
linear_correction: float = 1.0,
servo_bias: int = 50
) -> None:
"""
Set the parameters of the JetRacer.
:param proportional_gain: proportional gain of the PID, defaults to 350, range is from 0 to 2000.
:type proportional_gain: int, optional
:param integral_gain: integral gain of the PID, defaults to 120, range is from 0 to 2000.
:type integral_gain: int, optional
:param derivative_gain: derivative gain of the PID, defaults to 0, range is from 0 to 2000.
:type derivative_gain: int, optional
:param linear_correction: applied linear correction, defaults to 1.0
:type linear_correction: float, optional
:param servo_bias: servo bias, defaults to 50, range is from -500 to 500.
:type servo_bias: int, optional
"""
self._serial_port.write(
bytes(self._generate_params_data(
proportional_gain,
integral_gain,
derivative_gain,
linear_correction,
servo_bias
))
)
[docs]
def set_coefficient(self,
a: float = -0.016073,
b: float = 0.176183,
c: float = -23.428084,
d: float = 1500
):
"""
Set the coefficients of the JetRacer.
:param a: coefficient a, defaults to -0.016073
:type a: float
:param b: coefficient b, defaults to 0.176183
:type b: float
:param c: coefficient c, defaults to -23.428084
:type c: float
:param d: coefficient d, defaults to 1500
:type d: float
"""
self._serial_port.write(
bytes(self._generate_coefficient_data(a, b, c, d))
)
def _generate_velocity_data(self) -> List[c_uint8]:
data: List[c_uint8] = [
self._HEAD1,
self._HEAD2,
0x0B,
self._SENDTYPE_VELOCITY,
int(self._linear_velocity * 1000) >> 8 & 0xFF,
int(self._linear_velocity * 1000) & 0xFF,
int(self._linear_velocity * 1000) >> 8 & 0xFF,
int(self._linear_velocity * 1000) & 0xFF,
int(self._steering_angle * 1000) >> 8 & 0xFF,
int(self._steering_angle * 1000) & 0xFF,
]
data.append(self._checksum(data))
return data
def _generate_params_data(self,
proportional_gain: int,
integral_gain: int,
derivative_gain: int,
linear_coefficient: float,
servo_bias: int
) -> List[c_uint8]:
data: List[c_uint8] = [
self._HEAD1,
self._HEAD2,
0x0F,
self._SENDTYPE_PARAMS,
(proportional_gain >> 8) & 0xFF,
proportional_gain & 0xFF,
(integral_gain >> 8) & 0xFF,
integral_gain & 0xFF,
(derivative_gain >> 8) & 0xFF,
derivative_gain & 0xFF,
int(linear_coefficient * 1000) >> 8 & 0xFF,
int(linear_coefficient * 1000) & 0xFF,
int(servo_bias) >> 8 & 0xFF,
int(servo_bias) & 0xFF,
]
data.append(self._checksum(data))
return data
def _generate_coefficient_data(self, a: float, b: float, c: float, d: float) -> List[c_uint8]:
data = struct.pack('<BBBBffff', self._HEAD1, self._HEAD2, 0x15,
self._SENDTYPE_COEFFICIENT, a, b, c, d)
data += struct.pack('<B', self._checksum(data))
return data
def _checksum(self, data: List[c_uint8]) -> c_uint8:
return sum(data) & 0xFF
def _cmd_vel_subscription_callback(self, msg: Twist):
# Clamp the linear velocity and steering angle to the min and max values
self._linear_velocity = max(self._MIN_LINEAR_VELOCITY, min(
msg.linear.x, self._MAX_LINEAR_VELOCITY))
self._steering_angle = max(self._MIN_STEERING_ANGLE, min(
msg.angular.z, self._MAX_STEERING_ANGLE))
self._last_subscription_time = self.get_clock().now()
def _publish_timer_callback(self):
current_time = self.get_clock().now()
if ((current_time - self.last_subscription_time).nanoseconds
/ self._SECONDS_TO_NANOSECONDS) > self._VELOCITY_RESET_THRESHOLD:
self.zero_velocity()
else:
self.set_velocity()
[docs]
def main(args = None):
"""
Entry point for ROS 2 launch script.
"""
rclpy.init(args = args)
servo_motor_controller = ServoMotorController()
servo_motor_controller.set_params()
servo_motor_controller.set_coefficient()
executor = MultiThreadedExecutor()
executor.add_node(servo_motor_controller)
rclpy.spin(servo_motor_controller, executor = executor)
servo_motor_controller.zero_velocity()
servo_motor_controller.destroy_node()
rclpy.shutdown()