# Source code for id3c.startup

"""
Start Bluesky Data Acquisition sessions of all kinds.

Includes:

* Python script
* IPython console
* Jupyter notebook
* Bluesky queueserver
"""

# Standard Library Imports
import logging
from pathlib import Path

# Core Functions
from apsbits.core.best_effort_init import init_bec_peaks
from apsbits.core.catalog_init import init_catalog
from apsbits.core.instrument_init import init_instrument
from apsbits.core.instrument_init import make_devices
from apsbits.core.run_engine_init import init_RE
from apsbits.core.session_setup import prepare_bits

# Utility functions
from apsbits.utils.aps_functions import host_on_aps_subnet
from apsbits.utils.baseline_setup import setup_baseline_stream

# Configuration functions
from apsbits.utils.config_loaders import load_config
from apsbits.utils.helper_functions import register_bluesky_magics
from apsbits.utils.helper_functions import running_in_queueserver
from apsbits.utils.logging_setup import configure_logging

# Run first so we get better diagnostics about subsequent problems
configure_logging()
prepare_bits()

# Configuration block
# Get the path to the instrument package
# Load configuration to be used by the instrument.
instrument_path = Path(__file__).parent
iconfig_path = instrument_path / "configs" / "iconfig.yml"
iconfig = load_config(iconfig_path)

# Additional logging configuration
# only needed if using different logging setup
# from the one in the apsbits package
extra_logging_configs_path = instrument_path / "configs" / "extra_logging.yml"
configure_logging(extra_logging_configs_path=extra_logging_configs_path)

# Module-level logger; created after both logging configuration calls above.
logger = logging.getLogger(__name__)
logger.info("Starting Instrument with iconfig: %s", iconfig_path)

# initialize instrument
instrument, oregistry = init_instrument("guarneri")

# Discard oregistry items loaded above.
oregistry.clear()

# Configure the session with callbacks, devices, and plans.
# aps_dm_setup(iconfig.get("DM_SETUP_FILE"))

# Command-line tools, such as %wa, %ct, ...
register_bluesky_magics()

# Bluesky initialization block
# bec and cat are passed to init_RE() as subscribers later in this module.
bec, peaks = init_bec_peaks(iconfig)
cat = init_catalog(iconfig)
# Create the RunEngine, subscribing the best-effort callback and the catalog.
RE, sd = init_RE(iconfig, subscribers=[bec, cat])

# Optional Nexus callback block
# delete this block if not using Nexus
# Only runs when iconfig has NEXUS_DATA_FILES.ENABLE set to a true value;
# the import is kept inside the branch so it is skipped otherwise.
if iconfig.get("NEXUS_DATA_FILES", {}).get("ENABLE", False):
    from .callbacks.demo_nexus_callback import nxwriter_init

    nxwriter = nxwriter_init(RE, iconfig)

# Optional SPEC callback block
# delete this block if not using SPEC
# Only runs when iconfig has SPEC_DATA_FILES.ENABLE set to a true value.
if iconfig.get("SPEC_DATA_FILES", {}).get("ENABLE", False):
    from .callbacks.demo_spec_callback import init_specwriter_with_RE
    from .callbacks.demo_spec_callback import newSpecFile  # noqa: F401
    from .callbacks.demo_spec_callback import spec_comment  # noqa: F401

    specwriter = init_specwriter_with_RE(RE, iconfig)  # noqa: F811
# These imports must come after the above setup.
# Queue server block
if running_in_queueserver():
    ### To make all the standard plans available in QS, import by '*', otherwise import
    ### plan by plan.
    from apstools.plans import lineup2  # noqa: F401
    from bluesky.plans import *  # noqa: F403
else:
    # Import bluesky plans and stubs with prefixes set by common conventions.
    # The apstools plans and utils are imported by '*'.
    from apstools.plans import *  # noqa: F403
    from apstools.utils import *  # noqa: F403
    from bluesky import plan_stubs as bps  # noqa: F401
    from bluesky import plans as bp  # noqa: F401

# Experiment specific logic, device and plan loading.
# Create the devices.
make_devices(clear=False, file="devices.yml", device_manager=instrument)

# Devices in devices_aps_only.yml are created only when this host is on the
# APS subnet.
if host_on_aps_subnet():
    make_devices(clear=False, file="devices_aps_only.yml", device_manager=instrument)

# Install bidirectional Bluesky-session interlock between
# sample_stage.omega and laser_optics. See module docstring for scope
# and limitations (no EPICS-side protection).
# NOTE(review): "module docstring" presumably means the docstring of
# .devices.omega_laser_interlock; this module's docstring does not cover
# the interlock -- confirm.
from .devices.omega_laser_interlock import setup_omega_laser_interlock  # noqa: E402

setup_omega_laser_interlock(oregistry)

# Set up the baseline stream (connect=False is the default).
# Devices with the label 'baseline' will be added to the baseline stream.
setup_baseline_stream(sd, oregistry, connect=False)

# Configure area detector plugins
# FIXME: Why only one HDF5 file for each step in
#   bp.scan([eiger2], sample_stage.xprime, 0, 0.2, 5)
# Expected either 5 files with 1 frame each, or 1 file with 5 frames.
# Instead we get 1 file with 1 frame.
# We get 5 TIFF files, as expected.
eiger2 = oregistry["eiger2"]
eiger2.hdf1.kind = "hinted"
eiger2.tiff1.kind = "hinted"

from .plans.sim_plans import sim_count_plan  # noqa: E402, F401
from .plans.sim_plans import sim_print_plan  # noqa: E402, F401
from .plans.sim_plans import sim_rel_scan_plan  # noqa: E402, F401