Source code for acq4.devices.ThorlabsMFC1.MFC1

import time

from acq4.drivers.ThorlabsMFC1 import MFC1 as MFC1_Driver
from acq4.util import Qt
from acq4.util.Mutex import Mutex
from acq4.util.Thread import Thread
from pyqtgraph import debug
from ..Stage import Stage, StageInterface, MoveFuture


class ChangeNotifier(Qt.QObject):
    sigPosChanged = Qt.Signal(object, object, object)


[docs] class ThorlabsMFC1(Stage): """ Driver for Thorlabs MFC1 motorized focus controller. Provides Z-axis control for microscope focusing with optional ROE integration. Configuration options: * **port** (str, required): Serial port (e.g., 'COM9') * **scale** (tuple, optional): (x, y, z) scale factors for position (default: (1, 1, 1)). Only Z-axis is used. * **motorParams** (dict, optional): Low-level motor parameters to pass to driver * **roe** (str, optional): Name of ROE device to integrate Z-axis control When specified, ROE Z-axis movements control the MFC1 focus * **limits** (list, optional): [min, max] Z-axis position limits * **parentDevice** (str, optional): Name of parent device (typically a stage) * **transform** (dict, optional): Spatial transform relative to parent device Example configuration:: FocusMotor: driver: 'ThorlabsMFC1' port: 'COM9' parentDevice: 'SutterStage' scale: [1.0, 1.0, 1e-6 * 85e-6 / 10e-3] limits: [-5e-3, 5e-3] """ def __init__(self, man, config, name): self.port = config.pop('port') self.scale = config.pop('scale', (1, 1, 1)) params = config.pop('motorParams', {}) self.dev = MFC1_Driver(self.port, **params) man.sigAbortAll.connect(self.dev.stop) # Optionally use ROE-200 z axis to control focus roe = config.pop('roe', None) self._roeDev = None self._roeEnabled = "waiting" # ROE control is disabled until after the first update if roe is not None: dev = man.getDevice(roe) self._roeDev = dev # need to connect to internal change signal because # the public signal should already have z-axis information removed. dev._notifier.sigPosChanged.connect(self._roeChanged) self._lastPos = None Stage.__init__(self, man, config, name) self.getPosition(refresh=True) # Optionally read limits from config limits = list(config.pop('limits', (None, None))) self.setLimits(z=limits) self._monitor = MonitorThread(self) self._monitor.start()
[docs] def capabilities(self): # device only reads/writes z-axis return { 'getPos': (False, False, True), 'setPos': (False, False, True), 'limits': (False, False, True), }
[docs] def mfcPosChanged(self, pos, oldpos): self.posChanged(pos)
def _getPosition(self): pos = self.dev.position() * self.scale[2] if pos != self._lastPos: oldpos = self._lastPos self._lastPos = pos self.posChanged([0, 0, pos]) return [0, 0, pos] def _move(self, pos, speed, linear, **kwds): pos = self._toAbsolutePosition(pos) limits = self.getLimits()[2] if limits[0] is not None: pos[2] = max(pos[2], limits[0]) if limits[1] is not None: pos[2] = min(pos[2], limits[1]) return MFC1MoveFuture(self, pos, speed) @property def positionUpdatesPerSecond(self): return 1 / self._monitor.minInterval
[docs] def targetPosition(self): return [0, 0, self.dev.target_position() * self.scale[2]]
[docs] def quit(self): self._monitor.stop() Stage.quit(self)
def _roeChanged(self, drive, pos, oldpos): if drive != self._roeDev.drive: return if self._roeEnabled is not True: if self._roeEnabled == 'waiting': self._roeEnabled = True return dz = pos[2] - oldpos[2] if dz == 0: return target = self.dev.target_position() * self.scale[2] + dz self.moveTo([0, 0, target], 'fast')
[docs] def deviceInterface(self, win): return MFC1StageInterface(self, win)
[docs] def setRoeEnabled(self, enable): self._roeEnabled = enable
[docs] def setZero(self): """Reset the device position to 0 (without moving the motor). """ self.dev.set_encoder(0) self._getPosition()
[docs] def stop(self): self.dev.stop()
[docs] def setHolding(self, hold): self.dev.set_holding(hold)
class MonitorThread(Thread): def __init__(self, dev): self.dev = dev self.lock = Mutex(recursive=True) self.stopped = False self.interval = 0.3 self.minInterval = 100e-3 Thread.__init__(self, name=f'{dev.name()}_MFC1MonitorThread') def start(self): self.stopped = False Thread.start(self) def stop(self): with self.lock: self.stopped = True def setInterval(self, i): with self.lock: self.interval = i def run(self): interval = self.minInterval lastPos = None while True: try: with self.lock: if self.stopped: break maxInterval = self.interval pos = self.dev._getPosition()[2] if pos != lastPos: # stage is moving; request more frequent updates interval = self.minInterval else: interval = min(maxInterval, interval*2) lastPos = pos time.sleep(interval) except: self.dev.logger.exception('Error in MFC1 monitor thread:') time.sleep(maxInterval) class MFC1StageInterface(StageInterface): def __init__(self, dev, win): StageInterface.__init__(self, dev, win) if dev._roeDev is not None: self.connectRoeBtn = Qt.QPushButton('Enable ROE') self.connectRoeBtn.setCheckable(True) self.connectRoeBtn.setChecked(True) self.layout.addWidget(self.connectRoeBtn, self.nextRow, 0, 1, 2) self.connectRoeBtn.toggled.connect(self.connectRoeToggled) self.setZeroBtn = Qt.QPushButton('Set Zero') self.layout.addWidget(self.setZeroBtn, self.nextRow, 2, 1, 1) self.setZeroBtn.clicked.connect(self.setZeroClicked) def setZeroClicked(self): self.dev.setZero() def connectRoeToggled(self, b): self.dev.setRoeEnabled(b) class MFC1MoveFuture(MoveFuture): """Provides access to a move-in-progress on an MPC200 drive. """ def __init__(self, dev, pos, speed): MoveFuture.__init__(self, dev, pos, speed) self.startPos = dev.getPosition() self.stopPos = pos self._moveStatus = {'status': None} self.id = dev.dev.move(pos[2] / dev.scale[2]) def wasInterrupted(self): """Return True if the move was interrupted before completing. """ return self._getStatus()['status'] in ('interrupted', 'failed') def percentDone(self): """Return an estimate of the percent of move completed based on the device's speed table. """ if self.isDone(): return 100 pos = self.dev.getPosition()[2] - self.startPos[2] target = self.stopPos[2] - self.startPos[2] if target == 0: return 99 return 100 * pos / target def isDone(self): """Return True if the move is complete. """ return self._getStatus()['status'] in ('interrupted', 'failed', 'done') def errorMessage(self): stat = self._getStatus() if stat['status'] == 'interrupted': return "move was interrupted" elif stat['status'] == 'failed': return "did not reach the expected position (%s != %s)" % (stat['final_pos'], stat['target']) else: return None def _getStatus(self): # check status of move unless we already know it is complete. if self._moveStatus['status'] in (None, 'moving'): self._moveStatus = self.dev.dev.move_status(self.id) return self._moveStatus