Source code for uppaal2jetracer.uppaalmodel.variable

"""Variable

This file contains the variables defined in the UPPAAL model.

This file can be imported and contains the following classes:

    * Variable:         Abstract base class for all variables.
    * IntVariable:      Integer variable.
    * StringVariable:   String variable.
    * DoubleVariable:   Double variable.
    * BooleanVariable:  Boolean variable.
    * Array:            Array of variables.
    * Range:            Numerical variable bounded by a range.
    * Clock:            Clock variable.
    * Channel:          Channel variable.
"""

from __future__ import annotations

import copy
import logging
from typing import List
from abc import ABC, abstractmethod
import time

from uppaal2jetracer.uppaalmodel.elements import Transition, Location

# Module-level logger named 'uppaal_model'; the variable and error classes
# defined below emit their debug/error messages through it.
logger = logging.getLogger('uppaal_model')


class Variable(ABC):
    """
    Abstract parent of every variable kind used in the system.

    :ivar _is_constant: Whether the variable is immutable once it holds a value.
    :vartype _is_constant: bool
    :ivar _value: The stored value, or None while nothing has been assigned.
    :vartype _value: any
    :ivar _default_value: Fallback returned by ``value`` while ``_value`` is None.
    :vartype _default_value: any
    """

    __slots__ = ('_is_constant', '_value', '_default_value')

    def __init__(self, is_constant: bool, value=None):
        self._is_constant = is_constant
        # Subclasses overwrite the default after calling this initialiser.
        self._default_value = None
        self._value = value

    @property
    def value(self) -> any:
        """
        The value of the variable.

        :return: The stored value, or the default value if none is stored.
        :rtype: any
        """
        return self._default_value if self._value is None else self._value

    @value.setter
    def value(self, value):
        if self._is_constant:
            # A constant accepts exactly one assignment: the one that replaces
            # the initial None. Anything afterwards is rejected.
            if self._value is not None:
                raise ImmutableValueError()
            logger.debug("Set value from '%s' to value '%s'", self._value, value)
        self._value = value

    @property
    def is_constant(self) -> bool:
        """
        Tells whether the variable is constant.

        :return: True if the variable is constant, otherwise False.
        :rtype: bool
        """
        return self._is_constant

    @abstractmethod
    def reset(self):
        """
        Resets the variable to a starting state.
        """
class IntVariable(Variable):
    """
    Variable holding an integer.

    While no value has been assigned, ``value`` yields the default ``0``.
    """

    def __init__(self, is_constant: bool, value: int = None):
        super().__init__(is_constant, value)
        # Assigned after the base initialiser, which sets the default to None.
        self._default_value = 0

    def reset(self):
        """
        No-op: resetting an integer variable leaves its state untouched.
        """
class StringVariable(Variable):
    """
    Variable holding a string.

    While no value has been assigned, ``value`` yields the default ``""``.
    """

    def __init__(self, is_constant: bool, value: str = None):
        super().__init__(is_constant, value)
        # Assigned after the base initialiser, which sets the default to None.
        self._default_value = ""

    def reset(self):
        """
        No-op: resetting a string variable leaves its state untouched.
        """
class DoubleVariable(Variable):
    """
    Variable holding a double (Python ``float``).

    While no value has been assigned, ``value`` yields the default ``0.0``.
    """

    def __init__(self, is_constant: bool, value: float = None):
        super().__init__(is_constant, value)
        # Assigned after the base initialiser, which sets the default to None.
        self._default_value = 0.0

    def reset(self):
        """
        No-op: resetting a double variable leaves its state untouched.
        """
class BooleanVariable(Variable):
    """
    Variable holding a boolean.

    While no value has been assigned, ``value`` yields the default ``False``.
    """

    def __init__(self, is_constant: bool, value: bool = None):
        super().__init__(is_constant, value)
        # Assigned after the base initialiser, which sets the default to None.
        self._default_value = False

    def reset(self):
        """
        No-op: resetting a boolean variable leaves its state untouched.
        """
