"""Hardware command system
This module provides a system for handling hardware commands and their execution.
This file can be imported and contains the following classes:
* HardwareCommandHandler: Handles HardwareCommand calls.
* HardwareCommand: Encapsulates the functionality of a command for any hardware.
* JRHardwareCommand: HardwareCommand for a NVIDIA JetRacer ROS AI Kit.
* TurnCommand: Turns a JetRacer.
* SetSpeedCommand: Sets a JetRacer's speed.
* PotentialCollisionCommand: Checks if a JetRacer is about to run into something.
* FreeAheadCommand: Checks if there is something ahead of a JetRacer.
* HardwareCommandResult: Result of a HardwareCommand.
"""
from __future__ import annotations
from typing import List, Any, Dict
from enum import Enum
import math
import logging
from abc import ABC, abstractmethod
from uppaal2jetracer.controller.hardware_controller import HardwareController, \
JRHardwareController, HardwareState, HardwareResult
logger = logging.getLogger("controller")
class HardwareCommandHandler:
    """
    Registry and dispatcher for hardware commands.

    On construction the handler creates one :class:`JRHardwareController`, builds the
    four JetRacer commands on top of it and stores them under their command names.

    :ivar _hardware_commands: The registered hardware commands, keyed by command name.
    :vartype _hardware_commands: Dict[str, HardwareCommand]
    :ivar _hardware_controller: The hardware controllers used by the registered commands.
    :vartype _hardware_controller: List[HardwareController]
    """

    # Names under which the built-in commands are registered.
    TURN_COMMAND_NAME = "turn"
    SET_SPEED_COMMAND_NAME = "set_speed"
    POTENTIAL_COLLISION_COMMAND_NAME = "potential_collision"
    FREE_AHEAD_COMMAND_NAME = "free_ahead"

    __slots__ = ("_hardware_commands", "_hardware_controller")

    def __init__(self):
        self._hardware_commands: Dict[str, HardwareCommand] = {}
        self._hardware_controller: List[HardwareController] = []
        self._init_commands()
        logger.debug("Hardware commands successfully initialized.")

    def run_command(self, command: HardwareCommand,
                    params: List[object]) -> HardwareCommandResult[Any]:
        """
        Execute a hardware command and return its result.

        If the command reports a HARDWARE_FAILURE, every known hardware controller
        is shut down and a :class:`HardwareError` is raised instead of returning.

        :param command: The hardware command to run.
        :type command: HardwareCommand
        :param params: The parameters handed to the command.
        :type params: List[object]
        :return: The result of the hardware command.
        :rtype: HardwareCommandResult[Any]
        :raises HardwareError: If the result type is HARDWARE_FAILURE.
        """
        command_name = command.__class__.__name__
        logger.debug("Running %s command with parameters %s.", command_name, params)

        outcome: HardwareCommandResult[Any] = command.execute(params)
        hardware_failed = outcome.result_type == HardwareCommandResultType.HARDWARE_FAILURE
        if not hardware_failed:
            return outcome

        # Failure path: bring every controller down before reporting the error.
        logger.fatal("Hardware error in %s command. Shutting down hardware controller.",
                     command_name)
        for hardware_controller in self._hardware_controller:
            hardware_controller.shutdown()
        raise HardwareError(command_name)

    def get_command(self, name: str) -> HardwareCommand[Any]:
        """
        Look up a hardware command by its name.

        :param name: The name of the command.
        :type name: str
        :return: The hardware command, or None if no command is registered under that name.
        :rtype: HardwareCommand[Any]
        """
        return self._hardware_commands.get(name)

    def has_command(self, name: str) -> bool:
        """
        Tell whether a hardware command is registered under the given name.

        :param name: The name of the command.
        :type name: str
        :return: True if the command exists. False otherwise.
        :rtype: bool
        """
        registered = self._hardware_commands
        return name in registered

    def stop(self):
        """
        Call ``stop()`` on every hardware controller known to this handler.
        """
        for hardware_controller in self._hardware_controller:
            hardware_controller.stop()

    def _init_commands(self):
        """
        Create the JetRacer controller and register all built-in commands on it.
        """
        controller = JRHardwareController()
        # Registration order matches the order of the name constants above.
        command_table = (
            (self.TURN_COMMAND_NAME, TurnCommand),
            (self.SET_SPEED_COMMAND_NAME, SetSpeedCommand),
            (self.POTENTIAL_COLLISION_COMMAND_NAME, PotentialCollisionCommand),
            (self.FREE_AHEAD_COMMAND_NAME, FreeAheadCommand),
        )
        for command_name, command_cls in command_table:
            self._register_command(command_name, command_cls(controller))
        self._hardware_controller.append(controller)

    def _register_command(self, name: str, command: HardwareCommand):
        """
        Store a command under the given name, replacing any previous entry.
        """
        self._hardware_commands[name] = command
