"""Fly scan for 3-ID-C: area detector *vs*. motor.
Fly scan an EPICS motor and collect Eiger2 (or any AreaDetector) images.
Implementation note: this plan is software-correlated (no hardware
gate signal). Frame-to-position pairing happens downstream from
the ``monitor_during_decorator`` streams, joined by IOC timestamp.
Waits inside the plan use ophyd ``Status`` objects (``MoveStatus``,
``SubscriptionStatus``, ``AndStatus``) driven by CA monitor
callbacks rather than busy-poll loops. See
``flyscan-3idc-status-strategy.md`` (alongside this file) for the
design history.
Usage from a command-line session::
from bits2606.startup import * # provides RE, oregistry, etc.
from flyscan_3idc import flyscan, configure_adsimdet
# The plan: drive a fly scan and record a Bluesky run.
uid, = RE(
flyscan(
det_name="eiger2",
flymotor_name="sample_stage.omega",
p_start=5, p_end=10,
exposures_per_egu=10, # approximate
t_period=0.05,
)
)
# The standalone diagnostic: exercise the AD acquisition
# protocol (no plan, no RunEngine) for triage.
result = configure_adsimdet(adsimdet, capture_duration=3.0)
General outline
---------------
1. Preparation
1. Validate input parameters
2. Collect metadata
3. Snapshot mutable state (``stage_sigs``, ``kind``,
overridden signal values) for later restore
4. concurrently (motor moves while parameters are set)
1. send motor to *initial* position (``bps.abs_set(...,
group="taxi")``)
2. set *most* parameters for scan
3. wait for motor to reach taxi position
(``bps.wait(group="taxi")``)
5. set motor velocity for scan
2. Kickoff
1. stage devices
2. open run
3. subscribe to bespoke monitor streams (one stream per signal):
1. HDF writer's frame count (``det.hdf1.array_counter``,
carries EPICS timestamp for downstream sync with flymotor)
2. camera's frame count (``det.cam.array_counter``, for
cam-vs-HDF latency comparison)
3. motor position (``flymotor.user_readback``)
4. send motor to *final* position (``bps.abs_set(...,
group="scan")``)
5. start detector acquiring (without a hardware gate signal,
coordinating acquire-start to a specific motor position is
extremely difficult; instead, acquire **continuously** for
the entire span ``p_initial <= flymotor.position <= p_final``
and let downstream analysis select frames in
``[p_start, p_end]`` by timestamp/position). This is an
intentional oversample.
6. build status objects for the monitor stage:
- ``cam_stopped_status``: ``cam.acquire == 0``
- ``drain_status``: HDF queue empty and idle
- ``hdf_drain_status = AndStatus(cam_stopped_status,
drain_status)`` — scan is done when both
- ``watchdog_status``: ``num_captured > 0`` with
``timeout=no_frames_timeout``
3. Monitor (``monitor_loop``)
1. CA monitor callback on ``det.hdf1.num_captured`` pushes
``(timestamp, value)`` onto a bounded queue (the *producer*)
2. plan-side *consumer* wakes every ``_consumer_tick`` seconds,
drains the queue, and emits one ``primary`` event per
newly-captured frame (``create / read(det) / read(flymotor)
/ save``). ``bps.read`` returns cached monitor values — no
extra CA traffic
3. stop detector acquiring when motor crosses *p_end*
4. raise ``RuntimeError`` if the watchdog times out without any
frame arriving (RE will then STOP all in-motion movables)
5. exit when ``hdf_drain_status.done`` (cam has stopped AND
every in-flight frame has been flushed)
6. after exit, ``bps.wait(group="scan")`` absorbs any motor
settling past *p_final*
4. Conclusion (``_cleanup``)
1. stop motor (if still moving — checked via ``motor_is_moving``)
2. stop cam acquire
3. stop hdf1 capture
4. wait for cam idle AND HDF queue drained
(``wait_for_acquire_drained`` — uses ``AndStatus`` of
``SubscriptionStatus`` per signal)
5. verify the HDF5 file landed (``full_file_name``)
6. restore overridden signal values from ``CacheParameters``,
restore mutated ``stage_sigs`` dicts, restore mutated
``kind`` values
7. close run (handled by ``run_decorator``)
8. unstage devices (handled by ``stage_decorator``)
"""
import inspect
import logging
import queue
import time
import warnings
from dataclasses import dataclass
from apsbits.core.instrument_init import oregistry
from bluesky import plan_stubs as bps
from bluesky import preprocessors as bpp
from bluesky.utils import plan as bluesky_plan
from epics import caget
from ophyd import ADBase
from ophyd import EpicsMotor
from ophyd import Kind
from ophyd.status import AndStatus
from ophyd.status import SubscriptionStatus
from ophyd.utils.errors import WaitTimeoutError
logger = logging.getLogger(__name__)

# This module's logger defaults to INFO so that diagnostics are visible
# in a fresh CLI session before the user has configured logging.  Only
# this module's level is raised (not the root logger's, not ophyd's), so
# the pyepics/ophyd control layer stays quiet.
logger.setLevel(logging.INFO)

# When a parent logger is already configured (e.g. apsbits installed a
# console handler on the root logger), records simply propagate to it.
# When neither this logger nor the root logger has any handler, install
# a minimal stream handler so messages still reach the terminal.
if not (logger.handlers or logging.getLogger().handlers):
    _h = logging.StreamHandler()
    _h.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    logger.addHandler(_h)
    # In this fallback the local handler is sufficient; stop propagation
    # so that a root logger configured later cannot double-print.
    logger.propagate = False
# ---------------------------------------------------------------------------
# Module-level tunable timing constants.
#
# These are the wake-up ticks for plan-side loops that wait on a
# status-object flag (Phase 3 refactor). CA monitor callbacks update
# the flags asynchronously on the pyepics dispatch thread; the plan
# wakes up at these intervals to check the flag and decide whether to
# proceed.
#
# Tradeoff in either direction: a smaller tick gives lower latency in
# noticing that the flag flipped, at the cost of a higher RunEngine
# wake-up rate.  Sub-50 ms ticks are imperceptible to human watchers of
# progress events; sub-1 ms ticks waste CPU without observable benefit.
#
# Adjust here rather than at call sites — keeps related knobs together
# and discoverable.

# Default wake-up tick (seconds) for monitor_loop's consumer (also the
# default for the flyscan(_consumer_tick=...) plan kwarg).
# 20 ms = 50 Hz.
_CONSUMER_TICK_DEFAULT = 0.02

# Wake-up tick (seconds) for wait_for_acquire_drained in the cleanup
# path.  Cleanup latency does not matter for live progress, so coarser
# is fine; 50 ms = 20 Hz.
# NOTE(review): wait_for_acquire_drained's own signature defaults to
# poll=0.001; confirm that callers pass this constant explicitly.
_CLEANUP_DRAIN_TICK = 0.05

# Bounded size (number of entries) for monitor_loop's per-frame
# producer/consumer queue.  The producer (a CA monitor callback on
# hdf1.num_captured) pushes (timestamp, value) tuples; the consumer
# drains in monitor_loop.  At typical scan rates (10 Hz frames, 20 ms
# consumer tick), the queue stays nearly empty.  This bound exists to
# detect a producer/consumer mismatch — overflow is logged once as a
# WARNING and the entry is dropped (per Phase 3 decision 3.3b).
# Increase here if higher-rate scans are ever attempted.
_FRAME_QUEUE_SIZE = 64

# Public API of this module.  Other symbols (validators,
# snapshot/restore helpers, monitor_loop, the _wait_for helper,
# motor_is_moving, etc.) are implementation detail — accessible
# via direct import for diagnostics but not advertised as a
# stable interface.
__all__ = [
    "flyscan",  # the bluesky plan
    "configure_adsimdet",  # standalone diagnostic, no plan/RE
]
class CacheParameters(dict):
    """Mapping of ophyd signal -> original value, for later restoration.

    Each key is a signal that has been overridden through this cache;
    each value is what ``signal.get()`` returned just before the *first*
    override.

    Usage::

        cache = CacheParameters()
        # motor1.velocity is 0.5
        yield from cache.override(motor1.velocity, 1.5)
        # motor1.velocity is now 1.5
        # ...
        yield from cache.restore()
        # motor1.velocity is back to 0.5
    """

    @bluesky_plan
    def override(self, signal, value):
        """Plan stub: remember ``signal``'s current value, then move it.

        Use with ``yield from``.  The value is cached only on the first
        override of a given signal, so repeated overrides still restore
        to the *original* value.
        """
        already_cached = signal in self
        if not already_cached:
            self[signal] = signal.get()
        yield from bps.mv(signal, value)

    @bluesky_plan
    def restore(self, clear=True):
        """Plan stub: move every cached signal back to its saved value.

        Use with ``yield from``.  Does nothing when the cache is empty.

        Parameters
        ----------
        clear : bool
            If True (default), empty the cache once all signals have
            been restored.
        """
        if self:
            # Walk the cache newest-first so that signals with ordering
            # dependencies (e.g. set velocity *after* restoring
            # acceleration) are handled correctly.
            for signal in reversed(self):
                yield from bps.mv(signal, self[signal])
            if clear:
                self.clear()
def read_motor_field(motor, suffix, timeout=1.0):
    """Fetch one EPICS motor-record field (e.g. ``.VMAX``) with ``caget``.

    The PV name is ``motor.prefix + suffix``.  Returns the value read,
    or ``None`` when the PV does not connect within ``timeout`` seconds
    or the read raises for any other reason.

    ``epics.caget`` is used directly instead of a throwaway
    ``EpicsSignal``.  Rationale:

    - ``caget`` runs on the calling thread's CA context and does no
      asynchronous metadata fetching.  An earlier ``EpicsSignal``-based
      implementation triggered a SIGSEGV in pyepics' ``util3`` dispatch
      thread on ``gp:m1.VBAS``: the ``EpicsSignal`` was created, read,
      then ``destroy()``ed, but pyepics had queued a deferred metadata
      callback (``get_all_metadata_callback``) that fired *after* the
      underlying CA channel had been torn down — at first a recoverable
      ``RuntimeError: Expected CA context is unset`` (2026-06-04
      12:46:38 session), then a segfault (2026-06-04 13:10 session).
    - ``caget`` also avoids polluting the oregistry (the previous
      implementation needed a ``_suspended_auto_register`` context
      manager to keep the throwaway signal out of the global registry).
    """
    pv_name = motor.prefix + suffix
    try:
        result = caget(pv_name, timeout=timeout)
        if result is not None:
            logger.debug("read_motor_field(%s) -> %r", pv_name, result)
        else:
            # caget signals a connection timeout by returning None
            # rather than raising.
            logger.debug("read_motor_field(%s) timed out", pv_name)
    except Exception as exc:
        logger.debug("read_motor_field(%s) failed: %r", pv_name, exc)
        result = None
    return result
def preflight_connectivity(det, det_name, flymotor, flymotor_name, timeout=2.0):
    """Fail fast if key PVs on ``det`` / ``flymotor`` are not connected.

    Waits up to ``timeout`` seconds for each of a small, representative
    set of signals to connect.  If any are missing or fail to connect,
    logs and raises a single ``RuntimeError`` listing all failures.

    The point is to fail *before* device staging when an IOC is down or
    wedged.  Staging issues many serial writes (``set_and_wait`` with a
    default 5-second connection timeout each), so a dead IOC at stage
    time turns into 30+ seconds of cleanup-of-cleanup noise that hides
    the actual cause.  Detecting it here gives a one-line error.

    Signals checked, and why:

    - ``flymotor.user_readback``: motor record exists at all
    - ``flymotor.motor_done_move``: motor record is responsive
    - ``det.cam.acquire``: cam IOC is up
    - ``det.hdf1.capture``: HDF plugin is up (if present)
    - ``det.hdf1.num_captured``: HDF plugin readback works

    Components are reached via ``getattr``, so any ophyd Component not
    yet instantiated is instantiated here.  That is *intended*: do it
    now, while the IOC is reachable, so later code (including cleanup)
    does not pay the ``wait_for_connection`` price.
    """
    cam = getattr(det, "cam", None)
    candidates = [
        ("flymotor.user_readback", getattr(flymotor, "user_readback", None)),
        ("flymotor.motor_done_move", getattr(flymotor, "motor_done_move", None)),
        ("det.cam.acquire", getattr(cam, "acquire", None)),
    ]

    # HDF plugin signals are optional: include each one only when the
    # class declares it.
    if _has_component(det, "hdf1"):
        hdf_plugin = det.hdf1
        for attr in ("capture", "num_captured"):
            if _has_component(hdf_plugin, attr):
                candidates.append((f"det.hdf1.{attr}", getattr(hdf_plugin, attr)))

    problems = []
    for label, sig in candidates:
        if sig is None:
            problems.append(f"{label} missing")
        else:
            try:
                sig.wait_for_connection(timeout=timeout)
            except Exception as exc:
                problems.append(f"{label} ({sig.pvname}): {exc}")

    if not problems:
        logger.info(
            "preflight_connectivity OK: checked %d PV(s) on det=%r flymotor=%r",
            len(candidates),
            det_name,
            flymotor_name,
        )
        return

    msg = (
        f"preflight_connectivity failed for det={det_name!r}"
        f" flymotor={flymotor_name!r} (IOC down?): " + "; ".join(problems)
    )
    logger.error(msg)
    raise RuntimeError(msg)
