Utilities#

Utilities that assist with measurement, data exploration, and the user experience.

Also consult the Index under the apstools heading for links to the Exceptions and Utilities described here.

Utilities by Activity#

APS Data Management#

dm_setup(setup_file)

Name the APS Data Management bash script that activates its conda environment.

dm_api_cat()

Return the APS Data Management Catalog API object.

dm_api_daq()

Return the APS Data Management Data Acquisition API object.

dm_api_dataset_cat()

Return the APS Data Management Dataset Metadata Catalog API object.

dm_api_ds()

Return the APS Data Management Data Storage API object.

dm_api_file()

Return the APS Data Management File API object.

dm_api_filecat()

Return the APS Data Management Metadata Catalog Service API object.

dm_api_proc()

Return the APS Data Management Processing API object.

DM_WorkflowCache()

Keep track of one or more APS Data Management workflows for bluesky plans.

Finding#

findbyname(oname[, force_rebuild, ns])

Find the ophyd (dotted name) object associated with the given ophyd name.

findbypv(pvname[, force_rebuild, ns])

Find all ophyd objects associated with the given EPICS PV.

findCatalogsInNamespace()

Return a dictionary of databroker catalogs in the default namespace.

Listing#

listdevice(obj[, scope, cname, dname, ...])

Describe the signal information from device obj in a pandas DataFrame.

listobjects([show_pv, printing, verbose, ...])

Show all the ophyd Signal and Device objects defined as globals.

listplans([base, trunc, table_style])

List all plans.

listRunKeys(scan_id[, key_fragment, db, ...])

Convenience function to list all keys (column names) in the scan's stream (default: primary).

ListRuns([cat, query, keys, missing, num, ...])

List the runs from the given catalog according to some options.

listruns([cat, keys, missing, num, ...])

List runs from catalog.

Reporting#

print_RE_md([dictionary, fmt, printing])

custom print the RunEngine metadata in a table

file_log_handler(file_name_base[, maxBytes, ...])

Record logging output to a file.

get_log_path()

Return a path to ./.logs.

setup_IPython_console_logging([logger, ...])

Record all input (In) and output (Out) from IPython console.

stream_log_handler([formatter, level])

Record logging output to a stream (such as the console).

SlitGeometry(width, height, x, y)

Slit size and center as a named tuple

Other Utilities#

dm_setup(setup_file)

Name the APS Data Management bash script that activates its conda environment.

warn_if_not_aps_controls_subnet()

APS-U controls are on private subnets.

cleanupText(text)

convert text so it can be used as a dictionary key

connect_pvlist(pvlist[, wait, timeout, ...])

Given list of EPICS PV names, return dict of EpicsSignal objects.

EmailNotifications([sender])

send email notifications when requested

select_live_plot(bec, signal)

Get the first live plot that matches signal.

select_mpl_figure(x, y)

Get the MatPlotLib Figure window for y vs x.

trim_plot_by_name([n, plots])

Find the plot(s) by name and replot with at most the last n lines.

trim_plot_lines(bec, n, x, y)

Find the plot with axes x and y and replot with at most the last n lines.

trim_string_for_EPICS(msg)

String must not exceed EPICS PV length.

unix(command[, raises])

Run a UNIX command, returns (stdout, stderr).

ts2iso(ts[, sep])

Convert Python timestamp (float) to ISO8601 time in current time zone.

General#

call_signature_decorator(f)

Get the names of all function parameters supplied by the caller.

cleanupText(text)

convert text so it can be used as a dictionary key

command_list_as_table(commands[, show_raw])

format a command list as a pyRestTable.Table object

connect_pvlist(pvlist[, wait, timeout, ...])

Given list of EPICS PV names, return dict of EpicsSignal objects.

copy_filtered_catalog(source_cat, target_cat)

copy filtered runs from source_cat to target_cat

db_query(db, query)

Searches the databroker v2 database.

dictionary_table(dictionary, **kwargs)

Return a text table from dictionary.

EmailNotifications([sender])

send email notifications when requested

ExcelDatabaseFileBase([ignore_extra])

base class: read-only support for Excel files, treat them like databases

ExcelDatabaseFileGeneric(filename[, ...])

Generic (read-only) handling of Excel spreadsheet-as-database

ExcelReadError(*args, **kwargs)

Exception when reading Excel spreadsheet.

findbyname(oname[, force_rebuild, ns])

Find the ophyd (dotted name) object associated with the given ophyd name.

findbypv(pvname[, force_rebuild, ns])

Find all ophyd objects associated with the given EPICS PV.

findCatalogsInNamespace()

Return a dictionary of databroker catalogs in the default namespace.

full_dotted_name(obj)

Return the full dotted name

getCatalog([ref])

Return a catalog object.

getDatabase([db, catalog_name])

Return Bluesky database using keyword guides or default choice.

getDefaultCatalog()

Return the default databroker catalog.

getDefaultDatabase()

Find the "default" database (has the most recent run).

getDefaultNamespace([attr])

get the IPython shell's namespace dictionary (or globals() if not found)

getRunData(scan_id[, db, stream, query, use_v1])

Convenience function to get the run's data.

getRunDataValue(scan_id, key[, db, stream, ...])

Convenience function to get value of key in run stream.

getStreamValues(scan_id[, key_fragment, db, ...])

Get values from a previous scan stream in a databroker catalog.

ipython_profile_name()

return the name of the current ipython profile or None

itemizer(fmt, items)

Format a list of items.

listdevice(obj[, scope, cname, dname, ...])

Describe the signal information from device obj in a pandas DataFrame.

listobjects([show_pv, printing, verbose, ...])

Show all the ophyd Signal and Device objects defined as globals.

listplans([base, trunc, table_style])

List all plans.

listRunKeys(scan_id[, key_fragment, db, ...])

Convenience function to list all keys (column names) in the scan's stream (default: primary).

ListRuns([cat, query, keys, missing, num, ...])

List the runs from the given catalog according to some options.

listruns([cat, keys, missing, num, ...])

List runs from catalog.

OverrideParameters()

Define parameters that can be overridden from a user configuration file.

pairwise(iterable)

break a list (or other iterable) into pairs

plotxy(runs, xname, yname[, append, cat, ...])

Plot y vs x from a bluesky run.

print_RE_md([dictionary, fmt, printing])

custom print the RunEngine metadata in a table

quantify_md_key_use([key, db, catalog_name, ...])

Print table of different key values and how many times each appears.

redefine_motor_position(motor, new_position)

Set EPICS motor record's user coordinate to new_position.

render(value[, sig_figs])

Round-off floating-point numbers to sig_figs.

replay(headers[, callback, sort])

Replay the document stream from one (or more) scans (headers).

rss_mem()

return memory used by this process

run_in_thread(func)

(decorator) run func in thread

safe_ophyd_name(text)

make text safe to be used as an ophyd object name

select_live_plot(bec, signal)

Get the first live plot that matches signal.

select_mpl_figure(x, y)

Get the MatPlotLib Figure window for y vs x.

split_quoted_line(line)

splits a line into words some of which might be quoted

summarize_runs([since, db])

Report bluesky run metrics from the databroker.

text_encode(source)

Encode source using the default codepoint.

to_unicode_or_bust(obj[, encoding])

from: http://farmdev.com/talks/unicode/ .

trim_plot_by_name([n, plots])

Find the plot(s) by name and replot with at most the last n lines.

trim_plot_lines(bec, n, x, y)

Find the plot with axes x and y and replot with at most the last n lines.

trim_string_for_EPICS(msg)

String must not exceed EPICS PV length.

unix(command[, raises])

Run a UNIX command, returns (stdout, stderr).

Submodules#

Setup for this beam line’s APS Data Management Python API client.

FIRST

The dm_setup(setup_file) function must be called first, before any other calls to the dm package. The setup_file argument is the bash script that activates the APS Data Management conda environment for the workstation. That file contains definitions of environment variables needed by the functions below.

dm_setup(setup_file)

Name the APS Data Management bash script that activates its conda environment.

FUNCTIONS

build_run_metadata_dict(user_md, **dm_kwargs)

Return a dictionary for use as Bluesky run metadata.

dm_add_workflow(workflow_file)

Add APS Data Management workflow from file.

dm_api_cat()

Return the APS Data Management Catalog API object.

dm_api_daq()

Return the APS Data Management Data Acquisition API object.

dm_api_dataset_cat()

Return the APS Data Management Dataset Metadata Catalog API object.

dm_api_ds()

Return the APS Data Management Data Storage API object.

dm_api_file()

Return the APS Data Management File API object.

dm_api_filecat()

Return the APS Data Management Metadata Catalog Service API object.

dm_api_proc()

Return the APS Data Management Processing API object.

dm_daq_wait_upload_plan(id[, period])

plan: Wait for DAQ uploads to finish.

dm_file_ready_to_process(experimentFilePath, ...)

Does DM determine the named file is ready for processing?

dm_get_daqs(experimentName)

Return list of APS Data Management DAQ(s) for this experiment.

dm_get_experiment_datadir_active_daq(...)

Return the daqInfo dict for the active DAQ, or 'None'.

dm_get_experiment_file(experiment_name, ...)

Get experiment file.

dm_get_experiment_path(experiment_name)

Return the storageDirectory for the named APS Data Management experiment as a path.

dm_get_experiments([keys, table, default_value])

Get the most recent APS Data Management experiments (for the current station).

dm_get_workflow(workflow_name)

Get named APS Data Management workflow.

dm_source_environ()

Add APS Data Management environment variable definitions to this process.

dm_start_daq(experimentName, dataDirectory, ...)

Start APS DM data acquisition (real-time directory monitoring and file upload).

dm_station_name()

Return the APS Data Management station name or None if not found.

dm_stop_daq(experimentName, dataDirectory)

Stop APS DM data acquisition (real-time directory monitoring and file upload).

dm_update_workflow(workflow_file)

Update APS Data Management workflow from file.

dm_upload(experimentName, dataDirectory, ...)

Start APS DM data acquisition file upload.

get_workflow_last_stage(workflow_name)

Return the name of the last stage in the named APS Data Management workflow.

share_bluesky_metadata_with_dm(...[, ...])

Once a bluesky run ends, share its metadata with APS DM.

validate_experiment_dataDirectory(...)

These bluesky plans use the experiment's 'dataDirectory'.

wait_dm_upload(experiment_name, experiment_file)

(bluesky plan) Wait for APS DM data acquisition to upload a file.

DM_WorkflowCache()

Keep track of one or more APS Data Management workflows for bluesky plans.

see:

https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/Beamline-Services/Workflow-Processing-Service

class apstools.utils.aps_data_management.DM_WorkflowCache[source]#

Bases: object

Keep track of one or more APS Data Management workflows for bluesky plans.

define_workflow(key, connector)

Add a DM_WorkflowConnector object to be managed.

print_cache_summary([title])

Summarize (in a table) the DM workflows in the cache.

report_dm_workflow_output(final_stage_id)

Print a final (summary) report about a single DM workflow.

wait_workflows([period, wait])

(plan) Wait (if True) for existing workflows to finish.

_update_processing_data()

Update all the workflows in the cache (from the DM server).

_update_processing_data()[source]#

Update all the workflows in the cache (from the DM server).

define_workflow(key: str, connector: object)[source]#

Add a DM_WorkflowConnector object to be managed.

PARAMETERS

key str:

Identifying text for this workflow object.

connector object:

Instance of DM_WorkflowConnector.

print_cache_summary(title: str = 'Summary')[source]#

Summarize (in a table) the DM workflows in the cache.

report_dm_workflow_output(final_stage_id: str)[source]#

Print a final (summary) report about a single DM workflow.

PARAMETERS

final_stage_id str:

Text key of the last stage in the workflow.

wait_workflows(period: float = 10, wait: bool = True)[source]#

(plan) Wait (if True) for existing workflows to finish.

PARAMETERS

period float:

Time between reports while waiting for all workflows to finish processing. Default: 10 seconds.

wait bool:

Should RE wait for all workflows to finish? Default: True

apstools.utils.aps_data_management.build_run_metadata_dict(user_md: dict, **dm_kwargs) dict[source]#

Return a dictionary for use as Bluesky run metadata.

apstools.utils.aps_data_management.dm_add_workflow(workflow_file)[source]#

Add APS Data Management workflow from file.

apstools.utils.aps_data_management.dm_api_cat()[source]#

Return the APS Data Management Catalog API object.

apstools.utils.aps_data_management.dm_api_daq()[source]#

Return the APS Data Management Data Acquisition API object.

apstools.utils.aps_data_management.dm_api_dataset_cat()[source]#

Return the APS Data Management Dataset Metadata Catalog API object.

apstools.utils.aps_data_management.dm_api_ds()[source]#

Return the APS Data Management Data Storage API object.

apstools.utils.aps_data_management.dm_api_file()[source]#

Return the APS Data Management File API object.

apstools.utils.aps_data_management.dm_api_filecat()[source]#

Return the APS Data Management Metadata Catalog Service API object.

apstools.utils.aps_data_management.dm_api_proc()[source]#

Return the APS Data Management Processing API object.

apstools.utils.aps_data_management.dm_daq_wait_upload_plan(id: str, period: float = 10)[source]#

plan: Wait for DAQ uploads to finish.

apstools.utils.aps_data_management.dm_file_ready_to_process(experimentFilePath: str, experimentName: str, compression: str = '', retrieveMd5Sum: bool = False) bool[source]#

Does DM determine the named file is ready for processing?

apstools.utils.aps_data_management.dm_get_daqs(experimentName: str)[source]#

Return list of APS Data Management DAQ(s) for this experiment.

PARAMETERS

experimentName str:

Name of the APS Data Management experiment.

RETURNS

List of matching DAQ dictionaries.

apstools.utils.aps_data_management.dm_get_experiment_datadir_active_daq(experiment_name: str, data_directory: str)[source]#

Return the daqInfo dict for the active DAQ, or ‘None’.

apstools.utils.aps_data_management.dm_get_experiment_file(experiment_name: str, experiment_file: str)[source]#

Get experiment file.

PARAMETERS

experiment_name str:

Name of the APS Data Management experiment. The experiment must exist.

experiment_file str:

Name (with path) of the experiment file.

RETURNS

FileMetadata object.

RAISES

  • InvalidRequest – in case experiment name or file path have not been provided

  • AuthorizationError – in case user is not authorized to manage DM station

  • ObjectNotFound – in case file with a given path does not exist

  • DmException – in case of any other errors

apstools.utils.aps_data_management.dm_get_experiment_path(experiment_name: str)[source]#

Return the storageDirectory for the named APS Data Management experiment as a path.

PARAMETERS

experiment_name str:

Name of the APS Data Management experiment. The experiment must exist.

RETURNS

Data directory for the experiment, as pathlib.Path object.

RAISES

dm.ObjectNotFound:

When experiment is not found.

apstools.utils.aps_data_management.dm_get_experiments(keys=['id', 'name', 'startDate', 'experimentType', 'experimentStation'], table=False, default_value='-na-')[source]#

Get the most recent APS Data Management experiments (for the current station).

Return result as either a list or a pyRestTable object (see table).

PARAMETERS:

keys [str]:

Data keys to be shown in the table.

table bool:

If False (default), return a Python list. If True, return a pyRestTable Table() object.

default_value str:

Table value if no data available for that key.
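The way keys and default_value interact can be sketched with plain dictionaries. This is only an illustration: the helper name experiments_as_rows and the sample records are hypothetical, and the real function obtains its records from the DM server.

```python
def experiments_as_rows(experiments, keys, default_value="-na-"):
    """Return one row per experiment, one column per key (hypothetical helper)."""
    return [[exp.get(key, default_value) for key in keys] for exp in experiments]


# Made-up records, standing in for what a DM server might return.
sample = [
    {"id": 1, "name": "alpha", "experimentStation": "demo"},
    {"id": 2, "name": "beta", "startDate": "2024-01-15"},
]
rows = experiments_as_rows(sample, ["id", "name", "startDate"])
for row in rows:
    print(row)
```

Keys that are missing from a record are filled with default_value, and keys that were not requested (experimentStation here) are left out.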

apstools.utils.aps_data_management.dm_get_workflow(workflow_name: str)[source]#

Get named APS Data Management workflow.

apstools.utils.aps_data_management.dm_setup(setup_file)[source]#

Name the APS Data Management bash script that activates its conda environment.

The return result is the ‘owner’ of the DM workflows.

apstools.utils.aps_data_management.dm_source_environ()[source]#

Add APS Data Management environment variable definitions to this process.

This function reads the bash script, searching for lines that start with “export “. Such lines define bash shell environment variables in the bash script. This function adds those environment variables to the current environment.

BASH COMMAND SUGGESTIONS:

source /home/dm/etc/dm.setup.sh

source ~/DM/etc/dm.setup.sh

The suggestions follow a pattern: ${DM_ROOT}/etc/dm.setup.sh where DM_ROOT is the location of the DM tools as installed in the current user account.
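The search for lines that start with "export " might look roughly like the sketch below. The helper name parse_exports and the script content are hypothetical, and the library's own parsing may differ in details such as quote handling.

```python
def parse_exports(script_text):
    """Return {name: value} for each line starting with 'export ' (hypothetical helper)."""
    found = {}
    for line in script_text.splitlines():
        if not line.startswith("export "):
            continue
        name, sep, value = line[len("export "):].partition("=")
        if sep:  # skip 'export NAME' lines that carry no value
            found[name.strip()] = value.strip().strip("'\"")
    return found


# Made-up script content; real setup files define many more variables.
demo_script = """#!/bin/bash
# comment line
export DM_ROOT=/home/dm
export DM_STATION_NAME="demo"
unrelated=1
"""
definitions = parse_exports(demo_script)
print(definitions)
# To apply them to the current process: os.environ.update(definitions)
```

This sketch does not expand shell variable references (such as ${DM_ROOT}) inside the values.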

apstools.utils.aps_data_management.dm_start_daq(experimentName: str, dataDirectory: str, **daqInfo)[source]#

Start APS DM data acquisition (real-time directory monitoring and file upload).

PARAMETERS

experimentName str:

Name of the APS Data Management experiment.

dataDirectory:

data directory URL

daqInfo dict:

Dictionary of optional metadata (key/value pairs) describing data acquisition. See https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/Beamline-Services/API-Reference/DAQ-Service#dm.daq_web_service.api.experimentDaqApi.ExperimentDaqApi.startDaq for details.

RETURNS

  • daqInfo dictionary

apstools.utils.aps_data_management.dm_station_name()[source]#

Return the APS Data Management station name or None if not found.

apstools.utils.aps_data_management.dm_stop_daq(experimentName: str, dataDirectory: str)[source]#

Stop APS DM data acquisition (real-time directory monitoring and file upload).

PARAMETERS

experimentName str:

Name of the APS Data Management experiment.

dataDirectory:

data directory URL

apstools.utils.aps_data_management.dm_update_workflow(workflow_file)[source]#

Update APS Data Management workflow from file.

apstools.utils.aps_data_management.dm_upload(experimentName: str, dataDirectory: str, **daqInfo)[source]#

Start APS DM data acquisition file upload.

PARAMETERS

experimentName str:

Name of the APS Data Management experiment.

dataDirectory:

data directory URL

daqInfo dict:

Dictionary of optional metadata (key/value pairs) describing data acquisition. See https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/Beamline-Services/API-Reference/DAQ-Service#dm.daq_web_service.api.experimentDaqApi.ExperimentDaqApi.startDaq for details.

See also

The wait_dm_upload() function in this module.

apstools.utils.aps_data_management.get_workflow_last_stage(workflow_name)[source]#

Return the name of the last stage in the named APS Data Management workflow.

apstools.utils.aps_data_management.share_bluesky_metadata_with_dm(experimentName: str, workflow_name: str, run, should_raise: bool = False)[source]#

Once a bluesky run ends, share its metadata with APS DM.

Only upload if we have a workflow.

apstools.utils.aps_data_management.validate_experiment_dataDirectory(dm_experiment_name: str)[source]#

These bluesky plans use the experiment’s ‘dataDirectory’.

apstools.utils.aps_data_management.wait_dm_upload(experiment_name: str, experiment_file: str, timeout: float = 600, poll_period: float = 30)[source]#

(bluesky plan) Wait for APS DM data acquisition to upload a file.

PARAMETERS

  • experiment_name str: Name of the APS Data Management experiment.

  • experiment_file str: Name (and path) of file in DM.

  • timeout float: Number of seconds to wait before raising a ‘TimeoutError’.

  • poll_period float: Number of seconds to wait before checking DM again.

RAISES

  • TimeoutError: if DM does not identify file within ‘timeout’ (seconds).
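The relationship between timeout and poll_period can be sketched as a plain polling loop. This is not the plan itself: wait_until and its sleep and clock arguments are hypothetical, and a bluesky plan would yield a sleep message rather than block.

```python
import time


def wait_until(check, timeout=600, poll_period=30, sleep=time.sleep, clock=time.monotonic):
    """Call check() every poll_period seconds until it returns True.

    Raises TimeoutError if check() is still False after timeout seconds.
    (Hypothetical helper; a bluesky plan would yield a sleep message instead.)
    """
    deadline = clock() + timeout
    while not check():
        if clock() >= deadline:
            raise TimeoutError(f"Not ready after {timeout} s.")
        sleep(poll_period)


# Demonstration with a fake file that becomes ready on the third check.
answers = iter([False, False, True])
naps = []
wait_until(lambda: next(answers), timeout=5, poll_period=0.01, sleep=naps.append)
print(f"slept {len(naps)} times")
```

Passing sleep and clock as arguments is a choice made for this sketch so the loop can be exercised without real delays.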

APS-U controls are on private subnets. Check and advise as applicable.

warn_if_not_aps_controls_subnet()

APS-U controls are on private subnets.

apstools.utils.apsu_controls_subnet.warn_if_not_aps_controls_subnet()[source]#

APS-U controls are on private subnets. Check and advise as applicable.

Call this function early in the startup procedure. It could easily explain the reason for subsequent EPICS PV connection failures.

For workstations on subnets that do not match the criteria, this function should not post any warnings.

Working with databroker catalogs#

copy_filtered_catalog(source_cat, target_cat)

copy filtered runs from source_cat to target_cat

findCatalogsInNamespace()

Return a dictionary of databroker catalogs in the default namespace.

getCatalog([ref])

Return a catalog object.

getDatabase([db, catalog_name])

Return Bluesky database using keyword guides or default choice.

getDefaultCatalog()

Return the default databroker catalog.

getDefaultDatabase()

Find the "default" database (has the most recent run).

getStreamValues(scan_id[, key_fragment, db, ...])

Get values from a previous scan stream in a databroker catalog.

quantify_md_key_use([key, db, catalog_name, ...])

Print table of different key values and how many times each appears.

apstools.utils.catalog.copy_filtered_catalog(source_cat, target_cat, query=None)[source]#

copy filtered runs from source_cat to target_cat

PARAMETERS

source_cat

obj : instance of databroker.Broker or databroker.catalog[name]

target_cat

obj : instance of databroker.Broker or databroker.catalog[name]

query

dict : mongo query dictionary, used to filter the results (default: {})

see: https://docs.mongodb.com/manual/reference/operator/query/

example:

copy_filtered_catalog(
    databroker.Broker.named("mongodb_config"),
    databroker.catalog["test1"],
    {'plan_name': 'snapshot'})

apstools.utils.catalog.findCatalogsInNamespace()[source]#

Return a dictionary of databroker catalogs in the default namespace.

apstools.utils.catalog.getCatalog(ref=None)[source]#

Return a catalog object.

apstools.utils.catalog.getDatabase(db=None, catalog_name=None)[source]#

Return Bluesky database using keyword guides or default choice.

PARAMETERS

db

object : Bluesky database, an instance of databroker.catalog (default: see catalog_name keyword argument)

catalog_name

str : Name of databroker v2 catalog, used when supplied db is None. (default: catalog with most recent run timestamp)

RETURNS

object or None:

Bluesky database, an instance of databroker.catalog

(new in release 1.4.0)

apstools.utils.catalog.getDefaultCatalog()[source]#

Return the default databroker catalog.

apstools.utils.catalog.getDefaultDatabase()[source]#

Find the “default” database (has the most recent run).

Note that here, database and catalog mean the same.

This routine looks at all the database instances defined in the current session (console or notebook). If zero or one database instance is defined as an object in the current session, the choice is simple. When more than one database instance is defined, the one with the most recent run timestamp is selected. For example, when starting with a new database, the current database may have no runs while another database instance in the session (such as the previous database) does have runs. In that case, the database with the newest run timestamp is chosen, not the newer, empty database.

RETURNS

object or None:

Bluesky database, an instance of databroker.catalog

(new in release 1.4.0)
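The selection rule described for getDefaultDatabase() can be illustrated with ordinary dictionaries. The function pick_default and the data layout (a name mapped to a list of run start times) are hypothetical; the real routine inspects databroker objects in the session namespace.

```python
def pick_default(candidates):
    """Return the name of the candidate with the newest run, or None.

    candidates maps a name to a list of run start times (floats).
    (Hypothetical helper and data layout, for illustration only.)
    """
    if not candidates:
        return None
    if len(candidates) == 1:
        return next(iter(candidates))
    # An empty candidate sorts before any candidate that has runs.
    return max(candidates, key=lambda name: max(candidates[name], default=float("-inf")))


session = {"new_empty_db": [], "previous_db": [1700000000.0, 1700003600.0]}
print(pick_default(session))
```

Here the empty database loses to the one that holds runs, which mirrors the new-database case described above.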

apstools.utils.catalog.getStreamValues(scan_id, key_fragment='', db=None, stream='baseline', query=None, use_v1=True)[source]#

Get values from a previous scan stream in a databroker catalog.

Optionally, select only those data with names including key_fragment.

Tip

If the output is truncated, use pd.set_option('display.max_rows', 300) to increase the number of rows displayed.

PARAMETERS

scan_id

int or str : Scan (run) identifier. A positive integer is the scan_id from the run’s metadata. A negative integer counts back from the most recent run in databroker. A string is the run’s uid (unique identifier), which can be abbreviated to the first characters needed to make it unique.

key_fragment

str : Part or all of key name to be found in selected stream. For instance, if you specify key_fragment="lakeshore", it will return all the keys that include lakeshore.

db

object : Bluesky database, an instance of databroker.catalog. Default: will search existing session for instance.

stream

str : Name of the bluesky data stream to obtain the data. Default: ‘baseline’

query

dict : mongo query dictionary, used to filter the results Default: {}

see: https://docs.mongodb.com/manual/reference/operator/query/

use_v1

bool : Chooses databroker API version between ‘v1’ or ‘v2’. Default: True (meaning use the v1 API)

RETURNS

object :

pandas DataFrame with values from selected stream, search_string, and query

see: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html

(new in apstools 1.5.1)
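The three forms of scan_id can be sketched against a list of made-up start documents. The helper find_run is hypothetical and the real lookup is done by databroker; in particular, treating -1 as the most recent run and preferring the newest run when a scan_id repeats are assumptions of this sketch.

```python
def find_run(runs, scan_id):
    """Pick a run from a time-ordered list of start documents (oldest first).

    (Hypothetical helper; the real lookup is done by databroker.)
    """
    if isinstance(scan_id, str):
        matches = [r for r in runs if r["uid"].startswith(scan_id)]
        if len(matches) != 1:
            raise KeyError(f"{len(matches)} runs match uid fragment {scan_id!r}")
        return matches[0]
    if scan_id < 0:
        return runs[scan_id]  # assumption: -1 is the most recent run
    # Positive: match the scan_id metadata; prefer the newest if it repeats.
    for run in reversed(runs):
        if run["scan_id"] == scan_id:
            return run
    raise KeyError(f"no run with scan_id={scan_id}")


# Made-up start documents, oldest first.
runs = [
    {"uid": "a1b2c3", "scan_id": 1},
    {"uid": "a9f0e1", "scan_id": 2},
    {"uid": "c4d5e6", "scan_id": 3},
]
print(find_run(runs, 2)["uid"], find_run(runs, -1)["uid"], find_run(runs, "a1")["uid"])
```

A uid fragment that matches several runs (such as "a" in this sample) is rejected, since it is not long enough to be unique.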

apstools.utils.catalog.quantify_md_key_use(key=None, db=None, catalog_name=None, since=None, until=None, query=None)[source]#

Print table of different key values and how many times each appears.

PARAMETERS

key str :

one of the metadata keys in a run’s start document (default: plan_name)

db object :

Instance of databroker v1 Broker or v2 catalog (default: see catalog_name keyword argument)

catalog_name str :

Name of databroker v2 catalog, used when supplied db is None. (default: mongodb_config)

since str :

include runs that started on or after this ISO8601 time (default: 1995-01-01)

until str :

include runs that started before this ISO8601 time (default: 2100-12-31)

query dict :

mongo query dictionary, used to filter the results (default: {})

see: https://docs.mongodb.com/manual/reference/operator/query/

EXAMPLES:

quantify_md_key_use(key="proposal_id")
quantify_md_key_use(key="plan_name", catalog_name="9idc", since="2020-07")
quantify_md_key_use(key="beamline_id", catalog_name="9idc")
quantify_md_key_use(key="beamline_id",
                    catalog_name="9idc",
                    query={'plan_name': 'Flyscan'},
                    since="2020",
                    until="2020-06-21 21:51")
quantify_md_key_use(catalog_name="8id", since="2020-01", until="2020-03")

In [8]: quantify_md_key_use(catalog_name="apstools_test")
========= =====
plan_name #runs
========= =====
count     26
scan      27
========= =====

In [9]: quantify_md_key_use(catalog_name="usaxs_test")
========================== =====
plan_name                  #runs
========================== =====
Flyscan                    1
TuneAxis.tune              1
count                      1
measure_USAXS_Transmission 1
run_Excel_file             1
snapshot                   1
tune_a2rp                  1
tune_ar                    1
tune_m2rp                  1
tune_mr                    1
========================== =====
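The counting behind such a table can be sketched with collections.Counter. The helper count_key_values and the start documents are hypothetical; the real function queries a databroker catalog and also applies the since, until, and query filters.

```python
from collections import Counter


def count_key_values(start_documents, key="plan_name"):
    """Count how often each value of key appears (hypothetical helper)."""
    return Counter(doc[key] for doc in start_documents if key in doc)


# Made-up start documents standing in for a catalog's runs.
docs = [
    {"plan_name": "count"},
    {"plan_name": "scan"},
    {"plan_name": "count"},
    {"proposal_id": "demo"},
]
totals = count_key_values(docs)
for value, n_runs in sorted(totals.items()):
    print(f"{value:10s} {n_runs}")
```

Runs that lack the key are skipped in this sketch.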

Device information#

listdevice(obj[, scope, cname, dname, ...])

Describe the signal information from device obj in a pandas DataFrame.

apstools.utils.device_info.listdevice(obj, scope=None, cname=False, dname=True, show_pv=False, use_datetime=True, show_ancient=True, max_column_width=None, table_style=TableStyle.pyRestTable, _call_args=None)[source]#

Describe the signal information from device obj in a pandas DataFrame.

Look through all subcomponents to find all the signals to be shown. Components that are disconnected will be skipped and a warning logged.

EXAMPLE:

>>> listdevice(m1)
======================= ======= ==========================
data name               value   timestamp                 
======================= ======= ==========================
m1                      0.0     2024-08-28 09:41:08.364137
m1_user_setpoint        0.0     2024-08-28 09:41:08.364137
m1_user_offset          0.0     2024-08-28 11:46:56.116048
m1_user_offset_dir      0       2024-08-28 09:41:08.364137
m1_offset_freeze_switch 0       2024-08-28 09:41:08.364137
m1_set_use_switch       0       2024-08-28 09:41:08.364137
m1_velocity             1.0     2024-08-28 09:41:08.364137
m1_acceleration         0.2     2024-08-28 09:41:08.364137
m1_motor_egu            degrees 2024-08-28 09:41:08.364137
m1_motor_is_moving      0       2024-08-28 09:41:08.364137
m1_motor_done_move      1       2024-08-28 11:46:56.116057
m1_high_limit_switch    0       2024-08-28 09:41:08.364137
m1_low_limit_switch     0       2024-08-28 09:41:08.364137
m1_high_limit_travel    1000.0  2024-08-28 11:46:56.116048
m1_low_limit_travel     -1000.0 2024-08-28 11:46:56.116048
m1_direction_of_travel  0       2024-08-28 09:41:08.364137
m1_motor_stop           0       2024-08-28 09:41:08.364137
m1_home_forward         0       2024-08-28 09:41:08.364137
m1_home_reverse         0       2024-08-28 09:41:08.364137
m1_steps_per_revolution 2000    2024-08-28 09:41:08.364137
======================= ======= ==========================

PARAMETERS

obj

object : Instance of ophyd Signal or Device.

scope

str or None : Scope of content to be shown.

  • "full" (or None) shows all Signal components

  • "epics" shows only EPICS-based Signals

  • "read" shows only the signals returned by obj.read()

default: None

cname

bool : Show the _control_ (Python, dotted) name in column name.

default: False

dname

bool : Show the _data_ (databroker, with underlines) name in column data name.

default: True

show_pv

bool : Show the EPICS process variable (PV) name in column PV.

default: False

Note

Special case when show_pv=True: If cname is not provided, it will be set True. If dname is not provided, it will be set False.

use_datetime bool :

Show the EPICS timestamp (time of last update) in column timestamp.

default: True

show_ancient bool :

Show uninitialized EPICS process variables.

In EPICS, an uninitialized PV has a timestamp of 1990-01-01 UTC. This option enables or suppresses ancient values identified by timestamp from 1989. These are values only defined in the original .db file.

default: True

max_column_width int or None :

Truncate long columns to no more than this length. If not default, then table will be formatted using pyRestTable.

default: None (will use 50)

table_style object :

Either TableStyle.pyRestTable (default) or TableStyle.pandas, using values from apstools.utils.TableStyle.

Note

pandas.DataFrame will truncate long text to at most 50 characters.

See also

listdevice() in What are the objects to control?
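The show_pv special case for cname and dname, described in the note above, can be sketched with None as a marker for "not provided by the caller". The helper resolve_name_columns is hypothetical, and the sentinel approach is an assumption of this sketch, not necessarily how the library detects which arguments were supplied.

```python
def resolve_name_columns(show_pv=False, cname=None, dname=None):
    """Return (cname, dname) after applying the show_pv special case.

    None marks "not provided by the caller". (Hypothetical helper; the
    sentinel approach is an assumption, not the library's mechanism.)
    """
    if cname is None:
        cname = show_pv  # documented default is False; True when show_pv
    if dname is None:
        dname = not show_pv  # documented default is True; False when show_pv
    return cname, dname


print(resolve_name_columns())
print(resolve_name_columns(show_pv=True))
print(resolve_name_columns(show_pv=True, dname=True))
```

An explicit value from the caller always wins; only the omitted arguments follow show_pv.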

email Support#

EmailNotifications([sender])

send email notifications when requested

class apstools.utils.email.EmailNotifications(sender=None)[source]#

send email notifications when requested

use default OS mail utility (so no credentials needed)

EXAMPLE

Send email(s) when feedback_limits_approached (a hypothetical boolean) is True:

# setup
from apstools.utils import EmailNotifications

SENDER_EMAIL = "instrument_user@email.host.tld"

email_notices = EmailNotifications(SENDER_EMAIL)
email_notices.add_addresses(
    # This list receives email when send() is called.
    "joe.user@goodmail.com",
    "instrument_team@email.host.tld",
    # others?
)

# ... later

if feedback_limits_approached:
    # send emails to list
    subject = "Feedback problem"
    message = "Feedback is very close to its limits."
    email_notices.send(subject, message)

Statistical peak analysis functions#

Uses the pysumreg package (https://prjemian.github.io/pysumreg/) to obtain summary statistics.

analyze_1D(y_arr[, x_arr])

Measures of 1D data peak center & width.

analyze_2D(image)

Analyze 2-D (image) data.

apstools.utils.image_analysis.analyze_1D(y_arr, x_arr=None)[source]#

Measures of 1D data peak center & width.

Return result is a dictionary prepared by the to_dict(use_registers=True) method of the pysumreg.SummationRegisters() class.

Example:

{'mean_x': 2.0, 'mean_y': 7.2,
'stddev_x': 1.5811388300841898, 'stddev_y': 3.3466401061363027,
'slope': 0.0, 'intercept': 7.2, 'correlation': 0.0,
'centroid': 2.0, 'sigma': 1.1547005383792515,
'min_x': 1, 'max_x': 4, 'min_y': 4, 'max_y': 12,
'x_at_max_y': 2,
'n': 5, 'X': 10, 'Y': 36, 'XX': 30, 'XY': 72,
'XXY': 192, 'YY': 304}
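
The centroid and sigma values in the example are consistent with moments of x weighted by y. A minimal sketch under that assumption follows. The function name peak_moments and the sample data are hypothetical (the data was chosen to reproduce the sums Y, XY, and XXY shown above), and this is not the pysumreg implementation:

```python
import math


def peak_moments(x_arr, y_arr):
    """Return (centroid, sigma) of y(x), treating y as a weight on x."""
    sum_y = sum(y_arr)
    sum_xy = sum(x * y for x, y in zip(x_arr, y_arr))
    sum_xxy = sum(x * x * y for x, y in zip(x_arr, y_arr))
    centroid = sum_xy / sum_y
    # Variance of x weighted by y; clip tiny negative round-off before sqrt.
    variance = max(sum_xxy / sum_y - centroid**2, 0.0)
    return centroid, math.sqrt(variance)


# Hypothetical data that reproduces Y=36, XY=72, XXY=192 from the example.
x = [0, 1, 2, 3, 4]
y = [4, 8, 12, 8, 4]
centroid, sigma = peak_moments(x, y)
```

With this data, centroid and sigma agree with the 'centroid' and 'sigma' entries of the example dictionary.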

apstools.utils.image_analysis.analyze_2D(image)[source]#

Analyze 2-D (image) data.

Return result is a dictionary with the statistical results for a peak analysis, grouped in pairs (row, column) as it makes sense given frame[rows][columns]. The \(x\) values are the index number along the respective axis.

For this image data:

[
    [0, 1, 2, 1, 0],
    [1, 2, 3, 2, 1],
    [2, 3, 4, 10, 2],
    [1, 2, 3, 2, 1],
]

This is the analysis:

{'n': (5, 4),
 'centroid': (2.1628, 1.814),
 'sigma': (1.1192, 0.8695),
 'peak_position': (3, 2),
 'max_y': 10}
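
The numbers above can be reproduced by summing the image onto each axis and taking index-weighted moments of the resulting profiles. This is a sketch of that reading, not the apstools code; the helper name profile_moments is hypothetical. In the example output, the first element of each pair matches the 5-element profile and the second matches the 4-element profile.

```python
import math


def profile_moments(profile):
    """Centroid and sigma of a 1-D profile, using the index as x."""
    total = sum(profile)
    centroid = sum(i * v for i, v in enumerate(profile)) / total
    second = sum(i * i * v for i, v in enumerate(profile)) / total
    return centroid, math.sqrt(max(second - centroid**2, 0.0))


image = [
    [0, 1, 2, 1, 0],
    [1, 2, 3, 2, 1],
    [2, 3, 4, 10, 2],
    [1, 2, 3, 2, 1],
]

# Collapse the image onto each axis by summing.
column_profile = [sum(col) for col in zip(*image)]  # length 5
row_profile = [sum(row) for row in image]  # length 4

col_centroid, col_sigma = profile_moments(column_profile)
row_centroid, row_sigma = profile_moments(row_profile)
```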

Directory of the known plans#

listplans([base, trunc, table_style])

List all plans.

apstools.utils.list_plans.listplans(base=None, trunc=50, table_style=TableStyle.pyRestTable)[source]#

List all plans. (Actually, lists all generator functions).

NOTE: Can only detect generator functions. Bluesky plans are generator functions that generate bluesky.Msg objects. There is a PR to define a decorator that identifies a generator function as a bluesky plan.

PARAMETERS

base object or None :

Object that contains plan methods. If None, use the global namespace. (default: None)

trunc int :

Truncate long docstrings to no more than trunc characters. (default: 50)

table_style object :

Either TableStyle.pyRestTable (default) or TableStyle.pandas, using values from apstools.utils.TableStyle.

Note

pandas.DataFrame will truncate long text to at most 50 characters.

Directory of bluesky runs#

getRunData(scan_id[, db, stream, query, use_v1])

Convenience function to get the run's data.

getRunDataValue(scan_id, key[, db, stream, ...])

Convenience function to get value of key in run stream.

listRunKeys(scan_id[, key_fragment, db, ...])

Convenience function to list all keys (column names) in the scan's stream (default: primary).

ListRuns([cat, query, keys, missing, num, ...])

List the runs from the given catalog according to some options.

listruns([cat, keys, missing, num, ...])

List runs from catalog.

summarize_runs([since, db])

Report bluesky run metrics from the databroker.

class apstools.utils.list_runs.ListRuns(cat: object = None, query: object = None, keys: object = None, missing: str = '', num: int = 20, reverse: bool = True, since: object = None, sortby: str = 'time', timefmt: str = '%Y-%m-%d %H:%M:%S', until: object = None, ids: Any = None, hints_override: bool = False)[source]#

List the runs from the given catalog according to some options.

EXAMPLE:

ListRuns(cat).to_dataframe()

PUBLIC METHODS

to_dataframe()

Output as pandas DataFrame object

to_table([fmt])

Output as pyRestTable object.

parse_runs()

Parse the runs for the given metadata keys.

INTERNAL METHODS

_get_by_key(md, key)

Get run's metadata value by key.

_check_cat()

_apply_search_filters()

Search for runs from the catalog.

_check_keys()

Check that self.keys is a list of strings.

parse_runs()[source]#

Parse the runs for the given metadata keys. Return a dict.

to_dataframe()[source]#

Output as pandas DataFrame object

to_table(fmt=None)[source]#

Output as pyRestTable object.

apstools.utils.list_runs.getRunData(scan_id, db=None, stream='primary', query=None, use_v1=True)[source]#

Convenience function to get the run’s data. Default is the primary stream.

PARAMETERS

scan_id

int or str : Scan (run) identifier. A positive integer is the scan_id from the run’s metadata. A negative integer is an offset counting back from the most recent run in the databroker (-1 is the most recent). A string is the run’s uid (it can be abbreviated to the first characters needed to make it unique).

db

object : Bluesky database, an instance of databroker.catalog. Default: will search existing session for instance.

stream

str : Name of the bluesky data stream to obtain the data. Default: ‘primary’

query

dict : mongo query dictionary, used to filter the results. Default: {}

see: https://docs.mongodb.com/manual/reference/operator/query/

use_v1

bool : Chooses the databroker API version, either ‘v1’ or ‘v2’. Default: True (meaning use the v1 API)

(new in apstools 1.5.1)

apstools.utils.list_runs.getRunDataValue(scan_id, key, db=None, stream='primary', query=None, idx=-1, use_v1=True)[source]#

Convenience function to get value of key in run stream.

Defaults are last value of key in primary stream.

PARAMETERS

scan_id

int or str : Scan (run) identifier. A positive integer is the scan_id from the run’s metadata. A negative integer is an offset counting back from the most recent run in the databroker (-1 is the most recent). A string is the run’s uid (it can be abbreviated to the first characters needed to make it unique).

key

str : Name of the key (data column) in the table of the stream’s data. Must match identically.

db

object : Bluesky database, an instance of databroker.catalog. Default: will search existing session for instance.

stream

str : Name of the bluesky data stream to obtain the data. Default: ‘primary’

query

dict : mongo query dictionary, used to filter the results. Default: {}

see: https://docs.mongodb.com/manual/reference/operator/query/

idx

int or str : List index of value to be returned from column of table. Can be 0 for first value, -1 for last value, "mean" for average value, or "all" for the full list of values. Default: -1

use_v1

bool : Chooses the databroker API version, either ‘v1’ or ‘v2’. Default: True (meaning use the v1 API)

(new in apstools 1.5.1)
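
The idx choices can be illustrated without a databroker. This is a sketch of the selection rule as described, applied to a plain list; the helper name pick_value is hypothetical and is not part of apstools:

```python
from statistics import mean


def pick_value(values, idx=-1):
    """Select from a column of values the way the idx parameter is described."""
    if idx == "all":
        return list(values)
    if idx == "mean":
        return mean(values)
    return values[idx]  # integer index: 0 is first, -1 is last


column = [1.0, 2.0, 6.0]
last = pick_value(column)
average = pick_value(column, "mean")
```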

apstools.utils.list_runs.listRunKeys(scan_id, key_fragment='', db=None, stream='primary', query=None, strict=False, use_v1=True)[source]#

Convenience function to list all keys (column names) in the scan’s stream (default: primary).

PARAMETERS

scan_id

int or str : Scan (run) identifier. A positive integer is the scan_id from the run’s metadata. A negative integer is an offset counting back from the most recent run in the databroker (-1 is the most recent). A string is the run’s uid (it can be abbreviated to the first characters needed to make it unique).

key_fragment

str : Part or all of key name to be found in selected stream. For instance, if you specify key_fragment="lakeshore", it will return all the keys that include lakeshore.

db

object : Bluesky database, an instance of databroker.catalog. Default: will search existing session for instance.

stream

str : Name of the bluesky data stream to obtain the data. Default: ‘primary’

query

dict : mongo query dictionary, used to filter the results. Default: {}

see: https://docs.mongodb.com/manual/reference/operator/query/

strict

bool : Should the key_fragment be matched identically (strict=True) or matched by lower case comparison (strict=False)? Default: False

use_v1

bool : Chooses the databroker API version, either ‘v1’ or ‘v2’. Default: True (meaning use the v1 API)

(new in apstools 1.5.1)
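
The difference between strict and non-strict matching of key_fragment can be sketched on a plain list of key names. The helper match_keys and the key names are hypothetical; the real function reads the keys from the run's stream:

```python
def match_keys(keys, key_fragment="", strict=False):
    """Return the keys that contain key_fragment."""
    if strict:
        return [k for k in keys if key_fragment in k]
    # Non-strict: compare lower-case forms of both fragment and key.
    fragment = key_fragment.lower()
    return [k for k in keys if fragment in k.lower()]


keys = ["LakeShore_temperature", "lakeshore_setpoint", "motor_x"]
loose = match_keys(keys, "lakeshore")
exact = match_keys(keys, "lakeshore", strict=True)
```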

apstools.utils.list_runs.listruns(cat=None, keys=None, missing='', num=20, printing=None, reverse=True, since=None, sortby='time', tablefmt=None, table_style=TableStyle.pyRestTable, timefmt='%Y-%m-%d %H:%M:%S', until=None, ids=None, hints_override=False, **query)[source]#

List runs from catalog.

This function provides a thin interface to the highly-reconfigurable ListRuns() class in this package.

PARAMETERS

cat

object : Instance of databroker v1 or v2 catalog.

keys

str or [str] or None: Include these additional keys from the start document. (default: None means "scan_id time plan_name detectors")

missing

str: Text to report when a value is not available. (default: "")

hints_override bool:

For a key that appears in both the metadata and the hints, override the metadata value if the same key is found in the hints. (default: False)

ids

[int] or [str]: List of uid or scan_id value(s). Can mix different kinds in the same list. Also can specify offsets (e.g., -1). According to the rules for databroker catalogs, a string is a uid (partial representations allowed), an int is scan_id if positive or an offset if negative. (default: None)

num

int : Make the table include the num most recent runs. (default: 20)

printing bool or str :

Deprecated.

reverse

bool : If True, sort in descending order by sortby. (default: True)

since

str : include runs that started on or after this ISO8601 time (default: "1995-01-01")

sortby

str : Sort columns by this key, found by exact match in either the start or stop document. (default: "time")

tablefmt str :

Deprecated. Use table_style instead.

table_style object :

Either TableStyle.pyRestTable (default) or TableStyle.pandas, using values from apstools.utils.TableStyle.

Note

pandas.DataFrame will truncate long text to at most 50 characters.

timefmt

str : The time key (also includes keys "start.time" and "stop.time") will be formatted by the timefmt value. See https://strftime.org/ for examples. The special timefmt="raw" is used to report time as the raw value (floating point time as used in python’s time.time()). (default: "%Y-%m-%d %H:%M:%S")

until

str : include runs that started before this ISO8601 time (default: 2100-12-31)

**query

dict : Any additional keyword arguments will be passed to the databroker to refine the search for matching runs using the mongoquery package.

RETURNS

object:

None or str or pd.DataFrame() object

EXAMPLE:

TODO

(new in release 1.5.0)
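
The timefmt behavior described in the parameter list (a strftime pattern, or "raw" for the floating point value) can be sketched with the standard library. The helper format_time is hypothetical and only illustrates the described rule:

```python
import datetime
import time


def format_time(timestamp, timefmt="%Y-%m-%d %H:%M:%S"):
    """Format a time.time()-style timestamp, or pass it through if 'raw'."""
    if timefmt == "raw":
        return timestamp
    return datetime.datetime.fromtimestamp(timestamp).strftime(timefmt)


now = time.time()
formatted = format_time(now)  # local time, such as 'YYYY-mm-dd HH:MM:SS'
raw = format_time(now, timefmt="raw")  # unchanged float
```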

apstools.utils.list_runs.summarize_runs(since=None, db=None)[source]#

Report bluesky run metrics from the databroker.

  • How many different plans?

  • How many runs?

  • How many times each plan was used?

  • How frequently? (TODO:)

PARAMETERS

since

str : Report all runs since this ISO8601 date & time (default: 1995)

db

object : Instance of databroker.Broker() (default: db from the IPython shell)
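
The kind of tally reported here can be sketched with collections.Counter. The start documents below are hypothetical stand-ins for what a databroker would return, and only the plan_name key is used:

```python
from collections import Counter

# Hypothetical start documents; a real catalog provides many more keys.
start_docs = [
    {"scan_id": 1, "plan_name": "count"},
    {"scan_id": 2, "plan_name": "scan"},
    {"scan_id": 3, "plan_name": "count"},
]

# Tally how often each plan name appears.
plan_usage = Counter(doc.get("plan_name", "unknown") for doc in start_docs)
n_runs = sum(plan_usage.values())  # how many runs
n_plans = len(plan_usage)  # how many different plans
```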

Support for logging#

There is a guide describing How to setup logging.

file_log_handler(file_name_base[, maxBytes, ...])

Record logging output to a file.

get_log_path()

Return a path to ./.logs.

setup_IPython_console_logging([logger, ...])

Record all input (In) and output (Out) from IPython console.

stream_log_handler([formatter, level])

Record logging output to a stream (such as the console).

apstools.utils.log_utils.file_log_handler(file_name_base, maxBytes=0, backupCount=0, log_path=None, level=None)[source]#

Record logging output to a file.

PARAMETERS

file_name_base str :

Part of the name to store the log file. Full name is f"<log_path>/{file_name_base}.log" in present working directory.

log_path str :

Directory to store the log file. Full name is f"<log_path>/{file_name_base}.log". default: (the present working directory)/LOG_DIR_BASE

level int :

Threshold for reporting messages with this logger. Logging messages which are less severe than level will be ignored. default: 10 (logging.DEBUG or DEBUG) see: https://docs.python.org/3/library/logging.html#levels

maxBytes (optional) int :

Log file rollover begins whenever the current log file is nearly maxBytes in length. A new file is written when the current line will push the current file beyond this limit. default: 0

backupCount (optional) int :

When backupCount is non-zero, the system will keep up to backupCount numbered log files (with added extensions .1, .2, …). The current log file always has no numbered extension. The previous log file is the one with the lowest extension number. default: 0

Note

When either maxBytes or backupCount are zero, log file rollover never occurs, so you generally want to set backupCount to at least 1, and have a non-zero maxBytes.
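
The maxBytes and backupCount descriptions closely mirror the standard library's logging.handlers.RotatingFileHandler. Assuming those semantics, rollover can be demonstrated directly with that handler. The temporary directory, the file name session.log, and the logger name are hypothetical choices for this sketch:

```python
import logging
import logging.handlers
import os
import tempfile

log_dir = tempfile.mkdtemp()  # stand-in for log_path
log_file = os.path.join(log_dir, "session.log")  # hypothetical name

handler = logging.handlers.RotatingFileHandler(
    log_file,
    maxBytes=200,  # roll over when the next record would pass 200 bytes
    backupCount=2,  # keep at most session.log.1 and session.log.2
)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

logger = logging.getLogger("rollover_demo")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger

for i in range(50):
    logger.info("message number %d", i)
handler.close()

log_files = sorted(os.listdir(log_dir))
```

After the loop, the directory holds the current log file plus two numbered backups. Older content was discarded as the backups rotated.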

apstools.utils.log_utils.get_log_path()[source]#

Return a path to ./.logs. Create directory if it does not exist.

apstools.utils.log_utils.setup_IPython_console_logging(logger=None, filename='ipython_console.log', log_path=None)[source]#

Record all input (In) and output (Out) from IPython console.

PARAMETERS

logger

object: Instance of logging.Logger.

filename

str: Name of the log file. (default: ipython_console.log)

log_path str :

Directory to store the log file. Full name is f"<log_path>/{file_name_base}.log". default: (the present working directory)/LOG_DIR_BASE

apstools.utils.log_utils.stream_log_handler(formatter=None, level='INFO')[source]#

Record logging output to a stream (such as the console).

PARAMETERS

formatter

object: Instance of logging.Formatter.

level

str: Name of the logging level to report. (default: INFO)

Diagnostic Support for Memory#

rss_mem()

return memory used by this process

apstools.utils.memory.rss_mem()[source]#

return memory used by this process

Miscellaneous Support#

call_signature_decorator(f)

Get the names of all function parameters supplied by the caller.

cleanupText(text)

convert text so it can be used as a dictionary key

connect_pvlist(pvlist[, wait, timeout, ...])

Given list of EPICS PV names, return dict of EpicsSignal objects.

count_child_devices_and_signals(device)

Dict with number of children of this device.

count_common_subdirs(p1, p2)

Count how many subdirectories are common to both file paths.

dictionary_table(dictionary, **kwargs)

Return a text table from dictionary.

full_dotted_name(obj)

Return the full dotted name

itemizer(fmt, items)

Format a list of items.

listobjects([show_pv, printing, verbose, ...])

Show all the ophyd Signal and Device objects defined as globals.

pairwise(iterable)

break a list (or other iterable) into pairs

print_RE_md([dictionary, fmt, printing])

custom print the RunEngine metadata in a table

redefine_motor_position(motor, new_position)

Set EPICS motor record's user coordinate to new_position.

render(value[, sig_figs])

Round-off floating-point numbers to sig_figs.

replay(headers[, callback, sort])

Replay the document stream from one (or more) scans (headers).

run_in_thread(func)

(decorator) run func in thread

safe_ophyd_name(text)

make text safe to be used as an ophyd object name

split_quoted_line(line)

splits a line into words some of which might be quoted

text_encode(source)

Encode source using the default codepoint.

to_unicode_or_bust(obj[, encoding])

from: http://farmdev.com/talks/unicode/ .

trim_string_for_EPICS(msg)

String must not exceed EPICS PV length.

unix(command[, raises])

Run a UNIX command, returns (stdout, stderr).

apstools.utils.misc.call_signature_decorator(f)[source]#

Get the names of all function parameters supplied by the caller.

This is used to differentiate user-supplied parameters from as-defined parameters with the same value.

HOW TO USE THIS DECORATOR:

Decorate a function or method with this decorator and add an additional _call_args=None kwarg to the function. The function can test _call_args to see if a specific kwarg was supplied by the caller.

EXAMPLE:

@call_signature_decorator
def func1(a, b=1, c=True, _call_args=None):
    if 'c' in _call_args:  # Caller supplied this kwarg?
        pass

Note

With call_signature_decorator, it is not possible to get the names of the positional arguments. Since positional parameters are not specified by name, such capability is not expected to become a requirement.

See:

https://stackoverflow.com/questions/14749328#58166804 (how-to-check-whether-optional-function-parameter-is-set)
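
One way such a decorator can work is to capture the keyword names at call time and hand them to the function. The sketch below uses a hypothetical name, record_call_kwargs, and is not the apstools source. It also shows why positional arguments are not reported: the wrapper sees them only as an unnamed tuple.

```python
import functools


def record_call_kwargs(func):
    """Pass the names of caller-supplied keyword arguments as _call_args."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Only keyword names are visible here; positional arguments are not.
        return func(*args, _call_args=tuple(kwargs), **kwargs)

    return wrapper


@record_call_kwargs
def func1(a, b=1, c=True, _call_args=None):
    # True only when the caller wrote c=... explicitly.
    return "c" in _call_args


defaulted = func1(1)  # c takes its default value
explicit = func1(1, c=True)  # same value, but supplied by the caller
```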

apstools.utils.misc.cleanupText(text)[source]#

convert text so it can be used as a dictionary key

Given some input text string, return a clean version. Remove troublesome characters, perhaps other cleanup as well. This is best done with regular expression pattern matching.

apstools.utils.misc.connect_pvlist(pvlist, wait=True, timeout=2, poll_interval=0.1)[source]#

Given list of EPICS PV names, return dict of EpicsSignal objects.

PARAMETERS

pvlist

[str] : list of EPICS PV names

wait

bool : should wait for EpicsSignal objects to connect (default: True)

timeout

float : maximum time to wait for PV connections, seconds (default: 2.0)

poll_interval

float : time to sleep between checks for PV connections, seconds (default: 0.1)
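
How wait, timeout, and poll_interval typically interact can be sketched as a polling loop. The helper wait_until and the simulated connection check are hypothetical; the real function checks EpicsSignal connections:

```python
import time


def wait_until(all_connected, timeout=2.0, poll_interval=0.1):
    """Poll all_connected() until it returns True or timeout elapses."""
    deadline = time.monotonic() + timeout
    while not all_connected():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


# Stand-in for checking connections: ready on the third check.
checks = iter([False, False, True])
connected = wait_until(lambda: next(checks), timeout=2.0, poll_interval=0.01)
```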

apstools.utils.misc.count_child_devices_and_signals(device)[source]#

Dict with number of children of this device. Keys: Device and Signal.

apstools.utils.misc.count_common_subdirs(p1, p2)[source]#

Count how many subdirectories are common to both file paths.

apstools.utils.misc.dictionary_table(dictionary, **kwargs)[source]#

Return a text table from dictionary.

Dictionary keys in first column, values in second.

PARAMETERS

dictionary

dict : Python dictionary

Note: Keyword arguments are kept for compatibility with previous versions of apstools. They are now ignored.

RETURNS

table

object or None : pyRestTable.Table() object (multiline text table) or None if dictionary has no contents

EXAMPLE:

In [8]: RE.md
Out[8]: {'login_id': 'jemian:wow.aps.anl.gov', 'beamline_id': 'developer', 'proposal_id': None, 'pid': 19072, 'scan_id': 10, 'version': {'bluesky': '1.5.2', 'ophyd': '1.3.3', 'apstools': '1.1.5', 'epics': '3.3.3'}}

In [9]: print(dictionary_table(RE.md))
=========== =============================================================================
key         value
=========== =============================================================================
beamline_id developer
login_id    jemian:wow.aps.anl.gov
pid         19072
proposal_id None
scan_id     10
version     {'bluesky': '1.5.2', 'ophyd': '1.3.3', 'apstools': '1.1.5', 'epics': '3.3.3'}
=========== =============================================================================

apstools.utils.misc.full_dotted_name(obj)[source]#

Return the full dotted name

The .dotted_name property does not include the name of the root object. This routine adds that.

see: bluesky/ophyd#797

apstools.utils.misc.itemizer(fmt, items)[source]#

Format a list of items.

apstools.utils.misc.listobjects(show_pv=True, printing=None, verbose=False, symbols=None, child_devices=False, child_signals=False, table_style=TableStyle.pyRestTable)[source]#

Show all the ophyd Signal and Device objects defined as globals.

PARAMETERS

show_pv

bool : If True, also show relevant EPICS PV, if available. (default: True)

printing bool :

Deprecated.

verbose

bool : If True, also show str(obj). (default: False)

symbols

dict : If None, use global symbol table. If not None, use provided dictionary. (default: globals())

child_devices

bool : If True, also show how many Devices are children of this device. (default: False)

child_signals

bool : If True, also show how many Signals are children of this device. (default: False)

table_style object :

Either TableStyle.pyRestTable (default) or TableStyle.pandas, using values from apstools.utils.TableStyle.

Note

pandas.DataFrame will truncate long text to at most 50 characters.

RETURNS

object:

Instance of pyRestTable.Table()

EXAMPLE:

In [1]: listobjects()
======== ================================ =============
name     ophyd structure                  EPICS PV
======== ================================ =============
adsimdet MySingleTriggerSimDetector       vm7SIM1:
m1       EpicsMotor                       vm7:m1
m2       EpicsMotor                       vm7:m2
m3       EpicsMotor                       vm7:m3
m4       EpicsMotor                       vm7:m4
m5       EpicsMotor                       vm7:m5
m6       EpicsMotor                       vm7:m6
m7       EpicsMotor                       vm7:m7
m8       EpicsMotor                       vm7:m8
noisy    EpicsSignalRO                    vm7:userCalc1
scaler   ScalerCH                         vm7:scaler1
shutter  SimulatedApsPssShutterWithStatus
======== ================================ =============

Out[1]: <pyRestTable.rest_table.Table at 0x7fa4398c7cf8>

In [2]:

(new in apstools release 1.1.8)

apstools.utils.misc.pairwise(iterable)[source]#

break a list (or other iterable) into pairs

s -> (s0, s1), (s2, s3), (s4, s5), ...

In [71]: for item in pairwise("a b c d e fg".split()):
    ...:     print(item)
    ...:
('a', 'b')
('c', 'd')
('e', 'fg')
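
A common way to get this non-overlapping pairing is to zip an iterator with itself. The function name pairs is hypothetical and this sketch is not necessarily how apstools implements it. Note that itertools.pairwise (Python 3.10+) yields overlapping pairs, which is a different operation.

```python
def pairs(iterable):
    """Return non-overlapping pairs: s -> (s0, s1), (s2, s3), ..."""
    iterator = iter(iterable)
    # zip pulls twice from the same iterator for every tuple it builds.
    return zip(iterator, iterator)


result = list(pairs("a b c d e fg".split()))
```

In this sketch, a trailing unpaired item (odd-length input) is dropped.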

apstools.utils.misc.print_RE_md(dictionary=None, fmt='simple', printing=True)[source]#

custom print the RunEngine metadata in a table

PARAMETERS

dictionary

dict : Python dictionary

EXAMPLE:

In [4]: print_RE_md()
RunEngine metadata dictionary:
======================== ===================================
key                      value
======================== ===================================
EPICS_CA_MAX_ARRAY_BYTES 1280000
EPICS_HOST_ARCH          linux-x86_64
beamline_id              APS USAXS 9-ID-C
login_id                 usaxs:usaxscontrol.xray.aps.anl.gov
pid                      67933
proposal_id              testing Bluesky installation
scan_id                  0
versions                 ======== =====
                         key      value
                         ======== =====
                         apstools 1.1.3
                         bluesky  1.5.2
                         epics    3.3.1
                         ophyd    1.3.3
                         ======== =====
======================== ===================================

apstools.utils.misc.redefine_motor_position(motor, new_position)[source]#

Set EPICS motor record’s user coordinate to new_position.

apstools.utils.misc.render(value, sig_figs=12) str[source]#

Round-off floating-point numbers to sig_figs.

Such as:

  • 0.369340000000000063 becomes 0.36934

  • -3.1300000000000003 becomes -3.13

  • -0 becomes 0

  • 0.0 becomes 0
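
These conversions can be reproduced with Python's "g" format, which rounds to a number of significant figures and strips trailing zeros. The function round_sig is a hypothetical sketch, not the apstools implementation:

```python
def round_sig(value, sig_figs=12):
    """Format a number with at most sig_figs significant figures."""
    text = f"{value:.{sig_figs}g}"
    # A negative zero carries no information; report it as "0".
    return "0" if text in ("-0", "0") else text


examples = [
    round_sig(0.369340000000000063),
    round_sig(-3.1300000000000003),
    round_sig(-0.0),
    round_sig(0.0),
]
```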

apstools.utils.misc.replay(headers, callback=None, sort=True)[source]#

Replay the document stream from one (or more) scans (headers).

PARAMETERS

headers

run or [run] : Run(s) to be replayed through callback. A run is an instance of a Bluesky databroker.core.BlueskyRun (or the older databroker.Header). see: https://nsls-ii.github.io/databroker/api.html?highlight=header#header-api

callback

object : The Bluesky callback to handle the stream of documents from a run. If None, then use the bec (BestEffortCallback) from the IPython shell. (default:None)

sort

bool : Sort the headers chronologically if True. (default:True)

(new in apstools release 1.1.11)

apstools.utils.misc.run_in_thread(func)[source]#

(decorator) run func in thread

USAGE:

@run_in_thread
def progress_reporting():
    logger.debug("progress_reporting is starting")
    # ...

#...
progress_reporting()   # runs in separate thread
#...
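
A decorator of this kind can be built on threading.Thread. The sketch below uses the hypothetical name in_thread and assumes the wrapper returns the Thread object so the caller may join it; the apstools decorator may differ in detail.

```python
import functools
import threading


def in_thread(func):
    """(decorator) start func in a new thread and return the Thread."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread

    return wrapper


results = []


@in_thread
def record(value):
    results.append(value)


worker = record(42)  # returns immediately; work happens in the thread
worker.join()  # wait here only because the example inspects results
```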

apstools.utils.misc.safe_ophyd_name(text)[source]#

make text safe to be used as an ophyd object name

Given some input text string, return a clean version. Remove troublesome characters, perhaps other cleanup as well. This is best done with regular expression pattern matching.

The “sanitized” name fits this regular expression:

[A-Za-z_][\w_]*

Also can be used for safe HDF5 and NeXus names.
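
A sanitizer that satisfies the regular expression above could look like this. It is a sketch with a hypothetical name, sanitize_name; the choice of underscore as the replacement character and the leading-underscore rule are assumptions, not taken from the apstools source.

```python
import re


def sanitize_name(text):
    """Replace characters outside [A-Za-z0-9_]; avoid a leading digit."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", text)
    # The first character must be a letter or underscore.
    if not re.match(r"[A-Za-z_]", cleaned):
        cleaned = "_" + cleaned
    return cleaned


name = sanitize_name("2nd motor (x-axis)")
```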

apstools.utils.misc.split_quoted_line(line)[source]#

splits a line into words some of which might be quoted

TESTS:

FlyScan 0   0   0   blank
FlyScan 5   2   0   "empty container"
FlyScan 5   12   0   "even longer name"
SAXS 0 0 0 blank
SAXS 0 0 0 "blank"

RESULTS:

['FlyScan', '0', '0', '0', 'blank']
['FlyScan', '5', '2', '0', 'empty container']
['FlyScan', '5', '12', '0', 'even longer name']
['SAXS', '0', '0', '0', 'blank']
['SAXS', '0', '0', '0', 'blank']
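
The standard library's shlex.split() gives the same results for these test lines. It is shown here as a point of comparison, not as the apstools implementation:

```python
import shlex

lines = [
    'FlyScan 5   2   0   "empty container"',
    "SAXS 0 0 0 blank",
    'SAXS 0 0 0 "blank"',
]
# Quoted text stays together as one word; runs of whitespace collapse.
parsed = [shlex.split(line) for line in lines]
```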

apstools.utils.misc.text_encode(source)[source]#

Encode source using the default codepoint.

apstools.utils.misc.to_unicode_or_bust(obj, encoding='utf-8')[source]#

from: http://farmdev.com/talks/unicode/ .

apstools.utils.misc.trim_string_for_EPICS(msg)[source]#

String must not exceed EPICS PV length.

apstools.utils.misc.unix(command, raises=True)[source]#

Run a UNIX command, returns (stdout, stderr).

PARAMETERS

command

str : UNIX command to be executed

raises

bool : If True, will raise exceptions as needed, default: True
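
A comparable helper can be written with the standard subprocess module. In this sketch the name run_command is hypothetical, and the rule for when to raise (non-zero return code) is an assumption; apstools may use a different criterion.

```python
import subprocess


def run_command(command, raises=True):
    """Run a shell command; return (stdout, stderr) as bytes."""
    process = subprocess.run(command, shell=True, capture_output=True)
    # Assumption: a non-zero return code counts as failure.
    if raises and process.returncode != 0:
        raise RuntimeError(f"{command!r} failed: {process.stderr!r}")
    return process.stdout, process.stderr


stdout, stderr = run_command("echo hello")
```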

OverrideParameters#

Define parameters that can be overridden from a user configuration file.

EXAMPLE:

Create an overrides object in a new file override_params.py:

import apstools.utils
overrides = apstools.utils.OverrideParameters()

When code supports a parameter for which a user can provide a local override, the code should import the overrides object (from the override_params module), and then register the parameter name, such as this example:

from override_params import overrides
overrides.register("minimum_step")

Then later:

minstep = overrides.pick("minimum_step", 45e-6)

In the user’s configuration file that will override the value of 45e-6 (such as can be loaded via %run -i user.py), import the overrides object (from the override_params module):

from override_params import overrides

and then override the attribute(s) as desired:

overrides.set("minimum_step", 1.0e-5)

With this override in place, the minstep value (from pick()) will be 1e-5.

Get a pandas DataFrame object with all the overrides:

overrides.summary()

which returns this table:

    parameter    value
0  minimum_step  0.00001

OverrideParameters()

Define parameters that can be overridden from a user configuration file.

class apstools.utils.override_parameters.OverrideParameters[source]#

Define parameters that can be overridden from a user configuration file.

NOTE: This is a pure Python object, not using ophyd.

pick(parameter, default)

Return either the override parameter value if defined, or the default.

register(parameter_name)

Register a new parameter name to be supported by user overrides.

reset(parameter_name)

Remove an override value for a known parameter.

reset_all()

Remove override values for all known parameters.

set(parameter_name, value)

Define an override value for a known parameter.

summary()

Return a pandas DataFrame with all overrides.

(new in apstools 1.5.2)

pick(parameter, default)[source]#

Return either the override parameter value if defined, or the default.

register(parameter_name)[source]#

Register a new parameter name to be supported by user overrides.

reset(parameter_name)[source]#

Remove an override value for a known parameter. (sets it to undefined)

reset_all()[source]#

Remove override values for all known parameters. (sets all to undefined)

set(parameter_name, value)[source]#

Define an override value for a known parameter.

summary()[source]#

Return a pandas DataFrame with all overrides.

Parameter names that have no override value will be reported as --undefined--.
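
The register, set, reset, and pick semantics can be sketched in a few lines of pure Python. The class name Overrides is hypothetical, the summary() method is omitted, and raising KeyError for an unregistered name is an assumption; this is not the apstools source.

```python
class Overrides:
    """Registry of named parameters with optional user-set values."""

    _UNDEFINED = object()  # sentinel: registered but not overridden

    def __init__(self):
        self._values = {}

    def register(self, name):
        self._values.setdefault(name, self._UNDEFINED)

    def set(self, name, value):
        if name not in self._values:
            raise KeyError(f"Unknown parameter: {name!r}")
        self._values[name] = value

    def reset(self, name):
        if name in self._values:
            self._values[name] = self._UNDEFINED

    def pick(self, name, default):
        value = self._values.get(name, self._UNDEFINED)
        return default if value is self._UNDEFINED else value


overrides = Overrides()
overrides.register("minimum_step")
before = overrides.pick("minimum_step", 45e-6)  # no override yet
overrides.set("minimum_step", 1.0e-5)
after = overrides.pick("minimum_step", 45e-6)  # override wins
```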

Plot Support#

plotxy(runs, xname, yname[, append, cat, ...])

Plot y vs x from a bluesky run.

select_mpl_figure(x, y)

Get the MatPlotLib Figure window for y vs x.

select_live_plot(bec, signal)

Get the first live plot that matches signal.

trim_plot_lines(bec, n, x, y)

Find the plot with axes x and y and replot with at most the last n lines.

trim_plot_by_name([n, plots])

Find the plot(s) by name and replot with at most the last n lines.

apstools.utils.plot.plotxy(runs, xname, yname, append=False, cat=None, stats=True, stream='primary', title=None)[source]#

Plot y vs x from a bluesky run.

Note: This is not a bluesky plan. Call it as a normal Python function.

PARAMETERS

runs [run] or run :

List of runs or a single run. A run is either a databroker.core.BlueskyRun object or a reference (uid, scan_id, relative to most recent) to a BlueskyRun in the catalog.

xname str :

Name of the signal to plot on the x axis.

yname str :

Name of the signal to plot on the y axis.

append bool :

(optional) If True, append to existing plot window. Default: append=False

cat object :

(optional) Catalog to be used for finding a run by reference. Default: return value from apstools.utils.getCatalog()

stats bool :

(optional) If True, compute and plot centroid and FWHM (computed from sigma). Default: stats=True

stream str :

(optional) Name of the data stream in which to find “xname” and “yname”. Default: stream="primary"

title str :


(optional) Title to show on this plot. Default: Metadata “title” keyword of first run (if found) or scan_id and starting date/time of first run.

RETURNS

Returns a dict of statistics for each run indexed by scan_id, if stats=True, else None. A computed fwhm key is added to the statistics.

New in release 1.6.10.
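
For a Gaussian peak shape, FWHM and sigma are related by FWHM = 2·sqrt(2·ln 2)·sigma. The sketch below assumes that relation is the one used for the fwhm key; the function name fwhm_from_sigma is hypothetical.

```python
import math


def fwhm_from_sigma(sigma):
    """Full width at half maximum of a Gaussian with standard deviation sigma."""
    return 2.0 * math.sqrt(2.0 * math.log(2.0)) * sigma


fwhm = fwhm_from_sigma(1.0)  # about 2.355 times sigma
```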

apstools.utils.plot.select_live_plot(bec, signal)[source]#

Get the first live plot that matches signal.

PARAMETERS

bec

object: instance of bluesky.callbacks.best_effort.BestEffortCallback

signal

object: The Y axis object (an ophyd.Signal)

RETURNS

object:

Instance of bluesky.callbacks.best_effort.LivePlotPlusPeaks() or None

apstools.utils.plot.select_mpl_figure(x, y)[source]#

Get the MatPlotLib Figure window for y vs x.

PARAMETERS

x

object: X axis object (an ophyd.Signal)

y

object: Y axis object (an ophyd.Signal)

RETURNS

object or None:

Instance of matplotlib.pyplot.Figure()

apstools.utils.plot.trim_plot_by_name(n=3, plots=None)[source]#

Find the plot(s) by name and replot with at most the last n lines.

Note: this is not a bluesky plan. Call it as a normal Python function.

It is recommended to call trim_plot_by_name() before the scan(s) that generate plots. Plots are generated from a RunEngine callback, executed after the scan completes.

PARAMETERS

n

int : number of plots to keep

plots

str, [str], or None : name(s) of plot windows to trim (default: all plot windows)

EXAMPLES:

trim_plot_by_name()   # default of n=3, apply to all plots
trim_plot_by_name(5)  # change from default of n=3
trim_plot_by_name(5, "noisy_det vs motor")  # just this plot
trim_plot_by_name(
    5,
    ["noisy_det vs motor", "det noisy_det vs motor"]
)

EXAMPLE:

# use simulators from ophyd
from bluesky import plans as bp
from bluesky import plan_stubs as bps
from ophyd.sim import *

snooze = 0.25

def scan_set():
    trim_plot_by_name()
    yield from bp.scan([noisy_det], motor, -1, 1, 5)
    yield from bp.scan([noisy_det, det], motor, -2, 1, motor2, 3, 1, 6)
    yield from bps.sleep(snooze)

# repeat scan_set 15 times
uids = RE(bps.repeat(scan_set, 15))

(new in release 1.3.5)

apstools.utils.plot.trim_plot_lines(bec, n, x, y)[source]#

Find the plot with axes x and y and replot with at most the last n lines.

Note: trim_plot_lines() is not a bluesky plan. Call it as a normal Python function.

EXAMPLE:

trim_plot_lines(bec, 1, m1, noisy)

PARAMETERS

bec

object : instance of BestEffortCallback

n

int : number of plots to keep

x

object : instance of ophyd.Signal (or subclass), independent (x) axis

y

object : instance of ophyd.Signal (or subclass), dependent (y) axis

(new in release 1.3.5)

Support for IPython profiles#

getDefaultNamespace([attr])

get the IPython shell's namespace dictionary (or globals() if not found)

ipython_profile_name()

return the name of the current ipython profile or None

ipython_shell_namespace()

get the IPython shell's namespace dictionary (or empty if not found)

apstools.utils.profile_support.getDefaultNamespace(attr='user_ns')[source]#

get the IPython shell’s namespace dictionary (or globals() if not found)

apstools.utils.profile_support.ipython_profile_name()[source]#

return the name of the current ipython profile or None

Example (add to default RunEngine metadata):

RE.md['ipython_profile'] = str(ipython_profile_name())
print("using profile: " + RE.md['ipython_profile'])

apstools.utils.profile_support.ipython_shell_namespace()[source]#

get the IPython shell’s namespace dictionary (or empty if not found)

EPICS PV Registry#

findbyname(oname[, force_rebuild, ns])

Find the ophyd (dotted name) object associated with the given ophyd name.

findbypv(pvname[, force_rebuild, ns])

Find all ophyd objects associated with the given EPICS PV.

PVRegistry([ns])

Cross-reference EPICS PVs with ophyd EpicsSignalBase objects.

class apstools.utils.pvregistry.PVRegistry(ns=None)[source]#

Cross-reference EPICS PVs with ophyd EpicsSignalBase objects.

Search for ophyd object by ophyd name.

search(pvname)[source]#

Search for PV in both read & write modes.

search_by_mode(pvname, mode='R')[source]#

Search for PV in specified mode.

apstools.utils.pvregistry.findbyname(oname, force_rebuild=False, ns=None)[source]#

Find the ophyd (dotted name) object associated with the given ophyd name.

PARAMETERS

oname

str : ophyd name to search

force_rebuild

bool : If True, rebuild the internal registry that maps ophyd names to ophyd objects.

ns

dict or None : Namespace dictionary of Python objects.

RETURNS

str or None:

Name of the ophyd object.

EXAMPLE:

In [45]: findbyname("adsimdet_cam_acquire")
Out[45]: 'adsimdet.cam.acquire'

(new in apstools 1.5.0)

apstools.utils.pvregistry.findbypv(pvname, force_rebuild=False, ns=None)[source]#

Find all ophyd objects associated with the given EPICS PV.

PARAMETERS

pvname

str : EPICS PV name to search

force_rebuild

bool : If True, rebuild the internal registry that maps EPICS PV names to ophyd objects.

ns

dict or None : Namespace dictionary of Python objects.

RETURNS

dict or None:

Dictionary of matching ophyd objects, keyed by how the PV is used by the ophyd signal. The keys are read and write.

EXAMPLE:

In [45]: findbypv("ad:cam1:Acquire")
Out[45]: {'read': [], 'write': ['adsimdet.cam.acquire']}

In [46]: findbypv("ad:cam1:Acquire_RBV")
Out[46]: {'read': ['adsimdet.cam.acquire'], 'write': []}
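The shape of the findbypv() result can be illustrated with a toy registry. This is a sketch under stated assumptions, not the PVRegistry implementation: the class ToyPVRegistry and its add() method are hypothetical, and the mode value "W" for write access is assumed by analogy with the documented default mode='R'.

```python
from collections import defaultdict


class ToyPVRegistry:
    """Hypothetical stand-in: cross-reference PV names with object names."""

    _MODES = {"R": "read", "W": "write"}  # "W" is an assumption

    def __init__(self):
        self._db = defaultdict(lambda: {"read": [], "write": []})

    def add(self, dotted_name, read_pv, write_pv=None):
        """Register one signal by the PV it reads and (optionally) writes."""
        self._db[read_pv]["read"].append(dotted_name)
        if write_pv is not None:
            self._db[write_pv]["write"].append(dotted_name)

    def search_by_mode(self, pvname, mode="R"):
        """Return the object names that use pvname in the given mode."""
        key = self._MODES[mode]
        return list(self._db.get(pvname, {}).get(key, []))

    def search(self, pvname):
        """Search for pvname in both read and write modes."""
        return {
            "read": self.search_by_mode(pvname, "R"),
            "write": self.search_by_mode(pvname, "W"),
        }


registry = ToyPVRegistry()
registry.add(
    "adsimdet.cam.acquire",
    read_pv="ad:cam1:Acquire_RBV",
    write_pv="ad:cam1:Acquire",
)
print(registry.search("ad:cam1:Acquire"))
print(registry.search("ad:cam1:Acquire_RBV"))
```

A single signal can appear under two PV names because its readback and setpoint PVs differ, which is why the result is keyed by read and write.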

Searching databroker catalogs#

db_query(db, query)

Searches the databroker v2 database.

apstools.utils.query.db_query(db, query)[source]#

Searches the databroker v2 database.

PARAMETERS

db

object : Bluesky database, an instance of databroker.catalog.

query

dict : Search parameters.

RETURNS

object :

Bluesky database, an instance of databroker.catalog satisfying the query parameters.

See also

databroker.catalog.search()

Common support of slits

SlitGeometry(width, height, x, y)

Slit size and center as a named tuple

class apstools.utils.slit_core.SlitGeometry(width, height, x, y)#

Slit size and center as a named tuple

height#

Alias for field number 1

width#

Alias for field number 0

x#

Alias for field number 2

y#

Alias for field number 3
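Because SlitGeometry is a named tuple, its fields can be read by name or by position. The sketch below rebuilds an equivalent type with collections.namedtuple so that it runs without apstools installed; the numeric values are made up for illustration.

```python
from collections import namedtuple

# Stand-in with the same field order as the documented class.
SlitGeometry = namedtuple("SlitGeometry", "width height x y")

geometry = SlitGeometry(width=0.1, height=0.2, x=0.0, y=1.5)

# Access by name or by field number (width is field 0, y is field 3).
print(geometry.width, geometry[0])
print(geometry.y, geometry[3])

# Named tuples are immutable; _replace() returns a modified copy.
wider = geometry._replace(width=0.5)
print(wider)
```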

Spreadsheet Support#

ExcelDatabaseFileBase([ignore_extra])

base class: read-only support for Excel files, treat them like databases

ExcelDatabaseFileGeneric(filename[, ...])

Generic (read-only) handling of Excel spreadsheet-as-database

ExcelReadError(*args, **kwargs)

Exception when reading Excel spreadsheet.

class apstools.utils.spreadsheet.ExcelDatabaseFileBase(ignore_extra=True)[source]#

base class: read-only support for Excel files, treat them like databases

Use this class when creating new, specific spreadsheet support.

EXAMPLE

Show how to read an Excel file where one of the columns contains a unique key. This allows for random access to each row of data by use of the key.

import pathlib

from apstools.utils.spreadsheet import ExcelDatabaseFileBase

class ExhibitorsDB(ExcelDatabaseFileBase):
    '''
    content for exhibitors from the Excel file
    '''
    EXCEL_FILE = pathlib.Path("resources") / "exhibitors.xlsx"
    LABELS_ROW = 2

    def handle_single_entry(self, entry):
        '''any special handling for a row from the Excel file'''
        pass

    def handleExcelRowEntry(self, entry):
        '''identify unique key (row of the Excel file)'''
        key = entry["Name"]
        self.db[key] = entry

class apstools.utils.spreadsheet.ExcelDatabaseFileGeneric(filename, labels_row=3, ignore_extra=True)[source]#

Generic (read-only) handling of Excel spreadsheet-as-database

Note

This is the class to use when reading Excel spreadsheets.

In the spreadsheet, the first sheet should contain the table to be used. By default (see keyword parameter labels_row), the table should start in cell A4. The column labels are given in row 4. A blank column should appear to the right of the table (see keyword parameter ignore_extra). The column labels will describe the action and its parameters. Additional columns may be added for metadata or other purposes.

The rows below the column labels should contain actions and parameters for those actions, one action per row.

To make a comment, place a # in the action column. A comment should be ignored by the bluesky plan that reads this table. The table will end with a row of empty cells.

While it’s a good idea to put the action column first, that is not necessary. It is not even necessary to name the column action. You can re-arrange the order of the columns and change their names as long as the column names match what text strings your Python code expects to find.

A future upgrade [1] will allow the table boundaries to be named by Excel when using Excel’s Format as Table [2] feature. For now, leave a blank row and column at the bottom and right edges of the table.

PARAMETERS

filename

str : name (absolute or relative) of Excel spreadsheet file

labels_row

int : Row (zero-based numbering) of Excel file with column labels, default: 3 (Excel row 4)

ignore_extra

bool : When True, ignore any cells outside of the table, default: True.

Note that when True, a row of empty cells within the table will be recognized as the end of the table, even if there are actions in following rows. To keep a visually empty row from ending the table, put a comment symbol # in it (actually, any non-empty content will work).

When False, cells with other information (in Sheet 1) will be made available, sometimes with unpredictable results.
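The table boundary rules above (zero-based labels_row, a blank column at the right edge, an empty row at the bottom) can be sketched with plain Python lists. This is only an illustration of the rules as described, not the apstools reader: the function read_table and the list-of-lists sheet are hypothetical, and no Excel file is opened.

```python
def read_table(cells, labels_row=3):
    """Hypothetical sketch: return {row_index: {label: value}}."""
    # Column labels end at the first blank cell (the empty column).
    labels = []
    for cell in cells[labels_row]:
        if cell in (None, ""):
            break
        labels.append(cell)

    db = {}
    for i, row in enumerate(cells[labels_row + 1:]):
        values = list(row[: len(labels)])
        values += [None] * (len(labels) - len(values))  # pad short rows
        if all(v in (None, "") for v in values):
            break  # a row of empty cells ends the table
        db[i] = dict(zip(labels, values))  # row number is the key
    return db


sheet = [
    ["some text here, maybe a title"],
    [],
    [],
    ["action", "sx", "sy", "", "outside the table"],
    ["close", None, None],
    ["image", 0, 0],
    ["#", None, None],      # comment row keeps the table going
    [None, None, None],     # empty row: end of table
    ["scan", 1, 1],         # never read
]
table = read_table(sheet)
for key, entry in table.items():
    print(key, entry)
```

The row holding only # is kept as an entry, so the plan that consumes the table is responsible for skipping it, while the scan row after the empty row is never reached.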

EXAMPLE

See section The run_command_file() plan – batch scans using a text file for more examples.

(See also example screen shot.) Table (on Sheet 1) begins on row 4 in first column:

1  |  some text here, maybe a title
2  |  (could have content here)
3  |  (or even more content here)
4  |  action  | sx   | sy   | sample     | comments          |  | <-- leave empty column
5  |  close   |      |      |            | close the shutter |  |
6  |  image   | 0    | 0    | dark       | dark image        |  |
7  |  open    |      |      |            | open the shutter  |  |
8  |  image   | 0    | 0    | flat       | flat field image  |  |
9  |  image   | 5.1  | -3.2 | 4140 steel | heat 9172634      |  |
10 |  scan    | 5.1  | -3.2 | 4140 steel | heat 9172634      |  |
11 |  scan    | 0    | 0    | blank      |                   |  |
12 |
13 |  ^^^ leave empty row ^^^
14 | (could have content here)

Example python code to read this spreadsheet:

import pathlib

from apstools.utils import ExcelDatabaseFileGeneric, cleanupText
from bluesky import plan_stubs as bps

# RE, shutter, my_image(), and my_scan() are assumed to exist in your session.

def myExcelPlan(xl_file, md=None):
    excel_file = pathlib.Path(xl_file).absolute()
    xl = ExcelDatabaseFileGeneric(excel_file)
    # xl.db maps each row number (the unique key) to that row's content
    for i, row in xl.db.items():
        # prepare the metadata
        _md = {cleanupText(k): v for k, v in row.items()}
        _md["xl_file"] = xl_file
        _md["excel_row_number"] = i+1
        _md.update(md or {}) # overlay with user-supplied metadata

        # determine what action to take
        action = row["action"].lower()
        if action == "open":
            yield from bps.mv(shutter, "open")
        elif action == "close":
            yield from bps.mv(shutter, "close")
        elif action == "image":
            # your code to take an image, given **row as parameters
            yield from my_image(**row, md=_md)
        elif action == "scan":
            # your code to make a scan, given **row as parameters
            yield from my_scan(**row, md=_md)
        else:
            print(f"no handling for row {i+1}: action={action}")

# execute this plan through the RunEngine
RE(myExcelPlan("spreadsheet.xlsx", md=dict(purpose="apstools demo")))

handleExcelRowEntry(entry)[source]#

use row number as the unique key

class apstools.utils.spreadsheet.ExcelReadError(*args: Any, **kwargs: Any)[source]#

Exception when reading Excel spreadsheet.

Define symbols used by other modules to define time (seconds).

DAY

24 hours (in seconds)

HOUR

60 minutes (in seconds)

MINUTE

60 seconds (in seconds)

SECOND

One second of time (the base unit).

WEEK

7 days (in seconds)

ts2iso(ts[, sep])

Convert Python timestamp (float) to ISO8601 time in current time zone.

apstools.utils.time_constants.DAY = 86400#

24 hours (in seconds)

apstools.utils.time_constants.HOUR = 3600#

60 minutes (in seconds)

apstools.utils.time_constants.MINUTE = 60#

60 seconds (in seconds)

apstools.utils.time_constants.SECOND = 1#

One second of time (the base unit).

apstools.utils.time_constants.WEEK = 604800#

7 days (in seconds)

apstools.utils.time_constants.ts2iso(ts: float, sep: str = ' ') → str[source]#

Convert Python timestamp (float) to ISO8601 time in current time zone.
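The relationship between the constants, and the kind of conversion ts2iso() performs, can be sketched with the standard library alone. The constant values below match those documented above; the helper timestamp_to_iso is a hypothetical stand-in, and using datetime.fromtimestamp() with isoformat() is an assumption about one reasonable way to get local ISO8601 text, not a statement of how apstools does it.

```python
import datetime

# Each constant is expressed in seconds, built from the one before it.
SECOND = 1
MINUTE = 60 * SECOND
HOUR = 60 * MINUTE
DAY = 24 * HOUR
WEEK = 7 * DAY


def timestamp_to_iso(ts, sep=" "):
    """Hypothetical helper: local-time ISO8601 text for a timestamp."""
    return datetime.datetime.fromtimestamp(ts).isoformat(sep=sep)


start = 1_600_000_000.5  # arbitrary example timestamp
stamp = timestamp_to_iso(start)
one_week_later = timestamp_to_iso(start + WEEK)
print(stamp)
print(one_week_later)
print(timestamp_to_iso(start, sep="T"))
```

The printed text depends on the time zone of the computer running the code, so no fixed output is shown here.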