"""
Connect with APS Data Management workflows.
Example::
import bluesky
from apstools.devices import DM_WorkflowConnector
RE = bluesky.RunEngine()
dm_workflow = DM_WorkflowConnector(name="dm_workflow", labels=["DM"])
RE(
dm_workflow.run_as_plan(
workflow="example-01",
filePath="/home/beams/S1IDTEST/.bashrc"
)
)
.. note:: :class:`~DM_WorkflowConnector()` requires APS Data Management package (``aps-dm-api >=5``)
.. autosummary::
~DM_WorkflowConnector
from: https://github.com/APS-1ID-MPE/hexm-bluesky/blob/main/instrument/devices/data_management.py
"""
__all__ = """
DM_WorkflowConnector
""".split()
import logging
import os
import time
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # allow any log content at this level
logger.info(__file__)
print(__file__)
from ophyd import Component
from ophyd import Device
from ophyd import Signal
from ..utils import run_in_thread
DM_STATION_NAME = str(os.environ.get("DM_STATION_NAME", "terrier")).lower()
NOT_AVAILABLE = "-n/a-"
NOT_RUN_YET = "not_run"
REPORT_PERIOD_DEFAULT = 10
REPORT_PERIOD_MIN = 1
STARTING = "running"
TIMEOUT_DEFAULT = 180 # TODO: Consider removing the timeout feature
[docs]class DM_WorkflowConnector(Device):
"""
Support for the APS Data Management tools.
The DM workflow dictionary of arguments (``workflow_args``) needs special
attention. Python's ``dict`` structure is not compatible with MongoDB. In
turn, ophyd does not support it. A custom plan can choose how to use `the
`workflow_args`` dictionary:
- use with DM workflow, as planned
- add ``workflow_args`` to the start metadata
- write as run stream::
from apstools.devices import make_dict_device
from apstools.plans import write_stream
yield from write_stream(
[make_dict_device(workflow_args, name="kwargs")],
"workflow_args"
)
.. autosummary::
~api
~idle
~processing_jobs
~report_processing_stages
~report_status
~run_as_plan
~start_workflow
~workflows
~put_if_different
~_update_processing_data
"""
job = None # DM processing job (must update during workflow execution)
_api = None # DM common API
owner = Component(Signal, value=DM_STATION_NAME, kind="config")
workflow = Component(Signal, value="")
workflow_args = {}
job_id = Component(Signal, value=NOT_RUN_YET)
# exit_status = Component(Signal, value=NOT_RUN_YET)
run_time = Component(Signal, value=0)
stage_id = Component(Signal, value=NOT_RUN_YET)
status = Component(Signal, value=NOT_RUN_YET)
polling_period = Component(Signal, value=0.1, kind="config")
reporting_period = Component(Signal, value=REPORT_PERIOD_DEFAULT, kind="config")
concise_reporting = Component(Signal, value=True, kind="config")
def __repr__(self):
"""Default representation of class instance."""
# fmt: off
innards = f"owner='{self.owner.get()}'"
innards += f", workflow='{self.workflow.get()}'"
for key, value in sorted(self.workflow_args.items()):
innards += f", {key}={value!r}"
# fmt: on
if self.job_id.get() != NOT_RUN_YET:
innards += f", id={self.job_id.get()!r}"
run_time = self.run_time.get()
if run_time > 0:
innards += f", {run_time=:.2f}"
innards += f", stage_id={self.stage_id.get()!r}"
innards += f", status={self.status.get()!r}"
return f"{self.__class__.__name__}({innards})"
def __init__(self, name=None, workflow=None, **kwargs):
"""Constructor."""
if name is None:
raise KeyError("Must provide value for 'name'.")
super().__init__(name=name)
if workflow is not None:
self.workflow.put(workflow)
self.workflow_args.update(kwargs)
[docs] def put_if_different(self, signal, value):
"""Put ophyd signal only if new value is different."""
if signal.get() != value:
signal.put(value)
[docs] def getJob(self):
"""Get the current processing job object."""
return self.api.getProcessingJobById(self.owner.get(), self.job_id.get())
[docs] def _update_processing_data(self):
"""
(internal) Called periodically (while process runs) to update self.job.
Also updates certain ophyd signals.
"""
if self.job_id.get() == NOT_RUN_YET:
return
self.job = self.getJob()
rep = self.job.getDictRep()
# self.put_if_different(self.exit_status, rep.get("exitStatus", NOT_AVAILABLE))
self.put_if_different(self.run_time, rep.get("runTime", -1))
self.put_if_different(self.stage_id, rep.get("stage", NOT_AVAILABLE))
self.put_if_different(self.status, rep.get("status", NOT_AVAILABLE))
@property
def api(self):
"""Local copy of DM Processing API object."""
from dm import ProcApiFactory
if self._api is None:
self._api = ProcApiFactory.getWorkflowProcApi()
return self._api
@property
def idle(self):
"""Is DM Processing idle?"""
return self.status.get() in (NOT_RUN_YET, "done")
[docs] def report_status(self, t_offset=None):
"""Status report."""
if self.concise_reporting.get():
t = f"{self.__class__.__name__} {self.name}:"
t += f" {self.workflow.get()!r}"
t += f" {self.job_id.get()[:8]!r}"
t += f" {self.status.get()!r}"
t += f" {self.stage_id.get()!r}"
if t_offset is not None:
t += f" elapsed: {time.time()-t_offset:.0f}s"
logger.info(t)
else:
self.report_processing_stages()
[docs] def start_workflow(self, workflow="", timeout=TIMEOUT_DEFAULT, **kwargs):
"""Kickoff a DM workflow with optional wait & timeout."""
if workflow == "":
workflow = self.workflow.get()
else:
workflow = workflow
if workflow == "":
raise AttributeError("Must define a workflow name.")
self.put_if_different(self.workflow, workflow)
wfargs = self.workflow_args.copy()
wfargs.update(kwargs)
self.start_time = time.time()
self._report_deadline = self.start_time
def update_report_deadline(catch_up=False):
period = max(self.reporting_period.get(), REPORT_PERIOD_MIN)
if catch_up: # catch-up (if needed) and set in near future
new_deadline = round(self._report_deadline, 2)
while time.time() > new_deadline:
new_deadline += period
else:
new_deadline = time.time() + period
self._report_deadline = new_deadline
def _reporter(*args, **kwargs):
update_report_deadline(catch_up=False)
self.report_status(t_offset=self.start_time)
def _cleanup():
"""Call when DM workflow finishes."""
self.stage_id.unsubscribe_all()
self.status.unsubscribe_all()
if "_report_deadline" in dir(self):
del self._report_deadline
@run_in_thread
def _run_DM_workflow_thread():
logger.info("run DM workflow: %s with timeout=%s s", self.workflow.get(), timeout)
self.job = self.api.startProcessingJob(
workflowOwner=self.owner.get(),
workflowName=workflow,
argsDict=wfargs,
)
self.job_id.put(self.job["id"])
logger.info(f"DM workflow started: {self}")
# wait for workflow to finish
deadline = time.time() + timeout
while time.time() < deadline and self.status.get() not in "done failed timeout".split():
self._update_processing_data()
if "_report_deadline" not in dir(self) or time.time() >= self._report_deadline:
_reporter()
time.sleep(self.polling_period.get())
_cleanup()
logger.info("Final workflow status: %s", self.status.get())
if self.status.get() in "done failed".split():
logger.info(f"{self}")
self.report_status(self.start_time)
return
self.status.put("timeout")
logger.info(f"{self}")
# fmt: off
logger.error(
"Workflow %s timeout in %s s.", repr(self.workflow.get()), timeout
)
# raise TimeoutError(
# f"Workflow {self.workflow.get()!r}"
# f" did not finish in {timeout} s."
# )
# fmt: on
self.job = None
self.stage_id.put(NOT_RUN_YET)
self.job_id.put(NOT_RUN_YET)
self.status.put(STARTING)
self.stage_id.subscribe(_reporter)
self.status.subscribe(_reporter)
_run_DM_workflow_thread()
[docs] def run_as_plan(self, workflow="", wait=True, timeout=TIMEOUT_DEFAULT, **kwargs):
"""Run the DM workflow as a bluesky plan."""
from bluesky import plan_stubs as bps
if workflow == "":
workflow = self.workflow.get()
self.start_workflow(workflow=workflow, timeout=timeout, **kwargs)
logger.info("plan: workflow started")
if wait:
while not self.idle:
yield from bps.sleep(self.polling_period.get())
@property
def processing_jobs(self):
"""The list of DM processsing jobs."""
return self.api.listProcessingJobs(self.owner.get())
@property
def workflows(self):
"""Return the list of workflows."""
return self.api.listWorkflows(self.owner.get())
[docs] def report_processing_stages(self, truncate=40):
"""
Print a table about each stage of the workflow process.
"""
import pyRestTable
if self.job is None:
return
wf = self.job["workflow"]
stage_keys = "status runTime exitStatus stdOut stdErr".split()
table = pyRestTable.Table()
table.labels = "stage_id process processTime".split() + stage_keys
for stage_id, dstage in wf["stages"].items():
childProcesses = dstage.get("childProcesses", {"": {}})
for k, v in childProcesses.items():
row = [stage_id, k]
status = v.get("status")
if status is None:
processTime = 0
else:
submitTime = v.get("submitTime", time.time())
endTime = v.get("endTime", submitTime) # might be unknown
processTime = max(0, min(endTime - submitTime, 999999))
row.append(round(processTime, 3))
for key in stage_keys:
value = v.get(key, "")
if key in ("runTime"):
value = round(v.get(key, 0), 4)
if key in ("stdOut", "stdErr"):
value = str(value).strip()[:truncate]
row.append(value)
table.addRow(row)
logger.info(
f"{wf['description']!r}"
f" workflow={wf['name']!r}"
f" id={self.job['id'][:8]!r}"
f" elapsed={time.time()-self.start_time:.1f}s"
f"\n{self!r}"
f"\n{table}"
)