def check_hdf_file_path(det, settle_timeout=1.0):
    """Confirm the HDF plugin reports that its ``file_path`` exists.

    Returns ``None`` on success, and also (after a debug log) when the
    detector class declares no ``hdf1.file_path_exists`` component.
    Raises ``RuntimeError`` if ``hdf1.file_path_exists`` has not become
    1 within ``settle_timeout`` seconds.

    Why this is a separate gate (not just a stage_sig):

    The HDF plugin's ``file_path`` must be set, *and* the path must
    actually exist on the IOC's filesystem (i.e. inside the IOC's
    container if it is containerized), *before* ``capture`` is set to 1
    at stage time.  If the path does not exist when capture starts, the
    plugin fails at file-open with a generic
    ``Error writing file: status=3`` message that surfaces only via the
    plugin's ``WriteMessage`` PV — bluesky/ophyd never raises, and
    ``num_captured`` stays at 0 forever.

    Call this *after* ``file_path`` has been written to the IOC
    (otherwise a stale value is checked).  ``FilePathExists`` is updated
    by the plugin in response to a ``file_path`` write, but not
    instantly; the ``_wait_for`` helper (precheck + CA monitor
    subscription) absorbs that settling delay.
    """
    checkable = _has_component(det, "hdf1") and _has_component(
        det.hdf1, "file_path_exists"
    )
    if not checkable:
        logger.debug(
            "check_hdf_file_path: %s lacks hdf1.file_path_exists; skipping",
            det.name,
        )
        return

    def _path_exists(value):
        return value == 1

    try:
        _wait_for(det.hdf1.file_path_exists, _path_exists, timeout=settle_timeout)
    except WaitTimeoutError as err:
        # Re-read both PVs uncached so the message reflects IOC state now.
        exists = det.hdf1.file_path_exists.get(use_monitor=False)
        current_path = det.hdf1.file_path.get(use_monitor=False)
        msg = (
            f"HDF plugin reports file_path does not exist on the IOC's"
            f" filesystem: {det.name}.hdf1.file_path={current_path!r}"
            f" (file_path_exists={exists}). The path must exist (and be"
            f" writable) on the IOC's filesystem, which may be a container"
            f" view distinct from the host filesystem. Create the directory"
            f" inside the IOC, or correct ad_file_path."
        )
        logger.error(msg)
        raise RuntimeError(msg) from err

    confirmed_path = det.hdf1.file_path.get(use_monitor=False)
    logger.info(
        "check_hdf_file_path: OK (%s.hdf1.file_path=%r)",
        det.name,
        confirmed_path,
    )
# Fallback acceleration time (seconds) used by
# ``compute_flyscan_geometry`` when the motor's ``.ACCL`` cannot be read
# or reads as a non-positive value.
# Deliberately generous; over-allocating the taxi region only costs a
# bit of extra travel before the first useful frame.
_ACCL_FALLBACK_SECONDS = 0.25
class FlyscanDataLossWarning(UserWarning):
    """Warning category: a flyscan run lost frames at the HDF plugin input.

    A non-zero delta in ``hdf1.dropped_arrays`` over the run means the
    cam produced frames the HDF plugin could not accept, so the on-disk
    HDF5 file is missing frames the cam exposed — a data-integrity
    concern.

    This is a ``UserWarning`` subclass so that it shows up in
    IPython/Jupyter's warning channel as well as the log file, and so
    user code can filter it independently of other warnings, e.g.
    ``warnings.filterwarnings('error',
    category=FlyscanDataLossWarning)`` to turn it into an exception.
    """
@dataclass(frozen=True)
class FlyscanGeometry:
    """Immutable record of a flyscan's derived geometry.

    Distances are in motor engineering units; times are in seconds.

    Attributes
    ----------
    num_frames
        Number of frames planned for the scan.
    scan_duration
        ``num_frames * t_period``.
    scan_velocity
        ``(p_end - p_start) / scan_duration``.
    d_taxi
        ``0.5 * scan_velocity * motor_accl``.
    p_initial
        ``p_start - d_taxi - taxi_allowance``.
    p_final
        ``p_end + d_taxi + taxi_allowance``.
    motor_accl
        Acceleration time in seconds (``.ACCL`` or the fallback).
    motor_egu
        The ``.EGU`` string, for error messages and metadata.
    accl_was_default
        True when ``.ACCL`` could not be read and the fallback was used.
    """

    num_frames: int
    scan_duration: float
    scan_velocity: float
    d_taxi: float
    p_initial: float
    p_final: float
    motor_accl: float
    motor_egu: str
    accl_was_default: bool
def compute_flyscan_geometry(
    flymotor,
    p_start,
    p_end,
    exposures_per_egu,
    t_period,
    taxi_allowance,
):
    """Turn user-meaningful scan kwargs into a ``FlyscanGeometry``.

    No CA writes and no bluesky plan messages are issued.  The motor's
    ``.ACCL`` and ``.EGU`` fields are read through ``read_motor_field``
    (the same caget path ``validate_flyscan_inputs`` uses).  When
    ``.ACCL`` is unreadable or non-positive, ``_ACCL_FALLBACK_SECONDS``
    is substituted (with a warning log) so the caller still receives a
    usable geometry; the velocity-bound check in
    ``validate_flyscan_inputs`` then catches any downstream
    impossibility.

    Frame count uses fence-post counting — a frame at both endpoints of
    ``[p_start, p_end]`` plus ``exposures_per_egu`` frames per unit of
    motor travel between them::

        num_frames = round(1 + (p_end - p_start) * exposures_per_egu)

    Taxi distance uses the over-allocating form
    ``d_taxi = 0.5 * scan_velocity * motor_accl``.  The motor record's
    ``.ACCL`` is "seconds to reach .VELO", so this slightly
    overestimates the time to reach ``scan_velocity`` — intentional; it
    gives the cam a beat to settle before the first useful frame.

    Raises
    ------
    ValueError
        If ``p_end <= p_start``, ``exposures_per_egu <= 0``,
        ``taxi_allowance < 0``, ``t_period <= 0``, or the computed
        frame count is below 2.
    """
    # Argument checks.  Their order determines which problem is
    # reported first when several arguments are bad.
    travel = p_end - p_start
    if travel <= 0:
        raise ValueError(f"p_end={p_end:g} must be greater than p_start={p_start:g}.")
    if exposures_per_egu <= 0:
        raise ValueError(f"exposures_per_egu={exposures_per_egu:g} must be positive.")
    if taxi_allowance < 0:
        raise ValueError(f"taxi_allowance={taxi_allowance:g} must be non-negative.")
    if t_period <= 0:
        raise ValueError(f"t_period={t_period:g} must be positive.")

    frame_count = int(round(1 + travel * exposures_per_egu))
    if frame_count < 2:
        raise ValueError(
            f"computed num_frames={frame_count} (from exposures_per_egu="
            f"{exposures_per_egu:g}, p_fly_dist={travel:g}) must be"
            " at least 2."
        )

    duration = frame_count * t_period
    velocity = travel / duration

    # Acceleration time: from the motor record, else the module fallback.
    accl_seconds = read_motor_field(flymotor, ".ACCL")
    used_fallback = bool(accl_seconds is None or accl_seconds <= 0)
    if used_fallback:
        logger.warning(
            "compute_flyscan_geometry: motor .ACCL unreadable (got %r);"
            " using fallback %g s. Taxi region may be under-allocated.",
            accl_seconds,
            _ACCL_FALLBACK_SECONDS,
        )
        accl_seconds = _ACCL_FALLBACK_SECONDS

    # Engineering-units string; anything that is not a str becomes "".
    units = read_motor_field(flymotor, ".EGU")
    if not isinstance(units, str):
        units = ""
    units_label = units or "?"

    taxi = 0.5 * velocity * accl_seconds
    first_position = p_start - taxi - taxi_allowance
    last_position = p_end + taxi + taxi_allowance

    logger.info(
        "compute_flyscan_geometry: num_frames=%d scan_velocity=%g %s/s"
        " d_taxi=%g %s (ACCL=%g s%s, allowance=%g %s)"
        " => p_initial=%g p_final=%g",
        frame_count,
        velocity,
        units_label,
        taxi,
        units_label,
        accl_seconds,
        " [fallback]" if used_fallback else "",
        taxi_allowance,
        units_label,
        first_position,
        last_position,
    )
    return FlyscanGeometry(
        num_frames=frame_count,
        scan_duration=duration,
        scan_velocity=velocity,
        d_taxi=taxi,
        p_initial=first_position,
        p_final=last_position,
        motor_accl=accl_seconds,
        motor_egu=units,
        accl_was_default=used_fallback,
    )
def validate_flyscan_inputs(
    det,
    det_name,
    flymotor,
    flymotor_name,
    geometry,
    t_acquire,
    t_period,
    compression,
):
    """Cross-check flyscan arguments against geometry and IOC state.

    ``geometry`` is the ``FlyscanGeometry`` produced by
    ``compute_flyscan_geometry``.  The checks here are the ones that
    need a connected device or a comparison against the motor's
    velocity limits / the HDF plugin's compression enum.

    Returns
    -------
    tuple
        ``(v_max, v_base)`` — the motor's ``.VMAX`` and ``.VBAS`` as
        read (``None`` if unreadable), for inclusion in run metadata.

    Raises
    ------
    KeyError
        ``det`` is not an ``ADBase`` or ``flymotor`` is not an
        ``EpicsMotor``.
    ValueError
        ``t_acquire`` is outside ``(0, t_period]``, the geometry
        ordering is violated, ``scan_velocity`` is non-positive or
        outside the motor's velocity limits, or ``compression`` is not
        one of the HDF plugin's enum strings.
    """
    if not isinstance(det, ADBase):
        raise KeyError(f"Area Detector {det_name!r} not found in registry.")
    if not isinstance(flymotor, EpicsMotor):
        raise KeyError(f"Motor {flymotor_name!r} not found in registry.")

    if not 0 < t_acquire <= t_period:
        raise ValueError(
            "Acquisition time must be positive and less than or equal to the period."
        )

    # Inner bounds of the scan, reconstructed from the geometry alone.
    # compute_flyscan_geometry sets p_initial = p_start - d_taxi -
    # taxi_allowance (and symmetrically for p_final), so these equal
    # p_start / p_end exactly only when taxi_allowance is zero;
    # otherwise they are p_start - taxi_allowance and
    # p_end + taxi_allowance.
    inner_low = geometry.p_initial + geometry.d_taxi
    inner_high = geometry.p_final - geometry.d_taxi

    # Sanity check on the derivation.  It should always hold after a
    # successful compute_flyscan_geometry; it guards against a future
    # formula change silently producing an invalid geometry.
    ordering_ok = geometry.p_initial < inner_low < inner_high < geometry.p_final
    if not ordering_ok:
        raise ValueError(
            "Derived geometry violates p_initial < p_start < p_end <"
            f" p_final: {geometry!r}"
        )

    velocity = geometry.scan_velocity
    if velocity <= 0:
        raise ValueError(f"scan_velocity={velocity:g} must be positive.")

    # EpicsMotor does not expose .VMAX or .VBAS as components, so read
    # them ad-hoc.  None or 0 means "no limit on that side."
    v_max = read_motor_field(flymotor, ".VMAX")
    v_base = read_motor_field(flymotor, ".VBAS")
    has_upper_limit = v_max is not None and v_max > 0
    if has_upper_limit and velocity > v_max:
        raise ValueError(
            f"scan_velocity={velocity:g} exceeds motor max"
            f" velocity .VMAX={v_max:g} for {flymotor_name!r}."
        )
    has_lower_limit = v_base is not None and v_base > 0
    if has_lower_limit and velocity < v_base:
        raise ValueError(
            f"scan_velocity={velocity:g} is below motor"
            f" base velocity .VBAS={v_base:g} for {flymotor_name!r}."
        )

    # Compression is an mbbi/mbbo enumeration on the HDF plugin.
    # Rejecting a bad value here gives a useful message instead of a
    # much-later IOC-side "invalid value" at write time.  If enum_strs
    # is not populated (offline IOC, mock, ...), skip the check and let
    # the IOC reject the value at write time.
    allowed = ()
    try:
        allowed = tuple(getattr(det.hdf1.compression, "enum_strs", ()) or ())
    except Exception as exc:
        logger.debug(
            "validate_flyscan_inputs: cannot read compression enum_strs: %r",
            exc,
        )
    if allowed and compression not in allowed:
        raise ValueError(
            f"compression={compression!r} not in HDF plugin's allowed"
            f" set {list(allowed)!r}."
        )

    logger.info(
        "validated inputs: det=%r flymotor=%r p=%g/%g/%g/%g num_frames=%d"
        " t_acquire=%g t_period=%g scan_active_duration=%g"
        " scan_velocity=%g (VMAX=%r VBAS=%r) compression=%r",
        det_name,
        flymotor_name,
        geometry.p_initial,
        inner_low,
        inner_high,
        geometry.p_final,
        geometry.num_frames,
        t_acquire,
        t_period,
        geometry.scan_duration,
        velocity,
        v_max,
        v_base,
        compression,
    )
    return v_max, v_base
