Source code for AYABInterface.communication.hardware_messages

"""This modue contains all the messages taht are received."""
from ..utils import next_line
from collections import namedtuple
from .carriages import id_to_carriage_type
import struct


[docs]class Message(object): """This is the base class for messages that are received."""
[docs] def __init__(self, file, communication): """Create a new Message.""" self._file = file self._communication = communication self._init()
def _init(self): """Initialize the message. Override this method to configure your message. This pattern is called template method. Reading from the file should be done here and nowhere else. """
[docs] def is_from_host(self): """Whether this message is sent by the host. :rtype: bool :return: :obj:`False` """ return False
[docs] def is_from_controller(self): """Whether this message is sent by the controller. :rtype: bool :return: :obj:`True` """ return True
[docs] def is_valid(self): """Whether the message is valid. :rtype: bool :return: :obj:`True` """ return True
[docs] def is_start_confirmation(self): """Whether this is a StartConfirmation message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def is_information_confirmation(self): """Whether this is a InformationConfirmation message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def is_test_confirmation(self): """Whether this is a TestConfirmation message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def is_state_indication(self): """Whether this is a StateIndication message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def is_line_request(self): """Whether this is a LineRequest message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def is_connection_closed(self): """Whether this is a ConnectionClosed message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def is_unknown(self): """Whether this is a StateIndication message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def is_debug(self): """Whether this is a Debug message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def wants_to_answer(self): """Whether this message produces and answer message. :rtype: bool :returns: :obj:`False` """ return False
[docs] def __repr__(self): """This object as string. :rtype: str """ return "<{}>".format(self.__class__.__name__)
[docs]class FixedSizeMessage(Message): """This is a message of fixed size."""
[docs] def __init__(self, file, communication): """Create a new Message.""" super().__init__(file, communication) self.read_end_of_message()
[docs] def read_end_of_message(self): """Read the b"\\r\\n" at the end of the message.""" read = self._file.read last = read(1) current = read(1) while last != b'' and current != b'' and not \ (last == b'\r' and current == b'\n'): last = current current = read(1)
[docs]class SuccessConfirmation(FixedSizeMessage): """Base class for massages of success and failure.""" def _init(self): """Read the success byte.""" self._success = self._file.read(1)
[docs] def is_valid(self): """Whether this message is valid.""" return self._success <= b"\x01"
[docs] def is_success(self): """Whether the configuration was successful. :rtype: bool """ return self._success == b"\x01"
[docs]class StartConfirmation(SuccessConfirmation): """This marks the success or failure of a reqStart message. .. seealso:: :ref:`cnfstart` """ MESSAGE_ID = 0xc1 #: The first byte that indicates this message
[docs] def is_start_confirmation(self): """Whether this is a StartConfirmation message. :rtype: bool :returns: :obj:`True` """ return True
[docs] def received_by(self, visitor): """Call visitor.receive_state_confirmation.""" visitor.receive_start_confirmation(self)
[docs]class ConnectionClosed(Message): """This message is notified about when the connection is closed."""
[docs] def is_connection_closed(self): """Whether this is a ConnectionClosed message. :rtype: bool :returns: :obj:`True` """ return True
[docs] def received_by(self, visitor): """Call visitor.receive_connection_closed.""" visitor.receive_connection_closed(self)
[docs]class UnknownMessage(FixedSizeMessage): """This is a special message for unknown message types."""
[docs] def is_unknown(self): """Whether this is a StateIndication message. :rtype: bool :returns: :obj:`True` """ return True
[docs] def is_valid(self): """Whether the message is valid. :rtype: bool :return: :obj:`False` """ return False
[docs] def received_by(self, visitor): """Call visitor.receive_unkown.""" visitor.receive_unknown(self)
FirmwareVersion = namedtuple("FirmwareVersion", ["major", "minor"])
[docs]class InformationConfirmation(FixedSizeMessage): """This message is the answer in the initial handshake. A :class:`~AYABInterface.communication.host_messages.InformationRequest` requests this message from the controller to start the initial handshake. .. seealso:: :ref:`cnfinfo` :class:`~AYABInterface.communication.host_messages.InformationRequest` """ MESSAGE_ID = 0xc3 #: The first byte that indicates this message
[docs] def is_information_confirmation(self): """Whether this is a InformationConfirmation message. :rtype: bool :returns: :obj:`True` """ return True
def _init(self): """Read the success byte.""" self._api_version = self._file.read(1)[0] self._firmware_version = FirmwareVersion(*self._file.read(2)) @property def api_version(self): """The API version of the controller. :rtype: int """ return self._api_version
[docs] def api_version_is_supported(self): """Whether the communication object supports this API version. :rtype: bool .. seealso:: :meth:`Communication.api_version_is_supported <AYABInterface.communication.Communication.api_version_is_supported>` """ return self._communication.api_version_is_supported(self._api_version)
@property def firmware_version(self): """The firmware version of the controller. :rtype: FirmwareVersion .. code:: python minor_version = int() mayor_version = int() assert message.firmware_version == (mayor_version, minor_version) assert message.firmware_version.major == mayor_version assert message.firmware_version.minor == minor_version """ return self._firmware_version
[docs] def received_by(self, visitor): """Call visitor.receive_information_confirmation.""" visitor.receive_information_confirmation(self)
[docs]class TestConfirmation(SuccessConfirmation): """This message is sent at/when""" # TODO MESSAGE_ID = 0xc4 #: The first byte that indicates this message
[docs] def is_test_confirmation(self): """Whether this is a TestConfirmation message. :rtype: bool :returns: :obj:`True` """ return True
[docs] def received_by(self, visitor): """Call visitor.test_information_confirmation.""" visitor.receive_test_confirmation(self)
[docs]class LineRequest(FixedSizeMessage): """The controller requests a line. .. seealso:: :ref:`reqline` """ MESSAGE_ID = 0x82 #: The first byte that indicates this message
[docs] def is_line_request(self): """Whether this is a LineRequest message. :rtype: bool :returns: :obj:`True` """ return True
def _init(self): """Read the line number.""" self._line_number = next_line( self._communication.last_requested_line_number, self._file.read(1)[0]) @property def line_number(self): """The line number that was requested.""" return self._line_number
[docs] def received_by(self, visitor): """Call visitor.receive_line_request.""" visitor.receive_line_request(self)
[docs]class StateIndication(FixedSizeMessage): """This message shows the state of the controller. .. seealso:: :ref:`indstate` """ MESSAGE_ID = 0x84 #: The first byte that indicates this message
[docs] def is_state_indication(self): """Whether this is a InformationConfirmation message. :rtype: bool :returns: :obj:`True` """ return True
def _init(self): """Read the success byte.""" self._ready = self._file.read(1) self._hall_left = self._file.read(2) self._hall_right = self._file.read(2) self._carriage_type = self._file.read(1)[0] self._carriage_position = self._file.read(1)[0]
[docs] def is_valid(self): """Whether this messages matches the specification.""" return self._ready == b'\x00' or self._ready == b'\x01'
[docs] def is_ready_to_knit(self): """Whether this message indicates that the controller can knit now.""" return self._ready == b'\x01'
@property def left_hall_sensor_value(self): """The value of the left hall sensor. :rtype: int """ return struct.unpack(">H", self._hall_left)[0] @property def right_hall_sensor_value(self): """The value of the left hall sensor. :rtype: int """ return struct.unpack(">H", self._hall_right)[0] @property def carriage(self): """The carriage which is reported. :rtype: AYABInterface.communication.carriages.Carriage :return: the carriage with information about its position """ carriage_type = id_to_carriage_type(self._carriage_type) return carriage_type(self._carriage_position) @property def current_needle(self): """The current needle position. :rtype: int """ return self._carriage_position
[docs] def received_by(self, visitor): """Call visitor.receive_state_indication.""" visitor.receive_state_indication(self)
[docs]class Debug(Message): """This message contains debug output from the controller. .. seealso:: :ref:`debug` """ MESSAGE_ID = 0x23
[docs] def is_debug(self): """Whether this is a Debug message. :rtype: bool :returns: :obj:`False` """ return True
def _init(self): """Read the b"\\r\\n" at the end of the message.""" read_values = [] read = self._file.read last = read(1) current = read(1) while last != b'' and current != b'' and not \ (last == b'\r' and current == b'\n'): read_values.append(last) last = current current = read(1) if current == b'' and last != b'\r': read_values.append(last) self._bytes = b''.join(read_values) @property def bytes(self): """The debug message as bytes. :rtype: bytes :return: the debug message as bytes without the ``b'\\r\\n'`` at the end """ return self._bytes
[docs] def received_by(self, visitor): """Call visitor.receive_debug.""" visitor.receive_debug(self)
_message_types = {} for message_type in list(globals().values()): message_id = getattr(message_type, "MESSAGE_ID", None) if message_id is not None: _message_types[message_id] = message_type del message_type, message_id
[docs]def read_message_type(file): """Read the message type from a file.""" message_byte = file.read(1) if message_byte == b'': return ConnectionClosed message_number = message_byte[0] return _message_types.get(message_number, UnknownMessage)
__all__ = ["read_message_type", "StateIndication", "LineRequest", "TestConfirmation", "InformationConfirmation", "Debug", "StartConfirmation", "SuccessConfirmation", "UnknownMessage", "Message", "ConnectionClosed", "FirmwareVersion", "FixedSizeMessage"]