# Source code for AYABInterface.interaction

"""This module can be used to interact with the AYAB Interface."""
from .actions import SwitchOnMachine, \
    MoveCarriageOverLeftHallSensor, MoveCarriageToTheLeft, \
    MoveCarriageToTheRight, PutColorInNutA, PutColorInNutB, \
    MoveNeedlesIntoPosition, SwitchCarriageToModeNl, SwitchCarriageToModeKc, \
    SwitchOffMachine
from AYABInterface.carriages import KnitCarriage
from AYABInterface.communication import Communication
from itertools import chain


# NOTE: ``cached_property`` is currently a plain alias for the built-in
# ``property``: nothing is cached, so every attribute decorated with it is
# recomputed on each access.
cached_property = property


class Interaction(object):
    """Interaction with the knitting pattern."""

    def __init__(self, knitting_pattern, machine):
        """Create a new interaction object.

        :param knitting_pattern: a
          :class:`~knittingpattern.KnittingPattern.KnittingPattern`
        :param AYABInterface.machines.Machine machine: the machine to knit on
        """
        self._machine = machine
        self._communication = None
        self._rows = knitting_pattern.rows_in_knit_order()
        self._knitting_pattern = knitting_pattern

    def _all_row_needles(self):
        """Return an iterator over the needle indices of every row."""
        return chain.from_iterable(
            self._get_row_needles(row_index)
            for row_index in range(len(self._rows)))

    @cached_property
    def left_end_needle(self):
        """The smallest needle index used by any row.

        :raises ValueError: if no row uses any needle
        """
        return min(self._all_row_needles())

    @cached_property
    def right_end_needle(self):
        """The largest needle index used by any row.

        :raises ValueError: if no row uses any needle
        """
        return max(self._all_row_needles())

    @property
    def communication(self):
        """The communication with the controller.

        This is :obj:`None` until :meth:`communicate_through` was called.

        :rtype: AYABInterface.communication.Communication
        """
        return self._communication

    def communicate_through(self, file):
        """Setup communication through a file.

        :param file: the object handed to the
          :class:`~AYABInterface.communication.Communication` as its first
          argument
        :rtype: AYABInterface.communication.Communication
        :raises ValueError: if a communication was already set up
        """
        if self._communication is not None:
            raise ValueError("Already communicating.")
        communication = Communication(
            file, self._get_needle_positions,
            self._machine, [self._on_message_received],
            right_end_needle=self.right_end_needle,
            left_end_needle=self.left_end_needle)
        self._communication = communication
        return communication

    @cached_property
    def colors(self):
        """The instruction colors of the knitting pattern in reversed order.

        :rtype: list
        """
        return list(reversed(self._knitting_pattern.instruction_colors))

    def _get_row_needles(self, row_index):
        """Return the needle indices used by the row at ``row_index``.

        The row is centered on the needle bed: its first needle is the
        middle of the machine minus half of the row's consumed meshes,
        truncated to an integer.

        :param int row_index: index into the rows in knit order
        :rtype: list
        """
        number_of_needles = self._rows[row_index].number_of_consumed_meshes
        start = int(self._machine.number_of_needles / 2 -
                    number_of_needles / 2)
        return list(range(start, start + number_of_needles))

    def _get_needle_positions(self, row_index):
        """Return the needle positions for the row at ``row_index``.

        :param int row_index: index into the rows in knit order
        :return: :obj:`None` if ``row_index`` is not a valid row index,
          otherwise a list with one entry per needle of the machine. Needles
          the row does not use get the machine's first needle position;
          needles it uses get the position whose index equals the index of
          the mesh's color in :attr:`colors`.
        """
        if row_index not in range(len(self._rows)):
            return None
        needle_positions = self._machine.needle_positions
        needles = self._get_row_needles(row_index)
        result = [needle_positions[0]] * self._machine.number_of_needles
        colors = self.colors
        row = self._rows[row_index]
        consumed_meshes = row.consumed_meshes
        for i, needle in enumerate(needles):
            color = consumed_meshes[i].consuming_instruction.color
            needle_position = needle_positions[colors.index(color)]
            result[needle] = needle_position
            # Debug trace, emitted once per needle; kept so that the
            # output on stdout stays unchanged.
            print("row {} at {}\t{}\t{}".format(
                row.id, row_index, needle, needle_position))
        return result

    def _on_message_received(self, message):
        """Call when a potential state change has occurred."""

    @cached_property
    def actions(self):
        """A list of actions to perform.

        The list contains, in this order: the switch settings (depending on
        whether there is exactly one color), moving the needles of the first
        row into position "B", moving the carriage over the left hall
        sensor, putting the colors into the nuts (for one or two colors
        only) and one carriage movement per row, alternating between right
        and left and starting with a movement to the right.

        :return: a list of :class:`AYABInterface.actions.Action`
        """
        steps = []
        do = steps.append
        colors = self.colors
        # The same two movement objects are reused for all rows.
        movements = (
            MoveCarriageToTheRight(KnitCarriage()),
            MoveCarriageToTheLeft(KnitCarriage()))
        rows = self._rows
        first_needles = self._get_row_needles(0)
        # handle switches
        if len(colors) == 1:
            steps.extend([
                SwitchOffMachine(),
                SwitchCarriageToModeNl()])
        else:
            steps.extend([
                SwitchCarriageToModeKc(),
                SwitchOnMachine()])
        # move needles
        do(MoveNeedlesIntoPosition("B", first_needles))
        do(MoveCarriageOverLeftHallSensor())
        # use colors
        if len(colors) == 1:
            do(PutColorInNutA(colors[0]))
        elif len(colors) == 2:
            do(PutColorInNutA(colors[0]))
            do(PutColorInNutB(colors[1]))
        # knit: even row indices move to the right, odd ones to the left
        for index in range(len(rows)):
            do(movements[index & 1])
        return steps