def build_flyscan_md(
    *,
    plan_name,
    det_name,
    flymotor_name,
    p_start,
    p_end,
    exposures_per_egu,
    t_acquire,
    t_period,
    taxi_allowance,
    compression,
    geometry,
    v_max,
    v_base,
    ad_file_name,
    ad_file_path,
    hdf_num_capture,
    hdf_flush_timeout_max,
    consumer_tick,
):
    """Build the metadata dict that is recorded with the run.

    Every plan kwarg that affects scan behavior is stored so the run
    document preserves the value used.  Values derived in ``geometry``
    (``p_initial``, ``p_final``, ``num_frames``, ``scan_velocity``, ...)
    are stored as well, so downstream readers have both the
    user-supplied inputs and the resulting plan.

    Internal underscore-prefixed kwargs (e.g. ``_consumer_tick``) appear
    without the underscore in the metadata for readability.

    All arguments are keyword-only.  Returns a new ``dict``.
    """
    identity = {
        "plan_name": plan_name,
        "det_name": det_name,
        "flymotor_name": flymotor_name,
    }
    # Scan parameters exactly as the user supplied them.
    user_inputs = {
        "p_start": p_start,
        "p_end": p_end,
        "exposures_per_egu": exposures_per_egu,
        "t_acquire": t_acquire,
        "t_period": t_period,
        "taxi_allowance": taxi_allowance,
        "compression": compression,
    }
    # Values derived from the inputs and the motor record.
    derived = {
        "p_initial": geometry.p_initial,
        "p_final": geometry.p_final,
        "num_frames": geometry.num_frames,
        "scan_active_duration": geometry.scan_duration,
        "scan_velocity": geometry.scan_velocity,
        "d_taxi": geometry.d_taxi,
        "motor_accl": geometry.motor_accl,
        "motor_accl_was_default": geometry.accl_was_default,
        "motor_egu": geometry.motor_egu,
        "motor_velocity_max": v_max,  # .VMAX; None if not readable
        "motor_velocity_base": v_base,  # .VBAS; None if not readable
    }
    # Where the detector file goes, plus run-time tuning knobs.
    destination = {
        "ad_file_name": ad_file_name,
        "ad_file_path": ad_file_path,
        "hdf_num_capture": hdf_num_capture,  # HDF plugin upper bound
        "hdf_flush_timeout_max": hdf_flush_timeout_max,  # worst-case (s)
        "consumer_tick": consumer_tick,  # monitor_loop wake-up tick (s)
    }
    return {**identity, **user_inputs, **derived, **destination}
def snapshot_stage_sigs(*devices):
    """Record a shallow copy of every device's ``stage_sigs``.

    Returns a list of ``(device, dict_copy)`` pairs, in argument order,
    suitable for passing to ``restore_stage_sigs``.
    """
    saved = []
    for dev in devices:
        saved.append((dev, dict(dev.stage_sigs)))
    return saved
def restore_stage_sigs(snapshot):
    """Put back the ``stage_sigs`` captured by ``snapshot_stage_sigs``.

    The dict object each device already holds is emptied and refilled
    in place rather than replaced; the attribute is deliberately not
    reassigned.
    """
    for dev, saved in snapshot:
        live = dev.stage_sigs
        live.clear()
        live.update(saved)
def snapshot_kinds(*signals):
    """Record the current ``.kind`` of every given signal.

    Returns a list of ``(signal, original_kind)`` pairs, in argument
    order, suitable for passing to ``restore_kinds``.

    Meant for plan-local changes to ``kind`` — for example making
    ``cam.array_counter`` ``Kind.hinted`` for the duration of a fly scan
    so it appears in primary-stream events, without imposing that
    choice on every other plan that touches the device.
    """
    saved = []
    for sig in signals:
        saved.append((sig, sig.kind))
    return saved
def restore_kinds(snapshot):
    """Reassign each signal's ``.kind`` from a ``snapshot_kinds`` result."""
    for entry in snapshot:
        sig, saved_kind = entry
        sig.kind = saved_kind
def motor_is_moving(motor):
    """Report whether the motor is moving, from an uncached DMOV read.

    Reads ``motor_done_move`` (the motor record's ``.DMOV`` field) with
    ``use_monitor=False`` so the pyepics monitor cache is bypassed, in
    line with this module's "bypass cache for timing-critical reads"
    discipline (see strategy-doc Phase 0.1 for background).  Returns
    True unless that read equals 1.

    Prefer this over ``motor.moving`` (the EpicsMotor property), which
    depending on ophyd version may read ``MOVN`` rather than ``DMOV``.
    ``MOVN`` briefly reports "not moving" between the primary move and
    the backlash-correction move when BDST != 0; ``DMOV`` only goes to
    1 once both phases are complete.

    ``_cleanup`` uses this to decide whether to issue
    ``bps.stop(flymotor)`` (skipped if the motor is already idle, per
    Phase 2 decision 2.5 / Phase 3 decision 3.12).
    """
    done_flag = motor.motor_done_move.get(use_monitor=False, as_string=False)
    return done_flag != 1
def _wait_for(signal, predicate, timeout, *, settle_time=0.0):
    """Block until ``predicate(signal value)`` holds, driven by CA monitors.

    The wait has two stages:

    1. **Precheck.**  Read the signal with
       ``signal.get(use_monitor=False)``.  If ``predicate(value)`` is
       already true, return that value at once without subscribing.
    2. **Subscribe.**  Otherwise build an
       ``ophyd.status.SubscriptionStatus(..., run=True)`` whose callback
       runs on each CA monitor update, and ``.wait(timeout=timeout)``
       on it.

    The precheck guards against a known IOC failure mode: CA monitors
    are posted **on change**.  If the PV already satisfies the
    predicate, the IOC may have no reason to post the "next" update the
    status is waiting for, and the wait would hang until timeout
    (surfacing as ``ophyd.utils.errors.WaitTimeoutError``, or
    ``FailedStatus`` if the status was used inside a bluesky plan).

    ``SubscriptionStatus(run=True)`` (the ophyd default) also evaluates
    the predicate once at subscribe time against the value most recently
    seen by the monitor stream.  That is a *second* layer of defense in
    case an update arrived between the precheck and the subscription.

    Parameters
    ----------
    signal : ophyd.Signal
        Signal to watch.  It must support ``.get(use_monitor=False)``
        (for the precheck) and subscription via ``SubscriptionStatus``
        (i.e. a CA-backed signal).
    predicate : callable
        ``predicate(value) -> bool``; true ends the wait.  It is called
        with the positional ``value`` only — an adapter converts from
        the ``value=..., **kwargs`` shape ophyd passes to callbacks.
    timeout : float
        Seconds to wait for the predicate to become true.
    settle_time : float, optional
        Forwarded to ``SubscriptionStatus``.  If non-zero, ophyd
        requires the predicate to remain true for this many seconds
        before completing the status.  Default 0.

    Returns
    -------
    The value of ``signal`` read with ``use_monitor=False``: the
    precheck value when that already satisfied the predicate, otherwise
    a fresh read taken after the status completed.

    Raises
    ------
    ophyd.utils.errors.WaitTimeoutError
        If the predicate does not become true within ``timeout``.

    Notes
    -----
    This is a synchronous helper for use **outside** bluesky plans
    (diagnostic functions, preflight checks).  Inside a plan, prefer
    ``signal.set(value).wait(...)`` where applicable, or build a
    plan-stub wrapper that yields control while waiting.
    """
    initial = signal.get(use_monitor=False)
    if predicate(initial):
        logger.debug(
            "_wait_for(%s): precheck satisfied (value=%r); no subscription",
            signal.name,
            initial,
        )
        return initial

    logger.debug(
        "_wait_for(%s): precheck not satisfied (value=%r); subscribing",
        signal.name,
        initial,
    )

    def _on_update(*, value, **_ignored):
        # Adapt ophyd's keyword-style callback to the one-argument predicate.
        return predicate(value)

    waiter = SubscriptionStatus(
        signal,
        _on_update,
        run=True,
        settle_time=settle_time,
    )
    waiter.wait(timeout=timeout)
    return signal.get(use_monitor=False)
def _safe_get(device, name, **get_kwargs):
    """Best-effort read of ``device.<name>`` for diagnostic messages.

    Calls ``device.<name>.get(**get_kwargs)`` and returns the result.
    Returns ``None`` when the component is not declared on the device's
    class, or when fetching or reading it raises any exception (which
    is logged at debug level).

    Intended for harvesting extra context for error messages (e.g.
    ``write_status`` / ``write_message`` from a sick HDF plugin) without
    letting the diagnostic read itself raise and mask the real error.
    """
    if _has_component(device, name):
        try:
            return getattr(device, name).get(**get_kwargs)
        except Exception as exc:
            logger.debug("_safe_get(%s.%s) failed: %r", device.name, name, exc)
    return None
