Source code for uppaal2jetracer.controller.executor

"""Executor

This file provides an executor to execute declarations that follow the UPPAAL syntax.

This file can be imported and contains the following classes:

    * Executor: Executes UPPAAL declarations.
"""

import logging

from uppaal2jetracer.controller.hardware_command_system import \
    HardwareCommandHandler, HardwareCommandError
from uppaal2jetracer.declarations.declarations_ast import NodeVisitor, ArrayDecl, \
    ArrayRef, Assignment, BinaryOp, Compound, CompoundLiteral, Constant, Decl, DeclList, \
    ExprList, FileAST, FuncCall, FuncDecl, FuncDef, ID, IdentifierType, InitList, \
    NamedInitializer, ParamList, Return, Struct, StructRef, TypeDecl, Typedef, Typename, UnaryOp, \
    RangeDecl
from uppaal2jetracer.uppaalmodel.frame import Frame
from uppaal2jetracer.uppaalmodel.variable import Array, Variable, IntVariable, \
    DoubleVariable, StringVariable, Clock, BooleanVariable, Channel, Range

# Module-wide logger; every message of this module is emitted under the "controller" name.
logger = logging.getLogger("controller")
# Single handler instance created at import time and shared by all Executor objects
# (used by Executor.stop and Executor.visit_funccall).
hardware_command_handler = HardwareCommandHandler()


