Source code for uppaal2jetracer.jetracerros2.jetracerros2.servo_motor_controller

""" Servo motor controller

This file contains the motor control and its ROS 2 Launch entrypoint.
Its main purpose is to write speed data to the JetRacer motors.

This file can be imported and contains the following classes:

    * ServoMotorController: Controls the motors of a JetRacer.
"""

from __future__ import annotations
from ctypes import c_uint8
from typing import List
import struct
import serial
import rclpy
from rclpy.node import Node
from rclpy.time import Time
from rclpy.executors import MultiThreadedExecutor

from geometry_msgs.msg import Twist


[docs] class ServoMotorController(Node): """ This class is used to control the servo motors of the JetRacer. :ivar _linear_velocity: The linear velocity of the JetRacer in m/s. Range is from -1.2 to 1.2. :vartype _linear_velocity: float :ivar _steering_angle: The steering angle of the JetRacer in radians. Range is from -0.6 to 0.6. :vartype _steering_angle: float """ _CMD_VEL_TOPIC = "cmd_vel" _SET_VELOCITY_INTERVAL = 0.02 # in seconds _VELOCITY_RESET_THRESHOLD = 1 # in seconds _SECONDS_TO_NANOSECONDS = 1e9 _MAX_LINEAR_VELOCITY = 1.2 # in m/s _MIN_LINEAR_VELOCITY = -1.2 # in m/s _MAX_STEERING_ANGLE = 0.6 # in radians _MIN_STEERING_ANGLE = -0.6 # in radians _HEAD1 = 0xAA _HEAD2 = 0x55 _SENDTYPE_VELOCITY = 0x11 _SENDTYPE_PARAMS = 0x12 _SENDTYPE_COEFFICIENT = 0x13 _BAUDRATE = 115200 __slots__ = ("_serial_port", "_linear_velocity", "_steering_angle", "_cmd_vel_subscription", "_last_subscription_time", "_publish_timer") def __init__(self): super().__init__("servo_motor_controller", namespace = "/jetracerros2") self._serial_port: serial.Serial = serial.Serial( '/dev/ttyACM0', self._BAUDRATE, timeout = 1, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, bytesize = serial.EIGHTBITS ) self._cmd_vel_subscription = self.create_subscription( Twist, self._CMD_VEL_TOPIC, self._cmd_vel_subscription_callback, 10) self._linear_velocity: float = 0.0 self._steering_angle: float = 0.0 self._last_subscription_time = self.get_clock().now() self._publish_timer = self.create_timer( self._SET_VELOCITY_INTERVAL, self._publish_timer_callback) @property def last_subscription_time(self) -> Time: """ Get the last time the cmd_vel topic received new data. :return: The last time the cmd_vel topic received new data. :rtype: Time """ return self._last_subscription_time
[docs] def set_velocity(self) -> None: """ Set the velocity of the JetRacer. """ self._serial_port.write(bytes(self._generate_velocity_data()))
[docs] def zero_velocity(self) -> None: """ Set the velocity of the JetRacer to zero. """ self._linear_velocity = 0.0 self._steering_angle = 0.0 self.set_velocity()
[docs] def set_params(self, proportional_gain: int = 350, integral_gain: int = 120, derivative_gain: int = 0, linear_correction: float = 1.0, servo_bias: int = 50 ) -> None: """ Set the parameters of the JetRacer. :param proportional_gain: proportional gain of the PID, defaults to 350, range is from 0 to 2000. :type proportional_gain: int, optional :param integral_gain: integral gain of the PID, defaults to 120, range is from 0 to 2000. :type integral_gain: int, optional :param derivative_gain: derivative gain of the PID, defaults to 0, range is from 0 to 2000. :type derivative_gain: int, optional :param linear_correction: applied linear correction, defaults to 1.0 :type linear_correction: float, optional :param servo_bias: servo bias, defaults to 50, range is from -500 to 500. :type servo_bias: int, optional """ self._serial_port.write( bytes(self._generate_params_data( proportional_gain, integral_gain, derivative_gain, linear_correction, servo_bias )) )
[docs] def set_coefficient(self, a: float = -0.016073, b: float = 0.176183, c: float = -23.428084, d: float = 1500 ): """ Set the coefficients of the JetRacer. :param a: coefficient a, defaults to -0.016073 :type a: float :param b: coefficient b, defaults to 0.176183 :type b: float :param c: coefficient c, defaults to -23.428084 :type c: float :param d: coefficient d, defaults to 1500 :type d: float """ self._serial_port.write( bytes(self._generate_coefficient_data(a, b, c, d)) )
def _generate_velocity_data(self) -> List[c_uint8]: data: List[c_uint8] = [ self._HEAD1, self._HEAD2, 0x0B, self._SENDTYPE_VELOCITY, int(self._linear_velocity * 1000) >> 8 & 0xFF, int(self._linear_velocity * 1000) & 0xFF, int(self._linear_velocity * 1000) >> 8 & 0xFF, int(self._linear_velocity * 1000) & 0xFF, int(self._steering_angle * 1000) >> 8 & 0xFF, int(self._steering_angle * 1000) & 0xFF, ] data.append(self._checksum(data)) return data def _generate_params_data(self, proportional_gain: int, integral_gain: int, derivative_gain: int, linear_coefficient: float, servo_bias: int ) -> List[c_uint8]: data: List[c_uint8] = [ self._HEAD1, self._HEAD2, 0x0F, self._SENDTYPE_PARAMS, (proportional_gain >> 8) & 0xFF, proportional_gain & 0xFF, (integral_gain >> 8) & 0xFF, integral_gain & 0xFF, (derivative_gain >> 8) & 0xFF, derivative_gain & 0xFF, int(linear_coefficient * 1000) >> 8 & 0xFF, int(linear_coefficient * 1000) & 0xFF, int(servo_bias) >> 8 & 0xFF, int(servo_bias) & 0xFF, ] data.append(self._checksum(data)) return data def _generate_coefficient_data(self, a: float, b: float, c: float, d: float) -> List[c_uint8]: data = struct.pack('<BBBBffff', self._HEAD1, self._HEAD2, 0x15, self._SENDTYPE_COEFFICIENT, a, b, c, d) data += struct.pack('<B', self._checksum(data)) return data def _checksum(self, data: List[c_uint8]) -> c_uint8: return sum(data) & 0xFF def _cmd_vel_subscription_callback(self, msg: Twist): # Clamp the linear velocity and steering angle to the min and max values self._linear_velocity = max(self._MIN_LINEAR_VELOCITY, min( msg.linear.x, self._MAX_LINEAR_VELOCITY)) self._steering_angle = max(self._MIN_STEERING_ANGLE, min( msg.angular.z, self._MAX_STEERING_ANGLE)) self._last_subscription_time = self.get_clock().now() def _publish_timer_callback(self): current_time = self.get_clock().now() if ((current_time - self.last_subscription_time).nanoseconds / self._SECONDS_TO_NANOSECONDS) > self._VELOCITY_RESET_THRESHOLD: self.zero_velocity() else: self.set_velocity()
[docs] def main(args = None): """ Entry point for ROS 2 launch script. """ rclpy.init(args = args) servo_motor_controller = ServoMotorController() servo_motor_controller.set_params() servo_motor_controller.set_coefficient() executor = MultiThreadedExecutor() executor.add_node(servo_motor_controller) rclpy.spin(servo_motor_controller, executor = executor) servo_motor_controller.zero_velocity() servo_motor_controller.destroy_node() rclpy.shutdown()