def _has_component(device, name):
"""True if ``name`` is declared as an ophyd component on ``device``'s class.
Uses class-level introspection only — does **not** trigger lazy
instantiation or any CA traffic. Safe to call against detectors
whose IOC is unresponsive (e.g. inside cleanup paths after a
failure).
Compare with ``hasattr(device, name)``, which calls ``getattr``
and therefore *will* instantiate a not-yet-touched ophyd component,
blocking for ``wait_for_connection`` (default 5s) when the IOC is
down.
"""
if device is None:
return False
return name in type(device).component_names
def _hdf_flush_timeout(
det,
n_captured,
floor=10.0,
headroom=3.0,
assumed_rate_mb_s=50.0,
assumed_bytes_per_pixel=8,
):
"""Estimate a generous timeout for HDF5 ``write_file`` to complete.
Computes the expected file size from frame dimensions and pixel
depth, divides by a conservative write rate, multiplies by
``headroom``, and floors at ``floor`` seconds.
Parameters
----------
det : ophyd Device
The area detector; ``det.hdf1.width`` and ``det.hdf1.height``
are read to estimate frame size.
n_captured : int
Number of frames the IOC will be writing.
floor : float
Minimum timeout, regardless of computed value. Protects very
small captures from a near-zero timeout.
headroom : float
Multiplier applied to the estimated wall time. 3.0 gives a
comfortable margin without being silly.
assumed_rate_mb_s : float
Conservative write rate (MB/s). The IOC typically achieves
much more; 50 leaves a 4x margin on our measured ~200 MB/s.
assumed_bytes_per_pixel : int
Worst case for the cam's data type (Float64 = 8). Smaller
cam types (Int8, UInt16) will write faster but the headroom
absorbs the over-estimate.
Returns
-------
float
Timeout in seconds. Always >= ``floor``.
"""
try:
width = int(det.hdf1.width.get(use_monitor=False))
height = int(det.hdf1.height.get(use_monitor=False))
except Exception:
return max(floor, floor * headroom)
mb = width * height * assumed_bytes_per_pixel * int(n_captured) / (1024 * 1024)
estimated = mb / assumed_rate_mb_s
return max(floor, estimated * headroom)
def wait_for_acquire_drained(det, poll=0.001, timeout=10.0):
    """Plan stub: wait for cam to report idle *and* HDF queue to drain.

    With ``cam.wait_for_plugins='Yes'``, ``cam.acquire_busy`` goes 0
    only after every enabled plugin has finished processing the last
    frame.  Add the HDF-queue check as belt-and-suspenders for cases
    where the cam class does not expose ``acquire_busy``.

    Implementation: builds an ``AndStatus`` of one ``SubscriptionStatus``
    per available signal (cam ``acquire_busy``, HDF ``num_queued_arrays``
    with race-window-safe corroboration on ``queue_free``/
    ``queue_size``) and yields ``bps.sleep`` ticks until ``status.done``
    is true or ``timeout`` elapses.  Each sub-status is driven by its
    own CA monitor stream (one subscription per signal), so neither
    signal can mask the other through a missed monitor edge.

    Returns after ``timeout`` seconds even if drained signals do not
    settle, so this is safe to call from cleanup paths.  ``status.done``
    is always checked *before* the deadline, so a drain that completes
    during the last sleep tick (or a call with ``timeout <= 0`` on an
    already-drained detector) is reported as drained, not as a timeout.
    The deadline is measured with ``time.monotonic()`` so wall-clock
    adjustments cannot shorten or stretch the wait.

    On every exit path (drained, timeout, plan abort, generator close)
    any sub-status that is still pending has its callback removed from
    the signal, so a timed-out wait does not leave a predicate running
    on the CA monitor stream.

    Short-circuit: if the cam is not acquiring *and* the HDF plugin
    has not captured any frames, there is nothing to drain.  This
    spares cleanup paths a needless ``timeout``-second wait when the
    plan failed before acquisition started (e.g. an ``AttributeError``
    in the preparation phase).  This precheck uses ``use_monitor=False``
    to avoid stale-cache reads (notably ``num_captured`` is cumulative
    across runs unless explicitly reset).

    Capability discovery uses class-level ``component_names``
    introspection rather than ``hasattr``, so we don't pay a 5-second
    CA timeout to answer "does this device have ``queue_free``?" when
    the IOC is dead during a crash-cleanup.

    The ``poll`` parameter is the *plan wake-up tick* (how often the
    plan checks ``status.done``), not a PV-polling interval.  The
    underlying status updates happen on the CA monitor thread; ``poll``
    only controls how soon the plan notices.  A future-proofing
    alternative would be ``bps.wait_for([asyncio_future])`` (would tie
    the plan to the RunEngine's asyncio loop; see strategy doc Tier 4).

    Parameters
    ----------
    det : ophyd Device
        Area detector with a ``cam`` sub-device and, optionally, an
        ``hdf1`` plugin.
    poll : float
        Plan wake-up tick, in seconds.
    timeout : float
        Maximum time to wait, in seconds, before giving up with a
        WARNING log (no exception is raised).
    """
    has_busy = _has_component(det.cam, "acquire_busy")
    has_hdf_queue = _has_component(det, "hdf1") and _has_component(
        det.hdf1, "queue_free"
    )

    acquire_off = det.cam.acquire.get(use_monitor=False) == 0
    nothing_captured = (
        not _has_component(det, "hdf1")
        or not _has_component(det.hdf1, "num_captured")
        or det.hdf1.num_captured.get(use_monitor=False) == 0
    )
    if acquire_off and nothing_captured:
        logger.info(
            "wait_for_acquire_drained(%s): nothing to drain"
            " (acquire=0, num_captured=0); short-circuiting",
            det.name,
        )
        # finalize_wrapper consumes this as a generator, so we must
        # yield at least one message before returning.
        yield from bps.null()
        return

    # (signal, SubscriptionStatus) pairs for whichever signals the
    # device exposes.  The signal is kept next to its status so a
    # still-pending subscription can be removed in ``finally``.
    watched = []
    try:
        if has_busy:
            busy_signal = det.cam.acquire_busy
            watched.append(
                (
                    busy_signal,
                    SubscriptionStatus(
                        busy_signal,
                        lambda *, value, **_: int(value) == 0,
                        run=True,
                    ),
                )
            )
        if has_hdf_queue:
            # HDF-drain predicate preserves the original race-window
            # protection: queue empty AND all slots free (i.e. no frame
            # is currently being written).  The two corroboration reads
            # use use_monitor=False so the predicate sees consistent
            # post-update values rather than a stale cache.
            hdf = det.hdf1
            queued_signal = hdf.num_queued_arrays
            watched.append(
                (
                    queued_signal,
                    SubscriptionStatus(
                        queued_signal,
                        lambda *, value, **_: (
                            int(value) == 0
                            and int(hdf.queue_free.get(use_monitor=False))
                            == int(hdf.queue_size.get(use_monitor=False))
                        ),
                        run=True,
                    ),
                )
            )

        t0 = time.monotonic()
        logger.info(
            "wait_for_acquire_drained(%s): has_busy=%s has_hdf_queue=%s"
            " timeout=%gs",
            det.name,
            has_busy,
            has_hdf_queue,
            timeout,
        )

        if not watched:
            # No drained signals to wait on — nothing more to do.
            logger.info(
                "wait_for_acquire_drained(%s): no drainable signals on"
                " this device; returning immediately",
                det.name,
            )
            yield from bps.null()
            return

        # Combine via AndStatus (no-op if there's only one sub-status).
        status = watched[0][1]
        for _signal, sub_status in watched[1:]:
            status = AndStatus(status, sub_status)

        deadline = t0 + timeout
        while True:
            # Completion is tested before the deadline so that a drain
            # finishing during the final tick is never misreported.
            if status.done:
                logger.info(
                    "wait_for_acquire_drained(%s): drained after %.3fs",
                    det.name,
                    time.monotonic() - t0,
                )
                return
            if time.monotonic() >= deadline:
                break
            yield from bps.sleep(poll)

        logger.warning(
            "wait_for_acquire_drained(%s): TIMEOUT after %gs" " (status.done=%s)",
            det.name,
            timeout,
            status.done,
        )
    finally:
        # A SubscriptionStatus only unsubscribes itself once it
        # completes.  Remove whatever is still pending so an abandoned
        # wait does not keep evaluating its predicate on every monitor
        # update.  Best-effort: cleanup must never mask the real outcome.
        for signal, sub_status in watched:
            if sub_status.done:
                continue
            try:
                signal.clear_sub(sub_status.check_value)
            except Exception as exc:
                logger.debug(
                    "wait_for_acquire_drained(%s): could not clear pending"
                    " subscription on %s: %r",
                    det.name,
                    getattr(signal, "name", signal),
                    exc,
                )
def monitor_loop(
    flymotor, det, p_end, *, exit_when, watchdog=None, tick=_CONSUMER_TICK_DEFAULT
):
    """Plan stub: emit one primary-stream event per HDF frame written.

    Producer/consumer design (Phase 3.C refactor):

    - **Producer:** a CA monitor callback on ``det.hdf1.num_captured``
      pushes ``(timestamp, new_value)`` onto a small bounded
      ``queue.Queue`` whenever the IOC publishes a monitor update.
      The producer runs on the pyepics dispatch thread.
    - **Consumer:** this plan-stub loop wakes up every ``tick`` seconds,
      drains the queue, and emits one ``primary``-stream event per
      newly-captured frame.  The consumer runs on the plan/RunEngine
      thread; ``yield from bps.sleep(tick)`` keeps the RunEngine in
      control of pause/abort.

    Per Phase 0.2 / Phase 0e: the primary stream is a progress
    indicator.  Pairing of detector frames to flymotor positions
    happens downstream via the IOC-timestamped monitor streams set
    up by ``@bpp.monitor_during_decorator``; the per-frame
    ``bps.read(det)`` + ``bps.read(flymotor)`` here is a snapshot,
    not the system of record, and uses the cached monitor values
    (no extra CA traffic).

    When the motor's readback crosses ``p_end``, the cam is told to
    stop (``bps.mv(det.cam.acquire, 0)``).  This check happens on
    each consumer tick (per Phase 3 decision 3.6 — overshoot is
    dominated by the motor record's ~10 Hz update rate, not by the
    tick).

    Exit: ``exit_when.done``.  Per Phase 3 decisions 3.8 / 3.B, the
    caller constructs ``exit_when`` as
    ``AndStatus(cam_stopped_status, drain_status)`` so the loop
    exits only when the cam has been stopped AND every in-flight
    frame has been flushed by the HDF plugin.  Once ``exit_when`` is
    seen done, the queue is drained one final time before leaving so
    that updates which arrived after this tick's drain still produce
    their primary events.

    Watchdog: ``watchdog`` is an ``ophyd.status.SubscriptionStatus``
    constructed with ``timeout=no_frames_timeout`` watching
    ``num_captured > 0``.  ophyd's StatusBase timeout machinery
    completes the status as failed if no frame arrives in time; the
    consumer checks ``watchdog.done and not watchdog.success`` per
    tick and raises ``RuntimeError`` annotated with the HDF
    plugin's ``WriteStatus`` and ``WriteMessage``.  Per Phase 3
    decision 3.11, the raise lets the RunEngine send STOP to all
    in-motion movables (including ``flymotor``), which is exactly
    what we want when something is wrong with the IOC/cam/HDF
    chain.

    Parameters
    ----------
    flymotor : EpicsMotor
        The fly-scan motor.  Position read with ``use_monitor=False``
        for the ``p_end`` crossing check, to bypass cached value.
    det : ADBase
        The area detector (with ``cam`` and ``hdf1`` plugin
        sub-devices).
    p_end : float
        Stop the cam acquisition when ``flymotor.user_readback``
        meets or exceeds this value.
    exit_when : ophyd.status.StatusBase
        Loop exits when this status's ``.done`` is True.  See
        "Exit" above.
    watchdog : ophyd.status.StatusBase, optional
        If supplied, the loop checks ``.done and not .success``
        per tick and raises ``RuntimeError`` if the watchdog
        timed out.  None disables the watchdog.
    tick : float
        Consumer wake-up tick in seconds.  See module-level
        ``_CONSUMER_TICK_DEFAULT``.

    Raises
    ------
    RuntimeError
        The watchdog completed unsuccessfully (no frames captured
        within its timeout).
    """
    # Function-scope import: only needed for the DEBUG-level guard in
    # ``_emit_pending_frames``.
    import logging

    # --- Producer setup: CA monitor callback into a bounded queue ---
    frame_queue = queue.Queue(maxsize=_FRAME_QUEUE_SIZE)
    overflow_warned = [False]  # list-of-one so callback can mutate

    def _on_num_captured(*, value, timestamp, **kwargs):
        """CA monitor callback (runs on pyepics dispatch thread).

        Pushes ``(timestamp, value)`` onto ``frame_queue``.  Drops
        the entry and logs a single WARNING if the queue is full
        (per Phase 3 decision 3.3b — never block the CA thread).
        """
        try:
            frame_queue.put_nowait((timestamp, int(value)))
        except queue.Full:
            if not overflow_warned[0]:
                overflow_warned[0] = True
                logger.warning(
                    "monitor_loop: frame queue overflow (size=%d);"
                    " consumer falling behind producer."
                    " New events will be dropped silently until the"
                    " queue drains. Adjust _FRAME_QUEUE_SIZE in"
                    " flyscan_3idc.py if this recurs.",
                    _FRAME_QUEUE_SIZE,
                )

    # --- Inner helpers (per Phase 3 decision 3.1: named for clarity)
    def _emit_pending_frames(last_captured):
        """Drain the queue and emit one primary event per new frame.

        Returns the updated ``last_captured`` count.
        """
        # Snapshot the highest value seen in the queue this tick.
        # The IOC publishes monotonically-increasing values; we
        # only care about the delta count.
        highest = last_captured
        n_drained = 0
        while True:
            try:
                _ts, value = frame_queue.get_nowait()
            except queue.Empty:
                break
            n_drained += 1
            if value > highest:
                highest = value

        if highest > last_captured:
            n_new = highest - last_captured
            # The motor position below is an uncached (synchronous) CA
            # read; logging arguments are evaluated eagerly, so only
            # pay for the round-trip when DEBUG is actually enabled.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "monitor_loop: %d new frame(s) (highest=%d,"
                    " drained %d queue entries) motor=%g",
                    n_new,
                    highest,
                    n_drained,
                    flymotor.user_readback.get(use_monitor=False),
                )
            for _ in range(n_new):
                yield from bps.create(name="primary")
                yield from bps.read(det)
                yield from bps.read(flymotor)
                yield from bps.save()
        return highest

    def _check_p_end_crossing(acquire_stopped, last_captured):
        """Stop the cam if the motor has crossed p_end.

        Returns the updated ``acquire_stopped`` flag.
        """
        if acquire_stopped:
            return acquire_stopped
        # Bypass cache for the position read — pairing timing matters
        # here (Phase 0.1 decision).
        if flymotor.user_readback.get(use_monitor=False) >= p_end:
            logger.info(
                "monitor_loop: motor crossed p_end (%g); stopping acquire"
                " at num_captured=%d",
                p_end,
                last_captured,
            )
            yield from bps.mv(det.cam.acquire, 0)
            return True
        return acquire_stopped

    def _check_watchdog():
        """If the watchdog timed out, harvest context and raise.

        Returns nothing; raises RuntimeError on watchdog trip.
        Watchdog timeout is signalled by ophyd's StatusBase as
        ``done=True, success=False`` after the timeout elapses
        without the predicate becoming true (per Phase 3 decision
        3.10).
        """
        if watchdog is None:
            return
        if not (watchdog.done and not watchdog.success):
            return
        # Watchdog has tripped.  Harvest diagnostics from the HDF
        # plugin to annotate the exception (per Phase 3 decision
        # 3.11 — keep raise behavior; RE will STOP movables).
        write_status = _safe_get(det.hdf1, "write_status", as_string=True)
        write_message = _safe_get(det.hdf1, "write_message", as_string=True)
        full_file_name = _safe_get(det.hdf1, "full_file_name", as_string=True)
        msg = (
            f"monitor_loop: HDF watchdog tripped — no frames captured"
            f" within the timeout on {det.name}."
            f" hdf1.write_status={write_status!r}"
            f" hdf1.write_message={write_message!r}"
            f" hdf1.full_file_name={full_file_name!r}."
            f" Stop acquire and abort."
        )
        logger.error(msg)
        raise RuntimeError(msg)

    # --- Main loop ---
    t0 = time.time()
    last_captured = int(det.hdf1.num_captured.get(use_monitor=False))
    logger.info(
        "monitor_loop: starting at num_captured=%d, motor=%g, p_end=%g"
        " (tick=%gs, watchdog=%s)",
        last_captured,
        flymotor.user_readback.get(use_monitor=False),
        p_end,
        tick,
        "enabled" if watchdog is not None else "disabled",
    )
    acquire_stopped = False
    cid = det.hdf1.num_captured.subscribe(_on_num_captured)
    try:
        while True:
            # Check watchdog first — if it tripped, no point in
            # emitting events or checking other conditions.
            _check_watchdog()

            # Drain producer queue, emit primary events.
            last_captured = yield from _emit_pending_frames(last_captured)

            # Stop cam when motor crosses p_end.
            acquire_stopped = yield from _check_p_end_crossing(
                acquire_stopped, last_captured
            )

            # Exit when the caller's status fires.
            if exit_when.done:
                # Updates published after this tick's drain (above) but
                # before exit_when completed are still sitting in the
                # queue; flush them so the last frames are not missing
                # from the primary stream.
                last_captured = yield from _emit_pending_frames(last_captured)
                logger.info(
                    "monitor_loop: exit after %.3fs"
                    " (final num_captured=%d, motor=%g,"
                    " acquire_stopped=%s)",
                    time.time() - t0,
                    last_captured,
                    flymotor.user_readback.get(use_monitor=False),
                    acquire_stopped,
                )
                break

            yield from bps.sleep(tick)
    finally:
        # Always unsubscribe — even on RuntimeError from the
        # watchdog, on plan abort, or on any other exception.
        try:
            det.hdf1.num_captured.unsubscribe(cid)
        except Exception as exc:
            logger.warning("monitor_loop: unsubscribe failed: %r", exc)
