"""This modue contains all the messages taht are received."""
from ..utils import next_line
from collections import namedtuple
from .carriages import id_to_carriage_type
import struct
[docs]class Message(object):
"""This is the base class for messages that are received."""
[docs] def __init__(self, file, communication):
"""Create a new Message."""
self._file = file
self._communication = communication
self._init()
def _init(self):
"""Initialize the message.
Override this method to configure your message.
This pattern is called template method.
Reading from the file should be done here and nowhere else.
"""
[docs] def is_from_host(self):
"""Whether this message is sent by the host.
:rtype: bool
:return: :obj:`False`
"""
return False
[docs] def is_from_controller(self):
"""Whether this message is sent by the controller.
:rtype: bool
:return: :obj:`True`
"""
return True
[docs] def is_valid(self):
"""Whether the message is valid.
:rtype: bool
:return: :obj:`True`
"""
return True
[docs] def is_start_confirmation(self):
"""Whether this is a StartConfirmation message.
:rtype: bool
:returns: :obj:`False`
"""
return False
[docs] def is_test_confirmation(self):
"""Whether this is a TestConfirmation message.
:rtype: bool
:returns: :obj:`False`
"""
return False
[docs] def is_state_indication(self):
"""Whether this is a StateIndication message.
:rtype: bool
:returns: :obj:`False`
"""
return False
[docs] def is_line_request(self):
"""Whether this is a LineRequest message.
:rtype: bool
:returns: :obj:`False`
"""
return False
[docs] def is_connection_closed(self):
"""Whether this is a ConnectionClosed message.
:rtype: bool
:returns: :obj:`False`
"""
return False
[docs] def is_unknown(self):
"""Whether this is a StateIndication message.
:rtype: bool
:returns: :obj:`False`
"""
return False
[docs] def is_debug(self):
"""Whether this is a Debug message.
:rtype: bool
:returns: :obj:`False`
"""
return False
[docs] def wants_to_answer(self):
"""Whether this message produces and answer message.
:rtype: bool
:returns: :obj:`False`
"""
return False
[docs] def __repr__(self):
"""This object as string.
:rtype: str
"""
return "<{}>".format(self.__class__.__name__)
[docs]class FixedSizeMessage(Message):
"""This is a message of fixed size."""
[docs] def __init__(self, file, communication):
"""Create a new Message."""
super().__init__(file, communication)
self.read_end_of_message()
[docs] def read_end_of_message(self):
"""Read the b"\\r\\n" at the end of the message."""
read = self._file.read
last = read(1)
current = read(1)
while last != b'' and current != b'' and not \
(last == b'\r' and current == b'\n'):
last = current
current = read(1)
[docs]class SuccessConfirmation(FixedSizeMessage):
"""Base class for massages of success and failure."""
def _init(self):
"""Read the success byte."""
self._success = self._file.read(1)
[docs] def is_valid(self):
"""Whether this message is valid."""
return self._success <= b"\x01"
[docs] def is_success(self):
"""Whether the configuration was successful.
:rtype: bool
"""
return self._success == b"\x01"
[docs]class StartConfirmation(SuccessConfirmation):
"""This marks the success or failure of a reqStart message.
.. seealso:: :ref:`cnfstart`
"""
MESSAGE_ID = 0xc1 #: The first byte that indicates this message
[docs] def is_start_confirmation(self):
"""Whether this is a StartConfirmation message.
:rtype: bool
:returns: :obj:`True`
"""
return True
[docs] def received_by(self, visitor):
"""Call visitor.receive_state_confirmation."""
visitor.receive_start_confirmation(self)
[docs]class ConnectionClosed(Message):
"""This message is notified about when the connection is closed."""
[docs] def is_connection_closed(self):
"""Whether this is a ConnectionClosed message.
:rtype: bool
:returns: :obj:`True`
"""
return True
[docs] def received_by(self, visitor):
"""Call visitor.receive_connection_closed."""
visitor.receive_connection_closed(self)
[docs]class UnknownMessage(FixedSizeMessage):
"""This is a special message for unknown message types."""
[docs] def is_unknown(self):
"""Whether this is a StateIndication message.
:rtype: bool
:returns: :obj:`True`
"""
return True
[docs] def is_valid(self):
"""Whether the message is valid.
:rtype: bool
:return: :obj:`False`
"""
return False
[docs] def received_by(self, visitor):
"""Call visitor.receive_unkown."""
visitor.receive_unknown(self)
FirmwareVersion = namedtuple("FirmwareVersion", ["major", "minor"])
[docs]class TestConfirmation(SuccessConfirmation):
"""This message is sent at/when""" # TODO
MESSAGE_ID = 0xc4 #: The first byte that indicates this message
[docs] def is_test_confirmation(self):
"""Whether this is a TestConfirmation message.
:rtype: bool
:returns: :obj:`True`
"""
return True
[docs] def received_by(self, visitor):
"""Call visitor.test_information_confirmation."""
visitor.receive_test_confirmation(self)
[docs]class LineRequest(FixedSizeMessage):
"""The controller requests a line.
.. seealso:: :ref:`reqline`
"""
MESSAGE_ID = 0x82 #: The first byte that indicates this message
[docs] def is_line_request(self):
"""Whether this is a LineRequest message.
:rtype: bool
:returns: :obj:`True`
"""
return True
def _init(self):
"""Read the line number."""
self._line_number = next_line(
self._communication.last_requested_line_number,
self._file.read(1)[0])
@property
def line_number(self):
"""The line number that was requested."""
return self._line_number
[docs] def received_by(self, visitor):
"""Call visitor.receive_line_request."""
visitor.receive_line_request(self)
[docs]class StateIndication(FixedSizeMessage):
"""This message shows the state of the controller.
.. seealso:: :ref:`indstate`
"""
MESSAGE_ID = 0x84 #: The first byte that indicates this message
[docs] def is_state_indication(self):
"""Whether this is a InformationConfirmation message.
:rtype: bool
:returns: :obj:`True`
"""
return True
def _init(self):
"""Read the success byte."""
self._ready = self._file.read(1)
self._hall_left = self._file.read(2)
self._hall_right = self._file.read(2)
self._carriage_type = self._file.read(1)[0]
self._carriage_position = self._file.read(1)[0]
[docs] def is_valid(self):
"""Whether this messages matches the specification."""
return self._ready == b'\x00' or self._ready == b'\x01'
[docs] def is_ready_to_knit(self):
"""Whether this message indicates that the controller can knit now."""
return self._ready == b'\x01'
@property
def left_hall_sensor_value(self):
"""The value of the left hall sensor.
:rtype: int
"""
return struct.unpack(">H", self._hall_left)[0]
@property
def right_hall_sensor_value(self):
"""The value of the left hall sensor.
:rtype: int
"""
return struct.unpack(">H", self._hall_right)[0]
@property
def carriage(self):
"""The carriage which is reported.
:rtype: AYABInterface.communication.carriages.Carriage
:return: the carriage with information about its position
"""
carriage_type = id_to_carriage_type(self._carriage_type)
return carriage_type(self._carriage_position)
@property
def current_needle(self):
"""The current needle position.
:rtype: int
"""
return self._carriage_position
[docs] def received_by(self, visitor):
"""Call visitor.receive_state_indication."""
visitor.receive_state_indication(self)
[docs]class Debug(Message):
"""This message contains debug output from the controller.
.. seealso:: :ref:`debug`
"""
MESSAGE_ID = 0x23
[docs] def is_debug(self):
"""Whether this is a Debug message.
:rtype: bool
:returns: :obj:`False`
"""
return True
def _init(self):
"""Read the b"\\r\\n" at the end of the message."""
read_values = []
read = self._file.read
last = read(1)
current = read(1)
while last != b'' and current != b'' and not \
(last == b'\r' and current == b'\n'):
read_values.append(last)
last = current
current = read(1)
if current == b'' and last != b'\r':
read_values.append(last)
self._bytes = b''.join(read_values)
@property
def bytes(self):
"""The debug message as bytes.
:rtype: bytes
:return: the debug message as bytes without the ``b'\\r\\n'`` at
the end
"""
return self._bytes
[docs] def received_by(self, visitor):
"""Call visitor.receive_debug."""
visitor.receive_debug(self)
_message_types = {}
for message_type in list(globals().values()):
message_id = getattr(message_type, "MESSAGE_ID", None)
if message_id is not None:
_message_types[message_id] = message_type
del message_type, message_id
[docs]def read_message_type(file):
"""Read the message type from a file."""
message_byte = file.read(1)
if message_byte == b'':
return ConnectionClosed
message_number = message_byte[0]
return _message_types.get(message_number, UnknownMessage)
__all__ = ["read_message_type", "StateIndication", "LineRequest",
"TestConfirmation", "InformationConfirmation", "Debug",
"StartConfirmation", "SuccessConfirmation",
"UnknownMessage", "Message", "ConnectionClosed", "FirmwareVersion",
"FixedSizeMessage"]