Source code for apstools.plans.alignment

"""
Alignment plans
+++++++++++++++++++++++++++++++++++++++

.. autosummary::

   ~lineup
   ~lineup2
   ~tune_axes
   ~TuneAxis
   ~TuneResults
"""

import datetime
import logging

import numpy as np
import pyRestTable
from scipy.optimize import curve_fit
from scipy.special import erf

from bluesky import plan_stubs as bps
from bluesky import plans as bp
from bluesky import preprocessors as bpp
from bluesky.callbacks.fitting import PeakStats
from ophyd import Component
from ophyd import Device
from ophyd import Signal
from ophyd.scaler import ScalerCH
from ophyd.scaler import ScalerChannel

from .. import utils
from .doc_run import write_stream

logger = logging.getLogger(__name__)


[docs]def lineup( # fmt: off detectors, axis, minus, plus, npts, time_s=0.1, peak_factor=4, width_factor=0.8, feature="cen", rescan=True, bec=None, md=None, # fmt: on ): """ (use ``lineup2()`` now) Lineup and center a given axis, relative to current position. If first run identifies a peak, makes a second run to fine tune the result. ..caution:: ``lineup()`` does not work in the queueserver. Use :func:`~apstools.plans.alignment.lineup2()` instead. .. index:: Bluesky Plan; lineup PARAMETERS detectors *[object]* or *object* : Instance(s) of ophyd.Signal (or subclass such as ophyd.scaler.ScalerChannel) dependent measurement to be maximized. If a list, the first signal in the list will be used. axis movable *object* : instance of ophyd.Signal (or subclass such as EpicsMotor) independent axis to use for alignment minus *float* : first point of scan at this offset from starting position plus *float* : last point of scan at this offset from starting position npts *int* : number of data points in the scan time_s *float* : Count time per step (if detectors[0] is ScalerChannel object), other object types not yet supported. (default: 0.1) peak_factor *float* : peak maximum must be greater than ``peak_factor*minimum`` (default: 4) width_factor *float* : fwhm must be less than ``width_factor*plot_range`` (default: 0.8) feature *str* : One of the parameters returned by BestEffortCallback peak stats. (``cen``, ``com``, ``max``, ``min``) (default: ``cen``) rescan *bool* : If first scan indicates a peak, should a second scan refine the result? (default: ``True``) bec *object* : Instance of ``bluesky.callbacks.best_effort.BestEffortCallback``. (default: ``None``, meaning look for it from global namespace) EXAMPLE:: RE(lineup(diode, foemirror.theta, -30, 30, 30, 1.0)) """ bec = bec or utils.ipython_shell_namespace().get("bec") if bec is None: raise ValueError("Cannot find BestEffortCallback() instance.") bec.enable_plots() if not isinstance(detectors, (tuple, list)): detectors = [detectors] det0 = detectors[0] logger.info("Using detector '%s' for lineup()", det0.name) # first, determine if det0 is part of a ScalerCH device scaler = None obj = det0.parent if isinstance(det0.parent, ScalerChannel): if hasattr(obj, "parent") and obj.parent is not None: obj = obj.parent if hasattr(obj, "parent") and isinstance(obj.parent, ScalerCH): scaler = obj.parent if scaler is not None: old_sigs = scaler.stage_sigs scaler.stage_sigs["preset_time"] = time_s scaler.select_channels([det0.name]) if hasattr(axis, "position"): old_position = axis.position else: old_position = axis.get() def peak_analysis(): aligned = False if det0.name not in bec.peaks[feature]: logger.error( "No statistical analysis of scan peak for feature '%s'!" " (bec.peaks=%s, bec=%s)", feature, bec.peaks, bec, ) yield from bps.null() else: table = pyRestTable.Table() table.labels = ("key", "value") table.addRow(("axis", axis.name)) table.addRow(("detector", det0.name)) table.addRow(("starting position", old_position)) for key in bec.peaks.ATTRS: target = bec.peaks[key][det0.name] if isinstance(target, tuple): target = target[0] table.addRow((key, target)) logger.info(f"alignment scan results:\n{table}") lo = bec.peaks["min"][det0.name][-1] # [-1] means detector hi = bec.peaks["max"][det0.name][-1] # [0] means axis fwhm = bec.peaks["fwhm"][det0.name] final = bec.peaks["cen"][det0.name] # local PeakStats object of our plot of det0 .v x ps = list(bec._peak_stats.values())[0][det0.name] # get the X data range as received by PeakStats x_range = abs(max(ps.x_data) - min(ps.x_data)) if final is None: logger.error("centroid is None") final = old_position elif fwhm is None: logger.error("FWHM is None") final = old_position elif hi < peak_factor * lo: logger.error("no clear peak: %f < %f*%f", hi, peak_factor, lo) final = old_position elif fwhm > width_factor * x_range: logger.error( "FWHM too large: %f > %f*%f", fwhm, width_factor, x_range, ) final = old_position else: aligned = True logger.info( "moving %s to %f (aligned: %s)", axis.name, final, str(aligned), ) yield from bps.mv(axis, final) # too sneaky? We're modifying this structure locally bec.peaks.aligned = aligned bec.peaks.ATTRS = ("com", "cen", "max", "min", "fwhm") _md = dict(purpose="alignment") _md.update(md or {}) yield from bp.rel_scan([det0], axis, minus, plus, npts, md=_md) yield from peak_analysis() if bec.peaks.aligned and rescan: # again, tweak axis to maximize _md["purpose"] = "alignment - fine" fwhm = bec.peaks["fwhm"][det0.name] yield from bp.rel_scan([det0], axis, -fwhm, fwhm, npts, md=_md) yield from peak_analysis() if scaler is not None: scaler.select_channels() scaler.stage_sigs = old_sigs
[docs]def edge_align(detectors, mover, start, end, points, cat=None, md={}): """ Align to the edge given mover & detector data, relative to absolute position. This plan can be used in the queueserver, Jupyter notebooks, and IPython consoles. PARAMETERS ---------- detectors *Readable* or [*Readable*]: Detector object or list of detector objects (each is a Device or Signal). mover *Movable*: Mover object, such as motor or other positioner. start *float*: Starting point for the scan. This is an absolute motor location. end *float*: Ending point for the scan. This is an absolute motor location. points *int*: Number of points in the scan. cat *databroker.temp().v2*: Catalog where bluesky data is saved and can be retrieved from. md *dict*: User-supplied metadata for this scan. """ def guess_erf_params(x_data, y_data): """ Provide an initial guess for the parameters of an error function. Parameters ---------- x_data : A numpy array of the values on the x_axis y_data : A numpy array of the values on the y_axis Returns ------- guess : dict A dictionary containing the guessed parameters 'low_y_data', 'high_y_data', 'width', and 'midpoint'. """ # Sort data to make finding the mid-point easier and to assist in other estimations y_data_sorted = np.sort(y_data) x_data_sorted = np.sort(x_data) # Estimate low and high as the first and last elements (assuming sorted data) low_y_data = np.min(y_data_sorted) high_y_data = np.max(y_data_sorted) low_x_data = np.min(x_data_sorted) high_x_data = np.max(x_data_sorted) # Estimate wid as a fraction of the range. This is very arbitrary and might need tuning! width = ( high_x_data - low_x_data ) / 10 # This is a guess and might need adjustment based on your data's characteristics # Estimate the midpoint of the x values midpoint = x_data[int(len(x_data) / 2)] return [low_y_data, high_y_data, width, midpoint] def erf_model(x, low, high, width, midpoint): """ Create error function for fitting and simulation Parameters ---------- x : input upon which error function is evaluated low : min value of error function high : max value of error function width : "spread" of error function transition region midpoint: location of error function's "center" """ return (high - low) * 0.5 * (1 - erf((x - midpoint) / width)) + low if not isinstance(detectors, (tuple, list)): detectors = [detectors] _md = dict(purpose="edge_align") _md.update(md or {}) uid = yield from bp.scan(detectors, mover, start, end, points, md=_md) cat = cat or utils.getCatalog() run = cat[uid] # return uids ds = run.primary.read() x = ds["mover"] y = ds["noisy"] try: initial_guess = guess_erf_params(x, y) popt, pcov = curve_fit(erf_model, x, y, p0=initial_guess) if pcov[3, 3] != np.inf: print("Significant signal change detected; motor moving to detected edge.") yield from bps.mv(mover, popt[3]) else: raise Exception except Exception as reason: print(f"reason: {reason}") print("No significant signal change detected; motor movement skipped.")
[docs]def lineup2( # fmt: off detectors, mover, rel_start, rel_end, points, peak_factor=2.5, width_factor=0.8, feature="centroid", nscans=2, signal_stats=None, md={}, # fmt: on ): """ Lineup and center a given mover, relative to current position. This plan can be used in the queueserver, Jupyter notebooks, and IPython consoles. It does not require the bluesky BestEffortCallback. Instead, it uses *PySumReg* [#pysumreg]_ to compute statistics for each signal in a 1-D scan. New in release 1.6.18 .. index:: Bluesky Plan; lineup2; lineup PARAMETERS detectors *Readable* or [*Readable*]: Detector object or list of detector objects (each is a Device or Signal). If a list, the first Signal will be used for alignment. mover *Movable*: Mover object, such as motor or other positioner. rel_start *float*: Starting point for the scan, relative to the current mover position. rel_end *float*: Ending point for the scan, relative to the current mover position. points *int*: Number of points in the scan. peak_factor *float* : peak maximum must be greater than ``peak_factor*minimum`` (default: 2.5) width_factor *float* : fwhm must be less than ``width_factor*plot_range`` (default: 0.8) feature *str*: Use this statistical measure (default: centroid) to set the mover position after a peak has been found. Must be one of these values: ========== ==================== feature description ========== ==================== centroid center of mass x_at_max_y x location of y maximum x_at_min_y x location of y minimum ========== ==================== Statistical analysis provided by *PySumReg*. [#pysumreg]_ .. [#pysumreg] https://prjemian.github.io/pysumreg/latest/ nscans *int*: Number of scans. (default: 2) Scanning will stop if any scan cannot find a peak. signal_stats *object*: Caller could provide an object of :class:`~apstools.callbacks.scan_signal_statistics.SignalStatsCallback`. If ``None``, this function will search for the ``signal_stats`` signal in the global namespace. If not still found, a new one will be created for the brief lifetime of this function. md *dict*: User-supplied metadata for this scan. """ from ..callbacks import SignalStatsCallback from ..callbacks import factor_fwhm if not isinstance(detectors, (tuple, list)): detectors = [detectors] _md = dict(purpose="alignment") _md.update(md or {}) if signal_stats is None: signal_stats = utils.ipython_shell_namespace().get("signal_stats") if signal_stats is None: signal_stats = SignalStatsCallback() signal_stats.stop_report = True else: signal_stats.stop_report = False # Turn this automation off. # Allow for feature to be defined using a name from PeakStats. xref_PeakStats = { "com": "centroid", "cen": "x_at_max_y", "max": "x_at_max_y", "min": "x_at_min_y", } # translate from PeakStats feature to SignalStats feature = xref_PeakStats.get(feature, feature) def get_x_by_feature(): """Return the X value of the specified ``feature``.""" stats = principal_signal_stats() if strong_peak(stats) and not too_wide(stats): return getattr(stats, feature) def principal_signal_stats() -> str: """Return the name of the first detector Signal.""" return signal_stats._registers[signal_stats._y_names[0]] def strong_peak(stats) -> bool: """Determine if the peak is strong.""" try: value = (stats.max_y - stats.min_y) / stats.stddev_y return abs(value) > peak_factor except ZeroDivisionError: # not enough samples try: value = stats.max_y / stats.min_y return abs(value) > peak_factor except ZeroDivisionError: return False def too_wide(stats): """Does the measured peak width fill the full range of X?""" try: x_range = stats.max_x - stats.min_x fwhm = stats.sigma * factor_fwhm return fwhm > width_factor * x_range except ZeroDivisionError: # not enough samples return True @bpp.subs_decorator(signal_stats.receiver) def _inner(): """Run the scan, collecting statistics at each step.""" # TODO: save signal stats into separate stream yield from bp.rel_scan(detectors, mover, rel_start, rel_end, points, md=_md) while nscans > 0: # allow for repeated scans yield from _inner() # Run the scan. nscans -= 1 target = get_x_by_feature() if target is None: nscans = 0 # Nothing found, no point scanning again. else: yield from bps.mv(mover, target) # Move to the feature position. logger.info("Moved %s to %s: %f", mover.name, feature, target) if nscans > 0: # move the end points for the next scan rel_end = principal_signal_stats().sigma * factor_fwhm rel_start = -rel_end
[docs]class TuneAxis(object): """ tune an axis with a signal .. index:: Bluesky Device; TuneAxis This class provides a tuning object so that a Device or other entity may gain its own tuning process, keeping track of the particulars needed to tune this device again. For example, one could add a tuner to a motor stage:: motor = EpicsMotor("xxx:motor", "motor") motor.tuner = TuneAxis([det], motor) Then the ``motor`` could be tuned individually:: RE(motor.tuner.tune(md={"activity": "tuning"})) or the :meth:`tune()` could be part of a plan with other steps. Example:: tuner = TuneAxis([det], axis) live_table = LiveTable(["axis", "det"]) RE(tuner.multi_pass_tune(width=2, num=9), live_table) RE(tuner.tune(width=0.05, num=9), live_table) Also see the jupyter notebook tited **Demonstrate TuneAxis()** in the :ref:`examples` section. .. autosummary:: ~tune ~multi_pass_tune ~peak_detected SEE ALSO .. autosummary:: ~tune_axes """ _peak_choices_ = "cen com".split() def __init__(self, signals, axis, signal_name=None): self.signals = signals self.signal_name = signal_name or signals[0].name self.axis = axis self.tune_ok = False self.peaks = None self.peak_choice = self._peak_choices_[0] self.center = None self.stats = [] # defaults self.width = 1 self.num = 10 self.step_factor = 4 self.peak_factor = 4 self.pass_max = 4 self.snake = True
[docs] def tune(self, width=None, num=None, peak_factor=None, md=None): """ Bluesky plan to execute one pass through the current scan range .. index:: Bluesky Plan; TuneAxis.tune Scan self.axis centered about current position from ``-width/2`` to ``+width/2`` with ``num`` observations. If a peak was detected (default check is that max >= 4*min), then set ``self.tune_ok = True``. PARAMETERS width *float* : width of the tuning scan in the units of ``self.axis`` Default value in ``self.width`` (initially 1) num *int* : number of steps Default value in ``self.num`` (initially 10) md *dict* : (optional) metadata """ width = width or self.width num = num or self.num peak_factor = peak_factor or self.peak_factor if self.peak_choice not in self._peak_choices_: msg = "peak_choice must be one of {}, geave {}" msg = msg.format(self._peak_choices_, self.peak_choice) raise ValueError(msg) initial_position = self.axis.position # final_position = initial_position # unless tuned start = initial_position - width / 2 finish = initial_position + width / 2 self.tune_ok = False tune_md = dict( width=width, initial_position=self.axis.position, time_iso8601=str(datetime.datetime.now()), ) _md = { "tune_md": tune_md, "plan_name": self.__class__.__name__ + ".tune", "tune_parameters": dict( num=num, width=width, initial_position=self.axis.position, peak_choice=self.peak_choice, x_axis=self.axis.name, y_axis=self.signal_name, ), "motors": (self.axis.name,), "detectors": (self.signal_name,), "hints": dict(dimensions=[([self.axis.name], "primary")]), } _md.update(md or {}) if "pass_max" not in _md: self.stats = [] self.peaks = PeakStats(x=self.axis.name, y=self.signal_name) @bpp.subs_decorator(self.peaks) def _scan(md=None): yield from bps.open_run(md) position_list = np.linspace(start, finish, num) # fmt: off signal_list = list(self.signals) signal_list += [self.axis] # fmt: on for pos in position_list: yield from bps.mv(self.axis, pos) yield from bps.trigger_and_read(signal_list) final_position = initial_position if self.peak_detected(peak_factor=peak_factor): self.tune_ok = True if self.peak_choice == "cen": final_position = self.peaks.cen elif self.peak_choice == "com": final_position = self.peaks.com else: final_position = None self.center = final_position # add stream with results # yield from add_results_stream() stream_name = "PeakStats" results = TuneResults(name=stream_name) results.tune_ok.put(self.tune_ok) results.center.put(self.center) results.final_position.put(final_position) results.initial_position.put(initial_position) results.set_stats(self.peaks) self.stats.append(results) if results.tune_ok.get(): try: yield from write_stream(results, label=stream_name) except ValueError as ex: separator = " " * 8 + "-" * 12 print(separator) print(f"Error saving stream {stream_name}:\n{ex}") print(separator) yield from bps.mv(self.axis, final_position) yield from bps.close_run() results.report(stream_name) return (yield from _scan(md=_md))
[docs] def multi_pass_tune( self, width=None, step_factor=None, num=None, pass_max=None, peak_factor=None, snake=None, md=None, ): """ Bluesky plan for tuning this axis with this signal Execute multiple passes to refine the centroid determination. Each subsequent pass will reduce the width of scan by ``step_factor``. If ``snake=True`` then the scan direction will reverse with each subsequent pass. PARAMETERS width *float* : width of the tuning scan in the units of ``self.axis`` Default value in ``self.width`` (initially 1) num *int* : number of steps Default value in ``self.num`` (initially 10) step_factor *float* : This reduces the width of the next tuning scan by the given factor. Default value in ``self.step_factor`` (initially 4) pass_max *int* : Maximum number of passes to be executed (avoids runaway scans when a centroid is not found). Default value in ``self.pass_max`` (initially 4) peak_factor *float* : peak maximum must be greater than ``peak_factor*minimum`` (default: 4) snake *bool* : If ``True``, reverse scan direction on next pass. Default value in ``self.snake`` (initially True) md *dict* : (optional) metadata """ width = width or self.width num = num or self.num step_factor = step_factor or self.step_factor snake = snake or self.snake pass_max = pass_max or self.pass_max peak_factor = peak_factor or self.peak_factor self.stats = [] def _scan(width=1, step_factor=10, num=10, snake=True): for _pass_number in range(pass_max): logger.info("Multipass tune %d of %d", _pass_number + 1, pass_max) _md = { "pass": _pass_number + 1, "pass_max": pass_max, "plan_name": f"{self.__class__.__name__}.multi_pass_tune", } _md.update(md or {}) yield from self.tune(width=width, num=num, peak_factor=peak_factor, md=_md) if not self.tune_ok: return if width > 0: sign = 1 else: sign = -1 width = sign * 2 * self.stats[-1].fwhm.get() if snake: width *= -1 return (yield from _scan(width=width, step_factor=step_factor, num=num, snake=snake))
def multi_pass_tune_summary(self): t = pyRestTable.Table() t.labels = "pass Ok? center width max.X max.Y".split() for i, stat in enumerate(self.stats): row = [ i + 1, ] row.append(stat.tune_ok.get()) row.append(stat.cen.get()) row.append(stat.fwhm.get()) x, y = stat.max.get() row += [x, y] t.addRow(row) return t
[docs] def peak_detected(self, peak_factor=None): """ returns True if a peak was detected, otherwise False The default algorithm identifies a peak when the maximum value is four times the minimum value. Change this routine by subclassing :class:`TuneAxis` and override :meth:`peak_detected`. """ peak_factor = peak_factor or self.peak_factor if self.peaks is None: return False self.peaks.compute() if self.peaks.max is None: return False ymax = self.peaks.max[-1] ymin = self.peaks.min[-1] ok = ymax >= peak_factor * ymin # this works for USAXS@APS if not ok: logger.info("ymax < ymin * %f: is it a peak?", peak_factor) return ok
[docs]def tune_axes(axes): """ Bluesky plan to tune a list of axes in sequence. Expects each axis will have a ``tuner`` attribute which is an instance of :class:`~apstools.plans.alignment.TuneAxis()`. .. index:: Bluesky Plan; tune_axes EXAMPLE Sequentially, tune a list of preconfigured axes:: RE(tune_axes([mr, m2r, ar, a2r]) SEE ALSO .. autosummary:: ~TuneAxis """ for axis in axes: if "tuner" not in dir(axis): raise AttributeError(f"Did not find '{axis.name}.tuner' attribute.") yield from axis.tuner.tune()
[docs]class TuneResults(Device): """ Provides bps.read() as a Device .. index:: Bluesky Device; TuneResults """ tune_ok = Component(Signal) initial_position = Component(Signal) final_position = Component(Signal) center = Component(Signal) # - - - - - x = Component(Signal) y = Component(Signal) cen = Component(Signal) com = Component(Signal) fwhm = Component(Signal) min = Component(Signal) max = Component(Signal) crossings = Component(Signal) peakstats_attrs = "x y cen com fwhm min max crossings".split() def report(self, title=None): keys = self.peakstats_attrs + "tune_ok center initial_position final_position".split() t = pyRestTable.Table() t.addLabel("key") t.addLabel("result") for key in keys: v = getattr(self, key).get() t.addRow((key, str(v))) if title is not None: print(title) print(t) def set_stats(self, peaks): for key in self.peakstats_attrs: v = getattr(peaks, key) if v is None: # None cannot be mapped into json v = "null" # but "null" can replace it if key in ("crossings", "min", "max"): v = np.array(v) getattr(self, key).put(v)
# ----------------------------------------------------------------------------- # :author: Pete R. Jemian # :email: jemian@anl.gov # :copyright: (c) 2017-2024, UChicago Argonne, LLC # # Distributed under the terms of the Argonne National Laboratory Open Source License. # # The full license is in the file LICENSE.txt, distributed with this software. # -----------------------------------------------------------------------------