class Array(Variable):
    """
    Represents an array of variables.

    The elements are kept in a plain list. While no list has been assigned,
    the array behaves like an empty one (the default value is ``[]``).
    """

    def __init__(self, is_constant: bool, value: List[Variable] = None):
        super().__init__(is_constant, value)
        self._default_value: List[Variable] = []

    @property
    def value(self):
        """
        The list of elements.

        :return: The stored list, or the (empty) default list if none is stored.
        :rtype: List[Variable]
        """
        if self._value is None:
            return self._default_value
        return self._value

    @value.setter
    def value(self, value: List[Variable]):
        # The list is shallow-copied so later changes to the caller's list
        # do not leak into the array.
        if not self.is_constant:
            self._value = copy.copy(value)
        elif self._value is None:
            logger.debug("Set value of array from '%s' to value '%s'", self.value, value)
            self._value = copy.copy(value)
        else:
            raise ImmutableValueError()

    def set_index(self, i: int, v: Variable):
        """
        Sets the variable at index 'i' to variable 'v'.

        :param i: The index to change.
        :type i: int
        :param v: The value to set.
        :type v: Variable
        :raise IndexOutOfRangeError: Raised when index out of range is accessed.
        :raise ImmutableValueError: Raised when the array is constant.
        """
        # Go through the property so an array without an assigned list is
        # treated as empty instead of failing on len(None).
        elements = self.value
        # Valid indices are 0 .. len - 1; 'i == len' is already out of range.
        if i < 0 or i >= len(elements):
            raise IndexOutOfRangeError(i, len(elements))
        if self.is_constant:
            raise ImmutableValueError()
        # The old element is a Variable, not a number, so it is formatted
        # with '%s' ('%i' would fail when the record is emitted).
        logger.debug("Set index '%i' of variable from '%s' to value '%s'!",
                     i, elements[i], v)
        elements[i] = v

    def get_index(self, i: int) -> Variable:
        """
        Retrieves the element at index 'i'.

        :param i: The index to retrieve from.
        :type i: int
        :return: The value at the index.
        :rtype: Variable
        :raise IndexOutOfRangeError: Raised when index out of range is accessed.
        """
        elements = self.value
        if i < 0 or i >= len(elements):
            raise IndexOutOfRangeError(i, len(elements))
        return elements[i]

    def __len__(self):
        # Number of elements; 0 while no list has been assigned.
        return len(self.value)

    def __sizeof__(self):
        # Kept for backward compatibility: reports the element count.
        return len(self.value)

    def reset(self):
        """
        No-op: resetting an array leaves its elements untouched.
        """
class Range(Variable):
    """
    Numerical variable whose value must stay within ``[start, end]``.

    :ivar _start: The lower bound of the range (inclusive).
    :vartype _start: int
    :ivar _end: The upper bound of the range (inclusive).
    :vartype _end: int
    """

    __slots__ = ('_start', '_end')

    def __init__(self, is_constant: bool, start: int, end: int, value: int = None):
        self._start, self._end = start, end
        super().__init__(is_constant, value)
        if value is not None:
            # An explicit initial value has to respect both bounds.
            if value > end:
                raise ExceedsRangeError(value, end)
            if value < start:
                raise BelowRangeError(value, start)
        else:
            # Without an initial value the range reads as its lower bound.
            self._default_value = start

    @property
    def value(self):
        """
        The current value of the range variable.

        :return: The stored value, or the default value if none is stored.
        :rtype: int
        """
        return self._default_value if self._value is None else self._value

    @value.setter
    def value(self, value: int):
        # Bounds are validated before mutability is considered.
        if value > self._end:
            raise ExceedsRangeError(value, self._end)
        if value < self._start:
            raise BelowRangeError(value, self._start)
        if self.is_constant:
            # A constant range accepts only the assignment replacing None.
            if self._value is not None:
                raise ImmutableValueError()
            logger.debug("Set range from value '%s' to value '%s'!", self._value, value)
        self._value = value

    @property
    def start(self) -> int:
        """
        The lower bound of the range.

        :return: The start of the range.
        :rtype: int
        """
        return self._start

    @property
    def end(self) -> int:
        """
        The upper bound of the range.

        :return: The end of the range.
        :rtype: int
        """
        return self._end

    def reset(self):
        """
        No-op: resetting a range variable leaves its state untouched.
        """
class Clock(Variable):
    """
    Clock variable whose value grows with wall-clock time.

    The clock stores the timestamp at which it would have read zero; its value
    is the time elapsed since then, scaled by ``_CLOCK_SPEED``.
    """

    _CLOCK_SPEED = 1
    # Every constructed clock is appended to this class-level list.
    __instance_list: List[Clock] = []
    __slots__ = ('_timestamp', '_init_value')

    def __init__(self, value: float = 0.0):
        super().__init__(False, None)
        self._init_value = value
        self._timestamp = time.time() - value
        Clock.__instance_list.append(self)

    @property
    def value(self) -> float:
        """
        The current clock reading.

        :return: Elapsed time since the stored timestamp times the clock speed.
        :rtype: float
        """
        elapsed = time.time() - self._timestamp
        return elapsed * Clock._CLOCK_SPEED

    @value.setter
    def value(self, value: float):
        # NOTE(review): the speed factor is applied on read only, not here;
        # this is consistent only while _CLOCK_SPEED is 1.
        now = time.time()
        self._timestamp = now - value
        logger.debug("Set clock value to '%s'!", self.value)

    def reset(self):
        """
        Rewinds the clock to the value it was constructed with.
        """
        now = time.time()
        self._timestamp = now - self._init_value
        logger.debug("Clock reset to value '%s'!", self.value)
