Source code for acq4.devices.PressureControl.device

import time
from typing import Optional

from acq4.util import Qt, ptime
from .widgets import PressureControlWidget
from ..Device import Device
from ...util.future import Future, future_wrap


[docs] class PressureControl(Device): """A device for controlling pressure to a single port. Pressure control may be implemented by a combination of a pressure regulator and multiple valves. The configuration for these devices should look like:: maximum: 50*kPa minimum: -50*kPa regulatorSettlingTime: 0.3 """ sigBusyChanged = Qt.Signal(object, object) # self, busyOrNot sigPressureChanged = Qt.Signal(object, object, object) # self, source, pressure def __init__(self, manager, config, name): Device.__init__(self, manager, config, name) self.maximum = config.get('maximum', 5e4) self.minimum = config.get('minimum', -5e4) self.pressure = None self.regulatorSettlingTime = config.get('regulatorSettlingTime', 0.3) self.source = None self.sources = ("regulator", "user", "atmosphere")
[docs] @future_wrap def rampPressure( self, target: Optional[float] = None, target_tolerance: float = 10, maximum: Optional[float] = None, minimum: Optional[float] = None, rate: Optional[float] = None, duration: Optional[float] = None, _future: Optional[Future] = None, ) -> None: if target is None and maximum is None and minimum is None: raise ValueError("Must specify at least one of target, maximum, or minimum") if target is not None and (maximum is not None or minimum is not None): raise ValueError("Cannot specify both target and maximum/minimum") if rate is not None and duration is not None: raise ValueError("Cannot specify both rate and duration") if target is not None: minimum = target - target_tolerance maximum = target + target_tolerance start_pressure = end_pressure = self.getPressure() if minimum is not None: end_pressure = max(minimum, end_pressure) if maximum is not None: end_pressure = min(maximum, end_pressure) if duration is None: if rate is None: duration = self.regulatorSettlingTime else: duration = abs(end_pressure - start_pressure) / abs(rate) start_time = ptime.time() frac_done = 0 while frac_done < 1: frac_done = min((ptime.time() - start_time) / duration, 1) self.setPressure("regulator", start_pressure + frac_done * (end_pressure - start_pressure)) _future.sleep(self.regulatorSettlingTime)
[docs] def setPressure(self, source=None, pressure=None): """Set the output pressure (float; in Pa) and/or pressure source (str). """ if source is not None and source not in self.sources: raise ValueError(f'Pressure source "{source}" is not valid; available sources are: {self.sources}') # order of operations depends on the requested source if source is not None and source != 'regulator': self._setSource(source) self.source = source if pressure is not None: self._setPressure(pressure) self.pressure = pressure if source == 'regulator': if pressure is not None: time.sleep(self.regulatorSettlingTime) # let pressure settle before switching valves self._setSource(source) self.source = source self.sigPressureChanged.emit(self, self.source, self.pressure)
def _setPressure(self, p): """Set the regulated output pressure (in Pascals). """ raise NotImplementedError()
[docs] def getPressure(self): raise NotImplementedError()
[docs] def setSource(self, source): self.setPressure(source=source)
def _setSource(self, source): """Configure valves for the specified pressure source: "atmosphere", "user", or "regulator" """ raise NotImplementedError()
[docs] def getSource(self): raise NotImplementedError()
[docs] def getBusyStatus(self): """Override this and emit sigBusyChanged appropriately if your subclass implements a busy state.""" return False
[docs] def deviceInterface(self, win): return PressureControlWidget(dev=self)