Skip to content
Snippets Groups Projects

nuPlan to CommonRoad / Lanelet2 map converter

  • Clone with SSH
  • Clone with HTTPS
  • Embed
  • Share
    The snippet can be accessed without any authentication.
    Authored by Ferdinand Mütsch

    :warning: Work in progress :warning:

    Edited
    nuplan_to_commonroad_map.py 6.97 KiB
    #!/usr/bin/python
    
    # Version: 0.1
    # Author: Ferdinand Mütsch <muetsch@kit.edu>
    
    # Script to convert nuPlan maps to CommonRoad format.
    # Currently only includes lanes and lane connectors layers.
    #
    # Limitations / open to dos:
    # - Parse traffic lights / regulatory elements
    # - Parse walkways
    # - Parse crosswalks
    # - Can't convert directly to Lanelet2 without CommonRoad as intermediate step
    
    # Setup (Python 3.9):
    # pip install git+https://github.com/motional/nuplan-devkit.git  # install nuplan devkit
    # pip install -r https://raw.githubusercontent.com/motional/nuplan-devkit/master/requirements.txt  # install devkit dependencies
    # pip install commonroad-all commonroad-scenario-designer lanelet2
    
    # Usage
    # Step 1: nuPlan -> CommonRoad
    # python nuplan_to_commonroad_map.py --map_root ~/data/nuplan/v1.1/dataset/maps
    #
    # Step 2: CommonRoad -> Lanelet2
    # crdesigner --input-file map.xml --output-file map.osm crlanelet2 --proj "epsg:32619"
    
    import argparse
    import logging
    from typing import List, cast, Tuple, Optional
    
    import numpy as np
    from commonroad.common.common_lanelet import LaneletType
    from commonroad.common.file_writer import CommonRoadFileWriter
    from commonroad.common.writer.file_writer_interface import OverwriteExistingFile
    from commonroad.planning.planning_problem import PlanningProblemSet
    from commonroad.scenario.lanelet import LaneletNetwork, Lanelet
    from commonroad.scenario.scenario import Scenario
    from lanelet2.core import Lanelet as L2Lanelet, LineString3d, getId, Point3d
    from nuplan.common.maps.abstract_map_objects import LaneGraphEdgeMapObject
    from nuplan.common.maps.maps_datatypes import SemanticMapLayer
    from nuplan.common.maps.nuplan_map.lane import NuPlanLane
    from nuplan.common.maps.nuplan_map.lane_connector import NuPlanLaneConnector
    from nuplan.database.maps_db.gpkg_mapsdb import GPKGMapsDB
    from nuplan.database.maps_db.map_api import NuPlanMapWrapper
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('NuPlan2CommonRoadMapConverter')
    
    
    class NuPlan2CommonRoadMapConverter:
        def __init__(self, map_root: str, map_name: str = 'us-ma-boston', map_version: str = 'nuplan-maps-v1.0', target_crs_epsg: Optional[int] = None):
            self.nuplan_map: NuPlanMapWrapper = NuPlanMapWrapper(
                GPKGMapsDB(map_version, map_root),
                map_name
            )
            self.nuplan_map.initialize_all_layers()
    
            if target_crs_epsg:
                for layer in self.nuplan_map._vector_map.values():
                    layer.to_crs(epsg=target_crs_epsg, inplace=True)
    
        def get(self) -> Scenario:
            scenario: Scenario = Scenario(dt=0.05, author='Default Author', affiliation='None', source='None', tags=set())
            lanelets: LaneletNetwork = scenario.lanelet_network
    
            all_lanes: List[NuPlanLane] = cast(List[NuPlanLane], [self.nuplan_map.get_map_object(fid, SemanticMapLayer.LANE) for fid in self.nuplan_map._get_vector_map_layer(SemanticMapLayer.LANE)['fid'].tolist()])
            all_connectors: List[NuPlanLaneConnector] = cast(List[NuPlanLaneConnector], [self.nuplan_map.get_map_object(fid, SemanticMapLayer.LANE_CONNECTOR) for fid in self.nuplan_map._get_vector_map_layer(SemanticMapLayer.LANE_CONNECTOR)['fid'].tolist()])
    
            # parse lanes and lane connectors
            for l in [*all_lanes, *all_connectors]:
                n_vertices: int = max(len(l.left_boundary.linestring.coords), len(l.right_boundary.linestring.coords))
    
                left: List[Tuple[float, float]] = self.sample_2d(list(zip(*l.left_boundary.linestring.xy)), n_vertices)
                right: List[Tuple[float, float]] = self.sample_2d(list(zip(*l.right_boundary.linestring.xy)), n_vertices)
                center: List[Tuple[float, float]] = self.get_center_line(left, right)
    
                lanelets.add_lanelet(Lanelet(
                    lanelet_id=int(l.id),
                    lanelet_type={LaneletType.URBAN},
                    left_vertices=np.array(left),
                    right_vertices=np.array(right),
                    center_vertices=np.array(center),
                ))
    
            # another pass to add relations
            for l in [*all_lanes, *all_connectors]:
                l = cast(LaneGraphEdgeMapObject, l)
                lanelet: Lanelet = lanelets.find_lanelet_by_id(int(l.id))
                if lanelet is None:
                    continue
    
                if (left := l.adjacent_edges[0]) is not None:
                    lanelet.adj_left = int(left.id)
                    lanelet.adj_left_same_direction = l.get_roadblock_id() == left.get_roadblock_id()
                if (right := l.adjacent_edges[1]) is not None:
                    lanelet.adj_right = int(right.id)
                    lanelet.adj_right_same_direction = l.get_roadblock_id() == right.get_roadblock_id()
                for successor in l.outgoing_edges:
                    lanelet.add_successor(int(successor.id))
                for predecessor in l.incoming_edges:
                    lanelet.add_predecessor(int(predecessor.id))
    
            return scenario
    
        @staticmethod
        def to_file(scenario: Scenario, out_file: str):
            CommonRoadFileWriter(scenario, PlanningProblemSet(None)).write_to_file(out_file, OverwriteExistingFile.ALWAYS)
    
        @classmethod
        def get_center_line(cls, left: List[Tuple[float, float]], right: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
            # hacky and awkward way of getting a centerline between two boundaries
            l: L2Lanelet = L2Lanelet(
                getId(),
                LineString3d(getId(), [Point3d(getId(), x, y, 0) for x, y in left]),
                LineString3d(getId(), [Point3d(getId(), x, y, 0) for x, y in right]),
            )
            return cls.sample_2d(list(map(lambda p: (p.x, p.y), l.centerline)), max(len(left), len(right)))
    
        @staticmethod
        def sample_2d(coords: List[Tuple[float, float]], n: int) -> List[Tuple[float, float]]:
            x, y = zip(*coords)
            x = np.array(x)
            y = np.array(y)
    
            idxs = np.arange(len(x))
            idxs_new = np.linspace(0, len(x) - 1, n)
    
            x_new = np.interp(idxs_new, idxs, x)
            y_new = np.interp(idxs_new, idxs, y)
            coords_new = list(zip(x_new, y_new))
    
            assert len(coords_new) == n
            return coords_new
    
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('--map_root', type=str, required=True, help='Absolut path to your nuPlan maps/ directory (containing the .json file)')
        parser.add_argument('--map', type=str, required=False, default='us-ma-boston', help='nuPlan map to convert')
        parser.add_argument('--map_version', type=str, required=False, default='nuplan-maps-v1.0', help='nuPlan map version to use')
        parser.add_argument('--crs', type=str, required=False, default=None, help='Optionally convert coordinates to the CRS defined by this EPSG code')
        parser.add_argument('--out', type=str, required=False, default='map.xml', help='Target file path')
        args = parser.parse_args()
    
        logger.info(f'Loading map ...')
        converter = NuPlan2CommonRoadMapConverter(
            args.map_root, args.map, args.map_version, args.crs
        )
    
        logger.info(f'Converting ...')
        NuPlan2CommonRoadMapConverter.to_file(converter.get(), args.out)
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment