Source code for apstools.utils.pvregistry

"""
EPICS PV Registry
+++++++++++++++++++++++++++++++++++++++

.. autosummary::

   ~findbyname
   ~findbypv
   ~PVRegistry
"""

import logging
from collections import defaultdict

import ophyd

from . import full_dotted_name
from . import ipython_shell_namespace

logger = logging.getLogger(__name__)


_findpv_registry = None


[docs]class PVRegistry: """ Cross-reference EPICS PVs with ophyd EpicsSignalBase objects. """ def __init__(self, ns=None): """ Search ophyd objects for PV or ophyd names. The rebuild starts with the IPython console namespace (and defaults to the global namespace if the former cannot be obtained). PARAMETERS ns *dict* or `None`: namespace dictionary """ self._pvdb = defaultdict(lambda: defaultdict(list)) self._odb = {} self._device_name = None self._known_device_names = [] g = ns or ipython_shell_namespace() or globals() # kickoff the registration process # fmt: off logger.debug( "Cross-referencing EPICS PVs with Python objects & ophyd symbols" ) # fmt: on self._ophyd_epicsobject_walker(g) def _ophyd_epicsobject_walker(self, parent): """ Walk through the parent object for ophyd Devices & EpicsSignals. This function is used to rebuild the ``self._pvdb`` object. """ if isinstance(parent, dict): keys = parent.keys() else: keys = parent.component_names _nm_base = self._device_name for k in keys: if isinstance(parent, dict): _nm_base = [] v = self._ref_dict(parent, k) else: v = self._ref_object_attribute(parent, k) if v is None: continue # print(k, type(v)) if isinstance(v, ophyd.signal.EpicsSignalBase): try: self._signal_processor(v) self._odb[v.name] = ".".join(self._device_name + [k]) except (KeyError, RuntimeError) as exc: # fmt: off logger.error( "Exception while examining key '%s': (%s)", k, exc ) # fmt: on elif isinstance(v, ophyd.Device): # print("Device", v.name) self._device_name = _nm_base + [k] if v.name not in self._known_device_names: self._odb[v.name] = ".".join(self._device_name) self._known_device_names.append(v.name) self._ophyd_epicsobject_walker(v) def _ref_dict(self, parent, key): """Accessor used by ``_ophyd_epicsobject_walker()``""" return parent[key] def _ref_object_attribute(self, parent, key): """Accessor used by ``_ophyd_epicsobject_walker()``""" try: obj = getattr(parent, key, None) return obj except (KeyError, RuntimeError, TimeoutError) as exc: # fmt: off logger.error( "Exception while getting object '%s.%s': (%s)", ".".join(self._device_name), key, exc, ) # fmt: on def _register_signal(self, signal, pv, mode): """Register a signal with the given mode.""" fdn = full_dotted_name(signal) if fdn not in self._pvdb[pv][mode]: self._pvdb[pv][mode].append(fdn) def _signal_processor(self, signal): """Register a signal's read & write PVs.""" self._register_signal(signal, signal._read_pv.pvname, "R") if hasattr(signal, "_write_pv"): self._register_signal(signal, signal._write_pv.pvname, "W")
[docs] def search_by_mode(self, pvname, mode="R"): """Search for PV in specified mode.""" if mode not in ["R", "W"]: raise ValueError(f"Incorrect mode given ({mode}. Must be either `R` or `W`.") return self._pvdb[pvname][mode]
[docs] def search(self, pvname): """Search for PV in both read & write modes.""" return dict( read=self.search_by_mode(pvname, "R"), write=self.search_by_mode(pvname, "W"), )
def _get_pv_registry(force_rebuild, ns): """ Check if need to build/rebuild the PV registry. PARAMETERS force_rebuild *bool* : If ``True``, rebuild the internal registry that maps EPICS PV names to ophyd objects. ns *dict* or `None` : Namespace dictionary of Python objects. """ global _findpv_registry if _findpv_registry is None or force_rebuild: _findpv_registry = PVRegistry(ns=ns) return _findpv_registry
[docs]def findbyname(oname, force_rebuild=False, ns=None): """ Find the ophyd (dotted name) object associated with the given ophyd name. PARAMETERS oname *str* : ophyd name to search force_rebuild *bool* : If ``True``, rebuild the internal registry that maps ophyd names to ophyd objects. ns *dict* or `None` : Namespace dictionary of Python objects. RETURNS str or ``None``: Name of the ophyd object. EXAMPLE:: In [45]: findbyname("adsimdet_cam_acquire") Out[45]: 'adsimdet.cam.acquire' (new in apstools 1.5.0) """ return _get_pv_registry(force_rebuild, ns).ophyd_search(oname)
[docs]def findbypv(pvname, force_rebuild=False, ns=None): """ Find all ophyd objects associated with the given EPICS PV. PARAMETERS pvname *str* : EPICS PV name to search force_rebuild *bool* : If ``True``, rebuild the internal registry that maps EPICS PV names to ophyd objects. ns *dict* or `None` : Namespace dictionary of Python objects. RETURNS dict or ``None``: Dictionary of matching ophyd objects, keyed by how the PV is used by the ophyd signal. The keys are ``read`` and ``write``. EXAMPLE:: In [45]: findbypv("ad:cam1:Acquire") Out[45]: {'read': [], 'write': ['adsimdet.cam.acquire']} In [46]: findbypv("ad:cam1:Acquire_RBV") Out[46]: {'read': ['adsimdet.cam.acquire'], 'write': []} """ return _get_pv_registry(force_rebuild, ns).search(pvname)
# ----------------------------------------------------------------------------- # :author: Pete R. Jemian # :email: jemian@anl.gov # :copyright: (c) 2017-2023, UChicago Argonne, LLC # # Distributed under the terms of the Argonne National Laboratory Open Source License. # # The full license is in the file LICENSE.txt, distributed with this software. # -----------------------------------------------------------------------------