
Various utilities to help APS use the Bluesky framework.

Also consult the Index under the apstools heading for links to the Exceptions, and Utilities described here.

Utilities by Activity#


findbyname(oname[, force_rebuild, ns])

Find the ophyd (dotted name) object associated with the given ophyd name.

findbypv(pvname[, force_rebuild, ns])

Find all ophyd objects associated with the given EPICS PV.


Return a dictionary of databroker catalogs in the default namespace.


listdevice(obj[, scope, cname, dname, ...])

Describe the signal information from device obj in a pandas DataFrame.

listobjects([show_pv, printing, verbose, ...])

Show all the ophyd Signal and Device objects defined as globals.

listplans([base, trunc, table_style])

List all plans.

listRunKeys(scan_id[, key_fragment, db, ...])

Convenience function to list all keys (column names) in the scan's stream (default: primary).

ListRuns([cat, query, keys, missing, num, ...])

List the runs from the given catalog according to some options.

listruns([cat, keys, missing, num, ...])

List runs from catalog.


print_RE_md([dictionary, fmt, printing])

custom print the RunEngine metadata in a table

file_log_handler(file_name_base[, maxBytes, ...])

Record logging output to a file.


Return a path to ./.logs.

setup_IPython_console_logging([logger, ...])

Record all input (In) and output (Out) from IPython console.

stream_log_handler([formatter, level])

Record logging output to a stream (such as the console).

SlitGeometry(width, height, x, y)

Slit size and center as a named tuple

Other Utilities#


Configure data management from its bash setup script.


convert text so it can be used as a dictionary key

connect_pvlist(pvlist[, wait, timeout, ...])

Given list of EPICS PV names, return dict of EpicsSignal objects.


send email notifications when requested

select_live_plot(bec, signal)

Get the first live plot that matches signal.

select_mpl_figure(x, y)

Get the MatPlotLib Figure window for y vs x.

trim_plot_by_name([n, plots])

Find the plot(s) by name and replot with at most the last n lines.

trim_plot_lines(bec, n, x, y)

Find the plot with axes x and y and replot with at most the last n lines.


String must not exceed EPICS PV length.

unix(command[, raises])

Run a UNIX command, returns (stdout, stderr).



convert text so it can be used as a dictionary key

command_list_as_table(commands[, show_raw])

format a command list as a pyRestTable.Table object

connect_pvlist(pvlist[, wait, timeout, ...])

Given list of EPICS PV names, return dict of EpicsSignal objects.

copy_filtered_catalog(source_cat, target_cat)

copy filtered runs from source_cat to target_cat

db_query(db, query)

Searches the databroker v2 database.

dictionary_table(dictionary, **kwargs)

Return a text table from dictionary.


send email notifications when requested


base class: read-only support for Excel files, treat them like databases

ExcelDatabaseFileGeneric(filename[, ...])

Generic (read-only) handling of Excel spreadsheet-as-database

ExcelReadError(*args, **kwargs)

Exception when reading Excel spreadsheet.

findbyname(oname[, force_rebuild, ns])

Find the ophyd (dotted name) object associated with the given ophyd name.

findbypv(pvname[, force_rebuild, ns])

Find all ophyd objects associated with the given EPICS PV.


Return a dictionary of databroker catalogs in the default namespace.


Return the full dotted name


getDatabase([db, catalog_name])

Return Bluesky database using keyword guides or default choice.



Find the "default" database (has the most recent run).


get the IPython shell's namespace dictionary (or globals() if not found)

getRunData(scan_id[, db, stream, query, use_v1])

Convenience function to get the run's data.

getRunDataValue(scan_id, key[, db, stream, ...])

Convenience function to get value of key in run stream.

getStreamValues(scan_id[, key_fragment, db, ...])

Get values from a previous scan stream in a databroker catalog.


return the name of the current ipython profile or None

itemizer(fmt, items)

Format a list of items.

listdevice(obj[, scope, cname, dname, ...])

Describe the signal information from device obj in a pandas DataFrame.

listobjects([show_pv, printing, verbose, ...])

Show all the ophyd Signal and Device objects defined as globals.

listplans([base, trunc, table_style])

List all plans.

listRunKeys(scan_id[, key_fragment, db, ...])

Convenience function to list all keys (column names) in the scan's stream (default: primary).

ListRuns([cat, query, keys, missing, num, ...])

List the runs from the given catalog according to some options.

listruns([cat, keys, missing, num, ...])

List runs from catalog.


Define parameters that can be overridden from a user configuration file.


break a list (or other iterable) into pairs

plotxy(runs, xname, yname[, append, cat, ...])

Plot y vs x from a bluesky run.

print_RE_md([dictionary, fmt, printing])

custom print the RunEngine metadata in a table

quantify_md_key_use([key, db, catalog_name, ...])

Print table of different key values and how many times each appears.

redefine_motor_position(motor, new_position)

Set EPICS motor record's user coordinate to new_position.

replay(headers[, callback, sort])

Replay the document stream from one (or more) scans (headers).


return memory used by this process


(decorator) run func in thread


make text safe to be used as an ophyd object name

select_live_plot(bec, signal)

Get the first live plot that matches signal.

select_mpl_figure(x, y)

Get the MatPlotLib Figure window for y vs x.


splits a line into words some of which might be quoted

summarize_runs([since, db])

Report bluesky run metrics from the databroker.


Encode source using the default codepoint.

to_unicode_or_bust(obj[, encoding])

from: http://farmdev.com/talks/unicode/ .

trim_plot_by_name([n, plots])

Find the plot(s) by name and replot with at most the last n lines.

trim_plot_lines(bec, n, x, y)

Find the plot with axes x and y and replot with at most the last n lines.


String must not exceed EPICS PV length.

unix(command[, raises])

