Source code for uppaal2jetracer.controller.uppaal_controller

"""UPPAAL controller

This file provides controllers to run parsed UPPAAL automata.

This file can be imported and contains the following classes:

    * SystemController:     Controls the execution of a parsed UPPAAL system.
    * AutomatonController:  Controls the execution of a single parsed UPPAAL automaton.
"""

from __future__ import annotations

import copy
from typing import List, Optional, Tuple
import logging

from uppaal2jetracer.uppaalmodel.elements import Transition, Update, Synchronization
from uppaal2jetracer.uppaalmodel.frame import Frame
from uppaal2jetracer.uppaalmodel.system import System, Automaton
from uppaal2jetracer.declarations.declarations_ast import Node
from uppaal2jetracer.controller.executor import Executor
from uppaal2jetracer.uppaalmodel.variable import Channel
from uppaal2jetracer.httphandler import HTTPHandler

logger = logging.getLogger('controller')
http_handler = HTTPHandler()


[docs] class SystemController: """ The system controller is simulating the system and managing automaton controller. """ __slots__ = ('_system', '_automata_controller', '_is_running') def __init__(self, system: System): self._system = system self._automata_controller: List[AutomatonController] = [] for automaton in system.automata: self._automata_controller.append(AutomatonController(automaton)) self._is_running = False @property def system(self): """ Returns the uppaal model system entity of the controller. :return: The system object. :rtype: System """ return self._system @property def automata_controller(self): """ Returns the list of automata controller managed by the controller. :return: The list of automata controller :rtype: List[AutomatonController] """ return copy.copy(self._automata_controller) @property def is_running(self) -> bool: """ Returns whether the controller is running or not. :return: Whether the controller is running. :rtype: bool """ return self._is_running @is_running.setter def is_running(self, run: bool): """ Sets whether the controller is running or not. :param run: The new running state. :type run: bool """ self._is_running = run
[docs] def run_system(self): """ The main loop of the system, call to start execution. """ logger.info("Method run_system called.") # reset all variables to their starting state logger.debug("Resetting Variables.") self.system.system_frame.reset() self.system.global_frame.reset() for automaton in self.system.automata: automaton.automaton_frame.reset() # main loop self._is_running = True while self._is_running: for controller in self._automata_controller: next_step = controller.step() if next_step[0] is None: continue if next_step[1] is None: self._execute_transition(next_step[0], controller) elif next_step[1].is_broadcast: self._execute_broadcast_transition(next_step[0], next_step[1], controller) else: self._execute_binary_transition(next_step[0], next_step[1], controller)
def _execute_transition(self, transition: Transition, automaton_controller: AutomatonController): automaton_controller.clean_up_channels() automaton_controller.execute_step(transition) logger.info("The transition from %s to %s was taken.", transition.source.identifier, transition.target.identifier) http_handler.update_webinterface(f"The transition from {transition.source.identifier} to " f"{transition.target.identifier} was taken.") def _execute_broadcast_transition(self, transition: Transition, channel: Channel, automaton_controller: AutomatonController): broadcast_receiver_list: List[Tuple[Transition, AutomatonController]] = [] logger.info("Executing the broadcast transition from location %s to %s.", transition.source.identifier, transition.target.identifier) http_handler.update_webinterface(f"Executing the broadcast transition from location " f"{transition.source.identifier} to " f"{transition.target.identifier} was taken.") for receiving_controller in self._automata_controller: if receiving_controller == automaton_controller: continue controller_frame = receiving_controller.automaton.automaton_frame for receiving_transition in receiving_controller.current_location.transitions: if not receiving_transition.synchronization.is_sending: receiving_channel = receiving_controller.execute_sync( receiving_transition.synchronization, Executor(controller_frame)) if receiving_channel == channel: if receiving_controller.can_step(receiving_transition, controller_frame): logger.debug("Adding transition from location %s to %s to the " "broadcast receiver list.", receiving_transition.source.identifier, receiving_transition.target.identifier) broadcast_receiver_list.append((receiving_transition, receiving_controller)) break self._execute_transition(transition, automaton_controller) for receiver in broadcast_receiver_list: self._execute_transition(receiver[0], receiver[1]) def _execute_binary_transition(self, transition: Transition, channel: Channel, automaton_controller: AutomatonController): logger.info("Executing the binary transition from location %s to %s.", transition.source.identifier, transition.target.identifier) http_handler.update_webinterface(f"Executing the binary transition from location " f"{transition.source.identifier} to " f"{transition.target.identifier}.") transitions: List[Transition] if transition.synchronization.is_sending: transitions = channel.receiving_transitions else: transitions = channel.sending_transitions for second_transition in transitions: logger.debug("Checking if transition from location %s to %s is valid " "binary transition partner.", second_transition.source.identifier, second_transition.target.identifier) second_controller = self._find_controller(second_transition) if second_controller is None: continue second_frame = second_controller.automaton.automaton_frame second_channel = second_controller.execute_sync(second_transition.synchronization, Executor(second_frame)) if second_channel == channel: if second_controller.can_step(second_transition, second_frame): self._execute_transition(transition, automaton_controller) self._execute_transition(second_transition, second_controller) def _find_controller(self, transition: Transition) -> Optional[AutomatonController]: for controller in self._automata_controller: if transition.source == controller.current_location: return controller return None
[docs] class AutomatonController: """ The automaton controller manages all processes that involve one automaton while runtime. """ __slots__ = ('_automaton', '_current_location', '_clean_up_list', '_executor') def __init__(self, automaton: Automaton): self._automaton = automaton self._current_location = automaton.initial_location self._clean_up_list: List[Channel] = [] self._executor = Executor(automaton.automaton_frame) @property def automaton(self): """ Returns the automaton of the controller. :return: The automaton of the controller. :rtype: Automaton """ return self._automaton @property def current_location(self): """ Returns the current location inside the automaton. :return: The current_location :rtype: Location """ return self._current_location
[docs] def step(self) -> Tuple[Optional[Transition], Optional[Channel]]: """ Checks if the automaton has a valid transition, and returns it together with the channel. :return: A tuple of optional transition, channel. :rtype: Tuple[Optional[Transition], Optional[Channel]] """ for transition in self._current_location.transitions: if self.can_step(transition): logger.debug("Transition from location %s to %s is valid.", transition.source.identifier, transition.target.identifier) logger.debug("Checking transition type.") channel = self.execute_sync(transition.synchronization) # Transition with no synchronization if channel is None: logger.debug("Transition has no synchronization.") return transition, None # Transition with sending broadcast synchronization if channel.is_broadcast and transition.synchronization.is_sending: logger.debug("Transition is type sending broadcast.") return transition, channel # Transition with binary synchronization if not channel.is_broadcast: logger.debug("Transition is type binary.") if transition.synchronization.is_sending: channel.register_sending(transition) else: channel.register_receiving(transition) self._clean_up_list.append(channel) if channel.has_pair(): return transition, channel logger.debug("Transition is type receiving broadcast! Continue with next " "transition.") return None, None
[docs] def execute_step(self, transition: Transition): """ Takes a transition and changes current location and executes the update. :param transition: The transition to take. :type transition: Transition """ self._current_location = transition.target self.execute(transition.update)
[docs] def can_step(self, transition: Transition, frame: Frame = None) -> bool: """ Checks if a transition has a valid guard and if the next invariant is valid for a given frame. Default frame is the frame of the automaton. :param transition: The transition to check. :type transition: Transition :param frame: The variables to compare with. :type frame: Frame :return: True if the transition is valid, else False. :rtype: bool """ if frame is None: frame = self._automaton.automaton_frame # If transition has a guard, check it. if transition.guard is not None: try: guard_evaluation = self.is_valid(transition.guard.decision, Executor(frame)) if not guard_evaluation: return False except UnexpectedTypeError as exc: raise InvalidGuardError from exc # If next location has no invariant return true. if transition.target.invariant is None: logger.debug("Transition from location %s to location %s has a valid guard.", transition.source.identifier, transition.target.invariant) return True # Checking next invariant with a temporary frame. temporary_frame = frame.create_temporary_frame(transition.update) temporary_executor = Executor(temporary_frame) self.execute(transition.update, temporary_executor) try: invariant_evaluation = self.is_valid(transition.target.invariant.decision, temporary_executor) except UnexpectedTypeError as exc: raise InvalidInvariantError from exc if invariant_evaluation: logger.debug("Transition form location %s to location %s has valid guard and " "invariant.", transition.source.identifier, transition.target.identifier) return invariant_evaluation
[docs] def is_valid(self, node: Node, executor: Executor = None) -> bool: """ Checks if an expression is evaluated as true or false. Default executor is automaton executor. :param node: The root node of the expression. :type node: Node :param executor: The executor to use to execute the expression. :type executor: Executor :return: True when expression is true, False when expression is false. :rtype: bool """ if executor is None: executor = self._executor result = node.accept(executor) # Check if result has correct type if isinstance(result, bool): return result raise UnexpectedTypeError(bool, type(result))
[docs] def execute(self, update: Update, executor: Executor = None): """ Evaluates and updates the update assignments of a Transition. :param update: The update to execute :type update: Update :param executor: The executor to use :type executor: Executor """ if update is None: return if executor is None: executor = self._executor for assignment in update.assignments: assignment.accept(executor)
[docs] def execute_sync(self, sync: Synchronization, executor: Executor = None) -> Optional[Channel]: """ Evaluates a Synchronization to it's corresponding channel. :param sync: The root node of the synchronization. :type sync: Synchronization :param executor: The executor to use. :type executor: Executor :return: The channel of the synchronization. :rtype: Channel """ if sync is None: return None if executor is None: executor = self._executor result = sync.synchronization.accept(executor) # Check if result has correct type if isinstance(result, Channel): return result raise UnexpectedTypeError(Channel, type(result))
[docs] def clean_up_channels(self): """ Function to remove all transitions from the channels that originate of the current location. """ for channel in self._clean_up_list: channel.clean_up(self._current_location) self._clean_up_list.clear()
[docs] class UnexpectedTypeError(TypeError): """ Error that is raised when an unexpected argument is returned by the executor. """ def __init__(self, expected_type: type, gotten_type: type): msg: str = (f"Error: Expected element of type '{expected_type}' but got " f"type '{gotten_type}'.") logger.error(msg) super().__init__(msg)
[docs] class InvalidGuardError(TypeError): """ Error that is raised when a guard has an invalid syntax. """ def __init__(self, transition: Transition): msg: str = (f"Error: The guard of the Transition from location " f"'{transition.source.identifier}' " f"to '{transition.target.identifier} could not be evaluated.'") logger.error(msg) super().__init__(msg)
[docs] class InvalidInvariantError(TypeError): """ Error that is raised when an invariant has an invalid syntax. """ def __init__(self, transition: Transition): msg: str = (f"Error: The invariant of the Transition from location " f"'{transition.source.identifier}' to location '{transition.target.identifier}'" f" could not be evaluated.'") logger.error(msg) super().__init__(msg)