Source code for uppaal2jetracer.jetracerros2.jetracerros2.jetracer_controller

"""JetRacer controller

This file contains the control of a JetRacer and its ROS 2 Launch entrypoint.

This file can be imported and contains the following classes:

    * JetRacerController:   Controls a NVIDIA JetRacer ROS AI Kit.
"""

from __future__ import annotations
import math
import time

import rclpy
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.service import Service
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.executors import MultiThreadedExecutor

from geometry_msgs.msg import Twist
from jetracerros2_interface.action import Turn
from jetracerros2_interface.srv import Speed


[docs] class JetRacerController(Node): """ This class is the main controller for the JetRacer. It uses the /cmd_vel topic to control the servo motors using :class:`ServoMotorController`. It provides a action server turn to turn the JetRacer by a certain angle. It provides a service set_speed to set the speed of the JetRacer. """ _CMD_VEL_TOPIC = "cmd_vel" _CMD_VEL_PUBLISH_INTERVAL = 0.5 # in seconds _WHEEL_BASE = 0.24 # in meters _DEFAULT_TURN_STEERING_ANGLE = 0.4 # in radians _DEFAULT_TURN_SPEED = 0.5 # in m/s __slots__ = ["_cmd_vel_publisher", "_publish_timer", "_speed", "_steering_angle", "_turn_action_server", "_set_speed_service"] def __init__(self): super().__init__("jetracer_controller", namespace = "/jetracerros2") self._cmd_vel_publisher: Publisher = self.create_publisher(Twist, self._CMD_VEL_TOPIC, 10) self._publish_timer = self.create_timer( self._CMD_VEL_PUBLISH_INTERVAL, self._publish_timer_callback) self._speed: float = 0.0 self._steering_angle: float = 0.0 self._speed_service: Service = self.create_service( Speed, "set_speed", self._speed_service_callback) self._turn_action_server: ActionServer = ActionServer( self, Turn, "turn", execute_callback = self._execute_turn_callback, goal_callback = self._goal_turn_callback, cancel_callback = self._cancel_turn_callback) def _speed_service_callback(self, request, response): self._speed = request.speed self._publish_to_cmd_vel() response.new_speed = self._speed return response def _execute_turn_callback(self, goal_handle): request = goal_handle.request if request.theta == 0: goal_handle.succeed() return Turn.Result() result_turned: float = self._execute_turn(request.theta, goal_handle) result = Turn.Result() result.turned = result_turned return result def _execute_turn(self, theta: float, goal_handle) -> float: old_speed = self._speed if self._speed == 0: self._speed = self._DEFAULT_TURN_SPEED radius = self._WHEEL_BASE / math.tan(self._DEFAULT_TURN_STEERING_ANGLE) arc_length = abs(theta) * radius turn_time = arc_length / self._speed if theta < 0: self._steering_angle = -self._DEFAULT_TURN_STEERING_ANGLE else: self._steering_angle = self._DEFAULT_TURN_STEERING_ANGLE self._publish_to_cmd_vel() start_time = time.time() while time.time() - start_time < turn_time: if goal_handle.is_cancel_requested: self._speed = 0.0 self._steering_angle = 0.0 self._publish_to_cmd_vel() goal_handle.canceled() return 0.0 self._steering_angle = 0.0 if self._speed != 0.0: self._speed = old_speed self._publish_to_cmd_vel() goal_handle.succeed() return theta def _cancel_turn_callback(self, goal_handle): return CancelResponse.ACCEPT def _goal_turn_callback(self, goal_handle): return GoalResponse.ACCEPT def _publish_timer_callback(self): self._publish_to_cmd_vel() def _publish_to_cmd_vel(self): msg = Twist() msg.linear.x = self._speed msg.angular.z = self._steering_angle self._cmd_vel_publisher.publish(msg)
[docs] def main(args = None): """ Entry point for ROS 2 launch script. """ rclpy.init(args = args) jetracer_controller = JetRacerController() executor = MultiThreadedExecutor(num_threads = 3) executor.add_node(jetracer_controller) rclpy.spin(jetracer_controller, executor = executor) jetracer_controller.destroy_node() rclpy.shutdown()