Source code for uppaal2jetracer.jetracerros2.jetracerros2.laser_filter

"""Laser filter

This file contains the access to the Lidar and its ROS 2 Launch entrypoint.

This file can be imported and contains the following classes:

    * LaserFilter: Filters lidar scan data.
"""

from typing import List
from threading import Lock

import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from sensor_msgs.msg import LaserScan
from rcl_interfaces.msg import SetParametersResult


[docs] class LaserFilter(Node): """ A class to filter the laser scan data based on the range and angle. All values which are not in the range [min_range, max_range] will be set to 0. Only values with angles between -angle_range/2 and +angle_range/2 are kept, as they correspond to the region in front of the LiDAR. If a parameter is changed, the filter will automatically update the scan data based on the last received unfiltered scan data. """ MIN_RANGE_PARAM = "min_range" MAX_RANGE_PARAM = "max_range" SECTION_END_PARAM = "section_end" SECTION_START_PARAM = "section_start" _SCAN_TOPIC = "scan" _FILTERED_SCAN_TOPIC = "filtered_scan" _DEFAULT_MIN_RANGE = 0.0 # in meters _DEFAULT_MAX_RANGE = 10.0 # in meters _DEFAULT_SECTION_START = 315 # in degrees _DEFAULT_SECTION_END = 45 # in degrees _MAX_POSSIBLE_ANGLE = 360 # in degrees _FILTER_INTERVAL = 0.05 # in seconds __slots__ = ["_scan_subscription", "_filtered_scan_publisher", "_filter_timer", "_min_range", "_max_range", "_last_scan", "_section_start", "_section_end", "_scan_lock"] def __init__(self): super().__init__("laser_filter", namespace = "/jetracerros2") self._scan_subscription = self.create_subscription( LaserScan, self._SCAN_TOPIC, self._scan_subscription_callback, 10, callback_group = MutuallyExclusiveCallbackGroup()) self._filter_timer = self.create_timer( self._FILTER_INTERVAL, self._process_last_scan, callback_group = MutuallyExclusiveCallbackGroup()) self._filtered_scan_publisher = self.create_publisher( LaserScan, self._FILTERED_SCAN_TOPIC, 10) self._last_scan: LaserScan = None self._scan_lock = Lock() self.declare_parameter(self.MIN_RANGE_PARAM, self._DEFAULT_MIN_RANGE) self.declare_parameter(self.MAX_RANGE_PARAM, self._DEFAULT_MAX_RANGE) self.declare_parameter(self.SECTION_START_PARAM, self._DEFAULT_SECTION_START) self.declare_parameter(self.SECTION_END_PARAM, self._DEFAULT_SECTION_END) self.add_on_set_parameters_callback(self._parameter_callback) self._min_range = self.get_parameter( self.MIN_RANGE_PARAM).get_parameter_value().double_value self._max_range = self.get_parameter( self.MAX_RANGE_PARAM).get_parameter_value().double_value self._section_start = self.get_parameter( self.SECTION_START_PARAM).get_parameter_value().integer_value % self._MAX_POSSIBLE_ANGLE self._section_end = self.get_parameter( self.SECTION_END_PARAM).get_parameter_value().integer_value % self._MAX_POSSIBLE_ANGLE def _filter_scan(self, scan: LaserScan) -> LaserScan: filtered_scan = scan filtered_scan.ranges = list(scan.ranges) filtered_scan.intensities = list(scan.intensities) scan_length = len(scan.ranges) # Filter the scan based on the range for i in range(scan_length): if (filtered_scan.ranges[i] < self._min_range or filtered_scan.ranges[i] > self._max_range): filtered_scan.ranges[i] = 0 filtered_scan.intensities[i] = 0 filtered_scan = self._filter_section(filtered_scan) return filtered_scan def _filter_section(self, scan: LaserScan) -> LaserScan: """ Filter out all values which are not in the section defined by the parameters section_start and section_end. :param scan: The scan data to filter :type scan: LaserScan :return: The filtered scan data :rtype: LaserScan """ entries_per_degree = len(scan.ranges) / 360 start_index = int(self._section_start * entries_per_degree) end_index = int(self._section_end * entries_per_degree) if start_index <= end_index: for i in range(0, start_index): scan.ranges[i] = 0 scan.intensities[i] = 0 for i in range(end_index, len(scan.ranges)): scan.ranges[i] = 0 scan.intensities[i] = 0 else: for i in range(end_index, start_index): scan.ranges[i] = 0 scan.intensities[i] = 0 return scan def _scan_subscription_callback(self, msg: LaserScan): with self._scan_lock: self._last_scan = msg def _process_last_scan(self): with self._scan_lock: if self._last_scan is None: return filtered_scan = self._filter_scan(self._last_scan) self._filtered_scan_publisher.publish(filtered_scan) def _parameter_callback(self, params: List[rclpy.parameter.Parameter]) -> SetParametersResult: for param in params: if param.name == self.MIN_RANGE_PARAM: self._min_range = param.value elif param.name == self.MAX_RANGE_PARAM: self._max_range = param.value elif param.name == self.SECTION_START_PARAM: self._section_start = param.value % self._MAX_POSSIBLE_ANGLE elif param.name == self.SECTION_END_PARAM: self._section_end = param.value % self._MAX_POSSIBLE_ANGLE if self._last_scan is not None: self._scan_subscription_callback(self._last_scan) self._process_last_scan() return SetParametersResult(successful = True)
[docs] def main(args = None): """ Entry point for ROS 2 launch script. """ rclpy.init(args = args) laser_filter = LaserFilter() executor = MultiThreadedExecutor(num_threads = 4) executor.add_node(laser_filter) executor.spin() laser_filter.destroy_node() rclpy.shutdown()