# Source code for uppaal2jetracer.controller.hardware_command_system

"""Hardware command system

This module provides a system for handling hardware commands and their execution.

This file can be imported and contains the following classes:

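    * HardwareCommandHandler:       Handles HardwareCommand calls.
    * HardwareCommand:              Encapsulates the functionality of a command for any hardware.
    * JRHardwareCommand:            HardwareCommand for an NVIDIA JetRacer ROS AI Kit.
    * TurnCommand:                  Turns a JetRacer.
    * SetSpeedCommand:              Sets a JetRacer's speed.
    * PotentialCollisionCommand:    Checks if a JetRacer is about to run into something.
    * FreeAheadCommand:             Checks if there is something ahead of a JetRacer.
    * HardwareCommandResult:        Result of a HardwareCommand.
    * HardwareCommandResultType:    Outcome category of a HardwareCommandResult.
    * HardwareCommandError:         Base class for all errors raised by hardware commands.
    * InvalidParamsAmountError:     Raised when a command gets the wrong number of parameters.
    * InvalidParamTypeError:        Raised when a command parameter has the wrong type.
    * InvalidParamValueError:       Raised when a command parameter has an invalid value.
    * HardwareError:                Raised when the hardware reports an error.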
"""

from __future__ import annotations
from typing import List, Any, Dict
from enum import Enum
import math
import logging
from abc import ABC, abstractmethod

from uppaal2jetracer.controller.hardware_controller import HardwareController, \
    JRHardwareController, HardwareState, HardwareResult

logger = logging.getLogger("controller")


class HardwareCommandHandler:
    """
    A class for handling hardware commands and their execution.

    :ivar _hardware_commands: The registered hardware commands, keyed by their name.
    :vartype _hardware_commands: Dict[str, HardwareCommand]
    :ivar _hardware_controller: The hardware controllers used by the commands.
    :vartype _hardware_controller: List[HardwareController]
    """

    TURN_COMMAND_NAME = "turn"
    SET_SPEED_COMMAND_NAME = "set_speed"
    POTENTIAL_COLLISION_COMMAND_NAME = "potential_collision"
    FREE_AHEAD_COMMAND_NAME = "free_ahead"

    __slots__ = ("_hardware_commands", "_hardware_controller")

    def __init__(self):
        self._hardware_commands: Dict[str, HardwareCommand] = {}
        self._hardware_controller: List[HardwareController] = []
        self._init_commands()
        logger.debug("Hardware commands successfully initialized.")

    def run_command(self, command: HardwareCommand,
                    params: List[object]) -> HardwareCommandResult[Any]:
        """
        Run a hardware command.

        If the command reports a hardware failure, every registered hardware
        controller is shut down and a :class:`HardwareError` is raised.

        :param command: The hardware command to run.
        :type command: HardwareCommand
        :param params: The parameters for the command.
        :type params: List[object]
        :return: The result of the hardware command.
        :rtype: HardwareCommandResult[Any]
        :raises HardwareError: If the result type of the command is HARDWARE_FAILURE.
        """
        command_name = command.__class__.__name__
        logger.debug("Running %s command with parameters %s.", command_name, params)

        result: HardwareCommandResult[Any] = command.execute(params)
        if result.result_type != HardwareCommandResultType.HARDWARE_FAILURE:
            return result

        logger.fatal("Hardware error in %s command. Shutting down hardware controller.",
                     command_name)
        for controller in self._hardware_controller:
            # A controller that fails to shut down must neither keep the
            # remaining controllers running nor hide the HardwareError below,
            # so the failure is logged and the loop continues.
            try:
                controller.shutdown()
            except Exception:  # pylint: disable=broad-except
                logger.exception("Shutting down %s failed.", controller.__class__.__name__)
        raise HardwareError(command_name)

    def get_command(self, name: str) -> HardwareCommand[Any] | None:
        """
        Get a hardware command by name.

        :param name: The name of the command.
        :type name: str
        :return: The hardware command. None if the command does not exist.
        :rtype: HardwareCommand[Any] | None
        """
        return self._hardware_commands.get(name)

    def has_command(self, name: str) -> bool:
        """
        Check if a hardware command exists.

        :param name: The name of the command.
        :type name: str
        :return: True if the command exists. False otherwise.
        :rtype: bool
        """
        return name in self._hardware_commands

    def stop(self):
        """
        Stops all hardware controllers.
        """
        for controller in self._hardware_controller:
            controller.stop()

    def _init_commands(self):
        # All JetRacer commands share one controller instance; it is also
        # recorded in _hardware_controller so stop() and the failure path of
        # run_command() can reach it.
        jr_hardware_controller = JRHardwareController()
        self._register_command(self.TURN_COMMAND_NAME,
                               TurnCommand(jr_hardware_controller))
        self._register_command(self.SET_SPEED_COMMAND_NAME,
                               SetSpeedCommand(jr_hardware_controller))
        self._register_command(self.POTENTIAL_COLLISION_COMMAND_NAME,
                               PotentialCollisionCommand(jr_hardware_controller))
        self._register_command(self.FREE_AHEAD_COMMAND_NAME,
                               FreeAheadCommand(jr_hardware_controller))
        self._hardware_controller.append(jr_hardware_controller)

    def _register_command(self, name: str, command: HardwareCommand):
        # Registering an already used name replaces the previous command.
        self._hardware_commands[name] = command
[docs] class HardwareCommand[T](ABC): """ A class to represent a hardware command. :ivar _hardware_controller: The hardware controller used by the command. :vartype _hardware_controller: HardwareController """ __slots__ = ("_hardware_controller",) def __init__(self, hardware_controller: HardwareController): self._hardware_controller = hardware_controller @property def hardware_controller(self) -> HardwareController: """ Get the hardware controller used by the command. :return: The hardware controller. :rtype: HardwareController """ return self._hardware_controller
[docs] @abstractmethod def execute(self, params: List[object]) -> HardwareCommandResult[T]: """ Execute the hardware command. :param params: The parameters for the command. :type params: List[object] :return: The result of the command. :rtype: HardwareCommandResult[T] """
[docs] class JRHardwareCommand[T](HardwareCommand): """ A class to represent a hardware command for the JetRacer. """ def __init__(self, jr_hardware_controller: JRHardwareController): super().__init__(jr_hardware_controller)
[docs] def execute(self, params: List[object]) -> HardwareCommandResult[T]: pass
class TurnCommand(JRHardwareCommand[None]):
    """
    A class to represent a hardware command for turning the JetRacer.

    Parameters:
        - theta: The angle to turn the JetRacer by, in degrees. It is converted
          to radians before it is handed to the hardware controller.
    """

    def execute(self, params: List[object]) -> HardwareCommandResult[None]:
        """
        Turn the JetRacer by the given angle.

        :param params: Exactly one element: the angle in degrees.
        :type params: List[object]
        :return: The result of the command.
        :rtype: HardwareCommandResult[None]
        :raises InvalidParamsAmountError: If not exactly one parameter is given.
        :raises InvalidParamTypeError: If the parameter cannot be converted to float.
        :raises InvalidParamValueError: If the angle is NaN or infinite.
        """
        if len(params) != 1:
            raise InvalidParamsAmountError(
                HardwareCommandHandler.TURN_COMMAND_NAME, len(params), 1
            )

        # Only the conversion is guarded: float() raises ValueError for
        # unparsable strings and TypeError for unsupported types such as None.
        # Errors raised by the controller itself must not be reported as
        # parameter errors.
        try:
            theta: float = float(params[0])
        except (TypeError, ValueError) as exc:
            raise InvalidParamTypeError(
                HardwareCommandHandler.TURN_COMMAND_NAME, 0, type(params[0]), float
            ) from exc

        # NaN and infinity survive float() but are no usable steering angles.
        if not math.isfinite(theta):
            raise InvalidParamValueError(
                HardwareCommandHandler.TURN_COMMAND_NAME, 0, theta,
                "Angle must be a finite number."
            )

        hardware_result: HardwareResult = self._hardware_controller.turn(math.radians(theta))
        logger.debug("Turn command executed successfully for angle %f.", theta)
        return HardwareCommandResult(HardwareCommandResultType.SUCCESS, hardware_result)
class SetSpeedCommand(JRHardwareCommand[None]):
    """
    A class to represent a hardware command for setting the speed of the JetRacer.

    Parameters:
        - speed: The speed of the JetRacer in m/s. Range: [-1.2, 1.2]
    """

    _MAX_SPEED = 1.2  # in m/s
    _MIN_SPEED = -1.2  # in m/s

    def execute(self, params: List[object]) -> HardwareCommandResult[None]:
        """
        Set the speed of the JetRacer.

        :param params: Exactly one element: the speed in m/s.
        :type params: List[object]
        :return: The result of the command.
        :rtype: HardwareCommandResult[None]
        :raises InvalidParamsAmountError: If not exactly one parameter is given.
        :raises InvalidParamTypeError: If the parameter cannot be converted to float.
        :raises InvalidParamValueError: If the speed is NaN or outside the allowed range.
        """
        if len(params) != 1:
            raise InvalidParamsAmountError(
                HardwareCommandHandler.SET_SPEED_COMMAND_NAME, len(params), 1
            )

        # Only the conversion is guarded: float() raises ValueError for
        # unparsable strings and TypeError for unsupported types such as None.
        try:
            speed: float = float(params[0])
        except (TypeError, ValueError) as exc:
            raise InvalidParamTypeError(
                HardwareCommandHandler.SET_SPEED_COMMAND_NAME, 0, type(params[0]), float
            ) from exc

        # Written as a negated chained comparison so that NaN, for which every
        # comparison is False, is rejected as well.
        if not self._MIN_SPEED <= speed <= self._MAX_SPEED:
            raise InvalidParamValueError(
                HardwareCommandHandler.SET_SPEED_COMMAND_NAME, 0, speed,
                f"Speed must be in the range [{self._MIN_SPEED}, {self._MAX_SPEED}]."
            )

        hardware_result: HardwareResult = self._hardware_controller.set_speed(speed)
        logger.debug("Set speed command executed successfully for speed %f.", speed)
        return HardwareCommandResult(HardwareCommandResultType.SUCCESS, hardware_result)
class PotentialCollisionCommand(JRHardwareCommand[bool]):
    """
    A class to represent a hardware command for checking if a potential collision is detected.

    Parameters:
        - collision_threshold (optional): The distance threshold for a potential collision
          in meters. Default: 0.8 meters.
    """

    _MIN_COLLISION_THRESHOLD = 0.0

    def execute(self, params: List[object]) -> HardwareCommandResult[bool]:
        """
        Ask the hardware controller whether a potential collision is detected.

        :param params: Empty, or one element: the collision threshold in meters.
        :type params: List[object]
        :return: The result of the command.
        :rtype: HardwareCommandResult[bool]
        :raises InvalidParamsAmountError: If more than one parameter is given.
        :raises InvalidParamTypeError: If the threshold cannot be converted to float.
        :raises InvalidParamValueError: If the threshold is NaN or below the minimum.
        """
        if len(params) > 1:
            # NOTE(review): the expected amount is reported as 0 although one
            # optional parameter is accepted; kept to leave the message unchanged.
            raise InvalidParamsAmountError(
                HardwareCommandHandler.POTENTIAL_COLLISION_COMMAND_NAME, len(params), 0
            )

        hardware_result: HardwareResult
        if not params:
            # No threshold supplied: let the controller apply its own default.
            hardware_result = self._hardware_controller.potential_collision()
        else:
            # Only the conversion is guarded: float() raises ValueError for
            # unparsable strings and TypeError for unsupported types such as None.
            try:
                collision_threshold: float = float(params[0])
            except (TypeError, ValueError) as exc:
                raise InvalidParamTypeError(
                    HardwareCommandHandler.POTENTIAL_COLLISION_COMMAND_NAME,
                    0, type(params[0]), float
                ) from exc

            # Negated ">=" instead of "<" so that NaN is rejected as well.
            if not collision_threshold >= self._MIN_COLLISION_THRESHOLD:
                raise InvalidParamValueError(
                    HardwareCommandHandler.POTENTIAL_COLLISION_COMMAND_NAME,
                    0,
                    collision_threshold,
                    "Collision threshold must be greater than or equal to "
                    f"{self._MIN_COLLISION_THRESHOLD}."
                )
            hardware_result = self._hardware_controller.potential_collision(collision_threshold)

        logger.debug("Potential collision command executed successfully with result %s.",
                     hardware_result.result)
        return HardwareCommandResult(HardwareCommandResultType.SUCCESS, hardware_result)
class FreeAheadCommand(JRHardwareCommand[bool]):
    """
    A class to represent a hardware command for checking if the path ahead is free.

    Parameters:
        - collision_threshold (optional): The distance threshold for a potential collision
          in meters. Default: 0.8 meters.
    """

    _MIN_COLLISION_THRESHOLD = 0.0

    def execute(self, params: List[object]) -> HardwareCommandResult[bool]:
        """
        Ask the hardware controller whether the path ahead is free.

        :param params: Empty, or one element: the collision threshold in meters.
        :type params: List[object]
        :return: The result of the command.
        :rtype: HardwareCommandResult[bool]
        :raises InvalidParamsAmountError: If more than one parameter is given.
        :raises InvalidParamTypeError: If the threshold cannot be converted to float.
        :raises InvalidParamValueError: If the threshold is NaN or below the minimum.
        """
        if len(params) > 1:
            # NOTE(review): the expected amount is reported as 0 although one
            # optional parameter is accepted; kept to leave the message unchanged.
            raise InvalidParamsAmountError(
                HardwareCommandHandler.FREE_AHEAD_COMMAND_NAME, len(params), 0
            )

        hardware_result: HardwareResult
        if not params:
            # No threshold supplied: let the controller apply its own default.
            hardware_result = self._hardware_controller.free_ahead()
        else:
            # Only the conversion is guarded: float() raises ValueError for
            # unparsable strings and TypeError for unsupported types such as None.
            try:
                collision_threshold: float = float(params[0])
            except (TypeError, ValueError) as exc:
                raise InvalidParamTypeError(
                    HardwareCommandHandler.FREE_AHEAD_COMMAND_NAME,
                    0, type(params[0]), float
                ) from exc

            # Negated ">=" instead of "<" so that NaN is rejected as well.
            if not collision_threshold >= self._MIN_COLLISION_THRESHOLD:
                raise InvalidParamValueError(
                    HardwareCommandHandler.FREE_AHEAD_COMMAND_NAME,
                    0,
                    collision_threshold,
                    "Collision threshold must be greater than or equal to "
                    f"{self._MIN_COLLISION_THRESHOLD}."
                )
            hardware_result = self._hardware_controller.free_ahead(collision_threshold)

        logger.debug("Free ahead command executed successfully with result %s.",
                     hardware_result.result)
        return HardwareCommandResult(HardwareCommandResultType.SUCCESS, hardware_result)
[docs] class HardwareCommandResult[T]: """ A class to represent the result of a :class:`HardwareCommand`. :ivar _result: The result of the command. May be None on failure. :vartype _result: T :ivar _result_type: The type of the result. Automatically set to HARDWARE_FAILURE if the hardware state is ERROR. :vartype _result_type: HardwareCommandResultType """ __slots__ = ("_result", "_result_type", "_return_type") def __init__(self, result_type: HardwareCommandResultType, hardware_result: HardwareResult = HardwareResult(HardwareState.OK, None)): self._result = hardware_result.result self._result_type: HardwareCommandResultType = result_type self._return_type = type(self._result) if hardware_result.hardware_state == HardwareState.ERROR: self._result_type = HardwareCommandResultType.HARDWARE_FAILURE @property def result(self) -> T: """ Get the result of the command. :return: The result of the command. :rtype: T """ return self._result @property def return_type(self) -> type: """ Get the return type of the command. :return: The return type of the command. :rtype: type """ return self._return_type @property def result_type(self) -> HardwareCommandResultType: """ Get the type of the result. :return: The type of the result. :rtype: HardwareCommandResultType """ return self._result_type
class HardwareCommandResultType(Enum):
    """
    An enum class to represent the result type of a :class:`HardwareCommand`.
    """

    SUCCESS = 0
    PARTIAL_SUCCESS = 1
    HARDWARE_FAILURE = -1
class HardwareCommandError(Exception):
    """
    Base class of all errors raised by a :class:`HardwareCommand`.

    The message is written to the module logger at error level as soon as the
    exception object is created.
    """

    def __init__(self, message: str):
        super().__init__(message)
        # Logged on construction, i.e. even if the exception is never raised.
        logger.error(message)
class InvalidParamsAmountError(HardwareCommandError):
    """
    Error raised when a :class:`HardwareCommand` receives an invalid amount of parameters.
    """

    def __init__(self, command_name: str, actual_amount: int, expected_amount: int):
        text = (
            f"Error: Invalid amount of parameters for hardware command {command_name}."
            f" Expected {expected_amount}, but got {actual_amount}."
        )
        super().__init__(text)
class InvalidParamTypeError(HardwareCommandError):
    """
    Error raised when a parameter of a :class:`HardwareCommand` has an invalid type.
    """

    def __init__(self, command_name: str, param_index: int,
                 actual_type: type, expected_type: type):
        text = (
            f"Error: Invalid parameter type for hardware command {command_name}. "
            f"Expected type {expected_type} for parameter at index {param_index},"
            f" but got {actual_type}."
        )
        super().__init__(text)
class InvalidParamValueError(HardwareCommandError):
    """
    Error raised when a parameter of a :class:`HardwareCommand` has an invalid value.
    """

    def __init__(self, command_name: str, param_index: int, value: Any, message: str):
        # `message` is the caller's explanation and is appended verbatim.
        text = (
            f"Error: Invalid parameter value for hardware command {command_name}. "
            f"Value {value} for parameter at index {param_index} is invalid. {message}"
        )
        super().__init__(text)
class HardwareError(HardwareCommandError):
    """
    Error raised when a :class:`HardwareCommand` fails because of a hardware error.
    """

    def __init__(self, command_name: str):
        text = f"Hardware error for hardware command {command_name}."
        super().__init__(text)