Source code for id3c.utils.flyscan_3idc_analysis

"""Post-run analysis helpers for ``flyscan_3idc`` runs.

This module pairs each detector frame with the motor's
interpolated position at the frame's IOC timestamp, using the
monitor streams that ``flyscan_3idc.flyscan`` sets up via
``@bpp.monitor_during_decorator``.  No bluesky, no ophyd, no
RunEngine — purely operates on ``BlueskyRun``-shaped objects from
tiled / databroker.

Usage
-----

::

    from tiled.client import from_profile
    cat = from_profile("your_profile")["your_tree"]
    run = cat[-1]
    from id3c.utils.flyscan_3idc_analysis import pair_frames_to_positions
    df = pair_frames_to_positions(run)
    # df columns: image_number, timestamp, position
    # df.index: absolute timestamp (float seconds since epoch)
    df.to_csv("scan.csv")

Design notes
------------

- IOC timestamps are the system of record for pairing (per the
  ``flyscan_3idc`` strategy doc, Phase 0.2 / Phase 0e).  The
  primary-stream snapshots from the plan are a progress indicator;
  this module's output is the high-fidelity pairing.
- Empirically (verified during the 2026-06-08 commissioning
  session against ``adsimdet`` + ``gp:m1``):
  - the m1 monitor stream's record-order is interleaved across
    multiple CA dispatcher segments; sorting by timestamp yields
    a strictly increasing position trace at constant velocity in
    the in-scan window.
  - the HDF array_counter monitor stream's record-order is also
    interleaved, but sorting by timestamp yields strictly
    monotonic counter values (0, 1, 2, ..., contiguous).
- The function uses linear interpolation of motor position vs
  motor IOC timestamp.  Linear is exact for a motor at constant
  velocity in the in-scan window (which is the entire reason the
  plan sets velocity = (p_end-p_start)/(num_frames*t_period) and
  taxis the motor up to scan velocity before crossing p_start).
- Frames whose timestamps fall outside the motor stream's time
  range are dropped (extrapolation is rejected, never silent).
- Frames whose interpolated positions fall outside
  ``[p_start, p_end]`` are dropped (this is "frames captured
  during taxi-in / coast-out" — they're in the HDF5 file but not
  part of the scan).
"""

from __future__ import annotations

import logging

import numpy as np
import pandas as pd