[docs]
class HardwareCommand[T](ABC):
"""
A class to represent a hardware command.
:ivar _hardware_controller: The hardware controller used by the command.
:vartype _hardware_controller: HardwareController
"""
__slots__ = ("_hardware_controller",)
def __init__(self, hardware_controller: HardwareController):
self._hardware_controller = hardware_controller
@property
def hardware_controller(self) -> HardwareController:
"""
Get the hardware controller used by the command.
:return: The hardware controller.
:rtype: HardwareController
"""
return self._hardware_controller
[docs]
@abstractmethod
def execute(self, params: List[object]) -> HardwareCommandResult[T]:
"""
Execute the hardware command.
:param params: The parameters for the command.
:type params: List[object]
:return: The result of the command.
:rtype: HardwareCommandResult[T]
"""
[docs]
class JRHardwareCommand[T](HardwareCommand):
"""
A class to represent a hardware command for the JetRacer.
"""
def __init__(self, jr_hardware_controller: JRHardwareController):
super().__init__(jr_hardware_controller)
[docs]
def execute(self, params: List[object]) -> HardwareCommandResult[T]:
pass
class TurnCommand(JRHardwareCommand[None]):
    """
    A class to represent a hardware command for turning the JetRacer.

    Parameters:
        - theta: The angle to turn the JetRacer by. The value is converted with
          ``math.radians`` before it is handed to the hardware controller, so it
          is treated as an angle in degrees here.
    """

    def execute(self, params: List[object]) -> HardwareCommandResult[None]:
        """
        Turn the JetRacer by the given angle.

        :param params: Exactly one element that is convertible to ``float``.
        :type params: List[object]
        :return: The result wrapping the controller's answer.
        :rtype: HardwareCommandResult[None]
        :raises InvalidParamsAmountError: If ``params`` does not hold exactly one element.
        :raises InvalidParamTypeError: If the element cannot be converted to ``float``.
        """
        if len(params) != 1:
            raise InvalidParamsAmountError(
                HardwareCommandHandler.TURN_COMMAND_NAME,
                len(params),
                1
            )

        # Only the conversion is guarded: float() raises TypeError for objects such
        # as None or a list and ValueError for malformed strings. Errors raised by
        # the controller must not be misreported as a parameter type problem.
        try:
            theta: float = float(params[0])
        except (TypeError, ValueError) as exc:
            raise InvalidParamTypeError(
                HardwareCommandHandler.TURN_COMMAND_NAME, 0, type(params[0]), float) from exc

        hardware_result: HardwareResult = self._hardware_controller.turn(math.radians(theta))
        logger.debug("Turn command executed successfully for angle %f.", theta)
        return HardwareCommandResult(HardwareCommandResultType.SUCCESS, hardware_result)
class SetSpeedCommand(JRHardwareCommand[None]):
    """
    A class to represent a hardware command for setting the speed of the JetRacer.

    Parameters:
        - speed: The speed of the JetRacer in m/s. Range: [-1.2, 1.2]
    """

    _MAX_SPEED = 1.2  # in m/s
    _MIN_SPEED = -1.2  # in m/s

    def execute(self, params: List[object]) -> HardwareCommandResult[None]:
        """
        Set the JetRacer's speed.

        :param params: Exactly one element that is convertible to ``float``.
        :type params: List[object]
        :return: The result wrapping the controller's answer.
        :rtype: HardwareCommandResult[None]
        :raises InvalidParamsAmountError: If ``params`` does not hold exactly one element.
        :raises InvalidParamTypeError: If the element cannot be converted to ``float``.
        :raises InvalidParamValueError: If the speed is NaN or outside the allowed range.
        """
        if len(params) != 1:
            raise InvalidParamsAmountError(
                HardwareCommandHandler.SET_SPEED_COMMAND_NAME, len(params), 1)

        # Only the conversion is guarded: float() raises TypeError for objects such
        # as None or a list and ValueError for malformed strings.
        try:
            speed: float = float(params[0])
        except (TypeError, ValueError) as exc:
            raise InvalidParamTypeError(
                HardwareCommandHandler.SET_SPEED_COMMAND_NAME, 0, type(params[0]), float) from exc

        # Written as a positive range test so that NaN, for which every comparison
        # is False, is rejected instead of being forwarded to the hardware.
        if not self._MIN_SPEED <= speed <= self._MAX_SPEED:
            raise InvalidParamValueError(
                HardwareCommandHandler.SET_SPEED_COMMAND_NAME, 0, speed,
                f"Speed must be in the range [{self._MIN_SPEED}, {self._MAX_SPEED}]."
            )

        hardware_result: HardwareResult = self._hardware_controller.set_speed(speed)
        logger.debug("Set speed command executed successfully for speed %f.", speed)
        return HardwareCommandResult(HardwareCommandResultType.SUCCESS, hardware_result)
