"""Laser filter
This file contains the access to the Lidar and its ROS 2 Launch entrypoint.
This file can be imported and contains the following classes:
* LaserFilter: Filters lidar scan data.
"""
from typing import List
from threading import Lock
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from sensor_msgs.msg import LaserScan
from rcl_interfaces.msg import SetParametersResult
[docs]
class LaserFilter(Node):
"""
A class to filter the laser scan data based on the range and angle.
All values which are not in the range [min_range, max_range] will be set to 0.
Only values with angles between -angle_range/2 and +angle_range/2 are kept,
as they correspond to the region in front of the LiDAR.
If a parameter is changed, the filter will automatically update the scan data
based on the last received unfiltered scan data.
"""
MIN_RANGE_PARAM = "min_range"
MAX_RANGE_PARAM = "max_range"
SECTION_END_PARAM = "section_end"
SECTION_START_PARAM = "section_start"
_SCAN_TOPIC = "scan"
_FILTERED_SCAN_TOPIC = "filtered_scan"
_DEFAULT_MIN_RANGE = 0.0 # in meters
_DEFAULT_MAX_RANGE = 10.0 # in meters
_DEFAULT_SECTION_START = 315 # in degrees
_DEFAULT_SECTION_END = 45 # in degrees
_MAX_POSSIBLE_ANGLE = 360 # in degrees
_FILTER_INTERVAL = 0.05 # in seconds
__slots__ = ["_scan_subscription", "_filtered_scan_publisher", "_filter_timer",
"_min_range", "_max_range", "_last_scan",
"_section_start", "_section_end", "_scan_lock"]
def __init__(self):
super().__init__("laser_filter", namespace = "/jetracerros2")
self._scan_subscription = self.create_subscription(
LaserScan, self._SCAN_TOPIC, self._scan_subscription_callback, 10,
callback_group = MutuallyExclusiveCallbackGroup())
self._filter_timer = self.create_timer(
self._FILTER_INTERVAL, self._process_last_scan,
callback_group = MutuallyExclusiveCallbackGroup())
self._filtered_scan_publisher = self.create_publisher(
LaserScan, self._FILTERED_SCAN_TOPIC, 10)
self._last_scan: LaserScan = None
self._scan_lock = Lock()
self.declare_parameter(self.MIN_RANGE_PARAM, self._DEFAULT_MIN_RANGE)
self.declare_parameter(self.MAX_RANGE_PARAM, self._DEFAULT_MAX_RANGE)
self.declare_parameter(self.SECTION_START_PARAM, self._DEFAULT_SECTION_START)
self.declare_parameter(self.SECTION_END_PARAM, self._DEFAULT_SECTION_END)
self.add_on_set_parameters_callback(self._parameter_callback)
self._min_range = self.get_parameter(
self.MIN_RANGE_PARAM).get_parameter_value().double_value
self._max_range = self.get_parameter(
self.MAX_RANGE_PARAM).get_parameter_value().double_value
self._section_start = self.get_parameter(
self.SECTION_START_PARAM).get_parameter_value().integer_value % self._MAX_POSSIBLE_ANGLE
self._section_end = self.get_parameter(
self.SECTION_END_PARAM).get_parameter_value().integer_value % self._MAX_POSSIBLE_ANGLE
def _filter_scan(self, scan: LaserScan) -> LaserScan:
filtered_scan = scan
filtered_scan.ranges = list(scan.ranges)
filtered_scan.intensities = list(scan.intensities)
scan_length = len(scan.ranges)
# Filter the scan based on the range
for i in range(scan_length):
if (filtered_scan.ranges[i] < self._min_range or
filtered_scan.ranges[i] > self._max_range):
filtered_scan.ranges[i] = 0
filtered_scan.intensities[i] = 0
filtered_scan = self._filter_section(filtered_scan)
return filtered_scan
def _filter_section(self, scan: LaserScan) -> LaserScan:
"""
Filter out all values which are not in the section
defined by the parameters section_start and section_end.
:param scan: The scan data to filter
:type scan: LaserScan
:return: The filtered scan data
:rtype: LaserScan
"""
entries_per_degree = len(scan.ranges) / 360
start_index = int(self._section_start * entries_per_degree)
end_index = int(self._section_end * entries_per_degree)
if start_index <= end_index:
for i in range(0, start_index):
scan.ranges[i] = 0
scan.intensities[i] = 0
for i in range(end_index, len(scan.ranges)):
scan.ranges[i] = 0
scan.intensities[i] = 0
else:
for i in range(end_index, start_index):
scan.ranges[i] = 0
scan.intensities[i] = 0
return scan
def _scan_subscription_callback(self, msg: LaserScan):
with self._scan_lock:
self._last_scan = msg
def _process_last_scan(self):
with self._scan_lock:
if self._last_scan is None:
return
filtered_scan = self._filter_scan(self._last_scan)
self._filtered_scan_publisher.publish(filtered_scan)
def _parameter_callback(self, params: List[rclpy.parameter.Parameter]) -> SetParametersResult:
for param in params:
if param.name == self.MIN_RANGE_PARAM:
self._min_range = param.value
elif param.name == self.MAX_RANGE_PARAM:
self._max_range = param.value
elif param.name == self.SECTION_START_PARAM:
self._section_start = param.value % self._MAX_POSSIBLE_ANGLE
elif param.name == self.SECTION_END_PARAM:
self._section_end = param.value % self._MAX_POSSIBLE_ANGLE
if self._last_scan is not None:
self._scan_subscription_callback(self._last_scan)
self._process_last_scan()
return SetParametersResult(successful = True)
[docs]
def main(args = None):
"""
Entry point for ROS 2 launch script.
"""
rclpy.init(args = args)
laser_filter = LaserFilter()
executor = MultiThreadedExecutor(num_threads = 4)
executor.add_node(laser_filter)
executor.spin()
laser_filter.destroy_node()
rclpy.shutdown()