"""UPPAAL controller
This file provides controllers to run parsed UPPAAL automata.
This file can be imported and contains the following classes:
* SystemController: Controls the execution of a parsed UPPAAL system.
* AutomatonController: Controls the execution of a single parsed UPPAAL automaton.
"""
from __future__ import annotations
import copy
from typing import List, Optional, Tuple
import logging
from uppaal2jetracer.uppaalmodel.elements import Transition, Update, Synchronization
from uppaal2jetracer.uppaalmodel.frame import Frame
from uppaal2jetracer.uppaalmodel.system import System, Automaton
from uppaal2jetracer.declarations.declarations_ast import Node
from uppaal2jetracer.controller.executor import Executor
from uppaal2jetracer.uppaalmodel.variable import Channel
from uppaal2jetracer.httphandler import HTTPHandler
logger = logging.getLogger('controller')
http_handler = HTTPHandler()
[docs]
class SystemController:
    """
    The system controller simulates a parsed UPPAAL system.

    It owns one :class:`AutomatonController` per automaton of the system and, in
    :meth:`run_system`, repeatedly asks every controller for its next step and executes the
    resulting plain, broadcast or binary transition.
    """

    __slots__ = ('_system', '_automata_controller', '_is_running')

    def __init__(self, system: System):
        """
        Creates one automaton controller for every automaton of the given system.

        :param system: The parsed UPPAAL system to control.
        :type system: System
        """
        self._system = system
        self._automata_controller: List[AutomatonController] = [
            AutomatonController(automaton) for automaton in system.automata
        ]
        self._is_running = False

    @property
    def system(self) -> System:
        """
        Returns the uppaal model system entity of the controller.

        :return: The system object.
        :rtype: System
        """
        return self._system

    @property
    def automata_controller(self) -> List[AutomatonController]:
        """
        Returns the list of automata controller managed by the controller.

        A shallow copy is returned, so changing the returned list does not change the
        list held by this controller.

        :return: The list of automata controller
        :rtype: List[AutomatonController]
        """
        return copy.copy(self._automata_controller)

    @property
    def is_running(self) -> bool:
        """
        Returns whether the controller is running or not.

        :return: Whether the controller is running.
        :rtype: bool
        """
        return self._is_running

    @is_running.setter
    def is_running(self, run: bool):
        """
        Sets whether the controller is running or not.

        Setting this to False makes the main loop of :meth:`run_system` end after the
        round over all automaton controllers that is currently in progress.

        :param run: The new running state.
        :type run: bool
        """
        self._is_running = run

    def run_system(self):
        """
        The main loop of the system, call to start execution.

        Resets the system, global and automaton frames and then loops over all automaton
        controllers until :attr:`is_running` becomes False.
        """
        logger.info("Method run_system called.")

        # reset all variables to their starting state
        logger.debug("Resetting Variables.")
        self.system.system_frame.reset()
        self.system.global_frame.reset()
        for automaton in self.system.automata:
            automaton.automaton_frame.reset()

        # main loop
        self._is_running = True
        while self._is_running:
            for controller in self._automata_controller:
                transition, channel = controller.step()
                if transition is None:
                    # This automaton has nothing to do in this round.
                    continue
                if channel is None:
                    self._execute_transition(transition, controller)
                elif channel.is_broadcast:
                    self._execute_broadcast_transition(transition, channel, controller)
                else:
                    self._execute_binary_transition(transition, channel, controller)

    def _execute_transition(self, transition: Transition,
                            automaton_controller: AutomatonController):
        """
        Lets the given controller take the transition and reports it.

        The controller's channel registrations are cleaned up before the location changes,
        because the clean-up is done relative to the controller's current location.

        :param transition: The transition to take.
        :type transition: Transition
        :param automaton_controller: The controller that takes the transition.
        :type automaton_controller: AutomatonController
        """
        automaton_controller.clean_up_channels()
        automaton_controller.execute_step(transition)
        logger.info("The transition from %s to %s was taken.",
                    transition.source.identifier, transition.target.identifier)
        http_handler.update_webinterface(f"The transition from {transition.source.identifier} to "
                                         f"{transition.target.identifier} was taken.")

    def _execute_broadcast_transition(self, transition: Transition, channel: Channel,
                                      automaton_controller: AutomatonController):
        """
        Executes a sending broadcast transition together with all enabled receivers.

        For every other controller at most one receiving transition on the same channel is
        selected. All receivers are collected before the sender is executed and are executed
        after it.

        :param transition: The sending broadcast transition.
        :type transition: Transition
        :param channel: The broadcast channel the transition sends on.
        :type channel: Channel
        :param automaton_controller: The controller of the sending automaton.
        :type automaton_controller: AutomatonController
        """
        broadcast_receiver_list: List[Tuple[Transition, AutomatonController]] = []
        logger.info("Executing the broadcast transition from location %s to %s.",
                    transition.source.identifier, transition.target.identifier)
        http_handler.update_webinterface(f"Executing the broadcast transition from location "
                                         f"{transition.source.identifier} to "
                                         f"{transition.target.identifier}.")

        for receiving_controller in self._automata_controller:
            if receiving_controller == automaton_controller:
                continue
            controller_frame = receiving_controller.automaton.automaton_frame
            for receiving_transition in receiving_controller.current_location.transitions:
                sync = receiving_transition.synchronization
                # Transitions without synchronization and sending transitions cannot
                # receive the broadcast.
                if sync is None or sync.is_sending:
                    continue
                receiving_channel = receiving_controller.execute_sync(
                    sync, Executor(controller_frame))
                if (receiving_channel == channel
                        and receiving_controller.can_step(receiving_transition,
                                                          controller_frame)):
                    logger.debug("Adding transition from location %s to %s to the "
                                 "broadcast receiver list.",
                                 receiving_transition.source.identifier,
                                 receiving_transition.target.identifier)
                    broadcast_receiver_list.append((receiving_transition,
                                                    receiving_controller))
                    # Only one receiving transition per automaton takes part.
                    break

        self._execute_transition(transition, automaton_controller)
        for receiving_transition, receiving_controller in broadcast_receiver_list:
            self._execute_transition(receiving_transition, receiving_controller)

    def _execute_binary_transition(self, transition: Transition, channel: Channel,
                                   automaton_controller: AutomatonController):
        """
        Executes a binary synchronization with the first valid partner transition.

        The partner candidates are the transitions registered on the channel for the opposite
        direction. As soon as one valid partner is found, both transitions are executed and
        the search ends. If no partner is valid, nothing is executed.

        :param transition: The transition that triggered the synchronization.
        :type transition: Transition
        :param channel: The binary channel of the synchronization.
        :type channel: Channel
        :param automaton_controller: The controller owning ``transition``.
        :type automaton_controller: AutomatonController
        """
        logger.info("Executing the binary transition from location %s to %s.",
                    transition.source.identifier, transition.target.identifier)
        http_handler.update_webinterface(f"Executing the binary transition from location "
                                         f"{transition.source.identifier} to "
                                         f"{transition.target.identifier}.")

        transitions: List[Transition]
        if transition.synchronization.is_sending:
            transitions = channel.receiving_transitions
        else:
            transitions = channel.sending_transitions

        for second_transition in transitions:
            logger.debug("Checking if transition from location %s to %s is valid "
                         "binary transition partner.", second_transition.source.identifier,
                         second_transition.target.identifier)
            second_controller = self._find_controller(second_transition)
            # Skip candidates whose automaton is no longer in the source location and
            # candidates of the initiating automaton itself (no self-synchronization).
            if second_controller is None or second_controller is automaton_controller:
                continue
            second_frame = second_controller.automaton.automaton_frame
            second_channel = second_controller.execute_sync(second_transition.synchronization,
                                                            Executor(second_frame))
            if (second_channel == channel
                    and second_controller.can_step(second_transition, second_frame)):
                self._execute_transition(transition, automaton_controller)
                self._execute_transition(second_transition, second_controller)
                # A binary synchronization pairs exactly two transitions. Stopping here also
                # avoids iterating the channel list after the clean-up of the executed
                # transitions.
                return

    def _find_controller(self, transition: Transition) -> Optional[AutomatonController]:
        """
        Finds the controller whose current location is the source of the transition.

        :param transition: The transition to find the controller for.
        :type transition: Transition
        :return: The first matching controller or None if there is none.
        :rtype: Optional[AutomatonController]
        """
        for controller in self._automata_controller:
            if transition.source == controller.current_location:
                return controller
        return None
