# Source code for mxcubecore.HardwareObjects.abstract.AbstractActuator

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""Abstract Actuator."""

from __future__ import annotations

import abc
import math
from ast import literal_eval

from gevent.lock import RLock

from mxcubecore.BaseHardwareObjects import HardwareObject

__copyright__ = """ Copyright © 2010-2022 by the MXCuBE collaboration """
__license__ = "LGPLv3+"


class AbstractActuator(HardwareObject):
    """Abstract actuator defines methods common to all moving devices.

    ``get_value`` and ``_set_value`` are the two methods marked abstract;
    every concrete implementation is expected to overload both.

    Attributes:
        _nominal_value (float | None): Current actuator value.
        default_value (float | None): Value specified by XML property,
            otherwise ``None``.
        _nominal_limits (tuple[float | None, float | None]): Values specified
            by XML property ``default_limits``, otherwise ``(None, None)``.
        actuator_name (str | None): Actuator name specified by XML property,
            otherwise ``None``.
        read_only (bool): Read-only flag specified by XML property,
            otherwise ``False``.
        username (str | None): Value of the ``username`` property;
            ``None`` until ``init`` is called.
        unit: Class-level attribute, ``None`` in this base class.

    Emits:
        valueChanged (tuple): Tuple whose first and only item is the new
            value. Emitted by ``update_value`` whenever the value differs from
            the stored one (this includes ``init`` when a default value is
            configured) and by ``force_emit_signals``.
        limitsChanged (tuple[tuple]): Tuple whose first and only item is a
            two-item tuple of the new limits (low limit first and high limit
            second). Emitted by ``set_limits``, by ``update_limits`` if limit
            values are changed, and by ``force_emit_signals``.
        stateChanged (tuple): Tuple whose first and only item is the new
            state. Emitted by ``force_emit_signals``.
    """

    # NOTE(review): ``__metaclass__`` is the Python 2 spelling and is ignored
    # by Python 3, so the ``@abc.abstractmethod`` markers below are not
    # enforced at instantiation time. It is kept unchanged because switching
    # to ``metaclass=abc.ABCMeta`` would start rejecting subclasses that do
    # not overload both abstract methods.
    __metaclass__ = abc.ABCMeta

    unit = None

    def __init__(self, name: str) -> None:
        """Create the actuator with empty value, limits and properties.

        Args:
            name: Hardware object name, forwarded to the parent constructor.
        """
        super().__init__(name)
        self._nominal_value = None
        self._nominal_limits = (None, None)
        self.actuator_name = None
        self.read_only = False
        self.default_value = None
        self.username = None
        # Re-entrant lock held by ``set_value`` for the whole set operation.
        self._lock = RLock()

    def init(self) -> None:
        """Read the configuration properties.

        Initialises ``actuator_name``, ``read_only``, ``default_value``,
        the limits (property ``default_limits``) and ``username``.
        """
        self.actuator_name = self.get_property("actuator_name")
        self.read_only = self.get_property("read_only") or False

        self.default_value = self.get_property("default_value")
        if self.default_value is not None:
            self.update_value(self.default_value)

        self._nominal_limits = self.get_property("default_limits", (None, None))
        if isinstance(self._nominal_limits, str):
            # The property may arrive as text, e.g. "(0, 10)".
            self._nominal_limits = tuple(literal_eval(self._nominal_limits))

        self.username = self.get_property("username")

    @abc.abstractmethod
    def get_value(self):
        """Read the actuator position.

        Returns:
            Actuator position.
        """
        return None

    def get_limits(self):
        """Return actuator low and high limits.

        Returns:
            (tuple): Two-item tuple (low limit, high limit).
        """
        return self._nominal_limits

    def set_limits(self, limits: tuple) -> None:
        """Set actuator low and high limits and emit signal ``limitsChanged``.

        Args:
            limits (tuple): Two-item tuple (low limit, high limit).

        Raises:
            ValueError: Attempt to set limits for read-only actuator.
        """
        if self.read_only:
            raise ValueError("Attempt to set limits for read-only Actuator")

        self._nominal_limits = limits
        self.emit("limitsChanged", (self._nominal_limits,))

    def validate_value(self, value) -> bool:
        """Check if the value is within limits.

        Args:
            value(numerical): Value.

        Returns:
            ``True`` if ``value`` is ``None``, or if it is a finite number
            that lies within the limits (or at least one limit is ``None``).
            ``False`` for NaN, infinity, out-of-range and non-numerical
            values.
        """
        if value is None:
            return True

        try:
            is_finite = math.isfinite(value)
        except TypeError:
            # Not a real number (e.g. a string): report it as invalid instead
            # of leaking a TypeError, so ``set_value`` raises its documented
            # ValueError.
            return False

        if not is_finite:
            return False

        if None in self._nominal_limits:
            # An unset limit means the range is not restricted.
            return True

        return self._nominal_limits[0] <= value <= self._nominal_limits[1]

    @abc.abstractmethod
    def _set_value(self, value):
        """Implementation of specific set actuator logic.

        Args:
            value: Target value.
        """

    def set_value(self, value, timeout: float = 0) -> None:
        """Set actuator to value.

        If ``timeout == 0``: return at once and do not wait (default).
        If ``timeout is None``: wait forever.

        The whole operation, including the optional wait, runs while holding
        the instance lock.

        Args:
            value: Target value.
            timeout (float): Optional timeout in seconds.
                Default is ``0``: do not wait.

        Raises:
            ValueError: Invalid value or attempt to set read only actuator.
            RuntimeError: Timeout waiting for status ready
                (from ``wait_ready``).
        """
        with self._lock:
            if self.read_only:
                raise ValueError("Attempt to set value for read-only Actuator")

            if not self.validate_value(value):
                raise ValueError(f"Invalid value {value}")

            self._set_value(value)
            # Refresh the cached value (and notify listeners) from the device.
            self.update_value()

            if timeout == 0:
                return
            self.wait_ready(timeout)

    def update_value(self, value=None) -> None:
        """Check if the value has changed and emit signal ``valueChanged``.

        Args:
            value: New value. When ``None``, the value is read with
                ``get_value``.
        """
        if value is None:
            value = self.get_value()

        if self._nominal_value != value:
            self._nominal_value = value
            self.emit("valueChanged", (value,))

    def update_limits(self, limits=None) -> None:
        """Check if the limits have changed and emit signal ``limitsChanged``.

        Limits containing a float NaN are ignored: nothing is stored and
        nothing is emitted.

        Args:
            limits (tuple): Two-item tuple (low limit, high limit).
                When empty or ``None``, the limits are read with
                ``get_limits``.
        """
        if not limits:
            limits = self.get_limits()

        if self._nominal_limits != limits:
            contains_nan = any(
                isinstance(lim, float) and math.isnan(lim) for lim in limits
            )
            if not contains_nan:
                self._nominal_limits = limits
                self.emit("limitsChanged", (limits,))

    def re_emit_values(self) -> None:
        """Update values for all internal attributes."""
        self.update_value(self.get_value())
        self.update_limits(self.get_limits())
        super().re_emit_values()

    def force_emit_signals(self) -> None:
        """Force emission of all signals.

        Method is called from GUI.
        Do not call it from within a hardware object.
        """
        self.emit("valueChanged", (self.get_value(),))
        self.emit("limitsChanged", (self.get_limits(),))
        self.emit("stateChanged", (self.get_state(),))