"""
Directory of bluesky runs
+++++++++++++++++++++++++++++++++++++++
.. autosummary::
~getRunData
~getRunDataValue
~listRunKeys
~ListRuns
~listruns
~summarize_runs
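
A minimal usage sketch (assumes these names are re-exported from the
``apstools.utils`` package namespace, as in released versions)::

    from apstools.utils import listruns

    listruns(num=10)   # table of the 10 most recent runs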
"""
import dataclasses
import datetime
import logging
import time
import typing
import warnings
from collections import defaultdict
from deprecated.sphinx import deprecated
from deprecated.sphinx import versionadded
from deprecated.sphinx import versionchanged
from ._core import FIRST_DATA
from ._core import LAST_DATA
from ._core import TableStyle
from .query import db_query
logger = logging.getLogger(__name__)
@versionadded(version="1.5.1")
def getRunData(scan_id, db=None, stream="primary", query=None, use_v1=True):
"""
Convenience function to get the run's data. Default is the ``primary`` stream.
PARAMETERS
scan_id
*int* or *str* :
Scan (run) identifier.
Positive integer value is ``scan_id`` from run's metadata.
Negative integer value counts back from the most recent run
in the databroker (``-1`` is the most recent).
String is run's ``uid`` unique identifier (can abbreviate to
the first characters needed to assure it is unique).
db
*object* :
Bluesky database, an instance of ``databroker.catalog``.
Default: will search existing session for instance.
stream
*str* :
Name of the bluesky data stream from which to obtain the data.
Default: 'primary'
query
*dict* :
mongo query dictionary, used to filter the results
Default: ``{}``
see: https://docs.mongodb.com/manual/reference/operator/query/
use_v1
*bool* :
Chooses between the databroker 'v1' and 'v2' APIs.
Default: ``True`` (meaning use the v1 API)
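
EXAMPLE (a minimal sketch; assumes a databroker catalog can be found in the
current session -- the scan_id ``45`` and the ``"baseline"`` stream are
placeholders for your own data)::

    getRunData(-1)                      # most recent run, "primary" stream
    getRunData(45, stream="baseline")   # scan_id 45, "baseline" stream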
"""
from . import getCatalog
cat = getCatalog(db)
if query:
cat = db_query(cat, query)
stream = stream or "primary"
if use_v1 is None or use_v1:
run = cat.v1[scan_id]
if stream in run.stream_names:
return run.table(stream_name=stream)
else:
run = cat.v2[scan_id]
if hasattr(run, stream):
return run[stream].read().to_dataframe()
raise AttributeError(f"No such stream '{stream}' in run '{scan_id}'.")
@versionadded(version="1.5.1")
def getRunDataValue(scan_id, key, db=None, stream="primary", query=None, idx=-1, use_v1=True):
"""
Convenience function to get the value of a key in a run's stream.
Default: the last value of the key in the ``primary`` stream.
PARAMETERS
scan_id
*int* or *str* :
Scan (run) identifier.
Positive integer value is ``scan_id`` from run's metadata.
Negative integer value counts back from the most recent run
in the databroker (``-1`` is the most recent).
String is run's ``uid`` unique identifier (can abbreviate to
the first characters needed to assure it is unique).
key
*str* :
Name of the key (data column) in the table of the stream's data.
Must match *identically*.
db
*object* :
Bluesky database, an instance of ``databroker.catalog``.
Default: will search existing session for instance.
stream
*str* :
Name of the bluesky data stream from which to obtain the data.
Default: 'primary'
query
*dict* :
mongo query dictionary, used to filter the results
Default: ``{}``
see: https://docs.mongodb.com/manual/reference/operator/query/
idx
*int* or *str* :
List index of value to be returned from column of table.
Can be ``0`` for first value, ``-1`` for last value, ``"mean"``
for average value, or ``"all"`` for the full list of values.
Default: ``-1``
use_v1
*bool* :
Chooses between the databroker 'v1' and 'v2' APIs.
Default: ``True`` (meaning use the v1 API)
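
EXAMPLE (a minimal sketch; ``"I0"`` is a placeholder -- substitute a key
reported in your own data stream)::

    getRunDataValue(-1, "I0")              # last value in the primary stream
    getRunDataValue(-1, "I0", idx="mean")  # average of the column
    getRunDataValue(-1, "I0", idx="all")   # the full array of values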
"""
if idx is None:
idx = -1
try:
_idx = int(idx)
except ValueError:
_idx = str(idx).lower()
if isinstance(_idx, str) and _idx not in "all mean".split():
raise KeyError(f"Did not understand 'idx={idx}', use integer, 'all', or 'mean'.")
stream = stream or "primary"
table = getRunData(scan_id, db=db, stream=stream, query=query)
if key not in table:
raise KeyError(f"'{key}' not found in scan {scan_id} stream '{stream}'.")
data = table[key]
if _idx == "all":
return data.values
elif _idx == "mean":
return data.mean()
elif -len(data) <= _idx < len(data):  # any valid positive or negative index
return data.values[_idx]
raise KeyError(f"Cannot reference idx={idx} in scan {scan_id} stream '{stream}' key={key}.")
@versionadded(version="1.5.1")
def listRunKeys(
scan_id,
key_fragment="",
db=None,
stream="primary",
query=None,
strict=False,
use_v1=True,
):
"""
Convenience function to list all keys (column names) in the scan's stream (default: primary).
PARAMETERS
scan_id
*int* or *str* :
Scan (run) identifier.
Positive integer value is ``scan_id`` from run's metadata.
Negative integer value counts back from the most recent run
in the databroker (``-1`` is the most recent).
String is run's ``uid`` unique identifier (can abbreviate to
the first characters needed to assure it is unique).
key_fragment
*str* :
Part or all of key name to be found in selected stream.
For instance, if you specify ``key_fragment="lakeshore"``,
it will return all the keys that include ``lakeshore``.
db
*object* :
Bluesky database, an instance of ``databroker.catalog``.
Default: will search existing session for instance.
stream
*str* :
Name of the bluesky data stream from which to obtain the data.
Default: 'primary'
query
*dict* :
mongo query dictionary, used to filter the results
Default: ``{}``
see: https://docs.mongodb.com/manual/reference/operator/query/
strict
*bool* :
Should the ``key_fragment`` be matched identically (``strict=True``)
or matched by lower case comparison (``strict=False``)?
Default: ``False``
use_v1
*bool* :
Chooses between the databroker 'v1' and 'v2' APIs.
Default: ``True`` (meaning use the v1 API)
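
EXAMPLE (a minimal sketch; the ``"motor"`` fragment is only an illustration
of a partial, case-insensitive match)::

    listRunKeys(-1)                        # every key in the primary stream
    listRunKeys(-1, key_fragment="motor")  # only keys containing "motor"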
"""
table = getRunData(scan_id, db=db, stream=stream, query=query, use_v1=use_v1)
# fmt: off
if len(key_fragment):
output = [
col
for col in table.columns
if (
(strict and key_fragment in col)
or (not strict and key_fragment.lower() in col.lower())
)
]
else:
output = list(table.columns)
# fmt: on
return output
@dataclasses.dataclass
class ListRuns:
"""
List the runs from the given catalog according to some options.
EXAMPLE::
ListRuns(cat).to_dataframe()
PUBLIC METHODS
.. autosummary::
~to_dataframe
~to_table
~parse_runs
INTERNAL METHODS
.. autosummary::
~_get_by_key
~_check_cat
~_apply_search_filters
~_check_keys
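
EXAMPLE (a minimal sketch of the non-deprecated path, since ``to_dataframe()``
and ``to_table()`` are deprecated; assumes ``cat`` is a databroker catalog)::

    dd = ListRuns(cat, num=5, keys="scan_id time plan_name").parse_runs()
    # ``dd`` is a dict: one list of values per requested key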
"""
cat: object = None
query: object = None
keys: object = None
missing: str = ""
num: int = 20
reverse: bool = True
since: object = None
sortby: str = "time"
timefmt: str = "%Y-%m-%d %H:%M:%S"
until: object = None
ids: "typing.Any" = None
hints_override: bool = False
_default_keys = "scan_id time plan_name detectors"
def _get_by_key(self, md, key):
"""
Get run's metadata value by key.
Look in ``start`` document first.
If not found, look in ``stop`` document.
If not found, report using ``self.missing``.
If ``key`` is found but value is None, report as ``self.missing``.
The ``time`` key will be formatted by the ``self.timefmt`` value.
See https://strftime.org/ for examples. The special ``timefmt="raw"``
is used to report time as the raw value (floating point time as used in
python's ``time.time()``).
A special syntax of ``key`` allows reporting of keys in other
metadata subdictionaries. The syntax is ``doc.key`` (such as
specifying the ``time`` key from the ``stop`` document:
``stop.time``) where a ``.`` is used to separate the
subdictionary name (``doc``) from the ``key``.
**Note**:
It is not possible to ``sortby`` the dotted-key syntax
at this time.
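
EXAMPLE (illustration only; ``md`` stands for a run's metadata dictionary,
such as ``run.metadata``)::

    self._get_by_key(md, "plan_name")         # from the start document
    self._get_by_key(md, "stop.exit_status")  # dotted-key: stop document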
"""
v = self.missing
if key == "time":
v = md["start"][key]
if self.timefmt != "raw":
ts = datetime.datetime.fromtimestamp(v)
v = ts.strftime(self.timefmt)
elif key in md["start"]:
v = md["start"].get(key, self.missing)
elif md["stop"] and key in md["stop"]:
v = md["stop"].get(key, self.missing)
elif len(key.split(".")) == 2:
# dotted-key syntax
doc, key = key.split(".")
if md[doc] is not None:
v = md[doc].get(key, self.missing)
if key == "time" and self.timefmt != "raw":
ts = datetime.datetime.fromtimestamp(v)
v = ts.strftime(self.timefmt)
hints = md["start"].get("hints", {})
if (v == self.missing or self.hints_override) and key in hints:
v = hints[key]
return v
def _check_cat(self):
from . import getCatalog
if self.cat is None:
self.cat = getCatalog()
def _apply_search_filters(self):
"""Search for runs from the catalog."""
from databroker.queries import TimeRange
since = self.since or FIRST_DATA
until = self.until or LAST_DATA
self._check_cat()
query = {}
query.update(TimeRange(since=since, until=until))
query.update(self.query or {})
cat = self.cat.v2.search(query)
return cat
def parse_runs(self):
"""Parse the runs for the given metadata keys. Return a dict."""
self._check_keys()
cat = self._apply_search_filters()
def _sort(uid):
"""Sort runs in desired order based on metadata key."""
md = self.cat[uid].metadata
for doc in "start stop".split():
if md[doc] and self.sortby in md[doc]:
return md[doc][self.sortby] or self.missing
return self.missing
num_runs_requested = min(abs(self.num), len(cat))
results = {k: [] for k in self.keys}
sequence = () # iterable of run uids
if self.ids is not None:
sequence = []
for k in self.ids:
try:
cat[k] # try to access the run using `k`
sequence.append(k)
except Exception as exc:
logger.warning(
"Could not find run %s in search of catalog %s: %s",
k,
self.cat.name,
exc,
)
else:
from databroker import Broker
from databroker._drivers.mongo_normalized import BlueskyMongoCatalog
MONGO_CATALOG_CLASSES = (Broker, BlueskyMongoCatalog)
if isinstance(cat, MONGO_CATALOG_CLASSES) and self.sortby == "time":
if self.reverse:
# the default rendering: from MongoDB in reverse time order
sequence = iter(cat)
else:
# by increasing time order
sequence = [uid for uid in cat][::-1]
else:
# full search in Python
sequence = sorted(cat.keys(), key=_sort, reverse=self.reverse)
count = 0
for uid in sequence:
run = cat[uid]
for k in self.keys:
results[k].append(self._get_by_key(run.metadata, k))
count += 1
if count >= num_runs_requested:
break
return results
def _check_keys(self):
"""Check that self.keys is a list of strings."""
self.keys = self.keys or self._default_keys
if isinstance(self.keys, str):
self.keys = self.keys.split()
@deprecated(version="1.6.14")
def to_dataframe(self):
"""**Deprecated**: Output as pandas DataFrame object"""
warnings.warn("'ListRuns.to_dataframe()' method is deprecated.")
dd = self.parse_runs()
return TableStyle.pandas.value(dd, columns=self.keys)
@deprecated(version="1.6.14")
def to_table(self, fmt=None):
"""**Deprecated**: Output as pyRestTable object."""
warnings.warn("'ListRuns.to_table()' method is deprecated.")
dd = self.parse_runs()
return TableStyle.pyRestTable.value(dd=dd).reST(fmt=fmt or "simple")
@versionadded(version="1.5.0")
def listruns(
cat=None,
keys=None,
missing="",
num=20,
printing=None, # DEPRECATED
reverse=True,
since=None,
sortby="time",
tablefmt=None, # DEPRECATED
table_style=TableStyle.pyRestTable,
timefmt="%Y-%m-%d %H:%M:%S",
until=None,
ids=None,
hints_override=False,
**query,
):
"""
List runs from catalog.
This function provides a thin interface to the highly-reconfigurable
``ListRuns()`` class in this package.
PARAMETERS
cat
*object* :
Instance of databroker v1 or v2 catalog.
keys
*str* or *[str]* or None:
Include these additional keys from the start document.
(default: ``None`` means ``"scan_id time plan_name detectors"``)
missing
*str*:
Text to report when a value is not available.
(default: ``""``)
hints_override *bool*:
For a key that appears in both the run's metadata and its hints,
use the value from the hints instead of the metadata value.
(default: ``False``)
ids
*[int]* or *[str]*:
List of ``uid`` or ``scan_id`` value(s).
Can mix different kinds in the same list.
Also can specify offsets (e.g., ``-1``).
According to the rules for ``databroker`` catalogs,
a string is a ``uid`` (partial representations allowed),
an int is ``scan_id`` if positive or an offset if negative.
(default: ``None``)
num
*int* :
Make the table include the ``num`` most recent runs.
(default: ``20``)
printing *bool* or *str* :
Deprecated.
reverse
*bool* :
If ``True``, sort in descending order by ``sortby``.
(default: ``True``)
since
*str* :
include runs that started on or after this ISO8601 time
(default: ``"1995-01-01"``)
sortby
*str* :
Sort columns by this key, found by exact match in either
the ``start`` or ``stop`` document.
(default: ``"time"``)
tablefmt *str* :
Deprecated. Use ``table_style`` instead.
table_style *object* :
Either ``TableStyle.pyRestTable`` (default) or ``TableStyle.pandas``,
using values from :class:`apstools.utils.TableStyle`.
.. note:: ``pandas.DataFrame`` will truncate long text to at most 50 characters.
timefmt
*str* :
The ``time`` key (also includes keys ``"start.time"`` and ``"stop.time"``)
will be formatted by the ``timefmt`` value.
See https://strftime.org/ for examples. The special ``timefmt="raw"``
is used to report time as the raw value (floating point time as used in
python's ``time.time()``).
(default: ``"%Y-%m-%d %H:%M:%S"``)
until
*str* :
include runs that started before this ISO8601 time
(default: ``"2100-12-31"``)
``**query``
*dict* :
Any additional keyword arguments will be passed to
the databroker to refine the search for matching runs
using the ``mongoquery`` package.
RETURNS
object:
``pyRestTable.Table`` or ``pandas.DataFrame`` object, as selected by ``table_style``.
.. EXAMPLE
TODO
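
EXAMPLE (a minimal sketch; assumes a databroker catalog can be found in the
current session -- the date and the ``plan_name`` filter are placeholders)::

    listruns()                            # table of the 20 most recent runs
    listruns(num=5, since="2024-01-01")   # limit the count and time range
    listruns(plan_name="rel_scan")        # ``**query`` filter on metadata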
"""
lr = ListRuns(
cat=cat,
keys=keys,
missing=missing,
num=num,
query=query,
reverse=reverse,
since=since,
sortby=sortby,
timefmt=timefmt,
until=until,
ids=ids,
hints_override=hints_override,
)
table_style = table_style or TableStyle.pyRestTable
if tablefmt is not None:
if tablefmt == "dataframe":
choice = "TableStyle.pandas"
table_style = TableStyle.pandas
else:
choice = "TableStyle.pyRestTable"
table_style = TableStyle.pyRestTable
# fmt: off
warnings.warn(
f"Use 'table_style={choice}' instead of"
f" deprecated option 'tablefmt=\"{tablefmt}\"'."
)
# fmt: on
if printing is not None:
warnings.warn(f"Keyword argument 'printing={printing}' is deprecated.")
return table_style.value(lr.parse_runs())
def summarize_runs(since=None, db=None):
"""
Report bluesky run metrics from the databroker.
* How many different plans?
* How many runs?
* How many times each plan was used?
* How frequently? (TODO:)
PARAMETERS
since
*str* :
Report all runs since this ISO8601 date & time
(default: ``"1995"``)
db
*object* :
Instance of ``databroker.Broker()``
(default: ``db`` from the IPython shell)
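
EXAMPLE (a minimal sketch; assumes an IPython session where ``db`` is a
``databroker.Broker()`` instance)::

    summarize_runs()                     # all runs since 1995
    summarize_runs(since="2022-01-01")   # restrict the reporting period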
"""
from databroker.queries import TimeRange
from . import ipython_shell_namespace
db = db or ipython_shell_namespace()["db"]
# no APS X-ray experiment data before 1995!
since = since or "1995"
cat = db.v2.search(TimeRange(since=since))
plans = defaultdict(list)
n = -1  # so the TOTAL row reports zero when the search returns no runs
t0 = time.time()
for n, uid in enumerate(cat):
t1 = time.time()
# next step is very slow (0.01 - 0.5 seconds each!)
run = cat[uid]
t2 = time.time()
plan_name = run.metadata["start"].get("plan_name", "unknown")
# fmt:off
dt = datetime.datetime.fromtimestamp(
run.metadata["start"]["time"]
).isoformat()
# fmt:on
scan_id = run.metadata["start"].get("scan_id", "unknown")
# fmt: off
plans[plan_name].append(
dict(
plan_name=plan_name,
dt=dt,
time_start=dt,
uid=uid,
scan_id=scan_id,
)
)
# fmt: on
logger.debug(
"%s %s dt1=%4.01fus dt2=%5.01fms %s",
scan_id,
dt,
(t1 - t0) * 1e6,
(t2 - t1) * 1e3,
plan_name,
)
t0 = time.time()
def sorter(plan_name):
return len(plans[plan_name])
table = TableStyle.pyRestTable.value()
table.labels = "plan quantity".split()
for k in sorted(plans.keys(), key=sorter, reverse=True):
table.addRow((k, sorter(k)))
table.addRow(("TOTAL", n + 1))
print(table)
# -----------------------------------------------------------------------------
# :author: BCDA
# :copyright: (c) 2017-2025, UChicago Argonne, LLC
#
# Distributed under the terms of the Argonne National Laboratory Open Source License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
# -----------------------------------------------------------------------------