Source code for apstools.synApps.swait

"""
Ophyd support for the EPICS synApps swait record

EXAMPLES::

    import apstools.synApps
    calcs = apstools.synApps.UserCalcsDevice("xxx:", name="calcs")

    calc1 = calcs.calc1
    apstools.synApps.setup_random_number_swait(calc1)

    apstools.synApps.setup_incrementer_swait(calcs.calc2)

    calc1.reset()


.. autosummary::

    ~UserCalcN
    ~UserCalcsDevice
    ~SwaitRecord
    ~SwaitRecordChannel
    ~setup_random_number_swait
    ~setup_gaussian_swait
    ~setup_lorentzian_swait
    ~setup_incrementer_swait

:see: https://htmlpreview.github.io/?https://raw.githubusercontent.com/epics-modules/calc/R3-6-1/documentation/swaitRecord.html
"""


from collections import OrderedDict

from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as DDC
from ophyd import EpicsSignal
from ophyd import FormattedComponent as FC
from ophyd.signal import EpicsSignalBase

from .. import utils as APS_utils
from ._common import EpicsRecordDeviceCommonAll
from ._common import EpicsSynAppsRecordEnableMixin

CHANNEL_LETTERS_LIST = "A B C D E F G H I J K L".split()


class SwaitRecordChannel(Device):
    """
    One input channel (letter A through L) of an EPICS synApps swait record.

    .. index:: Ophyd Device; synApps SwaitRecordChannel

    The channel letter selects the record fields that the three signals
    address: ``.<letter>`` (the value), ``.IN<letter>N`` (the input PV name)
    and ``.IN<letter>P`` (the input trigger).

    PARAMETERS

    prefix
        *str* :
        PV prefix that is placed in front of the field suffixes above
    letter
        *str* :
        channel letter substituted into the field suffixes
    kwargs
        passed, unchanged, to the ``Device`` constructor
    """

    input_value = FC(EpicsSignal, "{self.prefix}.{self._ch_letter}", kind="config")
    input_pv = FC(EpicsSignal, "{self.prefix}.IN{self._ch_letter}N", kind="config")
    input_trigger = FC(EpicsSignal, "{self.prefix}.IN{self._ch_letter}P", kind="config")
    read_attrs = ["input_value"]
    hints = {"fields": read_attrs}

    def __init__(self, prefix, letter, **kwargs):
        # The PV templates above refer to ``self._ch_letter``, so it has to
        # exist before control is handed to the base class constructor.
        self._ch_letter = letter
        super().__init__(prefix, **kwargs)

    def reset(self):
        """set all fields to default values"""
        # (attribute name, default value), written in this order
        defaults = (
            ("input_pv", ""),
            ("input_value", 0),
            ("input_trigger", "Yes"),
        )
        for attr_name, default in defaults:
            getattr(self, attr_name).put(default)
def _swait_channels(channel_list):
    """
    internal: build the component definition for the swait record channels

    Returns an ``OrderedDict`` which maps every entry of ``channel_list``
    (in the order given) to the tuple
    ``(SwaitRecordChannel, "", {"letter": entry})``.
    """
    return OrderedDict(
        (letter, (SwaitRecordChannel, "", {"letter": letter}))
        for letter in channel_list
    )
