Source code for uppaal2jetracer.jetracerros2.jetracerros2.turn_controller

"""Turn controller

This file contains the turn control of a JetRacer.

This file can be imported and contains the following classes:

    * TurnActionClientAsync:    Turns a JetRacer asynchronously.
    * TurnController:           Controls the turning of a JetRacer.

"""

from __future__ import annotations
import logging
from jetracerros2_interface.action import Turn


import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.executors import MultiThreadedExecutor, ExternalShutdownException

from action_msgs.msg import GoalStatus

# Module-wide logger, looked up under the fixed name "jetracer" (not __name__).
logger = logging.getLogger("jetracer")


class TurnActionClientAsync(Node):
    """
    A wrapper node used to asynchronously send a goal to the turn action
    server and to track the outcome of that goal.

    :ivar _turn_action_client: The action client for the ``turn`` action server.
    :vartype _turn_action_client: ActionClient
    :ivar _result: The most recently received result of the turn action.
    :vartype _result: Turn.Result
    :ivar _status: The status of the turn action (a ``GoalStatus`` constant).
        Used to wait for the result.
    :vartype _status: int
    :ivar _goal_handle: The handle of the accepted goal, or ``None``.
    :ivar _get_result_future: The pending future for the goal result, or ``None``.
    :ivar _send_goal_future: The pending future for the goal request, or ``None``.
    """

    # The first entry must match the attribute assigned in __init__
    # (``_turn_action_client``), otherwise the slot is never used.
    __slots__ = ["_turn_action_client", "_result", "_status", "_goal_handle",
                 "_get_result_future", "_send_goal_future"]

    def __init__(self):
        super().__init__("turn_action_client", namespace="/jetracerros2")
        self._turn_action_client = ActionClient(self, Turn, "turn")
        self._result = Turn.Result()
        self._status = GoalStatus.STATUS_UNKNOWN
        self._goal_handle = None
        self._get_result_future = None
        self._send_goal_future = None

    @property
    def result(self) -> Turn.Result:
        """
        Get the result of the turn action.

        :return: The result of the turn action.
        :rtype: Turn.Result
        """
        return self._result

    @property
    def status(self) -> int:
        """
        Get the status of the turn action.

        :return: The status of the turn action as a ``GoalStatus`` constant.
        :rtype: int
        """
        return self._status

    def cleanup(self):
        """
        Cleanup the turn action client.

        Resets the status to ``STATUS_UNKNOWN`` and drops the goal handle.
        This method should only be called after the turn action has finished
        or is being cancelled!
        """
        self._status = GoalStatus.STATUS_UNKNOWN
        self._goal_handle = None
        logger.debug("Clean up of TurnActionClientAsync completed!")

    def send_turn_goal_async(self, theta: float):
        """
        Asynchronously sends a goal to the turn action server.

        Blocks until the action server is available, then sends the goal
        without waiting for its result.

        :param theta: The angle to turn the JetRacer in radians.
        :type theta: float
        :return: The future of the goal handle.
        :rtype: rclpy.task.Future
        """
        goal = Turn.Goal()
        goal.theta = theta

        logger.debug("Waiting for Turn action server!")
        self._turn_action_client.wait_for_server()

        logger.debug("Sending %f turn request async!", theta)
        self._send_goal_future = self._turn_action_client.send_goal_async(goal)
        # A new goal starts from an unknown state until the server answers.
        self._status = GoalStatus.STATUS_UNKNOWN
        self._send_goal_future.add_done_callback(self._goal_response_callback)
        # Hand the future back as the docstring promises; callers that
        # ignored the previous ``None`` return are unaffected.
        return self._send_goal_future

    def cancel_turn_goal_async(self):
        """
        Cancel the turn goal.

        Does nothing (apart from logging a warning) if no goal has been
        accepted yet.
        """
        if not self._goal_handle:
            logger.warning("No goal handle to cancel!")
            return

        cancel_future = self._goal_handle.cancel_goal_async()
        logger.debug("Canceling turn request!")
        cancel_future.add_done_callback(self._cancel_response_callback)
        self._cancel_futures()
        self._goal_handle = None

    def is_turn_goal_active(self) -> bool:
        """
        Check if the turn goal is active.

        Every status other than ``STATUS_SUCCEEDED`` and ``STATUS_ABORTED``
        counts as active, including ``STATUS_UNKNOWN``.

        :return: True if the turn goal is active, False otherwise.
        :rtype: bool
        """
        logger.debug("Checking if turn goal is active! Status: %s", self._status)
        return self._status not in (GoalStatus.STATUS_SUCCEEDED,
                                    GoalStatus.STATUS_ABORTED)

    def _goal_response_callback(self, future):
        """
        Handle the server's answer to the goal request.

        :param future: The completed send-goal future.
        """
        goal_handle = future.result()
        if not goal_handle.accepted:
            logger.warning("Turn request was denied!")
            # Mark the goal as finished; otherwise the status stays
            # STATUS_UNKNOWN and anyone polling is_turn_goal_active()
            # would wait forever for a goal that will never run.
            self._status = GoalStatus.STATUS_ABORTED
            return

        self._status = GoalStatus.STATUS_ACCEPTED
        logger.debug("Turn request accepted!")
        self._goal_handle = goal_handle
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self._get_result_callback)

    def _get_result_callback(self, future):
        """
        Store the action result once it arrives.

        :param future: The completed (or cancelled) get-result future.
        """
        if future.cancelled():
            logger.debug("Turn request was canceled!")
            return

        self._result = future.result().result
        # NOTE(review): a result with ``turned == 0.0`` leaves the status
        # untouched, so the goal keeps counting as active - confirm this is
        # the intended contract with the action server.
        if self._result.turned != 0.0:
            self._status = GoalStatus.STATUS_SUCCEEDED
            logger.debug("Turn request marked as succeeded with result %f!",
                         self._result.turned)

    def _cancel_response_callback(self, future):
        """
        Finish a cancellation: mark the goal as aborted and reset the client.

        :param future: The completed cancel future.
        """
        # NOTE(review): the response object is presumably always truthy;
        # inspecting its list of cancelled goals may be what was intended.
        if future.result():
            logger.debug("Turn request canceled!")
        self._status = GoalStatus.STATUS_ABORTED
        self._reset_action_client()

    def _cancel_futures(self):
        """Cancel and forget the pending result and send-goal futures."""
        if self._get_result_future:
            self._get_result_future.cancel()
            self._get_result_future = None

        if self._send_goal_future:
            self._send_goal_future.cancel()
            self._send_goal_future = None

    def _reset_action_client(self):
        """Destroy the current action client and create a fresh one."""
        self._turn_action_client.destroy()
        self._turn_action_client = ActionClient(self, Turn, "turn")
        logger.info("Reset TurnActionClientAsync!")
