Source code for AYABInterface.communication.cache

"""Convert and cache needle positions."""
from crc8 import crc8


[docs]class NeedlePositionCache(object): """Convert and cache needle positions."""
[docs] def __init__(self, get_needle_positions, machine): """Create a new NeedlePositions object.""" self._get = get_needle_positions self._machine = machine self._get_cache = {} self._needle_position_bytes_cache = {} self._line_configuration_message_cache = {}
[docs] def get(self, line_number): """Return the needle positions or None. :param int line_number: the number of the line :rtype: list :return: the needle positions for a specific line specified by :paramref:`line_number` or :obj:`None` if no were given """ if line_number not in self._get_cache: self._get_cache[line_number] = self._get(line_number) return self._get_cache[line_number]
[docs] def is_last(self, line_number): """Whether the line number is has no further lines. :rtype: bool :return: is the next line above the line number are not specified """ return self.get(line_number + 1) is None
[docs] def get_bytes(self, line_number): """Get the bytes representing needle positions or None. :param int line_number: the line number to take the bytes from :rtype: bytes :return: the bytes that represent the message or :obj:`None` if no data is there for the line. Depending on the :attr:`machine`, the length and result may vary. """ if line_number not in self._needle_position_bytes_cache: line = self._get(line_number) if line is None: line_bytes = None else: line_bytes = self._machine.needle_positions_to_bytes(line) self._needle_position_bytes_cache[line_number] = line_bytes return self._needle_position_bytes_cache[line_number]
[docs] def get_line_configuration_message(self, line_number): """Return the cnfLine content without id for the line. :param int line_number: the number of the line :rtype: bytes :return: a cnfLine message without id as defined in :ref:`cnfLine` """ if line_number not in self._line_configuration_message_cache: line_bytes = self.get_bytes(line_number) if line_bytes is not None: line_bytes = bytes([line_number & 255]) + line_bytes line_bytes += bytes([self.is_last(line_number)]) line_bytes += crc8(line_bytes).digest() self._line_configuration_message_cache[line_number] = line_bytes del line_bytes line = self._line_configuration_message_cache[line_number] if line is None: # no need to cache a lot of empty lines line = (bytes([line_number & 255]) + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01') line += crc8(line).digest() return line
__all__ = ["NeedlePositionCache"]