Source code for acq4.Manager

import argparse
import atexit
import contextlib
import gc
import getpass
import json
import logging
import os
import socket
import sys
import threading
import time
import weakref
from collections import OrderedDict
from datetime import datetime

from MetaArray import MetaArray

import pyqtgraph as pg
import pyqtgraph.reload as reload
from pyqtgraph import configfile
from pyqtgraph.debug import Profiler
from pyqtgraph.util.mutex import Mutex
from . import __version__
from . import devices, modules
from .Interfaces import InterfaceDirectory
from .devices.Device import Device, DeviceTask
from .logging_config import get_logger, setup_logging, HistoricLogRecord
from .util import DataManager, ptime, Qt
from .util.DataManager import DirHandle
from .util.HelpfulException import HelpfulException
from .util.LogWindow import get_log_window, get_error_dialog

TEMP_LOG = "temp_log.json"
setup_logging(TEMP_LOG, gui=False, console_level=logging.DEBUG)
logger = get_logger()


def __reload__(old):
    Manager.CREATED = old['Manager'].CREATED
    Manager.single = old['Manager'].single


[docs] class Manager(Qt.QObject): """Manager class is responsible for: - Loading/configuring device modules and storing their handles - Managing the device rack GUI - Creating acquisition task handles - Loading gui modules and storing their handles - Creating and managing DirectoryHandle objects - Providing unified timestamps - Making sure all devices/modules are properly shut down at the end of the program""" sigConfigChanged = Qt.Signal() sigModulesChanged = Qt.Signal() sigModuleHasQuit = Qt.Signal(object) ## (module name) sigCurrentDirChanged = Qt.Signal(object, object, object) # (file, change, args) sigBaseDirChanged = Qt.Signal() sigLogDirChanged = Qt.Signal(object) # dir sigTaskCreated = Qt.Signal(object, object) ## for debugger module sigAbortAll = Qt.Signal() # User requested abort all tasks via ESC key CREATED = False single = None @classmethod def makeArgParser(cls): parser = argparse.ArgumentParser(description="CQ4 control script") parser.add_argument("--config", "-c", help="Configuration file to load", default=cls._getConfigFile()) parser.add_argument("--config-name", "-a", help="Named configuration to load", action="append") parser.add_argument("--module", "-m", help="Module name to load", action="append") parser.add_argument("--base-dir", "-b", help="Base directory to use") parser.add_argument("--storage-dir", "-s", help="Storage directory to use") parser.add_argument("--log-level", action="store", help="Set the console log level", default="WARNING") parser.add_argument("--root-log-level", action="store", help="Set the root log level", default="DEBUG") parser.add_argument("--disable", "-d", help="Disable the device specified", action="append") parser.add_argument("--disable-all", "-D", help="Disable all devices", action="store_true") parser.add_argument("--exit-on-error", "-x", help="Whether to exit immidiately on the first exception during initial Manager setup", action="store_true") parser.add_argument("--no-manager", "-n", help="Do not load manager module", action="store_true") return parser
[docs] @classmethod def runFromCommandLine(cls, args): """Run the Manager from the command line.""" m = cls() m.initFromCommandLine(args) return m
[docs] def __init__(self): self.moduleLock = Mutex(recursive=True) ## used for keeping some basic methods thread-safe # self.devices = OrderedDict() # all currently loaded devices self.isReady = threading.Event() self.modules = OrderedDict() # all currently running modules self.devices = OrderedDict() # all devices loaded via Manager self.definedModules = OrderedDict() # all custom-defined module configurations self.config = OrderedDict() self.currentDir = None self.baseDir = None self.exitOnError = False self.gui = None self.shortcuts = [] self.disableDevs = [] self.disableAllDevs = False self.alreadyQuit = False self.taskLock = Mutex(Qt.QMutex.Recursive) self._folderTypes = None self._logFile = None self._consoleLogLevel = logging.WARNING self._rootLogLevel = logging.DEBUG try: if Manager.CREATED: raise ValueError("Manager object already created!") Manager.CREATED = True Manager.single = self self.logWindow = get_log_window() self.documentation = Documentation() Qt.QObject.__init__(self) atexit.register(self.quit) self.interfaceDir = InterfaceDirectory() # Import all built-in module classes modules.importBuiltinClasses() logger.info(f'ACQ4 version {__version__} started.') except: Manager.CREATED = False Manager.single = None if self.exitOnError: raise else: logger.exception( "Error while configuring Manager", extra={"docs": ["userGuide/configuration.html"]} )
def initFromCommandLine(self, args: argparse.Namespace): self.exitOnError = args.exit_on_error self.disableDevs = args.disable or [] self.disableAllDevs = args.disable_all self._consoleLogLevel = getattr(logging, args.log_level.upper(), logging.WARNING) self._rootLogLevel = getattr(logging, args.root_log_level.upper(), logging.DEBUG) self.configDir = os.path.dirname(args.config) self.readConfig(args.config) ## Act on options if they were specified.. try: for name in (args.config_name or []): self.loadDefinedConfig(name) if args.base_dir is not None: self.setBaseDir(args.base_dir) if args.storage_dir is not None: self.setCurrentDir(args.storage_dir) if not args.no_manager: self.showGUI() self.createWindowShortcut('F1', self.gui.win) for m in (args.module or []): try: if m in self.definedModules: self.loadDefinedModule(m) else: self.loadModule(m) except Exception: if args.no_manager: # we have to show it now, otherwise we'll have no windows self.showGUI() raise setup_logging( TEMP_LOG, acq4_level=self._rootLogLevel, console_level=self._consoleLogLevel ) except Exception: if self.exitOnError: raise else: logger.exception("Error while acting on command line options") finally: self.isReady.set() if len(self.modules) == 0: self.quit() raise RuntimeError( "No modules loaded during startup (likely due to errors above). Exiting now.") @staticmethod def _getConfigFile(): ## search all the default locations to find a configuration file. from acq4 import CONFIGPATH for path in CONFIGPATH: cf = os.path.join(path, 'default.cfg') if os.path.isfile(cf): return cf raise FileNotFoundError(f"Could not find default.cfg file in any of: {CONFIGPATH}") def _appDataDir(self): # return the user application data directory if sys.platform == 'win32': # resolves to "C:/Documents and Settings/User/Application Data/acq4" on XP # and "C:\User\Username\AppData\Roaming" on win7 return os.path.join(os.environ['APPDATA'], 'acq4') elif sys.platform == 'darwin': return os.path.expanduser('~/Library/Preferences/acq4') else: return os.path.expanduser('~/.local/acq4')
[docs] def readConfig(self, configFile): """Read configuration file, create device objects, add devices to list""" logger.info(f"============= Starting Manager configuration from {configFile} =================") ns = { 'hostname': socket.gethostname(), 'username': getpass.getuser(), 'environ': os.environ, 'configpath': self.configDir, } cfg = configfile.readConfigFile(configFile, **ns) self.config.update(cfg) ## read modules, devices, and stylesheet out of config self.configure(self.config) self.configFile = configFile logger.info("============= Manager configuration complete =================")
[docs] def exec_(self, pyfile): """Execute a Python file. This is used to enable easy loading of customizations from an externally defined file. Note that sys.path is temporarily modified to allow the external file to import from scripts in its own path. For more complex customizations, it is recommended to build an importable module instead. Parameters ---------- pyfile : str The full path to the python file to be exec'd Returns ------- globs : dict global namespace defined by the exec """ modDir = os.path.dirname(pyfile) sys.path.insert(0, modDir) try: globs = {} with open(pyfile, 'rb') as fh: exec(fh.read(), globs) finally: sys.path.pop(0) return globs
[docs] def configure(self, cfg): """Load the devices, modules, stylesheet, and storageDir defined in cfg""" self._loadConfig(cfg) self.sigConfigChanged.emit()
def _loadConfig(self, cfg): # Handle custom import prior to loading devices if 'imports' in cfg: try: if isinstance(cfg["imports"], str): cfg["imports"] = [cfg["imports"]] for mod in cfg["imports"]: __import__(mod) except: if self.exitOnError: raise else: logger.exception("Unable to import module specified in config file") for key, val in cfg.items(): try: # Hand custom exec if key == 'execFiles': if isinstance(val, str): val = [val] for pyfile in val: self.exec_(pyfile) ## configure new devices elif key == 'devices': for k in cfg['devices']: if self.disableAllDevs or k in self.disableDevs: logger.info(f" --> Ignoring device '{k}' -- disabled by request") continue logger.info(f" === Configuring device '{k}' ===") try: conf = cfg['devices'][k] try: driverName = conf['driver'] except KeyError as exc: raise KeyError(f"No driver specified for device {k}") from exc if 'config' in conf: # for backward compatibility conf = conf['config'] self.loadDevice(driverName, conf, k) except: if self.exitOnError: raise else: logger.exception(f"Error configuring device {k}") logger.info("=== Device configuration complete ===") ## Copy in new module definitions elif key == 'modules': for m in cfg['modules']: self.definedModules[m] = cfg['modules'][m] ## set new storage directory elif key == 'storageDir': logger.info(f"=== Setting base directory: {cfg['storageDir']} ===") self.setBaseDir(cfg['storageDir']) elif key == 'defaultCompression': comp = cfg['defaultCompression'] try: if isinstance(comp, tuple): cstr = comp[0] assert isinstance(comp[1], int) else: cstr = comp assert cstr in [None, 'gzip', 'szip', 'lzf'] except Exception as exc: raise ValueError( "'defaultCompression' option must be one of: None, 'gzip', 'szip'," f" 'lzf', ('gzip', 0-9), or ('szip', opts). Got: '{comp}'" ) from exc logger.info(f"=== Setting default HDF5 compression: {comp} ===") MetaArray.defaultCompression = comp elif key == 'folderTypes': self._folderTypes = val ## load stylesheet elif key == 'stylesheet': css = open(os.path.join(self.configDir, cfg['stylesheet'])).read() Qt.QApplication.instance().setStyleSheet(css) elif key == 'disableErrorPopups': get_error_dialog().disable(bool(cfg[key])) elif key == 'defaultMouseMode': mode = cfg[key].lower() if mode == 'onebutton': pg.setConfigOption('leftButtonPan', False) elif mode == 'threebutton': pg.setConfigOption('leftButtonPan', True) else: logger.warning( "Warning: ignored config option 'defaultMouseMode'; value must be" "either 'oneButton' or 'threeButton'." ) elif key == 'useOpenGL': pg.setConfigOption('useOpenGL', cfg[key]) elif key == 'misc': # Let's start moving things out of the top level, but stay backwards compatible self._loadConfig(cfg[key]) except: if self.exitOnError: raise else: logger.exception("Error in ACQ4 configuration", docs=["userGuide/configuration.html"])
[docs] def listConfigurations(self): """Return a list of the named configurations available""" return list(self.config.get('configurations', {}).keys())
def loadDefinedConfig(self, name): try: cfg = self.config['configurations'][name] except KeyError: raise KeyError(f"Could not find configuration named '{name}'") self.configure(cfg) def readConfigFile(self, fileName, missingOk=True): fileName = self.configFileName(fileName) if os.path.isfile(fileName): return configfile.readConfigFile(fileName) else: if missingOk: return {} else: raise FileNotFoundError(f'Config file "{fileName}" not found.')
[docs] def writeConfigFile(self, data, fileName): """Write a file into the currently used config directory.""" fileName = self.configFileName(fileName) dirName = os.path.dirname(fileName) if not os.path.exists(dirName): os.makedirs(dirName) return configfile.writeConfigFile(data, fileName)
def appendConfigFile(self, data, fileName): fileName = self.configFileName(fileName) if os.path.exists(fileName): return configfile.appendConfigFile(data, fileName) else: raise FileNotFoundError(f"Could not find file '{fileName}'") def updateConfig(self, config: dict): self.config.update(config) def configFileName(self, name): return os.path.join(self.configDir, name)
[docs] def loadDevice(self, devClassName, conf, name): """Create a new instance of a device. Parameters ---------- devClassName : str The name of a device class that was registered using acq4.devices.registerDeviceClass(). See acq4.devices.DEVICE_CLASSES for access to all available device classes. conf : dict A structure passed to the device providing configuration options name : str The name of this device. The instantiated device object will be retrievable using ``Manager.getDevice(name)`` Returns ------- device : Device instance The instantiated device object """ devclass = devices.getDeviceClass(devClassName) dev = devclass(self, conf, name) self.devices[name] = dev # just to prevent device being collected return dev
[docs] def getDevice(self, name): """Return a device instance given its name. """ name = str(name) try: return self.getInterface('device', name) except KeyError as exc: raise ValueError(f"No device named {name}. Options are {','.join(self.listDevices())}") from exc
[docs] def listDevices(self): """Return a list of the names of available devices. """ return self.listInterfaces('device')
[docs] def reserveDevices(self, devices, timeout=10.0): """Return a DeviceLocker that can be used to reserve multiple devices simultaneously:: with manager.reserveDevices(['Camera', 'Clamp1', 'Stage']): # .. do stuff """ devices = [self.getDevice(d) if isinstance(d, str) else d for d in devices] return DeviceLocker(self, devices, timeout=timeout)
[docs] def loadModule(self, moduleClassName, name=None, config=None, forceReload=False, importMod=None, execPath=None): """Create a new instance of an user interface module. Parameters ---------- moduleClassName : str The name of the module *class* to instantiate. The class must have been registered by calling acq4.modules.registerModuleClass(). See acq4.modules.MODULE_CLASSES for access to all available module classes. name : str or None The name to assign to the newly instantiated module. If None, then the class name is used instead. Module names are automatically modified to avoid name collision with previously loaded modules. config : dict | None Configuration options to pass to the module constructor """ if name is None: name = moduleClassName ## Find an unused name for this module baseName = name n = 0 with self.moduleLock: while name in self.listInterfaces().get("module", []): name = "%s_%d" % (baseName, n) n += 1 if name in self.modules: raise NameError(f"Module name '{name}' is already in use.") self.modules[name] = None # reserve this spot if config is None: config = {} logger.info(f'Loading module "{moduleClassName}" as "{name}"...') # deprecated args if importMod is not None: __import__(importMod) elif execPath is not None: self.exec_(execPath) modclass = modules.getModuleClass(moduleClassName) mod = modclass(self, name, config) self.modules[name] = mod self.sigModulesChanged.emit() return mod
[docs] def listModules(self): """List names of currently loaded modules. """ return list(self.modules.keys())
[docs] def getDirOfSelectedFile(self): """Returns the directory that is currently selected, or the directory of the file that is currently selected in Data Manager.""" try: f = self.getModule("Data Manager").selectedFile() if not isinstance(f, DirHandle): f = f.parent() except Exception: f = False logger.warning("Can't find currently selected directory, Data Manager has not been loaded.") if self.exitOnError: raise return f
[docs] def getModule(self, name: str): """Return a module""" with self.moduleLock: if name not in self.modules: self.loadDefinedModule(name) return self.modules[name]
[docs] def getCurrentDatabase(self): """Return the database currently selected in the Data Manager""" return self.getModule("Data Manager").currentDatabase()
[docs] def listDefinedModules(self): """List module configurations defined in the config file""" return self.definedModules.copy()
[docs] def loadDefinedModule(self, name, forceReload=False): """Load a module and configure as defined in the config file""" if name not in self.definedModules: logger.error(f"Module '{name}' is not defined. Options are: {list(self.definedModules.keys())}") return conf = self.definedModules[name] mod = conf['module'] config = conf.get('config', {}) # Allow mechanisms for importing custom modules execPath = conf.get('exec', None) importMod = conf.get('import', None) mod = self.loadModule(mod, name, config, forceReload=forceReload, execPath=execPath, importMod=importMod) win = mod.window() if 'shortcut' in conf and win is not None: self.createWindowShortcut(conf['shortcut'], win) logger.info(f"Loaded module '{mod.name}'")
def moduleHasQuit(self, mod): with self.moduleLock: if mod.name in self.modules: del self.modules[mod.name] self.interfaceDir.removeObject(mod) else: return self.removeWindowShortcut(mod.window()) self.sigModulesChanged.emit() self.sigModuleHasQuit.emit(mod.name) def unloadModule(self, name): try: mod = self.getModule(name) if mod is not None: mod.quit() except: logger.exception(f"Error while requesting that module '{name}' quit.") ## Module should have called moduleHasQuit already, but just in case: mod = self.modules.pop(name, None) if mod is not None: self.sigModulesChanged.emit()
[docs] def reloadAll(self): """Reload all python code""" # path = os.path.split(os.path.abspath(__file__))[0] # path = os.path.abspath(os.path.join(path, '..')) path = 'acq4' logger.info(f"---- Reloading all libraries under {path} ----") reload.reloadAll(debug=True) logger.info(f"Reloaded all libraries under {path}.")
def createWindowShortcut(self, keys, win): ## Note: this is probably not safe to call from other threads. try: sh = Qt.QShortcut(Qt.QKeySequence(keys), win) sh.setContext(Qt.Qt.ApplicationShortcut) sh.activated.connect(lambda *args: win.raise_()) except: if self.exitOnError: raise else: logger.exception(f"Error creating shortcut '{keys}'") self.shortcuts.append((sh, keys, weakref.ref(win))) def removeWindowShortcut(self, win): ## Need to remove shortcuts after window is closed, because the shortcut is hanging on to all the widgets in the window s = None for i, s in enumerate(self.shortcuts): if s[2]() == win: break if s is not None: self.shortcuts.remove(s)
[docs] def runTask(self, cmd): """ Convenience function that runs a task and returns its results. """ t = Task(self, cmd) t.execute() return t.getResult()
[docs] def createTask(self, cmd) -> "Task": """ Creates a new Task instance from the specified command structure. """ t = Task(self, cmd) self.sigTaskCreated.emit(cmd, t) return t
[docs] def showGUI(self): """Show the Manager GUI""" if self.gui is None: self.gui = self.loadModule('Manager', 'Manager', {}) # win = self.modules[list(self.modules.keys())[0]].window() ? win = self.gui.window() self.quitShortcut = Qt.QShortcut(Qt.QKeySequence('Ctrl+q'), win) self.quitShortcut.setContext(Qt.Qt.ApplicationShortcut) self.abortShortcut = Qt.QShortcut(Qt.QKeySequence('Esc'), win) self.abortShortcut.setContext(Qt.Qt.ApplicationShortcut) self.reloadShortcut = Qt.QShortcut(Qt.QKeySequence('Ctrl+r'), win) self.reloadShortcut.setContext(Qt.Qt.ApplicationShortcut) self.quitShortcut.activated.connect(self.quit) self.abortShortcut.activated.connect(self.sigAbortAll) self.reloadShortcut.activated.connect(self.reloadAll) self.gui.show()
[docs] def getCurrentDir(self): """ Return a directory handle to the currently-selected directory for data storage. """ if self.currentDir is None: raise HelpfulException("Storage directory has not been set.", docs=["userGuide/modules/DataManager.html#acquired-data-storage"]) return self.currentDir
[docs] def setLogDir(self, d: DirHandle): """ Set the directory to which log messages are stored. """ was_temp = self._logFile is None self._logFile = d["log.json"] file_handler = setup_logging( self._logFile.name(), acq4_level=self._rootLogLevel, console_level=self._consoleLogLevel, ) self.sigLogDirChanged.emit(d) if was_temp: try: with open(TEMP_LOG, 'r') as f: for line in f: file_handler.emit(HistoricLogRecord(**(json.loads(line)))) finally: os.remove(TEMP_LOG) log_win = get_log_window() with open(self._logFile.name(), 'r') as f: for i, line in enumerate(f): log_win.new_record(HistoricLogRecord(**(json.loads(line))), sort=False) if i % 20 == 0: Qt.QApplication.processEvents() log_win.ensure_chronological_sorting()
[docs] def setCurrentDir(self, d): """ Set the currently-selected directory for data storage. """ if self.currentDir is not None: with contextlib.suppress(TypeError): self.currentDir.sigChanged.disconnect(self.currentDirChanged) if isinstance(d, str): d = self.baseDir.getDir(d, create=True) if not isinstance(d, DirHandle): raise TypeError(f"Argument must be a string or DirHandle. Got {type(d)}") self.currentDir = d # Storage directory is about to change; logDir = self._logFile.parent() if self._logFile else None while not d.info().get('expUnit', False) and d != self.baseDir and d != logDir: d = d.parent() if d not in [self.baseDir, logDir]: self.setLogDir(d) else: if logDir is None: docs = "userGuide/dataManagement.html#notes-and-logs" logger.warning( "No log directory set. Log messages will not be stored.", extra={"docs": docs} ) self.currentDir.sigChanged.connect(self.currentDirChanged) self.sigCurrentDirChanged.emit(None, None, None)
[docs] def currentDirChanged(self, fh, change=None, args=()): """Handle situation where currentDir is moved or renamed""" self.sigCurrentDirChanged.emit(fh, change, args)
[docs] def getBaseDir(self): """ Return a directory handle to the base directory for data storage. This is the highest-level directory where acquired data may be stored. If the base directory has not been set, return None. """ return self.baseDir
[docs] def setBaseDir(self, d): """ Set the base directory for data storage. """ if isinstance(d, str): dh = self.dirHandle(d, create=False) elif isinstance(d, DirHandle): dh = d else: raise TypeError("Invalid argument type: ", type(d), d) changed = False if self.baseDir is not dh: self.baseDir = dh changed = True if changed: self.sigBaseDirChanged.emit() self.setCurrentDir(self.baseDir)
[docs] def dirHandle(self, d, create=False) -> DirHandle: """Return a directory handle for the specified directory string.""" # return self.dataManager.getDirHandle(d, create) return DataManager.getDirHandle(d, create=create)
[docs] def fileHandle(self, d): """Return a file or directory handle for d""" # return self.dataManager.getHandle(d) return DataManager.getFileHandle(d)
## These functions just wrap the functionality of an InterfaceDirectory def declareInterface(self, *args, **kargs): ## args should be name, [types..], object return self.interfaceDir.declareInterface(*args, **kargs) def removeInterface(self, *args, **kargs): return self.interfaceDir.removeInterface(*args, **kargs) def listInterfaces(self, *args, **kargs): return self.interfaceDir.listInterfaces(*args, **kargs)
[docs] def getInterface(self, *args, **kargs): """Return the object that was previously declared with *name* and interface *type*. """ return self.interfaceDir.getInterface(*args, **kargs)
[docs] def suggestedDirFields(self, file): """Given a DirHandle with a dirType, suggest a set of meta-info fields to use.""" fields = OrderedDict() if isinstance(file, DirHandle): info = file.info() if 'dirType' in info: # infoKeys.remove('dirType') dt = info['dirType'] folderTypesConfig = self._folderTypesConfig() if dt in folderTypesConfig: fields = folderTypesConfig[dt]['info'] if 'notes' not in fields: fields['notes'] = 'text', 5 if 'important' not in fields: fields['important'] = 'bool' return fields
def _folderTypesConfig(self): return self._folderTypes def showDocumentation(self, label=None): self.documentation.show(label)
[docs] def quit(self): """Nicely request that all devices and modules shut down""" if not self.alreadyQuit: ## Need this because multiple triggers can call this function during quit self.alreadyQuit = True lm = len(self.modules) ld = len(self.listDevices()) with pg.ProgressDialog("Shutting down..", 0, lm + ld, cancelText=None, wait=0) as dlg: self.documentation.quit() logger.debug("Requesting all modules shut down..") logger.info("Shutting Down.") while len(self.modules) > 0: ## Modules may disappear from self.modules as we ask them to quit m = list(self.modules.keys())[0] logger.debug(f" {m}") self.unloadModule(m) dlg.setValue(lm - len(self.modules)) logger.debug("Requesting all devices shut down..") devs = Device._deviceCreationOrder[::-1] for d in devs: # shut down in reverse order d = d() if d is None: # device was already deleted continue logger.debug(f" {d}") try: d.quit() except: logger.exception(f"Error while requesting device '{d.name()}' quit.") dlg.setValue(lm + ld - len(devs)) logger.debug("Closing windows..") Qt.QApplication.instance().closeAllWindows() Qt.QApplication.instance().processEvents() logger.debug("\n ciao.") Qt.QApplication.quit()
# All other modules can use this function to get the manager instance def getManager() -> Manager: if Manager.single is None: raise RuntimeError("No manager created yet") return Manager.single class DeviceLocker(object): def __init__(self, manager, devices, timeout=10.0): # make sure we lock devices in a predictable order; this is what prevents deadlocks self.devices = sorted(devices, key=lambda d: d.name()) self.locked = [] self.timeout = timeout self.lockErr = None def tryLock(self, timeout=None): try: for device in self.devices: devLocked = device.reserve(block=True, timeout=timeout) if not devLocked: self.lockErr = "Timed out waiting for %s" % device.name() self.unlock() return False self.locked.append(device) return True except Exception: self.unlock() raise def lock(self): locked = self.tryLock(timeout=self.timeout) if not locked: self.unlock() raise RuntimeError("Failed to lock devices: %s" % self.lockErr) def unlock(self): for device in self.locked: try: device.release() except: pass self.locked = [] def __enter__(self): self.lock() return self def __exit__(self, *args): self.unlock() class Task: id = 0 def __init__(self, dm, command): self.dm = dm self.command = command self.result = None self.taskLock = Mutex(recursive=True) self.deviceLock = None self.startedDevs = [] self.startTime = None self.stopTime = None self.stopped = False self.abortRequested = False self._done = False # self.reserved = False try: self.cfg = command['protocol'] except: logger.error("================== Manager Task.__init__ command: =================") logger.error(command) logger.error("===========================================================") raise TypeError("Command specified for task is invalid. (Must be dictionary with 'protocol' key)") self.id = Task.id Task.id += 1 ## TODO: set up data storage with cfg['storeData'] and ['writeLocation'] self.devNames = list(command.keys()) self.devNames.remove('protocol') self.devs = {devName: self.dm.getDevice(devName) for devName in self.devNames} ## Create task objects. Each task object is a handle to the device which is unique for this task run. self.tasks: dict[str, DeviceTask] = {} for devName in self.devNames: task = self.devs[devName].createTask(self.command[devName], self) if task is None: logger.warning(f"Device '{devName}' does not have a task interface; ignoring.") continue self.tasks[devName] = task @staticmethod def getDevName(obj): if isinstance(obj, str): return obj elif isinstance(obj, Device): return obj.name() elif isinstance(obj, DeviceTask): return obj.dev.name() def getConfigOrder(self): ## determine the order in which tasks must be configured ## This is determined by tasks having called Task.addConfigDependency() ## when they were initialized. # request config order dependencies from devices deps = {devName: set() for devName in self.devNames} for devName, task in self.tasks.items(): before, after = task.getConfigOrder() deps[devName] |= set(map(Task.getDevName, before)) for t in map(self.getDevName, after): if t in deps: deps[t].add(devName) # request estimated configure time cost = {devName: self.tasks[devName].getPrepTimeEstimate() for devName in self.devNames} # convert sets to lists deps = dict([(k, list(deps[k])) for k in deps.keys()]) # return sorted order order = self.toposort(deps, cost) return order def getStartOrder(self): ## determine the order in which tasks must be started ## This is determined by tasks having called Task.addStartDependency() ## when they were initialized. deps = {devName: set() for devName in self.devNames} for devName, task in self.tasks.items(): before, after = task.getStartOrder() deps[devName] |= set(map(Task.getDevName, before)) for t in map(self.getDevName, after): if t not in deps: # device is not in task; don't worry about its start order # (this happens, for example, with Trigger devices that do not need to be started by acq4) continue deps[t].add(devName) deps = dict([(k, list(deps[k])) for k in deps.keys()]) # return sorted order order = self.toposort(deps) return order def execute(self, block=True, processEvents=True): """Start the task. If block is true, then the function blocks until the task is complete. if processEvents is true, then Qt events are processed while waiting for the task to complete. """ with self.taskLock: self.startedDevs = [] self.stopped = False # whether sub-tasks have been stopped yet self.abortRequested = False self._done = False # cached output of isDone() ## We need to make sure devices are stopped and unlocked properly if anything goes wrong.. prof = Profiler('Manager.Task.execute', disabled=True) try: ## Reserve all hardware self.reserveDevices() prof.mark('reserve') ## Determine order of device configuration. configOrder = self.getConfigOrder() ## Configure all subtasks. Some devices may need access to other tasks, so we make all available here. ## This is how we allow multiple devices to communicate and decide how to operate together. ## Each task may modify the startOrder list to suit its needs. for devName in configOrder: self.tasks[devName].configure() prof.mark(f'configure {devName}') startOrder = self.getStartOrder() if 'leadTime' in self.cfg: time.sleep(self.cfg['leadTime']) prof.mark('leadSleep') self.result = None ## Start tasks in specific order for devName in startOrder: try: self.startedDevs.append(devName) self.tasks[devName].start() except: self.startedDevs.remove(devName) logger.error(f"Error starting device '{devName}'; aborting task.") raise prof.mark(f'start {devName}') self.startTime = ptime.time() if not block: prof.finish() return ## Wait until all tasks are done lastProcess = ptime.time() isGuiThread = Qt.QThread.currentThread() == Qt.QCoreApplication.instance().thread() while not self.isDone(): now = ptime.time() elapsed = now - self.startTime if isGuiThread: if processEvents and now - lastProcess > 20e-3: ## only process Qt events every 20ms Qt.QApplication.processEvents() lastProcess = ptime.time() ## If the task duration has not elapsed yet, only wake up every 10ms, and attempt to wake up 5ms before the end if elapsed < self.cfg['duration'] - 10e-3: sleep = min(10e-3, self.cfg['duration'] - elapsed - 5e-3) else: sleep = 1.0e-3 ## afterward, wake up more quickly so we can respond as soon as the task finishes time.sleep(sleep) self.stop() except: logger.exception("========== Error in task execution: ==============") self.abort() self.releaseDevices() raise finally: prof.finish() def isDone(self): """Return True if all tasks are completed and ready to return results. If the task run time exceeds the timeout duration, then raise RuntimeError. """ with self.taskLock: # If we previously returned True or raised an exception, then # just repeat that result. if self._done is True: return True elif self._done is not False: raise self._done # Check for timeout if self.startTime is not None: # By default, timeout occurs 10 sec after requested duration is elapsed. # Set timeout=None to disable the check. timeout = self.cfg.get('timeout', self.cfg['duration'] + 10.0) now = ptime.time() elapsed = now - self.startTime if timeout is not None and elapsed > timeout: self.stop(abort=True) self._done = RuntimeError("Task timed out (>%0.2fs)." % timeout) raise self._done # For testing tasks that fail to complete if getattr(self, 'test_endless', False): return False if not self.abortRequested: t = ptime.time() if self.startTime is None or t - self.startTime < self.cfg['duration']: return False d = self._tasksDone() self._done = d return d def _tasksDone(self): for t in self.tasks: if not self.tasks[t].isDone(): return False if self.stopTime is None: self.stopTime = ptime.time() return True def duration(self): """Return the requested task duration, or None if it was not given.""" return self.command.get('protocol', {}).get('duration', None) def runTime(self): """Return the length of time since this task began running. If the task has already finished, return the length of time the task ran for. If the task has not started yet, return None. """ if self.startTime is None: return None if self.stopTime is None: return ptime.time() - self.startTime return self.stopTime - self.startTime def stop(self, abort=False): """Stop all tasks and read data. If abort is True, do not attempt to collect results from the task. """ with self.taskLock: prof = Profiler("Manager.Task.stop", disabled=True) self.abortRequested = abort try: if not self.stopped: ## Stop all device tasks while len(self.startedDevs) > 0: t = self.startedDevs.pop() try: self.tasks[t].stop(abort=abort) except: logger.exception(f"Error while stopping task {t}") prof.mark(" ..task " + t + " stopped") self.stopped = True if not abort and not self._tasksDone(): raise RuntimeError("Cannot get result; task is still running.") if not abort and self.result is None: ## Let each device generate its own output structure. result = {'protocol': {'startTime': self.startTime}} for devName in self.tasks: try: result[devName] = self.tasks[devName].getResult() except: logger.exception( f"Error getting result for task {devName} (will set result=None for" " this task)" ) result[devName] = None prof.mark("get result: " + devName) self.result = result ## Store data if requested if 'storeData' in self.cfg and self.cfg['storeData'] is True: self.cfg['storageDir'].setInfo(result['protocol']) for t in self.tasks: self.tasks[t].storeResult(self.cfg['storageDir']) prof.mark("store data") finally: ## Regardless of any other problems, at least make sure we ## release hardware for future use if self.stopTime is None: self.stopTime = ptime.time() self.releaseDevices() prof.mark("release all") prof.finish() if abort: gc.collect() ## it is often the case that now is a good time to garbage-collect. def getResult(self): with self.taskLock: self.stop() return self.result def reserveDevices(self): if self.deviceLock is None: try: self.deviceLock = self.dm.reserveDevices(list(self.tasks.keys())) self.deviceLock.lock() except Exception: self.deviceLock = None raise def releaseDevices(self): if self.deviceLock is None: return self.deviceLock.unlock() self.deviceLock = None def abort(self): """Stop all tasks, to not attempt to get data.""" self.stop(abort=True) @staticmethod def toposort(deps, cost=None): """Topological sort. Arguments are: deps Dictionary describing dependencies where a:[b,c] means "a depends on b and c" cost Optional dictionary of per-node cost values. This will be used to sort independent graph branches by total cost. Examples:: # Sort the following graph: # # B ──┬─────> C <── D # │ │ # E <─┴─> A <─┘ # deps = {'a': ['b', 'c'], 'c': ['b', 'd'], 'e': ['b']} toposort(deps) => ['b', 'e', 'd', 'c', 'a'] # This example is underspecified; there are several orders # that correctly satisfy the graph. However, we may use the # 'cost' argument to impose more constraints on the sort order. # Let each node have the following cost: cost = {'a': 0, 'b': 0, 'c': 1, 'e': 1, 'd': 3} # Then the total cost of following any node is its own cost plus # the cost of all nodes that follow it: # A = cost[a] # B = cost[b] + cost[c] + cost[e] + cost[a] # C = cost[c] + cost[a] # D = cost[d] + cost[c] + cost[a] # E = cost[e] # If we sort independent branches such that the highest cost comes # first, the output is: toposort(deps, cost=cost) => ['d', 'b', 'c', 'e', 'a'] """ # copy deps and make sure all nodes have a key in deps deps0 = deps deps = {} for k, v in deps0.items(): deps[k] = v[:] for k2 in v: if k2 not in deps: deps[k2] = [] # Compute total branch cost for each node key = None if cost is not None: order = Task.toposort(deps) allDeps = {n: set(n) for n in order} for n in order[::-1]: for n2 in deps.get(n, []): allDeps[n2] |= allDeps.get(n, set()) totalCost = {n: sum([cost.get(x, 0) for x in allDeps[n]]) for n in allDeps} key = lambda x: totalCost.get(x, 0) # compute weighted order order = [] while len(deps) > 0: # find all nodes with no remaining dependencies ready = [k for k in deps if len(deps[k]) == 0] # If no nodes are ready, then there must be a cycle in the graph if len(ready) == 0: logger.error(f"Cyclic graph of dependencies: {deps}") raise HelpfulException( "Cannot resolve requested device configure/start order.", docs=["userGuide/configuration.html#devices-configuration"], ) # sort by branch cost if key is not None: ready.sort(key=key, reverse=True) # add the highest-cost node to the order, then remove it from the # entire set of dependencies order.append(ready[0]) del deps[ready[0]] for v in deps.values(): try: v.remove(ready[0]) except ValueError: pass return order DOC_ROOT = 'http://acq4.org/documentation/' class Documentation(Qt.QObject): def __init__(self): Qt.QObject.__init__(self) def show(self, label=None): if label is None: url = DOC_ROOT else: url = DOC_ROOT + label Qt.QDesktopServices.openUrl(Qt.QUrl(url)) def quit(self): pass class QtDocumentation(Qt.QObject): """Encapsulates documentation functionality. Note: this class is currently out of service in favor of referencing online documentation instead. """ def __init__(self): Qt.QObject.__init__(self) path = os.path.abspath(os.path.dirname(__file__)) self.docFile = os.path.normpath(os.path.join(path, '..', 'documentation', 'build', 'qthelp', 'ACQ4.qhc')) self.process = Qt.QProcess() self.process.finished.connect(self.processFinished) def show(self, label=None): if self.process.state() == self.process.NotRunning: self.startProcess() if label is not None: Qt.QTimer.singleShot(2000, lambda: self.activateId(label)) return if label is not None: self.activateId(label) def expandToc(self, n=2): self.write('expandToc %d\n' % n) def startProcess(self): self.process.start('assistant', ['-collectionFile', self.docFile, '-enableRemoteControl']) if not self.process.waitForStarted(): output = str(self.process.readAllStandardError()) raise Exception("Error starting documentation viewer: " + output) Qt.QTimer.singleShot(1000, self.expandToc) def activateId(self, id): logger.info("activate:", id) self.show() self.write('activateIdentifier %s\n' % id) def activateKeyword(self, kwd): self.show() self.write('activateKeyword %s\n' % kwd) def write(self, data): ba = Qt.QByteArray(data) return self.process.write(ba) def quit(self): self.process.close() def processFinished(self): logger.info(f"Doc viewer exited: {self.process.exitCode()}") logger.info(str(self.process.readAllStandardError()))