Source code for AYABInterface.communication.states

"""This module contains the state machine for the communication class.

Click on this image to go to the states from the diagram:

.. image:: ../../../_static/CommunicationStateDiagram.svg
   :target: ../../../_static/CommunicationStateDiagram.html
   :alt: State Diagram for the Communication class.

"""
from .host_messages import InformationRequest, LineConfirmation, StartRequest


class State(object):
    """The base class for states.

    The ``receive_*`` handlers defined here ignore their message (except
    :meth:`receive_connection_closed`) and every ``is_*`` predicate returns
    :obj:`False`. Subclasses override only what applies to them, so any
    predicate can be asked of any state.
    """

    def __init__(self, communication):
        """Create a new state.

        Please use the subclasses of this.

        :param AYABInterface.communication.Communication communication:
          the communication object which is in this state
        """
        self._communication = communication

    def enter(self):
        """Called when the state is entered.

        The :attr:`AYABInterface.communication.Communication.state` is set
        to this state. The base implementation does nothing.
        """

    def exit(self):
        """Called when this state is left.

        The base implementation does nothing.
        """

    def receive_message(self, message):
        """Receive a message from the controller.

        :param AYABInterface.communication.hardware_messages.Message message:
          the message to receive

        This method calls :meth:`message.received_by
        <AYABInterface.communication.hardware_messages.Message.received_by>`
        which dispatches the call to the ``receive_*`` methods.
        """
        message.received_by(self)

    def receive_state_indication(self, message):
        """Receive a StateIndication message.

        :param message: a :class:`StateIndication
          <AYABInterface.communication.hardware_messages.StateIndication>`
          message
        """

    def receive_line_request(self, message):
        """Receive a LineRequest message.

        :param message: a :class:`LineRequest
          <AYABInterface.communication.hardware_messages.LineRequest>`
          message
        """

    def receive_test_confirmation(self, message):
        """Receive a TestConfirmation message.

        :param message: a :class:`TestConfirmation
          <AYABInterface.communication.hardware_messages.TestConfirmation>`
          message
        """

    def receive_information_confirmation(self, message):
        """Receive an InformationConfirmation message.

        :param message: a :class:`InformationConfirmation
          <AYABInterface.communication.hardware_messages.InformationConfirmation>`
          message
        """

    def receive_debug(self, message):
        """Receive a Debug message.

        :param message: a :class:`Debug
          <AYABInterface.communication.hardware_messages.Debug>` message

        The base implementation ignores the message.
        """

    def receive_start_confirmation(self, message):
        """Receive a StartConfirmation message.

        :param message: a :class:`StartConfirmation
          <AYABInterface.communication.hardware_messages.StartConfirmation>`
          message
        """

    def receive_unknown(self, message):
        """Receive an UnknownMessage message.

        :param message: a :class:`UnknownMessage
          <AYABInterface.communication.hardware_messages.UnknownMessage>`
          message
        """

    def receive_connection_closed(self, message):
        """Receive a ConnectionClosed message.

        :param message: a :class:`ConnectionClosed
          <AYABInterface.communication.hardware_messages.ConnectionClosed>`
          message

        If this is called, the communication object transitions into the
        :class:`ConnectionClosed` state.
        """
        self._next(ConnectionClosed)

    def is_waiting_for_the_communication_to_start(self):
        """Whether the communication can be started.

        When this is :obj:`True`, you can call
        :meth:`AYABInterface.communication.Communication.start`
        to leave the state.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_before_knitting(self):
        """Whether the knitting should start soon.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_knitting(self):
        """Whether the machine is ready to knit or knitting.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_final(self):
        """Whether the communication is over.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_connection_closed(self):
        """Whether the connection is closed.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def communication_started(self):
        """Call when the communication starts.

        The base implementation does nothing.
        """

    def is_waiting_for_start(self):
        """Whether this state is waiting for the start.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_initial_handshake(self):
        """Whether the communication object is in the initial handshake.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def _next(self, state_class, *args):
        """Transition into the next state.

        :param type state_class: a subclass of :class:`State`. It is
          initialized with the communication object and `args`
        :param args: additional arguments passed to `state_class`

        The new state object is assigned to the ``state`` attribute of the
        communication object.
        """
        self._communication.state = state_class(self._communication, *args)

    def is_unsupported_api_version(self):
        """Whether the API version of communication and controller do not match.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_initializing_machine(self):
        """Whether the machine is currently being initialized.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_waiting_for_carriage_to_pass_the_left_turn_mark(self):
        """Whether the carriage should be moved over the left turn mark.

        Defined on the base class so that the question can be asked of
        every state, like the other ``is_*`` predicates.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_starting_to_knit(self):
        """Whether the machine is initialized and knitting starts.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_starting_failed(self):
        """Whether the machine could not be configured to start.

        Defined on the base class so that the question can be asked of
        every state, like the other ``is_*`` predicates.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_knitting_started(self):
        """Whether the machine is ready to knit the first line.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_knitting_line(self):
        """Whether the machine knits a line.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def is_knitting_last_line(self):
        """Whether the line currently knit is the last line.

        Defined on the base class so that the question can be asked of
        every state, like the other ``is_*`` predicates.

        :rtype: bool
        :return: :obj:`False`
        """
        return False

    def __repr__(self):
        """This object as string.

        :rtype: str
        :return: the class name in angle brackets, e.g. ``"<State>"``
        """
        return "<{}>".format(self.__class__.__name__)