[docs]
class AutomatonController:
    """
    The automaton controller manages all processes that involve one automaton at runtime.

    It tracks the current location of the automaton, decides which transition can be taken
    next and evaluates guards, invariants, updates and synchronizations with an executor.
    """

    __slots__ = ('_automaton', '_current_location', '_clean_up_list', '_executor')

    def __init__(self, automaton: Automaton):
        """
        Creates a controller that starts in the initial location of the automaton.

        :param automaton: The automaton to control.
        :type automaton: Automaton
        """
        self._automaton = automaton
        self._current_location = automaton.initial_location
        # Channels this automaton registered transitions on since the last clean-up.
        self._clean_up_list: List[Channel] = []
        self._executor = Executor(automaton.automaton_frame)

    @property
    def automaton(self) -> Automaton:
        """
        Returns the automaton of the controller.

        :return: The automaton of the controller.
        :rtype: Automaton
        """
        return self._automaton

    @property
    def current_location(self):
        """
        Returns the current location inside the automaton.

        :return: The current_location
        :rtype: Location
        """
        return self._current_location

    def step(self) -> Tuple[Optional[Transition], Optional[Channel]]:
        """
        Checks if the automaton has a valid transition, and returns it together with the channel.

        The outgoing transitions of the current location are checked in order:

        * A valid transition without synchronization is returned with ``None`` as channel.
        * A valid sending broadcast transition is returned with its channel.
        * A valid binary transition is registered on its channel and returned with the
          channel if the channel reports a sender/receiver pair.
        * Receiving broadcast transitions are skipped.

        :return: A tuple of optional transition, channel. ``(None, None)`` if nothing can be
                 taken.
        :rtype: Tuple[Optional[Transition], Optional[Channel]]
        """
        for transition in self._current_location.transitions:
            if not self.can_step(transition):
                continue
            logger.debug("Transition from location %s to %s is valid.",
                         transition.source.identifier, transition.target.identifier)
            logger.debug("Checking transition type.")
            channel = self.execute_sync(transition.synchronization)

            # Transition with no synchronization
            if channel is None:
                logger.debug("Transition has no synchronization.")
                return transition, None

            if channel.is_broadcast:
                # Transition with sending broadcast synchronization
                if transition.synchronization.is_sending:
                    logger.debug("Transition is type sending broadcast.")
                    return transition, channel
                # Receiving broadcast transitions are only taken as reaction to a sender.
                logger.debug("Transition is type receiving broadcast! Continue with next "
                             "transition.")
                continue

            # Transition with binary synchronization
            logger.debug("Transition is type binary.")
            if transition.synchronization.is_sending:
                channel.register_sending(transition)
            else:
                channel.register_receiving(transition)
            # NOTE(review): the channel is appended on every call of step, so this list can
            # hold the same channel several times while waiting for a partner - confirm that
            # Channel.clean_up tolerates repeated calls before de-duplicating here.
            self._clean_up_list.append(channel)
            if channel.has_pair():
                return transition, channel
            logger.debug("Binary transition has no partner yet! Continue with next "
                         "transition.")
        return None, None

    def execute_step(self, transition: Transition):
        """
        Takes a transition and changes current location and executes the update.

        The current location is changed before the update is executed.

        :param transition: The transition to take.
        :type transition: Transition
        """
        self._current_location = transition.target
        self.execute(transition.update)

    def can_step(self, transition: Transition, frame: Optional[Frame] = None) -> bool:
        """
        Checks if a transition has a valid guard and if the next invariant is valid for a given
        frame. Default frame is the frame of the automaton.

        The invariant of the target location is checked against a temporary frame on which the
        update of the transition has been executed.

        :param transition: The transition to check.
        :type transition: Transition
        :param frame: The variables to compare with.
        :type frame: Frame
        :return: True if the transition is valid, else False.
        :rtype: bool
        :raises InvalidGuardError: If the guard does not evaluate to a bool.
        :raises InvalidInvariantError: If the invariant does not evaluate to a bool.
        """
        if frame is None:
            frame = self._automaton.automaton_frame

        # If transition has a guard, check it.
        if transition.guard is not None:
            try:
                guard_evaluation = self.is_valid(transition.guard.decision, Executor(frame))
            except UnexpectedTypeError as exc:
                # The error needs the transition to build its message.
                raise InvalidGuardError(transition) from exc
            if not guard_evaluation:
                return False

        # If next location has no invariant return true.
        if transition.target.invariant is None:
            logger.debug("Transition from location %s to location %s has a valid guard.",
                         transition.source.identifier, transition.target.identifier)
            return True

        # Checking next invariant with a temporary frame.
        temporary_frame = frame.create_temporary_frame(transition.update)
        temporary_executor = Executor(temporary_frame)
        self.execute(transition.update, temporary_executor)
        try:
            invariant_evaluation = self.is_valid(transition.target.invariant.decision,
                                                 temporary_executor)
        except UnexpectedTypeError as exc:
            raise InvalidInvariantError(transition) from exc

        if invariant_evaluation:
            logger.debug("Transition from location %s to location %s has valid guard and "
                         "invariant.", transition.source.identifier,
                         transition.target.identifier)
        return invariant_evaluation

    def is_valid(self, node: Node, executor: Optional[Executor] = None) -> bool:
        """
        Checks if an expression is evaluated as true or false.
        Default executor is automaton executor.

        :param node: The root node of the expression.
        :type node: Node
        :param executor: The executor to use to execute the expression.
        :type executor: Executor
        :return: True when expression is true, False when expression is false.
        :rtype: bool
        :raises UnexpectedTypeError: If the expression does not evaluate to a bool.
        """
        if executor is None:
            executor = self._executor
        result = node.accept(executor)
        # Check if result has correct type
        if isinstance(result, bool):
            return result
        raise UnexpectedTypeError(bool, type(result))

    def execute(self, update: Update, executor: Optional[Executor] = None):
        """
        Evaluates and updates the update assignments of a Transition.
        Does nothing if the transition has no update. Default executor is automaton executor.

        :param update: The update to execute
        :type update: Update
        :param executor: The executor to use
        :type executor: Executor
        """
        if update is None:
            return
        if executor is None:
            executor = self._executor
        for assignment in update.assignments:
            assignment.accept(executor)

    def execute_sync(self, sync: Synchronization,
                     executor: Optional[Executor] = None) -> Optional[Channel]:
        """
        Evaluates a Synchronization to its corresponding channel.
        Default executor is automaton executor.

        :param sync: The root node of the synchronization.
        :type sync: Synchronization
        :param executor: The executor to use.
        :type executor: Executor
        :return: The channel of the synchronization, None if there is no synchronization.
        :rtype: Optional[Channel]
        :raises UnexpectedTypeError: If the synchronization does not evaluate to a channel.
        """
        if sync is None:
            return None
        if executor is None:
            executor = self._executor
        result = sync.synchronization.accept(executor)
        # Check if result has correct type
        if isinstance(result, Channel):
            return result
        raise UnexpectedTypeError(Channel, type(result))

    def clean_up_channels(self):
        """
        Function to remove all transitions from the channels that originate from the current
        location. Afterwards the list of channels to clean up is emptied.
        """
        for channel in self._clean_up_list:
            channel.clean_up(self._current_location)
        self._clean_up_list.clear()
