from __future__ import annotations
import contextlib
import json
import os
import weakref
from typing import List
import numpy as np
import pyqtgraph as pg
from acq4 import getManager
from acq4.devices.Device import Device
from acq4.devices.OptomechDevice import OptomechDevice
from acq4.devices.Stage import Stage, MovePathFuture
from acq4.modules.Camera import CameraModuleInterface
from acq4.util import Qt, ptime
from acq4.util.future import future_wrap, Future
from acq4.util.target import Target
from pyqtgraph import Point, siFormat
from .planners import PipettePathGenerator
from .planners import defaultMotionPlanners
from .tracker import ResnetPipetteTracker
from ..Camera import Camera
from ..RecordingChamber import RecordingChamber
from ...util.PromptUser import prompt
from ...util.geometry import Plane
from ...util.imaging.sequencer import run_image_sequence
CamModTemplate = Qt.importTemplate('.cameraModTemplate')
[docs]
class Pipette(Device, OptomechDevice):
"""Represents a pipette or electrode attached to a motorized manipulator.
This device provides a camera module interface for driving a motorized electrode holder:
* Visually direct pipette tip via camera module
* Automatically align pipette tip for diagonal approach to cells
* Automatically calibrate pipette tip position (via Tracker)
This device must be configured with a Stage as its parent.
The local coordinate system of the device is configured such that the X axis points in the direction of the pipette
tip, the Z axis points upward (same as global +Z), and the Y axis is the vector perpendicular to both X and Z.
Configuration options:
* **pitch** (float or 'auto', required): The angle of the pipette (in degrees) relative to the horizontal plane.
Positive values point downward. This option must be specified in the configuration.
If the value 'auto' is given, then the pitch is derived from the parent manipulator's X axis
(or other specified by parentAutoAxis) pitch.
* **yaw** (float or 'auto', required): The angle of the pipette (in degrees) relative to the global +X axis (points to the operator's right
when facing the microscope).
Positive values are clockwise from global +X. This option must be specified in the configuration.
If the value 'auto' is given, then the yaw is derived from the parent manipulator's X axis
(or other specified by parentAutoAxis) yaw.
* **parentAutoAxis** (str, optional): One of '+x' (default), '-x', '+y', '-y', '+z', or '-z' indicating the axis and direction in the
parent manipulator's coordinate system that points along the pipette and toward the tip. This axis
is used by the *pitch* and *yaw* options when they are set to 'auto'. If the pipette is not parallel
to one of these axes, then a numerical value must be provided for the pitch and/or yaw.
* **searchHeight** (float, optional): The distance to focus above the sample surface when searching for pipette tips. This
should be about 1-2mm, enough to avoid collisions between the pipette tip and the sample during search.
Default is 2 * mm.
* **searchTipHeight** (float, optional): The distance above the sample surface to bring the (putative) pipette tip position
when searching for new pipette tips. For low working-distance objectives, this should be about 0.5 mm less
than *searchHeight* to avoid collisions between the tip and the objective during search.
Default is 1.5 * mm.
* **approachHeight** (float, optional): The distance to bring the pipette tip above the sample surface when beginning
a diagonal approach. Default is 100 * um.
* **idleHeight** (float, optional): The distance to bring the pipette tip above the sample surface when in idle position.
Default is 1 * mm.
* **idleDistance** (float, optional): The x/y distance from the global origin from which the pipette top should be placed
in idle mode. Default is 7 * mm.
* **recordingChambers** (list, optional): List of names of RecordingChamber devices that this Pipette is meant to work with.
* **cleaningWell** (str, optional): Name of the well (RecordingChamber) device associated with
this pipette for cleaning.
* **reasonableTipOffsetDistance** (float, optional): When updating the tip offset, this is the maximum distance (in meters)
from the original tip offset that is considered reasonable. If the tip offset is outside this distance,
the user will be prompted to confirm the new offset. Default is 30 * um.
Standard OptomechDevice configuration options (see OptomechDevice base class):
* **parentDevice** (str, required): Name of parent Stage device (manipulator)
* **transform** (dict, optional): Spatial transform relative to parent device
Example configuration::
PatchPipette1:
driver: 'Pipette'
parentDevice: 'Manipulator1'
pitch: 15.0
yaw: 45.0
searchHeight: 2 * mm
approachHeight: 100 * um
idleHeight: 1 * mm
recordingChambers: ['Chamber1']
cleaningWell: 'CleaningWell1'
"""
sigTargetChanged = Qt.Signal(object, object)
sigCalibrationChanged = Qt.Signal(object)
# move start/finish are used for recording coarse movement information;
# they are not emitted for every transform change.
sigMoveStarted = Qt.Signal(object, object) # self, pos
sigMoveFinished = Qt.Signal(object, object) # self, pos
sigMoveRequested = Qt.Signal(object, object, object, object) # self, pos, speed, opts
# May add items here to implement custom motion planning for all pipettes
defaultMotionPlanners = defaultMotionPlanners()
pathGeneratorClass = PipettePathGenerator
def __init__(self, deviceManager, config, name):
Device.__init__(self, deviceManager, config, name)
OptomechDevice.__init__(self, deviceManager, config, name)
self.moving = False
self._scopeDev = None
self._imagingDev = None
self._boundaries = None
self._opts = {
'searchHeight': config.get('searchHeight', 2e-3),
'searchTipHeight': config.get('searchTipHeight', 1.5e-3),
'approachHeight': config.get('approachHeight', 100e-6),
'cleanApproachHeight': config.get('cleanApproachHeight', 1500e-6),
'idleHeight': config.get('idleHeight', 1e-3),
'idleDistance': config.get('idleDistance', 7e-3),
'showCameraModuleUI': config.get('showCameraModuleUI', False),
}
parent = self
while True:
parent = parent.parentDevice()
if isinstance(parent, Stage):
break
self.parentStage: Stage = parent
if not isinstance(self.parentStage, Stage):
raise Exception(
f"Pipette device requires some type of translation stage as its parentDevice (got {parent})."
)
parent.sigOrientationChanged.connect(self.clearSavedOffsets)
# may add items here to implement per-pipette custom motion planning
self.motionPlanners = {}
self.currentMotionPlanner = None
self.keepOnStepping = True
self.pathGenerator = self.pathGeneratorClass(self)
self._camInterfaces = weakref.WeakKeyDictionary()
self.target = None
cal = self.readConfigFile('calibration')
self.offset = np.array(cal.get('offset', [0, 0, 0]))
# kept for backward compatibility
self._calibratedPitch = cal.get('pitch', None)
self._calibratedYaw = cal.get(
'yaw', cal.get('angle', None)
) # backward support for old 'angle' config key
self._globalDirection = None
self._localDirection = None
# timer used to emit sigMoveFinished when no motion is detected for a certain period
self.moveTimer = Qt.QTimer()
self.moveTimer.timeout.connect(self.positionChangeFinished)
self.sigGlobalTransformChanged.connect(self.positionChanged)
# If parent orientation changes (probably due to being recalibrated), update pitch/yaw angles if needed.
parent.sigOrientationChanged.connect(self._directionChanged)
self._updateTransform()
self.tracker = ResnetPipetteTracker(self)
deviceManager.declareInterface(name, ['pipette'], self)
target = self.readConfigFile('target').get('targetGlobalPosition', None)
if target is not None:
self.setTarget(target)
deviceManager.sigAbortAll.connect(self.stop)
[docs]
def getGeometry(self, name=None):
if isinstance(self.config.get("geometry"), dict):
defaults = {'color': (0, 1, 0.2, 1)}
defaults.update(self.config["geometry"])
self.config["geometry"] = defaults
return super().getGeometry(name)
[docs]
def getBoundaries(self) -> List[Plane]:
if self._boundaries is None:
self._boundaries = []
for plane in self.parentStage.getBoundaries():
mapped_pt = self._solveMyGlobalPosition(plane.point)
mapped_vec = self._solveMyGlobalPosition(plane.point + plane.normal) - mapped_pt
new_name = self.name() + " " + plane.name.partition(" ")[2]
self._boundaries.append(Plane(mapped_vec, mapped_pt, new_name))
return self._boundaries
[docs]
def moveTo(self, position: str, speed, raiseErrors=False, **kwds):
"""Move the pipette tip to a named position, with safe motion planning.
If *raiseErrors* is True, then an exception will be raised in a background
thread if the move fails.
"""
# Select a motion planner based on the target position
plannerClass = self.motionPlanners.get(
position, self.defaultMotionPlanners.get(position, None)
)
if plannerClass is None:
savedPos = self.loadPosition(position)
if savedPos is not None:
plannerClass = self.motionPlanners.get(
'saved', self.defaultMotionPlanners.get('saved', None)
)
if plannerClass is None:
raise ValueError(f"Unknown pipette move position {position!r}")
if self.currentMotionPlanner is not None:
self.currentMotionPlanner.stop()
self.currentMotionPlanner = plannerClass(self, position, speed, **kwds)
future = self.currentMotionPlanner.move()
if raiseErrors is not False:
future.raiseErrors(
message=f"Move to {position} position failed ({{error}}); requested from:\n{{stack}}"
)
return future
[docs]
def savePosition(self, name, pos=None):
"""Store a position in global coordinates for later use.
If no position is provided, then the current position of the pipette tip is used.
"""
if pos is None:
pos = self.globalPosition()
manip_pos = self._solveGlobalStagePosition(pos)
manip: Stage = self.parentStage
manip.checkLimits(manip.mapGlobalToDevicePosition(manip_pos))
cache = self.readConfigFile('stored_positions')
cache[name] = list(pos)
self.writeConfigFile(cache, 'stored_positions')
self.checkRangeOfMotion(pos, name)
[docs]
def loadPosition(self, name, default=None):
"""Return a previously saved position."""
cache = self.readConfigFile('stored_positions')
return cache.get(name, default)
[docs]
def checkRangeOfMotion(self, pos, name, tolerance=500e-6):
"""Warn user if the position (in global coordinates) is within 500µm of the manipulator's range of motion."""
manipulator: Stage = self.parentStage
manipulator.checkRangeOfMotion(self._solveGlobalStagePosition(pos), name, tolerance)
[docs]
def scopeDevice(self):
if self._scopeDev is None:
imdev = self.imagingDevice()
self._scopeDev = imdev.scopeDev
return self._scopeDev
[docs]
def imagingDevice(self) -> Camera:
if self._imagingDev is None:
man = getManager()
name = self.config.get('imagingDevice', None)
if name is None:
cams = man.listInterfaces('camera')
if len(cams) == 1:
name = cams[0]
else:
raise Exception(
"Pipette requires either a single imaging device available (found %d) or 'imagingDevice' specified in its configuration."
% len(cams)
)
self._imagingDev = man.getDevice(name)
return self._imagingDev
[docs]
def stop(self):
self.keepOnStepping = False # thread safety? if a user starts a new stepwise movement simultaneous with stopping, they deserve to have to stop a second or even third time.
cmp = self.currentMotionPlanner
if cmp is not None:
cmp.stop()
[docs]
def deviceInterface(self, win):
"""Return a widget with a UI to put in the device rack"""
return PipetteDeviceGui(self, win)
[docs]
def cameraModuleInterface(self, mod):
iface = PipetteCamModInterface(self, mod, showUi=self._opts['showCameraModuleUI'])
self._camInterfaces[iface] = None
return iface
[docs]
def tipOffsetIsReasonable(self, pos) -> bool:
dist = np.linalg.norm(np.array(self.mapToGlobal((0, 0, 0))) - pos)
return dist < self.config.get("reasonableTipOffsetDistance", 30e-6)
[docs]
def newPipetteTipOffsetIsReasonable(self, pos) -> bool:
cal = self.readConfigFile('calibration')
if 'offset history' in cal and len(cal['offset history']) > 10:
avg = np.mean(cal['offset history'], axis=0)
sigma = np.std(np.linalg.norm(np.array(cal['offset history']) - avg, axis=1))
if np.linalg.norm(np.array(pos) - avg) > 3 * sigma:
return False
return True
[docs]
@future_wrap
def setTipOffsetIfAcceptable(self, pos, _future=None):
if self.tipOffsetIsReasonable(pos):
self.resetGlobalPosition(pos)
else:
dist = np.linalg.norm(np.array(self.mapToGlobal((0, 0, 0))) - pos)
dist = siFormat(dist, suffix='m', precision=3)
button_text = _future.waitFor(
prompt(
title="Pipette displacement detected",
text=f"The tip offset for {self.name()} is {dist} off from its initial value.",
extra_text="Do you want to use it, discard it or override all historic offsets?",
choices=["Use", "Discard", "Override"],
),
timeout=None,
).getResult()
if button_text == "Use":
self.recordTipOffsetInHistory(pos)
elif button_text == "Discard":
return False
elif button_text == "Override":
self.overrideTipOffsetHistory(pos)
else:
raise AssertionError("Unknown button clicked")
return True
[docs]
@future_wrap
def setNewPipetteTipOffsetIfAcceptable(self, pos, _future=None):
"""Returns whether the tip position was saved. Otherwise, the user requested a re-do."""
if self.newPipetteTipOffsetIsReasonable(pos):
self.recordTipOffsetInHistory(pos)
else:
button_text = _future.waitFor(
prompt(
title="Initial tip offset outlier",
text=f"The tip offset for {self.name()} is outside of its normal range.",
extra_text="Do you want to include this outlier, discard the value, override all historic "
"offsets, or only use this as a temporary offset?",
choices=["Include", "Discard", "Override", "Temporary"],
),
timeout=None,
).getResult()
if button_text == "Include":
self.recordTipOffsetInHistory(pos)
elif button_text == "Discard":
return False
elif button_text == "Override":
self.overrideTipOffsetHistory(pos)
elif button_text == "Temporary":
self.setTipOffset(pos)
else:
raise AssertionError("Unknown button clicked")
return True
[docs]
def recordTipOffsetInHistory(self, pos):
self.resetGlobalPosition(pos)
cal = self.readConfigFile('calibration')
cal['offset'] = list(self.offset)
cal.setdefault('offset history', []).append(cal['offset'])
cal['offset history'] = cal['offset history'][-20:]
self.writeConfigFile(cal, 'calibration')
[docs]
def overrideTipOffsetHistory(self, pos):
self.resetGlobalPosition(pos)
cal = self.readConfigFile('calibration')
cal['offset'] = list(self.offset)
cal['offset history'] = [cal['offset']]
self.writeConfigFile(cal, 'calibration')
[docs]
def clearSavedOffsets(self, parent):
"""Clear the saved offsets if the parent manipulator's axes are re-calibrated."""
cal = self.readConfigFile('calibration')
if 'offset history' in cal:
del cal['offset history']
self.writeConfigFile(cal, 'calibration')
[docs]
def setTipOffset(self, pos):
"""Given a global position, set the offset such that the pipette tip is located at that position."""
self.resetGlobalPosition(pos)
cal = self.readConfigFile('calibration')
cal['offset'] = list(self.offset)
self.writeConfigFile(cal, 'calibration')
[docs]
def averageHistoricOffset(self):
cal = self.readConfigFile('calibration')
if 'offset history' in cal and len(cal['offset history']) > 0:
return np.mean(cal['offset history'], axis=0)
else:
return self.offset
[docs]
@future_wrap
def saveManualTipPosition(self, stack=True, _future=None):
path = os.path.join(self.configPath(), "manual-calibrations")
path = self.dm.configFileName(path)
path = self.dm.dirHandle(path, create=True)
cam: Camera = self.imagingDevice()
with cam.ensureRunning():
img = _future.waitFor(cam.acquireFrames(n=1, ensureFreshFrames=True)).getResult()[0]
img.addInfo({"tip position": self.globalPosition()})
path.writeFile(
img.data(),
"manual-calibration.ma",
info=img.info(),
autoIncrement=True,
)
if stack:
depth = cam.getFocusDepth()
is_below_surface = depth <= self.scopeDevice().getSurfaceDepth()
scan_dist = np.random.randint(2, 40 if is_below_surface else 100) * 1e-6
step = scan_dist / 2
try:
seq_future = run_image_sequence(
cam, z_stack=(depth - scan_dist, depth + scan_dist, step), storage_dir=path
)
_future.waitFor(seq_future)
finally:
_future.waitFor(cam.setFocusDepth(depth))
fh = seq_future.imagesSavedIn
info = {
**fh.info(),
"tip position": self.globalPosition(),
}
fh.setInfo(info)
[docs]
def resetGlobalPosition(self, pos):
"""Set the device transform such that the pipette tip is located at the global position *pos*.
This method is for recalibration; it does not physically move the device.
"""
lpos = np.array(self.mapFromGlobal(pos))
self.setOffset(self.offset + lpos)
[docs]
def setOffset(self, offset):
self.offset = np.array(offset)
self._updateTransform()
self.sigCalibrationChanged.emit(self)
def _updateTransform(self):
# matrix mapping from local to parent
x = self.globalDirection()
x[2] = 0
x = x / np.linalg.norm(x)
z = np.array([0, 0, 1])
y = np.cross(x, z)
y = y / np.linalg.norm(y)
m = np.array(
[
[x[0], y[0], z[0], 0],
[x[1], y[1], z[1], 0],
[x[2], y[2], z[2], 0],
[0, 0, 0, 1],
]
)
tr = pg.Transform3D(m)
tr.translate(*self.offset)
self.setDeviceTransform(tr)
def _directionChanged(self):
"""Orientation has changed"""
self._globalDirection = None
self._localDirection = None
self._updateTransform()
[docs]
def saveCalibration(self):
cal = self.readConfigFile('calibration')
cal['offset'] = list(self.offset)
# kept for backward compatibility
if self._calibratedPitch is not None:
cal['pitch'] = self._calibratedPitch
if self._calibratedYaw is not None:
cal['yaw'] = self._calibratedYaw
self.writeConfigFile(cal, 'calibration')
[docs]
def yawAngle(self):
"""Return the yaw (azimuthal angle) of the electrode around the Z-axis in degrees.
Value is returned in degrees such that an angle of 0 indicate the tip points along the positive x axis,
and 90 points along the positive y axis.
"""
if 'yaw' not in self.config:
# for backward compatibility
if self._calibratedYaw is not None:
return self._calibratedYaw
raise ValueError(f"Yaw angle is not configured for {self.name()}")
if self.config['yaw'] == 'auto':
return self._manipulatorOrientation()['yaw']
else:
return self.config['yaw']
[docs]
def pitchAngle(self):
"""Return the pitch of the electrode in degrees (angle relative to horizontal plane).
For positive angles, the pipette tip points downward, toward -Z.
"""
if 'pitch' not in self.config:
# for backward compatibility
if self._calibratedPitch is not None:
return self._calibratedPitch
raise ValueError(f"Pitch angle is not configured for {self.name()}")
if self.config['pitch'] == 'auto':
return self._manipulatorOrientation()['pitch']
else:
return self.config['pitch']
def _manipulatorOrientation(self) -> dict:
axis = self.config.get('parentAutoAxis', '+x')
return self.parentStage.calculatedAxisOrientation(axis)
[docs]
def yawRadians(self):
return self.yawAngle() * np.pi / 180.0
[docs]
def pitchRadians(self):
return self.pitchAngle() * np.pi / 180.0
[docs]
def goHome(self, speed='fast', **kwds):
"""Extract pipette tip diagonally, then move to home position."""
return self.moveTo('home', speed=speed, **kwds)
[docs]
def goSearch(self, speed='fast', distance=0, **kwds):
return self.moveTo('search', speed=speed, distance=distance, **kwds)
[docs]
def goApproach(self, speed, **kwds):
"""Move the electrode tip such that it is 100um above the sample surface with its
axis aligned to the target.
"""
return self.moveTo('approach', speed=speed, **kwds)
[docs]
def goIdle(self, speed='fast', **kwds):
return self.moveTo('idle', speed=speed, **kwds)
[docs]
def goTarget(self, speed, **kwds):
return self.moveTo('target', speed=speed, **kwds)
[docs]
def goAboveTarget(self, speed, **kwds):
return self.moveTo('aboveTarget', speed=speed, **kwds)
def _movePath(self, path, name=None) -> MovePathFuture:
"""
move along a path defined in global coordinates.
Format is [(pos, speed, linear, explanation), ...]
returns the movefuture of the last move.
WARNING: This method does _not_ implement any motion planning.
"""
self.sigMoveRequested.emit(self, path[-1][0], None, {'path': path})
stagePath = []
for pos, speed, linear, explanation in path:
stagePos = self._solveGlobalStagePosition(pos)
stagePath.append(
{
'globalPos': stagePos,
'speed': speed,
'linear': linear,
'explanation': explanation,
}
)
stage = self.parentStage
return stage.movePath(stagePath, name=name)
[docs]
def approachDepth(self):
"""Return the global depth where the electrode should move to when starting approach mode.
This is defined as the sample surface + 100um.
"""
scope = self.scopeDevice()
surface = scope.getSurfaceDepth()
if surface is None:
raise ValueError("Surface depth has not been set.")
return surface + self._opts['approachHeight']
@property
def cleanApproachHeight(self):
return self._opts["cleanApproachHeight"]
[docs]
def depthBelowSurface(self):
"""Return the current depth of the pipette tip below the sample surface
(positive values are below the surface).
"""
scope = self.scopeDevice()
surface = scope.getSurfaceDepth()
return surface - self.globalPosition()[2]
[docs]
def globalDirection(self):
"""Return a global unit vector pointing in the direction of the pipette axis."""
if self._globalDirection is None:
pitch = self.pitchRadians()
yaw = self.yawRadians()
s = np.cos(pitch)
self._globalDirection = np.array([s * np.cos(yaw), s * np.sin(yaw), -np.sin(pitch)])
return self._globalDirection.copy()
[docs]
def localDirection(self):
"""Return a local unit vector pointing in the direction of the pipette axis."""
if self._localDirection is None:
pitch = self.pitchRadians()
self._localDirection = np.array([np.cos(pitch), 0, -np.sin(pitch)])
return self._localDirection.copy()
[docs]
def positionAtDepth(self, depth, start=None):
"""Return the global position at *depth* that lies along the axis of the pipette.
If *start* is given, then the pipette axis is assumed to go through this global position rather than
its current position.
"""
if start is None:
start = self.globalPosition()
axis = self.globalDirection()
dz = depth - start[2]
dist = dz / axis[2]
return start + dist * axis
[docs]
def advance(self, depth, speed, name=None):
"""Move the electrode along its axis until it reaches the specified
(global) depth.
"""
if name is None:
name = f"advance to depth {depth:0.2g}"
pos = self.positionAtDepth(depth)
return self._moveToGlobal(pos, speed, name=name)
[docs]
def retractFromSurface(self, speed='slow') -> Future:
"""Retract the pipette along its axis until it is above the slice surface."""
depth = self.globalPosition()[2]
appDepth = self.approachDepth()
if depth < appDepth:
return self.advance(appDepth, speed=speed)
return Future.immediate()
[docs]
@future_wrap
def stepwiseAdvance(
self,
depth: float | None = None,
target: np.ndarray | None = None,
speed: float = 10e-6,
interval: float = 5,
step: float = 1e-6,
name: str | None = None,
_future=None,
):
"""Retract/advance in small steps, allowing for manual user movements.
Parameters
----------
depth : float | None
The target depth (in global coordinates) to advance to.
target : np.ndarray | None
If specified, the pipette will advance toward this target position instead of the
specified depth. The target should be in global coordinates.
speed : float
The speed (in m/s) to use for the movement.
interval : float
The time (in seconds) to wait between steps.
step : float
The step size (in meters) to use for each advance.
"""
initial_direction = None
self.keepOnStepping = True
while self.keepOnStepping:
pos = self.globalPosition()
if target:
goal = target
else:
goal = self.positionAtDepth(depth)
direction = goal - pos
if initial_direction is None:
initial_direction = np.sign(direction[2])
if np.sign(direction[2]) != initial_direction:
break # overshot
distance = np.linalg.norm(direction)
next_pos = pos + step * direction / distance
_future.waitFor(self._moveToGlobal(next_pos, speed=speed, linear=True, name=name))
if distance <= step:
break
_future.sleep(interval)
[docs]
@future_wrap
def wiggle(
self, speed, radius, repetitions, duration, pipette_direction=None, extra=None, _future=None
):
if pipette_direction is None:
pipette_direction = self.globalDirection()
def random_wiggle_direction():
"""pick a random point on a circle perpendicular to the pipette axis"""
while (
np.linalg.norm(vec := np.cross(pipette_direction, np.random.uniform(-1, 1, size=3)))
== 0
):
pass # prevent division by zero
return vec / np.linalg.norm(vec)
pos = np.array(self.globalPosition())
prev_dir = random_wiggle_direction()
for step_i in range(repetitions):
with contextlib.ExitStack() as stack:
if extra is not None:
stack.enter_context(extra())
start = ptime.time()
while ptime.time() - start < duration:
while np.dot(direction := random_wiggle_direction(), prev_dir) > 0:
pass # ensure different direction from previous
_future.waitFor(self._moveToGlobal(pos=pos + radius * direction, speed=speed, name=f"wiggle step {step_i}"))
prev_dir = direction
_future.waitFor(self._moveToGlobal(pos=pos, speed=speed, name=f"wiggle return {step_i}"))
[docs]
def globalPosition(self):
"""Return the position of the electrode tip in global coordinates.
Note: the position in local coordinates is always [0, 0, 0].
"""
return self.mapToGlobal([0, 0, 0])
def _moveToGlobal(self, pos, speed, name=None, **kwds):
"""Move the electrode tip directly to the given position in global coordinates.
WARNING: This method does _not_ implement any motion planning.
"""
self.sigMoveRequested.emit(self, pos, speed, kwds)
stagePos = self._solveGlobalStagePosition(pos)
stage: Stage = self.parentStage
try:
adapter = self.dm.getOrLoadModule("Visualize3D").window().findAdapter(lambda a: a.device == self)
if adapter is not None:
adapter.setPath([self.globalPosition(), pos])
except Exception:
self.logger.exception("Error visualizing pipette move path")
try:
return stage.moveToGlobal(stagePos, speed, name=name, **kwds)
except Exception as exc:
exc.add_note(
f"Moving {self} to global position {pos!r} (name={name})"
)
raise exc
def _solveGlobalStagePosition(self, pos):
"""Return global stage position required in order to move pipette to a global position."""
dif = np.asarray(pos) - np.asarray(self.globalPosition())
stage = self.parentStage
spos = np.asarray(stage.globalPosition())
return spos + dif
def _solveMyGlobalPosition(self, pos):
"""Return the global position of the pipette tip when the stage is at the given global position."""
dif = np.asarray(pos) - np.asarray(self.parentStage.globalPosition())
return np.asarray(self.globalPosition()) + dif
def _moveToLocal(self, pos, speed, linear=False, **kwds):
"""Move the electrode tip directly to the given position in local coordinates.
WARNING: This method does _not_ implement any motion planning.
"""
return self._moveToGlobal(self.mapToGlobal(pos), speed, linear=linear, **kwds)
[docs]
def setTarget(self, target):
self.target = np.array(target)
self.writeConfigFile({'targetGlobalPosition': list(map(float, self.target))}, 'target')
self.sigTargetChanged.emit(self, self.target)
[docs]
def targetPosition(self):
if self.target is None:
raise RuntimeError(f"No target defined for {self.name()}")
return self.target
[docs]
def hideMarkers(self, hide):
for iface in self._camInterfaces.keys():
iface.hideMarkers(hide)
[docs]
def focusTip(self, speed='fast', raiseErrors=False):
pos = self.globalPosition()
future = self.imagingDevice().moveCenterToGlobal(
pos, speed=speed, name=f"focus on {self.name()}"
)
if raiseErrors:
future.raiseErrors("Focus on pipette tip failed ({error}); requested from:\n{stack})")
return future
[docs]
def focusTarget(self, speed='fast', raiseErrors=False):
pos = self.targetPosition()
future = self.imagingDevice().moveCenterToGlobal(
pos, speed=speed, name=f"focus on target for {self.name()}"
)
if raiseErrors:
future.raiseErrors(
"Focus on pipette target failed ({error}); requested from:\n{stack})"
)
return future
[docs]
def positionChanged(self):
self.moveTimer.start(500)
if self.moving is False:
self.moving = True
self.sigMoveStarted.emit(self, self.globalPosition())
[docs]
def positionChangeFinished(self):
self.moveTimer.stop()
self.moving = False
self.sigMoveFinished.emit(self, self.globalPosition())
[docs]
def getRecordingChambers(self) -> List[RecordingChamber]:
"""Return a list of RecordingChamber instances that are associated with this Pipette (see
'recordingChambers' config option).
"""
man = getManager()
return [man.getDevice(d) for d in self.config.get('recordingChambers', [])]
[docs]
def getCleaningWell(self) -> RecordingChamber | None:
"""Return the RecordingChamber instance that is associated with this Pipette for cleaning
(see 'cleaningWell' config option).
"""
name = self.config.get('cleaningWell', None)
if name is None:
return None
return self.dm.getDevice(name)
[docs]
def startRecording(self):
"""Return an object that records all motion updates from this pipette"""
return PipetteRecorder(self)
[docs]
@future_wrap
def iterativelyFindTip(self, max_reps=10, found_threshold=3e-6, delay_after_move=0.2,
max_allowed_offset=None, delay_after_update=0, reserve_devices=True,
go_to_tip_first=False, _future=None):
"""Iteratively refine the tip position by finding the tip in frame and focusing, until convergence.
Returns if convergence is reached (tip position changes less than *found_threshold* between iterations) or after *max_reps* iterations.
Otherwise, raises an exception.
Parameters
----------
max_reps : int
Maximum number of iterations to perform.
If exceeded before convergence, TimeoutError is raised.
found_threshold : float
Distance threshold (meters) for convergence.
delay_after_move : float
Time to wait (seconds) after each move before finding the tip again.
max_allowed_offset : float
Maximum allowed tip offset distance (meters).
If exceeded, ValueError is raised.
delay_after_update : float
Time to wait (seconds) after updating the tip position before the next iteration.
Default is 0; this is used to allow visual confirmation of the update before moving on.
reserve_devices : bool
If True, then devices will be reserved for the duration of this method.
go_to_tip_first : bool
If True, then the pipette will first move to the current tip position before starting the
iterative process.
"""
imgr = self.imagingDevice()
manager = getManager()
with contextlib.ExitStack() as stack:
if reserve_devices:
stack.enter_context(manager.reserveDevices([self] + imgr.devicesToReserve(), timeout=30.0))
try:
last_pos = None
start_pos = self.globalPosition()
if go_to_tip_first:
_future.waitFor(self.focusTip())
_future.sleep(delay_after_move)
for _ in range(max_reps):
pos = self.tracker.findTipInFrame()
_future.waitFor(self.setTipOffsetIfAcceptable(pos), timeout=None)
converged = last_pos is not None and np.linalg.norm(np.array(pos) - np.array(last_pos)) < found_threshold
_future.sleep(delay_after_update)
if converged:
diff = np.linalg.norm(np.array(pos) - np.array(start_pos))
if max_allowed_offset is not None and diff > max_allowed_offset:
raise ValueError(f"Tip offset exceeded maximum allowed distance: {diff} > {max_allowed_offset}")
return
last_pos = pos
_future.waitFor(self.focusTip())
_future.sleep(delay_after_move)
raise TimeoutError(f"Iterative tip finding did not converge after {max_reps} iterations.")
except Exception as exc:
# reset position to start if we fail, to avoid leaving the pipette in a bad position
self.setTipOffset(start_pos)
raise exc
[docs]
def findNewPipette(self):
from acq4.devices.Pipette.calibration import findNewPipette
future = findNewPipette(self, self.imagingDevice(), self.scopeDevice())
self._last_calibration_future = future # keep for easy debugging of calibration algorithm
return future
class PipetteRecorder:
def __init__(self, pip):
self.pip = pip
self.events = []
self.pip.sigGlobalTransformChanged.connect(self.recordPos, Qt.Qt.DirectConnection)
self.pip.sigMoveStarted.connect(self.recordMoveStarted, Qt.Qt.DirectConnection)
self.pip.sigMoveFinished.connect(self.recordMoveFinished, Qt.Qt.DirectConnection)
self.pip.sigMoveRequested.connect(self.recordMoveRequested, Qt.Qt.DirectConnection)
self.newEvent(
'init',
{
'position': tuple(self.pip.globalPosition()),
'direction': tuple(self.pip.globalDirection()),
},
)
def recordPos(self):
self.newEvent('position_change', {'position': tuple(self.pip.globalPosition())})
def recordMoveStarted(self, pip, pos):
self.newEvent('move_start', {'position': tuple(pos)})
def recordMoveFinished(self, pip, pos):
self.newEvent('move_stop', {'position': tuple(pos)})
def recordMoveRequested(self, pip, pos, speed, opts):
self.newEvent('move_request', {'position': tuple(pos), 'speed': speed, 'opts': opts})
def newEvent(self, eventType, eventData):
newEv = dict(
[
('device', self.pip.name()),
('event_time', ptime.time()),
('event', eventType),
]
)
if eventData is not None:
newEv.update(eventData)
self.events.append(newEv)
def stop(self):
self.pip.sigGlobalTransformChanged.disconnect(self.recordPos)
self.pip.sigMoveStarted.disconnect(self.recordMoveStarted)
self.pip.sigMoveFinished.disconnect(self.recordMoveFinished)
self.pip.sigMoveRequested.disconnect(self.recordMoveRequested)
def store(self, filename):
json.dump(self.events, open(f'{filename}.json', 'w'))
class PipetteCamModInterface(CameraModuleInterface):
"""**DEPRECATED** use MultiPatch module instead
This is only used for displaying targets.
Implements user interface for Pipette.
"""
canImage = False
def __init__(self, dev: Pipette, win, showUi=True):
CameraModuleInterface.__init__(self, dev, win)
self._haveTarget = False
self._showUi = showUi
self.ui = CamModTemplate()
self.ctrl = Qt.QWidget()
self.ui.setupUi(self.ctrl)
self.calibrateAxis = Axis([0, 0], 0, inverty=False)
self.calibrateAxis.setZValue(5000)
win.addItem(self.calibrateAxis)
self.calibrateAxis.setVisible(False)
self.centerArrow = pg.ArrowItem()
self.centerArrow.setZValue(5000)
win.addItem(self.centerArrow)
self.target = Target()
self.target.setZValue(5000)
win.addItem(self.target)
self.target.setVisible(False)
# decide how / whether to add a label for the target
basename = dev.name().rstrip('0123456789')
self.pipetteNumber = dev.name()[len(basename) :]
showLabel = False
if basename != dev.name():
# If this device looks like "Name00" and another device has the same
# prefix, then we will label all targets with their device numbers.
for devname in getManager().listDevices():
if devname.startswith(basename):
showLabel = True
break
if showLabel:
self._updateTargetLabel()
self.depthTarget = Target(movable=False)
win.getDepthView().addItem(self.depthTarget)
self.depthTarget.setVisible(False)
self.depthArrow = pg.ArrowItem(angle=180 - dev.yawAngle())
win.getDepthView().addItem(self.depthArrow)
# self.ui.setOrientationBtn.toggled.connect(self.setOrientationToggled)
self.ui.setOrientationBtn.setEnabled(False)
win.getView().scene().sigMouseClicked.connect(self.sceneMouseClicked)
dev.sigGlobalTransformChanged.connect(self.transformChanged)
dev.imagingDevice().sigGlobalTransformChanged.connect(self.focusChanged)
dev.sigTargetChanged.connect(self.targetChanged)
self.calibrateAxis.sigRegionChangeFinished.connect(self.calibrateAxisChanged)
self.calibrateAxis.sigRegionChanged.connect(self.calibrateAxisChanging)
self.ui.homeBtn.clicked.connect(self.homeClicked)
self.ui.searchBtn.clicked.connect(self.searchClicked)
self.ui.idleBtn.clicked.connect(self.idleClicked)
self.ui.setTargetBtn.toggled.connect(self.setTargetToggled)
self.ui.targetBtn.clicked.connect(self.targetClicked)
self.ui.approachBtn.clicked.connect(self.approachClicked)
self.ui.autoCalibrateBtn.clicked.connect(self.autoCalibrateClicked)
self.ui.getRefBtn.clicked.connect(self.getRefFramesClicked)
self.ui.aboveTargetBtn.clicked.connect(self.aboveTargetClicked)
self.target.sigPositionChangeFinished.connect(self.targetDragged)
self.transformChanged()
try:
targetPos = dev.targetPosition()
except RuntimeError:
pass # no target defined
else:
self.targetChanged(dev, targetPos)
self.updateCalibrateAxis()
def setOrientationToggled(self):
self.updateCalibrateAxis()
self.calibrateAxis.setVisible(self.ui.setOrientationBtn.isChecked())
def selectedSpeed(self):
return 'fast' if self.ui.fastRadio.isChecked() else 'slow'
def hideMarkers(self, hide):
self.centerArrow.setVisible(not hide)
self.target.setVisible(not hide and self._haveTarget)
def sceneMouseClicked(self, ev):
if ev.button() != Qt.Qt.LeftButton:
return
if self.ui.setCenterBtn.isChecked():
self.ui.setCenterBtn.setChecked(False)
pos = self.win().getView().mapSceneToView(ev.scenePos())
self.calibrateAxis.setPos(pos)
elif self.ui.setTargetBtn.isChecked():
pos = self.win().getView().mapSceneToView(ev.scenePos())
z = self.getDevice().imagingDevice().getFocusDepth()
self.setTargetPos(pos, z)
self.target.setFocusDepth(z)
def setTargetPos(self, pos, z):
self.dev().setTarget((pos.x(), pos.y(), z))
def targetChanged(self, dev, pos):
self.target.setPos(pg.Point(pos[:2]))
self.target.setDepth(pos[2])
self.depthTarget.setPos(Point(0, pos[2]))
self.target.setVisible(True)
self._haveTarget = True
self.depthTarget.setVisible(True)
self.ui.targetBtn.setEnabled(True)
self.ui.approachBtn.setEnabled(True)
self.ui.setTargetBtn.setChecked(False)
self.focusChanged()
def targetDragged(self):
z = self.getDevice().imagingDevice().getFocusDepth()
self.setTargetPos(self.target.pos(), z)
self.target.setFocusDepth(z)
def transformChanged(self):
# manipulator's global transform has changed; update the center arrow and orientation axis
pos, angle = self.analyzeTransform()
self.centerArrow.setPos(pos[0], pos[1])
self.centerArrow.setStyle(angle=180 - angle)
# self.depthLine.setValue(pos[2])
self.depthArrow.setPos(0, pos[2])
if self.target.label() is not None:
self._updateTargetLabel()
def _updateTargetLabel(self):
num = self.pipetteNumber
dev = self.getDevice()
angle = dev.yawAngle() + 180
offset = 16 * np.cos(angle * np.pi / 180), 16 * np.sin(angle * np.pi / 180)
self.target.setLabel(num, {'offset': offset, 'anchor': (0.5, 0.5)})
def analyzeTransform(self):
"""Return the position and yaw angle of the device transform"""
dev = self.getDevice()
pos = dev.mapToGlobal([0, 0, 0])
x = dev.mapToGlobal([1, 0, 0])
p1 = pg.Point(x[:2])
p2 = pg.Point(pos[:2])
p3 = pg.Point(1, 0)
angle = (p1 - p2).angle(p3)
if angle is None:
angle = 0
return pos, angle
def updateCalibrateAxis(self):
pos, angle = self.analyzeTransform()
with pg.SignalBlock(self.calibrateAxis.sigRegionChangeFinished, self.calibrateAxisChanged):
self.calibrateAxis.setPos(pos[:2])
self.calibrateAxis.setAngle(angle)
def focusChanged(self):
try:
tdepth = self.dev().targetPosition()[2]
except RuntimeError:
return
fdepth = self.dev().imagingDevice().getFocusDepth()
self.target.setFocusDepth(fdepth)
def calibrateAxisChanging(self):
pos = self.calibrateAxis.pos()
angle = self.calibrateAxis.angle()
self.centerArrow.setPos(pos[0], pos[1])
self.centerArrow.setStyle(angle=180 - angle)
def calibrateAxisChanged(self):
pos = self.calibrateAxis.pos()
angle = self.calibrateAxis.angle()
size = self.calibrateAxis.size()
dev = self.getDevice()
z = dev.imagingDevice().getFocusDepth()
# first orient the parent stage
dev.setCalibratedOrientation(yaw=angle)
# next set our position offset
pos = [pos.x(), pos.y(), z]
dev.resetGlobalPosition(pos)
def controlWidget(self):
if self._showUi:
return self.ctrl
else:
return None
def boundingRect(self):
return None
def quit(self):
for item in self.calibrateAxis, self.centerArrow, self.depthArrow:
scene = item.scene()
if scene is not None:
scene.removeItem(item)
def homeClicked(self):
self.getDevice().goHome(self.selectedSpeed())
def searchClicked(self):
self.getDevice().goSearch(self.selectedSpeed())
def idleClicked(self):
self.getDevice().goIdle(self.selectedSpeed())
def setTargetToggled(self, b):
if b:
self.ui.setCenterBtn.setChecked(False)
def setCenterToggled(self, b):
if b:
self.ui.setTargetBtn.setChecked(False)
def targetClicked(self):
self.getDevice().goTarget(self.selectedSpeed())
def approachClicked(self):
self.getDevice().goApproach(self.selectedSpeed())
def autoCalibrateClicked(self):
pip = self.getDevice()
pos = pip.tracker.findTipInFrame()
tip_future = pip.setTipOffsetIfAcceptable(pos)
tip_future.onFinish(self._handleTipPositionSet)
def _handleTipPositionSet(self, future):
success = future.getResult()
if not success:
return self.autoCalibrateClicked()
def getRefFramesClicked(self):
dev = self.getDevice()
zrange = dev.config.get('referenceZRange', None)
zstep = dev.config.get('referenceZStep', None)
dev.tracker.takeReferenceFrames(zRange=zrange, zStep=zstep)
def aboveTargetClicked(self):
self.getDevice().goAboveTarget(self.selectedSpeed())
class Axis(pg.ROI):
"""Used for calibrating pipette position and orientation."""
def __init__(self, pos, angle, inverty):
arrow = pg.makeArrowPath(headLen=20, tipAngle=30, tailLen=60, tailWidth=2).translated(
-84, 0
)
tr = Qt.QTransform()
tr.rotate(180)
self._path = tr.map(arrow)
tr.rotate(90)
self._path |= tr.map(arrow)
self.pxLen = [1, 1]
self._bounds = None
pg.ROI.__init__(self, pos, angle=angle, invertible=True, movable=False)
if inverty:
self.setSize([1, -1])
else:
self.setSize([1, 1])
self.addRotateHandle([1, 0], [0, 0])
self.addScaleHandle([0, 1], [0, 0])
self.addTranslateHandle([0, 0])
self.viewTransformChanged()
self.x = pg.TextItem('X', anchor=(0.5, 0.5))
self.x.setParentItem(self)
self.y = pg.TextItem('Y', anchor=(0.5, 0.5))
self.y.setParentItem(self)
self.sigRegionChanged.connect(self.viewTransformChanged)
def viewTransformChanged(self):
if not self.isVisible():
return
w = self.pixelLength(pg.Point(1, 0))
if w is None:
self._pxLen = [None, None]
return
h = self.pixelLength(pg.Point(0, 1))
if self.size()[1] < 0:
h = -h
self._pxLen = [w, h]
self.blockSignals(True)
try:
self.setSize([w * 50, h * 50])
finally:
self.blockSignals(False)
self.updateText()
self._bounds = None
self.prepareGeometryChange()
def updateText(self):
w, h = self._pxLen
if w is None:
return
self.x.setPos(w * 100, 0)
self.y.setPos(0, h * 100)
def boundingRect(self):
if self._bounds is None:
w, h = self._pxLen
if w is None:
return Qt.QRectF()
w = w * 100
h = abs(h * 100)
self._bounds = Qt.QRectF(-w, -h, w * 2, h * 2)
return self._bounds
def setVisible(self, v):
pg.ROI.setVisible(self, v)
if v is True:
self.viewTransformChanged()
def paint(self, p, *args):
p.setRenderHint(p.Antialiasing)
w, h = self._pxLen
p.setPen(pg.mkPen('y'))
p.setBrush(pg.mkBrush(255, 255, 0, 100))
p.scale(w, h)
p.drawPath(self._path)
def setAngle(self, angle, update=True, **kwds):
if self.state['angle'] == angle:
return
pg.ROI.setAngle(self, angle, update=update, **kwds)
class PipetteDeviceGui(Qt.QWidget):
def __init__(self, dev, win):
Qt.QWidget.__init__(self)
self.win = win
self.dev = dev
self.layout = Qt.QGridLayout()
self.setLayout(self.layout)
self.posLabelLayout = Qt.QHBoxLayout()
self.layout.addLayout(self.posLabelLayout, 0, 0)
self.posLabels = [Qt.QLabel(), Qt.QLabel(), Qt.QLabel()]
for l in self.posLabels:
self.posLabelLayout.addWidget(l)
self.dev.sigGlobalTransformChanged.connect(self.pipetteMoved)
self.pipetteMoved()
def pipetteMoved(self):
pos = self.dev.globalPosition()
for i in range(3):
self.posLabels[i].setText("%0.3f mm" % (pos[i] * 1e3))