Source code for acq4.devices.FilterWheel.filterwheel

import time
from collections import OrderedDict

import pyqtgraph as pg
from acq4.devices.Device import TaskGui, Device, DeviceTask
from acq4.devices.OptomechDevice import OptomechDevice
from acq4.util import Qt
from acq4.util import ptime
from acq4.util.Mutex import Mutex
from acq4.util.Thread import Thread

Ui_Form = Qt.importTemplate('.FilterWheelTaskTemplate')


[docs] class FilterWheel(Device, OptomechDevice): """Optical filter wheel device for swapping FilterSet devices. The Filter wheel device class adds control and display for a filter wheel that selects between many filters or filter sets. Filters must be defined in the configuration prior to the FilterWheel; see FilterSet for more information. * Maintains list of the filters in the wheel positions with their description * Support for selecting a filter wheel position * Support for filter wheel implementation during task : specific filter wheel position during one task, different positions as task sequence Configuration examples: FilterWheel: driver: 'FilterWheel' parentDevice: 'Microscope' slots: # These are the names of FilterSet devices that have been defined elsewhere 0: "DIC_FilterCube" 1: "EGFP_FilterCube" 2: "EYFP_FilterCube" """ sigFilterChanged = Qt.Signal(object, object) # self, Filter sigFilterWheelSpeedChanged = Qt.Signal(object, object) # self, speed def __init__(self, dm, config, name): Device.__init__(self, dm, config, name) self.lock = Mutex(Qt.QMutex.Recursive) self._filters = OrderedDict() self._slotNames = OrderedDict() self._slotIndicators = OrderedDict() nPos = self.getPositionCount() ports = config.get('ports', None) for k in range(nPos): ## Set value for each filter slot = config['slots'].get(str(k)) if slot is None: self._filters[k] = None self._slotNames[k] = "empty" continue if isinstance(slot, str): # We are only naming this slot; no actual filter device is defined here self._filters[k] = None self._slotNames[k] = slot elif isinstance(slot, dict): filtname = slot.get('device') filt = None if filtname is None else dm.getDevice(filtname) self._filters[k] = filt self._slotNames[k] = slot.get('name', filtname) if filt is not None: devports = filt.ports() if ports is None: ports = devports elif set(ports) != set(devports): raise Exception(f"{filt!r} does not have the expected ports ({devports!r} vs {ports!r})") else: raise TypeError("Slot definition must be str or dict; got: %r" % slot) if 'hotkey' in slot: dev = dm.getDevice(slot['hotkey']['device']) key = slot['hotkey']['key'] dev.addKeyCallback(key, self._hotkeyPressed, (k,)) self._slotIndicators[k] = (dev, key) # todo: connect to key config['ports'] = ports OptomechDevice.__init__(self, dm, config, name) self._lastFuture = None self._lastPosition = None # polling thread just checks position regularly; this causes sigFilterChanged to be emitted # whenever a change is detected pollInterval = config.get('pollInterval', 0.1) if pollInterval is not None: self.fwThread = FilterWheelPollThread(self, interval=pollInterval) self.fwThread.start() dm.sigAbortAll.connect(self.stop) if 'initialSlot' in config: self.setPosition(config['initialSlot'])
[docs] def listFilters(self): """Return a dict of available {slot_n: filter} pairs. """ return self._filters.copy()
[docs] def slotNames(self): """Return a dict of names for each slot in the wheel. """ return self._slotNames.copy()
[docs] def getFilter(self, position=None): """Return the Filter at *position*. If *position* is None, then return the currently active Filter.""" if position is None: position = self.getPosition() if position is None: return None return self._filters[position]
[docs] def getPositionCount(self): """Return the number of filter positions. The number returned indicates all available positions, regardless of the presence or absence of a filter in each position. By default this returns the largest configured slot number, but subclasses may override this method. """ return max(map(int, self.config['slots'].keys())) + 1
[docs] def setPosition(self, pos): """Set the filter wheel position and return a FilterWheelFuture instance that can be used to wait for the move to complete. """ with self.lock: fut = self._lastFuture if fut is not None and not fut.isDone(): fut.cancel() self._lastFuture = self._setPosition(pos) return self._lastFuture
def _setPosition(self, pos): """Must be implemented in subclass to request device movement and return a FilterWheelFuture. Example:: def _setPosition(self, pos): self.device.setPosition(pos) # actually ask device to move return FilterWheelFuture(self, pos) """ raise NotImplementedError("Method must be implemented in subclass") def _hotkeyPressed(self, dev, changes, pos): self.setPosition(pos)
[docs] def getPosition(self): """Return the current position of the filter wheel. """ pos = self._getPosition() if pos != self._lastPosition: self._lastPosition = pos self._positionChanged(pos) return pos
def _getPosition(self): raise NotImplementedError("Method must be implemented in subclass")
[docs] def loadPreset(self, name): """Load a preset filter wheel position by name.""" idx = next((i for i, n in self._slotNames.items() if n == name), None) return self.setPosition(idx)
def _positionChanged(self, pos): filt = self.getFilter(pos) self.setCurrentSubdevice(filt) self.sigFilterChanged.emit(self, filt) for k,indicator in self._slotIndicators.items(): dev, key = indicator if k == pos: dev.setBacklight(key, blue=1, red=1) else: dev.setBacklight(key, blue=0, red=0) def _checkMoveFuture(self): if self._lastFuture is None: return self._lastFuture.isDone()
[docs] def isMoving(self): """Return the current position of the filter wheel. """ raise NotImplementedError("Method must be implemented in subclass")
[docs] def stop(self): """Immediately stop the filter wheel. """ self._stop() with self.lock: fut = self._lastFuture if fut is not None: fut.cancel()
def _stop(self): raise NotImplementedError("Method must be implemented in subclass")
[docs] def setSpeed(self, speed): raise NotImplementedError("Method must be implemented in subclass")
[docs] def getSpeed(self): raise NotImplementedError("Method must be implemented in subclass")
[docs] def speedChanged(self, speed): """Sublclasses should call this method when the filterwheel speed has changed. """ self.sigSpeedChanged.emit(self, speed)
[docs] def createTask(self, cmd, parentTask): return FilterWheelTask(self, cmd, parentTask)
[docs] def taskInterface(self, taskRunner): return FilterWheelTaskGui(self, taskRunner)
[docs] def deviceInterface(self, win): return FilterWheelDevGui(self)
class FilterWheelFuture(object): def __init__(self, dev, position): self.dev = dev self.position = position self._wasInterrupted = False self._done = False self._error = None def wasInterrupted(self): """Return True if the move was interrupted before completing. """ return self._wasInterrupted def cancel(self): if self.isDone(): return self._wasInterrupted = True self._error = "Filter change was cancelled" def _atTarget(self): return self.dev.getPosition() == self.position def isDone(self): """Return True if the move has completed or was interrupted. """ if self._wasInterrupted or self._done: return True if self.dev.isMoving(): return False if self._atTarget(): self._done = True return True else: self._wasInterrupted = True self._error = f"Filter wheel did not reach target while moving to {self.position} (got to {self.dev.getPosition()})" return True def errorMessage(self): """Return a string description of the reason for a move failure, or None if there was no failure (or if the reason is unknown). """ return self._error def wait(self, timeout=None, updates=False): """Block until the move has completed, has been interrupted, or the specified timeout has elapsed. If *updates* is True, process Qt events while waiting. If the move did not complete, raise an exception. """ start = ptime.time() while (timeout is None) or (ptime.time() < start + timeout): if self.isDone(): break if updates is True: Qt.QTest.qWait(100) else: time.sleep(0.1) if not self.isDone(): err = self.errorMessage() if err is None: raise RuntimeError("Timeout waiting for filter wheel change") else: raise RuntimeError("Move did not complete: %s" % err) if self.wasInterrupted(): err = self.errorMessage() raise RuntimeError("Move was interrupted: %s" % err) class FilterWheelTask(DeviceTask): """Set a filter wheel position before beginning the task. Command structure:: {'filterWheelPosition': N} """ def __init__(self, dev, cmd, parentTask): DeviceTask.__init__(self, dev, cmd, parentTask) self.dev = dev self.cmd = cmd self.parentTask = parentTask def configure(self): requiredPos = self.cmd['filterWheelPosition'] self.future = self.dev.setPosition(requiredPos) def start(self): self.future.wait() def stop(self, abort): pass def isDone(self): return True class FilterWheelTaskGui(TaskGui): def __init__(self, dev, taskRunner): TaskGui.__init__(self, dev, taskRunner) self.ui = Ui_Form() self.ui.setupUi(self) self.dev = dev self.filters = self.dev.slotNames() for slotn, name in self.filters.items(): name = "%d: %s" % (slotn, name) self.ui.filterCombo.addItem(name, slotn) self.ui.sequenceCombo.addItem('off') self.ui.sequenceCombo.addItem('list') self.ui.sequenceListEdit.hide() self.ui.sequenceCombo.currentIndexChanged.connect(self.sequenceChanged) self.ui.sequenceListEdit.editingFinished.connect(self.sequenceChanged) ## Create state group for saving/restoring state self.stateGroup = pg.WidgetGroup([ (self.ui.filterCombo,), (self.ui.sequenceCombo,), (self.ui.sequenceListEdit,), ]) def generateTask(self, params=None): if params is None or 'filterWheelPosition' not in params: ind = self.ui.filterCombo.currentIndex() slotn = self.ui.filterCombo.itemData(ind) else: slotn = params['filterWheelPosition'] return {'filterWheelPosition': slotn} def saveState(self, saveItems=False): state = self.stateGroup.state() return state def restoreState(self, state): self.stateGroup.setState(state) self.ui.sequenceListEdit.setVisible(state['sequenceCombo'] != 'off') self.sequenceChanged() def listSequence(self): if self.ui.sequenceCombo.currentIndex() == 0: return {} pos = str(self.ui.sequenceListEdit.text()) if pos == '': return {} try: pos = list(map(int, pos.split(','))) except Exception: raise ValueError("Filter list must be a comma-separated list of integer positions (got %r)" % pos) return {'filterWheelPosition': pos} def sequenceChanged(self): self.sigSequenceChanged.emit(self.dev.name()) if self.ui.sequenceCombo.currentIndex() == 1: self.ui.sequenceListEdit.show() else: self.ui.sequenceListEdit.hide() class FilterWheelDevGui(Qt.QWidget): def __init__(self, dev): Qt.QWidget.__init__(self) self.dev = dev self.layout = Qt.QGridLayout() self.setLayout(self.layout) self.positionBtnLayout = Qt.QGridLayout() self.layout.addLayout(self.positionBtnLayout, 0, 0) self.positionBtnLayout.setContentsMargins(0, 0, 0, 0) self.positionGroup = Qt.QButtonGroup() self.positionButtons = [] cols = 3 slotNames = self.dev.slotNames() for i in range(self.dev.getPositionCount()): name = slotNames[i] btn = Qt.QPushButton("%d: %s" % (i, name)) btn.setCheckable(True) btn.filterPosition = i self.positionButtons.append(btn) self.positionGroup.addButton(btn, i) self.positionBtnLayout.addWidget(btn, i // cols, i % cols) btn.clicked.connect(self.positionButtonClicked) self.positionGroup.setExclusive(True) self.positionChanged() self.dev.sigFilterChanged.connect(self.positionChanged) def positionChanged(self): pos = self.dev.getPosition() if pos is None: for btn in self.positionButtons: btn.setChecked(False) else: self.positionButtons[pos].setChecked(True) def positionButtonClicked(self): self.positionChanged() # reset button until the filter wheel catches up btn = self.sender() self.dev.setPosition(btn.filterPosition) class FilterWheelPollThread(Thread): def __init__(self, dev, interval=0.1): Thread.__init__(self, name=f"FilterWheelPollThread_{dev.name()}") self.dev = dev self.interval = interval def run(self): self.stopThread = False while self.stopThread is False: try: pos = self.dev.getPosition() time.sleep(self.interval) except: self.dev.logger.exception("Error in Filter Wheel poll thread:") time.sleep(1.0) def stop(self): self.stopThread = True