"""Turn controller
This file contains the turn control of a JetRacer.
This file can be imported and contains the following classes:
* TurnActionClientAsync: Turns a JetRacer asynchronously.
* TurnController: Controls the turning of a JetRacer.
"""
from __future__ import annotations
import logging
from jetracerros2_interface.action import Turn
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.executors import MultiThreadedExecutor, ExternalShutdownException
from action_msgs.msg import GoalStatus
logger = logging.getLogger("jetracer")
class TurnActionClientAsync(Node):
    """
    A node that asynchronously sends goals to the ``turn`` action server and tracks the outcome.

    The outcome is exposed through :attr:`result` and :attr:`status`; callers poll
    :meth:`is_turn_goal_active` while spinning this node.

    :ivar _turn_action_client: The action client for the turn action server.
    :vartype _turn_action_client: rclpy.action.ActionClient
    :ivar _result: The result of the turn action.
    :vartype _result: Turn.Result
    :ivar _status: The status of the turn action (a ``GoalStatus.STATUS_*`` constant). Used to wait for the result.
    :vartype _status: int
    :ivar _goal_handle: The handle of the accepted goal, or ``None`` if there is none.
    :ivar _get_result_future: The future of the pending result request, or ``None``.
    :ivar _send_goal_future: The future of the pending goal request, or ``None``.
    """

    # The first entry must match the attribute assigned in __init__ (``_turn_action_client``).
    __slots__ = ["_turn_action_client", "_result", "_status",
                 "_goal_handle", "_get_result_future", "_send_goal_future"]

    def __init__(self):
        super().__init__("turn_action_client", namespace="/jetracerros2")
        self._turn_action_client = ActionClient(self, Turn, "turn")
        self._result = Turn.Result()
        self._status = GoalStatus.STATUS_UNKNOWN
        self._goal_handle = None
        self._get_result_future = None
        self._send_goal_future = None

    @property
    def result(self) -> Turn.Result:
        """
        Get the result of the turn action.

        :return: The result of the turn action.
        :rtype: Turn.Result
        """
        return self._result

    @property
    def status(self) -> int:
        """
        Get the status of the turn action.

        :return: The status of the turn action as a ``GoalStatus.STATUS_*`` constant.
        :rtype: int
        """
        return self._status

    def cleanup(self):
        """
        Cleanup the turn action client.

        Resets the status to ``STATUS_UNKNOWN`` and drops the stored goal handle.
        This method should only be called after the turn action has finished or is being cancelled!
        """
        self._status = GoalStatus.STATUS_UNKNOWN
        self._goal_handle = None
        logger.debug("Clean up of TurnActionClientAsync completed!")

    def send_turn_goal_async(self, theta: float) -> rclpy.task.Future:
        """
        Asynchronously sends a goal to the turn action server.

        Blocks until the action server is available, then sends the goal without
        waiting for it to be accepted.

        :param theta: The angle to turn the JetRacer in radians.
        :type theta: float
        :return: The future of the goal handle.
        :rtype: rclpy.task.Future
        """
        goal = Turn.Goal()
        goal.theta = theta
        logger.debug("Waiting for Turn action server!")
        self._turn_action_client.wait_for_server()
        logger.debug("Sending %f turn request async!", theta)
        self._send_goal_future = self._turn_action_client.send_goal_async(goal)
        # Reset the status before the response callback can run, so a previous
        # terminal status does not make the new goal look finished.
        self._status = GoalStatus.STATUS_UNKNOWN
        self._send_goal_future.add_done_callback(self._goal_response_callback)
        return self._send_goal_future

    def cancel_turn_goal_async(self):
        """
        Cancel the turn goal.

        Does nothing except log a warning when no goal handle is stored. Otherwise the
        cancel request is sent, the locally stored futures are cancelled and the goal
        handle is dropped.
        """
        if self._goal_handle is None:
            logger.warning("No goal handle to cancel!")
            return
        cancel_future = self._goal_handle.cancel_goal_async()
        logger.debug("Canceling turn request!")
        cancel_future.add_done_callback(self._cancel_response_callback)
        self._cancel_futures()
        self._goal_handle = None

    def is_turn_goal_active(self) -> bool:
        """
        Check if the turn goal is active.

        Every status other than ``STATUS_SUCCEEDED`` and ``STATUS_ABORTED`` counts as active.

        :return: True if the turn goal is active, False otherwise.
        :rtype: bool
        """
        logger.debug("Checking if turn goal is active! Status: %s", self._status)
        return self._status not in (GoalStatus.STATUS_SUCCEEDED,
                                    GoalStatus.STATUS_ABORTED)

    def _goal_response_callback(self, future):
        """Handle the server's answer to the goal request and request the result if accepted."""
        goal_handle = future.result()
        if goal_handle is None or not goal_handle.accepted:
            logger.warning("Turn request was denied!")
            # Mark the goal as finished; otherwise is_turn_goal_active() would stay
            # True forever and a caller waiting on it would never return.
            self._status = GoalStatus.STATUS_ABORTED
            return
        self._status = GoalStatus.STATUS_ACCEPTED
        logger.debug("Turn request accepted!")
        self._goal_handle = goal_handle
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self._get_result_callback)

    def _get_result_callback(self, future):
        """Store the action result and move the status to a terminal value."""
        if future.cancelled():
            logger.debug("Turn request was canceled!")
            return
        self._result = future.result().result
        if self._result.turned != 0.0:
            self._status = GoalStatus.STATUS_SUCCEEDED
            logger.debug("Turn request marked as succeeded with result %f!", self._result.turned)
        else:
            # No further callback will arrive for this goal, so a zero result must
            # also end the wait instead of leaving the status non-terminal.
            self._status = GoalStatus.STATUS_ABORTED
            logger.warning("Turn request finished without turning!")

    def _cancel_response_callback(self, future):
        """Mark the goal as aborted and recreate the action client once the cancel request is answered."""
        if not future.result():
            return
        logger.debug("Turn request canceled!")
        self._status = GoalStatus.STATUS_ABORTED
        self._reset_action_client()

    def _cancel_futures(self):
        """Cancel and forget the stored result and goal futures, if any."""
        if self._get_result_future:
            self._get_result_future.cancel()
            self._get_result_future = None
        if self._send_goal_future:
            self._send_goal_future.cancel()
            self._send_goal_future = None

    def _reset_action_client(self):
        """Destroy the current action client and create a fresh one for the ``turn`` action."""
        self._turn_action_client.destroy()
        self._turn_action_client = ActionClient(self, Turn, "turn")
        logger.info("Reset TurnActionClientAsync!")
