# Source code for id3c.plans.flyscan_3idc

"""Fly scan for 3-ID-C: area detector *vs*. motor.

Fly scan an EPICS motor and collect Eiger2 (or any AreaDetector) images.

Implementation note: this plan is software-correlated (no hardware
gate signal). Frame-to-position pairing happens downstream from
the ``monitor_during_decorator`` streams, joined by IOC timestamp.
Waits inside the plan use ophyd ``Status`` objects (``MoveStatus``,
``SubscriptionStatus``, ``AndStatus``) driven by CA monitor
callbacks rather than busy-poll loops. See
``flyscan-3idc-status-strategy.md`` (alongside this file) for the
design history.

Usage from a command-line session::

    from bits2606.startup import *           # provides RE, oregistry, etc.
    from flyscan_3idc import flyscan, configure_adsimdet

    # The plan: drive a fly scan and record a Bluesky run.
    uid, = RE(
        flyscan(
            det_name="eiger2",
            flymotor_name="sample_stage.omega",
            p_start=5, p_end=10,
            exposures_per_egu=10,  # approximate
            t_period=0.05,
        )
    )

    # The standalone diagnostic: exercise the AD acquisition
    # protocol (no plan, no RunEngine) for triage.
    result = configure_adsimdet(adsimdet, capture_duration=3.0)

General outline
---------------
1. Preparation
   1. Validate input parameters
   2. Collect metadata
   3. Snapshot mutable state (``stage_sigs``, ``kind``,
      overridden signal values) for later restore
   4. concurrently (motor moves while parameters are set)
      1. send motor to *initial* position (``bps.abs_set(...,
         group="taxi")``)
      2. set *most* parameters for scan
      3. wait for motor to reach taxi position
         (``bps.wait(group="taxi")``)
   5. set motor velocity for scan
2. Kickoff
   1. stage devices
   2. open run
   3. subscribe to bespoke monitor streams (one stream per signal):
      1. HDF writer's frame count (``det.hdf1.array_counter``,
         carries EPICS timestamp for downstream sync with flymotor)
      2. camera's frame count (``det.cam.array_counter``, for
         cam-vs-HDF latency comparison)
      3. motor position (``flymotor.user_readback``)
   4. send motor to *final* position (``bps.abs_set(...,
      group="scan")``)
   5. start detector acquiring (without a hardware gate signal,
      coordinating acquire-start to a specific motor position is
      extremely difficult; instead, acquire **continuously** for
      the entire span ``p_initial <= flymotor.position <= p_final``
      and let downstream analysis select frames in
      ``[p_start, p_end]`` by timestamp/position).  This is an
      intentional oversample.
   6. build status objects for the monitor stage:
      - ``cam_stopped_status``: ``cam.acquire == 0``
      - ``drain_status``: HDF queue empty and idle
      - ``hdf_drain_status = AndStatus(cam_stopped_status,
        drain_status)`` — scan is done when both
      - ``watchdog_status``: ``num_captured > 0`` with
        ``timeout=no_frames_timeout``
3. Monitor (``monitor_loop``)
   1. CA monitor callback on ``det.hdf1.num_captured`` pushes
      ``(timestamp, value)`` onto a bounded queue (the *producer*)
   2. plan-side *consumer* wakes every ``_consumer_tick`` seconds,
      drains the queue, and emits one ``primary`` event per
      newly-captured frame (``create / read(det) / read(flymotor)
      / save``).  ``bps.read`` returns cached monitor values — no
      extra CA traffic
   3. stop detector acquiring when motor crosses *p_end*
   4. raise ``RuntimeError`` if the watchdog times out without any
      frame arriving (RE will then STOP all in-motion movables)
   5. exit when ``hdf_drain_status.done`` (cam has stopped AND
      every in-flight frame has been flushed)
   6. after exit, ``bps.wait(group="scan")`` absorbs any motor
      settling past *p_final*
4. Conclusion (``_cleanup``)
   1. stop motor (if still moving — checked via ``motor_is_moving``)
   2. stop cam acquire
   3. stop hdf1 capture
   4. wait for cam idle AND HDF queue drained
      (``wait_for_acquire_drained`` — uses ``AndStatus`` of
      ``SubscriptionStatus`` per signal)
   5. verify the HDF5 file landed (``full_file_name``)
   6. restore overridden signal values from ``CacheParameters``,
      restore mutated ``stage_sigs`` dicts, restore mutated
      ``kind`` values
   7. close run (handled by ``run_decorator``)
   8. unstage devices (handled by ``stage_decorator``)
"""

import inspect
import logging
import queue
import time
import warnings
from dataclasses import dataclass

from apsbits.core.instrument_init import oregistry
from bluesky import plan_stubs as bps
from bluesky import preprocessors as bpp
from bluesky.utils import plan as bluesky_plan
from epics import caget
from ophyd import ADBase
from ophyd import EpicsMotor
from ophyd import Kind
from ophyd.status import AndStatus
from ophyd.status import SubscriptionStatus
from ophyd.utils.errors import WaitTimeoutError

logger = logging.getLogger(__name__)
# Default the module's own logger to INFO so diagnostics show up in a
# fresh CLI session without the user having to configure logging first.
# We deliberately raise *only this module's* level (not the root logger
# or ophyd's loggers) so the pyepics/ophyd control layer stays quiet.
# If a parent logger has been configured (e.g. apsbits set up a console
# handler on the root logger), our records will propagate to it.  If
# nothing is configured at all, attach a minimal handler so messages
# still reach the terminal.
logger.setLevel(logging.INFO)
if not logger.handlers and not logging.getLogger().handlers:
    _h = logging.StreamHandler()
    _h.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    logger.addHandler(_h)
    # Don't bubble up to a (possibly later-configured) root that might
    # double-print; the local handler is sufficient in this fallback.
    logger.propagate = False


# ---------------------------------------------------------------------------
# Module-level tunable timing constants.
#
# These are the wake-up ticks for plan-side loops that wait on a
# status-object flag (Phase 3 refactor).  CA monitor callbacks update
# the flags asynchronously on the pyepics dispatch thread; the plan
# wakes up at these intervals to check the flag and decide whether to
# proceed.
#
# Tradeoff in either direction: smaller tick = lower latency to
# noticing the flag flipped, higher RunEngine wake-up rate.
# Sub-50 ms ticks are imperceptible to human watchers of progress
# events; sub-1 ms ticks waste CPU without observable benefit.
#
# Adjust here rather than at call sites — keeps related knobs together
# and discoverable.

# Default wake-up tick for monitor_loop's consumer (also the default
# for the flyscan(_consumer_tick=...) plan kwarg).  20 ms = 50 Hz.
_CONSUMER_TICK_DEFAULT = 0.02

# Wake-up tick for wait_for_acquire_drained in the cleanup path.
# Cleanup latency does not matter for live progress, so coarser is
# fine; 50 ms = 20 Hz.
_CLEANUP_DRAIN_TICK = 0.05

# Bounded size for monitor_loop's per-frame producer/consumer queue.
# The producer (a CA monitor callback on hdf1.num_captured) pushes
# (timestamp, value) tuples; the consumer drains in monitor_loop.
# At typical scan rates (10 Hz frames, 20 ms consumer tick), the
# queue stays nearly empty.  This bound exists to detect a
# producer/consumer mismatch — overflow is logged once as a WARNING
# and the entry is dropped (per Phase 3 decision 3.3b).  Increase
# here if higher-rate scans are ever attempted.
_FRAME_QUEUE_SIZE = 64
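

# Illustrative sketch (hypothetical helpers, not called by the plan):
# one way to implement the drop-and-warn-once overflow policy described
# above.  ``_example_make_frame_producer`` returns a callback that
# stands in for the CA monitor on hdf1.num_captured.  It accepts the
# keyword arguments (``value``, ``timestamp``, ...) that ophyd passes
# to subscription callbacks and ignores the rest.
# ``_example_drain_frames`` stands in for the plan-side consumer.
# monitor_loop itself is not shown here and may differ in detail.
# ``_example_stall_budget`` gives the headroom: how long the consumer
# may stop draining before a queue of ``maxsize`` entries overflows at
# a given frame rate (64 entries at 10 Hz is 6.4 s).
def _example_make_frame_producer(frame_queue, log=None):
    """Return a callback that enqueues (timestamp, value) without blocking."""
    import logging
    import queue

    log = log or logging.getLogger(__name__)
    warned = False

    def on_value(*, value, timestamp, **kwargs):
        nonlocal warned
        try:
            # put_nowait never blocks the CA dispatch thread.
            frame_queue.put_nowait((timestamp, value))
        except queue.Full:
            # Drop the entry; warn only on the first overflow.
            if not warned:
                warned = True
                log.warning(
                    "frame queue full (maxsize=%d); dropping entries",
                    frame_queue.maxsize,
                )

    return on_value


def _example_drain_frames(frame_queue):
    """Consumer side: remove and return everything queued right now."""
    import queue

    drained = []
    while True:
        try:
            drained.append(frame_queue.get_nowait())
        except queue.Empty:
            return drained


def _example_stall_budget(maxsize, frame_rate_hz):
    """Seconds the consumer may stall before the queue overflows."""
    return maxsize / frame_rate_hz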


# Public API of this module.  Other symbols (validators,
# snapshot/restore helpers, monitor_loop, the _wait_for helper,
# motor_is_moving, etc.) are implementation detail — accessible
# via direct import for diagnostics but not advertised as a
# stable interface.
__all__ = [
    "flyscan",  # the bluesky plan
    "configure_adsimdet",  # standalone diagnostic, no plan/RE
]


class CacheParameters(dict):
    """Remember original ophyd signal settings for later restoration.

    Usage::

        cache = CacheParameters()
        # motor1.velocity is 0.5
        yield from cache.override(motor1.velocity, 1.5)
        # motor1.velocity is now 1.5
        # ...
        yield from cache.restore()
        # motor1.velocity is back to 0.5
    """

    @bluesky_plan
    def override(self, signal, value):
        """Cache the current value of ``signal`` and set it to ``value``.

        This is a bluesky plan stub; use with ``yield from``.
        """
        # Only cache the first time we override a given signal so that
        # repeated overrides still restore back to the *original* value.
        if signal not in self:
            self[signal] = signal.get()
        yield from bps.mv(signal, value)

    @bluesky_plan
    def restore(self, clear=True):
        """Restore all cached signals to their original values.

        This is a bluesky plan stub; use with ``yield from``.

        Parameters
        ----------
        clear : bool
            If True (default), clear the cache after restoring.
        """
        if not self:
            return
        # Restore in reverse order of how they were added, so that
        # signals with ordering dependencies (e.g. set velocity *after*
        # restoring acceleration) are handled correctly.
        for signal, value in reversed(self.items()):
            yield from bps.mv(signal, value)
        if clear:
            self.clear()
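

# Illustrative sketch (hypothetical helpers, not used by the plan): the
# bookkeeping CacheParameters performs, with bluesky removed so it runs
# without a RunEngine.  ``signal.put(value)`` stands in for
# ``yield from bps.mv(signal, value)``.  Any hashable object with
# ``get()`` and ``put()`` can play the signal's role.
def _example_override(cache, signal, value):
    # First override wins, so repeated overrides still restore back to
    # the value the signal had before the scan touched it.
    if signal not in cache:
        cache[signal] = signal.get()
    signal.put(value)


def _example_restore(cache):
    # Dicts preserve insertion order, so reversing the items undoes the
    # most recent override first.  Returns the signals in restore order.
    restored = []
    for signal, value in reversed(list(cache.items())):
        signal.put(value)
        restored.append(signal)
    cache.clear()
    return restored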


def read_motor_field(motor, suffix, timeout=1.0):
    """Read an EPICS motor-record field (e.g. ``.VMAX``) ad-hoc.

    Returns the field value, or ``None`` if the PV does not connect
    within ``timeout`` seconds or any other read error occurs.

    Uses ``epics.caget`` directly rather than constructing a throwaway
    ``EpicsSignal``.  Rationale:

    - ``caget`` runs on the calling thread's CA context and does no
      asynchronous metadata fetching.  An earlier ``EpicsSignal``-based
      implementation triggered a SIGSEGV in pyepics' ``util3`` dispatch
      thread on ``gp:m1.VBAS``: the ``EpicsSignal`` was created, read,
      then ``destroy()``ed, but pyepics had queued a deferred metadata
      callback (``get_all_metadata_callback``) that fired *after* the
      underlying CA channel had been torn down — at first a recoverable
      ``RuntimeError: Expected CA context is unset`` (2026-06-04
      12:46:38 session), then a segfault (2026-06-04 13:10 session).
    - ``caget`` also sidesteps the oregistry-pollution concern (the
      previous implementation needed a ``_suspended_auto_register``
      context manager to keep the throwaway signal out of the global
      registry).
    """
    pv = motor.prefix + suffix
    try:
        value = caget(pv, timeout=timeout)
        if value is None:
            # caget returns None on connection timeout (no exception).
            logger.debug("read_motor_field(%s) timed out", pv)
            return None
        logger.debug("read_motor_field(%s) -> %r", pv, value)
        return value
    except Exception as exc:
        logger.debug("read_motor_field(%s) failed: %r", pv, exc)
        return None
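

# Illustrative sketch (hypothetical helper, not used by the plan): how a
# ``None`` from read_motor_field is meant to be consumed.
# validate_flyscan_inputs treats a missing (``None``) or non-positive
# ``.VMAX`` / ``.VBAS`` as "no limit on that side".  The same rule is
# restated here as a pure function with no CA access.
def _example_velocity_problem(scan_velocity, v_max, v_base):
    """Return an error message, or None if scan_velocity is allowed."""
    if v_max is not None and v_max > 0 and scan_velocity > v_max:
        return f"scan_velocity={scan_velocity:g} exceeds .VMAX={v_max:g}"
    if v_base is not None and v_base > 0 and scan_velocity < v_base:
        return f"scan_velocity={scan_velocity:g} is below .VBAS={v_base:g}"
    return None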


def preflight_connectivity(det, det_name, flymotor, flymotor_name, timeout=2.0):
    """Quick CA-connectivity sanity check before staging.

    Touches a small, representative set of PVs on ``det`` and
    ``flymotor`` and raises ``RuntimeError`` with a clear, single-line
    message if any of them fail to connect within ``timeout`` seconds.

    The goal is to fail *before* device staging when an IOC is down or
    wedged.  Staging issues many serial writes (``set_and_wait`` with
    a default 5-second connection timeout each), so a dead IOC at
    stage time turns into 30+ seconds of cleanup-of-cleanup noise
    that obscures the actual cause.  Detecting it here gives the
    user a one-line error instead.

    Why these specific PVs:
        - ``flymotor.user_readback``: motor record exists at all
        - ``flymotor.motor_done_move``: motor record is responsive
        - ``det.cam.acquire``: cam IOC is up
        - ``det.hdf1.capture``: HDF plugin is up (if present)
        - ``det.hdf1.num_captured``: HDF plugin readback works

    Components are accessed via ``getattr``, so any not-yet-instantiated
    ophyd Components are instantiated here.  That is the *intended*
    behavior: instantiate them now, while we still have IOC contact,
    so later code (including cleanup) does not pay the
    ``wait_for_connection`` price.
    """
    to_check = [
        ("flymotor.user_readback", getattr(flymotor, "user_readback", None)),
        ("flymotor.motor_done_move", getattr(flymotor, "motor_done_move", None)),
        ("det.cam.acquire", getattr(getattr(det, "cam", None), "acquire", None)),
    ]
    # Optional components: only check if the class declares them.
    if _has_component(det, "hdf1"):
        hdf1 = det.hdf1
        if _has_component(hdf1, "capture"):
            to_check.append(("det.hdf1.capture", hdf1.capture))
        if _has_component(hdf1, "num_captured"):
            to_check.append(("det.hdf1.num_captured", hdf1.num_captured))

    failed = []
    for label, sig in to_check:
        if sig is None:
            failed.append(f"{label} missing")
            continue
        try:
            sig.wait_for_connection(timeout=timeout)
        except Exception as exc:
            failed.append(f"{label} ({sig.pvname}): {exc}")

    if failed:
        msg = (
            f"preflight_connectivity failed for det={det_name!r}"
            f" flymotor={flymotor_name!r} (IOC down?): " + "; ".join(failed)
        )
        logger.error(msg)
        raise RuntimeError(msg)
    logger.info(
        "preflight_connectivity OK: checked %d PV(s) on det=%r flymotor=%r",
        len(to_check),
        det_name,
        flymotor_name,
    )


def check_hdf_file_path(det, settle_timeout=1.0):
    """Verify the HDF plugin can see the configured file_path.

    Returns silently on success.  Raises ``RuntimeError`` if the IOC
    reports ``FilePathExists_RBV == 0`` after ``settle_timeout``
    seconds.

    Why this is a separate gate (not just a stage_sig):

    The HDF plugin's ``file_path`` must be set, *and* the path must
    actually exist on the IOC's filesystem (i.e. inside the IOC's
    container if it's containerized), *before* ``capture`` is set to
    1 at stage time.  If the path doesn't exist when capture starts,
    the plugin fails at file-open with a generic
    ``Error writing file: status=3`` message that surfaces only via
    the plugin's ``WriteMessage`` PV — bluesky/ophyd never raises,
    and ``num_captured`` stays at 0 forever.

    This check must run *after* ``file_path`` has been written to the
    IOC (otherwise we'd be checking a stale value).  ``FilePathExists``
    is updated by the plugin in response to a ``file_path`` write, but
    not instantly — the wait below absorbs that settling delay via the
    ``_wait_for`` helper (precheck + CA monitor subscription).
    """
    if not _has_component(det, "hdf1") or not _has_component(
        det.hdf1, "file_path_exists"
    ):
        logger.debug(
            "check_hdf_file_path: %s lacks hdf1.file_path_exists; skipping",
            det.name,
        )
        return

    try:
        _wait_for(
            det.hdf1.file_path_exists,
            lambda value: value == 1,
            timeout=settle_timeout,
        )
    except WaitTimeoutError as err:
        exists = det.hdf1.file_path_exists.get(use_monitor=False)
        current_path = det.hdf1.file_path.get(use_monitor=False)
        msg = (
            f"HDF plugin reports file_path does not exist on the IOC's"
            f" filesystem: {det.name}.hdf1.file_path={current_path!r}"
            f" (file_path_exists={exists}). The path must exist (and be"
            f" writable) on the IOC's filesystem, which may be a container"
            f" view distinct from the host filesystem. Create the directory"
            f" inside the IOC, or correct ad_file_path."
        )
        logger.error(msg)
        raise RuntimeError(msg) from err

    current_path = det.hdf1.file_path.get(use_monitor=False)
    logger.info(
        "check_hdf_file_path: OK (%s.hdf1.file_path=%r)",
        det.name,
        current_path,
    )
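

# Illustrative sketch (hypothetical): the "precheck + monitor
# subscription" wait that check_hdf_file_path relies on.  The module's
# own _wait_for is defined elsewhere (not shown here) and may differ,
# for example by raising ophyd's WaitTimeoutError instead of
# TimeoutError.  A value-change callback sets a threading.Event, so the
# caller blocks without polling.  The signal only needs get(),
# subscribe(callback) -> id, and unsubscribe(id), and the callback
# receives the new value as the ``value`` keyword.
def _example_wait_for(signal, predicate, timeout):
    import threading

    satisfied = threading.Event()

    def on_change(*, value, **kwargs):
        if predicate(value):
            satisfied.set()

    cid = signal.subscribe(on_change)
    try:
        # Check *after* subscribing, so a change that lands between the
        # check and the subscription cannot be missed.
        if predicate(signal.get()):
            return
        if not satisfied.wait(timeout):
            raise TimeoutError(f"condition not met within {timeout:g} s")
    finally:
        signal.unsubscribe(cid)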


def configure_adsimdet(
    det,
    *,
    ad_file_path="/tmp/flyscan/",
    ad_file_name="flyscan",
    ad_file_template="%s%s_%6.6d.h5",
    ad_file_number=1,
    acquire_time=0.02,
    acquire_period=0.1,
    capture_duration=2.0,
    num_capture=None,
    capture_arm_timeout=5.0,
    drain_timeout=10.0,
    do_capture=True,
    do_acquire=True,
):
    """Configure & exercise an AD HDF5 detector without a plan.

    Diagnostic helper.  No RunEngine, no plan, no stage_decorator; just
    straight ophyd ``put()`` calls in the order the IOC needs them.

    Simulates the flyscan acquisition protocol:

    1. Configure file destination & cam timings.
    2. Cam in ``Continuous`` image_mode.
    3. Size ``hdf1.num_capture`` as an *upper bound* on the frame count
       (see ``num_capture`` below).  ``cam.num_images`` is set to a
       large value because it is ignored in ``Continuous`` mode.
    4. Arm capture (``hdf1.capture.set(1)``).
    5. **Wait** for ``Capture_RBV == 'Capturing'``.  This avoids a race
       in which the cam starts producing frames before the HDF plugin
       is ready to receive them.  Without this wait, the leading
       frames of a scan are silently dropped (not counted in
       ``dropped_arrays`` because the plugin isn't even listening yet).
    6. Start cam acquire.
    7. Sleep ``capture_duration`` seconds (simulates the motor
       trajectory window in a real flyscan).
    8. Stop capture (``hdf1.capture.put(0)``).
    9. Drain: wait until ``num_queued_arrays == 0`` so all in-flight
       frames leave the plugin's queue.
    10. Flush: press ``WriteFile`` and wait for it to return to idle.
        Stopping capture before ``num_capture`` is reached does not
        reliably write the image dataset on its own.
    11. Stop cam acquire.
    12. Snapshot relevant PVs and return.

    Returns a dict of the post-operation PV snapshot.

    Usage::

        from flyscan_3idc import configure_adsimdet
        result = configure_adsimdet(adsimdet, capture_duration=3.0)
        for k, v in result.items():
            print(f" {k}: {v}")

    Parameters
    ----------
    capture_duration : float
        Seconds to leave both capture and acquire active.  Total file
        write count is approximately ``capture_duration / acquire_period``.
    num_capture : int or None
        Upper bound written to ``hdf1.num_capture``.  If None (default),
        computed as ``int(expected_frames * 1.5) + 20`` where
        ``expected_frames = int(capture_duration / acquire_period)``.
    capture_arm_timeout : float
        Maximum seconds to wait for ``Capture_RBV`` to transition to
        ``'Capturing'`` after arming.  Raises ``RuntimeError`` on timeout.
    drain_timeout : float
        Maximum seconds to wait for ``num_queued_arrays`` to reach 0
        after stopping capture (and, separately, for ``WriteFile`` to
        return to idle).  Logs a warning on timeout but does not raise.
    do_capture, do_acquire : bool
        Skip arming capture or starting acquire, respectively.  Useful
        for narrowing down which step misbehaves.
    """
    logger.info(
        "configure_adsimdet(%s): file_path=%r file_name=%r template=%r"
        " number=%d acquire_time=%g acquire_period=%g"
        " capture_duration=%g do_capture=%s do_acquire=%s",
        det.name,
        ad_file_path,
        ad_file_name,
        ad_file_template,
        ad_file_number,
        acquire_time,
        acquire_period,
        capture_duration,
        do_capture,
        do_acquire,
    )
    UNLIMITED_FRAMES = 1_000_000_000

    # 1. File destination (must happen before capture is armed)
    logger.info("configure_adsimdet: setting file_path=%r", ad_file_path)
    det.hdf1.file_path.put(ad_file_path)
    logger.info("configure_adsimdet: setting file_name=%r", ad_file_name)
    det.hdf1.file_name.put(ad_file_name)
    logger.info("configure_adsimdet: setting file_template=%r", ad_file_template)
    det.hdf1.file_template.put(ad_file_template)
    logger.info("configure_adsimdet: setting file_number=%d", ad_file_number)
    det.hdf1.file_number.put(ad_file_number)

    # 2. File save behavior (required so capture actually writes the file)
    logger.info("configure_adsimdet: setting auto_save=Yes")
    det.hdf1.auto_save.put("Yes")
    logger.info("configure_adsimdet: setting auto_increment=Yes")
    det.hdf1.auto_increment.put("Yes")
    # file_write_mode='Capture' matches the arm-capture / continuous-cam /
    # stop-on-timer / drain-then-stop protocol below.  In 'Capture' mode
    # the plugin queues frames while Capture=1 and writes them to a
    # single HDF5 file when Capture goes to 0.  'Stream' mode has subtler
    # file-lifecycle semantics that have been associated with
    # num_captured=0 even when cam frames are flowing.
    logger.info("configure_adsimdet: setting file_write_mode='Capture'")
    det.hdf1.file_write_mode.put("Capture")

    # 3. Permit creating the destination directory if missing
    if _has_component(det.hdf1, "create_directory"):
        logger.info("configure_adsimdet: setting create_directory=-5")
        det.hdf1.create_directory.put(-5)

    # 4. Verify the IOC now sees the path
    time.sleep(0.1)  # let the IOC settle
    exists = det.hdf1.file_path_exists.get(use_monitor=False)
    if exists != 1:
        msg = (
            f"configure_adsimdet: HDF plugin reports file_path does not"
            f" exist after setting it to {ad_file_path!r}"
            f" (file_path_exists={exists})."
        )
        logger.error(msg)
        raise RuntimeError(msg)
    logger.info("configure_adsimdet: file_path_exists=1")

    # 5. Capture sizing.
    #
    # In 'Capture' mode, num_capture is an *upper bound*: the plugin
    # stops capturing when this count is reached, but the on-disk
    # dataset is sized to whatever number of frames actually got
    # captured (verified by experiment: num_capture=5 with only 3
    # frames arriving produced a (3, H, W) dataset, not (5, H, W)).
    #
    # CAUTION: num_capture cannot be made arbitrarily large.  The IOC's
    # NDFileHDF5 plugin computes byte counts as C int arithmetic during
    # dataset pre-allocation; values around 1e9 with Float64 1024x1024
    # frames overflow, producing a file whose num_captured counter
    # advances but whose /entry/data/data dataset is never written
    # (verified empirically).  num_capture <= ~1e6 is known safe for
    # typical frame sizes.
    #
    # Unless the caller supplies num_capture, size it as the expected
    # count times 1.5 plus 20, which absorbs taxi-in/-out leading
    # edges, post-stop tail frames, and timing jitter for any sensible
    # scan size.
    expected_frames = int(capture_duration / acquire_period)
    if num_capture is None:
        num_capture = int(expected_frames * 1.5) + 20
    logger.info(
        "configure_adsimdet: setting num_capture=%d (upper bound;"
        " expected ~%d frames from capture_duration=%g / period=%g)",
        num_capture,
        expected_frames,
        capture_duration,
        acquire_period,
    )
    det.hdf1.num_capture.put(num_capture)

    # 6. Cam configuration: Continuous mode, run until told to stop.
    logger.info("configure_adsimdet: setting cam.acquire_time=%g", acquire_time)
    det.cam.acquire_time.put(acquire_time)
    logger.info("configure_adsimdet: setting cam.acquire_period=%g", acquire_period)
    det.cam.acquire_period.put(acquire_period)
    logger.info("configure_adsimdet: setting cam.image_mode='Continuous'")
    det.cam.image_mode.put("Continuous")
    # cam.num_images is irrelevant in Continuous mode but some IOC builds
    # enforce a sanity limit, so a large number is safer than 0.  This
    # is *not* the count that sizes the HDF dataset; that's num_capture.
    logger.info("configure_adsimdet: setting cam.num_images=%d", UNLIMITED_FRAMES)
    det.cam.num_images.put(UNLIMITED_FRAMES)

    # 7. Arm the HDF plugin and *wait* for it to reach the 'Capturing'
    #    state.  This was a race in earlier tests: cam frames arrived
    #    before capture was ready, and were silently dropped.
    #
    #    ``capture`` is an EpicsSignalWithRBV; ``set(1)`` runs the generic
    #    Signal.set() path which puts and then polls Capture_RBV until it
    #    equals the setpoint (enum 1 == "Capturing").  That's exactly the
    #    condition we want, so we wait on the returned Status object
    #    instead of running our own poll loop.
    if do_capture:
        logger.info(
            "configure_adsimdet: arming capture (det.hdf1.capture.set(1))",
        )
        try:
            det.hdf1.capture.set(1).wait(timeout=capture_arm_timeout)
        except (WaitTimeoutError, TimeoutError) as exc:
            state = det.hdf1.capture.get(use_monitor=False, as_string=True)
            msg = (
                f"configure_adsimdet: HDF plugin did not reach 'Capturing'"
                f" state within {capture_arm_timeout:g}s after arming"
                f" (last state={state!r}, write_status="
                f"{det.hdf1.write_status.get(use_monitor=False, as_string=True)!r},"
                f" write_message="
                f"{det.hdf1.write_message.get(use_monitor=False, as_string=True)!r})."
                f" Underlying error: {exc!r}"
            )
            logger.error(msg)
            raise RuntimeError(msg) from exc
        logger.info("configure_adsimdet: capture is 'Capturing' (armed)")

    # 8. Start the cam in continuous mode.
    if do_acquire:
        logger.info(
            "configure_adsimdet: starting acquisition (cam.acquire.put(1))",
        )
        det.cam.acquire.put(1)

    # 9. Hold for the requested duration (simulates motor trajectory).
    logger.info(
        "configure_adsimdet: capturing for %g s ...",
        capture_duration,
    )
    time.sleep(capture_duration)
    captured_during = det.hdf1.num_captured.get(use_monitor=False)
    queued = det.hdf1.num_queued_arrays.get(use_monitor=False)
    logger.info(
        "configure_adsimdet: after %g s: num_captured=%d, queued=%s",
        capture_duration,
        captured_during,
        queued,
    )

    # 10. Stop capture.  In Capture mode this halts further frame
    #     accumulation but does NOT reliably flush the file to disk
    #     when stopped early (i.e. before num_capture is reached),
    #     even with auto_save=Yes.  We must explicitly press WriteFile
    #     below to force the flush.
    if do_capture:
        logger.info("configure_adsimdet: stopping capture (put 0)")
        det.hdf1.capture.put(0)

    # 11. Drain: wait until the plugin has flushed any queued frames
    #     from its in-memory queue into the file buffer.
    #
    #     num_queued_arrays is an RBV-only PV (no .set() to lean on);
    #     use the _wait_for helper.  Timeout is a warning, not a
    #     raise, matching the original behavior.  The int() cast in the
    #     predicate guards against pyepics returning the value as a
    #     string for certain record types.
    try:
        _wait_for(
            det.hdf1.num_queued_arrays,
            lambda value: int(value) == 0,
            timeout=drain_timeout,
        )
        logger.info(
            "configure_adsimdet: HDF queue drained"
            " (num_queued_arrays=0)",
        )
    except (WaitTimeoutError, TimeoutError):
        logger.warning(
            "configure_adsimdet: HDF queue did not drain within"
            " %g s (num_queued_arrays=%s)",
            drain_timeout,
            det.hdf1.num_queued_arrays.get(use_monitor=False),
        )

    # 12. Explicitly flush the file to disk.  WriteFile=1 forces
    #     the plugin to write out whatever it has captured.  This
    #     is required when stopping capture before num_capture is
    #     reached: empirically, neither Capture=0 alone nor
    #     auto_save=Yes is sufficient in that case (the file ends
    #     up with the NeXus skeleton but no image dataset).
    captured = det.hdf1.num_captured.get(use_monitor=False)
    if captured > 0:
        logger.info(
            "configure_adsimdet: flushing file via write_file=1"
            " (num_captured=%d)",
            captured,
        )
        # Note: we use put() + _wait_for here rather than
        # write_file.set(1).wait().  write_file is an
        # EpicsSignalWithRBV (WriteFile / WriteFile_RBV), so .set(1)
        # would call _set_and_wait, which completes when
        # WriteFile_RBV equals the setpoint (1, "Writing"), i.e. at
        # the *start* of the write, not its completion.  The
        # operation we need to wait for is the back-to-idle
        # transition (RBV returns to 0, "Done").  So we put() the
        # trigger and _wait_for the idle state.
        det.hdf1.write_file.put(1)
        try:
            _wait_for(
                det.hdf1.write_file,
                lambda value: int(value) == 0,
                timeout=drain_timeout,
            )
            logger.info(
                "configure_adsimdet: write_file completed"
                " (full_file_name=%r)",
                det.hdf1.full_file_name.get(
                    use_monitor=False,
                    as_string=True,
                ),
            )
        except (WaitTimeoutError, TimeoutError):
            logger.warning(
                "configure_adsimdet: write_file did not complete"
                " within %g s (write_status=%r write_message=%r)",
                drain_timeout,
                det.hdf1.write_status.get(use_monitor=False, as_string=True),
                det.hdf1.write_message.get(use_monitor=False, as_string=True),
            )
    else:
        logger.info(
            "configure_adsimdet: skipping write_file (no frames"
            " captured)",
        )

    # 13. Stop the cam.
    if do_acquire:
        logger.info("configure_adsimdet: stopping acquire (put 0)")
        det.cam.acquire.put(0)

    # 14. Report.
    result = {
        "cam.acquire": det.cam.acquire.get(use_monitor=False, as_string=True),
        "cam.array_counter": det.cam.array_counter.get(use_monitor=False),
        "cam.detector_state": (
            det.cam.detector_state.get(use_monitor=False, as_string=True)
            if _has_component(det.cam, "detector_state")
            else None
        ),
        "hdf1.capture": det.hdf1.capture.get(use_monitor=False, as_string=True),
        "hdf1.num_captured": det.hdf1.num_captured.get(use_monitor=False),
        "hdf1.num_queued_arrays": det.hdf1.num_queued_arrays.get(use_monitor=False),
        "hdf1.dropped_arrays": det.hdf1.dropped_arrays.get(use_monitor=False),
        "hdf1.write_status": det.hdf1.write_status.get(
            use_monitor=False, as_string=True
        ),
        "hdf1.write_message": det.hdf1.write_message.get(
            use_monitor=False, as_string=True
        ),
        "hdf1.full_file_name": det.hdf1.full_file_name.get(
            use_monitor=False, as_string=True
        ),
        "hdf1.file_path_exists": det.hdf1.file_path_exists.get(use_monitor=False),
    }
    logger.info("configure_adsimdet: result=%s", result)
    return result
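

# Worked example (hypothetical helper, not called by configure_adsimdet):
# the default num_capture sizing from its "5. Capture sizing" step, an
# upper bound of 1.5x the expected frame count plus 20.  int()
# truncates the float quotient, so a ratio such as 0.3 / 0.1
# (2.9999999999999996 in binary floating point) counts as 2 frames,
# not 3.  The 1.5x + 20 margin comfortably covers that off-by-one.
def _example_default_num_capture(capture_duration, acquire_period):
    """Return (expected_frames, num_capture) as configure_adsimdet derives them."""
    expected_frames = int(capture_duration / acquire_period)
    return expected_frames, int(expected_frames * 1.5) + 20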
# Fallback acceleration (seconds) used when ``.ACCL`` cannot be read. # Deliberately generous; over-allocating the taxi region only costs a # bit of extra travel before the first useful frame. _ACCL_FALLBACK_SECONDS = 0.25 class FlyscanDataLossWarning(UserWarning): """Emitted when a flyscan run loses frames at the HDF plugin input. A non-zero delta in ``hdf1.dropped_arrays`` over the run means the cam produced frames the HDF plugin couldn't accept. This is a data-integrity concern: the on-disk HDF5 file is missing frames the cam exposed. Surfaced as a ``UserWarning`` subclass so it appears in IPython/Jupyter's warning channel in addition to the log file, and is filterable independently of other warnings by user code (``warnings.filterwarnings('error', category=FlyscanDataLossWarning)`` to turn it into an exception, for example). """ @dataclass(frozen=True) class FlyscanGeometry: """Derived geometry for a flyscan: taxi/coast distances + frame count. All distances are in motor engineering units; all times are in seconds. ``accl_was_default`` is True when ``.ACCL`` could not be read and the fallback was used. """ num_frames: int scan_duration: float # num_frames * t_period scan_velocity: float # (p_end - p_start) / scan_duration d_taxi: float # 0.5 * scan_velocity * motor_accl p_initial: float # p_start - d_taxi - taxi_allowance p_final: float # p_end + d_taxi + taxi_allowance motor_accl: float # seconds (.ACCL or fallback) motor_egu: str # .EGU string for error messages/metadata accl_was_default: bool def compute_flyscan_geometry( flymotor, p_start, p_end, exposures_per_egu, t_period, taxi_allowance, ): """Derive flyscan geometry from user-meaningful kwargs. Pure function: no CA writes, no bluesky plan messages. 

    Reads ``.ACCL`` and ``.EGU`` from the motor record via the same
    caget path as ``validate_flyscan_inputs``; ``.ACCL`` falls back to
    ``_ACCL_FALLBACK_SECONDS`` (with a warning log) if the read fails,
    so the caller still gets a usable geometry object. The
    velocity-bound check in ``validate_flyscan_inputs`` will then catch
    any downstream impossibility.

    Frame count uses fence-post counting: a frame at both endpoints of
    ``[p_start, p_end]`` plus ``exposures_per_egu`` frames per unit of
    motor travel between them::

        num_frames = round(1 + (p_end - p_start) * exposures_per_egu)

    Taxi distance uses the over-allocating form
    ``d_taxi = 0.5 * scan_velocity * motor_accl`` (the motor record's
    ``.ACCL`` is "seconds to reach .VELO", so this slightly
    overestimates the time to reach ``scan_velocity``). This is
    intentional: it gives the cam a beat to settle before the first
    useful frame.
    """
    p_fly_dist = p_end - p_start
    if p_fly_dist <= 0:
        raise ValueError(f"p_end={p_end:g} must be greater than p_start={p_start:g}.")
    if exposures_per_egu <= 0:
        raise ValueError(f"exposures_per_egu={exposures_per_egu:g} must be positive.")
    if taxi_allowance < 0:
        raise ValueError(f"taxi_allowance={taxi_allowance:g} must be non-negative.")
    if t_period <= 0:
        raise ValueError(f"t_period={t_period:g} must be positive.")

    num_frames = int(round(1 + p_fly_dist * exposures_per_egu))
    if num_frames < 2:
        raise ValueError(
            f"computed num_frames={num_frames} (from exposures_per_egu="
            f"{exposures_per_egu:g}, p_fly_dist={p_fly_dist:g}) must be"
            " at least 2."
        )
    scan_duration = num_frames * t_period
    scan_velocity = p_fly_dist / scan_duration

    accl = read_motor_field(flymotor, ".ACCL")
    accl_was_default = False
    if accl is None or accl <= 0:
        logger.warning(
            "compute_flyscan_geometry: motor .ACCL unreadable (got %r);"
            " using fallback %g s. Taxi region may be under-allocated.",
            accl,
            _ACCL_FALLBACK_SECONDS,
        )
        accl = _ACCL_FALLBACK_SECONDS
        accl_was_default = True

    egu = read_motor_field(flymotor, ".EGU")
    if not isinstance(egu, str):
        egu = ""

    d_taxi = 0.5 * scan_velocity * accl
    p_initial = p_start - d_taxi - taxi_allowance
    p_final = p_end + d_taxi + taxi_allowance

    logger.info(
        "compute_flyscan_geometry: num_frames=%d scan_velocity=%g %s/s"
        " d_taxi=%g %s (ACCL=%g s%s, allowance=%g %s)"
        " => p_initial=%g p_final=%g",
        num_frames,
        scan_velocity,
        egu or "?",
        d_taxi,
        egu or "?",
        accl,
        " [fallback]" if accl_was_default else "",
        taxi_allowance,
        egu or "?",
        p_initial,
        p_final,
    )

    return FlyscanGeometry(
        num_frames=num_frames,
        scan_duration=scan_duration,
        scan_velocity=scan_velocity,
        d_taxi=d_taxi,
        p_initial=p_initial,
        p_final=p_final,
        motor_accl=accl,
        motor_egu=egu,
        accl_was_default=accl_was_default,
    )


def validate_flyscan_inputs(
    det,
    det_name,
    flymotor,
    flymotor_name,
    geometry,
    t_acquire,
    t_period,
    compression,
):
    """Validate flyscan arguments against derived geometry and IOC state.

    ``geometry`` is the ``FlyscanGeometry`` returned by
    ``compute_flyscan_geometry``. This function does the checks that
    require either a connected device or values that must be
    cross-checked against the motor's velocity limits / the HDF
    plugin's compression enum.

    Returns ``(v_max, v_base)`` for inclusion in run metadata.

    Raises ``KeyError`` for missing/wrong-type devices and ``ValueError``
    for out-of-range numeric arguments or an unsupported compression.
    """
    if not isinstance(det, ADBase):
        raise KeyError(f"Area Detector {det_name!r} not found in registry.")
    if not isinstance(flymotor, EpicsMotor):
        raise KeyError(f"Motor {flymotor_name!r} not found in registry.")

    if not 0 < t_acquire <= t_period:
        raise ValueError(
            "Acquisition time must be positive and less than or equal to the period."
        )

    # Sanity-check the derivation: this should always hold after a
    # successful compute_flyscan_geometry, but assert it so any future
    # refactor that changes the formula doesn't silently produce an
    # invalid geometry.
    if not (
        geometry.p_initial
        < (geometry.p_initial + geometry.d_taxi)  # p_start - taxi_allowance
        < (geometry.p_final - geometry.d_taxi)  # p_end + taxi_allowance
        < geometry.p_final
    ):
        raise ValueError(
            "Derived geometry violates p_initial < p_start < p_end <"
            f" p_final: {geometry!r}"
        )
    if geometry.scan_velocity <= 0:
        raise ValueError(f"scan_velocity={geometry.scan_velocity:g} must be positive.")

    # EpicsMotor does not expose .VMAX or .VBAS as components.
    # Read ad-hoc; None or 0 means "no limit on that side."
    v_max = read_motor_field(flymotor, ".VMAX")
    v_base = read_motor_field(flymotor, ".VBAS")
    if v_max is not None and v_max > 0 and geometry.scan_velocity > v_max:
        raise ValueError(
            f"scan_velocity={geometry.scan_velocity:g} exceeds motor max"
            f" velocity .VMAX={v_max:g} for {flymotor_name!r}."
        )
    if v_base is not None and v_base > 0 and geometry.scan_velocity < v_base:
        raise ValueError(
            f"scan_velocity={geometry.scan_velocity:g} is below motor"
            f" base velocity .VBAS={v_base:g} for {flymotor_name!r}."
        )

    # Compression validation against the HDF plugin's enum_strs. The
    # PV is an mbbi/mbbo enumeration; rejecting bad values here gives
    # the user a useful message ("got 'gzip', expected one of [...]")
    # instead of a much-later IOC-side "invalid value" at write time.
    # Defensive: skip if enum_strs is not populated (offline IOC,
    # mock, etc.); that case falls through to IOC rejection at write
    # time.
    enum_strs = ()
    try:
        comp_sig = det.hdf1.compression
        enum_strs = tuple(getattr(comp_sig, "enum_strs", ()) or ())
    except Exception as exc:
        logger.debug(
            "validate_flyscan_inputs: cannot read compression enum_strs:"
            " %r",
            exc,
        )
    if enum_strs and compression not in enum_strs:
        raise ValueError(
            f"compression={compression!r} not in HDF plugin's allowed"
            f" set {list(enum_strs)!r}."
        )

    logger.info(
        "validated inputs: det=%r flymotor=%r p=%g/%g/%g/%g num_frames=%d"
        " t_acquire=%g t_period=%g scan_active_duration=%g"
        " scan_velocity=%g (VMAX=%r VBAS=%r) compression=%r",
        det_name,
        flymotor_name,
        geometry.p_initial,
        geometry.p_initial + geometry.d_taxi,  # p_start - taxi_allowance
        geometry.p_final - geometry.d_taxi,  # p_end + taxi_allowance
        geometry.p_final,
        geometry.num_frames,
        t_acquire,
        t_period,
        geometry.scan_duration,
        geometry.scan_velocity,
        v_max,
        v_base,
        compression,
    )
    return v_max, v_base


def build_flyscan_md(
    *,
    plan_name,
    det_name,
    flymotor_name,
    p_start,
    p_end,
    exposures_per_egu,
    t_acquire,
    t_period,
    taxi_allowance,
    compression,
    geometry,
    v_max,
    v_base,
    ad_file_name,
    ad_file_path,
    hdf_num_capture,
    hdf_flush_timeout_max,
    consumer_tick,
):
    """Assemble the metadata dict recorded with the run.

    Any plan kwarg that affects scan behavior is recorded here so the
    run document preserves the value used. Derived values from
    ``geometry`` (``p_initial``, ``p_final``, ``num_frames``,
    ``scan_velocity``, ...) are also recorded so downstream readers
    have both the user-supplied inputs and the resulting plan.

    Internal underscore-prefixed kwargs (e.g. ``_consumer_tick``)
    appear without the underscore in the metadata for readability.
    """
    return {
        "plan_name": plan_name,
        "det_name": det_name,
        "flymotor_name": flymotor_name,
        # User-supplied scan parameters
        "p_start": p_start,
        "p_end": p_end,
        "exposures_per_egu": exposures_per_egu,
        "t_acquire": t_acquire,
        "t_period": t_period,
        "taxi_allowance": taxi_allowance,
        "compression": compression,
        # Derived geometry
        "p_initial": geometry.p_initial,
        "p_final": geometry.p_final,
        "num_frames": geometry.num_frames,
        "scan_active_duration": geometry.scan_duration,
        "scan_velocity": geometry.scan_velocity,
        "d_taxi": geometry.d_taxi,
        "motor_accl": geometry.motor_accl,
        "motor_accl_was_default": geometry.accl_was_default,
        "motor_egu": geometry.motor_egu,
        "motor_velocity_max": v_max,  # .VMAX; None if not readable
        "motor_velocity_base": v_base,  # .VBAS; None if not readable
        # File destination
        "ad_file_name": ad_file_name,
        "ad_file_path": ad_file_path,
        "hdf_num_capture": hdf_num_capture,  # HDF plugin upper bound
        "hdf_flush_timeout_max": hdf_flush_timeout_max,  # worst-case (s)
        "consumer_tick": consumer_tick,  # monitor_loop wake-up tick (s)
    }


def snapshot_stage_sigs(*devices):
    """Shallow-copy each device's ``stage_sigs`` for later restore.

    Returns a list of (device, dict-copy) pairs. Use with
    ``restore_stage_sigs``.
    """
    return [(dev, dict(dev.stage_sigs)) for dev in devices]


def restore_stage_sigs(snapshot):
    """Restore device ``stage_sigs`` dicts from a ``snapshot_stage_sigs``.

    Clear-then-update is intentional: mutate the *same* dict object the
    device already holds (don't reassign the attribute).
    """
    for dev, saved in snapshot:
        dev.stage_sigs.clear()
        dev.stage_sigs.update(saved)


def snapshot_kinds(*signals):
    """Capture each signal's ``.kind`` for later restore.

    Returns a list of ``(signal, original_kind)`` pairs. Use with
    ``restore_kinds``.

    Intended for plan-local mutations of ``kind`` (e.g. to make
    ``cam.array_counter`` ``Kind.hinted`` for the duration of a fly
    scan so it appears in primary-stream events, without imposing that
    choice on every other plan that touches the device).
    """
    return [(sig, sig.kind) for sig in signals]


def restore_kinds(snapshot):
    """Restore each signal's ``.kind`` from a ``snapshot_kinds`` result."""
    for sig, original_kind in snapshot:
        sig.kind = original_kind


def motor_is_moving(motor):
    """True iff the motor is currently moving, checked via DMOV (no cache).

    Reads ``motor_done_move`` (the motor record's ``.DMOV`` field) with
    ``use_monitor=False`` to bypass the pyepics monitor cache; this
    matches the rest of this module's "bypass cache for timing-critical
    reads" discipline (see strategy-doc Phase 0.1 for background).

    Prefer this function over ``motor.moving`` (the EpicsMotor
    property), which depending on ophyd version may read ``MOVN``
    rather than ``DMOV``. ``MOVN`` transitions briefly to "not moving"
    between the primary move and the backlash-correction move when
    BDST != 0; ``DMOV`` only goes to 1 when both phases are complete.

    Used by ``_cleanup`` to decide whether to issue
    ``bps.stop(flymotor)`` (skipped if the motor is already idle, per
    Phase 2 decision 2.5 / Phase 3 decision 3.12).
    """
    return motor.motor_done_move.get(use_monitor=False, as_string=False) != 1


def _wait_for(signal, predicate, timeout, *, settle_time=0.0):
    """Wait for ``signal`` to satisfy ``predicate``, using CA monitors.

    Two-step pattern:

    1. **Precheck** the signal's current value via
       ``signal.get(use_monitor=False)``. If ``predicate(value)`` is
       already true, return immediately without subscribing.
    2. Otherwise create an ``ophyd.status.SubscriptionStatus(...,
       run=True)`` whose callback fires whenever a CA monitor update
       arrives, and ``.wait(timeout=timeout)``.

    The precheck defends against a known IOC failure mode: CA monitors
    are posted **on change**.
    If the PV's current value already satisfies the predicate, an IOC
    may have no reason to post the "next" monitor update the status is
    waiting for, and the wait will hang to timeout (surfacing as
    ``ophyd.utils.errors.WaitTimeoutError``, or ``FailedStatus`` if the
    status was used inside a bluesky plan). The precheck short-circuits
    this case.

    ``SubscriptionStatus(run=True)`` (the ophyd default) also evaluates
    the predicate once at subscribe time against the value most
    recently seen by the monitor stream, which is a *second* layer of
    defense in case a monitor update arrived between the precheck and
    the subscription.

    Parameters
    ----------
    signal : ophyd.Signal
        The signal to watch. Must support both
        ``.get(use_monitor=False)`` (used by the precheck) and
        subscription via ``SubscriptionStatus`` (i.e. a CA-backed
        signal).
    predicate : callable
        ``predicate(value) -> bool``. Returns true when the wait should
        end. Called with positional ``value`` only; the
        ``SubscriptionStatus`` callback wraps it to accept the
        ``value=..., **kwargs`` shape ophyd uses.
    timeout : float
        Seconds to wait for the predicate to become true. Raises
        ``ophyd.utils.errors.WaitTimeoutError`` on timeout.
    settle_time : float, optional
        Forwarded to ``SubscriptionStatus``. If non-zero, ophyd waits
        this many seconds after the predicate first returns true before
        marking the status done; the predicate is not re-checked during
        that delay. Default 0.

    Returns
    -------
    The current value of ``signal`` (read via ``use_monitor=False``)
    once the predicate is satisfied.

    Raises
    ------
    ophyd.utils.errors.WaitTimeoutError
        If the predicate does not become true within ``timeout``.

    Notes
    -----
    This is a synchronous helper for use **outside** bluesky plans
    (diagnostic functions, preflight checks). Inside a plan, prefer
    ``signal.set(value).wait(...)`` where applicable, or build a
    plan-stub wrapper that yields control while waiting.
    """
    current = signal.get(use_monitor=False)
    if predicate(current):
        logger.debug(
            "_wait_for(%s): precheck satisfied (value=%r); no subscription",
            signal.name,
            current,
        )
        return current

    logger.debug(
        "_wait_for(%s): precheck not satisfied (value=%r); subscribing",
        signal.name,
        current,
    )
    status = SubscriptionStatus(
        signal,
        lambda *, value, **_: predicate(value),
        run=True,
        settle_time=settle_time,
    )
    status.wait(timeout=timeout)
    return signal.get(use_monitor=False)


def _safe_get(device, name, **get_kwargs):
    """Best-effort ``device.<name>.get(**get_kwargs)`` for diagnostics.

    Returns ``None`` if the component does not exist on the class or if
    the read raises any exception. Used to harvest extra context for
    error messages (e.g. ``write_status`` / ``write_message`` from a
    sick HDF plugin) without letting the diagnostic itself raise and
    mask the real error.
    """
    if not _has_component(device, name):
        return None
    try:
        sig = getattr(device, name)
        return sig.get(**get_kwargs)
    except Exception as exc:
        logger.debug("_safe_get(%s.%s) failed: %r", device.name, name, exc)
        return None


def _has_component(device, name):
    """True if ``name`` is declared as an ophyd component on ``device``'s class.

    Uses class-level introspection only; it does **not** trigger lazy
    instantiation or any CA traffic. Safe to call against detectors
    whose IOC is unresponsive (e.g. inside cleanup paths after a
    failure).

    Compare with ``hasattr(device, name)``, which calls ``getattr`` and
    therefore *will* instantiate a not-yet-touched ophyd component,
    blocking for ``wait_for_connection`` (default 5s) when the IOC is
    down.
    """
    if device is None:
        return False
    return name in type(device).component_names


def _hdf_flush_timeout(
    det,
    n_captured,
    floor=10.0,
    headroom=3.0,
    assumed_rate_mb_s=50.0,
    assumed_bytes_per_pixel=8,
):
    """Estimate a generous timeout for HDF5 ``write_file`` to complete.

    Computes the expected file size from frame dimensions and pixel
    depth, divides by a conservative write rate, multiplies by
    ``headroom``, and floors at ``floor`` seconds.

    Parameters
    ----------
    det : ophyd Device
        The area detector; ``det.hdf1.width`` and ``det.hdf1.height``
        are read to estimate frame size.
    n_captured : int
        Number of frames the IOC will be writing.
    floor : float
        Minimum timeout, regardless of computed value. Protects very
        small captures from a near-zero timeout.
    headroom : float
        Multiplier applied to the estimated wall time. 3.0 gives a
        comfortable margin without being silly.
    assumed_rate_mb_s : float
        Conservative write rate (MB/s). The IOC typically achieves much
        more; 50 leaves a 4x margin on our measured ~200 MB/s.
    assumed_bytes_per_pixel : int
        Worst case for the cam's data type (Float64 = 8). Smaller cam
        types (Int8, UInt16) will write faster but the headroom absorbs
        the over-estimate.

    Returns
    -------
    float
        Timeout in seconds. Always >= ``floor``.
    """
    try:
        width = int(det.hdf1.width.get(use_monitor=False))
        height = int(det.hdf1.height.get(use_monitor=False))
    except Exception:
        return max(floor, floor * headroom)
    mb = width * height * assumed_bytes_per_pixel * int(n_captured) / (1024 * 1024)
    estimated = mb / assumed_rate_mb_s
    return max(floor, estimated * headroom)


def wait_for_acquire_drained(det, poll=0.001, timeout=10.0):
    """Plan stub: wait for cam to report idle *and* HDF queue to drain.

    With ``cam.wait_for_plugins='Yes'``, ``cam.acquire_busy`` goes 0
    only after every enabled plugin has finished processing the last
    frame. Add the HDF-queue check as belt-and-suspenders for cases
    where the cam class does not expose ``acquire_busy``.

    Implementation: builds an ``AndStatus`` of one ``SubscriptionStatus``
    per available signal (cam ``acquire_busy``, HDF
    ``num_queued_arrays`` with race-window-safe corroboration on
    ``queue_free``/``queue_size``) and yields ``bps.sleep`` ticks until
    ``status.done`` is true or ``timeout`` elapses.
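
The deadline-bounded AND described above can be sketched without
ophyd. Here each ``threading.Event`` is a hypothetical stand-in for
one ``SubscriptionStatus``, ``all(...)`` plays the role of
``AndStatus``, and the blocking ``time.sleep`` stands in for the
plan's ``yield from bps.sleep(poll)``; ``wait_all`` is an illustrative
name, not part of this module.

```python
import threading
import time


def wait_all(events, timeout, poll=0.001):
    # True as soon as every event is set (the AND); False once the
    # deadline passes, so the caller never waits much past timeout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if all(ev.is_set() for ev in events):
            return True
        time.sleep(poll)  # a plan would yield bps.sleep(poll) instead
    return False


# Two hypothetical "drained" flags, set from timer threads.
busy_idle, queue_empty = threading.Event(), threading.Event()
threading.Timer(0.02, busy_idle.set).start()
threading.Timer(0.04, queue_empty.set).start()
drained = wait_all([busy_idle, queue_empty], timeout=2.0)
```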

    Each sub-status is driven by its own CA monitor stream (one
    subscription per signal), so neither signal can mask the other
    through a missed monitor edge.

    Returns after ``timeout`` seconds even if drained signals do not
    settle, so this is safe to call from cleanup paths.

    Short-circuit: if the cam is not acquiring *and* the HDF plugin has
    not captured any frames, there is nothing to drain. This spares
    cleanup paths a needless ``timeout``-second wait when the plan
    failed before acquisition started (e.g. an ``AttributeError`` in
    the preparation phase). This precheck uses ``use_monitor=False`` to
    avoid stale-cache reads (notably ``num_captured`` is cumulative
    across runs unless explicitly reset).

    Capability discovery uses class-level ``component_names``
    introspection rather than ``hasattr``, so we don't pay a 5-second
    CA timeout to answer "does this device have ``queue_free``?" when
    the IOC is dead during a crash-cleanup.

    The ``poll`` parameter is the *plan wake-up tick* (how often the
    plan checks ``status.done``), not a PV-polling interval. The
    underlying status updates happen on the CA monitor thread; ``poll``
    only controls how soon the plan notices. A future-proofing
    alternative would be ``bps.wait_for([asyncio_future])`` (would tie
    the plan to the RunEngine's asyncio loop; see strategy doc Tier 4).
    """
    has_busy = _has_component(det.cam, "acquire_busy")
    has_hdf_queue = _has_component(det, "hdf1") and _has_component(
        det.hdf1, "queue_free"
    )

    acquire_off = det.cam.acquire.get(use_monitor=False) == 0
    nothing_captured = (
        not _has_component(det, "hdf1")
        or not _has_component(det.hdf1, "num_captured")
        or det.hdf1.num_captured.get(use_monitor=False) == 0
    )
    if acquire_off and nothing_captured:
        logger.info(
            "wait_for_acquire_drained(%s): nothing to drain"
            " (acquire=0, num_captured=0); short-circuiting",
            det.name,
        )
        # finalize_wrapper consumes this as a generator, so we must
        # yield at least one message before returning.
        yield from bps.null()
        return

    # Build sub-statuses for whichever signals the device exposes.
    sub_statuses = []
    if has_busy:
        sub_statuses.append(
            SubscriptionStatus(
                det.cam.acquire_busy,
                lambda *, value, **_: int(value) == 0,
                run=True,
            )
        )
    if has_hdf_queue:
        # HDF-drain predicate preserves the original race-window
        # protection: queue empty AND all slots free (i.e. no frame
        # is currently being written). The two corroboration reads
        # use use_monitor=False so the predicate sees consistent
        # post-update values rather than a stale cache.
        hdf = det.hdf1
        sub_statuses.append(
            SubscriptionStatus(
                hdf.num_queued_arrays,
                lambda *, value, **_: (
                    int(value) == 0
                    and int(hdf.queue_free.get(use_monitor=False))
                    == int(hdf.queue_size.get(use_monitor=False))
                ),
                run=True,
            )
        )

    t0 = time.time()
    logger.info(
        "wait_for_acquire_drained(%s): has_busy=%s has_hdf_queue=%s"
        " timeout=%gs",
        det.name,
        has_busy,
        has_hdf_queue,
        timeout,
    )

    if not sub_statuses:
        # No drained signals to wait on; nothing more to do.
        logger.info(
            "wait_for_acquire_drained(%s): no drainable signals on"
            " this device; returning immediately",
            det.name,
        )
        yield from bps.null()
        return

    # Combine via AndStatus (no-op if there's only one sub-status).
    status = sub_statuses[0]
    for s in sub_statuses[1:]:
        status = AndStatus(status, s)

    deadline = t0 + timeout
    while time.time() < deadline:
        if status.done:
            logger.info(
                "wait_for_acquire_drained(%s): drained after %.3fs",
                det.name,
                time.time() - t0,
            )
            return
        yield from bps.sleep(poll)

    logger.warning(
        "wait_for_acquire_drained(%s): TIMEOUT after %gs"
        " (status.done=%s)",
        det.name,
        timeout,
        status.done,
    )


def monitor_loop(
    flymotor, det, p_end, *, exit_when, watchdog=None, tick=_CONSUMER_TICK_DEFAULT
):
    """Plan stub: emit one primary-stream event per HDF frame written.
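
The producer/consumer hand-off this docstring describes can be
exercised on its own. The sketch assumes a hypothetical queue size of
4 (standing in for ``_FRAME_QUEUE_SIZE``) and calls
``on_num_captured`` by hand instead of from a CA monitor. Because each
queue entry carries the cumulative ``num_captured`` value, an entry
dropped on overflow only delays the count: the next entry that does
get queued still yields the correct delta.

```python
import queue

FRAME_QUEUE_SIZE = 4  # hypothetical; stands in for _FRAME_QUEUE_SIZE

frame_queue = queue.Queue(maxsize=FRAME_QUEUE_SIZE)
dropped = []  # values rejected because the queue was full


def on_num_captured(*, value, timestamp, **kwargs):
    # Producer: never block the callback thread; drop when full.
    try:
        frame_queue.put_nowait((timestamp, int(value)))
    except queue.Full:
        dropped.append(int(value))


def drain_new_frames(last_captured):
    # Consumer: drain everything queued and keep only the highest
    # cumulative count; return (new_last_captured, n_new_frames).
    highest = last_captured
    while True:
        try:
            _ts, value = frame_queue.get_nowait()
        except queue.Empty:
            break
        highest = max(highest, value)
    return highest, highest - last_captured


# Six updates arrive before the consumer wakes: 1..4 are queued and
# 5, 6 are dropped, so the first drain reports 4 new frames.
for n in range(1, 7):
    on_num_captured(value=n, timestamp=float(n))
first = drain_new_frames(0)  # (4, 4)
on_num_captured(value=7, timestamp=7.0)
second = drain_new_frames(first[0])  # (7, 3): frames 5-7 recovered
```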

    Producer/consumer design (Phase 3.C refactor):

    - **Producer:** a CA monitor callback on ``det.hdf1.num_captured``
      pushes ``(timestamp, new_value)`` onto a small bounded
      ``queue.Queue`` whenever the IOC publishes a monitor update. The
      producer runs on the pyepics dispatch thread.
    - **Consumer:** this plan-stub loop wakes up every ``tick`` seconds,
      drains the queue, and emits one ``primary``-stream event per
      newly-captured frame. The consumer runs on the plan/RunEngine
      thread; ``yield from bps.sleep(tick)`` keeps the RunEngine in
      control of pause/abort.

    Per Phase 0.2 / Phase 0e: the primary stream is a progress
    indicator. Pairing of detector frames to flymotor positions happens
    downstream via the IOC-timestamped monitor streams set up by
    ``@bpp.monitor_during_decorator``; the per-frame ``bps.read(det)`` +
    ``bps.read(flymotor)`` here is a snapshot, not the system of record,
    and uses the cached monitor values (no extra CA traffic).

    When the motor's readback crosses ``p_end``, the cam is told to stop
    (``bps.mv(det.cam.acquire, 0)``). This check happens on each
    consumer tick (per Phase 3 decision 3.6: overshoot is dominated by
    the motor record's ~10 Hz update rate, not by the tick).

    Exit: ``exit_when.done``. Per Phase 3 decisions 3.8 / 3.B, the
    caller constructs ``exit_when`` as
    ``AndStatus(cam_stopped_status, drain_status)`` so the loop exits
    only when the cam has been stopped AND every in-flight frame has
    been flushed by the HDF plugin.

    Watchdog: ``watchdog`` is an ``ophyd.status.SubscriptionStatus``
    constructed with ``timeout=no_frames_timeout`` watching
    ``num_captured > 0``. ophyd's StatusBase timeout machinery completes
    the status as failed if no frame arrives in time; the consumer
    checks ``watchdog.done and not watchdog.success`` per tick and
    raises ``RuntimeError`` annotated with the HDF plugin's
    ``WriteStatus`` and ``WriteMessage``. Per Phase 3 decision 3.11,
    the raise lets the RunEngine send STOP to all in-motion movables
    (including ``flymotor``), which is exactly what we want when
    something is wrong with the IOC/cam/HDF chain.

    Parameters
    ----------
    flymotor : EpicsMotor
        The fly-scan motor. Position read with ``use_monitor=False`` for
        the ``p_end`` crossing check, to bypass cached value.
    det : ADBase
        The area detector (with ``cam`` and ``hdf1`` plugin
        sub-devices).
    p_end : float
        Stop the cam acquisition when ``flymotor.user_readback`` meets
        or exceeds this value.
    exit_when : ophyd.status.StatusBase
        Loop exits when this status's ``.done`` is True. See "Exit"
        above.
    watchdog : ophyd.status.StatusBase, optional
        If supplied, the loop checks ``.done and not .success`` per tick
        and raises ``RuntimeError`` if the watchdog timed out. None
        disables the watchdog.
    tick : float
        Consumer wake-up tick in seconds. See module-level
        ``_CONSUMER_TICK_DEFAULT``.
    """
    # --- Producer setup: CA monitor callback into a bounded queue ---
    frame_queue = queue.Queue(maxsize=_FRAME_QUEUE_SIZE)
    overflow_warned = [False]  # list-of-one so callback can mutate

    def _on_num_captured(*, value, timestamp, **kwargs):
        """CA monitor callback (runs on pyepics dispatch thread).

        Pushes ``(timestamp, value)`` onto ``frame_queue``. Drops the
        entry and logs a single WARNING if the queue is full (per Phase 3
        decision 3.3b: never block the CA thread).
        """
        try:
            frame_queue.put_nowait((timestamp, int(value)))
        except queue.Full:
            if not overflow_warned[0]:
                overflow_warned[0] = True
                logger.warning(
                    "monitor_loop: frame queue overflow (size=%d);"
                    " consumer falling behind producer."
                    " New events will be dropped silently until the"
                    " queue drains. Adjust _FRAME_QUEUE_SIZE in"
                    " flyscan_3idc.py if this recurs.",
                    _FRAME_QUEUE_SIZE,
                )

    # --- Inner helpers (per Phase 3 decision 3.1: named for clarity)

    def _emit_pending_frames(last_captured):
        """Drain the queue and emit one primary event per new frame.

        Returns the updated ``last_captured`` count.
        """
        # Snapshot the highest value seen in the queue this tick.
        # The IOC publishes monotonically-increasing values; we
        # only care about the delta count.
        highest = last_captured
        n_drained = 0
        while True:
            try:
                _ts, value = frame_queue.get_nowait()
            except queue.Empty:
                break
            n_drained += 1
            if value > highest:
                highest = value

        if highest > last_captured:
            n_new = highest - last_captured
            logger.debug(
                "monitor_loop: %d new frame(s) (highest=%d,"
                " drained %d queue entries) motor=%g",
                n_new,
                highest,
                n_drained,
                flymotor.user_readback.get(use_monitor=False),
            )
            for _ in range(n_new):
                yield from bps.create(name="primary")
                yield from bps.read(det)
                yield from bps.read(flymotor)
                yield from bps.save()
        return highest

    def _check_p_end_crossing(acquire_stopped, last_captured):
        """Stop the cam if the motor has crossed p_end."""
        if acquire_stopped:
            return acquire_stopped
        # Bypass cache for the position read; pairing timing matters
        # here (Phase 0.1 decision).
        if flymotor.user_readback.get(use_monitor=False) >= p_end:
            logger.info(
                "monitor_loop: motor crossed p_end (%g); stopping acquire"
                " at num_captured=%d",
                p_end,
                last_captured,
            )
            yield from bps.mv(det.cam.acquire, 0)
            return True
        return acquire_stopped

    def _check_watchdog():
        """If the watchdog timed out, harvest context and raise.

        Returns nothing; raises RuntimeError on watchdog trip. Watchdog
        timeout is signalled by ophyd's StatusBase as
        ``done=True, success=False`` after the timeout elapses without
        the predicate becoming true (per Phase 3 decision 3.10).
        """
        if watchdog is None:
            return
        if not (watchdog.done and not watchdog.success):
            return
        # Watchdog has tripped. Harvest diagnostics from the HDF
        # plugin to annotate the exception (per Phase 3 decision
        # 3.11: keep raise behavior; RE will STOP movables).
        write_status = _safe_get(det.hdf1, "write_status", as_string=True)
        write_message = _safe_get(det.hdf1, "write_message", as_string=True)
        full_file_name = _safe_get(det.hdf1, "full_file_name", as_string=True)
        msg = (
            f"monitor_loop: HDF watchdog tripped — no frames captured"
            f" within the timeout on {det.name}."
            f" hdf1.write_status={write_status!r}"
            f" hdf1.write_message={write_message!r}"
            f" hdf1.full_file_name={full_file_name!r}."
            f" Stop acquire and abort."
        )
        logger.error(msg)
        raise RuntimeError(msg)

    # --- Main loop ---
    t0 = time.time()
    last_captured = int(det.hdf1.num_captured.get(use_monitor=False))
    logger.info(
        "monitor_loop: starting at num_captured=%d, motor=%g, p_end=%g"
        " (tick=%gs, watchdog=%s)",
        last_captured,
        flymotor.user_readback.get(use_monitor=False),
        p_end,
        tick,
        "enabled" if watchdog is not None else "disabled",
    )
    acquire_stopped = False
    cid = det.hdf1.num_captured.subscribe(_on_num_captured)
    try:
        while True:
            # Check watchdog first: if it tripped, there is no point in
            # emitting events or checking other conditions.
            _check_watchdog()

            # Drain producer queue, emit primary events.
            last_captured = yield from _emit_pending_frames(last_captured)

            # Stop cam when motor crosses p_end.
            acquire_stopped = yield from _check_p_end_crossing(
                acquire_stopped, last_captured
            )

            # Exit when the caller's status fires.
            if exit_when.done:
                logger.info(
                    "monitor_loop: exit after %.3fs"
                    " (final num_captured=%d, motor=%g,"
                    " acquire_stopped=%s)",
                    time.time() - t0,
                    last_captured,
                    flymotor.user_readback.get(use_monitor=False),
                    acquire_stopped,
                )
                break

            yield from bps.sleep(tick)
    finally:
        # Always unsubscribe, even on RuntimeError from the watchdog,
        # on plan abort, or on any other exception.
        try:
            det.hdf1.num_captured.unsubscribe(cid)
        except Exception as exc:
            logger.warning("monitor_loop: unsubscribe failed: %r", exc)


@bluesky_plan
def flyscan(
    det_name: str = "adsimdet",
    flymotor_name: str = "m1",
    p_start: float = 0,
    p_end: float = 5,
    exposures_per_egu: float = 2.0,
    t_period: float = 0.1,
    t_acquire: float = None,  # defaults to t_period when None
    taxi_allowance: float = 0.5,  # motor EGU
    compression: str = "zlib",
    ad_file_name: str = "flyscan",
    ad_file_path: str = "/tmp/flyscan",
    # TODO: trigger mode?
    # internal parameters
    # Wake-up tick for monitor_loop's consumer (Phase 3). CA
    # monitor callbacks update status flags asynchronously; the
    # plan wakes up every _consumer_tick seconds to check them.
    # Default defined as a module-level constant so all related
    # timing knobs live in one place; can be overridden per-run.
    _consumer_tick: float = _CONSUMER_TICK_DEFAULT,
    # Override the HDF plugin's blocking_callbacks setting.
    # Default (False) leaves the safe behaviour in place: HDF runs
    # with blocking_callbacks="Yes" so the cam back-throttles to
    # HDF's write rate and no frames are dropped. Setting True
    # forces blocking_callbacks="No" on the HDF plugin, restoring
    # the historical (data-losing) behaviour. The only legitimate
    # use is to *demonstrate* the FlyscanDataLossWarning code path
    # on hardware where blocking mode prevents drops: set True,
    # crank up exposures_per_egu past what the HDF can sustain,
    # and watch the post-scan warning fire. Not for production
    # data collection.
    _force_hdf_nonblocking: bool = False,
    # user-supplied metadata is always last
    md: dict = None,
):
    """Fly scan: move motor through range continuously acquiring detector frames.

    The motor traverses ``p_initial → p_final``, maintaining constant
    velocity between ``p_start`` and ``p_end`` to deliver
    ``num_frames`` frames within ``[p_start, p_end]``. ``p_initial``
    and ``p_final`` are computed from ``p_start``, ``p_end``, the
    motor's ``.ACCL``, and ``taxi_allowance``; ``num_frames`` is
    computed as ``round(1 + (p_end - p_start) * exposures_per_egu)``.
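
Per the module notes, frames are paired with motor positions
downstream by IOC timestamp. One possible approach, sketched here, is
to linearly interpolate the motor's monitored positions at each frame
timestamp and keep only frames that land inside ``[p_start, p_end]``.
``interp_position`` and ``pair_and_trim`` are hypothetical helpers
(not part of this module), and the inputs are assumed to be plain
lists pulled from the monitor streams, with motor timestamps sorted
ascending.

```python
from bisect import bisect_left


def interp_position(t, motor_times, motor_positions):
    # Linear interpolation of the motor position at time t; times
    # outside the sampled interval are clamped to the end samples.
    if t <= motor_times[0]:
        return motor_positions[0]
    if t >= motor_times[-1]:
        return motor_positions[-1]
    i = bisect_left(motor_times, t)  # first sample at or after t
    t0, t1 = motor_times[i - 1], motor_times[i]
    p0, p1 = motor_positions[i - 1], motor_positions[i]
    return p0 + (p1 - p0) * (t - t0) / (t1 - t0)


def pair_and_trim(frame_times, motor_times, motor_positions, p_start, p_end):
    # Return (frame_index, position) for frames inside [p_start, p_end].
    paired = []
    for idx, t in enumerate(frame_times):
        p = interp_position(t, motor_times, motor_positions)
        if p_start <= p <= p_end:
            paired.append((idx, p))
    return paired
```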
Detector frames are acquired continuously during the traverse; downstream processing trims the data to ``[p_start, p_end]`` by motor position. An HDF5 file containing every captured frame is written next to the run (the path is in the run metadata under ``ad_file_path`` / ``ad_file_name``). Position geometry ----------------- User-supplied: ``p_start`` and ``p_end`` (in-scan range). Derived: ``p_initial`` (parked, pre-scan) and ``p_final`` (coast, post-scan):: p_initial < p_start < p_end < p_final | | | | | |--scan----| | |--taxi-in---| |---coast--| - ``p_start``: the position at which the first useful frame should be captured. Downstream processing trims frames captured before this point. - ``p_end``: the position at which the last useful frame should be captured. The plan stops the cam when the motor passes this point. - ``p_initial`` (derived): where the motor is parked before the scan, far enough below ``p_start`` that the motor reaches its scan velocity *before* it enters the acquisition region. Computed as ``p_start - d_taxi - taxi_allowance`` where ``d_taxi = 0.5 * scan_velocity * motor.ACCL``. - ``p_final`` (derived): where the motor coasts to after the scan ends — far enough above ``p_end`` that the cam can finish processing its last frames before the motor stops. Computed symmetric to ``p_initial``. ``taxi_allowance`` (default ``0.5``, in motor EGU) is added to both ends as a slack margin on top of the acceleration-based distance. Increase it if the cam's first/last frame is observed to fall outside ``[p_start, p_end]``; decrease it if the scan takes too long to taxi. Position units are whatever the motor reports (``user_readback``); typically engineering units (mm, degrees, etc.) — the motor's ``.EGU`` field is recorded in run metadata. Frame timing ------------ - ``exposures_per_egu``: target frame density. 
Combined with the scan range, gives ``num_frames = round(1 + (p_end - p_start) * exposures_per_egu)`` (fence-post counting: one frame at each endpoint plus ``exposures_per_egu`` frames per unit between). - ``t_period``: seconds between successive frame exposures. - ``t_acquire``: per-frame exposure time, in seconds. Defaults to ``t_period`` (continuous exposure). Must satisfy ``0 < t_acquire <= t_period``. The scan velocity is computed as ``(p_end - p_start) / (num_frames * t_period)``. Pre-scan validation rejects velocities outside the motor's ``.VBAS`` / ``.VMAX`` limits with a clear ``ValueError``. Detector & file --------------- - ``det_name``: ophyd device registry key for the area detector (default ``"adsimdet"``). Must be an AreaDetector with an HDF5 plugin attached. - ``flymotor_name``: ophyd device registry key for the motor (default ``"m1"``). - ``compression``: HDF5 chunk compression name (default ``"zlib"``). Validated against the HDF plugin's ``compression.enum_strs`` at scan start; raises ``ValueError`` with the allowed list if the value isn't supported by the IOC's HDF plugin build. - ``ad_file_name``: stem for the saved HDF5 file (default ``"flyscan"``); the IOC appends an auto-incrementing number and the ``.h5`` extension. - ``ad_file_path``: directory on the IOC's filesystem where the HDF5 file is written (default ``"/tmp/flyscan"``). **Must exist on the IOC's filesystem.** If the IOC runs in a container, this is the container's view of the path, not the host's. The plan checks this before staging and raises ``RuntimeError`` with a clear message if the path doesn't exist. What gets recorded ------------------ Each call to ``RE(flyscan(...))`` produces one bluesky run containing: - A ``primary`` event stream with one event per HDF frame accepted by the writer. Each event records the cam and HDF array counters and the motor's reported position at the moment the consumer drained that frame from its queue. 
Treat this as a progress indicator and at-the-bench snapshot; use the monitor streams below for high-precision pairing. - Three monitor streams (``adsimdet_cam_array_counter_monitor``, ``adsimdet_hdf1_array_counter_monitor``, ``m1_monitor``) carrying IOC-timestamped values for downstream synchronization of frame counters with motor position. - A ``baseline`` stream (whatever ``apsbits`` configures). - Metadata under ``start``: user-supplied scan parameters (``p_start``, ``p_end``, ``exposures_per_egu``, ``t_period``, ``t_acquire``, ``taxi_allowance``, ``compression``), derived geometry (``p_initial``, ``p_final``, ``num_frames``, ``scan_velocity``, ``d_taxi``, ``motor_accl``, ``motor_egu``), motor velocity limits, file destination, watchdog timeout, ``consumer_tick``, plus anything you pass in ``md``. - An HDF5 file with the actual image data at ``ad_file_path/ad_file_name_NNNNNN.h5``. Common usage ------------ From a bits2606 IPython session:: from bits2606.startup import * # provides RE, oregistry from flyscan_3idc import flyscan # 50 frames over a 5-EGU range at 20 Hz: uid, = RE(flyscan(p_start=0, p_end=5, exposures_per_egu=10, t_period=0.05)) Override more defaults for a specific run:: uid, = RE(flyscan( flymotor_name="m1", p_start=0, p_end=10, exposures_per_egu=10, t_period=0.05, t_acquire=0.01, taxi_allowance=1.0, compression="lz4", ad_file_path="/tmp/myexperiment/", ad_file_name="sample42", md={"sample": "Ag behenate", "operator": "your-name"}, )) Common pitfalls --------------- - **"file_path does not exist" RuntimeError at scan start.** The directory in ``ad_file_path`` doesn't exist on the IOC's filesystem. If the IOC is containerized, create the directory inside the container or use a path that's visible there. - **"scan_velocity exceeds motor max velocity" ValueError.** The requested combination of position range and frame rate would require the motor to move faster than its ``.VMAX``. 
      Either reduce ``exposures_per_egu``, increase ``t_period``, or
      shorten ``p_end - p_start``. (Shortening the range has only a
      small effect unless the scan spans just a few frames, since the
      velocity approaches ``1 / (exposures_per_egu * t_period)`` as the
      range grows.)
    - **"compression=... not in HDF plugin's allowed set" ValueError.**
      The IOC's HDF plugin doesn't support the requested compression
      algorithm. Inspect ``det.hdf1.compression.enum_strs`` to see what
      *is* supported by this IOC build.
    - **Watchdog: "no frames captured" RuntimeError mid-scan.** The cam
      isn't delivering frames to the HDF plugin. Likely causes: the HDF
      plugin's ``EnableCallbacks`` is ``Disable``, the cam's
      ``ArrayCallbacks`` is ``Disable``, or the HDF plugin's
      ``NDArrayPort`` doesn't point at the cam. The RunEngine will have
      stopped the motor; investigate the IOC and try again.
    - **The scan completes but the data dictionary's ``num_captured``
      is 0.** The IOC resets ``NumCaptured_RBV`` to 0 after the HDF5
      file is closed. Look at ``full_file_name`` (in ``_cleanup``'s log
      line) and the actual file on disk to confirm what was saved.
    - **"HDF plugin dropped N frame(s) during this run"
      FlyscanDataLossWarning at scan end.** The HDF plugin couldn't
      keep up with the cam at the requested rate, and ``N`` frames the
      cam produced are missing from the on-disk HDF5 file. The warning
      is emitted both to the log (WARNING level) and through Python's
      ``warnings`` machinery (``FlyscanDataLossWarning`` subclasses
      ``UserWarning``). The plan sets ``blocking_callbacks="Yes"`` on
      the HDF plugin to throttle the cam to HDF's write rate, so this
      should be rare. When it does occur, it usually means the cam
      emitted a burst before back-pressure propagated, or the HDF queue
      size is too small. Treat ``N > 0`` as a data-integrity concern:
      increase ``t_period`` or reduce ``exposures_per_egu``. To fail
      fast in strict environments, promote the warning to an exception
      with
      ``warnings.filterwarnings("error", category=flyscan_3idc.FlyscanDataLossWarning)``.

    Parameters
    ----------
    det_name : str, default ``"adsimdet"``
        ophyd registry name of the area detector to fly.
    flymotor_name : str, default ``"m1"``
        ophyd registry name of the motor to fly.
    p_start : float, default ``0``
        First in-scan position (motor units).
    p_end : float, default ``5``
        Last in-scan position (motor units).
    exposures_per_egu : float, default ``2.0``
        Frame density: frames per motor engineering unit. Total frame
        count is ``round(1 + (p_end - p_start) * exposures_per_egu)``.
        Must be positive.
    t_period : float, default ``0.1``
        Time between successive frame exposures (seconds).
    t_acquire : float or None, default ``None``
        Per-frame exposure time (seconds). ``None`` (default) means
        "use ``t_period``" (continuous exposure). Must satisfy
        ``0 < t_acquire <= t_period``.
    taxi_allowance : float, default ``0.5``
        Extra distance (in motor EGU) added past the acceleration-based
        taxi region at each end of the scan. Increase if the first/last
        useful frame falls outside ``[p_start, p_end]``. Must be
        non-negative.
    compression : str, default ``"zlib"``
        HDF5 chunk compression name. Must match one of
        ``det.hdf1.compression.enum_strs`` if the IOC is reachable.
    ad_file_name : str, default ``"flyscan"``
        HDF5 filename stem (the IOC appends a number and ``.h5``).
    ad_file_path : str, default ``"/tmp/flyscan"``
        Directory on the IOC's filesystem to write the HDF5 file.
    _consumer_tick : float, default ``_CONSUMER_TICK_DEFAULT`` (20 ms)
        Internal: wake-up tick for the per-frame event consumer.
        Increase if your RunEngine subscriptions can't keep up; decrease
        only for very high frame rates. Rarely needs to be changed.
    md : dict, optional
        Additional metadata to record under the run's ``start``
        document. Merged on top of the plan's computed metadata.

    Returns
    -------
    None
        Yields bluesky messages; pass the plan to ``RE()`` to execute it.

    Raises
    ------
    KeyError
        ``det_name`` or ``flymotor_name`` does not resolve to the
        expected ophyd device type in the registry.
    ValueError
        Position ordering is wrong (``p_end <= p_start``),
        ``exposures_per_egu`` is non-positive, ``taxi_allowance`` is
        negative, ``t_acquire > t_period``, the computed ``num_frames``
        is too small, the computed ``scan_velocity`` is outside the
        motor's limits, or ``compression`` is not in the HDF plugin's
        enumeration.
    RuntimeError
        IOC preflight failed (an expected PV did not connect), the HDF
        plugin's file path does not exist on the IOC's filesystem, the
        cam did not deliver its first frame within the start-up
        timeout, or the no-frames watchdog tripped during the scan.

    See Also
    --------
    configure_adsimdet : Standalone diagnostic that exercises the same
        AD acquisition protocol without a plan or RunEngine. Useful for
        triaging an IOC that's misbehaving.
    compute_flyscan_geometry : Pure-function helper that derives
        ``p_initial``, ``p_final``, and ``num_frames`` from the
        user-supplied kwargs; unit-testable without an IOC.
    """

    ## Preparation

    # t_acquire defaults to t_period (continuous exposure).
    if t_acquire is None:
        t_acquire = t_period

    logger.info(
        "flyscan: entered. det_name=%r flymotor_name=%r"
        " p_start=%g p_end=%g exposures_per_egu=%g"
        " t_acquire=%g t_period=%g taxi_allowance=%g compression=%r",
        det_name, flymotor_name, p_start, p_end, exposures_per_egu,
        t_acquire, t_period, taxi_allowance, compression,
    )

    det = oregistry.find(det_name, allow_none=True)
    flymotor = oregistry.find(flymotor_name, allow_none=True)
    logger.info(
        "flyscan: lookup -> det=%r flymotor=%r",
        getattr(det, "name", det),
        getattr(flymotor, "name", flymotor),
    )

    # Fail fast if the IOCs are down, before we incur the much longer
    # staging-time cost of discovering it via per-PV 5-second timeouts.
    # Preflight makes CA calls, so it should run only once we know det
    # and flymotor are the right kinds of objects.
    # validate_flyscan_inputs (below) does that device-type validation
    # without any CA calls, but it runs later, after the geometry is
    # computed, so we mirror its type checks here with isinstance()
    # guards. Splitting validate_flyscan_inputs in two would be
    # cleaner; for now we accept the small redundancy.
    if isinstance(det, ADBase) and isinstance(flymotor, EpicsMotor):
        preflight_connectivity(det, det_name, flymotor, flymotor_name)

    # Derive geometry first (the motor must be the right type for the
    # .ACCL/.EGU caget calls in compute_flyscan_geometry to be
    # meaningful), then validate.
    if not isinstance(flymotor, EpicsMotor):
        # Mirror validate_flyscan_inputs' error so the user sees the
        # same message either way.
        raise KeyError(f"Motor {flymotor_name!r} not found in registry.")
    geometry = compute_flyscan_geometry(
        flymotor,
        p_start,
        p_end,
        exposures_per_egu,
        t_period,
        taxi_allowance,
    )
    v_max, v_base = validate_flyscan_inputs(
        det,
        det_name,
        flymotor,
        flymotor_name,
        geometry,
        t_acquire,
        t_period,
        compression,
    )

    # Rebind derived values to plain locals so the rest of the plan
    # (taxi/scan kickoff, watchdog timing, _cleanup) keeps reading
    # them by their familiar names.
    p_initial = geometry.p_initial
    p_final = geometry.p_final
    num_frames = geometry.num_frames
    scan_active_duration = geometry.scan_duration
    scan_velocity = geometry.scan_velocity

    # IOC HDF plugin pre-allocation overflows somewhere between 1e6
    # and 1e9 for Float64 1024x1024 frames (verified empirically;
    # likely a C int byte-count overflow in NDFileHDF5).
    # num_capture = num_frames * 1.5 + 20 is comfortable for any
    # sensible scan size while absorbing taxi-in/-out leading frames,
    # post-stop tail frames, and timing jitter.
    hdf_num_capture = int(num_frames * 1.5) + 20

    # Worst-case flush timeout assumes the HDF plugin fills to
    # ``hdf_num_capture``. The actual flush time at run end uses the
    # real ``num_captured`` (recomputed in _cleanup). This metadata
    # lets a downstream reader see the planning assumption.
    hdf_flush_timeout_max = _hdf_flush_timeout(det, hdf_num_capture)

    _md = build_flyscan_md(
        plan_name=inspect.currentframe().f_code.co_name,
        det_name=det_name,
        flymotor_name=flymotor_name,
        p_start=p_start,
        p_end=p_end,
        exposures_per_egu=exposures_per_egu,
        t_acquire=t_acquire,
        t_period=t_period,
        taxi_allowance=taxi_allowance,
        compression=compression,
        geometry=geometry,
        v_max=v_max,
        v_base=v_base,
        ad_file_name=ad_file_name,
        ad_file_path=ad_file_path,
        hdf_num_capture=hdf_num_capture,
        hdf_flush_timeout_max=hdf_flush_timeout_max,
        consumer_tick=_consumer_tick,
    )
    _md.update(md or {})

    original_cache = CacheParameters()

    # Closure flag used by _cleanup to decide whether to flush the
    # HDF5 file. Stays False if the plan dies before acquisition
    # actually starts (e.g. a FailedStatus during the override loop),
    # so _cleanup doesn't waste time waiting for a non-existent flush
    # or, worse, flush a stale num_captured value left over from a
    # prior run. It is a list of one element so kickoff_and_monitor
    # can flip it without a chain of nonlocal declarations.
    capture_started = [False]

    # Snapshot the HDF plugin's cumulative dropped-arrays counter
    # (the IOC does not reset it across runs). ``_cleanup`` reads
    # it again and reports the per-run delta. Initialize with the
    # current value if readable, else None to signal "no baseline"
    # (then the delta isn't computed). Defensive against IOCs that
    # don't expose ``dropped_arrays``.
    dropped_arrays_baseline = (
        _safe_get(det.hdf1, "dropped_arrays", use_monitor=False)
        if _has_component(det, "hdf1")
        else None
    )
    if dropped_arrays_baseline is not None:
        logger.info(
            "flyscan: hdf1.dropped_arrays baseline = %d",
            int(dropped_arrays_baseline),
        )

    # Snapshot stage_sigs on every component we may mutate below.
    plugins = [
        getattr(det, nm)
        for nm in det.component_names
        if hasattr(getattr(det, nm), "blocking_callbacks")
    ]
    logger.info(
        "flyscan: snapshotting stage_sigs on det, cam, hdf1, and %d plugin(s):"
        " %s",
        len(plugins),
        [p.name for p in plugins],
    )
    saved_stage_sigs = snapshot_stage_sigs(det, det.cam, det.hdf1, *plugins)

    # Snapshot the ``kind`` of the two array_counter signals we want
    # in the primary-stream events for this run. Setting Kind.hinted
    # makes bps.read(det) include them and live displays plot them.
    # We restore in _cleanup so other plans against this detector see
    # whatever kind it was configured with by default (typically
    # Kind.config or Kind.omitted for these counters). See Phase 3
    # decision 3.5a/3.5b in the strategy doc.
    saved_kinds = snapshot_kinds(
        det.cam.array_counter,
        det.hdf1.array_counter,
    )
    logger.info(
        "flyscan: snapshotted kinds for %d signal(s): %s",
        len(saved_kinds),
        [s.name for s, _ in saved_kinds],
    )
    for sig, _ in saved_kinds:
        sig.kind = Kind.hinted

    ### Send motor to initial position (start moving; we wait below)
    # The kickoff is grouped so the RunEngine tracks the MoveStatus
    # and we can wait on it explicitly after the AD setup work in
    # _main has completed (see bps.wait(group="taxi") below). This
    # preserves the deliberate concurrency: the motor moves while AD
    # parameters are configured, then we wait, then we change
    # velocity. The group name "taxi" matches the idiom used by other
    # APS fly scans.
    logger.info(
        "flyscan: taxi -> sending %s to p_initial=%g (non-blocking)",
        flymotor.name,
        p_initial,
    )
    yield from bps.abs_set(flymotor, p_initial, group="taxi")

    def _main():
        """Preparation (no data collection) and Kickoff (data collection)."""
        # AD runtime parameters (overridden, restored on exit).
        # Set these before device staging.
        logger.info("flyscan._main: overriding AD runtime parameters")

        # Must be set before it is used, i.e. before hdf1.file_path.
        yield from original_cache.override(det.hdf1.create_directory, -5)

        # Then the remaining overrides, in order.
        for obj, value in [
            (det.hdf1.array_counter, 0),  # optional
            (det.hdf1.auto_increment, "Yes"),
            (det.hdf1.auto_save, "Yes"),
            (det.hdf1.compression, compression),
            (det.hdf1.file_name, ad_file_name),
            (det.hdf1.file_number, 1),
            (det.hdf1.file_path, ad_file_path),
            (det.hdf1.file_template, "%s%s_%6.6d.h5"),
        ]:
            yield from original_cache.override(obj, value)

        # Verify the IOC can see the path. This must happen before
        # staging, which kicks the HDF plugin into capture mode.
        check_hdf_file_path(det)

        # Set parameters for staging.
        # stage_decorator sets these before the run, then restores
        # them after the run.
        det.stage_sigs["cam.image_mode"] = "Continuous"
        det.cam.stage_sigs["acquire_time"] = t_acquire
        det.cam.stage_sigs["acquire_period"] = t_period
        det.hdf1.stage_sigs["num_capture"] = hdf_num_capture

        # AD callback-chain throttling (revised 2026-06-08 after a
        # flyscan reported only 59 of an expected 101 frames written
        # to HDF, with cam.array_counter showing the cam itself ran
        # at the requested 10 Hz; i.e. the cam produced 117 frames
        # but HDF wrote only 59, and hdf1.dropped_arrays climbed by
        # ~58 to confirm). Root cause: under
        # ``blocking_callbacks="No"`` the cam doesn't wait for HDF to
        # consume the previous frame before producing the next; HDF's
        # input queue overflows when the file-write throughput is
        # below the cam's rate, and frames are silently dropped.
        #
        # Trade-off chosen here: prefer fidelity (no drops) over rate.
        # Set ``blocking_callbacks="Yes"`` on the HDF plugin so the
        # cam auto-throttles to the HDF write rate. Result: every
        # cam frame the IOC produces is captured; the achieved
        # frame rate may be below the user's requested ``1/t_period``
        # if the IOC can't keep up. Look at the post-scan WARNING
        # log line that compares cam.array_counter to hdf1.array_counter;
        # if they diverge, increase ``t_period`` or reduce
        # ``exposures_per_egu``.
        #
        # Non-HDF plugins (image, pva, stats1, roi1) keep
        # ``blocking_callbacks="No"`` because they're typically
        # display/analysis sinks where dropped frames are tolerable
        # and we don't want them gating the cam.
        if hasattr(det.cam, "wait_for_plugins"):
            det.cam.stage_sigs["wait_for_plugins"] = "Yes"

        hdf_blocking_setting = "No" if _force_hdf_nonblocking else "Yes"
        if _force_hdf_nonblocking:
            logger.warning(
                "flyscan._main: _force_hdf_nonblocking=True — staging"
                " HDF with blocking_callbacks='No'. This DELIBERATELY"
                " disables the cam-to-HDF back-pressure and will drop"
                " frames if HDF can't keep up with the cam. This"
                " escape hatch is for testing the data-loss warning"
                " path; do NOT use for production data collection.",
            )
        for plugin in plugins:
            if plugin is det.hdf1:
                plugin.stage_sigs["blocking_callbacks"] = hdf_blocking_setting
            else:
                plugin.stage_sigs["blocking_callbacks"] = "No"
        det.hdf1.stage_sigs.move_to_end("capture")  # always last
        logger.debug(
            "flyscan._main: stage_sigs configured."
            " det=%s cam=%s hdf1=%s",
            dict(det.stage_sigs),
            dict(det.cam.stage_sigs),
            dict(det.hdf1.stage_sigs),
        )

        # Wait for the taxi-in move to finish, *then* change velocity.
        # bps.wait(group="taxi") consumes the MoveStatus that
        # bps.abs_set(..., group="taxi") registered with the
        # RunEngine; this replaces the old hand-rolled
        # wait_for_motor_done polling loop with the RunEngine's own
        # status-tracking machinery.
        logger.info(
            "flyscan._main: waiting for %s to reach p_initial",
            flymotor.name,
        )
        yield from bps.wait(group="taxi")

        logger.info(
            "flyscan._main: setting %s velocity to scan_velocity=%g",
            flymotor.name,
            scan_velocity,
        )
        yield from original_cache.override(flymotor.velocity, scan_velocity)

        logger.info("flyscan._main: entering kickoff_and_monitor")
        yield from kickoff_and_monitor()
        logger.info("flyscan._main: kickoff_and_monitor returned")

    ## Kickoff & Monitor

    @bpp.stage_decorator([det])  # Don't stage the flymotor!
    # Three bespoke monitor streams (one per signal):
    #   - det.hdf1.array_counter: HDF writer's frame count (with
    #     EPICS timestamp, used downstream to sync with flymotor)
    #   - det.cam.array_counter: camera's frame count; cheap to
    #     collect, lets users compare cam & hdf
    #   - flymotor.user_readback: motor position at CA monitor rate
    @bpp.monitor_during_decorator(
        [
            det.hdf1.array_counter,
            det.cam.array_counter,
            flymotor.user_readback,
        ]
    )
    @bpp.run_decorator(md=_md)
    def kickoff_and_monitor():
        # Kickoff ordering (revised 2026-06-08 after empirical
        # observation that the cam delivered its first frame several
        # seconds after Acquire=1, by which time the motor was already
        # past p_start, costing the user a chunk of their requested
        # frame budget):
        #
        #   1. Start the cam acquiring (Acquire=1).
        #   2. Wait for the HDF plugin to receive its first frame
        #      (num_captured >= 1), bounded by a generous timeout.
        #      This is the "cam is genuinely producing frames" gate.
        #   3. Only then launch the motor toward p_final.
        #
        # Cost: a few pre-roll frames captured while the motor is
        # still parked at p_initial. These are written to the HDF5
        # file (no harm: downstream trims by motor position anyway)
        # and counted into the IOC's num_capture allocation (already
        # generously sized at 1.5*num_frames+20).
        #
        # group="scan" registers the MoveStatus with the RunEngine so
        # the bps.wait(group="scan") below absorbs any post-scan
        # motor settling after monitor_loop returns.
        # See Phase 3 decisions 2.4/3.8 in the strategy doc.

        # Diagnostic: log what the IOC actually has for cam timings
        # right before we start. Helps catch staging defects (e.g.
        # acquire_period got overridden, or acquire_time > t_period
        # silently capped by the IOC) without an empirical "why is
        # my frame rate wrong" investigation.
        actual_acquire_time = _safe_get(det.cam, "acquire_time", use_monitor=False)
        actual_acquire_period = _safe_get(det.cam, "acquire_period", use_monitor=False)
        actual_image_mode = _safe_get(
            det.cam, "image_mode", use_monitor=False, as_string=True
        )
        actual_num_capture = _safe_get(det.hdf1, "num_capture", use_monitor=False)
        logger.info(
            "kickoff_and_monitor: IOC state pre-acquire:"
            " cam.acquire_time=%r cam.acquire_period=%r"
            " cam.image_mode=%r hdf1.num_capture=%r"
            " (requested: t_acquire=%g t_period=%g num_capture=%d)",
            actual_acquire_time,
            actual_acquire_period,
            actual_image_mode,
            actual_num_capture,
            t_acquire,
            t_period,
            hdf_num_capture,
        )

        logger.info("kickoff_and_monitor: starting %s acquisition", det.name)
        # Use bps.mv (not bps.abs_set) so we don't proceed until
        # Acquire_RBV has caught up to 1 ("Acquiring"). Otherwise
        # cam_stopped_status (built below) could fire immediately
        # at run=True evaluation if the RBV hadn't yet updated past
        # its pre-scan value of 0. In continuous image_mode (set by
        # stage_sigs above), Acquire_RBV reaches 1 and stays there
        # until monitor_loop stops the cam.
        yield from bps.mv(det.cam.acquire, 1)

        # From this point on, _cleanup should treat the HDF plugin as
        # active (drain, flush, verify). If we never get here,
        # _cleanup skips that work entirely.
        capture_started[0] = True

        # Wait for the cam to actually start producing frames before
        # launching the motor. This gates on hdf1.num_captured (the
        # downstream-of-everything signal: the cam produced a frame AND
        # the HDF plugin accepted it).
        # Timeout generously: if the cam can't produce a single frame
        # within 5 t_periods (floor 5 s), something is wrong with the
        # IOC chain and we'd rather raise than launch the motor into a
        # dead scan.
        first_frame_timeout = max(5.0 * t_period, 5.0)
        logger.info(
            "kickoff_and_monitor: waiting for first frame (timeout=%gs)",
            first_frame_timeout,
        )
        first_frame_status = SubscriptionStatus(
            det.hdf1.num_captured,
            lambda *, value, **_: int(value) >= 1,
            # run=True: OK to fire at subscribe time if the IOC's
            # num_captured cache happens to already be > 0 (e.g. from
            # a prior run that left it set); the alternative (missing
            # a fast first frame between subscribe and the first plan
            # tick) is worse.
            run=True,
            timeout=first_frame_timeout,
        )
        try:
            # Yield ticks until the status fires or its timeout
            # expires. Use the same _consumer_tick that monitor_loop
            # uses below so the plan-wakeup cadence is consistent.
            t0_first = time.time()
            while not first_frame_status.done:
                yield from bps.sleep(_consumer_tick)
            if not first_frame_status.success:
                msg = (
                    "kickoff_and_monitor: cam did not deliver a first"
                    f" frame within {first_frame_timeout:g}s after"
                    f" Acquire=1 on {det.name}."
                    f" hdf1.num_captured={int(det.hdf1.num_captured.get(use_monitor=False))}"  # noqa: E501
                    f" hdf1.write_status={_safe_get(det.hdf1, 'write_status', as_string=True)!r}"  # noqa: E501
                    f" hdf1.write_message={_safe_get(det.hdf1, 'write_message', as_string=True)!r}"  # noqa: E501
                )
                logger.error(msg)
                raise RuntimeError(msg)
            first_frame_latency = time.time() - t0_first
            logger.info(
                "kickoff_and_monitor: first frame received after %.3fs"
                " (num_captured=%d)",
                first_frame_latency,
                int(det.hdf1.num_captured.get(use_monitor=False)),
            )
        except Exception:
            # Nothing to clean up explicitly: SubscriptionStatus
            # unsubscribes itself once it is done (including on
            # timeout), so we simply re-raise.
            raise

        # Cam is live. Now launch the motor toward p_final.
        logger.info(
            "kickoff_and_monitor: launching %s -> p_final=%g (non-blocking)",
            flymotor.name,
            p_final,
        )
        yield from bps.abs_set(flymotor, p_final, group="scan")

        # Watchdog grace period for the *rest* of the scan: the
        # expected scan duration plus a couple of periods of slack,
        # with a floor of 5 s for tiny scans. At this point we
        # already know the cam delivered at least one frame, so the
        # watchdog below is meant to guard "cam went silent mid-scan"
        # rather than "cam never started".
        no_frames_timeout = max(scan_active_duration + 2 * t_period, 5.0)

        # Status-based exit condition for monitor_loop (Phase 3
        # decisions 3.8 + 3.9, revised by 3.B.4 after empirical gates
        # caught two false-early-exit defects).
        #
        # Revised again 2026-06-08 after a flyscan hang: the loop never
        # exited because cam.acquire (== Acquire_RBV) stayed at 1 for
        # >60 s after we wrote Acquire=0; the simulator IOC apparently
        # finishes its current burst before the RBV drops. The exit
        # condition now watches cam.acquire_busy instead, which the
        # IOC drops to 0 promptly when wait_for_plugins=Yes (set via
        # stage_sigs above); wait_for_acquire_drained uses the same
        # signal successfully in cleanup. Devices that don't expose
        # acquire_busy fall back to cam.acquire.
        #
        # The exit condition is an AndStatus of two sub-statuses:
        #   1. cam_stopped_status: cam.acquire_busy == 0 (preferred)
        #      or cam.acquire == 0 (fallback). The busy signal
        #      goes to 0 only after every enabled plugin (including
        #      hdf1) has finished processing the last frame, so
        #      this also implicitly covers cam-to-plugin drain.
        #      monitor_loop tells the cam to stop (Acquire=0) when
        #      the motor crosses p_end, then this status fires.
        #   2. drain_status: the HDF plugin queue is fully idle
        #      (no frames waiting AND no frame currently being
        #      written). Same predicate shape as
        #      wait_for_acquire_drained's HDF sub-status.
        # Together they mean "the cam has stopped *and* every frame it
        # produced has been flushed by the HDF plugin", which is the
        # actual condition for "scan is done."
        # ``run=False`` is critical here: the subscribe-time precheck
        # path of ``SubscriptionStatus(run=True)`` evaluates the
        # predicate on the cached value at subscribe time, which on
        # ``acquire_busy`` may transiently be 0 (the cam hasn't yet
        # started the burst from our recent ``bps.mv(cam.acquire, 1)``)
        # and would immediately satisfy ``value == 0``, firing the
        # AndStatus at scan start and exiting monitor_loop before any
        # frame arrives. With ``run=False`` the callback fires only on
        # real CA monitor edges *after* subscribe time, so the first
        # interesting edge is "busy went to 0" later in the run, when
        # the cam actually stops.
        if _has_component(det.cam, "acquire_busy"):
            cam_stopped_sig = det.cam.acquire_busy
            cam_stopped_signal_name = "cam.acquire_busy"
        else:
            cam_stopped_sig = det.cam.acquire
            cam_stopped_signal_name = "cam.acquire"
        logger.info(
            "kickoff_and_monitor: using %s for cam-stopped status",
            cam_stopped_signal_name,
        )
        cam_stopped_status = SubscriptionStatus(
            cam_stopped_sig,
            lambda *, value, **_: int(value) == 0,
            run=False,
        )

        # ``drain_status`` keeps ``run=True``, so it fires at subscribe
        # time whenever the HDF queue happens to be idle at that moment.
        # On its own that would be a defect, but the AndStatus is gated
        # by ``cam_stopped_status`` (``run=False`` above). When the
        # cam-stopped signal is acquire_busy with wait_for_plugins=Yes,
        # the drain check is effectively a no-op, because acquire_busy
        # already implies HDF drain. StatusBase is monotonic: once
        # drain_status has fired it stays done, and AndStatus completes
        # as soon as both halves have fired at some point (not
        # necessarily at the same instant). On detectors without
        # wait_for_plugins, drain_status therefore adds a real check
        # only if the queue was busy when it subscribed.
        drain_status = SubscriptionStatus(
            det.hdf1.num_queued_arrays,
            lambda *, value, **_: (
                int(value) == 0
                and int(det.hdf1.queue_free.get(use_monitor=False))
                == int(det.hdf1.queue_size.get(use_monitor=False))
            ),
            run=True,
        )
        hdf_drain_status = AndStatus(cam_stopped_status, drain_status)

        # No-frames watchdog (Phase 3 decision 3.10): a status that
        # times out if num_captured doesn't reach > 0 within
        # no_frames_timeout seconds. ophyd's StatusBase timeout
        # mechanism sets the status to done-with-exception
        # (StatusTimeoutError) on its own thread; the consumer in
        # monitor_loop checks `watchdog_status.done and not
        # watchdog_status.success` per tick to detect the trip and
        # raise RuntimeError (per 3.11; the RE then sends STOP to all
        # in-motion movables, including flymotor).
        # Caveat: the first-frame gate above has already seen
        # num_captured >= 1, so with run=True this predicate is
        # normally satisfied at subscribe time and the watchdog
        # completes successfully right away; by itself it does not
        # detect a cam that goes silent mid-scan.
        watchdog_status = SubscriptionStatus(
            det.hdf1.num_captured,
            lambda *, value, **_: int(value) > 0,
            run=True,
            timeout=no_frames_timeout,
        )

        yield from monitor_loop(
            flymotor,
            det,
            p_end,
            exit_when=hdf_drain_status,
            watchdog=watchdog_status,
            tick=_consumer_tick,
        )

        # Absorb any motor settling past p_final after monitor_loop
        # has exited (the HDF queue may drain before the motor fully
        # stops at p_final).
        logger.info(
            "kickoff_and_monitor: waiting for %s to reach p_final",
            flymotor.name,
        )
        yield from bps.wait(group="scan")

    def _cleanup():
        # Best-effort cleanup; swallow secondary failures so the
        # original exception (if any) reaches the RunEngine.
        #
        # Ordering matters here:
        #   1. Stop motor (no more position changes).
        #   2. Stop cam.acquire (no more new frames).
        #   3. Stop hdf1.capture (close the capture window). With
        #      auto_save=Yes, this is also what makes the IOC write
        #      the HDF5 file; no explicit WriteFile=1 is issued
        #      (see the note at the file-verification step).
        #   4. Drain the HDF queue (flush in-flight frames).
        #   5. Verify full_file_name (the "tell" that the file landed).
        #   6. Report frames the HDF plugin dropped this run.
        #   7. Restore overridden signals.
        #   8. Restore stage_sigs and kind snapshots.
        logger.info("flyscan._cleanup: starting")
        try:
            if motor_is_moving(flymotor):
                logger.info("flyscan._cleanup: stopping moving %s", flymotor.name)
                yield from bps.stop(flymotor)
        except Exception as exc:
            logger.exception("flyscan._cleanup: stop(flymotor) failed: %r", exc)

        try:
            logger.info("flyscan._cleanup: stopping %s acquire", det.name)
            yield from bps.mv(det.cam.acquire, 0)
        except Exception as exc:
            logger.exception("flyscan._cleanup: stop acquire failed: %r", exc)

        # Steps 3-5 below only matter if the plan actually got as far
        # as starting acquisition. If we died earlier (e.g. a
        # FailedStatus during the override loop), skip them to avoid:
        #   - waiting for drains that will never come,
        #   - flushing a num_captured value left over from a prior run
        #     (the PV is cumulative across runs unless explicitly
        #     reset, which we do not do).
        if not capture_started[0]:
            logger.info(
                "flyscan._cleanup: capture was never armed in this run;"
                " skipping stop-capture / drain / flush",
            )
        else:
            try:
                logger.info("flyscan._cleanup: stopping %s.hdf1 capture", det.name)
                yield from bps.mv(det.hdf1.capture, 0)
            except Exception as exc:
                logger.exception("flyscan._cleanup: stop capture failed: %r", exc)

            # Let the cam settle and the HDF plugin drain its queue
            # before we verify the file. Cleanup latency does not
            # affect user-visible behavior, so we use a coarser tick
            # than monitor_loop's _consumer_tick.
            try:
                yield from wait_for_acquire_drained(det, poll=_CLEANUP_DRAIN_TICK)
            except Exception as exc:
                logger.exception(
                    "flyscan._cleanup: wait_for_acquire_drained failed: %r",
                    exc,
                )

            # Verify the file landed on disk. We rely on auto_save=Yes
            # (set in the override list) to flush the HDF5 file when
            # capture stops. Verified empirically: from a bluesky plan
            # with auto_save=Yes, Capture=0 causes the IOC to write the
            # file without our needing to set WriteFile=1.
            #
            # (Manual GUI use with auto_save=No is different: there,
            # WriteFile=1 must be set explicitly after Capture=0. An
            # earlier version of this code issued WriteFile=1 here as
            # well, but it ran *after* auto_save had already written
            # the file, so the IOC rejected it with status=3. The file
            # was fine; the error was spurious; the code was
            # redundant.)
            #
            # FullFileName_RBV is populated by the IOC after a
            # successful write. An empty value here is the "tell"
            # that no file was saved.
            try:
                n_captured = (
                    _safe_get(det.hdf1, "num_captured", use_monitor=False) or 0
                )
                full_name = _safe_get(
                    det.hdf1, "full_file_name", use_monitor=False, as_string=True
                )
                if n_captured > 0 and full_name:
                    logger.info(
                        "flyscan._cleanup: HDF5 file saved: %s"
                        " (num_captured=%d)",
                        full_name,
                        n_captured,
                    )
                elif n_captured > 0:
                    logger.warning(
                        "flyscan._cleanup: HDF5 file not saved"
                        " (full_file_name empty; num_captured=%d,"
                        " write_status=%r, write_message=%r)",
                        n_captured,
                        _safe_get(
                            det.hdf1, "write_status", use_monitor=False, as_string=True
                        ),
                        _safe_get(
                            det.hdf1, "write_message", use_monitor=False, as_string=True
                        ),
                    )
                else:
                    logger.info(
                        "flyscan._cleanup: no frames captured;"
                        " no file expected",
                    )
            except Exception as exc:
                logger.exception(
                    "flyscan._cleanup: file verification failed: %r",
                    exc,
                )

        # Report frames the HDF input dropped this run (the cam
        # produced them, but HDF couldn't keep up). The counter is
        # cumulative across runs in the IOC, so we compare it to the
        # baseline snapshotted at plan entry. Non-zero drops
        # indicate the cam is producing faster than HDF can write;
        # even with blocking_callbacks=Yes there are edge cases
        # (e.g. the cam's first burst before back-pressure
        # propagates). Cleanup logs at WARNING for visibility.
        try:
            if dropped_arrays_baseline is not None:
                dropped_now = _safe_get(
                    det.hdf1, "dropped_arrays", use_monitor=False,
                )
                if dropped_now is not None:
                    delta = int(dropped_now) - int(dropped_arrays_baseline)
                    if delta > 0:
                        cam_counter = _safe_get(
                            det.cam, "array_counter", use_monitor=False
                        )
                        hdf_counter = _safe_get(
                            det.hdf1, "array_counter", use_monitor=False
                        )
                        msg = (
                            f"HDF plugin dropped {delta} frame(s)"
                            f" during this run"
                            f" (hdf1.dropped_arrays:"
                            f" {int(dropped_arrays_baseline)}"
                            f" -> {int(dropped_now)})."
                            f" cam.array_counter={cam_counter!r},"
                            f" hdf1.array_counter={hdf_counter!r}."
                            f" The HDF plugin cannot keep up with the"
                            f" cam at the requested rate on this IOC"
                            f" / filesystem / host combination."
                            f" Frames produced by the cam are missing"
                            f" from the on-disk HDF5 file."
                            f" Increase t_period or reduce"
                            f" exposures_per_egu and retry."
                        )
                        # Two channels for visibility:
                        #   - logger.warning: lands in the log file
                        #     and any stream handlers (apsbits sets
                        #     these up to print to the console).
                        #   - warnings.warn: appears in IPython's
                        #     warning channel (rendered distinctly
                        #     from normal output) and can be
                        #     filtered/escalated to an exception via
                        #     ``warnings.filterwarnings(...)``.
                        logger.warning("flyscan._cleanup: %s", msg)
                        warnings.warn(
                            msg,
                            category=FlyscanDataLossWarning,
                            stacklevel=2,
                        )
                    else:
                        logger.info(
                            "flyscan._cleanup: HDF dropped 0 frames"
                            " this run (dropped_arrays unchanged at %d)",
                            int(dropped_now),
                        )
        except Exception as exc:
            logger.exception(
                "flyscan._cleanup: dropped-arrays check failed: %r",
                exc,
            )

        try:
            logger.info(
                "flyscan._cleanup: restoring %d cached signal(s)",
                len(original_cache),
            )
            yield from original_cache.restore()
        except Exception as exc:
            logger.exception("flyscan._cleanup: restore failed: %r", exc)

        try:
            logger.info("flyscan._cleanup: restoring stage_sigs snapshot")
            restore_stage_sigs(saved_stage_sigs)
        except Exception as exc:
            logger.exception(
                "flyscan._cleanup: restore stage_sigs failed: %r",
                exc,
            )

        try:
            logger.info(
                "flyscan._cleanup: restoring kinds snapshot (%d signal(s))",
                len(saved_kinds),
            )
            restore_kinds(saved_kinds)
        except Exception as exc:
            logger.exception(
                "flyscan._cleanup: restore kinds failed: %r",
                exc,
            )

        logger.info("flyscan._cleanup: done")
        # finalize_wrapper consumes _cleanup as a generator, so it must
        # yield at least one message.
        yield from bps.null()

    # finalize_wrapper guarantees _cleanup runs on success, exception,
    # *and* RE-injected exceptions (Ctrl-C, RequestAbort, ...), and
    # re-raises the original exception so the RunEngine sees it.
    yield from bpp.finalize_wrapper(_main(), _cleanup())
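
# ----------------
# Illustrative sketch (hypothetical helper, not called by ``flyscan``):
# the frame-budget arithmetic described in the ``flyscan`` docstring
# and in its preparation code, gathered in one pure function so the
# numbers can be checked without an IOC. ``flyscan`` itself gets
# num_frames and scan_velocity from ``compute_flyscan_geometry``,
# whose exact rounding and validation may differ from this sketch.
from typing import NamedTuple


class _ExampleFrameBudget(NamedTuple):
    """Hypothetical result container for ``_example_frame_budget``."""

    num_frames: int       # fence-post count, both endpoints included
    scan_velocity: float  # motor EGU per second during the scan
    hdf_num_capture: int  # HDF plugin allocation (1.5x + 20 headroom)


def _example_frame_budget(p_start, p_end, exposures_per_egu, t_period):
    """Sketch of the num_frames / velocity / num_capture formulas."""
    if p_end <= p_start:
        raise ValueError("p_end must be greater than p_start")
    if exposures_per_egu <= 0 or t_period <= 0:
        raise ValueError("exposures_per_egu and t_period must be positive")
    # One frame at each endpoint plus exposures_per_egu per unit between.
    num_frames = round(1 + (p_end - p_start) * exposures_per_egu)
    # Scan distance divided by the time needed to expose num_frames frames.
    scan_velocity = (p_end - p_start) / (num_frames * t_period)
    # Same headroom rule used for hdf1.stage_sigs["num_capture"].
    hdf_num_capture = int(num_frames * 1.5) + 20
    return _ExampleFrameBudget(num_frames, scan_velocity, hdf_num_capture)


# Example, using the first "Common usage" call in the docstring:
#   _example_frame_budget(0, 5, 10, 0.05)
#   -> num_frames=51, scan_velocity ~1.96 EGU/s, hdf_num_capture=96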
# ----------------
# TODO: Function to identify the motor readback for each image frame
"""
1. Starting with the Bluesky Run object:
    1. Get the flymotor readback monitor stream (position vs. timestamp).
        1. Sort by timestamp.
        1. Get the timestamps bracketing p_start.
        1. Get the timestamps bracketing p_end.
        1. Interpolate to approximate t_start and t_end.
        1. Note: the flyscan data is the part between t_start and t_end.
    1. Get the area detector HDF image-number monitor stream
       (image_num vs. timestamp).
        1. Sort by timestamp.
        1. Identify all flyscan image numbers collected between t_start
           and t_end.
        1. For each frame number, interpolate the flymotor readback at
           that frame's timestamp.
"""
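
# ----------------
# Minimal sketch of the TODO outline above (hypothetical helper, not
# yet used anywhere in this module). It works on plain arrays rather
# than a Bluesky Run object, so pulling (timestamp, value) pairs out of
# the ``m1_monitor`` and ``adsimdet_hdf1_array_counter_monitor`` streams
# is left to the caller. Assumptions: numpy is available, the motor
# moves in the + direction (flyscan requires p_end > p_start), and
# linear interpolation is adequate between readback samples.
import numpy as np


def _sketch_pair_frames_with_positions(
    motor_t, motor_pos, frame_t, frame_num, p_start, p_end
):
    """Return (frame numbers, interpolated positions) inside the fly region."""
    # Sort both streams by timestamp (monitor events may arrive unordered).
    order = np.argsort(motor_t, kind="stable")
    motor_t = np.asarray(motor_t, dtype=float)[order]
    motor_pos = np.asarray(motor_pos, dtype=float)[order]
    order = np.argsort(frame_t, kind="stable")
    frame_t = np.asarray(frame_t, dtype=float)[order]
    frame_num = np.asarray(frame_num)[order]

    def crossing_time(p):
        # First readback at or past p; the sample before it brackets p.
        idx = np.flatnonzero(motor_pos >= p)
        if idx.size == 0 or idx[0] == 0:
            raise ValueError(f"readback stream does not bracket {p!r}")
        i = idx[0]
        p0, p1 = motor_pos[i - 1], motor_pos[i]
        t0, t1 = motor_t[i - 1], motor_t[i]
        # Linear interpolation of time at which the motor passed p.
        return t0 + (p - p0) * (t1 - t0) / (p1 - p0)

    t_start = crossing_time(p_start)
    t_end = crossing_time(p_end)

    # Keep only frames whose timestamps fall inside [t_start, t_end],
    # then interpolate the motor readback at each frame's timestamp.
    in_scan = (frame_t >= t_start) & (frame_t <= t_end)
    positions = np.interp(frame_t[in_scan], motor_t, motor_pos)
    return frame_num[in_scan], positions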