Source code for acq4.devices.OdorDelivery

from collections import namedtuple

import math
import numpy as np
import threading
from datetime import datetime
from time import sleep
from typing import Union, List, Dict, Tuple

from pyqtgraph import PlotWidget, intColor, mkPen
from pyqtgraph.parametertree import ParameterTree
from pyqtgraph.parametertree.parameterTypes import GroupParameter, ListParameter
from .Device import Device, TaskGui, DeviceTask
from ..util import Qt
from ..util.future import Future
from ..util.generator.StimParamSet import SeqParameter


[docs] class OdorDelivery(Device): """ Base class for odor delivery systems. Provides common functionality for controlling valve-based odor delivery systems used in olfactory research and behavioral experiments. Configuration options: * **odors** (dict): Odor channel and port definitions - Key: Channel group name (arbitrary string) - Value: Channel configuration dict: - channel: Hardware channel number (int) - ports: Dict mapping port numbers to odor names - Key: Port number (int) - Value: Odor name (str) Config should include a section describing all the possible odors in the following format:: odors: first channel name: channel: 0 ports: 2: 'first odor name' 4: 'second odor name' ... second channel name: channel: 1 ports: 2: 'first odor name' 4: 'second odor name' ... ... """ odors: "dict[str, dict[str, Union[int, dict[int, str]]]]" def __init__(self, deviceManager, config: dict, name: str): super().__init__(deviceManager, config, name) self.odors = { group: { "channel": int(group_config["channel"]), "ports": {int(port): name for port, name in group_config["ports"].items()}, } for group, group_config in config.get("odors", {}).items() }
[docs] def odorChannels(self) -> List[int]: return sorted([gr["channel"] for gr in self.odors.values()])
[docs] def odorDetails(self, odor: Tuple[int, int]) -> str: for chan_name in self.odors: if self.odors[chan_name]["channel"] == odor[0]: port_name = self.odors[chan_name]["ports"][odor[1]] return f"{chan_name}: {port_name}" raise ValueError(f"Invalid odor specification: {odor}")
[docs] def odorsAsParameterLimits(self) -> Dict[str, Tuple[int, int]]: return { f"{chanName}[{port}]: {name}": (chanOpts["channel"], port) for chanName, chanOpts in self.odors.items() for port, name in chanOpts["ports"].items() }
[docs] def channelName(self, channel: int) -> str: for chan_name in self.odors: if self.odors[chan_name]["channel"] == channel: return chan_name raise ValueError(f"Invalid channel: {channel}")
[docs] def setChannelValue(self, channel: int, value: int) -> None: """Turn a given odor channel value""" raise NotImplementedError()
[docs] def setAllChannelsOff(self) -> None: """Turn off all odors. (Reimplement if that should be handled other than by iterating)""" for ch in self.odorChannels(): self.setChannelValue(ch, 0)
[docs] def deviceInterface(self, win): return OdorDevGui(self)
[docs] def taskInterface(self, task): return OdorTaskGui(self, task)
[docs] def createTask(self, cmd, parentTask): """cmd will be coming in from the TaskGui.generateTask with whatever data I want it to have""" return OdorTask(self, cmd, parentTask)
class OdorDevGui(Qt.QWidget): """ Take the {group_name: {channel: odor_name, ...}, ...} odors and make a ui that: * lets user select which group is in right now * lets user turn on/off odors * lets user set the intensity value """ OFF_LABEL = "OFF" def __init__(self, dev: OdorDelivery): super().__init__() # TODO configuration option for being able to activate multiple channels at the same time self.dev = dev self.layout = Qt.FlowLayout() self.setLayout(self.layout) self._buttonGroups = {} self._groupButtons = {} self._controlButtons = {} self._setupOdorButtons() def _setupOdorButtons(self): first = True for group_name, group_config in self.dev.odors.items(): channel = group_config["channel"] group_box = Qt.QGroupBox(f"{channel}: {group_name}") self._groupButtons[group_name] = group_box group_box.setObjectName(group_name) group_box.toggled.connect(self._handleChannelButtonPush) group_box.setCheckable(True) group_layout = Qt.FlowLayout() group_box.setLayout(group_layout) self.layout.addWidget(group_box) button_group = Qt.QButtonGroup() self._buttonGroups[group_name] = button_group def _add_button(btn): group_layout.addWidget(btn) button_group.addButton(btn) btn.clicked.connect(self._handleOdorButtonPush) if 1 not in group_config["ports"]: control_button = Qt.QRadioButton(f"{channel}[1]: Control") control_button.setObjectName(f"{channel}:1") control_button.setChecked(True) _add_button(control_button) self._controlButtons[group_name] = control_button for port, odor in group_config["ports"].items(): if port == 0: # Off is handled by group_box continue button = Qt.QRadioButton(f"{channel}[{port}]: {odor}") button.setObjectName(f"{channel}:{port}") _add_button(button) if port == 1: self._controlButtons[group_name] = button button.setChecked(True) group_box.setChecked(False) # needed to guarantee toggle signal group_box.setChecked(first) first = False def _handleChannelButtonPush(self, enabled): btn = self.sender() group_name = btn.objectName() channel = self.dev.odors[group_name]["channel"] self.dev.setChannelValue(channel, 1 if enabled else 0) if enabled: for button in self._buttonGroups[group_name].buttons(): if button.isChecked(): channel, port = map(int, button.objectName().split(":")) if port != 1: self.dev.setChannelValue(channel, port) for group in self._groupButtons: if group != group_name: self._groupButtons[group].setChecked(False) else: self._controlButtons[group_name].setChecked(True) def _handleOdorButtonPush(self): btn = self.sender() channel, port = map(int, btn.objectName().split(":")) self.dev.setChannelValue(channel, port) class _ListSeqParameter(ListParameter): def __init__(self, **kwargs): kwargs["expanded"] = kwargs.get("expanded", False) super().__init__(**kwargs) initialParams = [p.name() for p in self] sequence_names = ["off", "select"] newParams = [ {"name": "sequence", "type": "list", "value": "off", "limits": sequence_names}, {"name": "select", "type": "checklist", "visible": False, "limits": kwargs["limits"], "exclusive": False}, {"name": "randomize", "type": "bool", "value": False, "visible": False}, ] self.visibleParams = { # list of params to display in each mode "off": initialParams + ["sequence"], "select": initialParams + ["sequence", "select", "randomize"], } if "group_by" in kwargs: for name, fn in kwargs["group_by"].items(): grouping = {} for n, v in kwargs["limits"].items(): grouping.setdefault(fn(n, v), []).append(v) sequence_names.append(name) newParams.append({"name": name, "type": "list", "visible": False, "limits": grouping}) self.visibleParams[name] = initialParams + ["sequence", name, "randomize"] for ch in newParams: self.addChild(ch) def compile(self): name = f"{self.parent().name()}_{self.name()}" mode = self["sequence"] if mode == "select": seq = self["select"] elif mode == "off": seq = [] else: # arbitrarily-named groupings seq = self[mode] if self["randomize"]: np.random.shuffle(seq) return name, seq def setState(self, state): for k in state: self[k] = state[k] self.param(k).setDefault(state[k]) def getState(self): state = {} for ch in self: if not ch.opts["visible"]: continue name = ch.name() val = ch.value() if val is False: continue state[name] = val state["value"] = self.value() return state def treeStateChanged(self, param, changes): # catch changes to 'sequence' so we can hide/show other params. # Note: it would be easier to just catch self.sequence.sigValueChanged, # but this approach allows us to block tree change events, so they are all # released as a single update. with self.treeChangeBlocker(): # queue up change ListParameter.treeStateChanged(self, param, changes) # if needed, add some more changes before releasing the signal for param, change, data in changes: # if the sequence value changes, hide/show other parameters if param is self.param("sequence") and change == "value": vis = self.visibleParams[self["sequence"]] for ch in self: if ch.name() in vis: ch.show() else: ch.hide() class OdorEventParameter(GroupParameter): def varName(self): return self.name().replace(' ', '_') class OdorTaskGui(TaskGui): _events: List[OdorEventParameter] def __init__(self, dev: OdorDelivery, taskRunner): super().__init__(dev, taskRunner) self._events = [] self._next_event_number = 0 self.taskRunner.sigTaskChanged.connect(self._redrawPlot) layout = Qt.QHBoxLayout() self.setLayout(layout) splitter = Qt.QSplitter() splitter.setOrientation(Qt.Qt.Horizontal) splitter.setContentsMargins(0, 0, 0, 0) layout.addWidget(splitter) self._params = GroupParameter(name="Odor Events", addText="Add Odor Event") self._params.sigTreeStateChanged.connect(self._redrawPlot) self._params.sigAddNew.connect(self._addNewOdorEvent) ptree = ParameterTree() ptree.addParameters(self._params) splitter.addWidget(ptree) self._plot = PlotWidget() self._plot.setObjectName("OdorDelivery_taskPlot") splitter.addWidget(self._plot) # TODO validate if the events will go longer than the total task runner def _addNewOdorEvent(self) -> OdorEventParameter: # ignore signal args: self, typ ev = OdorEventParameter(name=f"Event {self._next_event_number}", removable=True) self._next_event_number += 1 ev.addChildren( [ SeqParameter(name="Start Time", type="float", limits=(0, None), units="s", siPrefix=True), SeqParameter(name="Duration", type="float", limits=(0, None), units="s", siPrefix=True, value=0.1), _ListSeqParameter( name="Odor", type="list", limits=self.dev.odorsAsParameterLimits(), group_by={"channel": lambda name, address: self.dev.channelName(address[0])}, ), ] ) self._params.addChild(ev) self._events.append(ev) ev.sigRemoved.connect(self._handleEventRemoval) self._redrawPlot() return ev def _handleEventRemoval(self, event): self._events = [ev for ev in self._events if ev != event] self.sigSequenceChanged.emit(self.dev.name()) def _redrawPlot(self): self._plot.clear() chan_names = {conf["channel"]: chan for chan, conf in self.dev.odors.items()} self._plot.addLegend() if self._events: chans_in_use = {ev["Odor"][0] for ev in self._events if ev["Odor"]} def get_precision(a): if a == 0: return 0 return int(math.log10(float(str(a)[::-1]))) + 1 precision = max(get_precision(ev["Duration"]) for ev in self._events) precision = max([precision, max(get_precision(ev["Start Time"]) for ev in self._events)]) MIN_PRECISION = 3 MAX_PRECISION = 10 precision = max([MIN_PRECISION, min([MAX_PRECISION, precision])]) task_duration = self.taskRunner.getParam("duration") point_count = int(task_duration * (10 ** precision)) + 1 arrays = { chan: (np.ones(point_count, dtype=int) if chan in chans_in_use else np.zeros(point_count, dtype=int)) for chan in chan_names } for ev in self._events: start = int(ev["Start Time"] * (10 ** precision)) length = int(ev["Duration"] * (10 ** precision)) if ev["Odor"]: chan, val = ev["Odor"] end = min((start + length, point_count)) arrays[chan][start:end] &= 0xFE # turn off control (1) for the duration arrays[chan][start:end] |= val time_vals = np.linspace(0, task_duration, point_count) for chan, arr in arrays.items(): self._plot.plot(time_vals, arr, name=chan_names[chan], pen=mkPen(color=intColor(chan, max(arrays) + 1))) self.sigSequenceChanged.emit(self.dev.name()) def saveState(self): return [ev.saveState(filter="user") for ev in self._events] def restoreState(self, state): for eventState in state: ev = self._addNewOdorEvent() ev.restoreState(eventState) self._redrawPlot() def generateTask(self, params=None): if params is None: params = {} paramSpace = self.listSequence() params = {k: paramSpace[k][v] for k, v in params.items()} for ev in self._events: params.setdefault(f"{ev.name()} Start Time", ev["Start Time"]) params.setdefault(f"{ev.name()} Duration", ev["Duration"]) params.setdefault(f"{ev.name()} Odor", ev["Odor"]) return params def listSequence(self): params = {} for ev in self._events: if starts := ev.param("Start Time").compile()[1]: params[f"{ev.name()} Start Time"] = starts if durs := ev.param("Duration").compile()[1]: params[f"{ev.name()} Duration"] = durs if odors := ev.param("Odor").compile()[1]: params[f"{ev.name()} Odor"] = odors return params OdorEvent = namedtuple("OdorEvent", ["startTime", "duration", "odor"]) class OdorTask(DeviceTask): def __init__(self, dev: OdorDelivery, cmd: dict, parentTask): """ cmd: dict Structure: {"Event 0 Start Time": start_in_s, "Event 0 Duration": dur_in_s, "Event 0 Odor": (chan, port)} """ super().__init__(dev, cmd, parentTask) self._cmd = cmd self._events: List[OdorEvent] = [] i = 0 while i < len(cmd) / 3: self._events.append( OdorEvent(cmd[f"Event {i} Start Time"], cmd[f"Event {i} Duration"], cmd[f"Event {i} Odor"]) ) i += 1 self._future = None self._result = None def configure(self): first_chan = self._events[0].odor[0] for chan in self.dev.odorChannels(): self.dev.setChannelValue(chan, 1 if chan == first_chan else 0) def isDone(self): return self._future is not None and self._future.isDone() def getResult(self): cmd = self._cmd.copy() i = 0 while odor := cmd.get(f"Event {i} Odor"): cmd[f"Event {i} Odor Details"] = self.dev.odorDetails(odor) i += 1 return [cmd] def start(self): self._future = OdorFuture(self.dev, self._events) def stop(self, **kwargs): if self._future is not None: self._future.stop(reason=kwargs.get("reason")) class OdorFuture(Future): def __init__(self, dev, schedule: List[OdorEvent]): super().__init__(name=f"{dev.name()}_odor_future") self._dev = dev self._schedule = schedule self._duration = max(ev.startTime + ev.duration for ev in schedule) self._time_elapsed = 0 self._thread = threading.Thread(target=self._executeSchedule, name=f"{dev.name()}_odor_schedule") self._thread.start() def percentDone(self): if self.isDone(): return 100 return 100 * self._time_elapsed / self._duration def _executeSchedule(self): # TODO this spec is duplicated in the graphing code start = datetime.now() chan_values = {ev.odor[0]: 0 for ev in self._schedule} while True: sleep(0.01) now = (datetime.now() - start).total_seconds() self._time_elapsed = now if now > self._duration: # all done for chan in chan_values: self._dev.setChannelValue(chan, 1) break for event in self._schedule: chan, port = event.odor action_needed = False end_time = event.startTime + event.duration if now >= end_time: # turn off this port after time is up if chan_values[chan] & port > 0: chan_values[chan] ^= port if chan_values[chan] == 0: chan_values[chan] = 1 # ensure at least control is left on action_needed = True elif now >= event.startTime: # time to turn on this port if chan_values[chan] & port == 0: chan_values[chan] &= 0xFE # Turn off control while other ports are on chan_values[chan] |= port action_needed = True if action_needed: self._dev.setChannelValue(chan, chan_values[chan]) self._isDone = True