Run a UNIX command, returns (stdout, stderr).


Setup for for this beam line’s APS Data Management Python API client.

This setup must be done before the first DM_WorkflowConnector() object is created. The setup_file is the bash shell script provided by the APS Data Management for the user’s account.


Configure data management from its bash setup script.

The return result defines the BDP_WORKFLOW_OWNER symbol.

Working with databroker catalogs#

copy_filtered_catalog(source_cat, target_cat)

copy filtered runs from source_cat to target_cat


Return a dictionary of databroker catalogs in the default namespace.


getDatabase([db, catalog_name])

Return Bluesky database using keyword guides or default choice.



Find the "default" database (has the most recent run).

getStreamValues(scan_id[, key_fragment, db, ...])

Get values from a previous scan stream in a databroker catalog.

quantify_md_key_use([key, db, catalog_name, ...])

Print table of different key values and how many times each appears.

apstools.utils.catalog.copy_filtered_catalog(source_cat, target_cat, query=None)[source]#

copy filtered runs from source_cat to target_cat



obj : instance of databroker.Broker or databroker.catalog[name]


obj : instance of databroker.Broker or databroker.catalog[name]


dict : mongo query dictionary, used to filter the results (default: {})

see: https://docs.mongodb.com/manual/reference/operator/query/


    {'plan_name': 'snapshot'})

Return a dictionary of databroker catalogs in the default namespace.

apstools.utils.catalog.getDatabase(db=None, catalog_name=None)[source]#

Return Bluesky database using keyword guides or default choice.



object : Bluesky database, an instance of databroker.catalog (default: see catalog_name keyword argument)


str : Name of databroker v2 catalog, used when supplied db is None. (default: catalog with most recent run timestamp)


object or None:

Bluesky database, an instance of databroker.catalog

(new in release 1.4.0)


Find the “default” database (has the most recent run).

Note that here, database and catalog mean the same.

This routine looks at all the database instances defined in the current session (console or notebook). If there is only one or no database instances defined as objects in the current session, the choice is simple. When there is more than one database instance in the current session, then the one with the most recent run timestamp is selected. In the case (as happens when starting with a new database) that the current database has no runs and another database instance is defined in the session and that additional database has runs in it (such as the previous database), then the database with the newest run timestamp (and not the newer empty database) will be chosen.


object or None:

Bluesky database, an instance of databroker.catalog

(new in release 1.4.0)

apstools.utils.catalog.getStreamValues(scan_id, key_fragment='', db=None, stream='baseline', query=None, use_v1=True)[source]#

Get values from a previous scan stream in a databroker catalog.

Optionally, select only those data with names including key_fragment.


If the output is truncated, use pd.set_option('display.max_rows', 300) to increase the number of rows displayed.



int or str : Scan (run) identifier. Positive integer value is scan_id from run’s metadata. Negative integer value is since most recent run in databroker. String is run’s uid unique identifier (can abbreviate to the first characters needed to assure it is unique).


str : Part or all of key name to be found in selected stream. For instance, if you specify key_fragment="lakeshore", it will return all the keys that include lakeshore.


object : Bluesky database, an instance of databroker.catalog. Default: will search existing session for instance.


str : Name of the bluesky data stream to obtain the data. Default: ‘baseline’


dict : mongo query dictionary, used to filter the results Default: {}

see: https://docs.mongodb.com/manual/reference/operator/query/


bool : Chooses databroker API version between ‘v1’ or ‘v2’. Default: True (meaning use the v1 API)


object :

pandas DataFrame with values from selected stream, search_string, and query

see: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html

(new in apstools 1.5.1)

apstools.utils.catalog.quantify_md_key_use(key=None, db=None, catalog_name=None, since=None, until=None, query=None)[source]#

Print table of different key values and how many times each appears.


key str :

one of the metadata keys in a run’s start document (default: plan_name)

db object :

Instance of databroker v1 Broker or v2 catalog (default: see catalog_name keyword argument)

catalog_name str :

Name of databroker v2 catalog, used when supplied db is None. (default: mongodb_config)

since str :

include runs that started on or after this ISO8601 time (default: 1995-01-01)

until str :

include runs that started before this ISO8601 time (default: 2100-12-31)

query dict :

mongo query dictionary, used to filter the results (default: {})

see: https://docs.mongodb.com/manual/reference/operator/query/


quantify_md_key_use(key="plan_name", catalog_name="9idc", since="2020-07")
quantify_md_key_use(key="beamline_id", catalog_name="9idc")
                    query={'plan_name': 'Flyscan'},
                    until="2020-06-21 21:51")
quantify_md_key_use(catalog_name="8id", since="2020-01", until="2020-03")

In [8]: quantify_md_key_use(catalog_name="apstools_test")
========= =====
plan_name #runs
========= =====
count     26
scan      27
========= =====

In [9]: quantify_md_key_use(catalog_name="usaxs_test")
========================== =====
plan_name                  #runs
========================== =====
Flyscan                    1
TuneAxis.tune              1
count                      1
measure_USAXS_Transmission 1
run_Excel_file             1
snapshot                   1
tune_a2rp                  1
tune_ar                    1
tune_m2rp                  1
tune_mr                    1
========================== =====

Device information#

listdevice(obj[, scope, cname, dname, ...])

Describe the signal information from device obj in a pandas DataFrame.

apstools.utils.device_info.listdevice(obj, scope=None, cname=False, dname=True, show_pv=False, use_datetime=True, show_ancient=True, max_column_width=None, table_style=TableStyle.pyRestTable)[source]#

Describe the signal information from device obj in a pandas DataFrame.

Look through all subcomponents to find all the signals to be shown. Components that are disconnected will be skipped and a warning logged.



object : Instance of ophyd Signal or Device.