@bluesky_plan
[docs]
def flyscan(
det_name: str = "adsimdet",
flymotor_name: str = "m1",
p_start: float = 0,
p_end: float = 5,
exposures_per_egu: float = 2.0,
t_period: float = 0.1,
t_acquire: float = None, # defaults to t_period when None
taxi_allowance: float = 0.5, # motor EGU
compression: str = "zlib",
ad_file_name: str = "flyscan",
ad_file_path: str = "/tmp/flyscan",
# TODO: trigger mode?
# internal parameters
# Wake-up tick for monitor_loop's consumer (Phase 3). CA
# monitor callbacks update status flags asynchronously; the
# plan wakes up every _consumer_tick seconds to check them.
# Default defined as a module-level constant so all related
# timing knobs live in one place; can be overridden per-run.
_consumer_tick: float = _CONSUMER_TICK_DEFAULT,
# Override the HDF plugin's blocking_callbacks setting.
# Default (False) leaves the safe behaviour in place: HDF runs
# with blocking_callbacks="Yes" so the cam back-throttles to
# HDF's write rate and no frames are dropped. Setting True
# forces blocking_callbacks="No" on the HDF plugin, restoring
# the historical (data-losing) behaviour. The only legitimate
# use is to *demonstrate* the FlyscanDataLossWarning code path
# on hardware where blocking mode prevents drops — set True,
# crank up exposures_per_egu past what the HDF can sustain,
# and watch the post-scan warning fire. Not for production
# data collection.
_force_hdf_nonblocking: bool = False,
# user-supplied metadata is always last
md: dict = None,
):
"""Fly scan: move motor through range continuously acquiring detector frames.
The motor traverses ``p_initial → p_final``, maintaining constant velocity
between ``p_start → p_end`` to deliver ``num_frames`` frames within
``[p_start, p_end]``. ``p_initial`` and ``p_final`` are computed from
``p_start``, ``p_end``, the motor's ``.ACCL``, and ``taxi_allowance``;
``num_frames`` is computed from ``(p_end - p_start) * exposures_per_egu``.
Detector frames are acquired continuously during the traverse;
downstream processing trims the data to ``[p_start, p_end]`` by motor
position. An HDF5 file containing every captured frame is written next
to the run (the path is in the run metadata under ``ad_file_path`` /
``ad_file_name``).
Position geometry
-----------------
User-supplied: ``p_start`` and ``p_end`` (in-scan range). Derived:
``p_initial`` (parked, pre-scan) and ``p_final`` (coast, post-scan)::
p_initial < p_start < p_end < p_final
| | | |
| |--scan----| |
|--taxi-in---| |---coast--|
- ``p_start``: the position at which the first useful frame should
be captured. Downstream processing trims frames captured before
this point.
- ``p_end``: the position at which the last useful frame should be
captured. The plan stops the cam when the motor passes this point.
- ``p_initial`` (derived): where the motor is parked before the
scan, far enough below ``p_start`` that the motor reaches its
scan velocity *before* it enters the acquisition region.
Computed as ``p_start - d_taxi - taxi_allowance`` where
``d_taxi = 0.5 * scan_velocity * motor.ACCL``.
- ``p_final`` (derived): where the motor coasts to after the scan
ends — far enough above ``p_end`` that the cam can finish
processing its last frames before the motor stops. Computed
symmetric to ``p_initial``.
``taxi_allowance`` (default ``0.5``, in motor EGU) is added to both
ends as a slack margin on top of the acceleration-based distance.
Increase it if the cam's first/last frame is observed to fall
outside ``[p_start, p_end]``; decrease it if the scan takes too
long to taxi.
Position units are whatever the motor reports (``user_readback``);
typically engineering units (mm, degrees, etc.) — the motor's
``.EGU`` field is recorded in run metadata.
Frame timing
------------
- ``exposures_per_egu``: target frame density. Combined with the
scan range, gives ``num_frames = round(1 + (p_end - p_start)
* exposures_per_egu)`` (fence-post counting: one frame at each
endpoint plus ``exposures_per_egu`` frames per unit between).
- ``t_period``: seconds between successive frame exposures.
- ``t_acquire``: per-frame exposure time, in seconds. Defaults to
``t_period`` (continuous exposure). Must satisfy
``0 < t_acquire <= t_period``.
The scan velocity is computed as ``(p_end - p_start) / (num_frames
* t_period)``. Pre-scan validation rejects velocities outside the
motor's ``.VBAS`` / ``.VMAX`` limits with a clear ``ValueError``.
Detector & file
---------------
- ``det_name``: ophyd device registry key for the area detector
(default ``"adsimdet"``). Must be an AreaDetector with an HDF5
plugin attached.
- ``flymotor_name``: ophyd device registry key for the motor
(default ``"m1"``).
- ``compression``: HDF5 chunk compression name (default ``"zlib"``).
Validated against the HDF plugin's ``compression.enum_strs`` at
scan start; raises ``ValueError`` with the allowed list if the
value isn't supported by the IOC's HDF plugin build.
- ``ad_file_name``: stem for the saved HDF5 file (default
``"flyscan"``); the IOC appends an auto-incrementing number and
the ``.h5`` extension.
- ``ad_file_path``: directory on the IOC's filesystem where the
HDF5 file is written (default ``"/tmp/flyscan"``). **Must exist on
the IOC's filesystem.** If the IOC runs in a container, this
is the container's view of the path, not the host's. The plan
checks this before staging and raises ``RuntimeError`` with a
clear message if the path doesn't exist.
What gets recorded
------------------
Each call to ``RE(flyscan(...))`` produces one bluesky run
containing:
- A ``primary`` event stream with one event per HDF frame
accepted by the writer. Each event records the cam and HDF
array counters and the motor's reported position at the moment
the consumer drained that frame from its queue. Treat this as
a progress indicator and at-the-bench snapshot; use the monitor
streams below for high-precision pairing.
- Three monitor streams (``adsimdet_cam_array_counter_monitor``,
``adsimdet_hdf1_array_counter_monitor``, ``m1_monitor``)
carrying IOC-timestamped values for downstream synchronization
of frame counters with motor position.
- A ``baseline`` stream (whatever ``apsbits`` configures).
- Metadata under ``start``: user-supplied scan parameters
(``p_start``, ``p_end``, ``exposures_per_egu``, ``t_period``,
``t_acquire``, ``taxi_allowance``, ``compression``), derived
geometry (``p_initial``, ``p_final``, ``num_frames``,
``scan_velocity``, ``d_taxi``, ``motor_accl``, ``motor_egu``),
motor velocity limits, file destination, watchdog timeout,
``consumer_tick``, plus anything you pass in ``md``.
- An HDF5 file with the actual image data at
``ad_file_path/ad_file_name_NNNNNN.h5``.
Common usage
------------
From a bits2606 IPython session::
from bits2606.startup import * # provides RE, oregistry
from flyscan_3idc import flyscan
# 50 frames over a 5-EGU range at 20 Hz:
uid, = RE(flyscan(p_start=0, p_end=5, exposures_per_egu=10,
t_period=0.05))
Override more defaults for a specific run::
uid, = RE(flyscan(
flymotor_name="m1",
p_start=0, p_end=10,
exposures_per_egu=10, t_period=0.05, t_acquire=0.01,
taxi_allowance=1.0,
compression="lz4",
ad_file_path="/tmp/myexperiment/",
ad_file_name="sample42",
md={"sample": "Ag behenate", "operator": "your-name"},
))
Common pitfalls
---------------
- **"file_path does not exist" RuntimeError at scan start.** The
directory in ``ad_file_path`` doesn't exist on the IOC's
filesystem. If the IOC is containerized, create the directory
inside the container or use a path that's visible there.
- **"scan_velocity exceeds motor max velocity" ValueError.** The
requested combination of position range and frame rate would
require the motor to move faster than its ``.VMAX``. Either
reduce ``exposures_per_egu``, increase ``t_period``, or shorten
``p_end - p_start``.
- **"compression=... not in HDF plugin's allowed set" ValueError.**
The IOC's HDF plugin doesn't support the requested compression
algorithm. Inspect ``det.hdf1.compression.enum_strs`` to see
what *is* supported by this IOC build.
- **Watchdog: "no frames captured" RuntimeError mid-scan.** The
cam isn't delivering frames to the HDF plugin. Likely the HDF
plugin's ``EnableCallbacks`` is ``Disable``, the cam's
``ArrayCallbacks`` is ``Disable``, or the HDF plugin's
``NDArrayPort`` doesn't point at the cam. The RunEngine will
have stopped the motor; investigate the IOC and try again.
- **The scan completes but the data dictionary's ``num_captured``
is 0.** The IOC resets ``NumCaptured_RBV`` to 0 after the
HDF5 file is closed. Look at ``full_file_name`` (in
``_cleanup``'s log line) and the actual file on disk to confirm
what was saved.
- **"HDF plugin dropped N frame(s) during this run"
FlyscanDataLossWarning at scan end.** The HDF plugin couldn't
keep up with the cam at the requested rate, and ``N`` frames
the cam produced are missing from the on-disk HDF5 file. The
warning is emitted both to the log (WARNING level) and via
Python's ``warnings`` machinery (subclass of ``UserWarning``).
The plan uses ``blocking_callbacks="Yes"`` on the HDF plugin
to throttle the cam to HDF's write rate, so this should be
rare — when it does occur, it usually means the cam emitted a
burst before back-pressure propagated, or the HDF queue size
is too small. Treat ``N > 0`` as a data-integrity concern:
increase ``t_period`` or reduce ``exposures_per_egu``.
Promote the warning to an exception with
``warnings.filterwarnings("error",
category=flyscan_3idc.FlyscanDataLossWarning)`` to fail-fast
in strict environments.
Parameters
----------
det_name : str, default ``"adsimdet"``
ophyd registry name of the area detector to fly.
flymotor_name : str, default ``"m1"``
ophyd registry name of the motor to fly.
p_start : float, default ``0``
First in-scan position (motor units).
p_end : float, default ``5``
Last in-scan position (motor units).
exposures_per_egu : float, default ``2.0``
Frame density: frames per motor engineering unit. Total
frame count is ``round(1 + (p_end - p_start) *
exposures_per_egu)``. Must be positive.
t_period : float, default ``0.1``
Time between successive frame exposures (seconds).
t_acquire : float or None, default ``None``
Per-frame exposure time (seconds). ``None`` (default) means
"use ``t_period``" (continuous exposure). Must satisfy
``0 < t_acquire <= t_period``.
taxi_allowance : float, default ``0.5``
Extra distance (in motor EGU) added past the
acceleration-based taxi region at each end of the scan.
Increase if the first/last useful frame falls outside
``[p_start, p_end]``; must be non-negative.
compression : str, default ``"zlib"``
HDF5 chunk compression name. Must match one of
``det.hdf1.compression.enum_strs`` if the IOC is reachable.
ad_file_name : str, default ``"flyscan"``
HDF5 filename stem (IOC appends a number and ``.h5``).
ad_file_path : str, default ``"/tmp/flyscan"``
Directory on the IOC's filesystem to write the HDF5 file.
_consumer_tick : float, default ``_CONSUMER_TICK_DEFAULT`` (20 ms)
Internal: wake-up tick for the per-frame event consumer.
Increase if your run-engine subscriptions can't keep up;
decrease only for very high frame rates. Rarely needs to
be changed.
md : dict, optional
Additional metadata to record under the run's ``start``
document. Merged on top of the plan's computed metadata.
Returns
-------
None (yields bluesky messages — pass to ``RE()`` to execute).
Raises
------
KeyError
``det_name`` or ``flymotor_name`` does not resolve to the
expected ophyd device type in the registry.
ValueError
Position ordering is wrong (``p_end <= p_start``),
``exposures_per_egu`` is non-positive, ``taxi_allowance`` is
negative, ``t_acquire > t_period``, computed ``num_frames`` is
too small, computed ``scan_velocity`` is outside the motor's
limits, or ``compression`` is not in the HDF plugin's
enumeration.
RuntimeError
IOC preflight failed (an expected PV did not connect), or
the HDF plugin's file path does not exist on the IOC's
filesystem, or the no-frames watchdog tripped during the
scan.
See Also
--------
configure_adsimdet :
Standalone diagnostic that exercises the same AD acquisition
protocol without a plan or RunEngine. Useful for triaging
an IOC that's misbehaving.
compute_flyscan_geometry :
Pure-function helper that derives ``p_initial``, ``p_final``,
and ``num_frames`` from the user-supplied kwargs; unit-
testable without an IOC.
"""
## Preparation
# t_acquire defaults to t_period (continuous exposure).
if t_acquire is None:
t_acquire = t_period
logger.info(
"flyscan: entered. det_name=%r flymotor_name=%r"
" p_start=%g p_end=%g exposures_per_egu=%g"
" t_acquire=%g t_period=%g taxi_allowance=%g compression=%r",
det_name,
flymotor_name,
p_start,
p_end,
exposures_per_egu,
t_acquire,
t_period,
taxi_allowance,
compression,
)
det = oregistry.find(det_name, allow_none=True)
flymotor = oregistry.find(flymotor_name, allow_none=True)
logger.info(
"flyscan: lookup -> det=%r flymotor=%r",
getattr(det, "name", det),
getattr(flymotor, "name", flymotor),
)
# Fail fast if the IOCs are down, before we incur the much longer
# staging-time cost of discovering it via per-PV 5-second timeouts.
# validate_flyscan_inputs (below) does device-type validation that
# makes no CA calls, so preflight runs only after we know det and
# flymotor are the right kinds of objects. Reuse that validation
# first by doing isinstance() guards here that mirror the validator;
# cleaner would be to split validate_flyscan_inputs in two, but for
# now we accept the small redundancy.
if isinstance(det, ADBase) and isinstance(flymotor, EpicsMotor):
preflight_connectivity(det, det_name, flymotor, flymotor_name)
# Derive geometry first (motor must be the right type for the
# .ACCL/.EGU caget calls in compute_flyscan_geometry to be
# meaningful), then validate.
if not isinstance(flymotor, EpicsMotor):
# Mirror validate_flyscan_inputs' error so the user sees the
# same message either way.
raise KeyError(f"Motor {flymotor_name!r} not found in registry.")
geometry = compute_flyscan_geometry(
flymotor,
p_start,
p_end,
exposures_per_egu,
t_period,
taxi_allowance,
)
v_max, v_base = validate_flyscan_inputs(
det,
det_name,
flymotor,
flymotor_name,
geometry,
t_acquire,
t_period,
compression,
)
# Rebind derived values to plain locals so the rest of the plan
# (taxi/scan kickoff, watchdog timing, _cleanup) keeps reading
# them by their familiar names.
p_initial = geometry.p_initial
p_final = geometry.p_final
num_frames = geometry.num_frames
scan_active_duration = geometry.scan_duration
scan_velocity = geometry.scan_velocity
# IOC HDF plugin pre-allocation overflows somewhere between 1e6
# and 1e9 for Float64 1024x1024 frames (verified empirically;
# likely a C int byte-count overflow in NDFileHDF5). num_capture
# = num_frames * 1.5 + 20 is comfortable for any sensible scan
# size while absorbing taxi-in/-out leading frames, post-stop
# tail frames, and timing jitter.
hdf_num_capture = int(num_frames * 1.5) + 20
# Worst-case flush timeout assumes the HDF plugin fills to
# ``hdf_num_capture``. Actual flush time at run-end uses the real
# ``num_captured`` (recomputed in _cleanup). This metadata lets a
# downstream reader see the planning assumption.
hdf_flush_timeout_max = _hdf_flush_timeout(det, hdf_num_capture)
_md = build_flyscan_md(
plan_name=inspect.currentframe().f_code.co_name,
det_name=det_name,
flymotor_name=flymotor_name,
p_start=p_start,
p_end=p_end,
exposures_per_egu=exposures_per_egu,
t_acquire=t_acquire,
t_period=t_period,
taxi_allowance=taxi_allowance,
compression=compression,
geometry=geometry,
v_max=v_max,
v_base=v_base,
ad_file_name=ad_file_name,
ad_file_path=ad_file_path,
hdf_num_capture=hdf_num_capture,
hdf_flush_timeout_max=hdf_flush_timeout_max,
consumer_tick=_consumer_tick,
)
_md.update(md or {})
original_cache = CacheParameters()
# Closure flag used by _cleanup to decide whether to flush the
# HDF5 file. Stays False if the plan dies before acquisition
# actually starts (e.g. a FailedStatus during the override loop),
# so _cleanup doesn't waste time waiting for a non-existent flush
# or, worse, flush a stale num_captured value left over from a
# prior run. List-of-one so kickoff_and_monitor can flip it
# without a nonlocal declaration chain.
capture_started = [False]
# Snapshot the HDF plugin's cumulative dropped-arrays counter
# (the IOC does not reset it across runs). ``_cleanup`` reads
# it again and reports the per-run delta. Initialize with the
# current value if readable, else None to signal "no baseline"
# (then the delta isn't computed). Defensive against IOCs that
# don't expose ``dropped_arrays``.
dropped_arrays_baseline = (
_safe_get(
det.hdf1,
"dropped_arrays",
use_monitor=False,
)
if _has_component(det, "hdf1")
else None
)
if dropped_arrays_baseline is not None:
logger.info(
"flyscan: hdf1.dropped_arrays baseline = %d",
int(dropped_arrays_baseline),
)
# Snapshot stage_sigs on every component we may mutate below.
plugins = [
getattr(det, nm)
for nm in det.component_names
if hasattr(getattr(det, nm), "blocking_callbacks")
]
logger.info(
"flyscan: snapshotting stage_sigs on det, cam, hdf1, and %d plugin(s):" " %s",
len(plugins),
[p.name for p in plugins],
)
saved_stage_sigs = snapshot_stage_sigs(det, det.cam, det.hdf1, *plugins)
# Snapshot the ``kind`` of the two array_counter signals we want
# in the primary-stream events for this run. Setting Kind.hinted
# makes bps.read(det) include them and live displays plot them.
# We restore in _cleanup so other plans against this detector see
# whatever kind it was configured with by default (typically
# Kind.config or Kind.omitted for these counters). See Phase 3
# decision 3.5a/3.5b in the strategy doc.
saved_kinds = snapshot_kinds(
det.cam.array_counter,
det.hdf1.array_counter,
)
logger.info(
"flyscan: snapshotted kinds for %d signal(s): %s",
len(saved_kinds),
[s.name for s, _ in saved_kinds],
)
for sig, _ in saved_kinds:
sig.kind = Kind.hinted
### Send motor to initial position (start moving; we wait below)
# The kickoff is grouped so the RunEngine tracks the MoveStatus
# and we can wait on it explicitly after the AD setup work in
# _main has completed (see bps.wait(group="taxi") below). This
# preserves the deliberate concurrency: motor moves while AD
# parameters are configured, then we wait, then we change
# velocity. Group name "taxi" matches the idiom used by other
# APS fly scans.
logger.info(
"flyscan: taxi -> sending %s to p_initial=%g (non-blocking)",
flymotor.name,
p_initial,
)
yield from bps.abs_set(flymotor, p_initial, group="taxi")
def _main():
    """Configure the detector, finish the taxi move, then run the scan.

    Plan steps, in order:

    1. Override the HDF plugin's runtime parameters through
       ``original_cache`` (so they can be restored afterwards).
    2. Check the HDF file path via ``check_hdf_file_path(det)``.
    3. Fill in ``stage_sigs`` on the detector, cam, and plugins.
    4. Wait for the "taxi" move group, then override the motor velocity.
    5. Delegate data collection to ``kickoff_and_monitor()``.
    """
    hdf = det.hdf1
    cam = det.cam

    # --- AD runtime parameters (overridden now, restored on exit) ---
    # These are set before the device is staged.
    logger.info("flyscan._main: overriding AD runtime parameters")

    # create_directory goes first: it must already be in place when
    # hdf1.file_path is written further down.
    yield from original_cache.override(hdf.create_directory, -5)

    # The remaining overrides, applied in this exact order.
    runtime_overrides = (
        (hdf.array_counter, 0),  # optional
        (hdf.auto_increment, "Yes"),
        (hdf.auto_save, "Yes"),
        (hdf.compression, compression),
        (hdf.file_name, ad_file_name),
        (hdf.file_number, 1),
        (hdf.file_path, ad_file_path),
        (hdf.file_template, "%s%s_%6.6d.h5"),
    )
    for signal, setting in runtime_overrides:
        yield from original_cache.override(signal, setting)

    # Verify the IOC can see the path.  This must happen before
    # staging kicks the HDF plugin into capture mode.
    check_hdf_file_path(det)

    # --- Parameters applied by staging ---
    # stage_decorator writes these before the run and puts the previous
    # values back after the run.
    det.stage_sigs["cam.image_mode"] = "Continuous"
    cam.stage_sigs["acquire_time"] = t_acquire
    cam.stage_sigs["acquire_period"] = t_period
    hdf.stage_sigs["num_capture"] = hdf_num_capture

    # --- AD callback-chain throttling ---
    # Revised 2026-06-08 after a flyscan wrote only 59 of an expected
    # 101 frames to HDF while cam.array_counter showed the cam itself
    # ran at the requested 10 Hz (117 frames produced, 59 written,
    # hdf1.dropped_arrays up by ~58).  Root cause: with
    # ``blocking_callbacks="No"`` the cam does not wait for HDF to
    # consume the previous frame; when file-write throughput is below
    # the cam rate the HDF input queue overflows and frames are
    # silently dropped.
    #
    # Trade-off chosen: fidelity (no drops) over rate.  The HDF plugin
    # gets ``blocking_callbacks="Yes"`` so the cam throttles itself to
    # the HDF write rate.  Every frame the IOC produces is captured,
    # but the achieved rate may fall below ``1/t_period`` if the IOC
    # cannot keep up.  The post-scan WARNING that compares
    # cam.array_counter with hdf1.array_counter shows divergence; the
    # remedy is a larger ``t_period`` or smaller ``exposures_per_egu``.
    #
    # Non-HDF plugins (image, pva, stats1, roi1) stay at
    # ``blocking_callbacks="No"``: they are typically display/analysis
    # sinks where a dropped frame is tolerable, and they should not
    # gate the cam.
    if hasattr(cam, "wait_for_plugins"):
        cam.stage_sigs["wait_for_plugins"] = "Yes"

    if _force_hdf_nonblocking:
        hdf_blocking_setting = "No"
        logger.warning(
            "flyscan._main: _force_hdf_nonblocking=True — staging"
            " HDF with blocking_callbacks='No'. This DELIBERATELY"
            " disables the cam-to-HDF back-pressure and will drop"
            " frames if HDF can't keep up with the cam. This"
            " escape hatch is for testing the data-loss warning"
            " path; do NOT use for production data collection.",
        )
    else:
        hdf_blocking_setting = "Yes"

    for plugin in plugins:
        plugin.stage_sigs["blocking_callbacks"] = (
            hdf_blocking_setting if plugin is hdf else "No"
        )

    # "capture" must be the last HDF signal applied when staging.
    hdf.stage_sigs.move_to_end("capture")

    logger.debug(
        "flyscan._main: stage_sigs configured. det=%s cam=%s hdf1=%s",
        dict(det.stage_sigs),
        dict(cam.stage_sigs),
        dict(hdf.stage_sigs),
    )

    # --- Finish the taxi move, *then* change velocity ---
    # bps.wait(group="taxi") consumes the MoveStatus registered by the
    # earlier bps.abs_set(..., group="taxi"), relying on the
    # RunEngine's own status tracking instead of a hand-rolled
    # wait_for_motor_done polling loop.
    logger.info(
        "flyscan._main: waiting for %s to reach p_initial",
        flymotor.name,
    )
    yield from bps.wait(group="taxi")

    logger.info(
        "flyscan._main: setting %s velocity to scan_velocity=%g",
        flymotor.name,
        scan_velocity,
    )
    yield from original_cache.override(flymotor.velocity, scan_velocity)

    # --- Data collection ---
    logger.info("flyscan._main: entering kickoff_and_monitor")
    yield from kickoff_and_monitor()
    logger.info("flyscan._main: kickoff_and_monitor returned")