class PotentialCollisionCommand(JRHardwareCommand[bool]):
    """
    A class to represent a hardware command for checking if a potential collision is detected.

    Parameters:
        - collision_threshold (optional): The distance threshold for a potential
          collision in meters. When omitted, the controller is called without an
          argument and applies its own default (documented as 0.8 meters).
    """

    _MIN_COLLISION_THRESHOLD = 0.0

    def execute(self, params: List[object]) -> HardwareCommandResult[bool]:
        """
        Ask the controller whether a potential collision is detected.

        :param params: Empty, or one element that is convertible to ``float``.
        :type params: List[object]
        :return: The result wrapping the controller's answer.
        :rtype: HardwareCommandResult[bool]
        :raises InvalidParamsAmountError: If more than one parameter is given.
        :raises InvalidParamTypeError: If the threshold cannot be converted to ``float``.
        :raises InvalidParamValueError: If the threshold is NaN or below the minimum.
        """
        if len(params) > 1:
            # The reported expected amount stays 0 (the parameter is optional).
            raise InvalidParamsAmountError(
                HardwareCommandHandler.POTENTIAL_COLLISION_COMMAND_NAME, len(params), 0)

        hardware_result: HardwareResult
        if len(params) == 1:
            # Only the conversion is guarded: float() raises TypeError for objects
            # such as None or a list and ValueError for malformed strings.
            try:
                collision_threshold: float = float(params[0])
            except (TypeError, ValueError) as exc:
                raise InvalidParamTypeError(
                    HardwareCommandHandler.POTENTIAL_COLLISION_COMMAND_NAME,
                    0, type(params[0]), float) from exc

            # Positive test so that NaN (all comparisons False) is rejected as well.
            if not collision_threshold >= self._MIN_COLLISION_THRESHOLD:
                raise InvalidParamValueError(
                    HardwareCommandHandler.POTENTIAL_COLLISION_COMMAND_NAME, 0, collision_threshold,
                    f"Collision threshold must be greater than or equal to {self._MIN_COLLISION_THRESHOLD}."
                )
            hardware_result = self._hardware_controller.potential_collision(collision_threshold)
        else:
            hardware_result = self._hardware_controller.potential_collision()

        logger.debug("Potential collision command executed successfully with result %s.",
                     hardware_result.result)
        return HardwareCommandResult(HardwareCommandResultType.SUCCESS, hardware_result)
class FreeAheadCommand(JRHardwareCommand[bool]):
    """
    A class to represent a hardware command for checking if the path ahead is free.

    Parameters:
        - collision_threshold (optional): The distance threshold for a potential
          collision in meters. When omitted, the controller is called without an
          argument and applies its own default (documented as 0.8 meters).
    """

    _MIN_COLLISION_THRESHOLD = 0.0

    def execute(self, params: List[object]) -> HardwareCommandResult[bool]:
        """
        Ask the controller whether the path ahead is free.

        :param params: Empty, or one element that is convertible to ``float``.
        :type params: List[object]
        :return: The result wrapping the controller's answer.
        :rtype: HardwareCommandResult[bool]
        :raises InvalidParamsAmountError: If more than one parameter is given.
        :raises InvalidParamTypeError: If the threshold cannot be converted to ``float``.
        :raises InvalidParamValueError: If the threshold is NaN or below the minimum.
        """
        if len(params) > 1:
            # The reported expected amount stays 0 (the parameter is optional).
            raise InvalidParamsAmountError(
                HardwareCommandHandler.FREE_AHEAD_COMMAND_NAME, len(params), 0)

        hardware_result: HardwareResult
        if len(params) == 1:
            # Only the conversion is guarded: float() raises TypeError for objects
            # such as None or a list and ValueError for malformed strings.
            try:
                collision_threshold: float = float(params[0])
            except (TypeError, ValueError) as exc:
                raise InvalidParamTypeError(
                    HardwareCommandHandler.FREE_AHEAD_COMMAND_NAME,
                    0, type(params[0]), float) from exc

            # Positive test so that NaN (all comparisons False) is rejected as well.
            if not collision_threshold >= self._MIN_COLLISION_THRESHOLD:
                raise InvalidParamValueError(
                    HardwareCommandHandler.FREE_AHEAD_COMMAND_NAME, 0, collision_threshold,
                    f"Collision threshold must be greater than or equal to {self._MIN_COLLISION_THRESHOLD}."
                )
            hardware_result = self._hardware_controller.free_ahead(collision_threshold)
        else:
            hardware_result = self._hardware_controller.free_ahead()

        logger.debug("Free ahead command executed successfully with result %s.",
                     hardware_result.result)
        return HardwareCommandResult(HardwareCommandResultType.SUCCESS, hardware_result)
