Source code for instrument.plans.dm_plans

"""
Plans in support of APS Data Management.

.. autosummary::

    ~dm_kickoff_workflow
    ~dm_list_processing_jobs
    ~dm_submit_workflow_job
"""

import logging

from apstools.devices import DM_WorkflowConnector
from apstools.utils import dm_api_proc
from apstools.utils import share_bluesky_metadata_with_dm
from bluesky import plan_stubs as bps

logger = logging.getLogger(__name__)
logger.bsdev(__file__)


[docs] def dm_kickoff_workflow(run, argsDict, timeout=None, wait=False): """ Start a DM workflow for this bluesky run and share run's metadata with DM. PARAMETERS: run (*obj*): Bluesky run object (such as 'run = cat[uid]'). argsDict (*dict*): Dictionary of parameters needed by 'workflowName'. At minimum, most workflows expect these keys: 'filePath' and 'experimentName'. Consult the workflow for the expected content of 'argsDict'. timeout (*number*): When should bluesky stop reporting on this DM workflow job (if it has not ended). Units are seconds. Default is forever. wait (*bool*): Should this plan stub wait for the job to end? Default is 'False'. """ dm_workflow = DM_WorkflowConnector(name="dm_workflow") if timeout is None: # Disable periodic reports, use a long time (s). timeout = 999_999_999_999 yield from bps.mv(dm_workflow.concise_reporting, True) yield from bps.mv(dm_workflow.reporting_period, timeout) workflow_name = argsDict.pop["workflowName"] yield from dm_workflow.run_as_plan( workflow=workflow_name, wait=wait, timeout=timeout, **argsDict, ) # Upload bluesky run metadata to APS DM. share_bluesky_metadata_with_dm(argsDict["experimentName"], workflow_name, run) # Users requested the DM workflow job ID be printed to the console. dm_workflow._update_processing_data() job_id = dm_workflow.job_id.get() job_stage = dm_workflow.stage_id.get() job_status = dm_workflow.status.get() print(f"DM workflow id: {job_id!r} status: {job_status} stage: {job_stage}")
[docs] def dm_list_processing_jobs(exclude=None): """ Show all the DM jobs with status not excluded. Excluded status (default): 'done', 'failed' """ yield from bps.null() # make this a plan stub api = dm_api_proc() if exclude is None: exclude = ("done", "failed") for j in api.listProcessingJobs(): if j["status"] not in exclude: print( f"id={j['id']!r}" f" submitted={j.get('submissionTimestamp')}" f" status={j['status']!r}" )
[docs] def dm_submit_workflow_job(workflowName, argsDict): """ Low-level plan stub to submit a job to a DM workflow. It is recommended to use dm_kickoff_workflow() instead. This plan does not share run metadata with DM. PARAMETERS: workflowName (*str*): Name of the DM workflow to be run. argsDict (*dict*): Dictionary of parameters needed by 'workflowName'. At minimum, most workflows expect these keys: 'filePath' and 'experimentName'. Consult the workflow for the expected content of 'argsDict'. """ yield from bps.null() # make this a plan stub api = dm_api_proc() job = api.startProcessingJob(api.username, workflowName, argsDict) print(f"workflow={workflowName!r} id={job['id']!r}")