class FinalState(State):
    """Base class for the states from which knitting can not be reached."""

    def is_final(self):
        """Whether the communication is over.

        From a final state, the knitting can not be reached any more.

        :rtype: bool
        :return: :obj:`True`
        """
        return True
class ConnectionClosed(FinalState):
    """The final state which is reached when the connection is closed."""

    def is_connection_closed(self):
        """Whether the connection is closed.

        :rtype: bool
        :return: :obj:`True`
        """
        return True
class WaitingForStart(State):
    """Waiting for the start() method to be called.

    This is the initial state of a
    :class:`AYABInterface.communication.Communication`.
    """

    def is_before_knitting(self):
        """Whether the knitting should start soon.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def communication_started(self):
        """Call when the communication starts.

        The communication object transitions into
        :class:`InitialHandshake`.
        """
        self._next(InitialHandshake)

    def is_waiting_for_start(self):
        """Whether this state is waiting for the start.

        Call :meth:`AYABInterface.communication.Communication.start`
        to leave this state.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def is_waiting_for_the_communication_to_start(self):
        """Whether the communication can be started.

        This state is left by calling
        :meth:`AYABInterface.communication.Communication.start`, which is
        exactly the condition this predicate is documented to signal, so it
        agrees with :meth:`is_waiting_for_start`.

        :rtype: bool
        :return: :obj:`True`
        """
        return True
class InitialHandshake(State):
    """The communication has started and the handshake is in progress."""

    def is_initial_handshake(self):
        """Whether the communication object is in the initial handshake.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def is_before_knitting(self):
        """Whether the knitting should start soon.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def enter(self):
        """Start the handshake.

        An
        :class:`AYABInterface.communication.host_messages.InformationRequest`
        is sent to the controller.
        """
        self._communication.send(InformationRequest)

    def receive_information_confirmation(self, message):
        """Receive an InformationConfirmation message.

        :param message: the received confirmation; it is asked whether the
          api version is supported

        If the api version is supported, the communication object
        transitions into :class:`InitializingMachine`, otherwise into
        :class:`UnsupportedApiVersion`. After the transition, the message
        is stored as the ``controller`` of the communication object.
        """
        supported = message.api_version_is_supported()
        following_state = (InitializingMachine if supported
                           else UnsupportedApiVersion)
        self._next(following_state)
        # The controller is assigned only after the transition took place.
        self._communication.controller = message
class UnsupportedApiVersion(FinalState):
    """The final state for a controller with an unsupported api version."""

    def is_unsupported_api_version(self):
        """Whether the API version of communication and controller do not match.

        :rtype: bool
        :return: :obj:`True`
        """
        return True
