Source code for acq4.devices.MicroManagerStage.mmstage

import time

import numpy as np
from pyqtgraph import debug

from acq4.util.Mutex import Mutex
from acq4.util.Thread import Thread
from acq4.util.micromanager import getMMCorePy
from ..Stage import Stage, MoveFuture, StageInterface


[docs] class MicroManagerStage(Stage): """ Stage device that uses MicroManager to control motorized stages. Supports separate XY and Z stage devices through MicroManager adapters. Configuration options: * **scale** (tuple, optional): (x, y, z) scale factors in m/step (default: (1e-6, 1e-6, 1e-6)) * **xyStage** (dict, optional): XY stage configuration - mmAdapterName: MicroManager adapter name (e.g., 'Scientifica') - mmDeviceName: MicroManager device name (e.g., 'XYStage') - serial: Serial port configuration dict - port: Serial port (e.g., 'COM22') - baud: Baud rate (e.g., 9600) * **zStage** (dict, optional): Z stage configuration - mmAdapterName: MicroManager adapter name (e.g., 'Scientifica') - mmDeviceName: MicroManager device name (e.g., 'ZStage') - serial: Serial port configuration dict - port: 'shared' to use same serial as XY stage, or specific port At least one of xyStage or zStage must be specified. * **parentDevice** (str, optional): Name of parent device for coordinate transforms * **transform** (dict, optional): Spatial transform relative to parent device Example configuration:: SliceScope: driver: 'MicroManagerStage' scale: [-1e-6, -1e-6, 1e-6] xyStage: mmAdapterName: 'Scientifica' mmDeviceName: 'XYStage' serial: port: 'COM22' baud: 9600 zStage: mmAdapterName: 'Scientifica' mmDeviceName: 'ZStage' serial: port: 'shared' """ def __init__(self, man, config, name): self.scale = config.pop('scale', (1e-6, 1e-6, 1e-6)) self.speedToMeters = .001 self.mmc = getMMCorePy() self._mmDeviceNames = {'xy': None, 'z': None} self._mmSerialPortNames = {'xy': None, 'z': None} self._axes = [] # Configure XY and Z stages separately if 'xyStage' not in config and 'zStage' not in config: raise Exception("Micromanager stage configuration myst have 'xyStage', 'zStage', or both.") allAdapters = self.mmc.getDeviceAdapterNames() for axes in ('xy', 'z'): # sanity check for MM adapter and device name stageCfg = config.get(axes + 'Stage', None) if stageCfg is None: continue self._axes.append(axes) adapterName = stageCfg['mmAdapterName'] if adapterName not in allAdapters: raise ValueError("Adapter name '%s' is not valid. Options are: %s" % (adapterName, allAdapters)) mmDeviceName = stageCfg.get('mmDeviceName', None) allDevices = self.mmc.getAvailableDevices(adapterName) if mmDeviceName not in allDevices: raise ValueError("Device name '%s' is not valid for adapter '%s'. Options are: %s" % ( mmDeviceName, adapterName, allDevices)) # Load this device devName = str(name) + '_' + axes self._mmDeviceNames[axes] = devName self.mmc.loadDevice(devName, adapterName, mmDeviceName) # Set up serial port if needed if 'serial' in stageCfg: # Z stage may use the same serial port as XY stage if stageCfg['serial']['port'] == 'shared': if axes != 'z': raise Exception('Shared serial port only allowed for Z axis.') if 'xyStage' not in config: raise Exception('Shared serial port requires xyStage.') portName = self._mmDeviceNames['xy'] + '_port' self.mmc.setProperty(devName, 'Port', portName) self._mmSerialPortNames[axes] = portName else: portName = devName + "_port" self.mmc.loadDevice(portName, "SerialManager", str(stageCfg['serial']['port'])) if 'baud' in stageCfg['serial']: self.mmc.setProperty(portName, 'BaudRate', stageCfg['serial']['baud']) self.mmc.setProperty(devName, 'Port', portName) self.mmc.initializeDevice(portName) self._mmSerialPortNames[axes] = portName self.mmc.initializeDevice(devName) self._lastMove = None self._focusDevice = self # self.userSpeed = np.asarray(self.mmc.getProperty(self._mmDeviceName, 'Speed-S')).astype(float) * self.speedToMeters man.sigAbortAll.connect(self.abort) Stage.__init__(self, man, config, name) # clear cached position for this device and re-read to generate an initial position update self._lastPos = None time.sleep(1.0) self.getPosition(refresh=True) # thread for polling position changes self.monitor = MonitorThread(self) self.monitor.start()
[docs] def capabilities(self): """Return a structure describing the capabilities of this device""" if 'capabilities' in self.config: return self.config['capabilities'] else: haveXY = 'xy' in self._axes haveZ = 'z' in self._axes return { 'getPos': (haveXY, haveXY, haveZ), 'setPos': (haveXY, haveXY, haveZ), 'limits': (False, False, False), }
[docs] def stop(self): """Stop the manipulator. If the manipulator is currently in use elsewhere, this method blocks until it becomes available. """ with self.lock: for ax in self._axes: self.mmc.stop(self._mmDeviceNames[ax]) if self._lastMove is not None: self._lastMove._stopped() self._lastMove = None
[docs] def abort(self): """Stop the manipulator immediately. This method asks the manipulator to stop even if it is being accessed elsewhere. This can cause communication errors, but may be preferred if stopping immediately is critical. """ for ax in self._axes: try: self.mmc.stop(self._mmDeviceNames[ax]) if self._lastMove is not None: self._lastMove._stopped() self._lastMove = None except: self.logger.exception(f"Error stopping axis {ax}:")
@property def positionUpdatesPerSecond(self): return 1 / self.monitor.minInterval
[docs] def setUserSpeed(self, v): """Set the maximum speed of the stage (m/sec) when under manual control. The stage's maximum speed is reset to this value when it is not under programmed control. """ self.userSpeed = v self.mmc.setProperty(self._mmDeviceName, 'Speed-S', v / self.speedToMeters)
def _getPosition(self): # Called by superclass when user requests position refresh with self.lock: pos = [0., 0., 0.] if 'xy' in self._axes: pos[0] = self.mmc.getXPosition(self._mmDeviceNames['xy']) * self.scale[0] pos[1] = self.mmc.getYPosition(self._mmDeviceNames['xy']) * self.scale[1] if 'z' in self._axes: pos[2] = self.mmc.getPosition(self._mmDeviceNames['z']) * self.scale[2] if pos != self._lastPos: self._lastPos = pos emit = True else: emit = False if emit: # don't emit signal while locked self.posChanged(pos) return pos
[docs] def targetPosition(self): with self.lock: if self._lastMove is None or self._lastMove.isDone(): return self.getPosition() else: return self._lastMove.targetPos
[docs] def quit(self): self.monitor.stop() Stage.quit(self)
def _move(self, pos, speed, linear, **kwds): with self.lock: if self._lastMove is not None and not self._lastMove.isDone(): self.stop() # Decide which axes to move moveZ = pos[2] is not None moveXY = pos[0] is not None and pos[1] is not None speed = self._interpretSpeed(speed) self._lastMove = MicroManagerMoveFuture(self, pos, speed, self.userSpeed, moveXY=moveXY, moveZ=moveZ) return self._lastMove
[docs] def deviceInterface(self, win): return MicroManagerGUI(self, win)
[docs] def startMoving(self, vel): """Begin moving the stage at a continuous velocity. """ raise Exception("MicroManager stage does not support startMoving() function.")
class MonitorThread(Thread): """Thread to poll for manipulator position changes. """ def __init__(self, dev): self.dev = dev self.lock = Mutex(recursive=True) self.stopped = False self.interval = 0.3 self.minInterval = 100e-3 Thread.__init__(self, name=f'{dev.name()}_MonitorThread') def start(self): self.stopped = False Thread.start(self) def stop(self): with self.lock: self.stopped = True def setInterval(self, i): with self.lock: self.interval = i def run(self): interval = self.minInterval lastPos = None while True: try: with self.lock: if self.stopped: break maxInterval = self.interval pos = self.dev._getPosition() # this causes sigPositionChanged to be emitted if pos != lastPos: # if there was a change, then loop more rapidly for a short time. interval = self.minInterval lastPos = pos else: interval = min(maxInterval, interval * 2) time.sleep(interval) except Exception: self.dev.logger.exception('Error in MicromanagerStage monitor thread:') time.sleep(maxInterval) class MicroManagerMoveFuture(MoveFuture): """Provides access to a move-in-progress on a micromanager stage. """ def __init__(self, dev, pos, speed, userSpeed, moveXY=True, moveZ=True): MoveFuture.__init__(self, dev, pos, speed, name=f'{dev.name()}_move') self._interrupted = False self._errorMSg = None self._finished = False pos = np.array(pos) / np.array(self.dev.scale) with self.dev.lock: if moveXY: self.dev.mmc.setXYPosition(self.dev._mmDeviceNames['xy'], pos[:1]) if moveXY: self.dev.mmc.setPosition(self.dev._mmDeviceNames['z'], pos[2]) def wasInterrupted(self): """Return True if the move was interrupted before completing. """ return self._interrupted def isDone(self): """Return True if the move is complete. """ return self._getStatus() != 0 def _getStatus(self): # check status of move unless we already know it is complete. # 0: still moving; 1: finished successfully; -1: finished unsuccessfully if self._finished: if self._interrupted: return -1 else: return 1 for ax in self._axes: if self.dev.mmc.deviceBusy(self.dev._mmDeviceNames[ax]): # Still moving return 0 # did we reach target? pos = self.dev._getPosition() dif = ((np.array(pos) - np.array(self.targetPos)) ** 2).sum() ** 0.5 if dif < 2.5e-6: # reached target self._finished = True return 1 else: # missed self._finished = True self._interrupted = True self._errorMsg = "Move did not complete (target=%s, position=%s, dif=%s)." % (self.targetPos, pos, dif) return -1 def _stopped(self): # Called when the manipulator is stopped, possibly interrupting this move. status = self._getStatus() if status == 1: # finished; ignore stop return elif status == -1: self._errorMsg = "Move was interrupted before completion." elif status == 0: # not actually stopped! This should not happen. raise RuntimeError("Interrupted move but manipulator is still running!") else: raise ValueError(f"Unknown status: {status}") def errorMessage(self): return self._errorMsg class MicroManagerGUI(StageInterface): def __init__(self, dev, win): StageInterface.__init__(self, dev, win)