class SwaitRecord(EpicsRecordDeviceCommonAll):
    """
    EPICS synApps swait record: used as ``$(P):userCalc$(N)``

    Each component below maps one record field (the suffix given to the
    component) to an ophyd signal.  The twelve input channels, A through L,
    are grouped under ``channels``, one :class:`SwaitRecordChannel` each.

    .. index:: Ophyd Device; synApps SwaitRecord

    .. autosummary::

        ~reset

    """

    precision = Cpt(EpicsSignal, ".PREC", kind="config")
    high_operating_range = Cpt(EpicsSignal, ".HOPR", kind="config")
    low_operating_range = Cpt(EpicsSignal, ".LOPR", kind="config")

    # .VAL is the only component declared here with kind="hinted"
    calculated_value = Cpt(EpicsSignal, ".VAL", kind="hinted")
    calculation = Cpt(EpicsSignal, ".CALC", kind="config")

    output_link_pv = Cpt(EpicsSignal, ".OUTN", kind="config")
    output_location_name = Cpt(EpicsSignal, ".DOLN", kind="config")
    output_location_data = Cpt(EpicsSignal, ".DOLD", kind="config")
    output_data_option = Cpt(EpicsSignal, ".DOPT", kind="config")
    output_execute_option = Cpt(EpicsSignal, ".OOPT", kind="config")
    output_execution_delay = Cpt(EpicsSignal, ".ODLY", kind="config")
    event_to_issue = Cpt(EpicsSignal, ".OEVT", kind="config")

    # Class-level defaults, built from the channel letters by
    # ``APS_utils.itemizer`` with the "channels.%s" format.
    read_attrs = APS_utils.itemizer("channels.%s", CHANNEL_LETTERS_LIST)
    hints = {"fields": read_attrs}

    # one SwaitRecordChannel per letter in CHANNEL_LETTERS_LIST
    channels = DDC(_swait_channels(CHANNEL_LETTERS_LIST))

    @property
    def value(self):
        """Return the result of ``get()`` on ``calculated_value`` (the ``.VAL`` field)."""
        return self.calculated_value.get()

    def reset(self):
        """
        set all fields to default values

        Writes defaults to the record-level fields, calls ``reset()`` on
        every :class:`SwaitRecordChannel` found through
        ``self.channels.read_attrs``, then reassigns ``read_attrs`` and
        ``hints`` on this instance.
        """
        # NOTE(review): ``description``, ``scanning_rate`` and ``forward_link``
        # are not declared in this class; presumably they are inherited from
        # EpicsRecordDeviceCommonAll -- confirm in ``_common``.

        # The default description is the PV name with everything from the
        # first "." onward removed.
        pvname = self.description.pvname.split(".")[0]
        self.description.put(pvname)
        self.scanning_rate.put("Passive")
        self.calculation.put("0")
        self.precision.put("5")
        self.output_location_data.put(0)
        self.output_location_name.put("")
        self.output_data_option.put("Use VAL")
        self.forward_link.put("0")
        self.output_execution_delay.put(0)
        self.output_execute_option.put("Every Time")
        self.output_link_pv.put("")

        # Only attributes that resolve to a SwaitRecordChannel are reset;
        # any other name listed in ``channels.read_attrs`` is skipped.
        for letter in self.channels.read_attrs:
            channel = getattr(self.channels, letter)
            if isinstance(channel, SwaitRecordChannel):
                channel.reset()

        # Statement order matters here: ``hints`` is built from
        # ``read_attrs`` before "calculated_value" is appended to it.
        self.read_attrs = ["channels.%s" % c for c in CHANNEL_LETTERS_LIST]
        self.hints = {"fields": self.read_attrs}
        self.read_attrs.append("calculated_value")
class UserCalcN(EpicsSynAppsRecordEnableMixin, SwaitRecord):
    """
    Single instance of the userCalcN database.

    Combines :class:`SwaitRecord` with ``EpicsSynAppsRecordEnableMixin``;
    this class adds no attributes or methods of its own.
    """
class UserCalcsDevice(Device):
    """
    EPICS synApps XXX IOC setup of userCalcs: ``$(P):userCalc$(N)``

    Holds the ``userCalcEnable`` signal and ten :class:`UserCalcN` records,
    available as ``calc1`` through ``calc10``.

    .. index:: Ophyd Device; synApps UserCalcsDevice

    .. autosummary::

        ~reset

    """

    enable = Cpt(EpicsSignal, "userCalcEnable", kind="config")

    calc1 = Cpt(UserCalcN, "userCalc1")
    calc2 = Cpt(UserCalcN, "userCalc2")
    calc3 = Cpt(UserCalcN, "userCalc3")
    calc4 = Cpt(UserCalcN, "userCalc4")
    calc5 = Cpt(UserCalcN, "userCalc5")
    calc6 = Cpt(UserCalcN, "userCalc6")
    calc7 = Cpt(UserCalcN, "userCalc7")
    calc8 = Cpt(UserCalcN, "userCalc8")
    calc9 = Cpt(UserCalcN, "userCalc9")
    calc10 = Cpt(UserCalcN, "userCalc10")

    def reset(self):  # lgtm [py/similar-function]
        """set all fields to default values"""
        # Reset calc1 ... calc10 in ascending order, then make exactly those
        # ten records the read attributes of this device.
        calc_names = [f"calc{number}" for number in range(1, 11)]
        for calc_name in calc_names:
            getattr(self, calc_name).reset()
        self.read_attrs = calc_names
