# Source code for acq4.devices.MicroManagerCamera.mmcamera

import time
from collections import OrderedDict
from functools import lru_cache

import numpy as np

import acq4.util.ptime as ptime
from acq4.devices.Camera import Camera
from acq4.util import micromanager
from acq4.util.micromanager import MicroManagerError
from pyqtgraph.debug import Profiler

# Micromanager does not standardize trigger modes across cameras,
# so we use this dict to translate the modes of various cameras back
# to the standard ACQ4 modes:
#   Normal: Camera starts by software and acquires frames on its own clock
#   TriggerStart: Camera starts by trigger and acquires frames on its own clock
#   Strobe: Camera acquires one frame of a predefined exposure time for every trigger pulse
#   Bulb: Camera exposes one frame for the duration of each trigger pulse

# Outer keys are the MicroManager property names that select the trigger mode
# on a given camera family; each inner dict maps that property's
# camera-specific values to the standard ACQ4 mode names listed above.
triggerModes = {
    'TriggerType': {'Freerun': 'Normal'},  # QImaging
    'Trigger': {'NORMAL': 'Normal', 'START': 'TriggerStart'},  # Hamamatsu
}


class MicroManagerCamera(Camera):
    """
    Camera device that uses MicroManager to provide imaging support.

    Requires pymmcore to be installed along with MicroManager with matching API versions.

    MicroManager-specific configuration options:

    * **mmAdapterName** (str, required): MicroManager adapter name (e.g., 'HamamatsuHam')
      Find this in MicroManager hardware wizard when setting up the camera

    * **mmDeviceName** (str, required): MicroManager device name (e.g., 'HamamatsuHam_DCAM')
      Find this in MicroManager hardware wizard when setting up the camera

    * **path** (str, optional): Path to MicroManager installation directory
      Uses default MicroManager installation if not specified

    Standard Camera configuration options (see Camera base class):

    * **parentDevice** (str, optional): Name of parent optical device (microscope, etc.)

    * **transform** (dict, optional): Spatial transform relative to parent device
        - pos: Position offset [x, y]
        - scale: Scale factors [x, y] in m/pixel
        - angle: Rotation angle in radians

    * **params** (dict, optional): Camera parameters to set at startup
        - exposure: Default exposure time
        - binning: Pixel binning [x, y]

    Setup instructions:

    1. Ensure pymmcore API version matches MicroManager API version
       - MicroManager API: Help -> About in MicroManager GUI
       - pymmcore API: `import pymmcore; print(pymmcore.__version__.split('.')[3])`
    2. Test camera operation in MicroManager GUI first
    3. Note adapter and device names from hardware wizard

    Example configuration::

        Camera:
            driver: 'MicroManagerCamera'
            mmAdapterName: 'HamamatsuHam'
            mmDeviceName: 'HamamatsuHam_DCAM'
            parentDevice: 'Microscope'
            transform:
                pos: [0, 0]
                scale: [4.088e-6, -4.088e-6]
                angle: 0
            params:
                exposure: 10e-3
                binning: [4, 4]
    """

    def __init__(self, manager, config, name):
        """Create the MMCore handle and internal state, then defer to Camera.__init__."""
        self.camName = str(name)  # we will use this name as the handle to the MM camera
        mmpath = config.get('path')
        self.mmc = micromanager.getMMCorePy(mmpath)
        self._isRunning = False  # track running state manually (see isRunning())
        self._triggerProp = None  # the name of the property for setting trigger mode
        self._triggerModes = ({}, {})  # forward and reverse mappings for the names of trigger modes
        self._binningMode = None  # 'x' or 'xy' for binning strings like '1' and '1x1', respectively
        # Cached answer for _useBinnedPixelsForROI. Must exist before Camera.__init__
        # because setupCamera() (called from there) already queries the ROI.
        self._roiInBinnedPixels = None
        Camera.__init__(self, manager, config, name)  # superclass will call setupCamera when it is ready.
        self.acqBuffer = None
        self.frameId = 0
        self.lastFrameTime = None

    def setupCamera(self):
        """Validate the configured adapter/device names, load and initialize the
        MicroManager device, then read all camera parameters.

        Raises ValueError for an unknown adapter or device name and RuntimeError
        if the adapter's device list cannot be queried.
        """
        # sanity check for MM adapter and device name
        adapterName = self.config['mmAdapterName']
        allAdapters = self.mmc.getDeviceAdapterNames()
        if adapterName not in allAdapters:
            raise ValueError(f"Adapter name '{adapterName}' is not valid. Options are: {allAdapters}")
        deviceName = self.config['mmDeviceName']
        try:
            allDevices = self.mmc.getAvailableDevices(adapterName)
        except Exception as e:
            raise RuntimeError(
                f"Error getting available devices for MicroManager adapter '{adapterName}'. "
                f"{micromanager.versionWarning()}"
            ) from e
        if deviceName not in allDevices:
            raise ValueError("Device name '%s' is not valid for adapter '%s'. Options are: %s" % (
                deviceName, adapterName, allDevices))

        if deviceName == 'CellCam':
            # loadDevice() for CellCam needs to have 'CellCam' as device name
            self.camName = 'CellCam'
        self.mmc.loadDevice(self.camName, adapterName, deviceName)
        # the 'Camera ID' property is not prefilled after loadDevice(). Need to assign it:
        if self.camName == 'CellCam':
            self.mmc.setProperty(
                self.camName,
                'Camera ID',
                ''.join(self.mmc.getAllowedPropertyValues('CellCam', 'Camera ID')),
            )
        self.mmc.initializeDevice(self.camName)

        self._readAllParams()

    def _selectCamera(self):
        # MMCore is shared across devices; ensure we talk to the correct camera.
        self.mmc.setCameraDevice(self.camName)

    def startCamera(self):
        """Select this camera and begin continuous sequence acquisition."""
        with self.camLock:
            self._selectCamera()
            self.mmc.startContinuousSequenceAcquisition(0)
            self._isRunning = True

    def stopCamera(self):
        """Stop sequence acquisition and clear the running flag."""
        with self.camLock:
            # self._selectCamera()
            # MM doesn't allow starting sequence on two cameras independently,
            # so in theory this can't be needed
            self.mmc.stopSequenceAcquisition()
            self.acqBuffer = None
            self._isRunning = False

    def isRunning(self):
        """Return the manually tracked running state."""
        # This is needed to allow calling setParam inside startCamera before the acquisition has actually begun
        # (but after the acquisition thread has started)
        with self.camLock:
            return self._isRunning
        # note: if we have more than one camera, we can't call selectCamera while another is running
        # so it's not possible to query isSequenceRunning() reliably.
        # self._selectCamera()
        # return self.mmc.isSequenceRunning()

    def _acquireFrames(self, n=1):
        """Acquire a fixed number of frames and return them stacked along axis 0.

        Any running acquisition is stopped first. Raises TimeoutError if no frame
        arrives for 10 seconds. If the sequence ends early, an error is logged and
        the frames received so far are returned; with zero frames np.concatenate
        raises ValueError.
        """
        if self.isRunning():
            self.stop()
        frames = []
        with self.camLock:
            try:
                self.mmc.setCameraDevice(self.camName)
                self.mmc.startSequenceAcquisition(n, 0, True)
                self._isRunning = True
                timeoutStart = ptime.time()
                while self.mmc.isSequenceRunning() or self.mmc.getRemainingImageCount() > 0:
                    if self.mmc.getRemainingImageCount() > 0:
                        # restart the timeout clock every time a frame arrives
                        timeoutStart = ptime.time()
                        frames.append(self.mmc.popNextImage().T[np.newaxis, ...])
                    elif ptime.time() - timeoutStart > 10.0:
                        raise TimeoutError("Timed out waiting for camera frame.")
                    else:
                        time.sleep(0.005)
            finally:
                self._isRunning = False
                self.mmc.stopSequenceAcquisition()
        if len(frames) < n:
            # No exception is active here, so logger.exception() would attach a
            # meaningless "NoneType: None" traceback; log a plain error instead.
            self.logger.error(
                f"Fixed-frame camera acquisition ended before all frames received ({len(frames)}/{n})"
            )
        return np.concatenate(frames, axis=0)

    def newFrames(self):
        """Return a list of all frames acquired since the last call to newFrames."""
        # Count and pop under a single lock acquisition so the number of frames
        # popped always matches the number that was counted.
        with self.camLock:
            # self._selectCamera()
            # MM doesn't allow starting sequence on two cameras independently,
            # so in theory this can't be needed
            nFrames = self.mmc.getRemainingImageCount()
            if nFrames == 0:
                return []

            now = ptime.time()
            if self.lastFrameTime is None:
                self.lastFrameTime = now

            # Frame timestamps are spread evenly between the previous call and now.
            dt = (now - self.lastFrameTime) / nFrames
            frames = []
            for i in range(nFrames):
                frames.append({
                    'time': self.lastFrameTime + (dt * (i + 1)),
                    'id': self.frameId,
                    'data': self.mmc.popNextImage().T,
                })
                self.frameId += 1

            self.lastFrame = frames[-1]
            self.lastFrameTime = now
        return frames

    def quit(self):
        """Stop any sequence acquisition and unload the MicroManager device."""
        self.mmc.stopSequenceAcquisition()
        self.mmc.unloadDevice(self.camName)

    def _readAllParams(self):
        """Query the device for its properties and build the ``_allParams`` table.

        Each entry is ``(allowed values or range, writable, True, [])``. Also
        resets binning and ROI to record the full-frame sensor size.
        """
        # these are parameters expected for all cameras
        defaultParams = ['exposure', 'binningX', 'binningY', 'regionX', 'regionY', 'regionW', 'regionH',
                         'triggerMode']

        with self.camLock:
            params = OrderedDict([(n, None) for n in defaultParams])

            # because the CellCam driver didn't present the exposure as a property,
            # need to add it with a getExposure() call
            properties = tuple(self.mmc.getDevicePropertyNames(self.camName)) + ('Exposure',)
            for prop in properties:
                vals = self.mmc.getAllowedPropertyValues(self.camName, prop)
                if len(vals) == 0:
                    if self.camName == 'CellCam' and prop == 'Exposure':
                        vals = (1, 100)  # sensible range of exposure values...
                    elif self.mmc.hasPropertyLimits(self.camName, prop):
                        vals = (
                            self.mmc.getPropertyLowerLimit(self.camName, prop),
                            self.mmc.getPropertyUpperLimit(self.camName, prop),
                        )
                    else:
                        # just guess..
                        vals = (1e-6, 1e3)
                vals = list(vals)
                if self.camName == 'CellCam' and prop == 'Exposure':
                    readonly = False  # again, workaround...
                else:
                    readonly = self.mmc.isPropertyReadOnly(self.camName, prop)

                # translate standard properties to the names / formats that we expect
                if prop == 'Exposure':
                    prop = 'exposure'
                    # convert ms to s; float() also covers allowed values reported as strings
                    vals = tuple(float(v) * 1e-3 for v in vals)
                elif prop == 'Binning':
                    pairs = []
                    for v in vals:
                        if 'x' in v:
                            pairs.append(v.split('x'))
                            self._binningMode = 'xy'
                        else:
                            pairs.append([v, v])
                            self._binningMode = 'x'
                    params['binningX'] = ([int(pair[0]) for pair in pairs], not readonly, True, [])
                    params['binningY'] = ([int(pair[1]) for pair in pairs], not readonly, True, [])
                    continue
                elif prop in triggerModes:
                    self._triggerProp = prop
                    modes = triggerModes[prop]
                    self._triggerModes = (modes, {v: k for k, v in modes.items()})
                    prop = 'triggerMode'
                    vals = [modes[v] for v in vals]

                # translation from PixelType to bitDepth is not exact; this will take more work.
                # for now we just expose PixelType directly.

                params[prop] = (vals, not readonly, True, [])

            # Reset ROI to full frame so we know the native resolution
            self.mmc.setCameraDevice(self.camName)
            if self._binningMode is not None:
                # Only touch 'Binning' when the camera exposed it; the rest of this
                # class treats _binningMode None as "binning not supported".
                binning = '1' if self._binningMode == 'x' else '1x1'
                self.mmc.setProperty(self.camName, 'Binning', binning)
            self.mmc.clearROI()

        rgn = self.getROI()
        self._sensorSize = rgn[2:]

        params.update({
            'regionX': [(0, rgn[2] - 1, 1), True, True, []],
            'regionY': [(0, rgn[3] - 1, 1), True, True, []],
            'regionW': [(1, rgn[2], 1), True, True, []],
            'regionH': [(1, rgn[3], 1), True, True, []],
        })

        if params['triggerMode'] is None:
            params['triggerMode'] = (['Normal'], False, True, [])

        if params['binningX'] is None:
            params['binningX'] = [[1], False, True, []]
            params['binningY'] = [[1], False, True, []]

        self._allParams = params

    def getROI(self):
        """Return the camera ROI as ``[x, y, w, h]``, scaled by the binning
        factors when MMCore reports ROI in binned-pixel units."""
        camRegion = self.mmc.getROI(self.camName)
        if self._useBinnedPixelsForROI:
            xAdjustment = self.getParam("binningX")
            yAdjustment = self.getParam("binningY")
        else:
            xAdjustment = yAdjustment = 1
        return [
            camRegion[0] * xAdjustment,
            camRegion[1] * yAdjustment,
            camRegion[2] * xAdjustment,
            camRegion[3] * yAdjustment,
        ]

    def setROI(self, rgn):
        """Set the camera ROI from ``[x, y, w, h]``; the inverse scaling of getROI.

        The caller's sequence is not modified. Nothing is sent to the camera when
        the requested region equals the current one.
        """
        rgn = list(rgn)  # work on a copy; also accepts tuples
        if self._useBinnedPixelsForROI:
            binX = self.getParam('binningX')
            binY = self.getParam('binningY')
            rgn = [
                int(rgn[0] / binX),
                int(rgn[1] / binY),
                int(rgn[2] / binX),
                int(rgn[3] / binY),
            ]

        # is this different from current ROI?
        # Normalize both sides: a tuple never compares equal to a list.
        currentRgn = self.mmc.getROI(self.camName)
        if tuple(rgn) == tuple(currentRgn):
            return

        self.mmc.setROI(*rgn)

    @property
    def _useBinnedPixelsForROI(self):
        """True when ROI values must be expressed in binned-pixel units.

        Exposed as a property because callers test it as a plain attribute; the
        version lookup is done once per instance.
        """
        if self._roiInBinnedPixels is None:
            # Adjusting ROI to be in binned-pixel units is necessary in all versions of
            # MMCore 7.0.2 and above.
            version = self.mmc.getVersionInfo()  # e.g. "MMCore version 7.0.2"
            ver_num = version.split(" ")[-1].split(".")
            ver_tup = tuple(int(d) for d in ver_num)
            self._roiInBinnedPixels = ver_tup >= (7, 0, 2)
        return self._roiInBinnedPixels

    def listParams(self, params=None):
        """List properties of specified parameters, or of all parameters if None"""
        if params is None:
            return self._allParams.copy()
        if isinstance(params, str):
            return self._allParams[params]
        return {p: self._allParams[p] for p in params}

    def setParams(self, params, autoRestart=True, autoCorrect=True):
        """Set several parameters, stopping the camera first if it is running.

        Returns ``(newVals, needRestart)``: the values read back after setting,
        and a flag that is always False. The *params* mapping passed in is left
        untouched.
        """
        p = Profiler(disabled=True, delayed=False)

        # The region keys are merged below; do that on a copy so the caller's
        # mapping is not modified.
        params = dict(params)

        # umanager will refuse to set params while camera is running,
        # so autoRestart doesn't make sense in this context
        if self.isRunning():
            restart = autoRestart
            self.stop()
            p('stop')
        else:
            restart = False

        # Join region params into one request (_setParam can be very slow)
        regionKeys = ['regionX', 'regionY', 'regionW', 'regionH']
        nRegionKeys = len([k for k in regionKeys if k in params])
        if nRegionKeys > 1:
            rgn = list(self.getROI())
            for k in regionKeys:
                if k not in params:
                    continue
                i = {'X': 0, 'Y': 1, 'W': 2, 'H': 3}[k[-1]]
                rgn[i] = params[k]
                del params[k]
            params['region'] = rgn

        newVals = {}
        for k, v in params.items():
            try:
                self._setParam(k, v, autoCorrect=autoCorrect)
            except MicroManagerError:
                self.logger.exception(f"Unable to set {k} param to {v}")
            else:
                p(f'setParam {k!r}')
                if k == 'binning':
                    newVals['binningX'], newVals['binningY'] = self.getParam(k)
                elif k == 'region':
                    (newVals['regionX'], newVals['regionY'],
                     newVals['regionW'], newVals['regionH']) = self.getParam(k)
                else:
                    newVals[k] = self.getParam(k)
                p('reget param')
        self.sigParamsChanged.emit(newVals)
        p('emit')

        if restart:
            self.start()
            p('start')

        needRestart = False
        return newVals, needRestart

    def setParam(self, param, value, autoCorrect=True, autoRestart=True):
        """Set a single parameter; see setParams."""
        return self.setParams({param: value}, autoCorrect=autoCorrect, autoRestart=autoRestart)

    def _setParam(self, param, value, autoCorrect=True):
        """Translate one ACQ4 parameter into MicroManager calls and apply it.

        Raises ValueError for binning or trigger values the camera cannot honor.
        """
        if param.startswith('region'):
            if param == 'region':
                rgn = [value[0], value[1], value[2], value[3]]
            else:
                rgn = list(self.getROI())
                if param[-1] == 'X':
                    rgn[0] = value
                elif param[-1] == 'Y':
                    rgn[1] = value
                elif param[-1] == 'W':
                    rgn[2] = value
                elif param[-1] == 'H':
                    rgn[3] = value
            self.mmc.setCameraDevice(self.camName)
            self.setROI(rgn)
            return

        # translate requested parameter into a list of sub-parameters to set
        pending = []

        if param.startswith('binning'):
            if isinstance(value, list):
                # Config files give binning as a list (e.g. [4, 4]); the membership
                # test and %-formatting below both need a tuple.
                value = tuple(value)
            if self._binningMode is None:
                # camera does not support binning; only allow values of 1
                if value in [1, (1, 1)]:
                    return
                raise ValueError('Invalid binning value %s=%s' % (param, value))

            if param == 'binningX':
                y = self.getParam('binningY')
                value = (value, y)
            elif param == 'binningY':
                x = self.getParam('binningX')
                value = (x, value)

            if self._binningMode == 'x':
                value = '%d' % value[0]
            else:
                value = '%dx%d' % tuple(value)

            pending.append(('Binning', value))

        elif param == 'exposure':
            # s to ms
            pending.append(('Exposure', value * 1e3))

        elif param == 'triggerMode':
            if self._triggerProp is None:
                # camera does not support triggering; only allow 'Normal' mode
                if value != 'Normal':
                    raise ValueError("Invalid trigger mode '%s'" % value)
                return

            # translate trigger mode name
            pending.append((self._triggerProp, self._triggerModes[1][value]))

            # Hamamatsu cameras require setting a trigger source as well
            if self.config['mmAdapterName'] == 'HamamatsuHam':
                if value == 'Normal':
                    source = 'INTERNAL'
                elif value == 'TriggerStart':
                    source = 'EXTERNAL'
                else:
                    raise ValueError("Invalid trigger mode '%s'" % value)
                # On Orca 4 we actually have to toggle the source property
                # back and forth, otherwise it is sometimes ignored.
                pending.append(('TRIGGER SOURCE', 'INTERNAL'))
                pending.append(('TRIGGER SOURCE', source))

        else:
            pending.append((param, value))

        with self.camLock:
            for propName, propValue in pending:
                if propName == 'Exposure' and self.camName == "CellCam":
                    # workaround for CellCam - call to setExposure(), not setProperty()
                    self.mmc.setExposure(self.camName, propValue)
                else:
                    self.mmc.setProperty(self.camName, str(propName), str(propValue))

    def getParams(self, params=None):
        """Return a dict of current values for *params* (all listed parameters if None)."""
        if params is None:
            params = list(self.listParams().keys())
        return {p: self.getParam(p) for p in params}

    def getParam(self, param):
        """Return the current value of one parameter, translated to ACQ4 names and units."""
        if param == 'sensorSize':
            return self._sensorSize
        elif param.startswith('region'):
            rgn = self.getROI()
            if param == 'region':
                return rgn
            i = ['regionX', 'regionY', 'regionW', 'regionH'].index(param)
            return rgn[i]
        elif param.startswith('binning') and self._binningMode is None:
            # camera does not support binning; fake it here
            if param == 'binning':
                return 1, 1
            elif param in ('binningX', 'binningY'):
                return 1
        elif param == 'triggerMode' and self._triggerProp is None:
            # camera does not support triggering; fake it here
            return 'Normal'

        paramTrans = {
            'exposure': 'Exposure',
            'binning': 'Binning',
            'binningX': 'Binning',
            'binningY': 'Binning',
            'triggerMode': self._triggerProp,
            'bitDepth': 'PixelType',
        }.get(param, param)
        with self.camLock:
            if paramTrans == 'Exposure' and self.camName == "CellCam":
                val = self.mmc.getExposure(self.camName)  # workaround for CellCam
            else:
                val = self.mmc.getProperty(self.camName, str(paramTrans))

        # coerce to int or float if possible
        # Only strings are parsed: int() on a value that is already a float
        # (the getExposure() path) would silently drop the fractional part.
        if isinstance(val, str):
            try:
                val = int(val)
            except ValueError:
                try:
                    val = float(val)
                except ValueError:
                    pass

        if param in ('binning', 'binningX', 'binningY'):
            if self._binningMode == 'x':
                val = (int(val),) * 2
            else:
                val = tuple(int(b) for b in val.split('x'))

            if param == 'binningY':
                return val[1]
            elif param == 'binningX':
                return val[0]
            elif param == 'binning':
                return val
        elif param == 'exposure':
            # ms to s
            val = val * 1e-3
        elif param == 'triggerMode':
            val = self._triggerModes[0][val]

        return val