Callback#
To demonstrate how the NXWriter
is used as a callback, it is necessary to have a data acquisition setup.
This example scans a sensor
in response to a motor
position. The NXWriter
is subscribed to the RunEngine
so that during data collection, the NXWriter
receives data updates. Once the acquisition ends (when a stop
document is received), the HDF5 file is written.
The data acquisition is a prebuilt synApps xxx IOC driver, packaged in a docker image (prjemian/synapps). The EPICS IOC is started using prefix gp:
by the bash shell script:
$ iocmgr.sh start GP gp
For the purposes of demonstration, the sensor is a random number generator (new values at 10 Hz). The random number generator is provided by a userCalc. The motor is a software simulator of a stepping motor. There is no particular correlation between the sensor
and the motor
in this example, they are used only
for purposes of illustration.
After connecting with the EPICS PVs, the RunEngine
is constructed and connected with a temporary databroker catalog.
Note
If you use NXWriter
(or a subclass), you must wait for all data processing to finish before proceeding with the next acquisition or processing. (The writer()
method is launched in a background thread to complete once all readable assets are available, potentially even after the run ends.) See the NXWriter
documentation for details.
[1]:
%matplotlib inline
from apstools.synApps import setup_random_number_swait
from apstools.synApps import SwaitRecord
from bluesky import RunEngine
from bluesky import SupplementalData
from bluesky import plans as bp
from bluesky.callbacks.best_effort import BestEffortCallback
from matplotlib import pyplot as plt
from ophyd import EpicsMotor
from ophyd import EpicsSignalRO
import databroker
IOC = "gp:"
# ophyd-level
motor = EpicsMotor(f"{IOC}m10", name="motor")
calc10 = SwaitRecord(f"{IOC}userCalc10", name="calc10")
sensor = EpicsSignalRO(calc10.calculated_value.pvname, name="sensor")
motor.wait_for_connection()
sensor.wait_for_connection()
# calc10 sets up the RNG, updating at 10Hz
calc10.wait_for_connection()
setup_random_number_swait(calc10)
# bluesky-level
best_effort_callback = BestEffortCallback()
cat = databroker.temp().v2
plt.ion() # enables matplotlib graphics
RE = RunEngine({})
RE.subscribe(cat.v1.insert)
RE.subscribe(best_effort_callback) # LivePlot & LiveTable
[1]:
1
Setup the NXWriter
to create and write the scan data to an HDF5 file. We override the default HDF5 file name. The steps:
import the Python structures
Define the file name. (A pathlib object provides an easy way to test if the file exists.)
Create the
NXWriter
instanceSubscribe the writer’s
receiver
to the RunEngine.Configure the writer for file name and to suppress extra warnings in the example.
[2]:
from apstools.callbacks import NXWriter
import pathlib
h5_file = pathlib.Path("/tmp/nxwriter.h5")
nxwriter = NXWriter()
RE.subscribe(nxwriter.receiver)
nxwriter.file_name = str(h5_file)
nxwriter.warn_on_missing_content = False
Collect data by scanning sensor
v. motor
. A LiveTable
and LivePlot
will be shown.
The sensor
updates automatically at 10 Hz. The motor
moves slowly enough that the sensor updates before the next position is reached. The data itself is for the purpose of demonstrating the NXWriter
callback.
After the scan, show that the file exists.
[3]:
RE(bp.scan([sensor], motor, -0.5, 0.5, 5))
print(f"{h5_file.exists()=} {h5_file=}")
Transient Scan ID: 1 Time: 2022-08-12 17:27:38
Persistent Unique Scan ID: 'c888d282-1094-403b-baff-3781057ff087'
New stream: 'primary'
+-----------+------------+------------+------------+
| seq_num | time | motor | sensor |
+-----------+------------+------------+------------+
| 1 | 17:27:40.1 | -0.50000 | 0.69377 |
| 2 | 17:27:40.5 | -0.25000 | 0.44727 |
| 3 | 17:27:41.0 | 0.00000 | 0.07127 |
| 4 | 17:27:41.5 | 0.25000 | 0.97433 |
| 5 | 17:27:42.0 | 0.50000 | 0.57342 |
+-----------+------------+------------+------------+
generator scan ['c888d282'] (scan num: 1)
h5_file.exists()=True h5_file=PosixPath('/tmp/nxwriter.h5')
Show the content of the NeXus HDF5 data file using punx, a program external to our Bluesky Python session.
[4]:
from apstools.utils import unix
for line in unix(f"punx tree {nxwriter.file_name}"):
print(line.decode().strip())
!!! WARNING: this program is not ready for distribution.
/tmp/nxwriter.h5 : NeXus data file
@HDF5_Version = "1.12.1"
@NeXus_release = "v2020.1"
@creator = "NXWriter"
@default = "entry"
@file_name = "/tmp/nxwriter.h5"
@file_time = "2022-08-12T17:27:42.294183"
@h5py_version = "3.7.0"
entry:NXentry
@NX_class = "NXentry"
@default = "data"
@target = "/entry"
duration:NX_FLOAT64[] =
@units = "s"
end_time:NX_CHAR = b'2022-08-12T17:27:42.139426'
entry_identifier --> /entry/instrument/bluesky/metadata/run_start_uid
plan_name --> /entry/instrument/bluesky/metadata/plan_name
program_name:NX_CHAR = b'bluesky'
start_time:NX_CHAR = b'2022-08-12T17:27:38.779351'
title:NX_CHAR = b'scan-S0001-c888d28'
data:NXdata
@NX_class = "NXdata"
@axes = ["motor"]
@signal = "sensor"
@target = "/entry/data"
EPOCH --> /entry/instrument/bluesky/streams/primary/sensor/time
motor --> /entry/instrument/bluesky/streams/primary/motor/value
motor_user_setpoint --> /entry/instrument/bluesky/streams/primary/motor_user_setpoint/value
sensor --> /entry/instrument/bluesky/streams/primary/sensor/value
instrument:NXinstrument
@NX_class = "NXinstrument"
@target = "/entry/instrument"
bluesky:NXnote
@NX_class = "NXnote"
@target = "/entry/instrument/bluesky"
plan_name --> /entry/instrument/bluesky/metadata/plan_name
uid --> /entry/instrument/bluesky/metadata/run_start_uid
metadata:NXnote
@NX_class = "NXnote"
@target = "/entry/instrument/bluesky/metadata"
detectors:NX_CHAR = b'- sensor\n'
@target = "/entry/instrument/bluesky/metadata/detectors"
@text_format = "yaml"
hints:NX_CHAR = b'dimensions:\n- !!python/tuple\n - - motor\n - primary\n'
@target = "/entry/instrument/bluesky/metadata/hints"
@text_format = "yaml"
motors:NX_CHAR = b'!!python/tuple\n- motor\n'
@target = "/entry/instrument/bluesky/metadata/motors"
@text_format = "yaml"
num_intervals:NX_INT64[] =
@target = "/entry/instrument/bluesky/metadata/num_intervals"
num_points:NX_INT64[] =
@target = "/entry/instrument/bluesky/metadata/num_points"
plan_args:NX_CHAR = b"args:\n- EpicsMotor(prefix='gp:m10', name='motor', settle_time=0.0, timeout=None, read_attrs=['user_readback',\n 'user_setpoint'], configuration_attrs=['user_offset', 'user_offset_dir', 'velocity',\n 'acceleration', 'motor_egu'])\n- -0.5\n- 0.5\ndetectors:\n- EpicsSignalRO(read_pv='gp:userCalc10.VAL', name='sensor', timestamp=1660343258.421057,\n auto_monitor=False, string=False)\nnum: 5\nper_step: None\n"
@target = "/entry/instrument/bluesky/metadata/plan_args"
@text_format = "yaml"
plan_name:NX_CHAR = b'scan'
@target = "/entry/instrument/bluesky/metadata/plan_name"
plan_pattern:NX_CHAR = b'inner_product'
@target = "/entry/instrument/bluesky/metadata/plan_pattern"
plan_pattern_args:NX_CHAR = b"args:\n- EpicsMotor(prefix='gp:m10', name='motor', settle_time=0.0, timeout=None, read_attrs=['user_readback',\n 'user_setpoint'], configuration_attrs=['user_offset', 'user_offset_dir', 'velocity',\n 'acceleration', 'motor_egu'])\n- -0.5\n- 0.5\nnum: 5\n"
@target = "/entry/instrument/bluesky/metadata/plan_pattern_args"
@text_format = "yaml"
plan_pattern_module:NX_CHAR = b'bluesky.plan_patterns'
@target = "/entry/instrument/bluesky/metadata/plan_pattern_module"
plan_type:NX_CHAR = b'generator'
@target = "/entry/instrument/bluesky/metadata/plan_type"
run_start_uid:NX_CHAR = b'c888d282-1094-403b-baff-3781057ff087'
@long_name = "bluesky run uid"
@target = "/entry/instrument/bluesky/metadata/run_start_uid"
versions:NX_CHAR = b'bluesky: 1.8.3\nophyd: 1.6.4\n'
@target = "/entry/instrument/bluesky/metadata/versions"
@text_format = "yaml"
streams:NXnote
@NX_class = "NXnote"
@target = "/entry/instrument/bluesky/streams"
primary:NXnote
@NX_class = "NXnote"
@target = "/entry/instrument/bluesky/streams/primary"
@uid = "c808bd42-27b4-4498-819d-2279f9516608"
motor:NXdata
@NX_class = "NXdata"
@axes = ["time"]
@signal = "value"
@signal_type = "positioner"
@target = "/entry/instrument/bluesky/streams/primary/motor"
EPOCH:NX_FLOAT64[5] = [1660343259.975937, 1660343260.576943, 1660343261.078024, 1660343261.579225, 1660343262.080634]
@long_name = "epoch time (s)"
@target = "/entry/instrument/bluesky/streams/primary/motor/EPOCH"
@units = "s"
time:NX_FLOAT64[5] = [0.0, 0.601006031036377, 1.1020870208740234, 1.603288173675537, 2.1046972274780273]
@long_name = "time since first data (s)"
@start_time = 1660343259.975937
@start_time_iso = "2022-08-12T17:27:39.975937"
@target = "/entry/instrument/bluesky/streams/primary/motor/time"
@units = "s"
value:NX_FLOAT64[5] = [-0.5, -0.25, 0.0, 0.25, 0.5]
@long_name = "motor"
@lower_ctrl_limit = -32000.0
@precision = 5
@signal_type = "positioner"
@source = "PV:gp:m10.RBV"
@target = "/entry/instrument/bluesky/streams/primary/motor/value"
@units = "degrees"
@upper_ctrl_limit = 32000.0
motor_user_setpoint:NXdata
@NX_class = "NXdata"
@axes = ["time"]
@signal = "value"
@signal_type = "other"
@target = "/entry/instrument/bluesky/streams/primary/motor_user_setpoint"
EPOCH:NX_FLOAT64[5] = [1660343258.784328, 1660343260.169698, 1660343260.644624, 1660343261.141399, 1660343261.645333]
@long_name = "epoch time (s)"
@target = "/entry/instrument/bluesky/streams/primary/motor_user_setpoint/EPOCH"
@units = "s"
time:NX_FLOAT64[5] = [0.0, 1.3853700160980225, 1.8602960109710693, 2.3570709228515625, 2.8610050678253174]
@long_name = "time since first data (s)"
@start_time = 1660343258.784328
@start_time_iso = "2022-08-12T17:27:38.784328"
@target = "/entry/instrument/bluesky/streams/primary/motor_user_setpoint/time"
@units = "s"
value:NX_FLOAT64[5] = [-0.5, -0.25, 0.0, 0.25, 0.5]
@long_name = "motor_user_setpoint"
@lower_ctrl_limit = -32000.0
@precision = 5
@signal_type = "other"
@source = "PV:gp:m10.VAL"
@target = "/entry/instrument/bluesky/streams/primary/motor_user_setpoint/value"
@units = "degrees"
@upper_ctrl_limit = 32000.0
sensor:NXdata
@NX_class = "NXdata"
@axes = ["time"]
@signal = "value"
@signal_type = "detector"
@target = "/entry/instrument/bluesky/streams/primary/sensor"
EPOCH:NX_FLOAT64[5] = [1660343260.021116, 1660343260.521089, 1660343261.021125, 1660343261.521131, 1660343262.021114]
@long_name = "epoch time (s)"
@target = "/entry/instrument/bluesky/streams/primary/sensor/EPOCH"
@units = "s"
time:NX_FLOAT64[5] = [0.0, 0.4999730587005615, 1.0000090599060059, 1.5000150203704834, 1.9999980926513672]
@long_name = "time since first data (s)"
@start_time = 1660343260.021116
@start_time_iso = "2022-08-12T17:27:40.021116"
@target = "/entry/instrument/bluesky/streams/primary/sensor/time"
@units = "s"
value:NX_FLOAT64[5] = [0.6937666895551995, 0.44727244983596554, 0.07127489127946898, 0.9743343251697566, 0.5734187838559548]
@long_name = "sensor"
@lower_ctrl_limit = 0.0
@precision = 5
@signal_type = "detector"
@source = "PV:gp:userCalc10.VAL"
@target = "/entry/instrument/bluesky/streams/primary/sensor/value"
@units = ""
@upper_ctrl_limit = 0.0
detectors:NXnote
@NX_class = "NXnote"
@target = "/entry/instrument/detectors"
sensor:NXdetector
@NX_class = "NXdetector"
@target = "/entry/instrument/detectors/sensor"
data --> /entry/instrument/bluesky/streams/primary/sensor
positioners:NXnote
@NX_class = "NXnote"
@target = "/entry/instrument/positioners"
motor:NXpositioner
@NX_class = "NXpositioner"
@target = "/entry/instrument/positioners/motor"
value --> /entry/instrument/bluesky/streams/primary/motor
source:NXsource
@NX_class = "NXsource"
@target = "/entry/instrument/source"
name:NX_CHAR = b'Bluesky framework'
@short_name = "bluesky"
probe:NX_CHAR = b'x-ray'
type:NX_CHAR = b'Synchrotron X-ray Source'
Export#
It is possible to use the NXWriter
to export the data from the databroker to a NeXus/HDF5 file. The bluesky community is preparing the tiled data access service to make such data readily available.
This example shows an alternative method to export one run to one NeXus/HDF5 data file. While other variations are possible (such as a list of runs in one file), they are not shown here to keep the example simple.
The data export is based on the replay() from apstools.
We assume the run to be exported is identified by scan_id = 1
, as the example above shows. This example uses the cat
object created above. You should create this as shown in the comment.
The steps:
the Python imports
define the file to be read and the scan_id
create the
NXWriter()
instancesuppress the warnings we do not need to see
use
replay()
to get the data (note thev1
is important) and send it to thereceiver
show the new HDF5 file exists
[5]:
from apstools.callbacks import NXWriter
from apstools.utils import replay
import databroker
# This example uses 'cat' as defined above. You use this next line:
# cat = databroker.databroker["YOUR_CATALOG_NAME"]
h5_file = pathlib.Path("/tmp/db_export.h5")
scan_id = 1 # TODO: you choose
nxwriter = NXWriter()
nxwriter.file_name = str(h5_file)
nxwriter.warn_on_missing_content = False
replay(cat.v1[scan_id], nxwriter.receiver)
print(f"{h5_file.exists()=} {h5_file=}")
h5_file.exists()=True h5_file=PosixPath('/tmp/db_export.h5')