Source code for apstools.callbacks.scan_signal_statistics

"""
Collect statistics on the first detector used in 1-D scans.
===========================================================

.. autosummary::

    ~SignalStatsCallback
"""

import logging

import pyRestTable
import pysumreg  # deprecate, will remove in next major version bump

logger = logging.getLogger(__name__)
logger.info(__file__)


[docs] class SignalStatsCallback: """ Callback: Collect peak (& other) statistics during a scan. .. caution:: This is an early draft and is subject to change! Subscribe the ``receiver()`` method. Use with step scan plans such as ``bp.scan()`` and ``bp.rel_scan()``. .. caution:: It is recommended to subscribe this callback to specific plans. It should not be run with just any plan (it could easily raise exceptions). .. rubric:: Basic example .. code-block:: :linenos: from bluesky import plans as bp from bluesky import preprocessors as bpp from apstools.callbacks import SignalStatsCallback signal_stats = SignalStatsCallback() def my_plan(detectors, mover, rel_start, rel_stop, points, md={}): @bpp.subs_decorator(signal_stats.receiver) # collect the data def _inner(): yield from bp.rel_scan(detectors, mover, rel_start, rel_end, points, md) yield from _inner() # run the scan signal_stats.report() # print the statistics .. rubric:: Public API .. autosummary:: ~receiver ~report ~data_stream ~reporting .. rubric:: Internal API .. autosummary:: ~clear ~descriptor ~event ~start ~stop ~analysis ~_data ~_scanning ~_registers """ data_stream: str = "primary" """RunEngine document with signals to to watch.""" reporting: bool = True """If ``True`` (default), call the ``report()`` method when a ``stop`` document is received.""" _scanning: bool = False """Is a run *in progress*?""" _registers: dict = {} """ Deprecated: Use 'analysis' instead, will remove in next major release. Dictionary (keyed on Signal name) of ``SummationRegister()`` objects. """ _data: dict = {} """Arrays of x & y data""" analysis: object = None """Dictionary of statistical array analyses.""" # TODO: What happens when the run is paused? def __repr__(self): if "_motor" not in dir(self): # not initialized self.clear() args = f"motor={self._motor!r}, detectors={self._detectors!r}" return f"{self.__class__.__name__}({args})"
[docs] def clear(self): """Clear the internal memory for the next run.""" self._scanning = False self._detectors = [] self._motor = "" self._registers = {} # deprecated, for removal self._descriptor_uid = None self._x_name = None self._y_names = [] self._data = {}
[docs] def descriptor(self, doc): """Receives 'descriptor' documents from the RunEngine.""" if not self._scanning: return if doc["name"] != self.data_stream: return # Remember now, to match with later events. self._descriptor_uid = doc["uid"] # Pick the first motor signal. self._x_name = doc["hints"][self._motor]["fields"][0] self._data[self._x_name] = [] # Get the signals for each detector object(s) for d in self._detectors: hint = doc["hints"].get(d, {"fields": [d]}) for y_name in hint["fields"]: if y_name not in self._y_names: self._y_names.append(y_name) self._data[y_name] = [] # Keep statistics for each of the Y signals (vs. the one X signal). # deprecated, for removal self._registers = {y: pysumreg.SummationRegisters() for y in self._y_names}
[docs] def event(self, doc): """Receives 'event' documents from the RunEngine.""" if not self._scanning: return if doc["descriptor"] != self._descriptor_uid: return # Collect the data for the signals. x = doc["data"][self._x_name] self._data[self._x_name].append(x) for yname in self._y_names: y = doc["data"][yname] self._registers[yname].add(x, y) # deprecated, for removal self._data[yname].append(y)
[docs] def receiver(self, key, document): """Client method used to subscribe to the RunEngine.""" handlers = "start stop descriptor event".split() if key in handlers: getattr(self, key)(document) else: logger.debug("%s: unhandled document type: %s", self.__class__.__name__, key)
[docs] def report(self): """Print a table with the collected statistics for each signal.""" if len(self._data) == 0: return x_name = self._x_name y_name = self._detectors[0] keys = """ n centroid x_at_max_y fwhm variance sigma min_x mean_x max_x min_y mean_y max_y success reasons """.split() table = pyRestTable.Table() table.labels = "statistic value".split() table.rows = [(k, self.analysis.get(k, "--")) for k in keys if k in self.analysis] print(f"Motor: {x_name!r} Detector: {y_name!r}") print(table)
[docs] def start(self, doc): """Receives 'start' documents from the RunEngine.""" self.clear() self._scanning = True # These command arguments might each have many signals. self._detectors = doc["detectors"] self._motor = doc["motors"][0] # just keep the first one
[docs] def stop(self, doc): """Receives 'stop' documents from the RunEngine.""" from ..utils.statistics import xy_statistics if not self._scanning: return self._scanning = False if self._x_name is None: logger.warning("No XY statistics: no X axis name found.") elif len(self._y_names) == 0: logger.warning("No XY statistics: no Y axis names found.") else: self.analysis = xy_statistics( self._data[self._x_name], self._data[self._y_names[0]], ) if self.reporting: self.report()