"""Speed controller
This file contains the speed control of a JetRacer.
This file can be imported and contains the following classes:
* SetSpeedClientAsync: Sets the speed of a JetRacer asynchronously
* SpeedController: Controls the speed of a JetRacer.
"""
from __future__ import annotations
import logging
import rclpy
from rclpy.node import Node
from rclpy.task import Future
from jetracerros2_interface.srv import Speed
logger = logging.getLogger("jetracer")
[docs]
class SetSpeedClientAsync(Node):
"""
A asynchronous client to set the speed of the JetRacer.
It is recommended to always wait for the future to complete!
:ivar _set_speed_client: The client to set the speed of the JetRacer.
:vartype _set_speed_client: Client
"""
__slots__ = ["_set_speed_client"]
def __init__(self):
super().__init__("set_speed_client_async", namespace = "/jetracerros2")
self._set_speed_client = self.create_client(Speed, "set_speed")
[docs]
def send_speed_request(self, speed: float) -> Future:
"""
Send a request to set the speed of the JetRacer.
:param speed: The speed to set the JetRacer to in m/s. Range is [-1.2, 1.2].
:type speed: float
:return: The future to wait for the result.
:rtype: Future
"""
speed_request = Speed.Request()
speed_request.speed = speed
logger.debug("Send set_speed request %s async", speed_request)
return self._set_speed_client.call_async(speed_request)
[docs]
class SpeedController:
"""
A class to control the speed of the JetRacer using rclpy.
"""
__slots__ = ["_set_speed_client"]
def __init__(self):
if not rclpy.ok():
rclpy.init()
self._set_speed_client = SetSpeedClientAsync()
[docs]
def shutdown(self):
"""
Shutdown the SpeedController and destroy the node.
"""
logger.info("Shutting down SpeedController!")
self._set_speed_client.destroy_node()
if rclpy.ok():
rclpy.shutdown()
[docs]
def set_speed(self, speed: float):
"""
Set the speed of the JetRacer using :class:`SetSpeedClient`.
This methods waits for the future to complete.
:param speed: The speed to set the JetRacer to in m/s. Range is [-1.2, 1.2].
:type speed: float
"""
service_future = self._set_speed_client.send_speed_request(speed)
logger.debug("Spinning SetSpeedClientAsync until future complete for %f speed request.",
speed)
rclpy.spin_until_future_complete(self._set_speed_client, service_future)
response = service_future.result()
logger.debug("Future complete with response %s!", response)