class TurnController:
    """
    This class is used to send a goal to the turn action server and wait for the result.

    It owns a :class:`TurnActionClientAsync` node and a ``MultiThreadedExecutor``
    that is used to spin that node while a turn is in progress.
    """

    __slots__ = ["_turn_action_client", "_executor"]

    def __init__(self):
        # Only initialise rclpy when it is not already running.
        if not rclpy.ok():
            rclpy.init()
        self._turn_action_client = TurnActionClientAsync()
        self._executor = MultiThreadedExecutor()

    def stop(self):
        """
        Stop the JetRacer.

        Cancels the current turn goal, cleans up the action client and replaces
        the executor with a fresh one after shutting the old one down.
        """
        self._turn_action_client.cancel_turn_goal_async()
        self._turn_action_client.cleanup()
        self._executor.shutdown()
        self._executor = MultiThreadedExecutor()

    def shutdown(self):
        """
        Shutdown the TurnController and destroy the node.
        """
        logger.info("Shutting down TurnController!")
        self._turn_action_client.destroy_node()
        self._executor.shutdown()

    def turn(self, theta: float):
        """
        Send a goal to the turn action server and wait for the result.

        A ``theta`` of exactly ``0.0`` sends nothing; the action client is only cleaned up.

        :param theta: The angle to turn the JetRacer in radians. Positive is left, negative is right.
        :type theta: float
        :raises KeyboardInterrupt: If a KeyboardInterrupt or ExternalShutdownException occurs while waiting.
        """
        if theta == 0.0:
            self._turn_action_client.cleanup()
            return
        self._turn_action_client.send_turn_goal_async(theta)
        try:
            # Spin the client node until its status leaves the "active" states.
            while self._turn_action_client.is_turn_goal_active():
                logger.debug("Waiting for Turn action request %f to succeed.", theta)
                rclpy.spin_once(self._turn_action_client, executor=self._executor)
        except (KeyboardInterrupt, ExternalShutdownException) as exc:
            logger.info("Detected KeyboardInterrupt or ExternalShutdownException!")
            # Both cases are surfaced to the caller as a KeyboardInterrupt.
            raise KeyboardInterrupt from exc
        self._turn_action_client.cleanup()