[docs] logger = logging.getLogger(__name__)
def _interpolate_positions( motor_t: np.ndarray, motor_pos: np.ndarray, hdf_t: np.ndarray, hdf_counter: np.ndarray, p_start: float, p_end: float, ) -> pd.DataFrame: """Pair HDF frames with linearly-interpolated motor positions. Pure-array core; no run object, no ophyd, no bluesky. Tests construct inputs directly. Parameters ---------- motor_t : np.ndarray IOC timestamps (seconds since epoch) for motor samples. May be in arbitrary order with duplicates; this function sorts and dedupes. motor_pos : np.ndarray Motor positions (engineering units) aligned with ``motor_t``. hdf_t : np.ndarray IOC timestamps (seconds since epoch) for HDF frame captures. May be in arbitrary order; this function sorts. hdf_counter : np.ndarray Integer HDF frame counter (``hdf1.array_counter``) aligned with ``hdf_t``. p_start, p_end : float Scan range in motor engineering units. Frames whose interpolated positions fall outside ``[p_start, p_end]`` are dropped. Returns ------- pandas.DataFrame Columns: ``image_number`` (int), ``timestamp`` (float), ``position`` (float). Indexed by ``timestamp``. Sorted by timestamp ascending. Rows are only those frames inside ``[p_start, p_end]`` *and* whose timestamps fall within the motor stream's time range. """ motor_t = np.asarray(motor_t, dtype=float) motor_pos = np.asarray(motor_pos, dtype=float) hdf_t = np.asarray(hdf_t, dtype=float) hdf_counter = np.asarray(hdf_counter, dtype=np.int64) if motor_t.shape != motor_pos.shape: raise ValueError( f"motor_t shape {motor_t.shape} != motor_pos shape" f" {motor_pos.shape}" ) if hdf_t.shape != hdf_counter.shape: raise ValueError( f"hdf_t shape {hdf_t.shape} != hdf_counter shape" f" {hdf_counter.shape}" ) if motor_t.size < 2: raise ValueError( f"motor stream has {motor_t.size} sample(s); need >= 2" " for interpolation" ) if hdf_t.size == 0: # Nothing to pair; return an empty correctly-typed frame. return _empty_result() # Sort motor by timestamp and dedupe — keep first occurrence # of each unique timestamp. ``np.unique(..., return_index=True)`` # returns indices into the sorted-unique array; combine with # argsort to recover the first occurrence in the original order. m_order = np.argsort(motor_t, kind="stable") m_t_sorted = motor_t[m_order] m_p_sorted = motor_pos[m_order] # Find duplicates after sort: keep first. _, unique_idx = np.unique(m_t_sorted, return_index=True) unique_idx.sort() m_t = m_t_sorted[unique_idx] m_p = m_p_sorted[unique_idx] n_dropped_dups = motor_t.size - m_t.size if n_dropped_dups: logger.debug( "_interpolate_positions: deduplicated %d motor sample(s)" " with repeated timestamp", n_dropped_dups, ) # Sort HDF by timestamp. h_order = np.argsort(hdf_t, kind="stable") h_t = hdf_t[h_order] h_c = hdf_counter[h_order] # Drop HDF frames outside the motor stream's time range # (extrapolation rejected, never silent). t_lo, t_hi = m_t[0], m_t[-1] in_time_range = (h_t >= t_lo) & (h_t <= t_hi) n_out_of_range = h_t.size - int(in_time_range.sum()) if n_out_of_range: logger.warning( "_interpolate_positions: dropping %d HDF frame(s) with" " timestamps outside motor stream range" " [%g, %g] (would require extrapolation)", n_out_of_range, t_lo, t_hi, ) h_t_keep = h_t[in_time_range] h_c_keep = h_c[in_time_range] if h_t_keep.size == 0: return _empty_result() # Linear interpolation — np.interp requires monotonic xp; we # ensured that via sort+unique. positions = np.interp(h_t_keep, m_t, m_p) # Trim to in-scan range [p_start, p_end]. in_scan = (positions >= p_start) & (positions <= p_end) n_out_of_scan = h_t_keep.size - int(in_scan.sum()) if n_out_of_scan: logger.info( "_interpolate_positions: dropping %d HDF frame(s) with" " interpolated position outside [%g, %g] (taxi-in / " "coast-out frames)", n_out_of_scan, p_start, p_end, ) df = pd.DataFrame( { "image_number": h_c_keep[in_scan].astype(np.int64), "timestamp": h_t_keep[in_scan].astype(float), "position": positions[in_scan].astype(float), } ) df = df.set_index("timestamp", drop=False) df = df.sort_index() return df def _empty_result() -> pd.DataFrame: """Return a correctly-typed empty result DataFrame.""" df = pd.DataFrame( { "image_number": np.array([], dtype=np.int64), "timestamp": np.array([], dtype=float), "position": np.array([], dtype=float), } ) df = df.set_index("timestamp", drop=False) return df
[docs] def pair_frames_to_positions(run) -> pd.DataFrame: """Pair each in-scan HDF frame with the interpolated motor position. Reads everything from the run's start-document metadata and the standard monitor streams set up by ``flyscan_3idc.flyscan``: - ``<flymotor_name>_monitor`` — motor position vs IOC timestamp. - ``<det_name>_hdf1_array_counter_monitor`` — HDF ``array_counter`` vs IOC timestamp. - ``p_start`` / ``p_end`` from ``run.metadata["start"]``. Frames captured outside ``[p_start, p_end]`` are dropped. Frames whose IOC timestamps fall outside the motor stream's time range are dropped (extrapolation never happens silently). Parameters ---------- run : BlueskyRun Tiled / databroker run object. Must have ``.metadata["start"]`` with the keys ``p_start``, ``p_end``, ``flymotor_name``, ``det_name``, and must expose monitor streams named per the convention above. Returns ------- pandas.DataFrame Columns: ``image_number`` (int64, the HDF array_counter value at frame capture), ``timestamp`` (float, IOC time of capture), ``position`` (float, motor position in engineering units, linearly interpolated). Indexed by ``timestamp`` ascending. Raises ------ KeyError Required metadata key or monitor stream is missing from the run. ValueError Motor stream has fewer than 2 samples (cannot interpolate). """ md = _get_start_metadata(run) flymotor_name = _require_metadata_key(md, "flymotor_name") det_name = _require_metadata_key(md, "det_name") p_start = float(_require_metadata_key(md, "p_start")) p_end = float(_require_metadata_key(md, "p_end")) motor_stream_name = f"{flymotor_name}_monitor" motor_field_name = flymotor_name hdf_stream_name = f"{det_name}_hdf1_array_counter_monitor" hdf_field_name = f"{det_name}_hdf1_array_counter" motor_ds = _read_stream(run, motor_stream_name) hdf_ds = _read_stream(run, hdf_stream_name) motor_t = _array_from_ds(motor_ds, "time", motor_stream_name) motor_pos = _array_from_ds(motor_ds, motor_field_name, motor_stream_name) hdf_t = _array_from_ds(hdf_ds, "time", hdf_stream_name) hdf_counter = _array_from_ds(hdf_ds, hdf_field_name, hdf_stream_name) logger.info( "pair_frames_to_positions: motor=%r (%d sample(s))," " hdf=%r (%d frame(s)), p_start=%g p_end=%g", motor_stream_name, motor_t.size, hdf_stream_name, hdf_t.size, p_start, p_end, ) df = _interpolate_positions( motor_t, motor_pos, hdf_t, hdf_counter, p_start, p_end, ) logger.info( "pair_frames_to_positions: paired %d in-scan frame(s)", len(df), ) return df
# --------------------------------------------------------------------------- # Internal: thin shims around the BlueskyRun shape that let us swap in # duck-typed mocks for tests. # --------------------------------------------------------------------------- def _get_start_metadata(run) -> dict: """Return the run's start-document metadata as a dict.""" md = getattr(run, "metadata", None) if md is None: raise KeyError("run object has no .metadata attribute") # Both dict-style (BlueskyRun) and Mapping-style access work. try: start = md["start"] except (KeyError, TypeError) as exc: raise KeyError("run.metadata is missing the 'start' document") from exc if not isinstance(start, dict): raise KeyError( f"run.metadata['start'] is {type(start).__name__}," " expected dict" ) return start def _require_metadata_key(md: dict, key: str): if key not in md: raise KeyError(f"run.metadata['start'] is missing required key {key!r}") return md[key] def _read_stream(run, stream_name: str): """Return ``run.<stream_name>.read()`` (an xarray.Dataset).""" stream = getattr(run, stream_name, None) if stream is None: raise KeyError(f"run has no stream named {stream_name!r}") if not hasattr(stream, "read"): raise KeyError( f"run.{stream_name} has no .read() method" f" (got {type(stream).__name__})" ) return stream.read() def _array_from_ds(ds, key: str, stream_name: str) -> np.ndarray: """Pull a column from an xarray.Dataset (or dict-like) as a 1-D numpy array.""" try: col = ds[key] except (KeyError, TypeError) as exc: raise KeyError(f"stream {stream_name!r} has no column {key!r}") from exc # xarray DataArray exposes .data; numpy arrays are already arrays; # pandas Series have .to_numpy(). Try them in order. if hasattr(col, "data"): arr = col.data elif hasattr(col, "to_numpy"): arr = col.to_numpy() else: arr = np.asarray(col) return np.asarray(arr)