from __future__ import annotations
from collections import OrderedDict
import numpy as np
from neuroanalysis.test_pulse import PatchClampTestPulse
from acq4.devices.PatchClamp.patchclamp import PatchClamp
from acq4.util import Qt
from acq4.util import ptime
from coorx import Point
from .devgui import PatchPipetteDeviceGui
from .statemanager import PatchPipetteStateManager
from ..Camera import Camera
from ..Device import Device
from ..Pipette import Pipette
from ..PressureControl import PressureControl
from ..Sonicator import Sonicator
[docs]
class PatchPipette(Device):
"""Represents a single patch pipette, manipulator, and headstage.
This class extends from the Pipette device class to provide automation and visual feedback
on the status of the patch:
* Whether a cell is currently patched
* Input resistance, access resistance, and holding levels
This is also a good place to implement pressure control, autopatching, slow voltage clamp, etc.
If you intend this for use with the MultiPatch module, the device name in the configuration
needs to end in a number.
"""
sigStateChanged = Qt.Signal(object, object, object) # self, newState, oldState
sigActiveChanged = Qt.Signal(object, object) # self, active
sigPressureChanged = Qt.Signal(object, object, object) # self, source, pressure
sigMoveStarted = Qt.Signal(object) # self
sigMoveFinished = Qt.Signal(object, object) # self, position
sigTargetChanged = Qt.Signal(object, object) # self, target
sigNewPipetteRequested = Qt.Signal(object) # self
sigTipCleanChanged = Qt.Signal(object, object) # self, clean
sigTipBrokenChanged = Qt.Signal(object, object) # self, broken
# catch-all signal for event logging
sigNewEvent = Qt.Signal(object, object) # self, event
# emitted every time we finish a patch attempt
sigPatchAttemptFinished = Qt.Signal(object, object) # self, patch record
# These attributes can be modified to customize state management and test pulse acquisition
defaultStateManagerClass = PatchPipetteStateManager
def __init__(self, deviceManager, config, name):
pipName = config.pop('pipetteDevice', None)
self.pipetteDevice: Pipette = deviceManager.getDevice(pipName)
clampName = config.pop('clampDevice', None)
self.clampDevice: PatchClamp | None = None
if clampName is not None:
self.clampDevice = deviceManager.getDevice(clampName)
self.clampDevice.sigStateChanged.connect(self.clampStateChanged)
self.clampDevice.sigAutoBiasChanged.connect(self._autoBiasChanged)
self.clampDevice.sigTestPulseFinished.connect(self._testPulseFinished)
Device.__init__(self, deviceManager, config, name)
# current state variables
self.active = False
self.broken = False
self.clean = False
self.calibrated = False
self.waitingForSwap = False
self._lastPos = None
self._emitTestPulseData = False
self.cell = None
# key measurements made during patch process and lifetime of pipette
self._patchRecord = None
self._pipetteRecord = None
self.pressureDevice: PressureControl | None = None
if 'pressureDevice' in config:
self.pressureDevice = deviceManager.getDevice(config['pressureDevice'])
self.pressureDevice.sigPressureChanged.connect(self.pressureChanged)
self.userPressure = False
self.sonicatorDevice: Sonicator | None = None
if 'sonicatorDevice' in config:
self.sonicatorDevice = deviceManager.getDevice(config['sonicatorDevice'])
self.sonicatorDevice.sigSonicationChanged.connect(self.sonicationChanged)
self._initStateManager()
self.pipetteDevice.sigCalibrationChanged.connect(self._pipetteCalibrationChanged)
self.pipetteDevice.sigMoveStarted.connect(self._pipetteMoveStarted)
self.pipetteDevice.sigMoveFinished.connect(self._pipetteMoveFinished)
self.pipetteDevice.sigMoveRequested.connect(self._pipetteMoveRequested)
self.pipetteDevice.sigTargetChanged.connect(self._pipetteTargetChanged)
self.pipetteDevice.parentDevice().sigPositionChanged.connect(
self._manipulatorTransformChanged
)
self.pipetteDevice.parentDevice().sigOrientationChanged.connect(
self._manipulatorTransformChanged
)
deviceManager.declareInterface(name, ['patchpipette'], self)
# restore last known state for this pipette
lastState = self.readConfigFile('last_state')
# restoring previous state is temporarily disabled -- this needs a lot more work to be safe.
# self.setState(lastState.get('state', 'out'))
# self.broken = lastState.get('broken', False)
# self.calibrated = lastState.get('calibrated', False)
# self.setActive(False) # Always start pipettes disabled rather than restoring last state?
# # self.setActive(lastState.get('active', False))
[docs]
def isTipClean(self):
return self.clean
[docs]
def setTipClean(self, clean):
if clean == self.clean:
return
self.clean = clean
self.sigTipCleanChanged.emit(self, clean)
self.emitNewEvent('tip_clean_changed', {'clean': clean})
[docs]
def isTipBroken(self):
return self.broken
[docs]
def setTipBroken(self, broken):
if broken == self.broken:
return
self.broken = broken
self.sigTipBrokenChanged.emit(self, broken)
self.emitNewEvent('tip_broken_changed', {'broken': broken})
# states should take care of this, but we want to make sure pressure stops quickly.
if broken and self.pressureDevice is not None:
self.pressureDevice.setPressure(pressure=0)
[docs]
def scopeDevice(self):
return self.pipetteDevice.scopeDevice()
[docs]
def imagingDevice(self) -> Camera:
return self.pipetteDevice.imagingDevice()
[docs]
def focusOnTip(self, speed, raiseErrors=False):
imdev = self.imagingDevice()
fut = imdev.moveCenterToGlobal(self.pipetteDevice.globalPosition(), speed=speed)
if raiseErrors:
fut.raiseErrors("Error while focusing on pipette tip: {error}")
[docs]
def focusOnTarget(self, speed, raiseErrors=False):
imdev = self.imagingDevice()
fut = imdev.moveCenterToGlobal(self.pipetteDevice.targetPosition(), speed=speed)
if raiseErrors:
fut.raiseErrors("Error while focusing on pipette target: {error}")
return fut
[docs]
def newPipette(self):
"""A new physical pipette has been attached; reset any per-pipette state."""
self.setTipBroken(False)
self.setTipClean(True)
self.calibrated = False
self.waitingForSwap = False
self._pipetteRecord = None
self.pipetteDevice.setOffset(self.pipetteDevice.averageHistoricOffset())
self.emitNewEvent('new_pipette', {})
self.newPatchAttempt()
self.setState('bath')
return self.pipetteDevice.findNewPipette()
[docs]
def requestNewPipette(self):
"""Call to emit a signal requesting a new pipette."""
self.waitingForSwap = True
self.sigNewPipetteRequested.emit(self)
[docs]
def pipetteRecord(self):
if self._pipetteRecord is None:
self._pipetteRecord = {
'originalResistance': None,
'cleanCount': 0,
}
return self._pipetteRecord
[docs]
def newPatchAttempt(self):
"""Ready to begin a new patch attempt; reset TP history and patch record."""
self.finishPatchRecord()
if self.clampDevice:
self.clampDevice.resetTestPulseHistory()
self.emitNewEvent('new_patch_attempt', {})
def _resetPatchRecord(self):
self.finishPatchRecord()
piprec = self.pipetteRecord()
self._patchRecord = OrderedDict(
[
('patchPipette', self.name()),
('pipetteOriginalResistance', piprec['originalResistance']),
('pipetteCleanCount', piprec['cleanCount']),
('initialResistance', None),
('initialOffset', None),
('attemptedCellDetect', False),
('detectedCell', None),
('cellDetectInitialTarget', None),
('cellDetectFinalTarget', None),
('attemptedSeal', False),
('sealSuccessful', None),
('fouledBeforeSeal', None),
('resistanceBeforeSeal', None),
('resistanceBeforeBreakin', None),
('offsetBeforeSeal', None),
('attemptedBreakin', False),
('breakinSuccessful', None),
('spontaneousBreakin', None),
('initialBaselineCurrent', None),
('initialBaselinePotential', None),
('wholeCellStartTime', None),
('wholeCellStopTime', None),
('wholeCellPosition', None),
('resealResistance', None),
('resistanceAfterBlowout', None),
('offsetAfterBlowout', None),
('complete', False),
]
)
[docs]
def patchRecord(self):
if self._patchRecord is None:
self._resetPatchRecord()
return self._patchRecord
[docs]
def setCell(self, cell, target=True):
self.cell = cell
if target:
self.pipetteDevice.setTarget(cell.position.mapped_to('global').coordinates)
[docs]
def newCell(self):
try:
from acq4_automation.feature_tracking.cell import Cell
self.cell = Cell(Point(self.pipetteDevice.targetPosition(), 'global'))
except ImportError:
self.logger.exception(
"Cell-based features are unavailable without the acq4_automation package",
)
[docs]
def finishPatchRecord(self):
if self._patchRecord is None:
return
self._patchRecord['complete'] = True
self.sigPatchAttemptFinished.emit(self, self._patchRecord)
self._patchRecord = None
[docs]
def pressureChanged(self, dev, source, pressure):
self.sigPressureChanged.emit(self, source, pressure)
self.emitNewEvent(
'pressure_changed', OrderedDict([('source', source), ('pressure', pressure)])
)
[docs]
def sonicationChanged(self, state: str):
self.emitNewEvent('sonication_changed', {'state': state})
[docs]
def setSelected(self):
pass
[docs]
def setState(self, state, setActive=True, **config):
"""Attempt to set the state (out, bath, seal, whole cell, etc.) of this patch pipette.
The actual resulting state is returned.
"""
if setActive:
self.setActive(True)
return self._stateManager.requestStateChange(state, **config)
[docs]
def listStates(self):
"""Return a list of all known state names this pipette can be set to."""
return self._stateManager.listStates()
def _setState(self, state, oldState):
"""Called by state manager when state has changed."""
self._writeStateFile()
self.emitNewEvent('state_change', OrderedDict([('state', state), ('old_state', oldState)]))
self.sigStateChanged.emit(self, state, oldState)
def _writeStateFile(self):
state = {
'state': self._stateManager.getState().stateName,
'active': self.active,
'calibrated': self.calibrated,
'broken': self.broken,
}
self.writeConfigFile(state, 'last_state')
[docs]
def getState(self):
return self._stateManager.getState()
[docs]
def breakIn(self):
"""Rupture the cell membrane using negative current pulses.
* -2 psi for 3 sec or until rupture
* -4, -6, -8 psi if needed
* longer wait time if needed
"""
def _pipetteCalibrationChanged(self):
self.calibrated = True
self.emitNewEvent('pipette_calibrated')
def _manipulatorTransformChanged(self, dev, *args):
pos = np.array(self.pipetteDevice.globalPosition())
if self._lastPos is None or np.linalg.norm(pos - self._lastPos) > 1e-6:
self._lastPos = pos
self.emitNewEvent('pipette_transform_changed', {'globalPosition': tuple(pos)})
[docs]
def setActive(self, active):
if self.active == active:
return
self.active = active
self.sigActiveChanged.emit(self, active)
self.emitNewEvent('active_changed', {'active': active})
[docs]
def deviceInterface(self, win):
"""Return a widget with a UI to put in the device rack"""
return PatchPipetteDeviceGui(self, win)
[docs]
def clampStateChanged(self, state):
self.emitNewEvent('clamp_state_change', state)
def _initStateManager(self):
# allow external modification of state manager class
self._stateManager = self.defaultStateManagerClass(self)
self.setState('out')
[docs]
def stateManager(self):
return self._stateManager
[docs]
def quit(self):
if self.clampDevice:
self.clampDevice.enableTestPulse(False, block=True)
if getattr(self, '_stateManager', None) is not None:
self._stateManager.quit()
[docs]
def goHome(self, speed, **kwds):
self.setState('out')
return self.pipetteDevice.goHome(speed, **kwds)
def _pipetteMoveStarted(self, pip, pos):
self.sigMoveStarted.emit(self)
self.emitNewEvent('move_start', {'position': tuple(pos)})
def _pipetteMoveRequested(self, pip, pos, speed, opts):
self.emitNewEvent(
'move_requested',
OrderedDict(
[
('position', tuple(pos)),
('speed', speed),
('opts', repr(opts)),
]
),
)
def _pipetteMoveFinished(self, pip, pos):
self.sigMoveFinished.emit(self, pos)
self.emitNewEvent('move_stop', {'position': [pos[0], pos[1], pos[2]]})
def _pipetteTargetChanged(self, pip, pos):
self.sigTargetChanged.emit(self, pos)
self.emitNewEvent('target_changed', {'target_position': [pos[0], pos[1], pos[2]]})
def _autoBiasChanged(self, clamp, enabled, target):
self.emitNewEvent('auto_bias_change', {'enabled': enabled, 'target': target})
[docs]
def emitFullTestPulseData(self, emit: bool):
self._emitTestPulseData = emit
def _testPulseFinished(self, clamp, result: PatchClampTestPulse):
data = result.analysis
if self._emitTestPulseData:
data = {'full_test_pulse': result, **data} # copy it so we don't modify the original
self.emitNewEvent('test_pulse', data)
[docs]
def emitNewEvent(self, eventType, eventData=None):
newEv = OrderedDict(
[
('device', self.name()),
('event_time', ptime.time()),
('event', eventType),
]
)
if eventData is not None:
newEv.update(eventData)
self.sigNewEvent.emit(self, newEv)