Source code for id3c.startup
"""
Start Bluesky Data Acquisition sessions of all kinds.
Includes:
* Python script
* IPython console
* Jupyter notebook
* Bluesky queueserver
"""
# Standard Library Imports
import logging
from pathlib import Path
# Core Functions
from apsbits.core.best_effort_init import init_bec_peaks
from apsbits.core.catalog_init import init_catalog
from apsbits.core.instrument_init import init_instrument
from apsbits.core.instrument_init import make_devices
from apsbits.core.run_engine_init import init_RE
from apsbits.core.session_setup import prepare_bits
# Utility functions
from apsbits.utils.aps_functions import host_on_aps_subnet
from apsbits.utils.baseline_setup import setup_baseline_stream
# Configuration functions
from apsbits.utils.config_loaders import load_config
from apsbits.utils.helper_functions import register_bluesky_magics
from apsbits.utils.helper_functions import running_in_queueserver
from apsbits.utils.logging_setup import configure_logging
# Run first so we get better diagnostics about subsequent problems
configure_logging()
prepare_bits()
# Configuration block
# Get the path to the instrument package
# Load configuration to be used by the instrument.
[docs]
instrument_path = Path(__file__).parent
[docs]
iconfig_path = instrument_path / "configs" / "iconfig.yml"
[docs]
iconfig = load_config(iconfig_path)
# Additional logging configuration
# only needed if using different logging setup
# from the one in the apsbits package
configure_logging(extra_logging_configs_path=extra_logging_configs_path)
[docs]
logger = logging.getLogger(__name__)
logger.info("Starting Instrument with iconfig: %s", iconfig_path)
# initialize instrument
instrument, oregistry = init_instrument("guarneri")
# Discard oregistry items loaded above.
oregistry.clear()
# Configure the session with callbacks, devices, and plans.
# aps_dm_setup(iconfig.get("DM_SETUP_FILE"))
# Command-line tools, such as %wa, %ct, ...
register_bluesky_magics()
# Bluesky initialization block
bec, peaks = init_bec_peaks(iconfig)
[docs]
cat = init_catalog(iconfig)
RE, sd = init_RE(iconfig, subscribers=[bec, cat])
# Optional Nexus callback block
# delete this block if not using Nexus
if iconfig.get("NEXUS_DATA_FILES", {}).get("ENABLE", False):
from .callbacks.demo_nexus_callback import nxwriter_init
[docs]
nxwriter = nxwriter_init(RE, iconfig)
# Optional SPEC callback block
# delete this block if not using SPEC
if iconfig.get("SPEC_DATA_FILES", {}).get("ENABLE", False):
from .callbacks.demo_spec_callback import init_specwriter_with_RE
from .callbacks.demo_spec_callback import newSpecFile # noqa: F401
from .callbacks.demo_spec_callback import spec_comment # noqa: F401
[docs]
specwriter = init_specwriter_with_RE(RE, iconfig) # noqa: F811
# These imports must come after the above setup.
# Queue server block
if running_in_queueserver():
### To make all the standard plans available in QS, import by '*', otherwise import
### plan by plan.
from apstools.plans import lineup2 # noqa: F401
from bluesky.plans import * # noqa: F403
else:
# Import bluesky plans and stubs with prefixes set by common conventions.
# The apstools plans and utils are imported by '*'.
from apstools.plans import * # noqa: F403
from apstools.utils import * # noqa: F403
from bluesky import plan_stubs as bps # noqa: F401
from bluesky import plans as bp # noqa: F401
# Experiment specific logic, device and plan loading. # Create the devices.
make_devices(clear=False, file="devices.yml", device_manager=instrument)
if host_on_aps_subnet():
make_devices(clear=False, file="devices_aps_only.yml", device_manager=instrument)
# Install bidirectional Bluesky-session interlock between
# sample_stage.omega and laser_optics. See module docstring for scope
# and limitations (no EPICS-side protection).
from .devices.omega_laser_interlock import setup_omega_laser_interlock # noqa: E402
setup_omega_laser_interlock(oregistry)
# Setup baseline stream with connect=False is default
# Devices with the label 'baseline' will be added to the baseline stream.
setup_baseline_stream(sd, oregistry, connect=False)
# Configure area detector plugins
# FIXME: Why only one HDF5 file for each step in
# bp.scan([eiger2], sample_stage.xprime, 0, 0.2, 5)
# Expected either 5 files with 1 frame each, or 1 file with 5 frames.
# Instead we get 1 file with 1 frame.
# We get 5 TIFF files, as expected.
[docs]
eiger2 = oregistry["eiger2"]
eiger2.hdf1.kind = "hinted"
eiger2.tiff1.kind = "hinted"
from .plans.sim_plans import sim_count_plan # noqa: E402, F401
from .plans.sim_plans import sim_print_plan # noqa: E402, F401
from .plans.sim_plans import sim_rel_scan_plan # noqa: E402, F401