Source code for apsbits.demo_instrument.plans.dm_plans
"""
Plans in support of APS Data Management
=======================================

.. autosummary::
    ~dm_kickoff_workflow
    ~dm_list_processing_jobs
    ~dm_submit_workflow_job
"""
import logging
from apstools.devices import DM_WorkflowConnector
from apstools.utils import dm_api_proc
from apstools.utils import share_bluesky_metadata_with_dm
from bluesky import plan_stubs as bps
logger = logging.getLogger(__name__)
logger.bsdev(__file__)
def dm_kickoff_workflow(run, argsDict, timeout=None, wait=False):
    """
    Start a DM workflow for this bluesky run and share the run's metadata with DM.
    PARAMETERS:
    run (*obj*): Bluesky run object (such as 'run = cat[uid]').
    argsDict (*dict*): Dictionary of parameters needed by 'workflowName'.
        At minimum, most workflows expect these keys: 'filePath' and
        'experimentName'.  Consult the workflow for the expected
        content of 'argsDict'.
    timeout (*number*): When should bluesky stop reporting on this
        DM workflow job (if it has not ended). Units are seconds.
        Default is forever.
    wait (*bool*): Should this plan stub wait for the job to end?
        Default is 'False'.
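
    EXAMPLE: A minimal usage sketch.  The catalog ``cat``, the run ``uid``, and
    the workflow arguments shown are illustrative assumptions, not defaults::

        run = cat[uid]  # previously-completed bluesky run
        yield from dm_kickoff_workflow(
            run,
            {
                "workflowName": "example-01",  # hypothetical DM workflow name
                "filePath": "/path/to/data/file.h5",
                "experimentName": "my_dm_experiment",
            },
            wait=True,
        )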
    """
    dm_workflow = DM_WorkflowConnector(name="dm_workflow")
    if timeout is None:
        # Disable periodic reports, use a long time (s).
        timeout = 999_999_999_999
    yield from bps.mv(dm_workflow.concise_reporting, True)
    yield from bps.mv(dm_workflow.reporting_period, timeout)
    workflow_name = argsDict.pop("workflowName")
    yield from dm_workflow.run_as_plan(
        workflow=workflow_name,
        wait=wait,
        timeout=timeout,
        **argsDict,
    )
    # Upload bluesky run metadata to APS DM.
    share_bluesky_metadata_with_dm(argsDict["experimentName"], workflow_name, run)
    # Users requested the DM workflow job ID be printed to the console.
    dm_workflow._update_processing_data()
    job_id = dm_workflow.job_id.get()
    job_stage = dm_workflow.stage_id.get()
    job_status = dm_workflow.status.get()
    print(f"DM workflow id: {job_id!r}  status: {job_status}  stage: {job_stage}") 
def dm_list_processing_jobs(exclude=None):
    """
    Show all DM processing jobs whose status is not excluded.
    Excluded statuses (default): 'done', 'failed'
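
    EXAMPLE: A minimal usage sketch; the excluded status shown is illustrative::

        # List jobs that are not "done" (failed jobs are still shown).
        yield from dm_list_processing_jobs(exclude=("done",))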
    """
    yield from bps.null()  # make this a plan stub
    api = dm_api_proc()
    if exclude is None:
        exclude = ("done", "failed")
    for j in api.listProcessingJobs():
        if j["status"] not in exclude:
            print(
                f"id={j['id']!r}"
                f"  submitted={j.get('submissionTimestamp')}"
                f"  status={j['status']!r}"
            ) 
def dm_submit_workflow_job(workflowName, argsDict):
    """
    Low-level plan stub to submit a job to a DM workflow.
    It is recommended to use dm_kickoff_workflow() instead.
    This plan does not share run metadata with DM.
    PARAMETERS:
    workflowName (*str*): Name of the DM workflow to be run.
    argsDict (*dict*): Dictionary of parameters needed by 'workflowName'.
        At minimum, most workflows expect these keys: 'filePath' and
        'experimentName'.  Consult the workflow for the expected
        content of 'argsDict'.
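
    EXAMPLE: A minimal usage sketch.  The workflow name and arguments shown are
    illustrative assumptions; consult the workflow for its required keys::

        yield from dm_submit_workflow_job(
            "example-01",  # hypothetical DM workflow name
            {
                "filePath": "/path/to/data/file.h5",
                "experimentName": "my_dm_experiment",
            },
        )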
    """
    yield from bps.null()  # make this a plan stub
    api = dm_api_proc()
    job = api.startProcessingJob(api.username, workflowName, argsDict)
    print(f"workflow={workflowName!r}  id={job['id']!r}")