"""Collision controller
This file contains classes to handle laser scan data and check for potential collisions.
This file can be imported and contains the following classes:
* FilteredLaserScanSubscriber: Checks for the laser scans for potential collisions.
* LaserFilterParameterSetter: Sets parameters of the laser filter.
* CollisionController: Checks for potential collisions.
"""
import logging
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.parameter_client import AsyncParameterClient
from rclpy.executors import MultiThreadedExecutor
from sensor_msgs.msg import LaserScan
from uppaal2jetracer.jetracerros2.jetracerros2.laser_filter import LaserFilter
logger = logging.getLogger("jetracer")
[docs]
class FilteredLaserScanSubscriber(Node):
"""
A class to subscribe to the filtered laser scan data and check for potential collisions.
"""
_FILTERED_SCAN_TOPIC = "filtered_scan"
_DEFAULT_COLLISION_THRESHOLD = 0.8 # in meters
__slots__ = ["_filtered_scan_subscription", "_free_ahead", "_potential_collision",
"_collision_threshold"]
def __init__(self):
super().__init__("filtered_laser_scan_subscriber", namespace = "/jetracerros2")
self._filtered_scan_subscription = self.create_subscription(
LaserScan,
self._FILTERED_SCAN_TOPIC,
self._filtered_scan_subscription_callback,
10)
self._free_ahead = True
self._potential_collision = False
self._collision_threshold = self._DEFAULT_COLLISION_THRESHOLD
@property
def free_ahead(self) -> bool:
"""
Check if the path ahead is free of obstacles.
It is recommended to use rclpy.spin_once() on this node before calling this method.
"""
return self._free_ahead
@property
def potential_collision(self) -> bool:
"""
Check if there is a potential collision ahead.
It is recommended to use rclpy.spin_once() on this node before calling this method.
"""
return self._potential_collision
@property
def collision_threshold(self) -> float:
"""
Get the threshold for potential collisions.
"""
return self._collision_threshold
@collision_threshold.setter
def collision_threshold(self, threshold: float = None):
"""
Set the threshold for potential collisions.
"""
if threshold is None:
self._collision_threshold = self._DEFAULT_COLLISION_THRESHOLD
elif threshold > 0:
self._collision_threshold = threshold
def _filtered_scan_subscription_callback(self, msg: LaserScan):
data = list(msg.ranges)
for range_value in data:
if range_value == 0:
continue
if range_value < self._collision_threshold:
logger.debug("Potential collision detected with object in %f meters", range_value)
self._free_ahead = False
self._potential_collision = True
return
logger.debug("No potential collisions detected!")
self._free_ahead = True
self._potential_collision = False
[docs]
class LaserFilterParameterSetter(Node):
"""
This node is used to set the parameters of the laser filter.
"""
_PARAM_DEBUG_MSG = "Setting param %s of LaserFilter Node to %f!"
__slots__ = ["_laser_filter_parameter_client"]
def __init__(self):
super().__init__("laser_filter_parameter_setter", namespace = "/jetracerros2")
self._laser_filter_parameter_client = AsyncParameterClient(self, "laser_filter")
[docs]
def set_min_range(self, min_range: float):
"""
Set the minimum range for the values filtered by the laser filter.
:param min_range: The minimum range in meters
:type min_range: float
"""
logger.debug(self._PARAM_DEBUG_MSG, LaserFilter.MIN_RANGE_PARAM, min_range)
return self._laser_filter_parameter_client.set_parameters(
[Parameter(LaserFilter.MIN_RANGE_PARAM, Parameter.Type.DOUBLE, min_range)])
[docs]
def set_max_range(self, max_range: float) -> rclpy.task.Future:
"""
Set the maximum range for the values filtered by the laser filter.
:param max_range: The maximum range in meters
:type max_range: float
"""
logger.debug(self._PARAM_DEBUG_MSG, LaserFilter.MAX_RANGE_PARAM, max_range)
return self._laser_filter_parameter_client.set_parameters(
[Parameter(LaserFilter.MAX_RANGE_PARAM, Parameter.Type.DOUBLE, max_range)])
[docs]
def set_section_start(self, section_start: int) -> rclpy.task.Future:
"""
Set the start of the section to keep in the filtered scan.
In degrees.
:param section_start: The start of the section in degrees
:type section_start: int
"""
logger.debug(self._PARAM_DEBUG_MSG, LaserFilter.SECTION_START_PARAM, section_start)
return self._laser_filter_parameter_client.set_parameters(
[Parameter(LaserFilter.SECTION_START_PARAM, Parameter.Type.INTEGER, section_start)])
[docs]
def set_section_end(self, section_end: int) -> rclpy.task.Future:
"""
Set the end of the section to keep in the filtered scan.
In degrees.
:param section_end: The end of the section in degrees
:type section_end: int
"""
logger.debug(self._PARAM_DEBUG_MSG, LaserFilter.SECTION_END_PARAM, section_end)
return self._laser_filter_parameter_client.set_parameters(
[Parameter(LaserFilter.SECTION_END_PARAM, Parameter.Type.INTEGER, section_end)])
[docs]
class CollisionController():
"""
A wrapper class to check for potential collisions using the /filtered_scan topic.
This class provides asynchronous methods to set the parameters of the LaserFilter node.
If you need to wait for the returned Future to complete, avoid blocking the thread
where CollisionController is used.
Otherwise, laser scan data will remain invalid until the thread is unblocked.
Providing a synchronous alternative is not feasible
because waiting for the Future would prevent access to any current data from the LaserFilter,
as the execution would be stalled.
"""
_FAILED_PARAM_MSG = "Failed to set param %s of the LaserFilter Node to %f!"
__slots__ = ["_free_ahead", "_potential_collision",
"_filtered_laser_scan_subscriber", "_laser_filter_parameter_setter",
"_executor"]
def __init__(self):
self._free_ahead = True
self._potential_collision = False
if not rclpy.ok():
rclpy.init()
self._filtered_laser_scan_subscriber = FilteredLaserScanSubscriber()
self._laser_filter_parameter_setter = LaserFilterParameterSetter()
self._executor = MultiThreadedExecutor(num_threads = 2)
self._executor.add_node(self._filtered_laser_scan_subscriber)
self._executor.add_node(self._laser_filter_parameter_setter)
[docs]
def free_ahead(self, collision_threshold: float = None) -> bool:
"""
Check if the path ahead is free of obstacles.
"""
self._update_collision_status(collision_threshold)
return self._free_ahead
[docs]
def potential_collision(self, collision_threshold: float = None) -> bool:
"""
Check if there is a potential collision ahead.
"""
self._update_collision_status(collision_threshold)
return self._potential_collision
[docs]
def shutdown(self):
"""
Destroy the node and release the resources.
"""
logger.info("Shutting down CollisionController!")
self._filtered_laser_scan_subscriber.destroy_node()
if rclpy.ok():
rclpy.shutdown()
[docs]
def set_min_range_async(self, min_range: float) -> rclpy.task.Future:
"""
Set the minimum range for the values filtered by the laser filter.
This method is asynchronous. It does not wait for the future to complete!
See the class documentation for more information.
:param min_range: The minimum range in meters
:type min_range: float
:return: The future to wait for the parameter to be set.
:rtype: rclpy.task.Future
"""
return self._laser_filter_parameter_setter.set_min_range(min_range)
[docs]
def set_max_range_async(self, max_range: float) -> rclpy.task.Future:
"""
Set the maximum range for the values filtered by the laser filter.
This method is asynchronous. It does not wait for the future to complete!
See the class documentation for more information.
:param max_range: The maximum range in meters
:type max_range: float
:return: The future to wait for the parameter to be set.
:rtype: rclpy.task.Future
"""
return self._laser_filter_parameter_setter.set_max_range(max_range)
[docs]
def set_section_start_async(self, section_start: int) -> rclpy.task.Future:
"""
Set the start of the section to keep in the filtered scan.
In degrees.
This method is asynchronous. It does not wait for the future to complete!
See the class documentation for more information.
:param section_start: The start of the section in degrees
:type section_start: int
:return: The future to wait for the parameter to be set.
:rtype: rclpy.task.Future
"""
return self._laser_filter_parameter_setter.set_section_start(section_start)
[docs]
def set_section_end_async(self, section_end: int) -> rclpy.task.Future:
"""
Set the end of the section to keep in the filtered scan.
In degrees.
This method is asynchronous. It does not wait for the future to complete!
See the class documentation for more information.
:param section_end: The end of the section in degrees
:type section_end: int
:return: The future to wait for the parameter to be set.
:rtype: rclpy.task.Future
"""
return self._laser_filter_parameter_setter.set_section_end(section_end)
def _update_collision_status(self, collision_threshold: float = None):
self._filtered_laser_scan_subscriber.collision_threshold = collision_threshold
logger.debug("Spinning FilteredLaserScanSubscriber once to update collision attributes.")
self._executor.spin_once()
self._free_ahead = self._filtered_laser_scan_subscriber.free_ahead
self._potential_collision = self._filtered_laser_scan_subscriber.potential_collision