"""Turn controller
This file contains the turn control of a JetRacer.
This file can be imported and contains the following classes:
* TurnActionClientAsync: Turns a JetRacer asynchronously.
* TurnController: Controls the turning of a JetRacer.
"""
from __future__ import annotations
import logging
from jetracerros2_interface.action import Turn
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from action_msgs.msg import GoalStatus
logger = logging.getLogger("jetracer")
[docs]
class TurnActionClientAsync(Node):
"""
A wrapper class used to asynchronously send a goal to the turn action server and wait for the result.
:ivar _turn_action_client_async: The asynchronous action client for the turn action server.
:vartype _turn_action_client_async: TurnActionClientAsync
:ivar _result: The result of the turn action.
:vartype _result: Turn.Result
:ivar _status: The status of the turn action. Used to wait for the result.
:vartype _status: GoalStatus
"""
__slots__ = ["_turn_action_client_async", "_result", "_status"]
def __init__(self):
super().__init__("turn_action_client", namespace = "/jetracerros2")
self._turn_action_client = ActionClient(self, Turn, "turn")
self._result = Turn.Result()
self._status = GoalStatus.STATUS_UNKNOWN
@property
def result(self) -> Turn.Result:
"""
Get the result of the turn action.
:return: The result of the turn action.
:rtype: Turn.Result
"""
return self._result
@property
def status(self) -> int:
"""
Get the status of the turn action.
:return: The status of the turn action.
:rtype: GoalStatus
"""
return self._status
[docs]
def cleanup(self):
"""
Cleanup the turn action client.
This method should only be called after the turn action has finished!
"""
if self._status == GoalStatus.STATUS_SUCCEEDED:
self._status = GoalStatus.STATUS_UNKNOWN
logger.debug("Cleanup complete.")
[docs]
def send_turn_goal_async(self, theta: float):
"""
Asynchronously sends a goal to the turn action server.
:param theta: The angle to turn the JetRacer in radians.
:type theta: float
:return: The future of the goal handle.
:rtype: rclpy.task.Future
"""
goal = Turn.Goal()
goal.theta = theta
logger.debug("Waiting for Turn action server!")
self._turn_action_client.wait_for_server()
logger.debug("Sending %f turn request async!", theta)
send_goal_future = self._turn_action_client.send_goal_async(goal)
send_goal_future.add_done_callback(self._goal_response_callback)
def _goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
logger.warning("Turn request was denied!")
return
self._status = GoalStatus.STATUS_ACCEPTED
logger.debug("Turn request accepted!")
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self._get_result_callback)
def _get_result_callback(self, future):
self._result = future.result().result
self._status = GoalStatus.STATUS_SUCCEEDED
logger.debug("Turn request marked as succeeded!")
[docs]
class TurnController:
"""
This class is used to send a goal to the turn action server and wait for the result.
"""
__slots__ = ["_turn_action_client"]
def __init__(self):
if not rclpy.ok():
rclpy.init()
self._turn_action_client = TurnActionClientAsync()
[docs]
def shutdown(self):
"""
Shutdown the TurnController and destroy the node.
"""
logger.info("Shutting down TurnController!")
self._turn_action_client.destroy_node()
if rclpy.ok():
rclpy.shutdown()
[docs]
def turn(self, theta: float):
""""
Send a goal to the turn action server and wait for the result.
:param theta: The angle to turn the JetRacer in radians. Positive is left, negative is right.
:type theta: float
"""
self._turn_action_client.send_turn_goal_async(theta)
while self._turn_action_client.status != GoalStatus.STATUS_SUCCEEDED:
logger.debug("Waiting for Turn action request %f to succeed.", theta)
rclpy.spin_once(self._turn_action_client)
self._turn_action_client.cleanup()