[docs]
class UnexpectedTypeError(TypeError):
    """
    Raised when the executor returns a value whose type differs from the expected one.

    The error message is written to the module logger when the error is created.
    """

    def __init__(self, expected_type: type, gotten_type: type):
        """
        Builds the message from both types, logs it and initialises the TypeError with it.

        :param expected_type: The type the caller expected.
        :type expected_type: type
        :param gotten_type: The type that was actually returned.
        :type gotten_type: type
        """
        template = "Error: Expected element of type '{}' but got type '{}'."
        message: str = template.format(expected_type, gotten_type)
        logger.error(message)
        super().__init__(message)
[docs]
class InvalidGuardError(TypeError):
    """
    Error that is raised when the guard of a transition could not be evaluated.

    The error message is written to the module logger when the error is created.
    """

    def __init__(self, transition: Transition):
        """
        Builds the message from the source and target location of the transition.

        :param transition: The transition whose guard could not be evaluated.
        :type transition: Transition
        """
        # Both identifiers are quoted separately; the sentence itself is not quoted.
        msg: str = (f"Error: The guard of the Transition from location "
                    f"'{transition.source.identifier}' "
                    f"to '{transition.target.identifier}' could not be evaluated.")
        logger.error(msg)
        super().__init__(msg)
[docs]
class InvalidInvariantError(TypeError):
    """
    Error that is raised when the invariant of a target location could not be evaluated.

    The error message is written to the module logger when the error is created.
    """

    def __init__(self, transition: Transition):
        """
        Builds the message from the source and target location of the transition.

        :param transition: The transition whose target invariant could not be evaluated.
        :type transition: Transition
        """
        msg: str = (f"Error: The invariant of the Transition from location "
                    f"'{transition.source.identifier}' to location "
                    f"'{transition.target.identifier}' could not be evaluated.")
        logger.error(msg)
        super().__init__(msg)