# Source code for apstools.devices.aps_data_management

"""
Connect with APS Data Management workflows.

Example::

    import bluesky
    from apstools.devices import DM_WorkflowConnector

    RE = bluesky.RunEngine()

    dm_workflow = DM_WorkflowConnector(name="dm_workflow", labels=["DM"])
    RE(
        dm_workflow.run_as_plan(
            workflow="example-01",
            filePath="/home/beams/S1IDTEST/.bashrc"
        )
    )

.. note:: :class:`~DM_WorkflowConnector` requires the APS Data Management package (``aps-dm-api >=5``).

.. autosummary::

    ~DM_WorkflowConnector

"""

__all__ = """
    DM_WorkflowConnector
""".split()

import logging
import time

from ophyd import Component
from ophyd import Device
from ophyd import Signal

from ..utils import run_in_thread

logger = logging.getLogger(__name__)

NOT_AVAILABLE = "-n/a-"
NOT_RUN_YET = "not_run"
POLLING_PERIOD_S = 1.0
REPORT_PERIOD_DEFAULT = 10
REPORT_PERIOD_MIN = 1
STARTING = "running"
TIMEOUT_DEFAULT = 180  # TODO: Consider removing/renaming the timeout feature


class DM_WorkflowConnector(Device):
    """
    Support for the APS Data Management tools.

    The DM workflow dictionary of arguments (``workflow_args``) needs
    special attention.  Python's ``dict`` structure is not compatible
    with MongoDB.  In turn, ophyd does not support it.  A custom plan
    can choose how to use the ``workflow_args`` dictionary:

    - use with the DM workflow, as planned
    - add ``workflow_args`` to the start metadata
    - write as a run stream::

        from apstools.devices import make_dict_device
        from apstools.plans import write_stream

        yield from write_stream(
            [make_dict_device(workflow_args, name="kwargs")],
            "workflow_args",
        )

    .. autosummary::

        ~api
        ~idle
        ~processing_jobs
        ~report_processing_stages
        ~report_status
        ~run_as_plan
        ~start_workflow
        ~workflows
        ~put_if_different
        ~_update_processing_data
    """

    job = None  # DM processing job (must update during workflow execution)
    _api = None  # DM processing API

    owner = Component(Signal, value="", kind="config")
    workflow = Component(Signal, value="")
    workflow_args = {}

    job_id = Component(Signal, value=NOT_RUN_YET)
    # exit_status = Component(Signal, value=NOT_RUN_YET)
    run_time = Component(Signal, value=0)
    stage_id = Component(Signal, value=NOT_RUN_YET)
    status = Component(Signal, value=NOT_RUN_YET)

    polling_period = Component(Signal, value=POLLING_PERIOD_S, kind="config")
    reporting_period = Component(Signal, value=REPORT_PERIOD_DEFAULT, kind="config")
    concise_reporting = Component(Signal, value=True, kind="config")

    def __repr__(self):
        """Default representation of class instance."""
        # fmt: off
        innards = f"owner='{self.owner.get()}'"
        innards += f", workflow='{self.workflow.get()}'"
        for key, value in sorted(self.workflow_args.items()):
            innards += f", {key}={value!r}"
        # fmt: on
        if self.job_id.get() != NOT_RUN_YET:
            innards += f", id={self.job_id.get()!r}"
            run_time = self.run_time.get()
            if run_time > 0:
                innards += f", {run_time=:.2f}"
            innards += f", stage_id={self.stage_id.get()!r}"
            innards += f", status={self.status.get()!r}"
        return f"{self.__class__.__name__}({innards})"

    def __init__(self, name=None, workflow=None, **kwargs):
        """Constructor."""
        if name is None:
            raise KeyError("Must provide value for 'name'.")
        super().__init__(name=name)
        if workflow is not None:
            self.workflow.put(workflow)
        self.workflow_args.update(kwargs)
        self.owner.put(self.api.username)
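    # Usage sketch for one option from the class docstring: record the
    # ``workflow_args`` in the run's start metadata.  Assumes a running
    # session; ``RE``, ``detector``, and the ``filePath`` value are
    # hypothetical names for illustration only.
    #
    #     from bluesky.plans import count
    #
    #     dm_workflow = DM_WorkflowConnector(
    #         name="dm_workflow", workflow="example-01", filePath="/tmp/file.dat"
    #     )
    #     RE(count([detector], md={"workflow_args": dm_workflow.workflow_args}))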
    def put_if_different(self, signal, value):
        """Put ophyd signal only if new value is different."""
        if signal.get() != value:
            signal.put(value)
    def getJob(self):
        """Get the current processing job object."""
        return self.api.getProcessingJobById(self.owner.get(), self.job_id.get())
    def _update_processing_data(self):
        """
        (internal) Called periodically (while process runs) to update self.job.

        Also updates certain ophyd signals.
        """
        if self.job_id.get() == NOT_RUN_YET:
            return
        self.job = self.getJob()
        rep = self.job.getDictRep()
        # self.put_if_different(self.exit_status, rep.get("exitStatus", NOT_AVAILABLE))
        self.put_if_different(self.run_time, rep.get("runTime", -1))
        self.put_if_different(self.stage_id, rep.get("stage", NOT_AVAILABLE))
        self.put_if_different(self.status, rep.get("status", NOT_AVAILABLE))
    @property
    def api(self):
        """Local copy of DM Processing API object."""
        from dm import ProcApiFactory

        if self._api is None:
            self._api = ProcApiFactory.getWorkflowProcApi()
        return self._api

    @property
    def idle(self):
        """Is DM Processing idle?"""
        return self.status.get() in (NOT_RUN_YET, "done")
    def report_status(self, t_offset=None):
        """Status report."""
        if self.concise_reporting.get():
            t = f"{self.__class__.__name__} {self.name}:"
            t += f" {self.workflow.get()!r}"
            t += f" {self.job_id.get()[:8]!r}"
            t += f" {self.status.get()!r}"
            t += f" {self.stage_id.get()!r}"
            if t_offset is not None:
                t += f" elapsed: {time.time() - t_offset:.0f}s"
            logger.info(t)
        else:
            self.report_processing_stages()
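    # Reporting sketch: ``concise_reporting`` is an ophyd Signal, so a session
    # can switch from the one-line report to the full per-stage table (via
    # ``report_processing_stages()``) once a workflow has been started:
    #
    #     dm_workflow.concise_reporting.put(False)
    #     dm_workflow.report_status()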
    def start_workflow(self, workflow="", timeout=TIMEOUT_DEFAULT, **kwargs):
        """
        Kick off a DM workflow with an optional reporting timeout.

        The reporting process will continue until the workflow ends or
        the timeout period is exceeded.  It does not affect the actual
        workflow.
        """
        if workflow == "":
            workflow = self.workflow.get()
        if workflow == "":
            raise AttributeError("Must define a workflow name.")
        self.put_if_different(self.workflow, workflow)

        wfargs = self.workflow_args.copy()
        wfargs.update(kwargs)

        self.start_time = time.time()
        self._report_deadline = self.start_time

        def update_report_deadline(catch_up=False):
            period = max(self.reporting_period.get(), REPORT_PERIOD_MIN)
            if catch_up:
                # catch-up (if needed) and set in near future
                new_deadline = round(self._report_deadline, 2)
                while time.time() > new_deadline:
                    new_deadline += period
            else:
                new_deadline = time.time() + period
            self._report_deadline = new_deadline

        def _reporter(*args, **kwargs):
            update_report_deadline(catch_up=False)
            self.report_status(t_offset=self.start_time)

        def _cleanup():
            """Call when DM workflow finishes."""
            self.stage_id.unsubscribe_all()
            self.status.unsubscribe_all()
            if "_report_deadline" in dir(self):
                del self._report_deadline

        @run_in_thread
        def _run_DM_workflow_thread():
            logger.info(
                "run DM workflow: %s with reporting time limit=%s s",
                self.workflow.get(),
                timeout,
            )
            self.job = self.api.startProcessingJob(
                workflowOwner=self.owner.get(),
                workflowName=workflow,
                argsDict=wfargs,
            )
            self.job_id.put(self.job["id"])
            logger.info(f"DM workflow started: {self}")

            # wait for workflow to finish
            deadline = time.time() + timeout
            while time.time() < deadline and self.status.get() not in "done failed timeout".split():
                self._update_processing_data()
                if "_report_deadline" not in dir(self) or time.time() >= self._report_deadline:
                    _reporter()
                time.sleep(self.polling_period.get())

            _cleanup()
            logger.info("Final workflow status: %s", self.status.get())
            if self.status.get() in "done failed".split():
                logger.info(f"{self}")
                self.report_status(self.start_time)
                return

            self.status.put("timeout")
            logger.info(f"{self}")
            # fmt: off
            logger.error(
                "Workflow %s timeout in %s s.",
                repr(self.workflow.get()), timeout,
            )
            # raise TimeoutError(
            #     f"Workflow {self.workflow.get()!r}"
            #     f" did not finish in {timeout} s."
            # )
            # fmt: on

        self.job = None
        self.stage_id.put(NOT_RUN_YET)
        self.job_id.put(NOT_RUN_YET)
        self.status.put(STARTING)

        self.stage_id.subscribe(_reporter)
        self.status.subscribe(_reporter)

        _run_DM_workflow_thread()
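    # Non-plan usage sketch: ``start_workflow()`` launches the job in a
    # background thread (see ``run_in_thread``) and returns promptly, so it
    # can be used outside the RunEngine.  The argument values here are
    # illustrative only:
    #
    #     dm_workflow.start_workflow(workflow="example-01", timeout=300)
    #     while not dm_workflow.idle:
    #         time.sleep(1)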
    def run_as_plan(
        self,
        workflow: str = "",
        wait: bool = True,
        timeout: int = TIMEOUT_DEFAULT,
        **kwargs,
    ):
        """Run the DM workflow as a bluesky plan."""
        from bluesky import plan_stubs as bps

        if workflow == "":
            workflow = self.workflow.get()

        self.start_workflow(workflow=workflow, timeout=timeout, **kwargs)
        logger.info("plan: workflow started")
        if wait:
            while not self.idle:
                yield from bps.sleep(self.polling_period.get())
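    # Plan usage sketch with ``wait=False``: start the workflow, interleave
    # other plan steps, then poll ``idle`` before the result is needed.
    # ``bps`` is ``bluesky.plan_stubs``; the other steps are hypothetical:
    #
    #     def my_plan():
    #         yield from dm_workflow.run_as_plan(workflow="example-01", wait=False)
    #         ...  # other acquisition steps while DM processes the data
    #         while not dm_workflow.idle:
    #             yield from bps.sleep(1)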
    @property
    def processing_jobs(self):
        """The list of DM processing jobs."""
        return self.api.listProcessingJobs(self.owner.get())

    @property
    def workflows(self):
        """Return the list of workflows."""
        return self.api.listWorkflows(self.owner.get())
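    # Discovery sketch: both properties query the DM processing API for the
    # configured ``owner``.  The return structures are defined by the ``dm``
    # package:
    #
    #     print(dm_workflow.workflows)        # workflows owned by ``owner``
    #     print(dm_workflow.processing_jobs)  # processing jobs for ``owner``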
    def report_processing_stages(self, truncate=40):
        """
        Print a table about each stage of the workflow process.
        """
        import pyRestTable

        if self.job is None:
            return
        wf = self.job["workflow"]
        stage_keys = "status runTime exitStatus stdOut stdErr".split()
        table = pyRestTable.Table()
        table.labels = "stage_id process processTime".split() + stage_keys
        for stage_id, dstage in wf["stages"].items():
            childProcesses = dstage.get("childProcesses", {"": {}})
            for k, v in childProcesses.items():
                row = [stage_id, k]
                status = v.get("status")
                if status is None:
                    processTime = 0
                else:
                    submitTime = v.get("submitTime", time.time())
                    endTime = v.get("endTime", submitTime)  # might be unknown
                    processTime = max(0, min(endTime - submitTime, 999999))
                row.append(round(processTime, 3))
                for key in stage_keys:
                    value = v.get(key, "")
                    if key == "runTime":
                        value = round(v.get(key, 0), 4)
                    if key in ("stdOut", "stdErr"):
                        value = str(value).strip()[:truncate]
                    row.append(value)
                table.addRow(row)
        logger.info(
            f"{wf['description']!r}"
            f" workflow={wf['name']!r}"
            f" id={self.job['id'][:8]!r}"
            f" elapsed={time.time() - self.start_time:.1f}s"
            f"\n{self!r}"
            f"\n{table}"
        )
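    # Reporting sketch: once a job has run (``self.job`` is set), the stage
    # table can be logged directly; ``truncate`` clips the stdOut/stdErr
    # values to that many characters:
    #
    #     dm_workflow.report_processing_stages(truncate=60)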