Source code for uppaal2jetracer.jetracerros2.jetracerros2.collision_controller

"""Collision controller

This file contains classes to handle laser scan data and check for potential collisions.

This file can be imported and contains the following classes:

    * FilteredLaserScanSubscriber:  Checks for the laser scans for potential collisions.
    * LaserFilterParameterSetter:   Sets parameters of the laser filter.
    * CollisionController:          Checks for potential collisions.
"""

import logging
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.parameter_client import AsyncParameterClient
from rclpy.executors import MultiThreadedExecutor

from sensor_msgs.msg import LaserScan

from uppaal2jetracer.jetracerros2.jetracerros2.laser_filter import LaserFilter

logger = logging.getLogger("jetracer")


[docs] class FilteredLaserScanSubscriber(Node): """ A class to subscribe to the filtered laser scan data and check for potential collisions. """ _FILTERED_SCAN_TOPIC = "filtered_scan" _DEFAULT_COLLISION_THRESHOLD = 0.8 # in meters __slots__ = ["_filtered_scan_subscription", "_free_ahead", "_potential_collision", "_collision_threshold"] def __init__(self): super().__init__("filtered_laser_scan_subscriber", namespace = "/jetracerros2") self._filtered_scan_subscription = self.create_subscription( LaserScan, self._FILTERED_SCAN_TOPIC, self._filtered_scan_subscription_callback, 10) self._free_ahead = True self._potential_collision = False self._collision_threshold = self._DEFAULT_COLLISION_THRESHOLD @property def free_ahead(self) -> bool: """ Check if the path ahead is free of obstacles. It is recommended to use rclpy.spin_once() on this node before calling this method. """ return self._free_ahead @property def potential_collision(self) -> bool: """ Check if there is a potential collision ahead. It is recommended to use rclpy.spin_once() on this node before calling this method. """ return self._potential_collision @property def collision_threshold(self) -> float: """ Get the threshold for potential collisions. """ return self._collision_threshold @collision_threshold.setter def collision_threshold(self, threshold: float = None): """ Set the threshold for potential collisions. """ if threshold is None: self._collision_threshold = self._DEFAULT_COLLISION_THRESHOLD elif threshold > 0: self._collision_threshold = threshold def _filtered_scan_subscription_callback(self, msg: LaserScan): data = list(msg.ranges) for range_value in data: if range_value == 0: continue if range_value < self._collision_threshold: logger.debug("Potential collision detected with object in %f meters", range_value) self._free_ahead = False self._potential_collision = True return logger.debug("No potential collisions detected!") self._free_ahead = True self._potential_collision = False
[docs] class LaserFilterParameterSetter(Node): """ This node is used to set the parameters of the laser filter. """ _PARAM_DEBUG_MSG = "Setting param %s of LaserFilter Node to %f!" __slots__ = ["_laser_filter_parameter_client"] def __init__(self): super().__init__("laser_filter_parameter_setter", namespace = "/jetracerros2") self._laser_filter_parameter_client = AsyncParameterClient(self, "laser_filter")
[docs] def set_min_range(self, min_range: float): """ Set the minimum range for the values filtered by the laser filter. :param min_range: The minimum range in meters :type min_range: float """ logger.debug(self._PARAM_DEBUG_MSG, LaserFilter.MIN_RANGE_PARAM, min_range) return self._laser_filter_parameter_client.set_parameters( [Parameter(LaserFilter.MIN_RANGE_PARAM, Parameter.Type.DOUBLE, min_range)])
[docs] def set_max_range(self, max_range: float) -> rclpy.task.Future: """ Set the maximum range for the values filtered by the laser filter. :param max_range: The maximum range in meters :type max_range: float """ logger.debug(self._PARAM_DEBUG_MSG, LaserFilter.MAX_RANGE_PARAM, max_range) return self._laser_filter_parameter_client.set_parameters( [Parameter(LaserFilter.MAX_RANGE_PARAM, Parameter.Type.DOUBLE, max_range)])
[docs] def set_section_start(self, section_start: int) -> rclpy.task.Future: """ Set the start of the section to keep in the filtered scan. In degrees. :param section_start: The start of the section in degrees :type section_start: int """ logger.debug(self._PARAM_DEBUG_MSG, LaserFilter.SECTION_START_PARAM, section_start) return self._laser_filter_parameter_client.set_parameters( [Parameter(LaserFilter.SECTION_START_PARAM, Parameter.Type.INTEGER, section_start)])
[docs] def set_section_end(self, section_end: int) -> rclpy.task.Future: """ Set the end of the section to keep in the filtered scan. In degrees. :param section_end: The end of the section in degrees :type section_end: int """ logger.debug(self._PARAM_DEBUG_MSG, LaserFilter.SECTION_END_PARAM, section_end) return self._laser_filter_parameter_client.set_parameters( [Parameter(LaserFilter.SECTION_END_PARAM, Parameter.Type.INTEGER, section_end)])
[docs] class CollisionController(): """ A wrapper class to check for potential collisions using the /filtered_scan topic. This class provides asynchronous methods to set the parameters of the LaserFilter node. If you need to wait for the returned Future to complete, avoid blocking the thread where CollisionController is used. Otherwise, laser scan data will remain invalid until the thread is unblocked. Providing a synchronous alternative is not feasible because waiting for the Future would prevent access to any current data from the LaserFilter, as the execution would be stalled. """ _FAILED_PARAM_MSG = "Failed to set param %s of the LaserFilter Node to %f!" __slots__ = ["_free_ahead", "_potential_collision", "_filtered_laser_scan_subscriber", "_laser_filter_parameter_setter", "_executor"] def __init__(self): self._free_ahead = True self._potential_collision = False if not rclpy.ok(): rclpy.init() self._filtered_laser_scan_subscriber = FilteredLaserScanSubscriber() self._laser_filter_parameter_setter = LaserFilterParameterSetter() self._executor = MultiThreadedExecutor(num_threads = 2) self._executor.add_node(self._filtered_laser_scan_subscriber) self._executor.add_node(self._laser_filter_parameter_setter)
[docs] def free_ahead(self, collision_threshold: float = None) -> bool: """ Check if the path ahead is free of obstacles. """ self._update_collision_status(collision_threshold) return self._free_ahead
[docs] def potential_collision(self, collision_threshold: float = None) -> bool: """ Check if there is a potential collision ahead. """ self._update_collision_status(collision_threshold) return self._potential_collision
[docs] def shutdown(self): """ Destroy the node and release the resources. """ logger.info("Shutting down CollisionController!") self._filtered_laser_scan_subscriber.destroy_node() if rclpy.ok(): rclpy.shutdown()
[docs] def set_min_range_async(self, min_range: float) -> rclpy.task.Future: """ Set the minimum range for the values filtered by the laser filter. This method is asynchronous. It does not wait for the future to complete! See the class documentation for more information. :param min_range: The minimum range in meters :type min_range: float :return: The future to wait for the parameter to be set. :rtype: rclpy.task.Future """ return self._laser_filter_parameter_setter.set_min_range(min_range)
[docs] def set_max_range_async(self, max_range: float) -> rclpy.task.Future: """ Set the maximum range for the values filtered by the laser filter. This method is asynchronous. It does not wait for the future to complete! See the class documentation for more information. :param max_range: The maximum range in meters :type max_range: float :return: The future to wait for the parameter to be set. :rtype: rclpy.task.Future """ return self._laser_filter_parameter_setter.set_max_range(max_range)
[docs] def set_section_start_async(self, section_start: int) -> rclpy.task.Future: """ Set the start of the section to keep in the filtered scan. In degrees. This method is asynchronous. It does not wait for the future to complete! See the class documentation for more information. :param section_start: The start of the section in degrees :type section_start: int :return: The future to wait for the parameter to be set. :rtype: rclpy.task.Future """ return self._laser_filter_parameter_setter.set_section_start(section_start)
[docs] def set_section_end_async(self, section_end: int) -> rclpy.task.Future: """ Set the end of the section to keep in the filtered scan. In degrees. This method is asynchronous. It does not wait for the future to complete! See the class documentation for more information. :param section_end: The end of the section in degrees :type section_end: int :return: The future to wait for the parameter to be set. :rtype: rclpy.task.Future """ return self._laser_filter_parameter_setter.set_section_end(section_end)
def _update_collision_status(self, collision_threshold: float = None): self._filtered_laser_scan_subscriber.collision_threshold = collision_threshold logger.debug("Spinning FilteredLaserScanSubscriber once to update collision attributes.") self._executor.spin_once() self._free_ahead = self._filtered_laser_scan_subscriber.free_ahead self._potential_collision = self._filtered_laser_scan_subscriber.potential_collision