Source code for apstools.devices.shutters

"""
Shutters
+++++++++++++++++++++++++++++++++++++++

.. autosummary::

   ~ApsPssShutter
   ~ApsPssShutterWithStatus
   ~EpicsMotorShutter
   ~EpicsOnOffShutter
   ~OneSignalShutter
   ~ShutterBase
   ~SimulatedApsPssShutterWithStatus
"""

import threading
import time

import numpy as np
from ophyd import Component
from ophyd import Device
from ophyd import DeviceStatus
from ophyd import EpicsMotor
from ophyd import EpicsSignal
from ophyd import EpicsSignalRO
from ophyd import FormattedComponent
from ophyd import Signal


[docs]class ShutterBase(Device): """ Base class for all shutter Devices. .. index:: Ophyd Device; ShutterBase PARAMETERS value *str* : any from ``self.choices`` (typically "open" or "close") valid_open_values *[str]* : A list of lower-case text values that are acceptable for use with the ``set()`` command to open the shutter. valid_close_values *[str]* : A list of lower-case text values that are acceptable for use with the ``set()`` command to close the shutter. open_value *number* : The actual value to send to open ``signal`` to open the shutter. (default = 1) close_value *number* : The actual value to send to close ``signal`` to close the shutter. (default = 0) delay_s *float* : time to wait (s) after move is complete, does not wait if shutter already in position (default = 0) busy *Signal* : (internal) tells if a move is in progress unknown_state *str* : (constant) Text reported by ``state`` when not open or closed. cannot move to this position (default = "unknown") name *str* : (kwarg, required) object's canonical name """ # fmt: off valid_open_values = ["open", "opened"] # lower-case strings ONLY valid_close_values = ["close", "closed"] # fmt: on open_value = 1 # value of "open" close_value = 0 # value of "close" delay_s = 0.0 # time to wait (s) after move is complete busy = Component(Signal, value=False) unknown_state = "unknown" # cannot move to this position # - - - - likely to override these methods in subclass - - - -
[docs] def open(self): """ BLOCKING: request shutter to open, called by ``set()``. Must implement in subclass of ShutterBase() EXAMPLE:: if not self.isOpen: self.signal.put(self.open_value) if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here """ raise NotImplementedError("must implement in subclass")
[docs] def close(self): """ BLOCKING: request shutter to close, called by ``set()``. Must implement in subclass of ShutterBase() EXAMPLE:: if not self.isClosed: self.signal.put(self.close_value) if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here """ raise NotImplementedError("must implement in subclass")
@property def state(self): """ returns ``open``, ``close``, or ``unknown`` Must implement in subclass of ShutterBase() EXAMPLE:: if self.signal.get() == self.open_value: result = self.valid_open_values[0] elif self.signal.get() == self.close_value: result = self.valid_close_values[0] else: result = self.unknown_state return result """ raise NotImplementedError("must implement in subclass") # - - - - - - possible to override in subclass - - - - - - def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.valid_open_values = list(map(self.lowerCaseString, self.valid_open_values)) self.valid_close_values = list(map(self.lowerCaseString, self.valid_close_values)) @property def isOpen(self): """is the shutter open?""" return str(self.state) == self.valid_open_values[0] @property def isClosed(self): """is the shutter closed?""" return str(self.state) == self.valid_close_values[0]
[docs] def inPosition(self, target): """is the shutter at the target position?""" self.validTarget(target) __value__ = self.lowerCaseString(target) if __value__ in self.valid_open_values and self.isOpen: return True elif __value__ in self.valid_close_values and self.isClosed: return True return False
[docs] def set(self, value, **kwargs): """ plan: request the shutter to open or close PARAMETERS value *str* : any from ``self.choices`` (typically "open" or "close") kwargs *dict* : ignored at this time """ if self.busy.get(): raise RuntimeError("shutter is operating") __value__ = self.lowerCaseString(value) self.validTarget(__value__) status = DeviceStatus(self) if self.inPosition(__value__): # no need to move, cut straight to the end status._finished(success=True) else: def move_it(): # runs in a thread, no need to "yield from" self.busy.put(True) if __value__ in self.valid_open_values: self.open() elif __value__ in self.valid_close_values: self.close() self.busy.put(False) status._finished(success=True) # get it moving threading.Thread(target=move_it, daemon=True).start() return status
# - - - - - - not likely to override in subclass - - - - - -
[docs] def addCloseValue(self, text): """a synonym to close the shutter, use with set()""" self.valid_close_values.append(self.lowerCaseString(text)) return self.choices # return the list of acceptable values
[docs] def addOpenValue(self, text): """a synonym to open the shutter, use with set()""" self.valid_open_values.append(self.lowerCaseString(text)) return self.choices # return the list of acceptable values
@property def choices(self): """return list of acceptable choices for set()""" return self.valid_open_values + self.valid_close_values
[docs] def lowerCaseString(self, value): """ensure any given value is a lower-case string""" return str(value).lower()
[docs] def validTarget(self, target, should_raise=True): """ return whether (or not) target value is acceptable for self.set() raise ValueError if not acceptable (default) """ acceptable_values = self.choices ok = self.lowerCaseString(target) in acceptable_values if not ok and should_raise: msg = "received " + str(target) msg += " : should be only one of " msg += " | ".join(acceptable_values) raise ValueError(msg) return ok
[docs]class OneSignalShutter(ShutterBase): """ Shutter Device using one Signal for open and close. .. index:: Ophyd Device; OneSignalShutter PARAMETERS signal ``EpicsSignal`` or ``Signal`` : (override in subclass) The ``signal`` is the comunication to the hardware. In a subclass, the hardware may have more than one communication channel to use. See the ``ApsPssShutter`` as an example. name *str* : (kwarg, required) object's canonical name See ``ShutterBase`` for more parameters. EXAMPLE Create a simulated shutter: shutter = OneSignalShutter(name="shutter") open the shutter (interactively): shutter.open() Check the shutter is open: In [144]: shutter.isOpen Out[144]: True Use the shutter in a Bluesky plan. Set a post-move delay time of 1.0 seconds. Be sure to use ``yield from``, such as:: def in_a_plan(shutter): shutter.delay_s = 1.0 t0 = time.time() print("Shutter state: " + shutter.state, time.time()-t0) yield from bps.abs_set(shutter, "open", wait=True) # wait for completion is optional print("Shutter state: " + shutter.state, time.time()-t0) yield from bps.mv(shutter, "open") # do it again print("Shutter state: " + shutter.state, time.time()-t0) yield from bps.mv(shutter, "close") # ALWAYS waits for completion print("Shutter state: " + shutter.state, time.time()-t0) RE(in_a_plan(shutter)) which gives this output: Shutter state: close 1.7642974853515625e-05 Shutter state: open 1.0032124519348145 Shutter state: open 1.0057861804962158 Shutter state: close 2.009695529937744 The strings accepted by `set()` are defined in two lists: `valid_open_values` and `valid_close_values`. These lists are treated (internally to `set()`) as lower case strings. Example, add "o" & "x" as aliases for "open" & "close": shutter.addOpenValue("o") shutter.addCloseValue("x") shutter.set("o") shutter.set("x") """ signal = Component(Signal, value=0) @property def state(self): """is shutter "open", "close", or "unknown"?""" if self.signal.get() == self.open_value: result = self.valid_open_values[0] elif self.signal.get() == self.close_value: result = self.valid_close_values[0] else: result = self.unknown_state return result
[docs] def open(self): """BLOCKING: request shutter to open, called by set()""" if not self.isOpen: self.signal.put(self.open_value) if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here
[docs] def close(self): """BLOCKING: request shutter to close, called by set()""" if not self.isClosed: self.signal.put(self.close_value) if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here
[docs]class ApsPssShutter(ShutterBase): """ APS PSS shutter .. index:: Ophyd Device; ApsPssShutter * APS PSS shutters have separate bit PVs for open and close * set either bit, the shutter moves, and the bit resets a short time later * no indication that the shutter has actually moved from the bits (see :func:`ApsPssShutterWithStatus()` for alternative) Since there is no direct indication that a shutter has moved, the ``state`` property will always return *unknown* and the ``isOpen`` and ``isClosed`` properties will always return *False*. A consequence of the unknown state is that the shutter will always be commanded to move (and wait the ``delay_s`` time), even if it is already at that position. This device could keep track of the last commanded position, but that is not guaranteed to be true since the shutter could be moved from other software. The default ``delay_s`` has been set at *1.2 s* to allow for shutter motion. Change this as desired. Advise if this default should be changed. PARAMETERS prefix *str* : EPICS PV prefix name *str* : (kwarg, required) object's canonical name close_pv *str* : (kwarg, optional) Name of EPICS PV to close the shutter. If ``None``, defaults to ``"{prefix}Close"``. open_pv *str* : (kwarg, optional) Name of EPICS PV to open the shutter. If ``None``, defaults to ``"{prefix}Open"``. EXAMPLE:: shutter_a = ApsPssShutter("2bma:A_shutter:", name="shutter") shutter_a.wait_for_connection() shutter_a.open() shutter_a.close() shutter_a.set("open") shutter_a.set("close") When using the shutter in a plan, be sure to use ``yield from``, such as:: def in_a_plan(shutter): yield from abs_set(shutter, "open", wait=True) # do something yield from abs_set(shutter, "close", wait=True) RE(in_a_plan(shutter_a)) The strings accepted by `set()` are defined in two lists: `valid_open_values` and `valid_close_values`. These lists are treated (internally to `set()`) as lower case strings. Example, add "o" & "x" as aliases for "open" & "close": shutter_a.addOpenValue("o") shutter_a.addCloseValue("x") shutter_a.set("o") shutter_a.set("x") """ # bo records that reset after a short time, set to 1 to move # note: upper-case first characters here (unique to 9-ID)? open_signal = FormattedComponent(EpicsSignal, "{self.open_pv}") close_signal = FormattedComponent(EpicsSignal, "{self.close_pv}") delay_s = 1.2 # allow time for shutter to move def __init__(self, prefix, *args, close_pv=None, open_pv=None, **kwargs): self.open_pv = open_pv or f"{prefix}Open" self.close_pv = close_pv or f"{prefix}Close" super().__init__(prefix, *args, **kwargs) @property def state(self): """is shutter "open", "close", or "unknown"?""" return self.unknown_state # no state info available
[docs] def open(self, timeout=10): """request the shutter to open (timeout is ignored)""" if not self.isOpen: self.open_signal.put(1) # wait for the shutter to move if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here # reset that signal (if not done by EPICS) if self.open_signal.get() == 1: self.open_signal.put(0)
[docs] def close(self, timeout=10): """request the shutter to close (timeout is ignored)""" if not self.isClosed: self.close_signal.put(1) # wait for the shutter to move if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here # reset that signal (if not done by EPICS) if self.close_signal.get() == 1: self.close_signal.put(0)
[docs]class ApsPssShutterWithStatus(ApsPssShutter): """ APS PSS shutter with separate status PV .. index:: Ophyd Device; ApsPssShutterWithStatus * APS PSS shutters have separate bit PVs for open and close * set either bit, the shutter moves, and the bit resets a short time later * a separate status PV tells if the shutter is open or closed (see :func:`ApsPssShutter()` for alternative) PARAMETERS prefix *str* : EPICS PV prefix state_pv *str* : Name of EPICS PV that provides shutter's current state. name *str* : (kwarg, required) object's canonical name EXAMPLE:: A_shutter = ApsPssShutterWithStatus( "2bma:A_shutter:", "PA:02BM:STA_A_FES_OPEN_PL", name="A_shutter") B_shutter = ApsPssShutterWithStatus( "2bma:B_shutter:", "PA:02BM:STA_B_SBS_OPEN_PL", name="B_shutter") A_shutter.wait_for_connection() B_shutter.wait_for_connection() A_shutter.open() A_shutter.close() or A_shutter.set("open") A_shutter.set("close") When using the shutter in a plan, be sure to use `yield from`. def in_a_plan(shutter): yield from abs_set(shutter, "open", wait=True) # do something yield from abs_set(shutter, "close", wait=True) RE(in_a_plan(A_shutter)) """ # bi record ZNAM=OFF, ONAM=ON pss_state = FormattedComponent(EpicsSignalRO, "{self.state_pv}") pss_state_open_values = [1] pss_state_closed_values = [0] delay_s = 0 # let caller add time after the move _poll_factor_ = 1.5 _poll_s_min_ = 0.002 _poll_s_max_ = 0.15 def __init__(self, prefix, state_pv, *args, **kwargs): self.state_pv = state_pv super().__init__(prefix, *args, **kwargs) @property def state(self): """is shutter "open", "close", or "unknown"?""" # update the list of acceptable values - very inefficient but works for item in self.pss_state.enum_strs[1]: if item not in self.pss_state_open_values: self.pss_state_open_values.append(item) for item in self.pss_state.enum_strs[0]: if item not in self.pss_state_closed_values: self.pss_state_closed_values.append(item) if self.pss_state.get() in self.pss_state_open_values: result = self.valid_open_values[0] elif self.pss_state.get() in self.pss_state_closed_values: result = self.valid_close_values[0] else: result = self.unknown_state return result
[docs] def wait_for_state(self, target, timeout=10, poll_s=0.01): """ wait for the PSS state to reach a desired target PARAMETERS (kwarg, optional) Name of EPICS PV to close the shutter. If ``None``, defaults to ``"{prefix}Close"``. target *[str]* : list of strings containing acceptable values timeout *non-negative number* : (kwarg, optional) Maximum amount of time (seconds) to wait for PSS state to reach target. If ``None``, defaults to ``10``. poll_s *non-negative number* : (kwarg, optional) Time to wait (seconds) in first polling cycle. After first poll, this will be increased by ``_poll_factor_`` up to a maximum time of ``_poll_s_max_``. If ``None``, defaults to ``0.01``. """ if timeout is not None: expiration = time.time() + max(timeout, 0) # ensure non-negative timeout else: expiration = None # ensure the poll delay is reasonable if poll_s > self._poll_s_max_: poll_s = self._poll_s_max_ elif poll_s < self._poll_s_min_: poll_s = self._poll_s_min_ while self.pss_state.get() not in target: time.sleep(poll_s) if poll_s < self._poll_s_max_: poll_s *= self._poll_factor_ # progressively longer if expiration is not None and time.time() > expiration: msg = f"Timeout ({timeout} s) waiting for shutter state" msg += f" to reach a value in {target}" raise TimeoutError(msg)
[docs] def open(self, timeout=10): """request the shutter to open""" if not self.isOpen: self.open_signal.put(1) # wait for the shutter to move self.wait_for_state(self.pss_state_open_values, timeout=timeout) # wait as caller specified if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here # reset that signal (if not done by EPICS) if self.open_signal.get() == 1: self.open_signal.put(0)
[docs] def close(self, timeout=10): """request the shutter to close""" if not self.isClosed: self.close_signal.put(1) # wait for the shutter to move self.wait_for_state(self.pss_state_closed_values, timeout=timeout) # wait as caller specified if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here # reset that signal (if not done by EPICS) if self.close_signal.get() == 1: self.close_signal.put(0)
[docs]class SimulatedApsPssShutterWithStatus(ApsPssShutterWithStatus): """ Simulated APS PSS shutter .. index:: Ophyd Device; SimulatedApsPssShutterWithStatus PARAMETERS prefix *str* : EPICS PV prefix name *str* : (kwarg, required) object's canonical name EXAMPLE:: sim = SimulatedApsPssShutterWithStatus(name="sim") """ open_signal = Component(Signal, value=0) close_signal = Component(Signal, value=0) pss_state = FormattedComponent(Signal, value="close") def __init__(self, *args, **kwargs): # was: super(ApsPssShutter, self).__init__("", *args, **kwargs) super(SimulatedApsPssShutterWithStatus, self).__init__("", "", *args, **kwargs) self.pss_state_open_values += self.valid_open_values self.pss_state_closed_values += self.valid_close_values
[docs] def wait_for_state(self, target, timeout=10, poll_s=0.01): """ wait for the PSS state to reach a desired target PARAMETERS target *[str]* : list of strings containing acceptable values timeout *non-negative number* : Ignored in the simulation. poll_s *non-negative number* : Ignored in the simulation. """ simulated_response_time_s = np.random.uniform(0.1, 0.9) time.sleep(simulated_response_time_s) self.pss_state.put(target[0])
@property def state(self): """is shutter "open", "close", or "unknown"?""" if self.pss_state.get() in self.pss_state_open_values: result = self.valid_open_values[0] elif self.pss_state.get() in self.pss_state_closed_values: result = self.valid_close_values[0] else: result = self.unknown_state return result
[docs]class EpicsMotorShutter(OneSignalShutter): """ Shutter, implemented with an EPICS motor moved between two positions .. index:: Ophyd Device; EpicsMotorShutter PARAMETERS prefix *str* : EPICS PV prefix name *str* : (kwarg, required) object's canonical name EXAMPLE:: tomo_shutter = EpicsMotorShutter("2bma:m23", name="tomo_shutter") tomo_shutter.wait_for_connection() tomo_shutter.close_value = 1.0 # default tomo_shutter.open_value = 0.0 # default tomo_shutter.tolerance = 0.01 # default tomo_shutter.open() tomo_shutter.close() # or, when used in a plan def planA(): yield from abs_set(tomo_shutter, "open", group="O") yield from wait("O") yield from abs_set(tomo_shutter, "close", group="X") yield from wait("X") def planA(): yield from abs_set(tomo_shutter, "open", wait=True) yield from abs_set(tomo_shutter, "close", wait=True) def planA(): yield from mv(tomo_shutter, "open") yield from mv(tomo_shutter, "close") """ signal = Component(EpicsMotor, "") tolerance = 0.01 # how close is considered in-position? @property def state(self): """is shutter "open", "close", or "unknown"?""" if abs(self.signal.user_readback.get() - self.open_value) <= self.tolerance: result = self.valid_open_values[0] elif abs(self.signal.user_readback.get() - self.close_value) <= self.tolerance: result = self.valid_close_values[0] else: result = self.unknown_state return result
[docs] def open(self): """move motor to BEAM NOT BLOCKED position, interactive use""" if not self.isOpen: self.signal.move(self.open_value) if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here
[docs] def close(self): """move motor to BEAM BLOCKED position, interactive use""" self.signal.move(self.close_value) if not self.isClosed: self.signal.move(self.close_value) if self.delay_s > 0: time.sleep(self.delay_s) # blocking call OK here
[docs]class EpicsOnOffShutter(OneSignalShutter): """ Shutter using a single EPICS PV moved between two positions .. index:: Ophyd Device; EpicsOnOffShutter Use for a shutter controlled by a single PV which takes a value for the close command and a different value for the open command. The current position is determined by comparing the value of the control with the expected open and close values. PARAMETERS prefix *str* : EPICS PV prefix name *str* : (kwarg, required) object's canonical name EXAMPLE:: bit_shutter = EpicsOnOffShutter("2bma:bit1", name="bit_shutter") bit_shutter.wait_for_connection() bit_shutter.close_value = 0 # default bit_shutter.open_value = 1 # default bit_shutter.open() bit_shutter.close() # or, when used in a plan def planA(): yield from mv(bit_shutter, "open") yield from mv(bit_shutter, "close") """ signal = Component(EpicsSignal, "")
# ----------------------------------------------------------------------------- # :author: Pete R. Jemian # :email: jemian@anl.gov # :copyright: (c) 2017-2024, UChicago Argonne, LLC # # Distributed under the terms of the Argonne National Laboratory Open Source License. # # The full license is in the file LICENSE.txt, distributed with this software. # -----------------------------------------------------------------------------