Source code for uppaal2jetracer.jetracerros2.jetracerros2.speed_controller

"""Speed controller

This file contains the speed control of a JetRacer.

This file can be imported and contains the following classes:

    * SetSpeedClientAsync:  Sets the speed of a JetRacer asynchronously
    * SpeedController:      Controls the speed of a JetRacer.
"""

from __future__ import annotations
import logging
import rclpy
from rclpy.node import Node
from rclpy.task import Future

from jetracerros2_interface.srv import Speed

logger = logging.getLogger("jetracer")


[docs] class SetSpeedClientAsync(Node): """ A asynchronous client to set the speed of the JetRacer. It is recommended to always wait for the future to complete! :ivar _set_speed_client: The client to set the speed of the JetRacer. :vartype _set_speed_client: Client """ __slots__ = ["_set_speed_client"] def __init__(self): super().__init__("set_speed_client_async", namespace = "/jetracerros2") self._set_speed_client = self.create_client(Speed, "set_speed")
[docs] def send_speed_request(self, speed: float) -> Future: """ Send a request to set the speed of the JetRacer. :param speed: The speed to set the JetRacer to in m/s. Range is [-1.2, 1.2]. :type speed: float :return: The future to wait for the result. :rtype: Future """ speed_request = Speed.Request() speed_request.speed = speed logger.debug("Send set_speed request %s async", speed_request) return self._set_speed_client.call_async(speed_request)
[docs] class SpeedController: """ A class to control the speed of the JetRacer using rclpy. """ __slots__ = ["_set_speed_client"] def __init__(self): if not rclpy.ok(): rclpy.init() self._set_speed_client = SetSpeedClientAsync()
[docs] def shutdown(self): """ Shutdown the SpeedController and destroy the node. """ logger.info("Shutting down SpeedController!") self._set_speed_client.destroy_node() if rclpy.ok(): rclpy.shutdown()
[docs] def set_speed(self, speed: float): """ Set the speed of the JetRacer using :class:`SetSpeedClient`. This methods waits for the future to complete. :param speed: The speed to set the JetRacer to in m/s. Range is [-1.2, 1.2]. :type speed: float """ service_future = self._set_speed_client.send_speed_request(speed) logger.debug("Spinning SetSpeedClientAsync until future complete for %f speed request.", speed) rclpy.spin_until_future_complete(self._set_speed_client, service_future) response = service_future.result() logger.debug("Future complete with response %s!", response)