str or None : Scope of content to be shown.

  • "full" (or None) shows all Signal components

  • "epics" shows only EPICS-based Signals

  • "read" shows only the signals returned by obj.read()

default: None


bool : Show the _control_ (Python, dotted) name in column name.

default: False


bool : Show the _data_ (databroker, with underlines) name in column data name.

default: True


bool : Show the EPICS process variable (PV) name in column PV.

default: False

use_datetime bool :

Show the EPICS timestamp (time of last update) in column timestamp.

default: True

show_ancient bool :

Show uninitialized EPICS process variables.

In EPICS, an uninitialized PV has a timestamp of 1990-01-01 UTC. This option enables or suppresses ancient values identified by timestamp from 1989. These are values only defined in the original .db file.

default: True

max_column_width int or None :

Truncate long columns to no more than this length. If not default, then table will be formatted using pyRestTable.

default: None (will use 50)

table_style object :

Either apstools.utils.TableStyle.pandas (default) or using values from apstools.utils.TableStyle.


pandas.DataFrame wll truncate long text to at most 50 characters.

email Support#


send email notifications when requested

class apstools.utils.email.EmailNotifications(sender=None)[source]#

send email notifications when requested

use default OS mail utility (so no credentials needed)


Send email(s) when feedback_limits_approached (a hypothetical boolean) is True:

# setup
from apstools.utils import EmailNotifications

SENDER_EMAIL = "instrument_user@email.host.tld"

email_notices = EmailNotifications(SENDER_EMAIL)
    # This list receives email when send() is called.
    # others?

# ... later

if feedback_limits_approached:
    # send emails to list
    subject = "Feedback problem"
    message = "Feedback is very close to its limits."
    email_notices.send(subject, message)

Statistical peak analysis functions#