class Channel(Variable):
    """
    Represents a channel variable.

    A channel is created as a constant variable whose value is the channel
    itself. It keeps track of the transitions that send and receive on it.

    :ivar _is_broadcast: Defines if channel is broadcast channel.
    :vartype _is_broadcast: bool
    :ivar _sending_transitions: The registered sending transitions.
    :vartype _sending_transitions: List[Transition]
    :ivar _receiving_transitions: The registered receiving transitions.
    :vartype _receiving_transitions: List[Transition]
    """

    __slots__ = ('_is_broadcast', '_sending_transitions', '_receiving_transitions')

    def __init__(self, is_broadcast: bool):
        super().__init__(True, self)
        self._is_broadcast = is_broadcast
        self._sending_transitions: List[Transition] = []
        self._receiving_transitions: List[Transition] = []

    @property
    def is_broadcast(self) -> bool:
        """
        Defines if a channel is a broadcast channel.

        :return: True if the channel is a broadcast channel, otherwise False.
        :rtype: bool
        """
        return self._is_broadcast

    @property
    def sending_transitions(self) -> List[Transition]:
        """
        A copy of the internal list of sending transitions.

        :return: A shallow copy of the internal list of sending transitions.
        :rtype: List[Transition]
        """
        return self._sending_transitions.copy()

    @property
    def receiving_transitions(self) -> List[Transition]:
        """
        A copy of the internal list of receiving transitions.

        :return: A shallow copy of the internal list of receiving transitions.
        :rtype: List[Transition]
        """
        return self._receiving_transitions.copy()

    def register_sending(self, t: Transition):
        """
        Add element to internal sending list.

        :param t: The transition to add.
        :type t: Transition
        """
        logger.debug(
            "Registering sending transition from location '%s' to "
            "location '%s'!", t.source.identifier, t.target.identifier)
        self._sending_transitions.append(t)

    def register_receiving(self, t: Transition):
        """
        Add element to internal receiving list.

        :param t: The transition to add.
        :type t: Transition
        """
        logger.debug(
            "Registering receiving transition from location '%s' to "
            "location '%s'!", t.source.identifier, t.target.identifier)
        self._receiving_transitions.append(t)

    def has_pair(self) -> bool:
        """
        Checks if channel has a pair of receiving/sending transitions.

        :return: True if at least one sending and one receiving transition
            are registered, otherwise False.
        :rtype: bool
        """
        logger.debug("Checking if channel has a pair...")
        if self._sending_transitions and self._receiving_transitions:
            logger.debug("Channel has a pair!")
            return True
        logger.debug("Channel has no pair!")
        return False

    def clean_up(self, loc: Location):
        """
        Removes all transitions in the internal lists that originate from
        location 'loc'.

        :param loc: The origin of the transitions to remove
        :type loc: Location
        """
        logger.debug("Clean up channel for transitions from location '%s'", loc.identifier)
        # Filter into a new list and assign it back in place. Removing items
        # from a list while iterating over it skips the element following
        # each removal, which left some transitions of 'loc' registered.
        self._receiving_transitions[:] = [
            tr for tr in self._receiving_transitions if not tr.source == loc
        ]
        self._sending_transitions[:] = [
            tr for tr in self._sending_transitions if not tr.source == loc
        ]

    def reset(self):
        """
        No-op: resetting a channel leaves its registered transitions untouched.
        """
class ImmutableValueError(ValueError):
    """
    Raised when code tries to change the value of an immutable variable.

    The error message is logged at error level when the exception is created.
    """

    def __init__(self):
        message: str = "Error: Value of variable can not be changed as it is immutable."
        logger.error(message)
        super().__init__(message)
class ExceedsRangeError(ValueError):
    """
    Raised when a value above the upper bound of a range is assigned.

    The error message is logged at error level when the exception is created.

    :param attempted_value: The value that is attempted to be set.
    :type attempted_value: int
    :param upper_bounds: The upper bounds of the range.
    :type upper_bounds: int
    """

    def __init__(self, attempted_value: int, upper_bounds: int):
        message: str = (f"Error: Value '{attempted_value}' exceeds upper boundary of "
                        f"'{upper_bounds}' for range.")
        logger.error(message)
        super().__init__(message)
class BelowRangeError(ValueError):
    """
    Raised when a value below the lower bound of a range is assigned.

    The error message is logged at error level when the exception is created.

    :param attempted_value: The value that is attempted to be set.
    :type attempted_value: int
    :param lower_bounds: The lower bounds of the range.
    :type lower_bounds: int
    """

    def __init__(self, attempted_value: int, lower_bounds: int):
        message: str = (f"Error: Value '{attempted_value}' falls below lower boundary of "
                        f"'{lower_bounds}' for range.")
        logger.error(message)
        super().__init__(message)
class IndexOutOfRangeError(IndexError):
    """
    Raised when an array is accessed at an index outside of its bounds.

    The error message is logged at error level when the exception is created.

    :param attempted_value: The index that was accessed.
    :type attempted_value: int
    :param array_size: The size of the array.
    :type array_size: int
    """

    def __init__(self, attempted_value: int, array_size: int):
        message: str = (f"Error: Index '{attempted_value}' is out of range for array with "
                        f"size {array_size}.")
        logger.error(message)
        super().__init__(message)