Source code for uppaal2jetracer.jetracerros2.jetracerros2.turn_controller

"""Turn controller

This file contains the turn control of a JetRacer.

This file can be imported and contains the following classes:

    * TurnActionClientAsync:    Turns a JetRacer asynchronously.
    * TurnController:           Controls the turning of a JetRacer.

"""

from __future__ import annotations
import logging
from jetracerros2_interface.action import Turn


import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient

from action_msgs.msg import GoalStatus

logger = logging.getLogger("jetracer")


[docs] class TurnActionClientAsync(Node): """ A wrapper class used to asynchronously send a goal to the turn action server and wait for the result. :ivar _turn_action_client_async: The asynchronous action client for the turn action server. :vartype _turn_action_client_async: TurnActionClientAsync :ivar _result: The result of the turn action. :vartype _result: Turn.Result :ivar _status: The status of the turn action. Used to wait for the result. :vartype _status: GoalStatus """ __slots__ = ["_turn_action_client_async", "_result", "_status"] def __init__(self): super().__init__("turn_action_client", namespace = "/jetracerros2") self._turn_action_client = ActionClient(self, Turn, "turn") self._result = Turn.Result() self._status = GoalStatus.STATUS_UNKNOWN @property def result(self) -> Turn.Result: """ Get the result of the turn action. :return: The result of the turn action. :rtype: Turn.Result """ return self._result @property def status(self) -> int: """ Get the status of the turn action. :return: The status of the turn action. :rtype: GoalStatus """ return self._status
[docs] def cleanup(self): """ Cleanup the turn action client. This method should only be called after the turn action has finished! """ if self._status == GoalStatus.STATUS_SUCCEEDED: self._status = GoalStatus.STATUS_UNKNOWN logger.debug("Cleanup complete.")
[docs] def send_turn_goal_async(self, theta: float): """ Asynchronously sends a goal to the turn action server. :param theta: The angle to turn the JetRacer in radians. :type theta: float :return: The future of the goal handle. :rtype: rclpy.task.Future """ goal = Turn.Goal() goal.theta = theta logger.debug("Waiting for Turn action server!") self._turn_action_client.wait_for_server() logger.debug("Sending %f turn request async!", theta) send_goal_future = self._turn_action_client.send_goal_async(goal) send_goal_future.add_done_callback(self._goal_response_callback)
def _goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: logger.warning("Turn request was denied!") return self._status = GoalStatus.STATUS_ACCEPTED logger.debug("Turn request accepted!") self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self._get_result_callback) def _get_result_callback(self, future): self._result = future.result().result self._status = GoalStatus.STATUS_SUCCEEDED logger.debug("Turn request marked as succeeded!")
[docs] class TurnController: """ This class is used to send a goal to the turn action server and wait for the result. """ __slots__ = ["_turn_action_client"] def __init__(self): if not rclpy.ok(): rclpy.init() self._turn_action_client = TurnActionClientAsync()
[docs] def shutdown(self): """ Shutdown the TurnController and destroy the node. """ logger.info("Shutting down TurnController!") self._turn_action_client.destroy_node() if rclpy.ok(): rclpy.shutdown()
[docs] def turn(self, theta: float): """" Send a goal to the turn action server and wait for the result. :param theta: The angle to turn the JetRacer in radians. Positive is left, negative is right. :type theta: float """ self._turn_action_client.send_turn_goal_async(theta) while self._turn_action_client.status != GoalStatus.STATUS_SUCCEEDED: logger.debug("Waiting for Turn action request %f to succeed.", theta) rclpy.spin_once(self._turn_action_client) self._turn_action_client.cleanup()