Uses pysumreg package (https://prjemian.github.io/pysumreg/) to obtain summary statistics.

analyze_1D(y_arr[, x_arr])

Measures of 1D data peak center & width.


Analyze 2-D (image) data.

apstools.utils.image_analysis.analyze_1D(y_arr, x_arr=None)[source]#

Measures of 1D data peak center & width.

Return result is a dictionary prepared by the to_dict(use_registers=True) method of the pysumreg.SummationRegisters() class.


{'mean_x': 2.0, 'mean_y': 7.2,
'stddev_x': 1.5811388300841898, 'stddev_y': 3.3466401061363027,
'slope': 0.0, 'intercept': 7.2, 'correlation': 0.0,
'centroid': 2.0, 'sigma': 1.1547005383792515,
'min_x': 1, 'max_x': 4, 'min_y': 4, 'max_y': 12,
'x_at_max_y': 2,
'n': 5, 'X': 10, 'Y': 36, 'XX': 30, 'XY': 72,
'XXY': 192, 'YY': 304}

Analyze 2-D (image) data.

Return result is a dictionary with the statistical results for a peak analysis, grouped in pairs (row, column) as it makes sense given frame[rows][columns]. The x values are the index number along the respective axis.

For this image data:

    [0, 1, 2, 1, 0],
    [1, 2, 3, 2, 1],
    [2, 3, 4, 10, 2],
    [1, 2, 3, 2, 1],

This is the analysis:

{'n': (5, 4),
 'centroid': (2.1628, 1.814),
 'sigma': (1.1192, 0.8695),
 'peak_position': (3, 2),
 'max_y': 10}

Directory of the known plans#

listplans([base, trunc, table_style])

List all plans.

apstools.utils.list_plans.listplans(base=None, trunc=50, table_style=TableStyle.pyRestTable)[source]#

List all plans. (Actually, lists all generator functions).

NOTE: Can only detect generator functions. Bluesky plans are generator functions that generate bluesky.Msg objects. There is a PR to define a decorator that identifies a generator function as a bluesky plan.


base object or None :

Object that contains plan methods (if None, use global namespace.) (default: None)

trunc int :

Truncate long docstrings to no more than trunc characters. (default: 50)

table_style object :

Either TableStyle.pyRestTable (default) or TableStyle.pandas, using values from apstools.utils.TableStyle.


pandas.DataFrame wll truncate long text to at most 50 characters.

Directory of bluesky runs#

getRunData(scan_id[, db, stream, query, use_v1])

Convenience function to get the run's data.

getRunDataValue(scan_id, key[, db, stream, ...])

Convenience function to get value of key in run stream.

listRunKeys(scan_id[, key_fragment, db, ...])

Convenience function to list all keys (column names) in the scan's stream (default: primary).

ListRuns([cat, query, keys, missing, num, ...])

List the runs from the given catalog according to some options.

listruns([cat, keys, missing, num, ...])

List runs from catalog.

summarize_runs([since, db])

Report bluesky run metrics from the databroker.

class apstools.utils.list_runs.ListRuns(cat: object = None, query: object = None, keys: object = None, missing: str = '', num: int = 20, reverse: bool = True, since: object = None, sortby: str = 'time', timefmt: str = '%Y-%m-%d %H:%M:%S', until: object = None, ids: Any = None, hints_override: bool = False)[source]#

List the runs from the given catalog according to some options.





Output as pandas DataFrame object


Output as pyRestTable object.


Parse the runs for the given metadata keys.


_get_by_key(md, key)

Get run's metadata value by key.



Search for runs from the catalog.


Check that self.keys is a list of strings.


Parse the runs for the given metadata keys. Return a dict.


Output as pandas DataFrame object


Output as pyRestTable object.

apstools.utils.list_runs.getRunData(scan_id, db=None, stream='primary', query=None, use_v1=True)[source]#

Convenience function to get the run’s data. Default is the primary stream.



int or str : Scan (run) identifier. Positive integer value is scan_id from run’s metadata. Negative integer value is since most recent run in databroker. String is run’s uid unique identifier (can abbreviate to the first characters needed to assure it is unique).


object : Bluesky database, an instance of databroker.catalog. Default: will search existing session for instance.


str : Name of the bluesky data stream to obtain the data. Default: ‘primary’


dict : mongo query dictionary, used to filter the results Default: {}

see: https://docs.mongodb.com/manual/reference/operator/query/


bool : Chooses databroker API version between ‘v1’ or ‘v2’. Default: True (meaning use the v1 API)

(new in apstools 1.5.1)

apstools.utils.list_runs.getRunDataValue(scan_id, key, db=None, stream='primary', query=None, idx=-1, use_v1=True)[source]#

Convenience function to get value of key in run stream.

Defaults are last value of key in primary stream.



int or str : Scan (run) identifier. Positive integer value is scan_id from run’s metadata. Negative integer value is since most recent run in databroker. String is run’s uid unique identifier (can abbreviate to the first characters needed to assure it is unique).


str : Name of the key (data column) in the table of the stream’s data. Must match identically.


object : Bluesky database, an instance of databroker.catalog. Default: will search existing session for instance.


str : Name of the bluesky data stream to obtain the data. Default: ‘primary’


dict : mongo query dictionary, used to filter the results Default: {}

see: https://docs.mongodb.com/manual/reference/operator/query/


int or str : List index of value to be returned from column of table. Can be 0 for first value, -1 for last value, "mean" for average value, or "all" for the full list of values. Default: -1


bool : Chooses databroker API version between ‘v1’ or ‘v2’. Default: True (meaning use the v1 API)

(new in apstools 1.5.1)

apstools.utils.list_runs.listRunKeys(scan_id, key_fragment='', db=None, stream='primary', query=None, strict=False, use_v1=True)[source]#

Convenience function to list all keys (column names) in the scan’s stream (default: primary).



int or str : Scan (run) identifier. Positive integer value is scan_id from run’s metadata. Negative integer value is since most recent run in databroker. String is run’s uid unique identifier (can abbreviate to the first characters needed to assure it is unique).


str : Part or all of key name to be found in selected stream. For instance, if you specify key_fragment="lakeshore", it will return all the keys that include lakeshore.


object : Bluesky database, an instance of databroker.catalog. Default: will search existing session for instance.


str : Name of the bluesky data stream to obtain the data. Default: ‘primary’


dict : mongo query dictionary, used to filter the results Default: {}

see: https://docs.mongodb.com/manual/reference/operator/query/


bool : Should the key_fragment be matched identically (strict=True) or matched by lower case comparison (strict=False)? Default: False


bool : Chooses databroker API version between ‘v1’ or ‘v2’. Default: True (meaning use the v1 API)

(new in apstools 1.5.1)

apstools.utils.list_runs.listruns(cat=None, keys=None, missing='', num=20, printing=None, reverse=True, since=None, sortby='time', tablefmt=None, table_style=TableStyle.pyRestTable, timefmt='%Y-%m-%d %H:%M:%S', until=None, ids=None, hints_override=False, **query)[source]#

List runs from catalog.

This function provides a thin interface to the highly-reconfigurable ListRuns() class in this package.



object : Instance of databroker v1 or v2 catalog.


str or [str] or None: Include these additional keys from the start document. (default: None means "scan_id time plan_name detectors")


str: Test to report when a value is not available. (default: "")

hints_override bool:

For a key that appears in both the metadata and the hints, override the metadata value if the same key is found in the hints. (default: False)


[int] or [str]: List of uid or scan_id value(s). Can mix different kinds in the same list. Also can specify offsets (e.g., -1). According to the rules for databroker catalogs, a string is a uid (partial representations allowed), an int is scan_id if positive or an offset if negative. (default: None)


int : Make the table include the num most recent runs. (default: 20)

printing bool or str :



bool : If True, sort in descending order by sortby. (default: True)


str : include runs that started on or after this ISO8601 time (default: "1995-01-01")


str : Sort columns by this key, found by exact match in either the start or stop document. (default: "time")

tablefmt str :

Deprecated. Use table_style instead.

table_style object :

Either TableStyle.pyRestTable (default) or TableStyle.pandas, using values from apstools.utils.TableStyle.


pandas.DataFrame wll truncate long text to at most 50 characters.


str : The time key (also includes keys "start.time" and "stop.time") will be formatted by the self.timefmt value. See https://strftime.org/ for examples. The special timefmt="raw" is used to report time as the raw value (floating point time as used in python’s time.time()). (default: "%Y-%m-%d %H:%M:%S",)


str : include runs that started before this ISO8601 time (default: 2100-12-31)


dict : Any additional keyword arguments will be passed to the databroker to refine the search for matching runs using the mongoquery package.



None or str or pd.DataFrame() object



(new in release 1.5.0)

apstools.utils.list_runs.summarize_runs(since=None, db=None)[source]#

Report bluesky run metrics from the databroker.

  • How many different plans?

  • How many runs?

  • How many times each run was used?

  • How frequently? (TODO:)



str : Report all runs since this ISO8601 date & time (default: 1995)


object : Instance of databroker.Broker() (default: db from the IPython shell)

Support for logging#

There is a guide describing How to setup logging.

file_log_handler(file_name_base[, maxBytes, ...])

Record logging output to a file.


Return a path to ./.logs.

setup_IPython_console_logging([logger, ...])

Record all input (In) and output (Out) from IPython console.

stream_log_handler([formatter, level])

Record logging output to a stream (such as the console).

apstools.utils.log_utils.file_log_handler(file_name_base, maxBytes=0, backupCount=0, log_path=None, level=None)[source]#

Record logging output to a file.



Part of the name to store the log file. Full name is f"<log_path>/{file_name_base}.log" in present working directory.


Part of the name to store the log file. Full name is f"<log_path>/{file_name_base}.log" in present working directory. default: (the present working directory)/LOG_DIR_BASE


Threshold for reporting messages with this logger. Logging messages which are less severe than level will be ignored. default: 10 (logging.DEBUG or DEBUG) see: https://docs.python.org/3/library/logging.html#levels

maxBytes(optional) int

Log file rollover begins whenever the current log file is nearly maxBytes in length. A new file is written when the current line will push the current file beyond this limit. default: 0

backupCount(optional) int

When backupCount is non-zero, the system will keep up to backupCount numbered log files (with added extensions .1, ‘.2`, …). The current log file always has no numbered extension. The previous log file is the one with the lowest extension number. default: 0


When either maxBytes or backupCount are zero, log file rollover never occurs, so you generally want to set backupCount to at least 1, and have a non-zero maxBytes.


Return a path to ./.logs. Create directory if it does not exist.

apstools.utils.log_utils.setup_IPython_console_logging(logger=None, filename='ipython_console.log', log_path=None)[source]#

Record all input (In) and output (Out) from IPython console.



object: Instance of logging.Logger.


str: Name of the log file. (default: ipython_console.log)


Directory to store the log file. Full name is f"<log_path>/{file_name_base}.log". default: (the present working directory)/LOG_DIR_BASE

apstools.utils.log_utils.stream_log_handler(formatter=None, level='INFO')[source]#

Record logging output to a stream (such as the console).



object: Instance of logging.Formatter.


str: Name of the logging level to report. (default: INFO)

Diagnostic Support for Memory#


return memory used by this process


return memory used by this process

Miscellaneous Support#


convert text so it can be used as a dictionary key

connect_pvlist(pvlist[, wait, timeout, ...])

Given list of EPICS PV names, return dict of EpicsSignal objects.


Dict with number of children of this device.

dictionary_table(dictionary, **kwargs)

Return a text table from dictionary.


Return the full dotted name

itemizer(fmt, items)

Format a list of items.

listobjects([show_pv, printing, verbose, ...])

Show all the ophyd Signal and Device objects defined as globals.


break a list (or other iterable) into pairs

print_RE_md([dictionary, fmt, printing])

custom print the RunEngine metadata in a table

redefine_motor_position(motor, new_position)

Set EPICS motor record's user coordinate to new_position.

replay(headers[, callback, sort])

Replay the document stream from one (or more) scans (headers).


(decorator) run func in thread


make text safe to be used as an ophyd object name


splits a line into words some of which might be quoted


Encode source using the default codepoint.

to_unicode_or_bust(obj[, encoding])

from: http://farmdev.com/talks/unicode/ .


String must not exceed EPICS PV length.

unix(command[, raises])

Run a UNIX command, returns (stdout, stderr).


convert text so it can be used as a dictionary key

Given some input text string, return a clean version remove troublesome characters, perhaps other cleanup as well. This is best done with regular expression pattern matching.

apstools.utils.misc.connect_pvlist(pvlist, wait=True, timeout=2, poll_interval=0.1)[source]#

Given list of EPICS PV names, return dict of EpicsSignal objects.



[str] : list of EPICS PV names


bool : should wait for EpicsSignal objects to connect (default: True)


float : maximum time to wait for PV connections, seconds (default: 2.0)


float : time to sleep between checks for PV connections, seconds (default: 0.1)


Dict with number of children of this device. Keys: Device and Signal.

apstools.utils.misc.dictionary_table(dictionary, **kwargs)[source]#

Return a text table from dictionary.

Dictionary keys in first column, values in second.



dict : Python dictionary

Note: Keyword arguments parameters are kept for compatibility with previous versions of apstools. They are ignored now.



object or None : pyRestTable.Table() object (multiline text table) or None if dictionary has no contents


In [8]: RE.md
Out[8]: {'login_id': 'jemian:wow.aps.anl.gov', 'beamline_id': 'developer', 'proposal_id': None, 'pid': 19072, 'scan_id': 10, 'version': {'bluesky': '1.5.2', 'ophyd': '1.3.3', 'apstools': '1.1.5', 'epics': '3.3.3'}}

In [9]: print(dictionary_table(RE.md))
=========== =============================================================================
key         value
=========== =============================================================================
beamline_id developer
login_id    jemian:wow.aps.anl.gov
pid         19072
proposal_id None
scan_id     10
version     {'bluesky': '1.5.2', 'ophyd': '1.3.3', 'apstools': '1.1.5', 'epics': '3.3.3'}
=========== =============================================================================

Return the full dotted name

The .dotted_name property does not include the name of the root object. This routine adds that.

see: bluesky/ophyd#797

apstools.utils.misc.itemizer(fmt, items)[source]#

Format a list of items.

apstools.utils.misc.listobjects(show_pv=True, printing=None, verbose=False, symbols=None, child_devices=False, child_signals=False, table_style=TableStyle.pyRestTable)[source]#

Show all the ophyd Signal and Device objects defined as globals.



bool : If True, also show relevant EPICS PV, if available. (default: True)

printing bool :



bool : If True, also show str(obj. (default: False)


dict : If None, use global symbol table. If not None, use provided dictionary. (default: globals())


bool : If True, also show how many Devices are children of this device. (default: False)


bool : If True, also show how many Signals are children of this device. (default: False)

table_style object :

Either apstools.utils.TableStyle.pandas (default) or using values from apstools.utils.TableStyle.


pandas.DataFrame wll truncate long text to at most 50 characters.



Instance of pyRestTable.Table()


In [1]: listobjects()
======== ================================ =============
name     ophyd structure                  EPICS PV
======== ================================ =============
adsimdet MySingleTriggerSimDetector       vm7SIM1:
m1       EpicsMotor                       vm7:m1
m2       EpicsMotor                       vm7:m2
m3       EpicsMotor                       vm7:m3
m4       EpicsMotor                       vm7:m4
m5       EpicsMotor                       vm7:m5
m6       EpicsMotor                       vm7:m6
m7       EpicsMotor                       vm7:m7
m8       EpicsMotor                       vm7:m8
noisy    EpicsSignalRO                    vm7:userCalc1
scaler   ScalerCH                         vm7:scaler1
shutter  SimulatedApsPssShutterWithStatus
======== ================================ =============

Out[1]: <pyRestTable.rest_table.Table at 0x7fa4398c7cf8>

In [2]:

(new in apstools release 1.1.8)


break a list (or other iterable) into pairs

s -> (s0, s1), (s2, s3), (s4, s5), ...

In [71]: for item in pairwise("a b c d e fg".split()):
    ...:     print(item)
('a', 'b')
('c', 'd')
('e', 'fg')
apstools.utils.misc.print_RE_md(dictionary=None, fmt='simple', printing=True)[source]#

custom print the RunEngine metadata in a table



dict : Python dictionary


In [4]: print_RE_md()
RunEngine metadata dictionary:
======================== ===================================
key                      value
======================== ===================================
EPICS_HOST_ARCH          linux-x86_64
beamline_id              APS USAXS 9-ID-C
login_id                 usaxs:usaxscontrol.xray.aps.anl.gov
pid                      67933
proposal_id              testing Bluesky installation
scan_id                  0
versions                 ======== =====
                         key      value
                         ======== =====
                         apstools 1.1.3
                         bluesky  1.5.2
                         epics    3.3.1
                         ophyd    1.3.3
                         ======== =====
======================== ===================================
apstools.utils.misc.redefine_motor_position(motor, new_position)[source]#

Set EPICS motor record’s user coordinate to new_position.

apstools.utils.misc.replay(headers, callback=None, sort=True)[source]#

Replay the document stream from one (or more) scans (headers).



run or [run] : Run(s) to be replayed through callback. A run is an instance of a Bluesky databroker.core.BlueskyRun (or the older databroker.Header). see: https://nsls-ii.github.io/databroker/api.html?highlight=header#header-api


run or [run] : The Bluesky callback to handle the stream of documents from a run. If None, then use the bec (BestEffortCallback) from the IPython shell. (default:None)


bool : Sort the headers chronologically if True. (default:True)

(new in apstools release 1.1.11)


(decorator) run func in thread


def progress_reporting():
    logger.debug("progress_reporting is starting")
    # ...

progress_reporting()   # runs in separate thread

make text safe to be used as an ophyd object name

Given some input text string, return a clean version. Remove troublesome characters, perhaps other cleanup as well. This is best done with regular expression pattern matching.

The “sanitized” name fits this regular expression:


Also can be used for safe HDF5 and NeXus names.


splits a line into words some of which might be quoted


FlyScan 0   0   0   blank
FlyScan 5   2   0   "empty container"
FlyScan 5   12   0   "even longer name"
SAXS 0 0 0 blank
SAXS 0 0 0 "blank"


['FlyScan', '0', '0', '0', 'blank']
['FlyScan', '5', '2', '0', 'empty container']
['FlyScan', '5', '12', '0', 'even longer name']
['SAXS', '0', '0', '0', 'blank']
['SAXS', '0', '0', '0', 'blank']

Encode source using the default codepoint.

apstools.utils.misc.to_unicode_or_bust(obj, encoding='utf-8')[source]#

from: http://farmdev.com/talks/unicode/ .


String must not exceed EPICS PV length.

apstools.utils.misc.unix(command, raises=True)[source]#

Run a UNIX command, returns (stdout, stderr).



str : UNIX command to be executed


bool : If True, will raise exceptions as needed, default: True


Define parameters that can be overridden from a user configuration file.


Create an overrides object in a new file override_params.py:

import apstools.utils
overrides = apstools.utils.OverrideParameters()

When code supports a parameter for which a user can provide a local override, the code should import the overrides object (from the override_params module), and then register the parameter name, such as this example:

from override_params import overrides

Then later:

minstep = overrides.pick("minimum_step", 45e-6)

In the user’s configuration file that will override the value of 45e-6 (such as can be loaded via %run -i user.py), import the overrides` object (from the override_params module):

from override_params import overrides

and then override the attribute(s) as desired:

overrides.set("minimum_step", 1.0e-5)

With this override in place, the minstep value (from pick()) will be 1e-5.

Get a pandas DataFrame object with all the overrides:


which returns this table:

    parameter    value
0  minimum_step  0.00001


Define parameters that can be overridden from a user configuration file.

class apstools.utils.override_parameters.OverrideParameters[source]#

Define parameters that can be overridden from a user configuration file.

NOTE: This is a pure Python object, not using ophyd.

pick(parameter, default)

Return either the override parameter value if defined, or the default.


Register a new parameter name to be supported by user overrides.


Remove an override value for a known parameter.


Remove override values for all known parameters.

set(parameter_name, value)

Define an override value for a known parameter.


Return a pandas DataFrame with all overrides.

(new in apstools 1.5.2)

pick(parameter, default)[source]#

Return either the override parameter value if defined, or the default.


Register a new parameter name to be supported by user overrides.


Remove an override value for a known parameter. (sets it to undefined)


Remove override values for all known parameters. (sets all to undefined)

set(parameter_name, value)[source]#

Define an override value for a known parameter.


Return a pandas DataFrame with all overrides.

Parameter names that have no override value will be reported as --undefined--.

Plot Support#

plotxy(runs, xname, yname[, append, cat, ...])

Plot y vs x from a bluesky run.

select_mpl_figure(x, y)

Get the MatPlotLib Figure window for y vs x.

select_live_plot(bec, signal)

Get the first live plot that matches signal.

trim_plot_lines(bec, n, x, y)

Find the plot with axes x and y and replot with at most the last n lines.

trim_plot_by_name([n, plots])

Find the plot(s) by name and replot with at most the last n lines.

apstools.utils.plot.plotxy(runs, xname, yname, append=False, cat=None, stats=True, stream='primary', title=None)[source]#

Plot y vs x from a bluesky run.

Note: This is not a bluesky plan. Call it as a normal Python function.


runs[run] or run:

List or runs or single run. A run is either a bluesky.core.BlueskyRun object or a reference (uid, scan_id, relative to most recent) to a BlueskyRun in the catalog.


Name of the signal to plot on the x axis.


Name of the signal to plot on the y axis.


(optional) If True, append to existing plot window. Default: append=False


(optional) Catalog to be used for finding a run by reference. Default: return value from apstools.utils.getCatalog()


(optional) If True, compute and plot centroid and FWHM (computed from sigma). Default: stats=True


(optional) Name of the data stream in which to find “xname” and “yname”. Default: stream="primary"


(optional) Title to show on this plot. Default: Metadata “title” keyword of first run (if found) or scan_id and starting date/time of first run.


Returns a dict of statistics for each run indexed by scan_id, if stats=True, else None. A computed fwhm key is added to the statistics.

New in release 1.6.10.

apstools.utils.plot.select_live_plot(bec, signal)[source]#

Get the first live plot that matches signal.



object: instance of bluesky.callbacks.best_effort.BestEffortCallback


object: The Y axis object (an ophyd.Signal)



Instance of bluesky.callbacks.best_effort.LivePlotPlusPeaks() or None

apstools.utils.plot.select_mpl_figure(x, y)[source]#

Get the MatPlotLib Figure window for y vs x.



object: X axis object (an ophyd.Signal)


ophyd object: X axis object (an ophyd.Signal)


object or None:

Instance of matplotlib.pyplot.Figure()

apstools.utils.plot.trim_plot_by_name(n=3, plots=None)[source]#

Find the plot(s) by name and replot with at most the last n lines.

Note: this is not a bluesky plan. Call it as normal Python function.

It is recommended to call trim_plot_by_name() before the scan(s) that generate plots. Plots are generated from a RunEngine callback, executed after the scan completes.



int : number of plots to keep


str, [str], or None : name(s) of plot windows to trim (default: all plot windows)


trim_plot_by_name()   # default of n=3, apply to all plots
trim_plot_by_name(5)  # change from default of n=3
trim_plot_by_name(5, "noisy_det vs motor")  # just this plot
    ["noisy_det vs motor", "det noisy_det vs motor"]]


# use simulators from ophyd
from bluesky import plans as bp
from bluesky import plan_stubs as bps
from ophyd.sim import *

snooze = 0.25

def scan_set():
    yield from bp.scan([noisy_det], motor, -1, 1, 5)
    yield from bp.scan([noisy_det, det], motor, -2, 1, motor2, 3, 1, 6)
    yield from bps.sleep(snooze)

# repeat the_scans 15 times
uids = RE(bps.repeat(scan_set, 15))

(new in release 1.3.5)

apstools.utils.plot.trim_plot_lines(bec, n, x, y)[source]#

Find the plot with axes x and y and replot with at most the last n lines.

Note: trim_plot_lines() is not a bluesky plan. Call it as normal Python function.


trim_plot_lines(bec, 1, m1, noisy)



object : instance of BestEffortCallback


int : number of plots to keep


object : instance of ophyd.Signal (or subclass), independent (x) axis


object : instance of ophyd.Signal (or subclass), dependent (y) axis

(new in release 1.3.5)

Support for IPython profiles#


get the IPython shell's namespace dictionary (or globals() if not found)


return the name of the current ipython profile or None


get the IPython shell's namespace dictionary (or empty if not found)


get the IPython shell’s namespace dictionary (or globals() if not found)


return the name of the current ipython profile or None

Example (add to default RunEngine metadata):

RE.md['ipython_profile'] = str(ipython_profile_name())
print("using profile: " + RE.md['ipython_profile'])

get the IPython shell’s namespace dictionary (or empty if not found)

EPICS PV Registry#

findbyname(oname[, force_rebuild, ns])

Find the ophyd (dotted name) object associated with the given ophyd name.

findbypv(pvname[, force_rebuild, ns])

Find all ophyd objects associated with the given EPICS PV.


Cross-reference EPICS PVs with ophyd EpicsSignalBase objects.

class apstools.utils.pvregistry.PVRegistry(ns=None)[source]#

Cross-reference EPICS PVs with ophyd EpicsSignalBase objects.

Search for ophyd object by ophyd name.


Search for PV in both read & write modes.

search_by_mode(pvname, mode='R')[source]#

Search for PV in specified mode.

apstools.utils.pvregistry.findbyname(oname, force_rebuild=False, ns=None)[source]#

Find the ophyd (dotted name) object associated with the given ophyd name.



str : ophyd name to search


bool : If True, rebuild the internal registry that maps ophyd names to ophyd objects.


dict or None : Namespace dictionary of Python objects.


str or None:

Name of the ophyd object.


In [45]: findbyname("adsimdet_cam_acquire")
Out[45]: 'adsimdet.cam.acquire'

(new in apstools 1.5.0)

apstools.utils.pvregistry.findbypv(pvname, force_rebuild=False, ns=None)[source]#

Find all ophyd objects associated with the given EPICS PV.



str : EPICS PV name to search


bool : If True, rebuild the internal registry that maps EPICS PV names to ophyd objects.


dict or None : Namespace dictionary of Python objects.


dict or None:

Dictionary of matching ophyd objects, keyed by how the PV is used by the ophyd signal. The keys are read and write.


In [45]: findbypv("ad:cam1:Acquire")
Out[45]: {'read': [], 'write': ['adsimdet.cam.acquire']}

In [46]: findbypv("ad:cam1:Acquire_RBV")
Out[46]: {'read': ['adsimdet.cam.acquire'], 'write': []}

Searching databroker catalogs#

db_query(db, query)

Searches the databroker v2 database.

apstools.utils.query.db_query(db, query)[source]#

Searches the databroker v2 database.



object : Bluesky database, an instance of databroker.catalog.


dict : Search parameters.


object :

Bluesky database, an instance of databroker.catalog satisfying the query parameters.

See also


Common support of slits

SlitGeometry(width, height, x, y)

Slit size and center as a named tuple

class apstools.utils.slit_core.SlitGeometry(width, height, x, y)#

Slit size and center as a named tuple


Alias for field number 1


Alias for field number 0


Alias for field number 2


Alias for field number 3

Spreadsheet Support#


base class: read-only support for Excel files, treat them like databases

ExcelDatabaseFileGeneric(filename[, ...])

Generic (read-only) handling of Excel spreadsheet-as-database

ExcelReadError(*args, **kwargs)

Exception when reading Excel spreadsheet.

class apstools.utils.spreadsheet.ExcelDatabaseFileBase(ignore_extra=True)[source]#

base class: read-only support for Excel files, treat them like databases

Use this class when creating new, specific spreadsheet support.


Show how to read an Excel file where one of the columns contains a unique key. This allows for random access to each row of data by use of the key.

class ExhibitorsDB(ExcelDatabaseFileBase):
    content for exhibitors from the Excel file
    EXCEL_FILE = pathlib.Path("resources") / "exhibitors.xlsx"
    LABELS_ROW = 2

    def handle_single_entry(self, entry):
        '''any special handling for a row from the Excel file'''

    def handleExcelRowEntry(self, entry):
        '''identify unique key (row of the Excel file)'''
        key = entry["Name"]
        self.db[key] = entry
class apstools.utils.spreadsheet.ExcelDatabaseFileGeneric(filename, labels_row=3, ignore_extra=True)[source]#

Generic (read-only) handling of Excel spreadsheet-as-database


This is the class to use when reading Excel spreadsheets.

In the spreadsheet, the first sheet should contain the table to be used. By default (see keyword parameter labels_row), the table should start in cell A4. The column labels are given in row 4. A blank column should appear to the right of the table (see keyword parameter ignore_extra). The column labels will describe the action and its parameters. Additional columns may be added for metadata or other purposes.

The rows below the column labels should contain actions and parameters for those actions, one action per row.

To make a comment, place a # in the action column. A comment should be ignored by the bluesky plan that reads this table. The table will end with a row of empty cells.

While it’s a good idea to put the action column first, that is not necessary. It is not even necessary to name the column action. You can re-arrange the order of the columns and change their names as long as the column names match what text strings your Python code expects to find.

A future upgrade [1] will allow the table boundaries to be named by Excel when using Excel’s Format as Table [2] feature. For now, leave a blank row and column at the bottom and right edges of the table.



str : name (absolute or relative) of Excel spreadsheet file


int : Row (zero-based numbering) of Excel file with column labels, default: 3 (Excel row 4)


bool : When True, ignore any cells outside of the table, default: True.

Note that when True, a row of cells within the table will be recognized as the end of the table, even if there are actions in following rows. To force an empty row, use a comment symbol # (actually, any non-empty content will work).

When False, cells with other information (in Sheet 1) will be made available, sometimes with unpredictable results.


See section The run_command_file() plan – batch scans using a text file for more examples.

(See also example screen shot.) Table (on Sheet 1) begins on row 4 in first column:

1  |  some text here, maybe a title
2  |  (could have content here)
3  |  (or even more content here)
4  |  action  | sx   | sy   | sample     | comments          |  | <-- leave empty column
5  |  close   |      |                   | close the shutter |  |
6  |  image   | 0    | 0    | dark       | dark image        |  |
7  |  open    |      |      |            | open the shutter  |  |
8  |  image   | 0    | 0    | flat       | flat field image  |  |
9  |  image   | 5.1  | -3.2 | 4140 steel | heat 9172634      |  |
10 |  scan    | 5.1  | -3.2 | 4140 steel | heat 9172634      |  |
11 |  scan    | 0    | 0    | blank      |                   |  |
12 |
13 |  ^^^ leave empty row ^^^
14 | (could have content here)

Example python code to read this spreadsheet:

from apstools.utils import ExcelDatabaseFileGeneric, cleanupText

def myExcelPlan(xl_file, md={}):
    excel_file = pathlib.Path(xl_file).absolute()
    xl = ExcelDatabaseFileGeneric(excel_file)
    for i, row in xl.db.values():
        # prepare the metadata
        _md = {cleanupText(k): v for k, v in row.items()}
        _md["xl_file"] = xl_file
        _md["excel_row_number"] = i+1
        _md.update(md) # overlay with user-supplied metadata

        # determine what action to take
        action = row["action"].lower()
        if action == "open":
            yield from bps.mv(shutter, "open")
        elif action == "close":
            yield from bps.mv(shutter, "close")
        elif action == "image":
            # your code to take an image, given **row as parameters
            yield from my_image(**row, md=_md)
        elif action == "scan":
            # your code to make a scan, given **row as parameters
            yield from my_scan(**row, md=_md)
            print(f"no handling for row {i+1}: action={action}")

# execute this plan through the RunEngine
RE(myExcelPlan("spreadsheet.xlsx", md=dict(purpose="apstools demo"))

use row number as the unique key

class apstools.utils.spreadsheet.ExcelReadError(*args: Any, **kwargs: Any)[source]#

Exception when reading Excel spreadsheet.