"""Executor
This file provides an executor to execute declarations that follow the UPPAAL syntax.
This file can be imported and contains the following classes:
* Executor: Executes UPPAAL declarations.
"""
import logging
from uppaal2jetracer.controller.hardware_command_system import \
HardwareCommandHandler, HardwareCommandError
from uppaal2jetracer.declarations.declarations_ast import NodeVisitor, ArrayDecl, \
ArrayRef, Assignment, BinaryOp, Compound, CompoundLiteral, Constant, Decl, DeclList, \
EmptyStatement, ExprList, FileAST, FuncCall, FuncDecl, FuncDef, ID, IdentifierType, InitList, \
NamedInitializer, ParamList, Return, Struct, StructRef, TypeDecl, Typedef, Typename, UnaryOp, \
RangeDecl
from uppaal2jetracer.uppaalmodel.frame import Frame
from uppaal2jetracer.uppaalmodel.variable import Array, Variable, IntVariable, \
DoubleVariable, StringVariable, Clock, BooleanVariable, Channel, Range
logger = logging.getLogger("controller")
hardware_command_handler = HardwareCommandHandler()
[docs]
class Executor(NodeVisitor):
"""
A NodeVisitor implementation that executes UPPAAL declarations.
"""
__slots__ = ("_frame",)
_BROADCAST_KEY = "broadcast"
_CONSTANT_KEY = "const"
def __init__(self, frame: Frame):
self._frame = frame
[docs]
@staticmethod
def stop():
"""
Stops the hardware command handler.
"""
hardware_command_handler.stop()
[docs]
def visit_arraydecl(self, array_decl: ArrayDecl):
if self._CONSTANT_KEY in array_decl.dim_quals:
logger.debug("Executed constant array creation.")
return Array(True)
logger.debug("Executed non-constant array creation.")
return Array(False)
[docs]
def visit_arrayref(self, array_ref: ArrayRef):
array: Variable = self._frame.get_variable(array_ref.name)
assert isinstance(array, Array)
logger.debug("Executed referencing of array '%s'.", array_ref.name)
return array.get_index(array_ref.subscript.accept(self))
[docs]
def visit_assignment(self, assignment: Assignment):
operator_map = {
"=": lambda a, b: b,
"+=": lambda a, b: a + b,
"-=": lambda a, b: a - b,
"*=": lambda a, b: a * b,
"/=": lambda a, b: a / b,
"%=": lambda a, b: a % b,
"&=": lambda a, b: a & b,
"|=": lambda a, b: a | b,
"^=": lambda a, b: a ^ b,
"<<=": lambda a, b: a << b,
">>=": lambda a, b: a >> b
}
variable: Variable = self._frame.get_variable(assignment.lvalue.name)
variable.value = (operator_map.get(assignment.op, lambda a, b: a)
(variable.value, assignment.rvalue.accept(self)))
logger.debug("Executed assignment '%s'.", assignment.op)
[docs]
def visit_binaryop(self, binaryop: BinaryOp):
operator_map = {
"||": lambda a, b: a or b,
"&&": lambda a, b: a and b,
"|": lambda a, b: a | b,
"^": lambda a, b: a ^ b,
"&": lambda a, b: a & b,
"==": lambda a, b: a == b,
"!=": lambda a, b: a != b,
">": lambda a, b: a > b,
"<": lambda a, b: a < b,
">=": lambda a, b: a >= b,
"<=": lambda a, b: a <= b,
">>": lambda a, b: a >> b,
"<<": lambda a, b: a << b,
"+": lambda a, b: a + b,
"-": lambda a, b: a & b,
"*": lambda a, b: a * b,
"/": lambda a, b: a / b,
"%": lambda a, b: a % b
}
left = binaryop.left.accept(self)
right = binaryop.right.accept(self)
logger.debug("Executed binary operation '%s'.", binaryop.op)
return operator_map.get(binaryop.op, lambda a, b: a)(left, right)
[docs]
def visit_compound(self, compound: Compound):
for item in compound.block_items:
value = item.accept(self)
if value is not None:
return value
return None
[docs]
def visit_compoundliteral(self, compound_literal: CompoundLiteral):
pass
[docs]
def visit_constant(self, constant: Constant):
type_map = {
"int": int,
"double": float,
"string": str,
"boolean": bool
}
logger.debug("Executed constant '%s'.", constant.value)
return type_map.get(constant.type)(constant.value)
[docs]
def visit_decl(self, decl: Decl):
variable = decl.type.accept(self)
self._frame.add_binding(decl.name, variable)
if decl.init is not None:
initial_value = decl.init.accept(self)
variable.value = initial_value
logger.debug("Executed variable declaration of '%s'.", decl.name)
[docs]
def visit_decllist(self, decl_list: DeclList):
for declaration in decl_list.decls:
declaration.accept(self)
[docs]
def visit_emptystatement(self, empty_statement: EmptyStatement):
pass
[docs]
def visit_exprlist(self, expr_list: ExprList):
values = []
for expression in expr_list.exprs:
values.append(expression.accept(self))
return values
[docs]
def visit_fileast(self, fileast: FileAST):
for item in fileast.ext:
value = item.accept(self)
if value is not None:
return value
return None
[docs]
def visit_funccall(self, funccall: FuncCall):
function_name = funccall.name.name
if hardware_command_handler.has_command(function_name):
command = hardware_command_handler.get_command(function_name)
params = []
if funccall.args is not None:
for expression in funccall.args.exprs:
params.append(expression.accept(self))
logger.debug("Executed hardware function call '%s'.", function_name)
try:
return hardware_command_handler.run_command(command, params).result
except HardwareCommandError as e:
Executor.stop()
raise HardwareCallError(f"Error: Function call '{function_name}' failed.") from e
function = self._frame.get_function(function_name)
function_frame = Frame(self._frame)
function_executor = Executor(function_frame)
names = function.decl.type.args.accept(function_executor)
values = funccall.args.accept(function_executor)
for name, value in zip(names, values):
function_frame.get_variable(name).value = value
logger.debug("Executed function call '%s'.", funccall.name)
return self._frame.get_function(function_name).body.accept(function_executor)
[docs]
def visit_funcdecl(self, funcdecl: FuncDecl):
pass
[docs]
def visit_funcdef(self, funcdef: FuncDef):
self._frame.add_function(funcdef.decl.name, funcdef)
logger.debug("Executed function declaration of '%s'.", funcdef.decl.name)
[docs]
def visit_id(self, identifier: ID):
variable = self._frame.get_variable(identifier.name)
if variable is not None:
logger.debug("Executed variable call to '%s'.", identifier.name)
return self._frame.get_variable(identifier.name).value
function = self._frame.get_function(identifier.name)
return function
[docs]
def visit_identifiertype(self, identifier_type: IdentifierType):
logger.debug("Executed identifier type '%s'.", identifier_type.names)
if isinstance(identifier_type.names, list):
return identifier_type.names[0]
return identifier_type.names
[docs]
def visit_initlist(self, init_list: InitList):
initial_values = []
for item in init_list.exprs:
initial_values.append(item.accept(self))
logger.debug("Executed initialization list with values: %s", initial_values)
return initial_values
[docs]
def visit_namedinitializer(self, named_initializer: NamedInitializer):
pass
[docs]
def visit_paramlist(self, param_list: ParamList):
parameter_names = []
for parameter in param_list.params:
parameter_names.append(parameter.name)
parameter.accept(self)
logger.debug("Executed parameter declaration of '%s'.", parameter.name)
return parameter_names
[docs]
def visit_rangedecl(self, range_decl: RangeDecl):
return Range(False, range_decl.lower.accept(self), range_decl.upper.accept(self))
[docs]
def visit_return(self, ret: Return):
return ret.expr.accept(self)
[docs]
def visit_struct(self, struct: Struct):
pass
[docs]
def visit_structref(self, structref: StructRef):
pass
[docs]
def visit_typedecl(self, typedecl: TypeDecl):
type_map = {
"int": lambda a, b: IntVariable(a),
"double": lambda a, b: DoubleVariable(a),
"string": lambda a, b: StringVariable(a),
"boolean": lambda a, b: BooleanVariable(a),
"chan": lambda a, b: Channel(b),
"clock": lambda a, b: Clock()
}
variable_type = typedecl.type.accept(self)
constant = self._CONSTANT_KEY in typedecl.quals
broadcast = self._BROADCAST_KEY in typedecl.quals
logger.debug("Executed variable creation of type '%s'.", variable_type)
return type_map.get(variable_type, None)(constant, broadcast)
[docs]
def visit_typedef(self, typedef: Typedef):
pass
[docs]
def visit_typename(self, typename: Typename):
pass
[docs]
def visit_unaryop(self, unaryop: UnaryOp):
operator_map = {
"!": lambda a: not a,
"~": lambda a: ~ a,
"p++": lambda a: a + 1,
"p--": lambda a: a - 1
}
return operator_map.get(unaryop.op, lambda a: a)(unaryop.expr.accept(self))
[docs]
class HardwareCallError(Exception):
"""
A class to represent an error when executing hardware calls.
"""
def __init__(self, message: str):
super().__init__(message)
logger.error(message)