Source code for instrument.utils.controls_setup
"""
EPICS & ophyd related setup
===========================
.. autosummary::
~oregistry
~set_control_layer
~set_timeouts
~epics_scan_id_source
~connect_scan_id_pv
"""
import logging
import ophyd
from ophyd.signal import EpicsSignalBase
from ophydregistry import Registry
from .config_loaders import iconfig
logger = logging.getLogger(__name__)
# NOTE(review): ``bsdev`` is not a standard ``logging.Logger`` method;
# presumably a custom level installed by the project's logging setup -- confirm.
logger.bsdev(__file__)

# Fallback values, used when the configuration does not supply its own.
DEFAULT_CONTROL_LAYER = "PyEpics"
DEFAULT_TIMEOUT = 60  # fallback for the PV timeouts (presumably seconds -- confirm)

# Configuration sections consulted by the functions in this module.
re_config = iconfig.get("RUN_ENGINE", {})
ophyd_config = iconfig.get("OPHYD", {})
[docs]
def epics_scan_id_source(_md):
    """
    Provide the *next* ``scan_id`` for the RunEngine, backed by an EPICS PV.

    The metadata dictionary given by the caller is not used.  The signal
    registered under the name ``scan_id_epics`` is looked up in
    ``oregistry``; its current value is clamped to a minimum of zero and
    incremented by one (so numbering begins at 1).  The incremented value
    is written back to the signal and returned.

    Any exception raised while finding, reading, or writing the signal
    (for instance, when the PV is not connected) propagates to the caller.
    """
    signal = oregistry.find(name="scan_id_epics")

    # Never count up from a negative value held in the PV.
    previous = max(signal.get(), 0)
    next_id = previous + 1

    signal.put(next_id)
    return next_id
[docs]
def connect_scan_id_pv(RE, pv: str = None):
    """
    Use an EPICS PV as the source of the RunEngine's ``scan_id``.

    The PV name comes from the ``pv`` argument or, when that is empty,
    from the ``SCAN_ID_PV`` key of the ``RUN_ENGINE`` configuration.
    When neither supplies a name, nothing is changed.

    Otherwise, an ``EpicsSignal`` named ``scan_id_epics`` is created,
    ``RE.scan_id_source`` is pointed at ``epics_scan_id_source()``, and,
    once the signal has connected, the PV name and its current value are
    copied into ``RE.md``.
    """
    from ophyd import EpicsSignal

    if not pv:
        pv = re_config.get("SCAN_ID_PV")
    if pv is None:
        return  # no PV configured: leave the RunEngine as it is

    logger.info("Using EPICS PV %r for RunEngine 'scan_id'", pv)
    signal = EpicsSignal(pv, name="scan_id_epics")

    # From now on the RunEngine asks epics_scan_id_source() for each new
    # scan_id; that callback finds this signal by its name.
    RE.scan_id_source = epics_scan_id_source

    signal.wait_for_connection()

    try:
        RE.md["scan_id_pv"] = signal.pvname
        RE.md["scan_id"] = signal.get()  # seed scan_id from EPICS
    except TypeError:
        # Deliberately ignored: PersistentDict errors that are only
        # raised while building the documentation.
        pass
[docs]
def set_control_layer(control_layer: str = DEFAULT_CONTROL_LAYER):
    """
    Select the library ophyd uses to talk with EPICS Channel Access.

    Choices are: PyEpics (default) or caproto.

    A ``CONTROL_LAYER`` entry in the ``OPHYD`` configuration, when
    present, takes precedence over the ``control_layer`` argument.  The
    chosen name is lower-cased before it is passed to ``ophyd.set_cl()``.

    This setting is a "lessons learned" item: it has only been needed in
    a couple of rare cases where PyEpics code was failing.  It is defined
    here because the procedure was difficult to find in the ophyd
    documentation.
    """
    chosen = ophyd_config.get("CONTROL_LAYER", control_layer)
    layer_name = chosen.lower()
    ophyd.set_cl(layer_name)
    logger.info("using ophyd control layer: %r", ophyd.cl.name)
[docs]
def set_timeouts():
    """
    Set default timeouts for all EpicsSignal connections & communications.

    Values are read from the ``TIMEOUTS`` mapping of the ``OPHYD``
    configuration (keys ``PV_READ``, ``PV_WRITE``, ``PV_CONNECTION``);
    any key that is missing falls back to ``DEFAULT_TIMEOUT``.

    The defaults are only applied when no ``EpicsSignalBase`` (or
    subclass) instance exists yet.  Otherwise this function does nothing.
    """
    if EpicsSignalBase._EpicsSignalBase__any_instantiated:
        # Only BEFORE any EpicsSignalBase (or subclass) are created!
        return

    # ``or {}`` also covers a TIMEOUTS key that is present but empty.
    timeouts = ophyd_config.get("TIMEOUTS", {}) or {}

    # PV_CONNECTION belongs with its siblings in TIMEOUTS.  Earlier code
    # looked for it at the top level of the configuration, so that
    # location is still honored as a fallback.
    connection_timeout = timeouts.get(
        "PV_CONNECTION",
        iconfig.get("PV_CONNECTION", DEFAULT_TIMEOUT),
    )

    EpicsSignalBase.set_defaults(
        auto_monitor=True,
        timeout=timeouts.get("PV_READ", DEFAULT_TIMEOUT),
        write_timeout=timeouts.get("PV_WRITE", DEFAULT_TIMEOUT),
        connection_timeout=connection_timeout,
    )
# Module-level registry, created with auto_register=True.
# epics_scan_id_source() uses it to find the signal named "scan_id_epics".
oregistry = Registry(auto_register=True)
"""Registry of all ophyd-style Devices and Signals."""