Callback

To demonstrate how the NXWriter is used as a callback, it is necessary to have a data acquisition setup.

This example records a sensor as a function of motor position. The NXWriter is subscribed to the RunEngine so that it receives the documents emitted during data collection. Once the acquisition ends (when a stop document is received), the HDF5 file is written.
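
The collect-then-write pattern described above can be sketched without any EPICS or Bluesky installation. This is a minimal illustration, not the NXWriter implementation: the class name CollectThenWrite, its attributes, and the hand-made documents are all hypothetical. It only shows the idea of a receiver(name, doc) method that accumulates documents and acts when the stop document arrives.

```python
class CollectThenWrite:
    """Illustrative callback (hypothetical): gather documents, act on 'stop'."""

    def __init__(self):
        self.documents = []
        self.summaries = []

    def receiver(self, name, doc):
        """Accept one (name, document) pair, as a RunEngine subscriber would."""
        if name == "start":
            self.documents = []  # a new run begins: discard the previous run
        self.documents.append((name, doc))
        if name == "stop":
            self.write()

    def write(self):
        """Stand-in for writing a file: summarize what was collected."""
        n_events = sum(1 for name, _ in self.documents if name == "event")
        self.summaries.append(f"run finished with {n_events} event(s)")


callback = CollectThenWrite()
# Hand-made documents for illustration; real documents carry many more keys.
for name, doc in [
    ("start", {"uid": "abc"}),
    ("descriptor", {"name": "primary"}),
    ("event", {"seq_num": 1}),
    ("event", {"seq_num": 2}),
    ("stop", {"exit_status": "success"}),
]:
    callback.receiver(name, doc)
print(callback.summaries[-1])
```

Nothing is written until the stop document is seen, which is why the output file only appears after the run ends.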

The data acquisition setup is a prebuilt synApps xxx IOC, packaged in a docker image (prjemian/synapps). The EPICS IOC is started with the prefix gp: by this bash shell script:

$ iocmgr.sh start GP gp

For the purposes of demonstration, the sensor is a random number generator (new values at 10 Hz). The random number generator is provided by a userCalc. The motor is a software simulator of a stepping motor. There is no particular correlation between the sensor and the motor in this example; they are used only for purposes of illustration.

After connecting with the EPICS PVs, the RunEngine is constructed and connected with a temporary databroker catalog.

Note

If you use NXWriter (or a subclass), you must wait for all data processing to finish before proceeding with the next acquisition or processing. (The writer() method is launched in a background thread to complete once all readable assets are available, potentially even after the run ends.) See the NXWriter documentation for details.
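
One simple way to wait, sketched here with only the standard library, is to poll for the output file with a timeout. The helper name wait_for_file and its parameters are hypothetical and are not part of apstools. Note also that a file which exists is not necessarily finished, so treat this as a rough illustration and prefer whatever waiting mechanism the NXWriter documentation describes.

```python
import pathlib
import tempfile
import time


def wait_for_file(path, timeout=10.0, poll=0.1):
    """Return True once `path` exists, or False if `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if pathlib.Path(path).exists():
            return True
        time.sleep(poll)
    # One last check, in case the file appeared during the final sleep.
    return pathlib.Path(path).exists()


# Demonstration with a temporary file standing in for the HDF5 output.
with tempfile.TemporaryDirectory() as tmp:
    target = pathlib.Path(tmp) / "example.h5"
    missing = wait_for_file(target, timeout=0.3)  # file not created yet
    target.touch()
    found = wait_for_file(target, timeout=0.3)  # file now exists
print(missing, found)
```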

[1]:
%matplotlib inline
from apstools.synApps import setup_random_number_swait
from apstools.synApps import SwaitRecord
from bluesky import RunEngine
from bluesky import SupplementalData
from bluesky import plans as bp
from bluesky.callbacks.best_effort import BestEffortCallback
from matplotlib import pyplot as plt
from ophyd import EpicsMotor
from ophyd import EpicsSignalRO
import databroker

IOC = "gp:"

# ophyd-level
motor = EpicsMotor(f"{IOC}m10", name="motor")
calc10 = SwaitRecord(f"{IOC}userCalc10", name="calc10")
sensor = EpicsSignalRO(calc10.calculated_value.pvname, name="sensor")
motor.wait_for_connection()
sensor.wait_for_connection()

# calc10 sets up the RNG, updating at 10Hz
calc10.wait_for_connection()
setup_random_number_swait(calc10)

# bluesky-level
best_effort_callback = BestEffortCallback()
cat = databroker.temp().v2
plt.ion()  # enables matplotlib graphics
RE = RunEngine({})
RE.subscribe(cat.v1.insert)
RE.subscribe(best_effort_callback)  # LivePlot & LiveTable

[1]:
1

Set up the NXWriter to create and write the scan data to an HDF5 file. We override the default HDF5 file name. The steps:

  1. Import the Python structures.

  2. Define the file name. (A pathlib object provides an easy way to test if the file exists.)

  3. Create the NXWriter instance

  4. Subscribe the writer’s receiver to the RunEngine.

  5. Configure the writer with the file name and suppress extra warnings for this example.

[2]:
from apstools.callbacks import NXWriter
import pathlib

h5_file = pathlib.Path("/tmp/nxwriter.h5")

nxwriter = NXWriter()
RE.subscribe(nxwriter.receiver)
nxwriter.file_name = str(h5_file)
nxwriter.warn_on_missing_content = False

Collect data by scanning sensor vs. motor. A LiveTable and LivePlot will be shown.

The sensor updates automatically at 10 Hz. The motor moves slowly enough that the sensor updates before the next position is reached. The data itself serves only to demonstrate the NXWriter callback.

After the scan, show that the file exists.

[3]:
RE(bp.scan([sensor], motor, -0.5, 0.5, 5))
print(f"{h5_file.exists()=}  {h5_file=}")


Transient Scan ID: 1     Time: 2022-08-12 17:27:38
Persistent Unique Scan ID: 'c888d282-1094-403b-baff-3781057ff087'
New stream: 'primary'
+-----------+------------+------------+------------+
|   seq_num |       time |      motor |     sensor |
+-----------+------------+------------+------------+
|         1 | 17:27:40.1 |   -0.50000 |    0.69377 |
|         2 | 17:27:40.5 |   -0.25000 |    0.44727 |
|         3 | 17:27:41.0 |    0.00000 |    0.07127 |
|         4 | 17:27:41.5 |    0.25000 |    0.97433 |
|         5 | 17:27:42.0 |    0.50000 |    0.57342 |
+-----------+------------+------------+------------+
generator scan ['c888d282'] (scan num: 1)



h5_file.exists()=True  h5_file=PosixPath('/tmp/nxwriter.h5')
[Figure: LivePlot of sensor vs. motor]

Show the content of the NeXus HDF5 data file using punx, a program external to our Bluesky Python session.

[4]:
from apstools.utils import unix

for line in unix(f"punx tree {nxwriter.file_name}"):
    print(line.decode().strip())
!!! WARNING: this program is not ready for distribution.

/tmp/nxwriter.h5 : NeXus data file
  @HDF5_Version = "1.12.1"
  @NeXus_release = "v2020.1"
  @creator = "NXWriter"
  @default = "entry"
  @file_name = "/tmp/nxwriter.h5"
  @file_time = "2022-08-12T17:27:42.294183"
  @h5py_version = "3.7.0"
  entry:NXentry
    @NX_class = "NXentry"
    @default = "data"
    @target = "/entry"
    duration:NX_FLOAT64[] =
      @units = "s"
    end_time:NX_CHAR = b'2022-08-12T17:27:42.139426'
    entry_identifier --> /entry/instrument/bluesky/metadata/run_start_uid
    plan_name --> /entry/instrument/bluesky/metadata/plan_name
    program_name:NX_CHAR = b'bluesky'
    start_time:NX_CHAR = b'2022-08-12T17:27:38.779351'
    title:NX_CHAR = b'scan-S0001-c888d28'
    data:NXdata
      @NX_class = "NXdata"
      @axes = ["motor"]
      @signal = "sensor"
      @target = "/entry/data"
      EPOCH --> /entry/instrument/bluesky/streams/primary/sensor/time
      motor --> /entry/instrument/bluesky/streams/primary/motor/value
      motor_user_setpoint --> /entry/instrument/bluesky/streams/primary/motor_user_setpoint/value
      sensor --> /entry/instrument/bluesky/streams/primary/sensor/value
    instrument:NXinstrument
      @NX_class = "NXinstrument"
      @target = "/entry/instrument"
      bluesky:NXnote
        @NX_class = "NXnote"
        @target = "/entry/instrument/bluesky"
        plan_name --> /entry/instrument/bluesky/metadata/plan_name
        uid --> /entry/instrument/bluesky/metadata/run_start_uid
        metadata:NXnote
          @NX_class = "NXnote"
          @target = "/entry/instrument/bluesky/metadata"
          detectors:NX_CHAR = b'- sensor\n'
            @target = "/entry/instrument/bluesky/metadata/detectors"
            @text_format = "yaml"
          hints:NX_CHAR = b'dimensions:\n- !!python/tuple\n  - - motor\n  - primary\n'
            @target = "/entry/instrument/bluesky/metadata/hints"
            @text_format = "yaml"
          motors:NX_CHAR = b'!!python/tuple\n- motor\n'
            @target = "/entry/instrument/bluesky/metadata/motors"
            @text_format = "yaml"
          num_intervals:NX_INT64[] =
            @target = "/entry/instrument/bluesky/metadata/num_intervals"
          num_points:NX_INT64[] =
            @target = "/entry/instrument/bluesky/metadata/num_points"
          plan_args:NX_CHAR = b"args:\n- EpicsMotor(prefix='gp:m10', name='motor', settle_time=0.0, timeout=None, read_attrs=['user_readback',\n  'user_setpoint'], configuration_attrs=['user_offset', 'user_offset_dir', 'velocity',\n  'acceleration', 'motor_egu'])\n- -0.5\n- 0.5\ndetectors:\n- EpicsSignalRO(read_pv='gp:userCalc10.VAL', name='sensor', timestamp=1660343258.421057,\n  auto_monitor=False, string=False)\nnum: 5\nper_step: None\n"
            @target = "/entry/instrument/bluesky/metadata/plan_args"
            @text_format = "yaml"
          plan_name:NX_CHAR = b'scan'
            @target = "/entry/instrument/bluesky/metadata/plan_name"
          plan_pattern:NX_CHAR = b'inner_product'
            @target = "/entry/instrument/bluesky/metadata/plan_pattern"
          plan_pattern_args:NX_CHAR = b"args:\n- EpicsMotor(prefix='gp:m10', name='motor', settle_time=0.0, timeout=None, read_attrs=['user_readback',\n  'user_setpoint'], configuration_attrs=['user_offset', 'user_offset_dir', 'velocity',\n  'acceleration', 'motor_egu'])\n- -0.5\n- 0.5\nnum: 5\n"
            @target = "/entry/instrument/bluesky/metadata/plan_pattern_args"
            @text_format = "yaml"
          plan_pattern_module:NX_CHAR = b'bluesky.plan_patterns'
            @target = "/entry/instrument/bluesky/metadata/plan_pattern_module"
          plan_type:NX_CHAR = b'generator'
            @target = "/entry/instrument/bluesky/metadata/plan_type"
          run_start_uid:NX_CHAR = b'c888d282-1094-403b-baff-3781057ff087'
            @long_name = "bluesky run uid"
            @target = "/entry/instrument/bluesky/metadata/run_start_uid"
          versions:NX_CHAR = b'bluesky: 1.8.3\nophyd: 1.6.4\n'
            @target = "/entry/instrument/bluesky/metadata/versions"
            @text_format = "yaml"
        streams:NXnote
          @NX_class = "NXnote"
          @target = "/entry/instrument/bluesky/streams"
          primary:NXnote
            @NX_class = "NXnote"
            @target = "/entry/instrument/bluesky/streams/primary"
            @uid = "c808bd42-27b4-4498-819d-2279f9516608"
            motor:NXdata
              @NX_class = "NXdata"
              @axes = ["time"]
              @signal = "value"
              @signal_type = "positioner"
              @target = "/entry/instrument/bluesky/streams/primary/motor"
              EPOCH:NX_FLOAT64[5] = [1660343259.975937, 1660343260.576943, 1660343261.078024, 1660343261.579225, 1660343262.080634]
                @long_name = "epoch time (s)"
                @target = "/entry/instrument/bluesky/streams/primary/motor/EPOCH"
                @units = "s"
              time:NX_FLOAT64[5] = [0.0, 0.601006031036377, 1.1020870208740234, 1.603288173675537, 2.1046972274780273]
                @long_name = "time since first data (s)"
                @start_time = 1660343259.975937
                @start_time_iso = "2022-08-12T17:27:39.975937"
                @target = "/entry/instrument/bluesky/streams/primary/motor/time"
                @units = "s"
              value:NX_FLOAT64[5] = [-0.5, -0.25, 0.0, 0.25, 0.5]
                @long_name = "motor"
                @lower_ctrl_limit = -32000.0
                @precision = 5
                @signal_type = "positioner"
                @source = "PV:gp:m10.RBV"
                @target = "/entry/instrument/bluesky/streams/primary/motor/value"
                @units = "degrees"
                @upper_ctrl_limit = 32000.0
            motor_user_setpoint:NXdata
              @NX_class = "NXdata"
              @axes = ["time"]
              @signal = "value"
              @signal_type = "other"
              @target = "/entry/instrument/bluesky/streams/primary/motor_user_setpoint"
              EPOCH:NX_FLOAT64[5] = [1660343258.784328, 1660343260.169698, 1660343260.644624, 1660343261.141399, 1660343261.645333]
                @long_name = "epoch time (s)"
                @target = "/entry/instrument/bluesky/streams/primary/motor_user_setpoint/EPOCH"
                @units = "s"
              time:NX_FLOAT64[5] = [0.0, 1.3853700160980225, 1.8602960109710693, 2.3570709228515625, 2.8610050678253174]
                @long_name = "time since first data (s)"
                @start_time = 1660343258.784328
                @start_time_iso = "2022-08-12T17:27:38.784328"
                @target = "/entry/instrument/bluesky/streams/primary/motor_user_setpoint/time"
                @units = "s"
              value:NX_FLOAT64[5] = [-0.5, -0.25, 0.0, 0.25, 0.5]
                @long_name = "motor_user_setpoint"
                @lower_ctrl_limit = -32000.0
                @precision = 5
                @signal_type = "other"
                @source = "PV:gp:m10.VAL"
                @target = "/entry/instrument/bluesky/streams/primary/motor_user_setpoint/value"
                @units = "degrees"
                @upper_ctrl_limit = 32000.0
            sensor:NXdata
              @NX_class = "NXdata"
              @axes = ["time"]
              @signal = "value"
              @signal_type = "detector"
              @target = "/entry/instrument/bluesky/streams/primary/sensor"
              EPOCH:NX_FLOAT64[5] = [1660343260.021116, 1660343260.521089, 1660343261.021125, 1660343261.521131, 1660343262.021114]
                @long_name = "epoch time (s)"
                @target = "/entry/instrument/bluesky/streams/primary/sensor/EPOCH"
                @units = "s"
              time:NX_FLOAT64[5] = [0.0, 0.4999730587005615, 1.0000090599060059, 1.5000150203704834, 1.9999980926513672]
                @long_name = "time since first data (s)"
                @start_time = 1660343260.021116
                @start_time_iso = "2022-08-12T17:27:40.021116"
                @target = "/entry/instrument/bluesky/streams/primary/sensor/time"
                @units = "s"
              value:NX_FLOAT64[5] = [0.6937666895551995, 0.44727244983596554, 0.07127489127946898, 0.9743343251697566, 0.5734187838559548]
                @long_name = "sensor"
                @lower_ctrl_limit = 0.0
                @precision = 5
                @signal_type = "detector"
                @source = "PV:gp:userCalc10.VAL"
                @target = "/entry/instrument/bluesky/streams/primary/sensor/value"
                @units = ""
                @upper_ctrl_limit = 0.0
      detectors:NXnote
        @NX_class = "NXnote"
        @target = "/entry/instrument/detectors"
        sensor:NXdetector
          @NX_class = "NXdetector"
          @target = "/entry/instrument/detectors/sensor"
          data --> /entry/instrument/bluesky/streams/primary/sensor
      positioners:NXnote
        @NX_class = "NXnote"
        @target = "/entry/instrument/positioners"
        motor:NXpositioner
          @NX_class = "NXpositioner"
          @target = "/entry/instrument/positioners/motor"
          value --> /entry/instrument/bluesky/streams/primary/motor
      source:NXsource
        @NX_class = "NXsource"
        @target = "/entry/instrument/source"
        name:NX_CHAR = b'Bluesky framework'
          @short_name = "bluesky"
        probe:NX_CHAR = b'x-ray'
        type:NX_CHAR = b'Synchrotron X-ray Source'
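
The tree above carries @default attributes at the root and in the entry group, and @signal and @axes attributes in the data group. By NeXus convention, a reader follows these to locate the default plot. The sketch below illustrates that lookup on a plain dictionary holding a hand-copied subset of the attributes shown. The dictionary layout and the function name find_default_plot are assumptions made for illustration; real code would read the attributes from the HDF5 file.

```python
# Hand-copied subset of the attributes shown in the tree above.
tree = {
    "@default": "entry",
    "entry": {
        "@default": "data",
        "data": {
            "@signal": "sensor",
            "@axes": ["motor"],
        },
    },
}


def find_default_plot(root):
    """Follow @default attributes down to the group that names a signal."""
    group, path = root, ""
    while "@signal" not in group:
        name = group["@default"]
        group = group[name]
        path = f"{path}/{name}"
    return path, group["@signal"], group["@axes"]


path, signal, axes = find_default_plot(tree)
print(path, signal, axes)
```

For this file the lookup ends at /entry/data, where sensor is the signal and motor is the axis, matching the scan that was run.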

Export

It is possible to use the NXWriter to export the data from the databroker to a NeXus/HDF5 file. The bluesky community is preparing the tiled data access service to make such data readily available.

This example shows an alternative method to export one run to one NeXus/HDF5 data file. While other variations are possible (such as a list of runs in one file), they are not shown here to keep the example simple.

The data export is based on the replay() function from apstools.utils.

We assume the run to be exported is identified by scan_id = 1, as the example above shows. This example uses the cat object created above. In your own session, create cat as shown in the comment in the code below.

The steps:

  1. the Python imports

  2. define the file to be written and the scan_id

  3. create the NXWriter() instance

  4. suppress the warnings we do not need to see

  5. use replay() to get the data (note that using cat.v1 is important) and send it to the receiver

  6. show the new HDF5 file exists

[5]:
from apstools.callbacks import NXWriter
from apstools.utils import replay
import databroker
import pathlib

# This example uses 'cat' as defined above.  In your own session, use this next line:
# cat = databroker.catalog["YOUR_CATALOG_NAME"]

h5_file = pathlib.Path("/tmp/db_export.h5")
scan_id = 1  # TODO: you choose

nxwriter = NXWriter()
nxwriter.file_name = str(h5_file)
nxwriter.warn_on_missing_content = False
replay(cat.v1[scan_id], nxwriter.receiver)
print(f"{h5_file.exists()=}  {h5_file=}")
h5_file.exists()=True  h5_file=PosixPath('/tmp/db_export.h5')
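
When exporting more than one run, each run needs its own file name. The sketch below builds a name from the plan name, scan_id, and the first characters of the run uid, mirroring the title value shown in the file tree earlier (scan-S0001-c888d28). The helper export_file_name, its folder argument, and the .h5 suffix are hypothetical choices for illustration, not an apstools function or its default naming.

```python
import pathlib


def export_file_name(plan_name, scan_id, uid, folder="/tmp"):
    """Build one file path per run, mirroring the title format shown earlier."""
    stem = f"{plan_name}-S{scan_id:04d}-{uid[:7]}"
    return pathlib.PurePosixPath(folder) / f"{stem}.h5"


# Values taken from the run in this example.
name = export_file_name("scan", 1, "c888d282-1094-403b-baff-3781057ff087")
print(name)
```

Each resulting name could then be assigned to nxwriter.file_name before calling replay() for that run, as in the export cell. Remember to let each write finish before starting the next.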