def setup_random_number_swait(swait, **kw):
    """
    setup swait record to generate random numbers

    PARAMETERS

    swait
        *object* :
        the swait record to configure; it is reset first
    kw
        accepted for call compatibility, not used
    """
    swait.reset()

    # (attribute name, value), written in this order.  The record is put
    # into "Passive" scanning while it is configured and only switched to
    # ".1 second" as the final write.
    settings = (
        ("scanning_rate", "Passive"),
        ("description", "uniform random numbers"),
        ("calculation", "RNDM"),
        ("scanning_rate", ".1 second"),
    )
    for attr_name, setting in settings:
        getattr(swait, attr_name).put(setting)

    swait.read_attrs = ["calculated_value"]
    swait.hints = {"fields": swait.read_attrs}
def _setup_peak_swait_(calc, desc, swait, ref_signal, center=0, width=1, scale=1, noise=0.05):
    """
    internal: setup that is common to both Gaussian and Lorentzian swaits

    All arguments are validated before anything is written to the record.
    The record is then reset, kept in "Passive" scanning while its fields
    are written, and finally switched to "I/O Intr" scanning.

    PARAMETERS

    calc
        *str* :
        expression written to the record's calculation field
    desc
        *str* :
        text written to the record's description field
    swait
        *object* :
        instance of :class:`SwaitRecord`
    ref_signal
        *object* :
        instance of :class:`EpicsSignalBase`; its PV name becomes the
        input PV of channel $A$
    center
        *float* :
        EPICS record field $B$, default = 0
    width
        *float* :
        EPICS record field $C$, default = 1 (must be > 0)
    scale
        *float* :
        EPICS record field $D$, default = 1
    noise
        *float* :
        EPICS record field $E$, default = 0.05 (must be within 0 .. 1)

    RAISES

    TypeError
        ``swait`` is not a :class:`SwaitRecord` or ``ref_signal`` is not
        an :class:`EpicsSignalBase`
    ValueError
        ``width`` is not positive or ``noise`` is outside of 0 .. 1
    """
    # consider a noisy background, as well (needs a couple calcs)
    if not isinstance(swait, SwaitRecord):
        # f-string, so the message reports the type that was received
        raise TypeError(f"expected SwaitRecord instance, received {type(swait)}")
    if not isinstance(ref_signal, EpicsSignalBase):
        raise TypeError(f"expected EpicsSignalBase instance, received {type(ref_signal)}")
    if width <= 0:
        raise ValueError(f"width must be positive, received {width}")
    if not (0.0 <= noise <= 1.0):
        raise ValueError(f"noise must be between 0 and 1, received {noise}")

    swait.reset()
    # stay "Passive" while the fields below are being written
    swait.scanning_rate.put("Passive")
    swait.description.put(desc)
    swait.channels.A.input_pv.put(ref_signal.pvname)
    swait.channels.B.input_value.put(center)
    swait.channels.C.input_value.put(width)
    swait.channels.D.input_value.put(scale)
    swait.channels.E.input_value.put(noise)
    swait.calculation.put(calc)
    # the scan mode is changed only after the record is fully configured
    swait.scanning_rate.put("I/O Intr")

    swait.read_attrs = [
        "calculated_value",
    ]
    swait.hints = {"fields": swait.read_attrs}
