Source code for apsbits.core.run_engine_init

"""
Setup and initialize the Bluesky RunEngine.
===========================================

This module provides the function init_RE to create and configure a
Bluesky RunEngine with metadata storage, subscriptions, and various
settings based on a configuration dictionary.

.. autosummary::
    init_RE
"""

import logging
from typing import Any
from typing import Dict
from typing import Optional
from typing import Tuple

import bluesky
from bluesky.utils import ProgressBarManager

from apsbits.utils.controls_setup import connect_scan_id_pv
from apsbits.utils.controls_setup import set_control_layer
from apsbits.utils.controls_setup import set_timeouts
from apsbits.utils.metadata import get_md_path
from apsbits.utils.metadata import re_metadata
from apsbits.utils.stored_dict import StoredDict

logger = logging.getLogger(__name__)
logger.bsdev(__file__)


[docs] def init_RE( iconfig: Dict[str, Any], bec_instance: Optional[Any] = None, cat_instance: Optional[Any] = None, ) -> Tuple[bluesky.RunEngine, bluesky.SupplementalData]: """ Initialize and configure a Bluesky RunEngine instance. This function creates a Bluesky RunEngine, sets up metadata storage, subscriptions, and various preprocessors based on the provided configuration dictionary. It configures the control layer and timeouts, attaches supplemental data for baselines and monitors, and optionally adds a progress bar and metadata updates from a catalog or BestEffortCallback. Parameters: iconfig (Dict[str, Any]): Configuration dictionary with keys including: - "RUN_ENGINE": A dict containing RunEngine-specific settings. - "DEFAULT_METADATA": (Optional) Default metadata for the RunEngine. - "USE_PROGRESS_BAR": (Optional) Boolean flag to enable the progress bar. - "OPHYD": A dict for control layer settings (e.g., "CONTROL_LAYER" and "TIMEOUTS"). bec_instance (Optional[Any]): Instance of BestEffortCallback for subscribing to the RunEngine. Defaults to None. cat_instance (Optional[Any]): Instance of a databroker catalog for subscribing to the RunEngine. Defaults to None. Returns: Tuple[bluesky.RunEngine, bluesky.SupplementalData]: A tuple containing the configured RunEngine instance and its associated SupplementalData. Notes: The function attempts to set up persistent metadata storage in the RE.md attr. If an error occurs during the creation of the metadata storage handler, the error is logged and the function proceeds without persistent metadata. Subscriptions are added for the catalog and BestEffortCallback if provided, and additional configurations such as control layer, timeouts, and progress bar integration are applied. """ re_config = iconfig.get("RUN_ENGINE", {}) # Steps that must occur before any EpicsSignalBase (or subclass) is created. control_layer = iconfig.get("OPHYD", {}).get("CONTROL_LAYER", "PyEpics") set_control_layer(control_layer=control_layer) set_timeouts(timeouts=iconfig.get("OPHYD", {}).get("TIMEOUTS", {})) RE = bluesky.RunEngine() """The Bluesky RunEngine object.""" sd = bluesky.SupplementalData() """Supplemental data providing baselines and monitors for the RunEngine.""" RE.preprocessors.append(sd) MD_PATH = get_md_path(iconfig) # Save/restore RE.md dictionary in the specified order. if MD_PATH is not None: handler_name = StoredDict logger.debug( "Selected %r to store 'RE.md' dictionary in %s.", handler_name, MD_PATH, ) try: if handler_name == "PersistentDict": RE.md = bluesky.utils.PersistentDict(MD_PATH) else: RE.md = StoredDict(MD_PATH) except Exception as error: print( "\n" f"Could not create {handler_name} for RE metadata. Continuing " f"without saving metadata to disk. {error=}\n" ) logger.warning("%s('%s') error:%s", handler_name, MD_PATH, error) if cat_instance is not None: RE.md.update(re_metadata(iconfig, cat_instance)) # programmatic metadata RE.md.update(re_config.get("DEFAULT_METADATA", {})) RE.subscribe(cat_instance.v1.insert) if bec_instance is not None: RE.subscribe(bec_instance) RE.preprocessors.append(sd) scan_id_pv = iconfig.get("RUN_ENGINE", {}).get("SCAN_ID_PV") connect_scan_id_pv(RE, pv=scan_id_pv) if re_config.get("USE_PROGRESS_BAR", True): # Add a progress bar. pbar_manager = ProgressBarManager() RE.waiting_hook = pbar_manager return RE, sd