"""Executor
This file provides an executor to execute declarations that follow the UPPAAL syntax.
This file can be imported and contains the following classes:
* Executor: Executes UPPAAL declarations.
"""
import logging
from uppaal2jetracer.controller.hardware_command_system import HardwareCommandHandler, \
HardwareCommandError
from uppaal2jetracer.declarations.declarations_ast import NodeVisitor, ArrayDecl, ArrayRef, \
Assignment, BinaryOp, Compound, CompoundLiteral, Constant, Decl, DeclList, ExprList, FileAST, \
FuncCall, FuncDecl, FuncDef, ID, IdentifierType, InitList, NamedInitializer, ParamList, \
Return, Struct, StructRef, TypeDecl, Typedef, Typename, UnaryOp, RangeDecl
from uppaal2jetracer.uppaalmodel.frame import Frame
from uppaal2jetracer.uppaalmodel.variable import Array, Variable, IntVariable, DoubleVariable, \
StringVariable, Clock, BooleanVariable, Channel, Range
# Module-wide logger; all debug and error messages of this file are sent to "controller".
logger = logging.getLogger("controller")
# One handler instance created at import time; Executor.stop() and Executor.visit_funccall()
# both go through it.
hardware_command_handler = HardwareCommandHandler()
class Executor(NodeVisitor):
    """
    A NodeVisitor implementation that executes UPPAAL declarations.

    Every ``visit_*`` method executes one kind of AST node against the frame given to the
    constructor. Malformed or unsupported nodes raise an ExecutionError, a failing hardware
    command raises a HardwareCallError.
    """

    # The slot names have to match the attribute names assigned in __init__.
    __slots__ = ("_frame", "_assignment_operator_map", "_unary_operator_map",
                 "_binary_operator_map", "_constant_type_map", "_type_map")

    _BROADCAST_KEY = "broadcast"
    _CONSTANT_KEY = "const"

    def __init__(self, frame: Frame):
        """
        Creates an executor that works on the given frame.

        :param frame: The frame used to look up and store variables and functions.
        """
        self._frame = frame

        # Assignment operators: (current value, right-hand value) -> new value.
        self._assignment_operator_map = {
            "=": lambda a, b: b,
            "+=": lambda a, b: a + b,
            "-=": lambda a, b: a - b,
            "*=": lambda a, b: a * b,
            "/=": lambda a, b: a / b,
            "%=": lambda a, b: a % b,
            "&=": lambda a, b: a & b,
            "|=": lambda a, b: a | b,
            "^=": lambda a, b: a ^ b,
            "<<=": lambda a, b: a << b,
            ">>=": lambda a, b: a >> b,
        }

        # NOTE(review): "p++" and "p--" only compute the neighbouring value; visit_unaryop
        # never writes it back to the operand. Confirm that this is intended.
        self._unary_operator_map = {
            "!": lambda a: not a,
            "~": lambda a: ~ a,
            "p++": lambda a: a + 1,
            "p--": lambda a: a - 1,
            "-": lambda a: - a,
        }

        # NOTE(review): "/" is Python true division, so two ints produce a float. Confirm
        # against the division semantics expected for UPPAAL integers.
        self._binary_operator_map = {
            "||": lambda a, b: a or b,
            "&&": lambda a, b: a and b,
            "|": lambda a, b: a | b,
            "^": lambda a, b: a ^ b,
            "&": lambda a, b: a & b,
            "==": lambda a, b: a == b,
            "!=": lambda a, b: a != b,
            ">": lambda a, b: a > b,
            "<": lambda a, b: a < b,
            ">=": lambda a, b: a >= b,
            "<=": lambda a, b: a <= b,
            ">>": lambda a, b: a >> b,
            "<<": lambda a, b: a << b,
            "+": lambda a, b: a + b,
            "-": lambda a, b: a - b,
            "*": lambda a, b: a * b,
            "/": lambda a, b: a / b,
            "%": lambda a, b: a % b,
        }

        # Converters from the raw value of a constant to the matching Python type.
        self._constant_type_map = {
            "int": int,
            "double": float,
            "string": str,
            "bool": self._to_bool,
        }

        # Variable factories: (is constant, is broadcast) -> new variable object.
        self._type_map = {
            "int": lambda a, b: IntVariable(a),
            "double": lambda a, b: DoubleVariable(a),
            "string": lambda a, b: StringVariable(a),
            "bool": lambda a, b: BooleanVariable(a),
            "chan": lambda a, b: Channel(b),
            "clock": lambda a, b: Clock(),
        }

    @staticmethod
    def _to_bool(value):
        """
        Converts the raw value of a boolean constant to a bool.

        The text "false" (in any letter case) maps to False, because plain ``bool("false")``
        would be True. Every other value is converted with ``bool()``.

        :param value: The raw constant value.
        :return: The boolean value of the constant.
        """
        # presumably the parser may hand the literal over as text -- verify against the parser
        if isinstance(value, str) and value.strip().lower() == "false":
            return False
        return bool(value)

    @staticmethod
    def stop():
        """
        Stops the hardware command handler.
        """
        hardware_command_handler.stop()

    def visit_arraydecl(self, array_decl: ArrayDecl):
        """
        Creates the array variable for an array declaration.

        :param array_decl: The array declaration node.
        :return: A new Array, created as constant if "const" is a dimensional qualifier.
        :raises ExecutionError: If the node has no dimensional qualifiers.
        """
        if array_decl.dim_quals is None:
            raise ExecutionError("Array declaration has no dimensional qualifier.")
        if self._CONSTANT_KEY in array_decl.dim_quals:
            logger.debug("Executed constant array creation.")
            return Array(True)
        logger.debug("Executed non-constant array creation.")
        return Array(False)

    def visit_arrayref(self, array_ref: ArrayRef):
        """
        Looks up an element of a declared array.

        :param array_ref: The array reference node.
        :return: Whatever the array returns for the evaluated subscript.
        :raises ExecutionError: If name or subscript are missing, the name is not declared
            or the name does not refer to an array.
        """
        if array_ref.name is None:
            raise ExecutionError("Array reference has no name.")
        if array_ref.subscript is None:
            raise ExecutionError(f"Array reference to '{array_ref.name}' has no index.")
        array: Variable = self._frame.get_variable(array_ref.name)
        if array is None:
            raise ExecutionError(f"Array '{array_ref.name}' is not declared.")
        if not isinstance(array, Array):
            raise ExecutionError(f"Variable '{array_ref.name}' is not an array.")
        logger.debug("Executed referencing of array '%s'.", array_ref.name)
        return array.get_index(array_ref.subscript.accept(self))

    def visit_assignment(self, assignment: Assignment):
        """
        Applies an assignment operator to the variable named by the left value.

        :param assignment: The assignment node.
        :raises ExecutionError: If a part of the assignment is missing, the operator is not
            supported or the target variable is not declared.
        """
        if assignment.lvalue is None:
            raise ExecutionError("Assignment has no left expression.")
        if assignment.lvalue.name is None:
            raise ExecutionError("Left value of assignment has no name.")
        if assignment.rvalue is None:
            raise ExecutionError("Assignment has no right expression.")
        if assignment.op is None:
            raise ExecutionError("Assignment operator is not defined.")
        if assignment.op not in self._assignment_operator_map:
            raise ExecutionError(f"Assignment operator '{assignment.op}' is not supported.")
        # NOTE(review): the target is resolved by name only, so for an array reference on the
        # left side the whole array variable is the target -- confirm this is intended.
        variable: Variable = self._frame.get_variable(assignment.lvalue.name)
        if variable is None:
            raise ExecutionError(f"Variable '{assignment.lvalue.name}' is not declared.")
        operation = self._assignment_operator_map[assignment.op]
        variable.value = operation(variable.value, assignment.rvalue.accept(self))
        logger.debug("Executed assignment '%s'.", assignment.op)

    def visit_binaryop(self, binaryop: BinaryOp):
        """
        Evaluates both operands (left first) and applies the binary operator to them.

        :param binaryop: The binary operation node.
        :return: The result of the operation.
        :raises ExecutionError: If an operand or the operator is missing or the operator is
            not supported.
        """
        if binaryop.left is None:
            raise ExecutionError("Binary operation has no left expression.")
        if binaryop.right is None:
            raise ExecutionError("Binary operation has no right expression.")
        if binaryop.op is None:
            raise ExecutionError("Binary operator is not defined.")
        if binaryop.op not in self._binary_operator_map:
            raise ExecutionError(f"Binary operator '{binaryop.op}' is not supported.")
        left = binaryop.left.accept(self)
        right = binaryop.right.accept(self)
        logger.debug("Executed binary operation '%s'.", binaryop.op)
        return self._binary_operator_map[binaryop.op](left, right)

    def visit_compound(self, compound: Compound):
        """
        Executes the items of a block in order.

        :param compound: The compound node.
        :return: The first value produced by an item that is not None, otherwise None.
        """
        # NOTE(review): every non-None item value ends the block, not only the value of a
        # return statement -- confirm that expression statements never produce a value here.
        if compound.block_items is not None:
            for item in compound.block_items:
                value = item.accept(self)
                if value is not None:
                    return value
        return None

    def visit_compoundliteral(self, compound_literal: CompoundLiteral):
        """
        Compound literals cannot be executed.

        :raises ExecutionError: Always.
        """
        raise ExecutionError("Compound literals are not supported.")

    def visit_constant(self, constant: Constant):
        """
        Converts a constant to the Python value of its declared type.

        :param constant: The constant node.
        :return: The converted value.
        :raises ExecutionError: If value or type are missing or the type is not supported.
        """
        if constant.value is None:
            raise ExecutionError("Constant has no value.")
        if constant.type is None:
            raise ExecutionError("Constant has no type.")
        if constant.type not in self._constant_type_map:
            raise ExecutionError(f"Constant type '{constant.type}' is not supported.")
        logger.debug("Executed constant '%s'.", constant.value)
        return self._constant_type_map[constant.type](constant.value)

    def visit_decl(self, decl: Decl):
        """
        Creates the declared variable, binds it in the frame and applies the initializer.

        :param decl: The declaration node.
        :raises ExecutionError: If the declaration has no type or no name.
        """
        if decl.type is None:
            raise ExecutionError("Declaration has no type.")
        if decl.name is None:
            raise ExecutionError("Declaration has no name.")
        variable = decl.type.accept(self)
        # The binding is added before the initializer is evaluated.
        self._frame.add_binding(decl.name, variable)
        if decl.init is not None:
            variable.value = decl.init.accept(self)
        logger.debug("Executed variable declaration of '%s'.", decl.name)

    def visit_decllist(self, decl_list: DeclList):
        """
        Executes all declarations of a declaration list in order.

        :param decl_list: The declaration list node.
        """
        for declaration in decl_list.decls:
            declaration.accept(self)

    def visit_emptystatement(self):
        """
        Empty statements cannot be executed.

        :raises ExecutionError: Always.
        """
        raise ExecutionError("Empty statements are not supported.")

    def visit_exprlist(self, expr_list: ExprList):
        """
        Evaluates all expressions of an expression list in order.

        :param expr_list: The expression list node.
        :return: A list with the value of every expression.
        """
        return [expression.accept(self) for expression in expr_list.exprs]

    def visit_fileast(self, fileast: FileAST):
        """
        Executes the top-level items of a file in order.

        :param fileast: The root node.
        :return: The first value produced by an item that is not None, otherwise None.
        """
        for item in fileast.ext:
            value = item.accept(self)
            if value is not None:
                return value
        return None

    def visit_funccall(self, funccall: FuncCall):
        """
        Executes a function call.

        Names known to the hardware command handler are run as hardware commands. Every
        other name has to be a function stored in the frame; its body is executed in a new
        frame whose parent is the frame of this executor.

        :param funccall: The function call node.
        :return: The result of the hardware command or the value returned by the body.
        :raises ExecutionError: If the call has no name, the function is unknown or invalid,
            or the number of arguments does not match the number of parameters.
        :raises HardwareCallError: If the hardware command fails; the hardware command
            handler is stopped before the error is raised.
        """
        if funccall.name is None or funccall.name.name is None:
            raise ExecutionError("Function call has no name.")
        function_name = funccall.name.name

        if hardware_command_handler.has_command(function_name):
            command = hardware_command_handler.get_command(function_name)
            params = []
            if funccall.args is not None:
                params = [expression.accept(self) for expression in funccall.args.exprs]
            logger.debug("Executed hardware function call '%s'.", function_name)
            try:
                return hardware_command_handler.run_command(command, params).result
            except HardwareCommandError as e:
                Executor.stop()
                raise HardwareCallError(f"Function call '{function_name}' failed.") from e

        function = self._frame.get_function(function_name)
        if function is None:
            raise ExecutionError(f"Function '{function_name}' is not declared.")
        if function.decl is None or function.decl.type is None:
            raise ExecutionError(f"Function '{function_name}' has an invalid declaration.")
        if function.body is None:
            raise ExecutionError(f"Function '{function_name}' has no body.")

        function_frame = Frame(self._frame)
        function_executor = Executor(function_frame)
        names = []
        values = []
        if function.decl.type.args is not None:
            # Declares the parameters inside the new frame and yields their names.
            names = function.decl.type.args.accept(function_executor)
        if funccall.args is not None:
            # Arguments belong to the caller: they are evaluated with this executor so that a
            # freshly declared parameter cannot shadow a caller variable of the same name
            # (e.g. "f(x)" for a parameter "x", or the recursive call "f(n - 1)").
            values = funccall.args.accept(self)
        if len(names) != len(values):
            raise ExecutionError(f"Function call '{function_name}' provides an invalid amount "
                                 f"of arguments.")
        for name, value in zip(names, values):
            function_frame.get_variable(name).value = value
        logger.debug("Executed function call '%s'.", function_name)
        return function.body.accept(function_executor)

    def visit_funcdecl(self, funcdecl: FuncDecl):
        """
        Function declarations are handled through their definition and never visited.

        :raises ExecutionError: Always.
        """
        raise ExecutionError("Function declarations are supported but should never be visited.")

    def visit_funcdef(self, funcdef: FuncDef):
        """
        Stores a function definition in the frame under its declared name.

        :param funcdef: The function definition node.
        :raises ExecutionError: If the declaration, its type or its name is missing.
        """
        if funcdef.decl is None:
            raise ExecutionError("Function definition has no declaration.")
        if funcdef.decl.type is None:
            raise ExecutionError("Function definition has no declaration type.")
        if funcdef.decl.name is None:
            raise ExecutionError("Function definition has no name.")
        self._frame.add_function(funcdef.decl.name, funcdef)
        logger.debug("Executed function definition of '%s'.", funcdef.decl.name)

    def visit_id(self, identifier: ID):
        """
        Resolves an identifier.

        :param identifier: The identifier node.
        :return: The value of the variable with that name; if there is no such variable,
            the result of the function lookup for that name.
        :raises ExecutionError: If the identifier has no name.
        """
        if identifier.name is None:
            raise ExecutionError("Identifier has no name.")
        variable = self._frame.get_variable(identifier.name)
        if variable is not None:
            logger.debug("Executed variable call to '%s'.", identifier.name)
            return variable.value
        return self._frame.get_function(identifier.name)

    def visit_identifiertype(self, identifier_type: IdentifierType):
        """
        Extracts the type name of an identifier type.

        :param identifier_type: The identifier type node.
        :return: The first entry if the names are a list, otherwise the names as they are.
        :raises ExecutionError: If the names are missing or an empty list.
        """
        if identifier_type.names is None:
            raise ExecutionError("Identifier type has no type.")
        logger.debug("Executed identifier type '%s'.", identifier_type.names)
        if isinstance(identifier_type.names, list):
            if not identifier_type.names:
                raise ExecutionError("Identifier type has no type.")
            return identifier_type.names[0]
        return identifier_type.names

    def visit_initlist(self, init_list: InitList):
        """
        Evaluates all expressions of an initializer list in order.

        :param init_list: The initializer list node.
        :return: A list with the value of every expression.
        """
        initial_values = [item.accept(self) for item in init_list.exprs]
        logger.debug("Executed initialization list with values: %s", initial_values)
        return initial_values

    def visit_namedinitializer(self, named_initializer: NamedInitializer):
        """
        Named initializers cannot be executed.

        :raises ExecutionError: Always.
        """
        raise ExecutionError("Named initializers are not supported.")

    def visit_paramlist(self, param_list: ParamList):
        """
        Executes every parameter declaration of a parameter list.

        :param param_list: The parameter list node.
        :return: The parameter names in declaration order.
        :raises ExecutionError: If a parameter has no name.
        """
        parameter_names = []
        for parameter in param_list.params:
            if parameter.name is None:
                raise ExecutionError("Parameter in parameter list has no name.")
            parameter_names.append(parameter.name)
            parameter.accept(self)
            logger.debug("Executed parameter declaration of '%s'.", parameter.name)
        return parameter_names

    def visit_rangedecl(self, range_decl: RangeDecl):
        """
        Creates a range from the evaluated lower and upper bound.

        :param range_decl: The range declaration node.
        :return: A new Range built with False as first argument and the two bounds.
        """
        return Range(False, range_decl.lower.accept(self), range_decl.upper.accept(self))

    def visit_return(self, ret: Return):
        """
        Evaluates the expression of a return statement.

        :param ret: The return node.
        :return: The value of the returned expression.
        :raises ExecutionError: If the return statement has no expression.
        """
        if ret.expr is None:
            raise ExecutionError("Return has no expression.")
        return ret.expr.accept(self)

    def visit_struct(self, struct: Struct):
        """
        Structs cannot be executed.

        :raises ExecutionError: Always.
        """
        raise ExecutionError("Structs are not supported.")

    def visit_structref(self, structref: StructRef):
        """
        Struct references cannot be executed.

        :raises ExecutionError: Always.
        """
        raise ExecutionError("Struct references are not supported.")

    def visit_typedecl(self, typedecl: TypeDecl):
        """
        Creates a variable object for a type declaration.

        :param typedecl: The type declaration node.
        :return: A new variable of the declared type; the "const" and "broadcast" qualifiers
            are handed to the variable factory.
        :raises ExecutionError: If type or qualifier list are missing or the type is not
            supported.
        """
        if typedecl.type is None:
            raise ExecutionError("Type declaration has no type.")
        if typedecl.quals is None:
            raise ExecutionError("Type declaration has no qualifier list.")
        variable_type = typedecl.type.accept(self)
        if variable_type not in self._type_map:
            raise ExecutionError(f"Type '{variable_type}' is not supported.")
        constant = self._CONSTANT_KEY in typedecl.quals
        broadcast = self._BROADCAST_KEY in typedecl.quals
        logger.debug("Executed variable creation of type '%s'.", variable_type)
        return self._type_map[variable_type](constant, broadcast)

    def visit_typedef(self, typedef: Typedef):
        """
        Type definitions cannot be executed.

        :raises ExecutionError: Always.
        """
        raise ExecutionError("Type definitions are not supported.")

    def visit_typename(self, typename: Typename):
        """
        Type names cannot be executed.

        :raises ExecutionError: Always.
        """
        raise ExecutionError("Type names are not supported.")

    def visit_unaryop(self, unaryop: UnaryOp):
        """
        Evaluates the operand and applies the unary operator to it.

        :param unaryop: The unary operation node.
        :return: The result of the operation.
        :raises ExecutionError: If operator or operand are missing or the operator is not
            supported.
        """
        if unaryop.op is None:
            raise ExecutionError("Unary operation has no operator.")
        if unaryop.op not in self._unary_operator_map:
            raise ExecutionError(f"Unary operator '{unaryop.op}' is not supported.")
        if unaryop.expr is None:
            raise ExecutionError("Unary operation has no expression.")
        return self._unary_operator_map[unaryop.op](unaryop.expr.accept(self))
class HardwareCallError(Exception):
    """
    Raised when a hardware command called during execution fails.

    Creating the error also writes its message to the module logger at error level.
    """

    def __init__(self, message: str):
        """
        :param message: Description of the failed hardware call.
        """
        logger.error(message)
        super().__init__(message)
class ExecutionError(Exception):
    """
    A class to represent an error when executing UPPAAL declarations.

    It is raised for malformed, undeclared or unsupported elements of a declaration.
    Creating the error also writes its message to the module logger at error level.
    """

    def __init__(self, message: str):
        """
        :param message: Description of what could not be executed.
        """
        super().__init__(message)
        logger.error(message)