Plans#

Plans that might be useful at the APS when using Bluesky.

Plans and Support by Activity#

Batch Scanning#

execute_command_list(filename, commands[, md])

plan: execute the command list

get_command_list(filename)

return command list from either text or Excel file

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

parse_text_command_file(filename)

parse a text file with commands, return as command list

register_command_handler([handler])

Define the function called to execute the command list

run_command_file(filename[, md])

plan: execute a list of commands from a text or Excel file

summarize_command_file(filename)

print the command list from a text or Excel file

Custom Scans#

documentation_run(text[, stream, bec, md])

Save text as a bluesky run.

label_stream_decorator

Decorator support: Write labeled device(s) to stream(s).

lineup(detectors, axis, minus, plus, npts[, ...])

Lineup and center a given axis, relative to current position.

nscan(detectors, *motor_sets[, num, ...])

Scan over n variables moved together, each in equally spaced steps.

sscan_1D(sscan[, poll_delay_s, ...])

simple 1-D scan using EPICS synApps sscan record

TuneAxis(signals, axis[, signal_name])

tune an axis with a signal

tune_axes(axes)

Bluesky plan to tune a list of axes in sequence.

Overall#

addDeviceDataAsStream(devices, label)

Renamed to write_stream().

command_list_as_table(commands[, show_raw])

format a command list as a pyRestTable.Table object

documentation_run(text[, stream, bec, md])

Save text as a bluesky run.

execute_command_list(filename, commands[, md])

plan: execute the command list

get_command_list(filename)

return command list from either text or Excel file

lineup(detectors, axis, minus, plus, npts[, ...])

Lineup and center a given axis, relative to current position.

nscan(detectors, *motor_sets[, num, ...])

Scan over n variables moved together, each in equally spaced steps.

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

parse_text_command_file(filename)

parse a text file with commands, return as command list

label_stream_decorator

Decorator support: Write labeled device(s) to stream(s).

label_stream_stub([labels, fmt, bec])

Writes ophyd-labeled objects to open bluesky run streams.

label_stream_wrapper(plan, labels[, fmt, when])

Decorator support: Write labeled device(s) to stream(s).

register_command_handler([handler])

Define the function called to execute the command list

run_command_file(filename[, md])

plan: execute a list of commands from a text or Excel file

request_input([msg, default, agree, bypass])

Request input from the user.

restorable_stage_sigs

Save stage_sigs from each device and restore after the user_plan.

run_blocking_function(function, *args, **kwargs)

Run a blocking function as a bluesky plan, in a thread.

sscan_1D(sscan[, poll_delay_s, ...])

simple 1-D scan using EPICS synApps sscan record

summarize_command_file(filename)

print the command list from a text or Excel file

TuneAxis(signals, axis[, signal_name])

tune an axis with a signal

tune_axes(axes)

Bluesky plan to tune a list of axes in sequence.

write_stream(devices, label)

add an ophyd Device as an additional document stream

Also consult the Index under the Bluesky heading for links to the Callbacks, Devices, Exceptions, and Plans described here.

Submodules#

Alignment plans#

lineup(detectors, axis, minus, plus, npts[, ...])

Lineup and center a given axis, relative to current position.

tune_axes(axes)

Bluesky plan to tune a list of axes in sequence.

TuneAxis(signals, axis[, signal_name])

tune an axis with a signal

TuneResults(*args, **kwargs)

Provides bps.read() as a Device

class apstools.plans.alignment.TuneAxis(signals, axis, signal_name=None)[source]#

tune an axis with a signal

This class provides a tuning object so that a Device or other entity may gain its own tuning process, keeping track of the particulars needed to tune this device again. For example, one could add a tuner to a motor stage:

motor = EpicsMotor("xxx:motor", "motor")
motor.tuner = TuneAxis([det], motor)

Then the motor could be tuned individually:

RE(motor.tuner.tune(md={"activity": "tuning"}))

or the tune() could be part of a plan with other steps.

Example:

tuner = TuneAxis([det], axis)
live_table = LiveTable(["axis", "det"])
RE(tuner.multi_pass_tune(width=2, num=9), live_table)
RE(tuner.tune(width=0.05, num=9), live_table)

Also see the Jupyter notebook titled Demonstrate TuneAxis() in the Guides section.

tune([width, num, peak_factor, md])

Bluesky plan to execute one pass through the current scan range

multi_pass_tune([width, step_factor, num, ...])

Bluesky plan for tuning this axis with this signal

peak_detected([peak_factor])

returns True if a peak was detected, otherwise False

SEE ALSO

tune_axes(axes)

Bluesky plan to tune a list of axes in sequence.

multi_pass_tune(width=None, step_factor=None, num=None, pass_max=None, peak_factor=None, snake=None, md=None)[source]#

Bluesky plan for tuning this axis with this signal

Execute multiple passes to refine the centroid determination. Each subsequent pass will reduce the width of scan by step_factor. If snake=True then the scan direction will reverse with each subsequent pass.

PARAMETERS

width

float : width of the tuning scan in the units of self.axis. Default value in self.width (initially 1).

num

int : number of steps. Default value in self.num (initially 10).

step_factor

float : This reduces the width of the next tuning scan by the given factor. Default value in self.step_factor (initially 4)

pass_max

int : Maximum number of passes to be executed (avoids runaway scans when a centroid is not found). Default value in self.pass_max (initially 4)

peak_factor

float : peak maximum must be greater than peak_factor*minimum (default: 4)

snake

bool : If True, reverse scan direction on next pass. Default value in self.snake (initially True)

md

dict : (optional) metadata
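As a rough illustration of how width, step_factor, pass_max, and snake interact, the sketch below computes only the schedule of passes. The function name pass_plan is hypothetical and is not part of apstools. The real plan also moves the axis, checks for a peak, and can stop early, none of which is modeled here.

```python
def pass_plan(width=1.0, step_factor=4.0, pass_max=4, snake=True):
    """Return (pass_number, width, direction) for each planned pass.

    Hypothetical helper: models only the shrinking width and the
    alternating direction, not the scan itself.
    """
    passes = []
    direction = 1
    for n in range(pass_max):
        passes.append((n + 1, width, direction))
        width /= step_factor  # the next pass is narrower by step_factor
        if snake:
            direction = -direction  # reverse direction on the next pass
    return passes


schedule = pass_plan(width=2.0, step_factor=4.0, pass_max=3)
for number, scan_width, direction in schedule:
    print(number, scan_width, direction)
```

With width=2 and step_factor=4, the three passes cover widths of 2, 0.5, and 0.125 axis units.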

peak_detected(peak_factor=None)[source]#

returns True if a peak was detected, otherwise False

The default algorithm identifies a peak when the maximum value is four times the minimum value. Change this routine by subclassing TuneAxis and overriding peak_detected().

tune(width=None, num=None, peak_factor=None, md=None)[source]#

Bluesky plan to execute one pass through the current scan range

Scan self.axis centered about current position from -width/2 to +width/2 with num observations. If a peak was detected (default check is that max >= 4*min), then set self.tune_ok = True.

PARAMETERS

width

float : width of the tuning scan in the units of self.axis. Default value in self.width (initially 1).

num

int : number of steps. Default value in self.num (initially 10).

md

dict : (optional) metadata
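The scan range described above (centered on the current position, from -width/2 to +width/2) can be sketched in plain Python. The helper tune_positions is hypothetical, and it assumes num counts the observation points, end points included.

```python
def tune_positions(center, width=1.0, num=10):
    """Equally spaced positions from center - width/2 to center + width/2.

    Hypothetical helper; assumes `num` is the number of points.
    """
    if num < 2:
        return [center]
    start = center - width / 2
    step = width / (num - 1)
    return [start + i * step for i in range(num)]


positions = tune_positions(center=5.0, width=2.0, num=5)
print(positions)
```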

class apstools.plans.alignment.TuneResults(*args: Any, **kwargs: Any)[source]#

Provides bps.read() as a Device

apstools.plans.alignment.lineup(detectors, axis, minus, plus, npts, time_s=0.1, peak_factor=4, width_factor=0.8, feature='cen', rescan=True, bec=None, md=None)[source]#

Lineup and center a given axis, relative to current position.

If the first run identifies a peak, a second run fine-tunes the result.

PARAMETERS

detectors

[object] or object : Instance(s) of ophyd.Signal (or subclass such as ophyd.scaler.ScalerChannel) dependent measurement to be maximized. If a list, the first signal in the list will be used.

axis

movable object : instance of ophyd.Signal (or subclass such as EpicsMotor) independent axis to use for alignment

minus

float : first point of scan at this offset from starting position

plus

float : last point of scan at this offset from starting position

npts

int : number of data points in the scan

time_s

float : Count time per step. Applies only if detectors[0] is a ScalerChannel object; other object types are not yet supported. (default: 0.1)

peak_factor

float : peak maximum must be greater than peak_factor*minimum (default: 4)

width_factor

float : fwhm must be less than width_factor*plot_range (default: 0.8)

feature

str : One of the parameters returned by BestEffortCallback peak stats. (cen, com, max, min) (default: cen)

rescan

bool : If first scan indicates a peak, should a second scan refine the result? (default: True)

bec

object : Instance of bluesky.callbacks.best_effort.BestEffortCallback. (default: None, meaning look for it from global namespace)
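The two acceptance criteria in the parameter list (peak_factor and width_factor) can be combined as in the sketch below. The function accept_peak and its argument names are hypothetical. In practice the statistics would come from the BestEffortCallback peak stats, and the exact comparisons used by lineup() may differ.

```python
def accept_peak(y_max, y_min, fwhm, plot_range, peak_factor=4, width_factor=0.8):
    """Return True when the scan looks like a usable peak.

    Hypothetical helper illustrating the two documented criteria.
    """
    strong_enough = y_max > peak_factor * y_min      # peak stands out from background
    narrow_enough = fwhm < width_factor * plot_range  # peak fits within the scan range
    return strong_enough and narrow_enough


print(accept_peak(y_max=100, y_min=10, fwhm=5, plot_range=60))
```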

EXAMPLE:

RE(lineup(diode, foemirror.theta, -30, 30, 30, 1.0))

apstools.plans.alignment.tune_axes(axes)[source]#

Bluesky plan to tune a list of axes in sequence.

Expects each axis will have a tuner attribute which is an instance of TuneAxis().

EXAMPLE

Sequentially, tune a list of preconfigured axes:

RE(tune_axes([mr, m2r, ar, a2r]))

SEE ALSO

TuneAxis(signals, axis[, signal_name])

tune an axis with a signal

Command list support#

CommandFileReadError

Exception when reading a command file.

command_list_as_table(commands[, show_raw])

format a command list as a pyRestTable.Table object

execute_command_list(filename, commands[, md])

plan: execute the command list

get_command_list(filename)

return command list from either text or Excel file

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

parse_text_command_file(filename)

parse a text file with commands, return as command list

register_command_handler([handler])

Define the function called to execute the command list

run_command_file(filename[, md])

plan: execute a list of commands from a text or Excel file

summarize_command_file(filename)

print the command list from a text or Excel file

exception apstools.plans.command_list.CommandFileReadError[source]#

Exception when reading a command file.

apstools.plans.command_list.command_list_as_table(commands, show_raw=False)[source]#

format a command list as a pyRestTable.Table object

apstools.plans.command_list.execute_command_list(filename, commands, md=None)[source]#

plan: execute the command list

The command list is a tuple described below.

  • Only recognized commands will be executed.

  • Unrecognized commands will be reported as comments.

See example implementation with APS USAXS instrument: APS-USAXS/ipython-usaxs

PARAMETERS

filename

str : Name of input text file. Can be relative or absolute path, such as “actions.txt”, “../sample.txt”, or “/path/to/overnight.txt”.

commands

[command] : List of command tuples for use in execute_command_list()

where

command

tuple : (action, OrderedDict, line_number, raw_command)

action

str : names a known action to be handled

parameters

list : List of parameters for the action. The list is empty if there are no values.

line_number

int : line number (1-based) from the input text file

raw_command

str or [str] : contents from input file, such as: SAXS 0 0 0 blank
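To make the command tuple concrete, here is a minimal dispatch sketch in plain Python. It is not the apstools implementation (which is a bluesky plan). The names execute and handlers, and the actions used, are hypothetical. It shows recognized actions being executed and unrecognized ones being reported as comments.

```python
def execute(commands, handlers):
    """Run each command through its handler, or report it as a comment."""
    log = []
    for action, parameters, line_number, raw_command in commands:
        handler = handlers.get(action)
        if handler is None:
            # unrecognized command: report it, do not fail
            log.append(f"# line {line_number}: no handling for '{raw_command}'")
        else:
            log.append(handler(*parameters))
    return log


commands = [
    ("mono_shutter", ["open"], 5, "mono_shutter open"),
    ("warp_drive", [9], 6, "warp_drive 9"),
]
handlers = {"mono_shutter": lambda state: f"shutter -> {state}"}
log = execute(commands, handlers)
print("\n".join(log))
```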

SEE ALSO

execute_command_list(filename, commands[, md])

plan: execute the command list

register_command_handler([handler])

Define the function called to execute the command list

run_command_file(filename[, md])

plan: execute a list of commands from a text or Excel file

summarize_command_file(filename)

print the command list from a text or Excel file

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

parse_text_command_file(filename)

parse a text file with commands, return as command list

new in apstools release 1.1.7

apstools.plans.command_list.get_command_list(filename)[source]#

return command list from either text or Excel file

SEE ALSO

execute_command_list(filename, commands[, md])

plan: execute the command list

get_command_list(filename)

return command list from either text or Excel file

register_command_handler([handler])

Define the function called to execute the command list

run_command_file(filename[, md])

plan: execute a list of commands from a text or Excel file

summarize_command_file(filename)

print the command list from a text or Excel file

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

parse_text_command_file(filename)

parse a text file with commands, return as command list

new in apstools release 1.1.7

apstools.plans.command_list.parse_Excel_command_file(filename)[source]#

parse an Excel spreadsheet with commands, return as command list

TEXT view of spreadsheet (Excel file line numbers shown):

[1] List of sample scans to be run
[2]
[3]
[4] scan    sx  sy  thickness   sample name
[5] FlyScan 0   0   0   blank
[6] FlyScan 5   2   0   blank

PARAMETERS

filename

str : Name of input Excel spreadsheet file. Can be relative or absolute path, such as “actions.xlsx”, “../sample.xlsx”, or “/path/to/overnight.xlsx”.

RETURNS

list of commands

[command] : List of command tuples for use in execute_command_list()

RAISES

FileNotFoundError

if file cannot be found

SEE ALSO

get_command_list(filename)

return command list from either text or Excel file

register_command_handler([handler])

Define the function called to execute the command list

run_command_file(filename[, md])

plan: execute a list of commands from a text or Excel file

summarize_command_file(filename)

print the command list from a text or Excel file

parse_text_command_file(filename)

parse a text file with commands, return as command list

new in apstools release 1.1.7

apstools.plans.command_list.parse_text_command_file(filename)[source]#

parse a text file with commands, return as command list

  • The text file is interpreted line-by-line.

  • Blank lines are ignored.

  • A pound sign (#) marks the rest of that line as a comment.

  • All remaining lines are interpreted as commands with arguments.

Example of text file (no line numbers shown):

#List of sample scans to be run
# pound sign starts a comment (through end of line)

# action  value
mono_shutter open

# action  x y width height
uslits 0 0 0.4 1.2

# action  sx  sy  thickness   sample name
FlyScan 0   0   0   blank
FlyScan 5   2   0   "empty container"

# action  sx  sy  thickness   sample name
SAXS 0 0 0 blank

# action  value
mono_shutter close

PARAMETERS

filename

str : Name of input text file. Can be relative or absolute path, such as “actions.txt”, “../sample.txt”, or “/path/to/overnight.txt”.

RETURNS

list of commands

[command] : List of command tuples for use in execute_command_list()

RAISES

FileNotFoundError

if file cannot be found
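The parsing rules listed above can be approximated with the standard library. This sketch is not the apstools parser: the function parse_text_commands is hypothetical, it reads from a string rather than a file, and it leaves every parameter as a string (the real parser may convert numbers).

```python
import shlex


def parse_text_commands(text):
    """Return a list of (action, parameters, line_number, raw_command) tuples."""
    commands = []
    for line_number, raw in enumerate(text.splitlines(), start=1):
        # comments=True drops everything after '#'; quotes keep words together
        tokens = shlex.split(raw, comments=True)
        if not tokens:
            continue  # blank line or comment-only line
        action, *parameters = tokens
        commands.append((action, parameters, line_number, raw.strip()))
    return commands


sample = '''#List of sample scans to be run
mono_shutter open

FlyScan 5   2   0   "empty container"  # trailing comment
'''
commands = parse_text_commands(sample)
for command in commands:
    print(command[:3])
```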

SEE ALSO

execute_command_list(filename, commands[, md])

plan: execute the command list

get_command_list(filename)

return command list from either text or Excel file

register_command_handler([handler])

Define the function called to execute the command list

run_command_file(filename[, md])

plan: execute a list of commands from a text or Excel file

summarize_command_file(filename)

print the command list from a text or Excel file

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

new in apstools release 1.1.7

apstools.plans.command_list.register_command_handler(handler=None)[source]#

Define the function called to execute the command list

PARAMETERS

handler obj :

Reference of the execute_command_list function to be used from run_command_file(). If None or not provided, will reset to execute_command_list(), which is also the initial setting.
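The register-then-reset behavior described above follows a common pattern, sketched here in plain Python. All names (default_handler, register_handler, run_file) are hypothetical stand-ins and the handlers return strings instead of being bluesky plans.

```python
def default_handler(filename, commands):
    """Stand-in for the initial handler."""
    return f"default ran {len(commands)} command(s) from {filename}"


_handler = default_handler  # module-level setting, initially the default


def register_handler(handler=None):
    """Select the handler; None (or no argument) restores the default."""
    global _handler
    _handler = handler or default_handler


def run_file(filename, commands):
    """Delegate to whichever handler is currently registered."""
    return _handler(filename, commands)


def custom_handler(filename, commands):
    return f"custom ran {filename}"


register_handler(custom_handler)
custom_result = run_file("actions.txt", [])
register_handler()  # reset to the default
default_result = run_file("actions.txt", [])
print(custom_result)
print(default_result)
```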

SEE ALSO

execute_command_list(filename, commands[, md])

plan: execute the command list

get_command_list(filename)

return command list from either text or Excel file

register_command_handler([handler])

Define the function called to execute the command list

summarize_command_file(filename)

print the command list from a text or Excel file

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

parse_text_command_file(filename)

parse a text file with commands, return as command list

new in apstools release 1.1.7

apstools.plans.command_list.run_command_file(filename, md=None)[source]#

plan: execute a list of commands from a text or Excel file

  • Parse the file into a command list

  • yield the command list to the RunEngine (or other)

SEE ALSO

execute_command_list(filename, commands[, md])

plan: execute the command list

get_command_list(filename)

return command list from either text or Excel file

register_command_handler([handler])

Define the function called to execute the command list

summarize_command_file(filename)

print the command list from a text or Excel file

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

parse_text_command_file(filename)

parse a text file with commands, return as command list

new in apstools release 1.1.7

apstools.plans.command_list.summarize_command_file(filename)[source]#

print the command list from a text or Excel file

SEE ALSO

execute_command_list(filename, commands[, md])

plan: execute the command list

get_command_list(filename)

return command list from either text or Excel file

run_command_file(filename[, md])

plan: execute a list of commands from a text or Excel file

parse_Excel_command_file(filename)

parse an Excel spreadsheet with commands, return as command list

parse_text_command_file(filename)

parse a text file with commands, return as command list

new in apstools release 1.1.7

Documentation of batch runs#

addDeviceDataAsStream(devices, label)

Renamed to write_stream().

documentation_run(text[, stream, bec, md])

Save text as a bluesky run.

write_stream(devices, label)

add an ophyd Device as an additional document stream

apstools.plans.doc_run.addDeviceDataAsStream(devices, label)[source]#

Renamed to write_stream(). Will remove with release 1.7+.

apstools.plans.doc_run.documentation_run(text, stream=None, bec=None, md=None)[source]#

Save text as a bluesky run.

PARAMETERS

text

str : Text to be written.

stream

str : document stream, default: “primary”

bec

object : Instance of bluesky.BestEffortCallback, default: get from IPython shell

md

dict (optional): metadata dictionary

apstools.plans.doc_run.write_stream(devices, label)[source]#

add an ophyd Device as an additional document stream

Use this within a custom plan, such as this example:

import bluesky.plan_stubs as bps
from apstools.plans import write_stream
# ...
yield from bps.open_run(md={})
# ...
yield from write_stream(prescanDeviceList, "metadata_prescan")
# ...
yield from custom_scan_procedure()
# ...
yield from write_stream(postscanDeviceList, "metadata_postscan")
# ...
yield from bps.close_run()

Request user input in a plan#

request_input([msg, default, agree, bypass])

Request input from the user.

Wrap bps.input_plan() to ask the user a question.

apstools.plans.input_plan.request_input(msg='', default='n', agree='y', bypass=False)[source]#

Request input from the user. Returns True if confirmed.

Returns True if bypass is True, or if the lower-cased response from the user (or the default) starts with the text of agree.

PARAMETERS

msg str:

Message text to be printed. (default: "")

default str:

Default response if user accepts default. (default: "n")

agree (str or list):

User (or default) response must start with this text for True. If a list of strings is provided, response must match (lower case) one of the strings in the list. (default: "y")

bypass bool:

Allow for automated plans to bypass this request in-place. (default: False)
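The decision rule described by these parameters can be sketched without any user interaction. The function is_confirmed is hypothetical: it takes the response as an argument instead of prompting, and it is only an approximation of how request_input() evaluates the answer.

```python
def is_confirmed(response, default="n", agree="y", bypass=False):
    """Return True if the (lower-cased) response counts as agreement."""
    if bypass:
        return True  # automated plans skip the question
    answer = (response or default).strip().lower()
    if isinstance(agree, str):
        return answer.startswith(agree.lower())
    # list of strings: the answer must match one of them
    return answer in [choice.lower() for choice in agree]


print(is_confirmed("Yes"), is_confirmed(""), is_confirmed("ok", agree=["ok", "sure"]))
```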

New in release 1.6.6

Record any labeled objects to streams#

To record positions of all motors at the start of a scan, use @label_stream_decorator("motor") as described next.

EXAMPLE:

First, use the labels=["motor"] keyword when defining your (ophyd) objects. Here’s an example with the motor label (and others, too) used with various objects:

from apstools.plans import label_stream_decorator
from ophyd import Component, Device, EpicsMotor, EpicsSignal

class Sample(Device):
    temperature = Component(EpicsSignal, "t2")  # unlabeled
    pressure = Component(EpicsSignal, "ao14")  # unlabeled
    cryo_stream_on = Component(EpicsSignal, "bo27")  # unlabeled
    x = Component(EpicsMotor, "m14", labels=["motor"])
    y = Component(EpicsMotor, "m15", labels=["motor"])

class Instrument(Device):
    omega = Component(EpicsMotor, "m2", labels=["motor"])
    chi = Component(EpicsMotor, "m3", labels=["motor"])
    phi = Component(EpicsMotor, "m4", labels=["motor"])
    two_theta = Component(EpicsMotor, "m1", labels=["motor"])

    # support table
    x = Component(EpicsMotor, "m7", labels=["motor"])
    y = Component(EpicsMotor, "m8", labels=["motor"])

    sample = Component(Sample, "", labels=["sample"])

diode = EpicsSignal("ioc:counter2", name="diode", labels=["counter"])
instrument = Instrument("ioc:", name="instrument")
m49 = EpicsMotor("ioc:m49", name="m49", labels=["motor"])
scint = EpicsSignal("ioc:counter1", name="scint", labels=["counter"])

Then define a custom plan with the following (where when="start" is the default):

@label_stream_decorator("motor")
def my_count_plan(dets, md=None):
    _md = md or {}
    yield from bp.count(dets, md=_md)

Call this plan with the bluesky RunEngine:

RE(my_count_plan([scint, diode]))

or from another plan:

yield from my_count_plan([scint, diode])

Once the run is complete, look for the label_start_motor stream that has the positions at the start of the run of all devices with the "motor" label. Such as:

run = cat.v2[-1]  # assume the most recent run
run.label_start_motor.read()

Similarly, to write the motor objects at the end of the run, use the when="end" keyword:

@label_stream_decorator("motor", when="end")
def my_count_plan(dets, md=None):
    _md = md or {}
    yield from bp.count(dets, md=_md)

This writes a stream named label_end_motor at the end of the run. If you want values recorded at both start and end, then use two decorators:

@label_stream_decorator("motor", when="start")
@label_stream_decorator("motor", when="end")
def my_count_plan(dets, md=None):
    _md = md or {}
    yield from bp.count(dets, md=_md)

label_stream_decorator

Decorator support: Write labeled device(s) to stream(s).

label_stream_stub([labels, fmt, bec])

Writes ophyd-labeled objects to open bluesky run streams.

label_stream_wrapper(plan, labels[, fmt, when])

Decorator support: Write labeled device(s) to stream(s).

When(value[, names, module, qualname, type, ...])

Describes what point of the run the stream(s) should be written.

new in apstools release 1.6.11

class apstools.plans.labels_to_streams.When(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Describes what point of the run the stream(s) should be written.

apstools.plans.labels_to_streams.label_stream_decorator(plan, labels, fmt=None, when='start')#

Decorator support: Write labeled device(s) to stream(s). Either at “start” or “end”.

PARAMETERS

plan

obj: Instance of a bluesky plan.

labels

[str] (or str): List of configured ophyd object “labels”. Passed through to write_label_stream(). Default: None (meaning all).

fmt

str: Format string for stream name(s). Default: "label_{when}_{}"

when

str: Indicates when the stream(s) should be written. Any of these values:

value         stream will be written …
"start"       just after open_run
"end"         just before close_run
When.START    same as "start"
When.END      same as "end"

The str value can be expressed in either upper or lower case.

Default: "start"
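The sketch below shows how a when value and the default fmt string could combine into a stream name such as label_start_motor. The When class here is a local stand-in whose member values are assumed, and stream_name is a hypothetical helper, not an apstools function.

```python
from enum import Enum


class When(Enum):
    """Local stand-in for apstools' When; member values are assumptions."""

    START = "start"
    END = "end"


def stream_name(label, when="start", fmt="label_{when}_{}"):
    """Build a stream name from a label and a str or When value."""
    if not isinstance(when, When):
        when = When(str(when).lower())  # accept upper or lower case text
    return fmt.format(label, when=when.value)


print(stream_name("motor"))
print(stream_name("motor", when="END"))
print(stream_name("counter", when=When.END))
```

An unsupported value such as when="middle" raises ValueError in this sketch.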

new in apstools release 1.6.11

apstools.plans.labels_to_streams.label_stream_stub(labels=None, fmt=None, bec=None)[source]#

Writes ophyd-labeled objects to open bluesky run streams. One stream per label.

PARAMETERS

labels

obj: List of configured ophyd object “labels”. Default: None (meaning all).

fmt

str: Format string for stream name(s). Default: "label_{}"

bec

obj: Instance of bluesky BestEffortCallback. Default: selected from default namespace, if available.

new in apstools release 1.6.11

apstools.plans.labels_to_streams.label_stream_wrapper(plan, labels, fmt=None, when='start')[source]#

Decorator support: Write labeled device(s) to stream(s). Either at “start” or “end”.

PARAMETERS

plan

obj: Instance of a bluesky plan.

labels

[str] (or str): List of configured ophyd object “labels”. Passed through to write_label_stream(). Default: None (meaning all).

fmt

str: Format string for stream name(s). Default: "label_{when}_{}"

when

str: Indicates when the stream(s) should be written. Any of these values:

value         stream will be written …
"start"       just after open_run
"end"         just before close_run
When.START    same as "start"
When.END      same as "end"

The str value can be expressed in either upper or lower case.

Default: "start"

new in apstools release 1.6.11

nscan plan#

nscan(detectors, *motor_sets[, num, ...])

Scan over n variables moved together, each in equally spaced steps.

apstools.plans.nscan_support.nscan(detectors, *motor_sets, num=11, per_step=None, md=None)[source]#

Scan over n variables moved together, each in equally spaced steps.

PARAMETERS

detectors list :

list of ‘readable’ objects

motor_sets list :

sequence of one or more groups of: motor, start, finish

motor object :

any ‘settable’ object (motor, temp controller, etc.)

start float :

starting position of motor

finish float :

ending position of motor

num int :

number of steps (default = 11)

per_step callable :

(optional) hook for customizing action of inner loop (messages per step). Expected signature: f(detectors, step_cache, pos_cache)

md dict

(optional) metadata
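To illustrate what "moved together, each in equally spaced steps" means, this sketch computes the positions each axis would visit. The helper nscan_positions is hypothetical, uses axis names (strings) in place of motor objects, and assumes motor_sets is a flat sequence of motor, start, finish groups with num of at least 2.

```python
def nscan_positions(*motor_sets, num=11):
    """Return {axis_name: [positions]} for axes stepped in parallel."""
    if len(motor_sets) % 3 != 0:
        raise ValueError("motor_sets must be groups of: motor, start, finish")
    table = {}
    for i in range(0, len(motor_sets), 3):
        name, start, finish = motor_sets[i:i + 3]
        step = (finish - start) / (num - 1)
        table[name] = [start + k * step for k in range(num)]
    return table


table = nscan_positions("x", 0, 4, "y", 10, 8, num=5)
for point in zip(table["x"], table["y"]):
    print(point)  # both axes advance on every step
```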

See the nscan() example in a Jupyter notebook: BCDA-APS/apstools

apstools.plans.run_blocking_function_plan.run_blocking_function(function, *args, **kwargs)[source]#

Run a blocking function as a bluesky plan, in a thread.

The run_blocking_function() is a bluesky plan which runs function(*args, **kwargs) in a thread so it does not block the RunEngine’s background operations.

It is intended to call blocking code that should not be called directly from a bluesky plan.
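The general idea (start the blocking call in a thread, then keep yielding short waits until it finishes) can be sketched with the standard library alone. This is not the apstools implementation: run_in_thread and drive are hypothetical, the generator yields plain delay values where a real plan would yield bluesky messages, and exceptions raised inside the worker are not propagated.

```python
import threading
import time


def run_in_thread(function, *args, poll_s=0.01, **kwargs):
    """Generator: run function(*args, **kwargs) in a thread, yield while waiting."""
    outcome = {}

    def worker():
        outcome["value"] = function(*args, **kwargs)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    while thread.is_alive():
        yield poll_s  # a real plan would `yield from bps.sleep(poll_s)` here
    return outcome.get("value")


def drive(plan):
    """Tiny stand-in for a RunEngine: sleep for each yielded delay."""
    try:
        while True:
            time.sleep(next(plan))
    except StopIteration as finished:
        return finished.value


def slow_add(a, b):
    time.sleep(0.05)  # pretend this is blocking I/O
    return a + b


total = drive(run_in_thread(slow_add, 2, 3))
print(total)
```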

USAGE:

yield from run_blocking_function(function, *args, **kwargs)

EXAMPLE:

This example creates a bluesky plan named start_incrementer() which configures a synApps “userCalc” as an automated 10Hz incrementer. Configuration is made by calling the blocking function setup_incrementer_swait(). The incrementer resets to zero at 100,000:

from apstools.plans import run_blocking_function
from apstools.synApps import SwaitRecord
from apstools.synApps import setup_incrementer_swait
import bluesky.plan_stubs as bps

swait = SwaitRecord("ioc:userCalc1", name="swait")

def start_incrementer():
    yield from run_blocking_function(
        setup_incrementer_swait,
        swait,
        scan="0.1 second",
        limit=100_000
    )

# now, run the plan
RE(start_incrementer())

(new in release 1.6.3)

sscan Record plans#

sscan_1D(sscan[, poll_delay_s, ...])

simple 1-D scan using EPICS synApps sscan record

apstools.plans.sscan_support.sscan_1D(sscan, poll_delay_s=0.001, phase_timeout_s=60.0, running_stream='primary', final_array_stream=None, device_settings_stream='settings', md=None)[source]#

simple 1-D scan using EPICS synApps sscan record

Assumes the sscan record has already been set up properly for a scan.

PARAMETERS

sscan Device :

one EPICS sscan record (instance of apstools.synApps.sscanRecord)

running_stream str or None :

(default: "primary") Name of document stream to write positioners and detectors data made available while the sscan is running. This is typically the scan data, row by row. If set to None, this stream will not be written.

final_array_stream str or None :

Name of document stream to write positioners and detectors data posted after the sscan has ended. If set to None, this stream will not be written. (default: None)

device_settings_stream str or None :

Name of document stream to write settings of the sscan device. This is all the information returned by sscan.read(). If set to None, this stream will not be written. (default: "settings")

poll_delay_s float :

How long to sleep during each polling loop while collecting interim data values and waiting for sscan to complete. Must be a number between zero and 0.1 seconds. (default: 0.001 seconds)

phase_timeout_s float :

How long to wait after last update of the sscan.FAZE. When scanning, we expect the scan phase to update regularly as positioners move and detectors are triggered. If the scan hangs for some reason, this is a way to end the plan early. To cancel this feature, set it to None. (default: 60 seconds)

NOTE about the document stream names

Make certain the names for the document streams are different from each other. If you make them all the same (such as primary), you will have difficulty when reading your data later on.
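One way to guard against duplicated stream names before starting a scan is a small check such as the following. The function check_stream_names is hypothetical and is not part of apstools. Streams set to None are skipped because they will not be written.

```python
def check_stream_names(
    running_stream="primary",
    final_array_stream=None,
    device_settings_stream="settings",
):
    """Return the active stream names, or raise ValueError on duplicates."""
    names = [
        name
        for name in (running_stream, final_array_stream, device_settings_stream)
        if name is not None
    ]
    duplicates = sorted({name for name in names if names.count(name) > 1})
    if duplicates:
        raise ValueError(f"stream names must be unique, repeated: {duplicates}")
    return names


active = check_stream_names()
print(active)
```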

Don’t cross the streams!

EXAMPLE

Assume that the chosen sscan record has already been set up.

from apstools.devices import sscanDevice
scans = sscanDevice(P, name="scans")

from apstools.plans import sscan_1D
RE(sscan_1D(scans.scan1, md=dict(purpose="demo")))

Support for stage_sigs#

Often when writing a plan, it is desired to change the staging of one or more ophyd.Devices by modifying the device.stage_sigs dictionary(ies), then reset them to original values after the plan.

restorable_stage_sigs

Save stage_sigs from each device and restore after the user_plan.

stage_sigs_wrapper(user_plan, devices)

Save stage_sigs from each device and restore after the user_plan.

EXAMPLE:

For example, move the motor at 2 units/second and count the scaler for 0.5 seconds during a custom plan. After the plan, both scaler and motor will have their original .stage_sigs dictionaries.

@restorable_stage_sigs([scaler, motor])
def my_plan(start, finish, npts, ct=1, v=1):
    scaler.stage_sigs = dict(preset_time=ct)
    motor.stage_sigs = dict(velocity=v)
    yield from bp.rel_scan([scaler], motor, start, finish, npts)

RE(my_plan(-1, 1, 11, ct=0.5, v=2))

(new in apstools release 1.6.9)

apstools.plans.stage_sigs_support.restorable_stage_sigs(user_plan, devices)#

Save stage_sigs from each device and restore after the user_plan.

The user_plan is free to modify the stage_sigs of each device without further need to preserve original values.

apstools.plans.stage_sigs_support.stage_sigs_wrapper(user_plan, devices)[source]#

Save stage_sigs from each device and restore after the user_plan.

The user_plan is free to modify the stage_sigs of each device without further need to preserve original values.
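The save-and-restore idea can be sketched without bluesky or ophyd. Everything here is a hypothetical stand-in: FakeDevice mimics only the stage_sigs attribute, and restorable is a simplified decorator built on try/finally, which may differ from how apstools implements it.

```python
import functools


class FakeDevice:
    """Minimal stand-in for an ophyd Device: only has stage_sigs."""

    def __init__(self, **stage_sigs):
        self.stage_sigs = dict(stage_sigs)


def restorable(devices):
    """Decorator: copy each device's stage_sigs, restore after the plan."""

    def decorator(plan_function):
        @functools.wraps(plan_function)
        def wrapper(*args, **kwargs):
            saved = [dict(device.stage_sigs) for device in devices]
            try:
                yield from plan_function(*args, **kwargs)
            finally:
                # runs whether the plan finishes or raises
                for device, original in zip(devices, saved):
                    device.stage_sigs = original

        return wrapper

    return decorator


scaler = FakeDevice(preset_time=1)


@restorable([scaler])
def my_plan(ct):
    scaler.stage_sigs = dict(preset_time=ct)  # free to modify inside the plan
    yield scaler.stage_sigs["preset_time"]


seen = list(my_plan(0.5))
print(seen, scaler.stage_sigs)
```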