class Executor(NodeVisitor):
    """
    A NodeVisitor implementation that executes UPPAAL declarations.

    Every ``visit_*`` method evaluates one kind of AST node against the frame
    handed to the constructor. Node kinds that are not executed (structs,
    typedefs, ...) are accepted and ignored.
    """

    __slots__ = ("_frame",)

    _BROADCAST_KEY = "broadcast"
    _CONSTANT_KEY = "const"

    # Dispatch tables are built once per class instead of once per visit.
    # Each assignment entry maps (current value, right-hand value) -> new value.
    _ASSIGNMENT_OPERATORS = {
        "=": lambda a, b: b,
        "+=": lambda a, b: a + b,
        "-=": lambda a, b: a - b,
        "*=": lambda a, b: a * b,
        "/=": lambda a, b: a / b,
        "%=": lambda a, b: a % b,
        "&=": lambda a, b: a & b,
        "|=": lambda a, b: a | b,
        "^=": lambda a, b: a ^ b,
        "<<=": lambda a, b: a << b,
        ">>=": lambda a, b: a >> b,
    }

    _BINARY_OPERATORS = {
        "||": lambda a, b: a or b,
        "&&": lambda a, b: a and b,
        "|": lambda a, b: a | b,
        "^": lambda a, b: a ^ b,
        "&": lambda a, b: a & b,
        "==": lambda a, b: a == b,
        "!=": lambda a, b: a != b,
        ">": lambda a, b: a > b,
        "<": lambda a, b: a < b,
        ">=": lambda a, b: a >= b,
        "<=": lambda a, b: a <= b,
        ">>": lambda a, b: a >> b,
        "<<": lambda a, b: a << b,
        "+": lambda a, b: a + b,
        "-": lambda a, b: a - b,
        "*": lambda a, b: a * b,
        "/": lambda a, b: a / b,
        "%": lambda a, b: a % b,
    }

    _UNARY_OPERATORS = {
        "!": lambda a: not a,
        "~": lambda a: ~ a,
        "p++": lambda a: a + 1,
        "p--": lambda a: a - 1,
        "-": lambda a: - a,
    }

    _CONSTANT_CONVERTERS = {
        "int": int,
        "double": float,
        "string": str,
        "bool": bool,
    }

    # Factories take (constant, broadcast) and return a fresh variable object.
    # The lambdas resolve the variable classes at call time.
    _VARIABLE_FACTORIES = {
        "int": lambda a, b: IntVariable(a),
        "double": lambda a, b: DoubleVariable(a),
        "string": lambda a, b: StringVariable(a),
        "bool": lambda a, b: BooleanVariable(a),
        "chan": lambda a, b: Channel(b),
        "clock": lambda a, b: Clock(),
    }

    def __init__(self, frame: Frame):
        """
        Creates an executor bound to a frame.

        :param frame: The frame used to store and look up variables and functions.
        """
        self._frame = frame

    @staticmethod
    def stop():
        """
        Stops the hardware command handler.
        """
        hardware_command_handler.stop()

    def _first_result(self, nodes):
        """
        Executes nodes in order and returns the first result that is not None.

        :param nodes: The AST nodes to execute.
        :return: The first non-None result, or None if no node produced one.
        """
        for node in nodes:
            result = node.accept(self)
            if result is not None:
                return result
        return None

    def visit_arraydecl(self, array_decl: ArrayDecl):
        """
        Creates an array, constant if the dimension qualifiers contain "const".

        :param array_decl: The array declaration node.
        :return: The created Array.
        """
        if self._CONSTANT_KEY in array_decl.dim_quals:
            logger.debug("Executed constant array creation.")
            return Array(True)
        logger.debug("Executed non-constant array creation.")
        return Array(False)

    def visit_arrayref(self, array_ref: ArrayRef):
        """
        Looks up an array in the frame and returns the element at the subscript.

        :param array_ref: The array reference node.
        :return: Whatever the array returns for the evaluated subscript.
        """
        array: Variable = self._frame.get_variable(array_ref.name)
        assert isinstance(array, Array)
        logger.debug("Executed referencing of array '%s'.", array_ref.name)
        return array.get_index(array_ref.subscript.accept(self))

    def visit_assignment(self, assignment: Assignment):
        """
        Applies an assignment operator to the variable named by the lvalue.

        An unknown operator leaves the variable's value unchanged.

        :param assignment: The assignment node.
        """
        variable: Variable = self._frame.get_variable(assignment.lvalue.name)
        operation = self._ASSIGNMENT_OPERATORS.get(assignment.op, lambda a, b: a)
        # The current value is read before the right-hand side is evaluated.
        current_value = variable.value
        variable.value = operation(current_value, assignment.rvalue.accept(self))
        logger.debug("Executed assignment '%s'.", assignment.op)

    def visit_binaryop(self, binaryop: BinaryOp):
        """
        Evaluates both operands and combines them with the binary operator.

        Both operands are always evaluated, also for "||" and "&&". An unknown
        operator yields the left operand.

        :param binaryop: The binary operation node.
        :return: The result of the operation.
        """
        left = binaryop.left.accept(self)
        right = binaryop.right.accept(self)
        logger.debug("Executed binary operation '%s'.", binaryop.op)
        operation = self._BINARY_OPERATORS.get(binaryop.op, lambda a, b: a)
        return operation(left, right)

    def visit_compound(self, compound: Compound):
        """
        Executes the block items in order until one yields a value.

        :param compound: The compound statement node.
        :return: The first non-None item result, or None.
        """
        if compound.block_items is None:
            return None
        return self._first_result(compound.block_items)

    def visit_compoundliteral(self, compound_literal: CompoundLiteral):
        """
        Does nothing; compound literals are not executed.
        """

    def visit_constant(self, constant: Constant):
        """
        Converts a constant to the Python type matching its declared type.

        :param constant: The constant node.
        :return: The converted value.
        :raises TypeError: If the constant's type is not supported.
        """
        value = constant.value
        logger.debug("Executed constant '%s'.", value)
        converter = self._CONSTANT_CONVERTERS.get(constant.type)
        if converter is None:
            raise TypeError(f"Unsupported constant type '{constant.type}'.")
        if converter is bool and isinstance(value, str) \
                and value.strip().lower() == "false":
            # bool() is True for every non-empty string, including "false".
            return False
        return converter(value)

    def visit_decl(self, decl: Decl):
        """
        Creates the declared variable, binds it in the frame and initializes it.

        :param decl: The declaration node.
        """
        variable = decl.type.accept(self)
        self._frame.add_binding(decl.name, variable)
        if decl.init is not None:
            variable.value = decl.init.accept(self)
        logger.debug("Executed variable declaration of '%s'.", decl.name)

    def visit_decllist(self, decl_list: DeclList):
        """
        Executes every declaration of the list in order.

        :param decl_list: The declaration list node.
        """
        for declaration in decl_list.decls:
            declaration.accept(self)

    def visit_emptystatement(self):
        """
        Does nothing; an empty statement has no effect.
        """

    def visit_exprlist(self, expr_list: ExprList):
        """
        Evaluates every expression of the list in order.

        :param expr_list: The expression list node.
        :return: A list with the evaluated values.
        """
        return [expression.accept(self) for expression in expr_list.exprs]

    def visit_fileast(self, fileast: FileAST):
        """
        Executes the top-level items in order until one yields a value.

        :param fileast: The root node.
        :return: The first non-None item result, or None.
        """
        return self._first_result(fileast.ext)

    def _run_hardware_command(self, function_name: str, funccall: FuncCall):
        """
        Runs a hardware command with the evaluated call arguments.

        :param function_name: The name of the hardware command.
        :param funccall: The function call node providing the arguments.
        :return: The ``result`` attribute of the command's outcome.
        :raises HardwareCallError: If the command fails; the handler is stopped first.
        """
        command = hardware_command_handler.get_command(function_name)
        params = []
        if funccall.args is not None:
            params = [expression.accept(self) for expression in funccall.args.exprs]
        logger.debug("Executed hardware function call '%s'.", function_name)
        try:
            return hardware_command_handler.run_command(command, params).result
        except HardwareCommandError as e:
            Executor.stop()
            raise HardwareCallError(f"Error: Function call '{function_name}' failed.") from e

    def visit_funccall(self, funccall: FuncCall):
        """
        Executes a function call.

        Names known to the hardware command handler are run as hardware
        commands. Any other name is looked up in the frame and its body is
        executed in a new child frame holding the parameters.

        :param funccall: The function call node.
        :return: The result of the hardware command or of the function body.
        :raises HardwareCallError: If a hardware command fails.
        """
        function_name = funccall.name.name
        if hardware_command_handler.has_command(function_name):
            return self._run_hardware_command(function_name, funccall)

        function = self._frame.get_function(function_name)
        function_frame = Frame(self._frame)
        function_executor = Executor(function_frame)

        parameters = function.decl.type.args
        if parameters is not None:
            names = parameters.accept(function_executor)
            # Arguments are evaluated in the caller's frame. The parameters
            # were just declared in the callee frame, so evaluating there
            # would let an argument such as "n - 1" read the new, unset
            # parameter "n" instead of the caller's variable.
            values = funccall.args.accept(self) if funccall.args is not None else []
            for name, value in zip(names, values):
                function_frame.get_variable(name).value = value

        logger.debug("Executed function call '%s'.", function_name)
        return function.body.accept(function_executor)

    def visit_funcdecl(self, funcdecl: FuncDecl):
        """
        Does nothing; function declarators are not executed on their own.
        """

    def visit_funcdef(self, funcdef: FuncDef):
        """
        Registers the function definition in the frame under its declared name.

        :param funcdef: The function definition node.
        """
        self._frame.add_function(funcdef.decl.name, funcdef)
        logger.debug("Executed function declaration of '%s'.", funcdef.decl.name)

    def visit_id(self, identifier: ID):
        """
        Resolves an identifier to a variable value or, failing that, a function.

        :param identifier: The identifier node.
        :return: The variable's value if the frame returns a variable for the
                 name, otherwise whatever the frame returns as function.
        """
        variable = self._frame.get_variable(identifier.name)
        if variable is not None:
            logger.debug("Executed variable call to '%s'.", identifier.name)
            return variable.value
        return self._frame.get_function(identifier.name)

    def visit_identifiertype(self, identifier_type: IdentifierType):
        """
        Returns the type name; for a list of names only the first is used.

        :param identifier_type: The identifier type node.
        :return: The type name.
        """
        logger.debug("Executed identifier type '%s'.", identifier_type.names)
        names = identifier_type.names
        return names[0] if isinstance(names, list) else names

    def visit_initlist(self, init_list: InitList):
        """
        Evaluates every expression of an initializer list in order.

        :param init_list: The initializer list node.
        :return: A list with the evaluated values.
        """
        initial_values = [item.accept(self) for item in init_list.exprs]
        logger.debug("Executed initialization list with values: %s", initial_values)
        return initial_values

    def visit_namedinitializer(self, named_initializer: NamedInitializer):
        """
        Does nothing; named initializers are not executed.
        """

    def visit_paramlist(self, param_list: ParamList):
        """
        Declares every parameter in the frame and collects the parameter names.

        :param param_list: The parameter list node.
        :return: A list with the parameter names in declaration order.
        """
        parameter_names = []
        for parameter in param_list.params:
            parameter_names.append(parameter.name)
            parameter.accept(self)
            logger.debug("Executed parameter declaration of '%s'.", parameter.name)
        return parameter_names

    def visit_rangedecl(self, range_decl: RangeDecl):
        """
        Creates a Range from the evaluated lower and upper bounds.

        :param range_decl: The range declaration node.
        :return: The created Range; its first constructor argument is always False.
        """
        return Range(False, range_decl.lower.accept(self), range_decl.upper.accept(self))

    def visit_return(self, ret: Return):
        """
        Evaluates the returned expression.

        :param ret: The return statement node.
        :return: The value of the returned expression.
        """
        return ret.expr.accept(self)

    def visit_struct(self, struct: Struct):
        """
        Does nothing; structs are not executed.
        """

    def visit_structref(self, structref: StructRef):
        """
        Does nothing; struct references are not executed.
        """

    def visit_typedecl(self, typedecl: TypeDecl):
        """
        Creates a variable object for the declared type.

        The "const" and "broadcast" qualifiers are passed on to the variable
        types that take them.

        :param typedecl: The type declaration node.
        :return: The created variable.
        :raises TypeError: If the declared type is not supported.
        """
        variable_type = typedecl.type.accept(self)
        constant = self._CONSTANT_KEY in typedecl.quals
        broadcast = self._BROADCAST_KEY in typedecl.quals
        logger.debug("Executed variable creation of type '%s'.", variable_type)
        factory = self._VARIABLE_FACTORIES.get(variable_type)
        if factory is None:
            raise TypeError(f"Unsupported variable type '{variable_type}'.")
        return factory(constant, broadcast)

    def visit_typedef(self, typedef: Typedef):
        """
        Does nothing; typedefs are not executed.
        """

    def visit_typename(self, typename: Typename):
        """
        Does nothing; type names are not executed.
        """

    def visit_unaryop(self, unaryop: UnaryOp):
        """
        Evaluates the operand and applies the unary operator to its value.

        Only the resulting value is returned; nothing is written back to a
        variable here. An unknown operator yields the operand unchanged.

        :param unaryop: The unary operation node.
        :return: The result of the operation.
        """
        operation = self._UNARY_OPERATORS.get(unaryop.op, lambda a: a)
        return operation(unaryop.expr.accept(self))
class HardwareCallError(Exception):
    """
    Raised when the execution of a hardware call fails.

    The message is written to the module logger at error level when the
    exception is created.
    """

    def __init__(self, message: str):
        """
        :param message: The description of the failure.
        """
        super().__init__(message)
        logger.error(message)