Source code for apstools.callbacks.scan_signal_statistics
"""
Collect statistics on the signals used in 1-D scans.
====================================================
.. autosummary::
~factor_fwhm
~SignalStatsCallback
"""
__all__ = """
factor_fwhm
SignalStatsCallback
""".split()
import math
import logging
import pyRestTable
import pysumreg
logger = logging.getLogger(__name__)
logger.info(__file__)
factor_fwhm = 2 * math.sqrt(2 * math.log(2))
r"""
FWHM :math:`=2\sqrt{2\ln{2}}\cdot\sigma_c`
see: https://statproofbook.github.io/P/norm-fwhm.html
"""
[docs]class SignalStatsCallback:
"""
Callback: Collect peak (& other) statistics during a scan.
.. caution:: This is an early draft and is subject to change!
Subscribe the ``receiver()`` method. Use with step scan plans such as
``bp.scan()`` and ``bp.rel_scan()``.
.. caution:: It is recommended to subscribe this callback to specific plans. It
should not be run with just any plan (it could easily raise exceptions).
.. rubric:: Basic example
.. code-block::
:linenos:
from bluesky import plans as bp
from bluesky import preprocessors as bpp
signal_stats = SignalStatsCallback()
def my_plan(detectors, mover, rel_start, rel_stop, points, md={}):
@bpp.subs_decorator(signal_stats.receiver) # collect the data
def _inner():
yield from bp.rel_scan(detectors, mover, rel_start, rel_end, points, md)
yield from _inner() # run the scan
signal_stats.report() # print the statistics
.. rubric:: Public API
.. autosummary::
~receiver
~report
~data_stream
~stop_report
.. rubric:: Internal API
.. autosummary::
~clear
~descriptor
~event
~start
~stop
~_scanning
~_registers
"""
data_stream: str = "primary"
"""RunEngine document with signals to to watch."""
stop_report: bool = True
"""If ``True`` (default), call the ``report()`` method when a ``stop`` document is received."""
_scanning: bool = False
"""Is a run *in progress*?"""
_registers: dict = {}
"""Dictionary (keyed on Signal name) of ``SummationRegister()`` objects."""
# TODO: What happens when the run is paused?
def __repr__(self):
if "_motor" not in dir(self): # not initialized
self.clear()
args = f"motor={self._motor!r}, detectors={self._detectors!r}"
return f"{self.__class__.__name__}({args})"
[docs] def clear(self):
"""Clear the internal memory for the next run."""
self._scanning = False
self._detectors = []
self._motor = ""
self._registers = {}
self._descriptor_uid = None
self._x_name = None
self._y_names = []
[docs] def descriptor(self, doc):
"""Receives 'descriptor' documents from the RunEngine."""
if not self._scanning:
return
if doc["name"] != self.data_stream:
return
# Remember now, to match with later events.
self._descriptor_uid = doc["uid"]
# Pick the first motor signal.
self._x_name = doc["hints"][self._motor]["fields"][0]
# Get the signals for each detector object.s
for d in self._detectors:
self._y_names += doc["hints"][d]["fields"]
# Keep statistics for each of the Y signals (vs. the one X signal).
self._registers = {y: pysumreg.SummationRegisters() for y in self._y_names}
[docs] def event(self, doc):
"""Receives 'event' documents from the RunEngine."""
if not self._scanning:
return
if doc["descriptor"] != self._descriptor_uid:
return
# Collect the data for the signals.
x = doc["data"][self._x_name]
for yname in self._y_names:
self._registers[yname].add(x, doc["data"][yname])
[docs] def receiver(self, key, document):
"""Client method used to subscribe to the RunEngine."""
handlers = "start stop descriptor event".split()
if key in handlers:
getattr(self, key)(document)
else:
logger.debug("%s: unhandled document type: %s", self.__class__.__name__, key)
[docs] def report(self):
"""Print a table with the collected statistics for each signal."""
if len(self._registers) == 0:
return
keys = "n centroid sigma x_at_max_y max_y min_y mean_y stddev_y".split()
table = pyRestTable.Table()
if len(keys) <= len(self._registers):
# statistics in the column labels
table.labels = ["detector"] + keys
for yname, stats in self._registers.items():
row = [yname]
for k in keys:
try:
v = getattr(stats, k)
except (ValueError, ZeroDivisionError):
v = 0
row.append(v)
table.addRow(row)
else:
# signals in the column labels
table.labels = ["statistic"] + list(self._registers)
for k in keys:
row = [k]
for stats in self._registers.values():
try:
v = getattr(stats, k)
except (ValueError, ZeroDivisionError):
v = 0
row.append(v)
table.addRow(row)
print(f"Motor: {self._x_name}")
print(table)
[docs] def start(self, doc):
"""Receives 'start' documents from the RunEngine."""
self.clear()
self._scanning = True
# These command arguments might each have many signals.
self._detectors = doc["detectors"]
self._motor = doc["motors"][0] # just keep the first one
[docs] def stop(self, doc):
"""Receives 'stop' documents from the RunEngine."""
if not self._scanning:
return
self._scanning = False
if self.stop_report:
self.report()