class TurnController:
    """
    Blocking front end for the turn action.

    Sends a goal to the turn action server through a
    :class:`TurnActionClientAsync` node and spins that node until the goal
    is no longer reported as active.
    """

    __slots__ = ["_turn_action_client", "_executor"]

    def __init__(self):
        # Bring up rclpy only if nobody else has done so already.
        if not rclpy.ok():
            rclpy.init()

        self._turn_action_client = TurnActionClientAsync()
        self._executor = MultiThreadedExecutor()

    def stop(self):
        """
        Stop the JetRacer.

        Cancels the current turn goal, resets the client state and swaps the
        executor for a fresh one.
        """
        client = self._turn_action_client
        client.cancel_turn_goal_async()
        client.cleanup()

        # The old executor is shut down and replaced with a new instance.
        self._executor.shutdown()
        self._executor = MultiThreadedExecutor()

    def shutdown(self):
        """
        Shutdown the TurnController and destroy the node.
        """
        logger.info("Shutting down TurnController!")
        self._turn_action_client.destroy_node()
        self._executor.shutdown()

    def turn(self, theta: float):
        """
        Send a goal to the turn action server and wait for the result.

        A ``theta`` of ``0.0`` sends nothing and only resets the client.

        :param theta: The angle to turn the JetRacer in radians.
            Positive is left, negative is right.
        :type theta: float
        :raises KeyboardInterrupt: If spinning is interrupted by a
            KeyboardInterrupt or an ExternalShutdownException.
        """
        client = self._turn_action_client

        if theta != 0.0:
            client.send_turn_goal_async(theta)
            try:
                while True:
                    if not client.is_turn_goal_active():
                        break
                    logger.debug("Waiting for Turn action request %f to succeed.", theta)
                    # self._executor is looked up on every pass because
                    # stop() replaces it.
                    rclpy.spin_once(client, executor=self._executor)
            except (KeyboardInterrupt, ExternalShutdownException) as exc:
                logger.info("Detected KeyboardInterrupt or ExternalShutdownException!")
                raise KeyboardInterrupt from exc

        client.cleanup()