## Kickoff & Monitor
@bpp.stage_decorator([det])  # Don't stage the flymotor!
# Three bespoke monitor streams (one per signal):
#   - det.hdf1.array_counter: HDF writer's frame count (with
#     EPICS timestamp, used downstream to sync with flymotor)
#   - det.cam.array_counter: camera's frame count; cheap to
#     collect, lets users compare cam & hdf
#   - flymotor.user_readback: motor position at CA monitor rate
@bpp.monitor_during_decorator(
    [
        det.hdf1.array_counter,
        det.cam.array_counter,
        flymotor.user_readback,
    ]
)
@bpp.run_decorator(md=_md)
def kickoff_and_monitor():
    """Start the detector, gate on its first frame, then fly the motor.

    Kickoff ordering (revised 2026-06-08 after the cam was observed to
    deliver its first frame several seconds after Acquire=1, by which
    time the motor was already past p_start):

    1. Start the cam acquiring (Acquire=1).
    2. Wait until the HDF plugin has received a first frame
       (``num_captured >= 1``), bounded by a timeout.
    3. Only then launch the motor toward ``p_final`` (group "scan").
    4. Run ``monitor_loop`` until the cam has stopped and the HDF queue
       has drained, then wait for the "scan" move group.

    Cost: a few pre-roll frames are captured while the motor is still
    parked at p_initial.  They are written to the HDF5 file (downstream
    trims by motor position) and count against the IOC's num_capture
    allocation, which is sized generously (1.5*num_frames+20).

    Raises:
        RuntimeError: no frame reached the HDF plugin within the
            first-frame timeout.
    """
    # Diagnostic: log what the IOC actually has for cam timings right
    # before we start.  Helps catch staging defects (e.g.
    # acquire_period got overridden, or acquire_time > t_period
    # silently capped by the IOC) without an empirical "why is my
    # frame rate wrong" investigation.
    actual_acquire_time = _safe_get(det.cam, "acquire_time", use_monitor=False)
    actual_acquire_period = _safe_get(det.cam, "acquire_period", use_monitor=False)
    actual_image_mode = _safe_get(
        det.cam, "image_mode", use_monitor=False, as_string=True
    )
    actual_num_capture = _safe_get(det.hdf1, "num_capture", use_monitor=False)
    logger.info(
        "kickoff_and_monitor: IOC state pre-acquire:"
        " cam.acquire_time=%r cam.acquire_period=%r"
        " cam.image_mode=%r hdf1.num_capture=%r"
        " (requested: t_acquire=%g t_period=%g num_capture=%d)",
        actual_acquire_time,
        actual_acquire_period,
        actual_image_mode,
        actual_num_capture,
        t_acquire,
        t_period,
        hdf_num_capture,
    )

    logger.info("kickoff_and_monitor: starting %s acquisition", det.name)
    # Use bps.mv (not bps.abs_set) so we don't proceed until
    # Acquire_RBV has caught up to 1 ("Acquiring"); the statuses built
    # below should never observe the pre-scan value of 0.  In
    # continuous image_mode (set by stage_sigs), Acquire_RBV reaches 1
    # and stays there until monitor_loop stops the cam.
    yield from bps.mv(det.cam.acquire, 1)

    # From this point on, _cleanup should treat the HDF plugin as
    # active (drain, flush, verify).  If we never get here, _cleanup
    # skips that work entirely.
    capture_started[0] = True

    # --- First-frame gate ---
    # Gate on hdf1.num_captured, the downstream-of-everything signal:
    # the cam produced a frame AND the HDF plugin accepted it.  If the
    # cam can't produce a single frame within 5 t_periods (floor 5 s),
    # something is wrong with the IOC chain and we'd rather raise than
    # launch the motor into a dead scan.
    first_frame_timeout = max(5.0 * t_period, 5.0)
    logger.info(
        "kickoff_and_monitor: waiting for first frame (timeout=%gs)",
        first_frame_timeout,
    )
    first_frame_status = SubscriptionStatus(
        det.hdf1.num_captured,
        lambda *, value, **_: int(value) >= 1,
        # run=True: OK to fire at subscribe time if num_captured is
        # already > 0; the alternative, missing a fast first frame
        # between subscribe and the first plan tick, is worse.
        run=True,
        timeout=first_frame_timeout,
    )
    # Monotonic clock: an elapsed-time measurement must not be skewed
    # by wall-clock adjustments.
    t0_first = time.monotonic()
    # Yield ticks until the status fires or its timeout expires, at the
    # same _consumer_tick cadence that monitor_loop is given below.
    while not first_frame_status.done:
        yield from bps.sleep(_consumer_tick)
    if not first_frame_status.success:
        # Every diagnostic read goes through _safe_get: this path is
        # taken precisely when the IOC may be unresponsive, and a
        # failing read here must not replace the RuntimeError below.
        msg = (
            "kickoff_and_monitor: cam did not deliver a first"
            f" frame within {first_frame_timeout:g}s after"
            f" Acquire=1 on {det.name}."
            f" hdf1.num_captured={_safe_get(det.hdf1, 'num_captured', use_monitor=False)!r}"  # noqa: E501
            f" hdf1.write_status={_safe_get(det.hdf1, 'write_status', as_string=True)!r}"  # noqa: E501
            f" hdf1.write_message={_safe_get(det.hdf1, 'write_message', as_string=True)!r}"  # noqa: E501
        )
        logger.error(msg)
        raise RuntimeError(msg)
    first_frame_latency = time.monotonic() - t0_first
    # Frame count at motor launch; also the baseline for the mid-scan
    # watchdog built below.
    frames_at_launch = int(det.hdf1.num_captured.get(use_monitor=False))
    logger.info(
        "kickoff_and_monitor: first frame received after %.3fs"
        " (num_captured=%d)",
        first_frame_latency,
        frames_at_launch,
    )

    # --- Launch the motor ---
    # group="scan" registers the MoveStatus with the RunEngine so the
    # bps.wait(group="scan") at the end absorbs any post-scan motor
    # settling after monitor_loop returns.  See Phase 3 decisions
    # 2.4/3.8 in the strategy doc.
    logger.info(
        "kickoff_and_monitor: launching %s -> p_final=%g (non-blocking)",
        flymotor.name,
        p_final,
    )
    yield from bps.abs_set(flymotor, p_final, group="scan")

    # Watchdog grace period for the *rest* of the scan: the expected
    # scan duration plus a couple of periods of slack, with a floor of
    # 5 s for tiny scans.
    no_frames_timeout = max(scan_active_duration + 2 * t_period, 5.0)

    # --- Exit condition for monitor_loop ---
    # Phase 3 decisions 3.8 + 3.9, revised by 3.B.4 after empirical
    # gates caught two false-early-exit defects, and revised again
    # 2026-06-08 after a flyscan hang where the loop never exited
    # because cam.acquire (== Acquire_RBV) stayed at 1 for >60s after
    # Acquire=0 was written (the simulator IOC apparently finishes its
    # current burst before the RBV drops).  cam.acquire_busy is used
    # instead: the IOC drops it to 0 promptly when wait_for_plugins=Yes
    # (set via stage_sigs), and wait_for_acquire_drained uses the same
    # signal successfully in cleanup.  cam.acquire is only the
    # fallback for devices that don't expose acquire_busy.
    #
    # The exit condition is an AndStatus of two sub-statuses:
    #   1. cam_stopped_status: cam.acquire_busy == 0 (preferred) or
    #      cam.acquire == 0 (fallback).  The busy signal goes to 0
    #      only after every enabled plugin (including hdf1) has
    #      finished the last frame, so it also implicitly covers
    #      cam-to-plugin drain.  monitor_loop tells the cam to stop
    #      (Acquire=0) when the motor crosses p_end, then this fires.
    #   2. drain_status: the HDF plugin queue is fully idle (no frames
    #      waiting AND no frame currently being written).  Same
    #      predicate shape as wait_for_acquire_drained's HDF
    #      sub-status.
    # Together: "the cam has stopped *and* every frame it produced has
    # been flushed by the HDF plugin", i.e. "scan is done".
    if _has_component(det.cam, "acquire_busy"):
        cam_stopped_sig = det.cam.acquire_busy
        cam_stopped_signal_name = "cam.acquire_busy"
    else:
        cam_stopped_sig = det.cam.acquire
        cam_stopped_signal_name = "cam.acquire"
    logger.info(
        "kickoff_and_monitor: using %s for cam-stopped status",
        cam_stopped_signal_name,
    )
    # ``run=False`` is critical here: with ``run=True`` the predicate
    # would be evaluated on the cached value at subscribe time, which
    # on ``acquire_busy`` may transiently be 0 (the cam hasn't yet
    # started the burst from our recent ``bps.mv(cam.acquire, 1)``).
    # That would satisfy ``value == 0`` at once, firing the AndStatus
    # at scan start and exiting monitor_loop before any frame arrives.
    # With ``run=False`` the callback fires only on CA monitor updates
    # *after* subscribe time, so the first interesting edge is "busy
    # went to 0" when the cam actually stops.
    cam_stopped_status = SubscriptionStatus(
        cam_stopped_sig,
        lambda *, value, **_: int(value) == 0,
        run=False,
    )
    # ``drain_status`` keeps ``run=True``: pre-scan the queue is
    # trivially empty, so it fires at subscribe time.  On its own that
    # would be a defect, but the AndStatus is gated by
    # ``cam_stopped_status`` (``run=False`` above), so with
    # acquire_busy and wait_for_plugins=Yes the drain check is
    # effectively a no-op (acquire_busy already implies HDF drain).
    # A status never reverts once done, so this half being
    # already-done is fine.
    drain_status = SubscriptionStatus(
        det.hdf1.num_queued_arrays,
        lambda *, value, **_: (
            int(value) == 0
            and int(det.hdf1.queue_free.get(use_monitor=False))
            == int(det.hdf1.queue_size.get(use_monitor=False))
        ),
        run=True,
    )
    hdf_drain_status = AndStatus(cam_stopped_status, drain_status)

    # --- No-frames watchdog (Phase 3 decision 3.10) ---
    # The first-frame gate above already proved num_captured >= 1, so
    # a "> 0" predicate would be satisfied at subscribe time and the
    # watchdog could never trip.  Comparing against the count sampled
    # at motor launch makes it guard "the cam went silent after the
    # motor launched": it times out if no *additional* frame reaches
    # the HDF plugin within no_frames_timeout seconds.  On timeout,
    # ophyd marks the status done-with-exception; monitor_loop checks
    # ``watchdog.done and not watchdog.success`` per tick and raises
    # RuntimeError (per 3.11, the RE then stops in-motion movables,
    # including flymotor).
    watchdog_status = SubscriptionStatus(
        det.hdf1.num_captured,
        lambda *, value, **_: int(value) > frames_at_launch,
        run=True,
        timeout=no_frames_timeout,
    )

    yield from monitor_loop(
        flymotor,
        det,
        p_end,
        exit_when=hdf_drain_status,
        watchdog=watchdog_status,
        tick=_consumer_tick,
    )

    # Absorb any motor settling past p_final after monitor_loop has
    # exited (the HDF queue may drain before the motor fully stops at
    # p_final).
    logger.info(
        "kickoff_and_monitor: waiting for %s to reach p_final",
        flymotor.name,
    )
    yield from bps.wait(group="scan")
