Source code for AYABInterface.communication

"""This module is used to communicate with the shield.

Requirement: Make objects from binary stuff.
"""
from .hardware_messages import read_message_type, ConnectionClosed
from .states import WaitingForStart
from .cache import NeedlePositionCache
from threading import RLock, Thread
from itertools import chain
from time import sleep


[docs]class Communication(object): """This class comunicates with the AYAB shield."""
[docs] def __init__(self, file, get_needle_positions, machine, on_message_received=(), left_end_needle=None, right_end_needle=None): """Create a new Communication object. :param file: a file-like object with read and write methods for the communication with the Arduino. This could be a :class:`serial.Serial` or a :meth:`socket.socket.makefile`. :param get_needle_positions: a callable that takes an :class:`index <int>` and returns :obj:`None` or an iterable over needle positions. :param AYABInterface.machines.Machine machine: the machine to use for knitting :param list on_message_received: an iterable over callables that takes a received :class:`message <AYABInterface.communication.hardware_messages.Message>` whenever it comes in. Since :attr:`state` changes only take place when a message is received, this can be used as an state observer. :param left_end_needle: A needle number on the machine. Other needles that are on the left side of this needle are not used for knitting. Their needle positions are not be set. :param right_end_needle: A needle number on the machine. Other needles that are on the right side of this needle are not used for knitting. Their needle positions are not be set. """ self._file = file self._on_message_received = on_message_received self._machine = machine self._state = WaitingForStart(self) self._controller = None self._last_requested_line_number = 0 self._needle_positions_cache = NeedlePositionCache( get_needle_positions, self._machine) self._left_end_needle = ( machine.left_end_needle if left_end_needle is None else left_end_needle) self._right_end_needle = ( machine.right_end_needle if right_end_needle is None else right_end_needle) self._lock = RLock() self._thread = None self._number_of_threads_receiving_messages = 0 self._on_message = []
@property def needle_positions(self): """A cache for the needle positions. :rtype: AYABInterface.communication.cache.NeedlePositionCache """ return self._needle_positions_cache
[docs] def start(self): """Start the communication about a content. :param Content content: the content of the communication. """ with self.lock: self._state.communication_started()
_read_message_type = staticmethod(read_message_type) def _message_received(self, message): """Notify the observers about the received message.""" with self.lock: self._state.receive_message(message) for callable in chain(self._on_message_received, self._on_message): callable(message)
[docs] def on_message(self, callable): """Add an observer to received messages. :param callable: a callable that is called every time a :class:`AYABInterface.communication.host_messages.Message` is sent or a :class:`AYABInterface.communication.controller_messages.Message` is received """ self._on_message.append(callable)
[docs] def receive_message(self): """Receive a message from the file.""" with self.lock: assert self.can_receive_messages() message_type = self._read_message_type(self._file) message = message_type(self._file, self) self._message_received(message)
[docs] def can_receive_messages(self): """Whether tihs communication is ready to receive messages.] :rtype: bool .. code:: python assert not communication.can_receive_messages() communication.start() assert communication.can_receive_messages() communication.stop() assert not communication.can_receive_messages() """ with self.lock: return not self._state.is_waiting_for_start() and \ not self._state.is_connection_closed()
[docs] def stop(self): """Stop the communication with the shield.""" with self.lock: self._message_received(ConnectionClosed(self._file, self))
[docs] def api_version_is_supported(self, api_version): """Return whether an api version is supported by this class. :rtype: bool :return: if the :paramref:`api version <api_version>` is supported :param int api_version: the api version Currently supported api versions: ``4`` """ return api_version == 4
[docs] def send(self, host_message_class, *args): """Send a host message. :param type host_message_class: a subclass of :class:`AYABImterface.communication.host_messages.Message` :param args: additional arguments that shall be passed to the :paramref:`host_message_class` as arguments """ message = host_message_class(self._file, self, *args) with self.lock: message.send() for callable in self._on_message: callable(message)
@property def state(self): """The state this object is in. :return: the state this communication object is in. :rtype: AYABInterface.communication.states.State .. note:: When calling :meth:`parallelize` the state can change while you check it. """ return self._state @property def lock(self): """The lock of the communication. In case you :meth:`parallelize` the communication, you may want to use this :class:`lock <threading.RLock>` to make shure the parallelization does not break your code. """ return self._lock @state.setter def state(self, new_state): """Set the state.""" with self.lock: self._state.exit() self._state = new_state self._state.enter() @property def left_end_needle(self): """The left end needle of the needle positions. :rtype: int :return: the :attr:`left end needle of the machine <AYABInterface.machine.Machine.left_end_needle>` """ return self._left_end_needle @property def right_end_needle(self): """The left end needle of the needle positions. :rtype: int :return: the :attr:`right end needle of the machine <AYABInterface.machine.Machine.right_end_needle>` """ return self._right_end_needle @property def controller(self): """Information about the controller. If no information about the controller is received, the return value is :obj:`None`. If information about the controller is known after :ref:`cnfinfo` was received, you can access these values: .. code:: python >>> communication.controller.firmware_version (5, 2) >>> communication.controller.firmware_version.major 5 >>> communication.controller.firmware_version.minor 2 >>> communication.controller.api_version 4 """ return self._controller @controller.setter def controller(self, value): self._controller = value @property def last_requested_line_number(self): """The number of the last line that was requested. :rtype: int :return: the last requested line number or ``0`` """ return self._last_requested_line_number @last_requested_line_number.setter def last_requested_line_number(self, line_number): """Set the last requested line number.""" self._last_requested_line_number = line_number
[docs] def parallelize(self, seconds_to_wait=2): """Start a parallel thread for receiving messages. If :meth:`start` was no called before, start will be called in the thread. The thread calls :meth:`receive_message` until the :attr:`state` :meth:`~AYABInterface.communication.states.State.is_connection_closed`. :param float seconds_to_wait: A time in seconds to wait with the parallel execution. This is useful to allow the controller time to initialize. .. seealso:: :attr:`lock`, :meth:`runs_in_parallel` """ with self.lock: thread = Thread(target=self._parallel_receive_loop, args=(seconds_to_wait,)) thread.deamon = True thread.start() self._thread = thread
def _parallel_receive_loop(self, seconds_to_wait): """Run the receiving in parallel.""" sleep(seconds_to_wait) with self._lock: self._number_of_threads_receiving_messages += 1 try: with self._lock: if self.state.is_waiting_for_start(): self.start() while True: with self.lock: if self.state.is_connection_closed(): return self.receive_message() finally: with self._lock: self._number_of_threads_receiving_messages -= 1
[docs] def runs_in_parallel(self): """Whether the communication runs in parallel. :rtype: bool :return: whether :meth:`parallelize` was called and the communication still receives messages and is not stopped """ return self._number_of_threads_receiving_messages != 0
__all__ = ["Communication"]