class InitializingMachine(State):
    """The machine is currently being initialized."""

    def is_initializing_machine(self):
        """Whether the machine is currently being initialized.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def is_before_knitting(self):
        """Whether the knitting should start soon.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def is_waiting_for_carriage_to_pass_the_left_turn_mark(self):
        """Whether the carriage should be moved over the left turn mark.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def receive_state_indication(self, message):
        """Receive a StateIndication message.

        :param message: a :class:`StateIndication
          <AYABInterface.communication.hardware_messages.StateIndication>`
          message

        If the message says that the controller :meth:`is ready to knit
        <AYABInterface.communication.hardware_messages.StateIndication.is_ready_to_knit>`,
        there is a transition to :class:`StartingToKnit`. All other
        messages are ignored because they come from :ref:`reqtest`.
        """
        if not message.is_ready_to_knit():
            return
        self._next(StartingToKnit)
class StartingToKnit(State):
    """A StartRequest is sent and we wait for the :ref:`cnfstart` answer."""

    def enter(self):
        """Send a StartRequest to the controller.

        The request carries the left and the right end needle of the
        communication object.
        """
        communication = self._communication
        communication.send(StartRequest,
                           communication.left_end_needle,
                           communication.right_end_needle)

    def receive_start_confirmation(self, message):
        """Receive a StartConfirmation message.

        :param message: a :class:`StartConfirmation
          <AYABInterface.communication.hardware_messages.StartConfirmation>`
          message

        If the message indicates success, the communication object
        transitions into :class:`KnittingStarted` or else, into
        :class:`StartingFailed`.
        """
        succeeded = message.is_success()
        self._next(KnittingStarted if succeeded else StartingFailed)

    def is_before_knitting(self):
        """Whether the knitting should start soon.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def is_starting_to_knit(self):
        """Whether the machine is initialized and knitting starts.

        :rtype: bool
        :return: :obj:`True`
        """
        return True
class StartingFailed(FinalState):
    """The final state which is reached when the starting process failed."""

    def is_starting_failed(self):
        """Whether the machine could not be configured to start.

        :rtype: bool
        :return: :obj:`True`
        """
        return True
class KnittingStarted(State):
    """The knitting started and we are ready to receive :ref:`reqline`."""

    def is_knitting_started(self):
        """Whether the machine is ready to knit the first line.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def is_knitting(self):
        """Whether the machine is ready to knit or knitting.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def receive_line_request(self, message):
        """Receive a LineRequest message.

        :param message: a :class:`LineRequest
          <AYABInterface.communication.hardware_messages.LineRequest>`
          message

        The communication transitions into a :class:`KnittingLine` for
        the line number of the message.
        """
        requested_line = message.line_number
        self._next(KnittingLine, requested_line)
class KnittingLine(State):
    """The machine is currently knitting a line."""

    def __init__(self, communication, line_number):
        """Create the state for knitting one line.

        :param communication: the communication object which is in this
          state
        :param line_number: the number of the line which is knit; it is
          available as :attr:`line_number`
        """
        super().__init__(communication)
        self._line_number = line_number

    @property
    def line_number(self):
        """The number of the line which is currently knit.

        :return: the line number this state was created with
        """
        return self._line_number

    def enter(self):
        """Send a LineConfirmation to the controller.

        When this state is entered, the ``last_requested_line_number`` of
        the communication object is set to the line number of this state.
        Afterwards, a
        :class:`AYABInterface.communication.host_messages.LineConfirmation`
        for this line number is sent to the controller.
        """
        communication = self._communication
        number = self._line_number
        communication.last_requested_line_number = number
        communication.send(LineConfirmation, number)

    def receive_line_request(self, message):
        """Receive a LineRequest message.

        :param message: a :class:`LineRequest
          <AYABInterface.communication.hardware_messages.LineRequest>`
          message

        The communication transitions into a new :class:`KnittingLine`
        for the line number of the message.
        """
        requested_line = message.line_number
        self._next(KnittingLine, requested_line)

    def is_knitting(self):
        """Whether the machine is ready to knit or knitting.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def is_knitting_line(self):
        """Whether the machine knits a line.

        :rtype: bool
        :return: :obj:`True`
        """
        return True

    def is_knitting_last_line(self):
        """Whether the line currently knit is the last line.

        :rtype: bool
        :return: :obj:`True` only if the communication object answers
          ``is_last_line`` for this line number with exactly :obj:`True`
        """
        answer = self._communication.is_last_line(self._line_number)
        # Identity check: any answer other than True itself counts as False.
        return answer is True
# TODO: is it possible to know when the knitting process is over?


# The names exported by ``from ... import *``.
__all__ = [
    "State",
    "ConnectionClosed",
    "WaitingForStart",
    "InitialHandshake",
    "UnsupportedApiVersion",
    "InitializingMachine",
    "StartingToKnit",
    "StartingFailed",
    "KnittingStarted",
    "KnittingLine",
    "FinalState",
]