def _cleanup():
    """Best-effort teardown plan, run after ``_main`` finishes or fails.

    Every step is wrapped in its own ``try``/``except`` that logs and
    continues, so a secondary failure here never hides the original
    exception (if any) from the RunEngine.

    Ordering matters:

    1. Stop the motor (no more position changes).
    2. Stop cam.acquire (no more new frames).
    3. Stop hdf1.capture (close the capture window).
    4. Wait for the cam / HDF queue to drain.
    5. Verify ``full_file_name`` (the "tell" that the file landed).
    6. Report frames dropped by the HDF plugin during this run.
    7. Restore overridden signals.
    8. Restore the ``stage_sigs`` snapshots.
    9. Restore the ``kind`` snapshots.

    Steps 3-6 run only when acquisition was actually started.
    """
    logger.info("flyscan._cleanup: starting")

    # 1. Motor.
    try:
        if motor_is_moving(flymotor):
            logger.info("flyscan._cleanup: stopping moving %s", flymotor.name)
            yield from bps.stop(flymotor)
    except Exception as err:
        logger.exception("flyscan._cleanup: stop(flymotor) failed: %r", err)

    # 2. Camera.
    try:
        logger.info("flyscan._cleanup: stopping %s acquire", det.name)
        yield from bps.mv(det.cam.acquire, 0)
    except Exception as err:
        logger.exception("flyscan._cleanup: stop acquire failed: %r", err)

    # Steps 3-6 only matter if the plan got as far as starting
    # acquisition.  If it died earlier (e.g. a FailedStatus during the
    # override loop) they are skipped, to avoid waiting for drains
    # that will never come and to avoid reporting a num_captured
    # value left over from a prior run.
    if capture_started[0]:
        # 3. Close the capture window.
        try:
            logger.info("flyscan._cleanup: stopping %s.hdf1 capture", det.name)
            yield from bps.mv(det.hdf1.capture, 0)
        except Exception as err:
            logger.exception("flyscan._cleanup: stop capture failed: %r", err)

        # 4. Let the cam settle and the HDF plugin drain its queue.
        # Cleanup latency does not affect user-visible behavior, so a
        # coarser tick than monitor_loop's _consumer_tick is used.
        try:
            yield from wait_for_acquire_drained(det, poll=_CLEANUP_DRAIN_TICK)
        except Exception as err:
            logger.exception(
                "flyscan._cleanup: wait_for_acquire_drained failed: %r",
                err,
            )

        # 5. Verify the file landed on disk.  auto_save=Yes (set in
        # the override list) is relied on to write the HDF5 file when
        # capture stops; verified empirically that, from a bluesky
        # plan, Capture=0 makes the IOC write the file without
        # WriteFile=1.  (Manual GUI use with auto_save=No differs:
        # there WriteFile=1 must follow Capture=0.  An earlier version
        # of this code also issued WriteFile=1 here, but it ran after
        # auto_save had already written the file, so the IOC rejected
        # it with status=3: a spurious error from redundant code.)
        # FullFileName_RBV is populated by the IOC after a successful
        # write, so an empty value means no file was saved.
        try:
            frames_written = (
                _safe_get(det.hdf1, "num_captured", use_monitor=False) or 0
            )
            saved_path = _safe_get(
                det.hdf1, "full_file_name", use_monitor=False, as_string=True
            )
            if not frames_written > 0:
                logger.info(
                    "flyscan._cleanup: no frames captured; no file expected",
                )
            elif saved_path:
                logger.info(
                    "flyscan._cleanup: HDF5 file saved: %s (num_captured=%d)",
                    saved_path,
                    frames_written,
                )
            else:
                write_status = _safe_get(
                    det.hdf1, "write_status", use_monitor=False, as_string=True
                )
                write_message = _safe_get(
                    det.hdf1, "write_message", use_monitor=False, as_string=True
                )
                logger.warning(
                    "flyscan._cleanup: HDF5 file not saved"
                    " (full_file_name empty; num_captured=%d,"
                    " write_status=%r, write_message=%r)",
                    frames_written,
                    write_status,
                    write_message,
                )
        except Exception as err:
            logger.exception(
                "flyscan._cleanup: file verification failed: %r",
                err,
            )

        # 6. Report frames the HDF input dropped this run (cam
        # produced them, HDF couldn't keep up).  The IOC counter is
        # cumulative across runs, so it is compared with the baseline
        # snapshotted at plan entry.  Drops are possible even with
        # blocking_callbacks=Yes (e.g. the cam's first burst before
        # back-pressure propagates).
        # NOTE(review): an exception raised from warnings.warn (if the
        # warning is escalated by a filter) would be caught by the
        # ``except`` below and only logged -- confirm that is intended.
        try:
            dropped_now = (
                None
                if dropped_arrays_baseline is None
                else _safe_get(det.hdf1, "dropped_arrays", use_monitor=False)
            )
            if dropped_now is not None:
                count_after = int(dropped_now)
                count_before = int(dropped_arrays_baseline)
                newly_dropped = count_after - count_before
                if newly_dropped > 0:
                    cam_counter = _safe_get(
                        det.cam, "array_counter", use_monitor=False
                    )
                    hdf_counter = _safe_get(
                        det.hdf1, "array_counter", use_monitor=False
                    )
                    loss_msg = (
                        f"HDF plugin dropped {newly_dropped} frame(s)"
                        " during this run (hdf1.dropped_arrays:"
                        f" {count_before} -> {count_after})."
                        f" cam.array_counter={cam_counter!r},"
                        f" hdf1.array_counter={hdf_counter!r}."
                        " The HDF plugin cannot keep up with the"
                        " cam at the requested rate on this IOC"
                        " / filesystem / host combination."
                        " Frames produced by the cam are missing"
                        " from the on-disk HDF5 file."
                        " Increase t_period or reduce"
                        " exposures_per_egu and retry."
                    )
                    # Two channels for visibility:
                    #   - logger.warning: the log file and any stream
                    #     handlers (apsbits sets these up to print to
                    #     the console).
                    #   - warnings.warn: IPython's warning channel,
                    #     filterable via ``warnings.filterwarnings``.
                    logger.warning("flyscan._cleanup: %s", loss_msg)
                    warnings.warn(
                        loss_msg,
                        category=FlyscanDataLossWarning,
                        stacklevel=2,
                    )
                else:
                    logger.info(
                        "flyscan._cleanup: HDF dropped 0 frames"
                        " this run (dropped_arrays unchanged at %d)",
                        count_after,
                    )
        except Exception as err:
            logger.exception(
                "flyscan._cleanup: dropped-arrays check failed: %r",
                err,
            )
    else:
        logger.info(
            "flyscan._cleanup: capture was never armed in this run;"
            " skipping stop-capture / drain / flush",
        )

    # 7. Signals overridden through original_cache.
    try:
        logger.info(
            "flyscan._cleanup: restoring %d cached signal(s)",
            len(original_cache),
        )
        yield from original_cache.restore()
    except Exception as err:
        logger.exception("flyscan._cleanup: restore failed: %r", err)

    # 8. stage_sigs snapshots.
    try:
        logger.info("flyscan._cleanup: restoring stage_sigs snapshot")
        restore_stage_sigs(saved_stage_sigs)
    except Exception as err:
        logger.exception(
            "flyscan._cleanup: restore stage_sigs failed: %r",
            err,
        )

    # 9. Signal kinds.
    try:
        logger.info(
            "flyscan._cleanup: restoring kinds snapshot (%d signal(s))",
            len(saved_kinds),
        )
        restore_kinds(saved_kinds)
    except Exception as err:
        logger.exception(
            "flyscan._cleanup: restore kinds failed: %r",
            err,
        )

    logger.info("flyscan._cleanup: done")
    # finalize_wrapper consumes _cleanup as a generator; always end
    # with a no-op message.
    yield from bps.null()
# finalize_wrapper guarantees _cleanup runs on success, exception,
# *and* RE-injected exceptions (Ctrl-C, RequestAbort, ...), and
# re-raises the original exception so the RunEngine sees it.
yield from bpp.finalize_wrapper(_main(), _cleanup())
# ----------------
# TODO: Write a function that identifies the motor readback for each image frame.
"""
1. Starting with Bluesky Run object:
1. Get flymotor readback monitor stream (position v. timestamp)
1. Sort by timestamp
1. Get timestamps bracketing p_start
1. Get timestamps bracketing p_end
1. Interpolate to approximate t_start & t_end
1. Note flyscan data is the part between t_start & t_end
1. Get area detector HDF image number monitor stream (image_num v. timestamp)
1. Sort by timestamp
1. Identify all flyscan image numbers collected between t_start & t_end
1. For each frame number, interpolate flymotor readback at each timestamp
"""