Source code for uppaal2jetracer.controller.executor

"""Executor

This file provides an executor to execute declarations that follow the UPPAAL syntax.

This file can be imported and contains the following classes:

    * Executor: Executes UPPAAL declarations.
"""

import logging

from uppaal2jetracer.controller.hardware_command_system import HardwareCommandHandler, \
    HardwareCommandError
from uppaal2jetracer.declarations.declarations_ast import NodeVisitor, ArrayDecl, ArrayRef, \
    Assignment, BinaryOp, Compound, CompoundLiteral, Constant, Decl, DeclList, ExprList, FileAST, \
    FuncCall, FuncDecl, FuncDef, ID, IdentifierType, InitList, NamedInitializer, ParamList, \
    Return, Struct, StructRef, TypeDecl, Typedef, Typename, UnaryOp, RangeDecl
from uppaal2jetracer.uppaalmodel.frame import Frame
from uppaal2jetracer.uppaalmodel.variable import Array, Variable, IntVariable, DoubleVariable, \
    StringVariable, Clock, BooleanVariable, Channel, Range

logger = logging.getLogger("controller")
hardware_command_handler = HardwareCommandHandler()


[docs] class Executor(NodeVisitor): """ A NodeVisitor implementation that executes UPPAAL declarations. """ __slots__ = ("_frame", "_assignment_operator_map", "_unary_operator_map", "_binary_operator_map", "constant_type_map", "type_map") _BROADCAST_KEY = "broadcast" _CONSTANT_KEY = "const" def __init__(self, frame: Frame): self._frame = frame self._assignment_operator_map = { "=": lambda a, b: b, "+=": lambda a, b: a + b, "-=": lambda a, b: a - b, "*=": lambda a, b: a * b, "/=": lambda a, b: a / b, "%=": lambda a, b: a % b, "&=": lambda a, b: a & b, "|=": lambda a, b: a | b, "^=": lambda a, b: a ^ b, "<<=": lambda a, b: a << b, ">>=": lambda a, b: a >> b } self._unary_operator_map = { "!": lambda a: not a, "~": lambda a: ~ a, "p++": lambda a: a + 1, "p--": lambda a: a - 1, "-": lambda a: - a } self._binary_operator_map = { "||": lambda a, b: a or b, "&&": lambda a, b: a and b, "|": lambda a, b: a | b, "^": lambda a, b: a ^ b, "&": lambda a, b: a & b, "==": lambda a, b: a == b, "!=": lambda a, b: a != b, ">": lambda a, b: a > b, "<": lambda a, b: a < b, ">=": lambda a, b: a >= b, "<=": lambda a, b: a <= b, ">>": lambda a, b: a >> b, "<<": lambda a, b: a << b, "+": lambda a, b: a + b, "-": lambda a, b: a & b, "*": lambda a, b: a * b, "/": lambda a, b: a / b, "%": lambda a, b: a % b } self._constant_type_map = { "int": int, "double": float, "string": str, "bool": bool } self._type_map = { "int": lambda a, b: IntVariable(a), "double": lambda a, b: DoubleVariable(a), "string": lambda a, b: StringVariable(a), "bool": lambda a, b: BooleanVariable(a), "chan": lambda a, b: Channel(b), "clock": lambda a, b: Clock() }
[docs] @staticmethod def stop(): """ Stops the hardware command handler. """ hardware_command_handler.stop()
[docs] def visit_arraydecl(self, array_decl: ArrayDecl): if array_decl.dim_quals is None: raise ExecutionError("Array declaration has no dimensional qualifier.") if self._CONSTANT_KEY in array_decl.dim_quals: logger.debug("Executed constant array creation.") return Array(True) logger.debug("Executed non-constant array creation.") return Array(False)
[docs] def visit_arrayref(self, array_ref: ArrayRef): if array_ref.name is None: raise ExecutionError("Array reference has no name.") if array_ref.subscript is None: raise ExecutionError(f"Array reference to '{array_ref.name}' has no index.") array: Variable = self._frame.get_variable(array_ref.name) if array is None: raise ExecutionError(f"Array '{array_ref.name}' is not declared.") if not isinstance(array, Array): raise ExecutionError(f"Variable '{array_ref.name}' is not an array.") logger.debug("Executed referencing of array '%s'.", array_ref.name) return array.get_index(array_ref.subscript.accept(self))
[docs] def visit_assignment(self, assignment: Assignment): if assignment.lvalue is None: raise ExecutionError("Assignment has no left expression.") if assignment.lvalue.name is None: raise ExecutionError("Left value of assignment has no name.") if assignment.rvalue is None: raise ExecutionError("Assignment has no right expression.") if assignment.op is None: raise ExecutionError("Assignment operator is not defined.") if assignment.op not in self._assignment_operator_map: raise ExecutionError(f"Assignment operator '{assignment.op}' is not supported.") variable: Variable = self._frame.get_variable(assignment.lvalue.name) variable.value = (self._assignment_operator_map.get(assignment.op) (variable.value, assignment.rvalue.accept(self))) logger.debug("Executed assignment '%s'.", assignment.op)
[docs] def visit_binaryop(self, binaryop: BinaryOp): if binaryop.left is None: raise ExecutionError("Binary operation has no left expression.") if binaryop.right is None: raise ExecutionError("Binary operation has no right expression.") if binaryop.op is None: raise ExecutionError("Binary operator is not defined.") if binaryop.op not in self._binary_operator_map: raise ExecutionError(f"Binary operator '{binaryop.op}' is not supported.") left = binaryop.left.accept(self) right = binaryop.right.accept(self) logger.debug("Executed binary operation '%s'.", binaryop.op) return self._binary_operator_map.get(binaryop.op)(left, right)
[docs] def visit_compound(self, compound: Compound): if compound.block_items is not None: for item in compound.block_items: value = item.accept(self) if value is not None: return value return None
[docs] def visit_compoundliteral(self, compound_literal: CompoundLiteral): raise ExecutionError("Compound literals are not supported.")
[docs] def visit_constant(self, constant: Constant): if constant.value is None: raise ExecutionError("Constant has no value.") if constant.type is None: raise ExecutionError("Constant has no type.") if constant.type not in self._constant_type_map: raise ExecutionError(f"Constant type '{constant.type}' is not supported.") logger.debug("Executed constant '%s'.", constant.value) return self._constant_type_map.get(constant.type)(constant.value)
[docs] def visit_decl(self, decl: Decl): if decl.type is None: raise ExecutionError("Declaration has no type.") if decl.name is None: raise ExecutionError("Declaration has no name.") variable = decl.type.accept(self) self._frame.add_binding(decl.name, variable) if decl.init is not None: initial_value = decl.init.accept(self) variable.value = initial_value logger.debug("Executed variable declaration of '%s'.", decl.name)
[docs] def visit_decllist(self, decl_list: DeclList): for declaration in decl_list.decls: declaration.accept(self)
[docs] def visit_emptystatement(self): raise ExecutionError("Empty statements are not supported.")
[docs] def visit_exprlist(self, expr_list: ExprList): values = [] for expression in expr_list.exprs: values.append(expression.accept(self)) return values
[docs] def visit_fileast(self, fileast: FileAST): for item in fileast.ext: value = item.accept(self) if value is not None: return value return None
[docs] def visit_funccall(self, funccall: FuncCall): function_name = funccall.name.name if hardware_command_handler.has_command(function_name): command = hardware_command_handler.get_command(function_name) params = [] if funccall.args is not None: for expression in funccall.args.exprs: params.append(expression.accept(self)) logger.debug("Executed hardware function call '%s'.", function_name) try: return hardware_command_handler.run_command(command, params).result except HardwareCommandError as e: Executor.stop() raise HardwareCallError(f"Function call '{function_name}' failed.") from e function = self._frame.get_function(function_name) if function is None: raise ExecutionError(f"Function '{function_name}' is not declared.") if function.decl is None or function.decl.type is None: raise ExecutionError(f"Function '{function_name}' has an invalid declaration.") if function.body is None: raise ExecutionError(f"Function '{function_name}' has no body.") function_frame = Frame(self._frame) function_executor = Executor(function_frame) names = [] values = [] if function.decl.type.args is not None: names = function.decl.type.args.accept(function_executor) if funccall.args is not None: values = funccall.args.accept(function_executor) if len(names) != len(values): raise ExecutionError(f"Function call '{function_name}' provides an invalid amount " f"of arguments.") for name, value in zip(names, values): function_frame.get_variable(name).value = value logger.debug("Executed function call '%s'.", funccall.name) return function.body.accept(function_executor)
[docs] def visit_funcdecl(self, funcdecl: FuncDecl): raise ExecutionError("Function declarations are supported but should never be visited.")
[docs] def visit_funcdef(self, funcdef: FuncDef): if funcdef.decl is None: raise ExecutionError("Function definition has no declaration.") if funcdef.decl.type is None: raise ExecutionError("Function definition has no declaration type.") if funcdef.decl.name is None: raise ExecutionError("Function definition has no name.") self._frame.add_function(funcdef.decl.name, funcdef) logger.debug("Executed function definition of '%s'.", funcdef.decl.name)
[docs] def visit_id(self, identifier: ID): if identifier.name is None: raise ExecutionError("Identifier has no name.") variable = self._frame.get_variable(identifier.name) if variable is not None: logger.debug("Executed variable call to '%s'.", identifier.name) return self._frame.get_variable(identifier.name).value function = self._frame.get_function(identifier.name) return function
[docs] def visit_identifiertype(self, identifier_type: IdentifierType): if identifier_type.names is None: raise ExecutionError("Identifier type has no type.") logger.debug("Executed identifier type '%s'.", identifier_type.names) if isinstance(identifier_type.names, list): return identifier_type.names[0] return identifier_type.names
[docs] def visit_initlist(self, init_list: InitList): initial_values = [] for item in init_list.exprs: initial_values.append(item.accept(self)) logger.debug("Executed initialization list with values: %s", initial_values) return initial_values
[docs] def visit_namedinitializer(self, named_initializer: NamedInitializer): raise ExecutionError("Named initializers are not supported.")
[docs] def visit_paramlist(self, param_list: ParamList): parameter_names = [] for parameter in param_list.params: if parameter.name is None: raise ExecutionError("Parameter in parameter list has no name.") parameter_names.append(parameter.name) parameter.accept(self) logger.debug("Executed parameter declaration of '%s'.", parameter.name) return parameter_names
[docs] def visit_rangedecl(self, range_decl: RangeDecl): return Range(False, range_decl.lower.accept(self), range_decl.upper.accept(self))
[docs] def visit_return(self, ret: Return): if ret.expr is None: raise ExecutionError("Return has no expression.") return ret.expr.accept(self)
[docs] def visit_struct(self, struct: Struct): raise ExecutionError("Structs are not supported.")
[docs] def visit_structref(self, structref: StructRef): raise ExecutionError("Struct references are not supported.")
[docs] def visit_typedecl(self, typedecl: TypeDecl): if typedecl.type is None: raise ExecutionError("Type declaration has no type.") if typedecl.quals is None: raise ExecutionError("Type declaration has no qualifier list.") variable_type = typedecl.type.accept(self) if variable_type not in self._type_map: raise ExecutionError(f"Type '{variable_type}' is not supported.") constant = self._CONSTANT_KEY in typedecl.quals broadcast = self._BROADCAST_KEY in typedecl.quals logger.debug("Executed variable creation of type '%s'.", variable_type) return self._type_map.get(variable_type)(constant, broadcast)
[docs] def visit_typedef(self, typedef: Typedef): raise ExecutionError("Type definitions are not supported.")
[docs] def visit_typename(self, typename: Typename): raise ExecutionError("Type names are not supported.")
[docs] def visit_unaryop(self, unaryop: UnaryOp): if unaryop.op is None: raise ExecutionError("Unary operation has no operator.") if unaryop.op not in self._unary_operator_map: raise ExecutionError(f"Unary operator '{unaryop.op}' is not supported.") if unaryop.expr is None: raise ExecutionError("Unary operation has no expression.") return self._unary_operator_map.get(unaryop.op)(unaryop.expr.accept(self))
[docs] class HardwareCallError(Exception): """ A class to represent an error when executing hardware calls. """ def __init__(self, message: str): super().__init__(message) logger.error(message)
[docs] class ExecutionError(Exception): """ A class to represent an error when executing hardware calls. """ def __init__(self, message: str): super().__init__(message) logger.error(message)