Integration with APS Data Management (DM)#

The APS Data Management (DM) System gathers experimental data and metadata about the experiment, and provides users access to the data based on a user's role.

APS beamlines have been using DM to process acquired data (with DM workflows). See the documentation for more details. This document describes a way to integrate DM with the Bluesky framework. Before the integration is described, a brief overview of DM is provided.

DM Overview#

For integration with the Bluesky framework, we provide a minimal explanation of these parts of DM:

  • experiment

  • DAQ

  • workflow

  • upload

experiment#

The experiment is the central object for creating entries in the Data Management System.

Typically, a DM experiment will manage all data for a single proposal/ESAF at a beamline. Beamline staff will create the DM experiments they will need.

The name of the experiment will be used by bluesky to coordinate acquired data and runs with DM.

An experiment will create data storage with permissions for the beamline data acquisition account and all the users listed in the proposal.

More info here

DAQ#

The overall purpose of this system is to get data into DM storage and provide access control to that data.

In DM, a DAQ monitors a local file directory tree (on a specific workstation) and copies any new content to the DM experiment’s data storage.

More info here

workflow#

A workflow is simply a set of defined steps that will process data.

Typically, the steps move acquired data to computation hosts (local or remote), run the computation applications, then move any results to a final destination. A workflow is also capable of other activities, such as communicating back to EPICS.
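
As a rough illustration, a workflow is defined as a Python dictionary of named stages, each with a command to run. This is only a minimal sketch, loosely modeled on the examples in the DM documentation; the names, stages, paths, and $-variables below are hypothetical.

# Minimal sketch of a DM workflow definition (hypothetical names and paths).
workflow = {
    "name": "example-processing",
    "owner": "beamline-account",
    "stages": {
        "01-MKDIR": {"command": "/bin/mkdir -p /path/to/analysis/$experimentName"},
        "02-PROCESS": {"command": "/path/to/process.sh $filePath"},
        "03-DONE": {"command": "/bin/echo processing complete"},
    },
}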

More info here

upload#

In addition to automatic file copying with a DAQ, DM provides for single file (or whole directory) uploads to the experiment storage. This can be used for content that is not in the local data directory trees monitored by a DAQ.

In file mode, files are processed (transfer, catalog, invoke workflow, etc.) one by one. In directory mode, there will be a single transfer of the whole directory, after which all files will be cataloged, etc.

The advantage of file mode is that it is easier to track progress and to see and recover from errors. The advantage of directory mode is that it is more efficient, especially for a large number of small files, where each new transfer connection is expensive relative to the data transfer time.
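
For example, file mode with several files might loop over them with individual calls to apstools.utils.dm_upload() (the same call demonstrated later in this document). This is a sketch only; the directory names and file pattern are hypothetical.

import pathlib

from apstools.utils import dm_upload

def upload_each_file(dm_experiment_name: str, local_directory: str):
    """File mode (sketch): upload files one at a time so progress and errors are visible."""
    dm_upload_directory = "something"  # DM experiment subdirectory (hypothetical)
    for p in pathlib.Path(local_directory).glob("*.h5"):  # hypothetical file pattern
        dm_upload(
            dm_experiment_name,
            str(p.parent),              # the directory name
            experimentFilePath=p.name,  # the file name
            destDirectory=dm_upload_directory,
        )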

Bluesky integration#

Many beamlines operate with software that provides some general setup for the next user group. The user then runs various plans to align the instrument and sample, and finally to collect the scientific data.

Setup User#

Activities could include creating local storage directories, identifying proposal and/or ESAF numbers, …

The setup_user() procedure is a convenient place to enter the name of the DM experiment to be used for the user’s data collection. Bluesky should remember this name so that the user does not need to supply it for any of their data collection activities. For example:

def setup_user(dm_experiment_name: str = ""):
    yield from bps.mv(dm_experiment, dm_experiment_name)

where

from bluesky import plan_stubs as bps
from ophyd import Signal

dm_experiment = Signal(name="dm_experiment", value="")

Start a DAQ if needed#

Bluesky might direct some data acquisition to write data into local file storage (such as area detector image files). A DAQ can be started by setup_user() to copy new files to the DM experiment. For example:

from apstools.utils import dm_get_experiment_datadir_active_daq, dm_start_daq

def setup_user(dm_experiment_name: str = ""):
    yield from bps.mv(dm_experiment, dm_experiment_name)

    # local directory tree to monitor
    data_directory = "/some/path/to/data/"
    # DM experiment subdirectory for upload
    dm_daq_directory = "something"
    daq = dm_get_experiment_datadir_active_daq(
        dm_experiment_name, data_directory)
    if daq is None:
        daq = dm_start_daq(
            dm_experiment_name,
            data_directory,
            destDirectory=dm_daq_directory
        )
    # remember this for later
    yield from bps.mv(dm_daq_id, daq["id"])

where

dm_daq_id = Signal(name="dm_daq_id", value="")

This can quickly become more complicated if more than one DAQ is needed.

The value for dm_daq_directory is to be decided by the software (called in the workflow) that processes the data.
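
When more than one local directory must be monitored, one sketch (using the same calls shown above) is to keep a small mapping of local directories to DM experiment subdirectories and remember all of the DAQ ids. The directory names here are hypothetical.

from apstools.utils import dm_get_experiment_datadir_active_daq, dm_start_daq
from bluesky import plan_stubs as bps
from ophyd import Signal

# local directory tree -> DM experiment subdirectory (hypothetical names)
daq_map = {
    "/some/path/to/detector1/": "detector1",
    "/some/path/to/detector2/": "detector2",
}
dm_daq_ids = Signal(name="dm_daq_ids", value={})

def start_all_daqs(dm_experiment_name: str):
    """Start (or reuse) one DAQ per monitored directory and remember the ids."""
    daq_ids = {}
    for data_directory, dm_daq_directory in daq_map.items():
        daq = dm_get_experiment_datadir_active_daq(
            dm_experiment_name, data_directory)
        if daq is None:
            daq = dm_start_daq(
                dm_experiment_name,
                data_directory,
                destDirectory=dm_daq_directory,
            )
        daq_ids[data_directory] = daq["id"]
    yield from bps.mv(dm_daq_ids, daq_ids)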

Upload a file if needed#

It may be necessary to upload a file during the setup_user() plan. Here’s an example:

import pathlib
from apstools.utils import dm_upload

def setup_user(
    dm_experiment_name: str = "",
    upload_file: str = "",
):
    yield from bps.mv(dm_experiment, dm_experiment_name)

    # upload a file
    p = pathlib.Path(upload_file)
    # DM experiment subdirectory for upload
    dm_upload_directory = "something"
    dm_upload(
        dm_experiment_name,
        str(p.parent),  # the directory name
        experimentFilePath=p.name,  # the file name
        destDirectory=dm_upload_directory,
    )

Like dm_daq_directory above, the value for dm_upload_directory is to be decided by the software (called in the workflow) that processes the data.

Data collection plan(s)#

Integration of DM with bluesky plans depends on the type of scan to be executed and how the workflow will interact.

Two general possibilities come to mind:

  • file-based collection and workflow

  • streaming-based collection and workflow

In either case, the apstools.devices.DM_WorkflowConnector is an ophyd-style Device that is used to coordinate with a DM workflow. Create a connector:

from apstools.devices import DM_WorkflowConnector

    # ... later, in the bluesky plan ...
    # REPLACE "dm_workflow" with the name of the workflow to be used.
    # This could be a keyword argument to the plan!
    # The reporting settings could also be user-selectable keyword arguments.
    dm_workflow = DM_WorkflowConnector(name="dm_workflow")
    yield from bps.mv(
        dm_workflow.concise_reporting, False,  # True: for fewer details
        dm_workflow.reporting_period, 60,  # for periodic reports (s)
    )

Start the workflow (with the dm_workflow object) in the data acquisition plan as if it were a bluesky plan. This way, the startup does not block the RunEngine from its periodic responsibilities.

    yield from dm_workflow.run_as_plan(
        workflow=workflow_name,
        wait=dm_wait,
        timeout=999_999_999, # seconds (aka forever)
        # all kwargs after this line are DM argsDict content
        filePath=dm_directory,  # like dm_daq_directory
        experiment=dm_experiment.get(),
        # ... any other keyword args needed by the plan ...
    )

Here dm_directory is (like dm_daq_directory and dm_upload_directory above) the experiment subdirectory where the workflow expects to find the files.

Note that the timeout parameter is for the background process that watches the workflow’s progress. If the timeout is reached, the reporting stops but the workflow itself is unaffected.

Any keywords expected by the plan should be included as user-selectable arguments of the data collection plan, or determined by the plan itself.
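
Putting these pieces together, a data collection plan might expose the workflow settings as keyword arguments and forward any remaining keyword arguments into the workflow's argsDict. This is only a sketch built from the calls shown above; the default workflow name and subdirectory are hypothetical and the acquisition steps are omitted.

from apstools.devices import DM_WorkflowConnector
from bluesky import plan_stubs as bps

def collect_and_process(
    workflow_name: str = "example-processing",  # hypothetical workflow name
    dm_wait: bool = False,
    dm_concise: bool = True,
    dm_directory: str = "something",  # DM experiment subdirectory, as above
    **workflow_kwargs,  # any other argsDict content expected by the workflow
):
    """Sketch: acquire data, then start the DM workflow for that data."""
    dm_workflow = DM_WorkflowConnector(name="dm_workflow")
    yield from bps.mv(
        dm_workflow.concise_reporting, dm_concise,
        dm_workflow.reporting_period, 60,  # periodic reports (s)
    )

    # ... data acquisition steps go here ...

    yield from dm_workflow.run_as_plan(
        workflow=workflow_name,
        wait=dm_wait,
        timeout=999_999_999,  # seconds (aka forever)
        filePath=dm_directory,
        experiment=dm_experiment.get(),
        **workflow_kwargs,
    )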

File-based#

In a file-based data collection and workflow, data is acquired first and stored into files on local storage. Upload of the files is managed either by a DAQ or by direct call(s) to apstools.utils.dm_upload(). If a DAQ is used, the bluesky plan should wait until the DAQ reports that the expected file upload(s) have completed. Then, bluesky tells DM to start the workflow and monitors it until it completes.

It is a choice of the workflow whether more than one instance of the same kind of workflow can execute at the same time. Some workflows expect specific files to exist and may not tolerate manipulation of those files by another workflow running at the same time. DM can be configured to control this with a scheduling queue. Alternatively, this decision process could be built into the bluesky plan.

The general outline:

  1. Bluesky plan

    1. assembles run metadata dictionary

    2. prepares instrument for data collection

    3. initiates data collection

    4. waits for data collection to finish

    5. waits for any DAQs to complete expected uploads (if required)

    6. waits for any existing workflows to finish (if required)

    7. starts DM workflow

    8. monitors workflow in the background (periodic reports)

    9. uploads run metadata to DM

  2. DM workflow

    1. execute workflow steps as programmed

See this example file-based bluesky data acquisition plan.

Streaming-based#

In a streaming data collection and workflow, the workflow must be started first so it can set up the tools to receive the data that will be streamed. A common tool for this interprocess communication is EPICS PVAccess. PVAccess is preferred since it can communicate structured data, and it may be easier to use across network boundaries than EPICS ChannelAccess.

Without a detailed description or code, here is an outline of a possible streaming-based data collection and workflow with bluesky and DM.

  1. Bluesky plan

    1. creates a PVA object for reports from the workflow

    2. starts DM workflow, passing the name of its PVA object

    3. assembles run metadata dictionary

    4. prepares instrument for streaming data collection

    5. connects with any PVA objects from workflow, as needed

    6. waits for DM workflow to become ready

  2. DM workflow

    1. execute workflow steps as programmed

      1. connects with bluesky PVA object

      2. creates its own PVA object for communication with bluesky

      3. prepares itself for streaming data

      4. signals to bluesky that it is ready

  3. data acquisition stream(s)

    1. bluesky initiates data collection

    2. DM workflow receives data

    3. either process signals the other while data is being collected

  4. data collection finishes

    1. either the DM workflow or bluesky signals the end of collection

    2. bluesky reports on further progress of workflow

  5. DM workflow finishes

  6. Bluesky uploads run metadata to DM
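
As a small illustration of step 1.1 above, the bluesky side could serve a structured PVAccess object for the workflow to monitor. This sketch assumes the pvapy package ("import pvaccess") and a hypothetical channel name and field set; the actual tooling, naming, and structure are choices for the beamline and the workflow software.

import pvaccess as pva  # pvapy package (assumption)

# Structure the workflow can read or monitor (hypothetical fields).
status = pva.PvObject(
    {"ready": pva.BOOLEAN, "message": pva.STRING},
    {"ready": False, "message": "preparing for streaming data collection"},
)

# Serve the structure on a hypothetical channel name.
server = pva.PvaServer("bluesky:workflow:status", status)

# The workflow (running elsewhere) might connect and read it:
#   channel = pva.Channel("bluesky:workflow:status")
#   print(channel.get())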