def setup_gaussian_swait(swait, ref_signal, center=0, width=1, scale=1, noise=0.05):
    """
    setup swait for noisy Gaussian calculation: $D*(0.95+E*RNDM)/exp(((A-B)/C)^2)$

    Thin wrapper: the Gaussian expression and description are handed to
    ``_setup_peak_swait_()`` together with the arguments below.

    PARAMETERS

    swait
        *object* :
        instance of :class:`SwaitRecord`
    ref_signal
        *object* :
        instance of :class:`EpicsSignal` used as $A$
    center
        *float* :
        value passed on as ``center`` (record field $B$), default = 0
    width
        *float* :
        value passed on as ``width`` (record field $C$), default = 1
    scale
        *float* :
        value passed on as ``scale`` (record field $D$), default = 1
    noise
        *float* :
        value passed on as ``noise`` (record field $E$), default = 0.05
    """
    # NOTE(review): the expression names fields B and C in lower case;
    # the text is kept exactly as it was.
    expression = "D*(0.95+E*RNDM)/exp(((A-b)/c)^2)"
    description = "noisy Gaussian curve"
    peak_settings = dict(center=center, width=width, scale=scale, noise=noise)
    _setup_peak_swait_(expression, description, swait, ref_signal, **peak_settings)
def setup_lorentzian_swait(swait, ref_signal, center=0, width=1, scale=1, noise=0.05):
    """
    setup swait record for noisy Lorentzian calculation: $D*(0.95+E*RNDM)/(1+((A-B)/C)^2)$

    Thin wrapper: the Lorentzian expression and description are handed to
    ``_setup_peak_swait_()`` together with the arguments below.

    PARAMETERS

    swait
        *object* :
        instance of :class:`SwaitRecord`
    ref_signal
        *object* :
        instance of :class:`EpicsSignal` used as $A$
    center
        *float* :
        value passed on as ``center`` (record field $B$), default = 0
    width
        *float* :
        value passed on as ``width`` (record field $C$), default = 1
    scale
        *float* :
        value passed on as ``scale`` (record field $D$), default = 1
    noise
        *float* :
        value passed on as ``noise`` (record field $E$), default = 0.05
    """
    # NOTE(review): the expression names fields B and C in lower case;
    # the text is kept exactly as it was.
    expression = "D*(0.95+E*RNDM)/(1+((A-b)/c)^2)"
    description = "noisy Lorentzian curve"
    peak_settings = dict(center=center, width=width, scale=scale, noise=noise)
    _setup_peak_swait_(expression, description, swait, ref_signal, **peak_settings)
def setup_incrementer_swait(swait, scan=None, limit=100000):
    """
    setup swait record as an incrementer

    The record reads its own calculated value back through channel A and
    computes ``(A+1) % B``, with ``limit`` stored in channel B.

    PARAMETERS

    swait
        *object* :
        instance of :class:`SwaitRecord`
    scan
        *text* or *int* or ``None``
        any of the EPICS record ``.SCAN`` values,
        or the index number of the value (index ``0`` is honored),
        set to default if ``None``, default: ``.1 second``
    limit
        *int*
        set the incrementer back to zero
        when this number is reached (or passed), default: 100000
    """
    # Only a missing value selects the default.  A plain truthiness test
    # would also replace the valid index number 0.  Empty text still
    # selects the default, as it always did.
    if scan is None or scan == "":
        scan = ".1 second"

    swait.reset()
    swait.description.put("incrementer")
    # stay "Passive" while the fields below are being written
    swait.scanning_rate.put("Passive")

    # channel A reads this record's own value
    pvname = swait.calculated_value.pvname
    swait.channels.A.input_pv.put(pvname)
    swait.channels.B.input_value.put(limit)
    swait.calculation.put("(A+1) % B")
    swait.precision.put(0)
    # the requested scan mode is applied only after configuration
    swait.scanning_rate.put(scan)

    swait.read_attrs = [
        "calculated_value",
    ]
    swait.hints = {"fields": swait.read_attrs}
# ----------------------------------------------------------------------------- # :author: Pete R. Jemian # :email: jemian@anl.gov # :copyright: (c) 2017-2023, UChicago Argonne, LLC # # Distributed under the terms of the Argonne National Laboratory Open Source License. # # The full license is in the file LICENSE.txt, distributed with this software. # -----------------------------------------------------------------------------