[docs]
class HardwareCommandResult[T]:
"""
A class to represent the result of a :class:`HardwareCommand`.
:ivar _result: The result of the command. May be None on failure.
:vartype _result: T
:ivar _result_type: The type of the result. Automatically set to HARDWARE_FAILURE if the hardware state is ERROR.
:vartype _result_type: HardwareCommandResultType
"""
__slots__ = ("_result", "_result_type", "_return_type")
def __init__(self, result_type: HardwareCommandResultType,
hardware_result: HardwareResult = HardwareResult(HardwareState.OK, None)):
self._result = hardware_result.result
self._result_type: HardwareCommandResultType = result_type
self._return_type = type(self._result)
if hardware_result.hardware_state == HardwareState.ERROR:
self._result_type = HardwareCommandResultType.HARDWARE_FAILURE
@property
def result(self) -> T:
"""
Get the result of the command.
:return: The result of the command.
:rtype: T
"""
return self._result
@property
def return_type(self) -> type:
"""
Get the return type of the command.
:return: The return type of the command.
:rtype: type
"""
return self._return_type
@property
def result_type(self) -> HardwareCommandResultType:
"""
Get the type of the result.
:return: The type of the result.
:rtype: HardwareCommandResultType
"""
return self._result_type
class HardwareCommandResultType(Enum):
    """
    An enum describing how a :class:`HardwareCommand` ended.
    """

    SUCCESS = 0
    PARTIAL_SUCCESS = 1
    # Set by HardwareCommandResult when the hardware state is ERROR.
    HARDWARE_FAILURE = -1
class HardwareCommandError(Exception):
    """
    Base class for all errors raised by a :class:`HardwareCommand`.

    Creating an instance also writes the message to the module logger at
    error level.
    """

    def __init__(self, message: str):
        super().__init__(message)
        # Logged on construction, i.e. even before the error is actually raised.
        logger.error(message)
class InvalidParamsAmountError(HardwareCommandError):
    """
    Error raised when a :class:`HardwareCommand` receives the wrong number
    of parameters.
    """

    def __init__(self, command_name: str, actual_amount: int, expected_amount: int):
        message = (
            f"Error: Invalid amount of parameters for hardware command {command_name}."
            f" Expected {expected_amount}, but got {actual_amount}."
        )
        super().__init__(message)
class InvalidParamTypeError(HardwareCommandError):
    """
    Error raised when a parameter of a :class:`HardwareCommand` has a type
    that cannot be used.
    """

    def __init__(self, command_name: str, param_index: int, actual_type: type, expected_type: type):
        message = (
            f"Error: Invalid parameter type for hardware command {command_name}. "
            f"Expected type {expected_type} for parameter at index {param_index},"
            f" but got {actual_type}."
        )
        super().__init__(message)
class InvalidParamValueError(HardwareCommandError):
    """
    Error raised when a parameter of a :class:`HardwareCommand` has the right
    type but an unacceptable value.
    """

    def __init__(self, command_name: str, param_index: int, value: Any, message: str):
        full_message = (
            f"Error: Invalid parameter value for hardware command {command_name}. "
            f"Value {value} for parameter at index {param_index} is invalid. {message}"
        )
        super().__init__(full_message)
class HardwareError(HardwareCommandError):
    """
    Error raised when a :class:`HardwareCommand` fails because of the
    hardware itself.
    """

    def __init__(self, command_name: str):
        message = f"Hardware error for hardware command {command_name}."
        super().__init__(message)