Source code for mxcubecore.HardwareObjects.ExporterMotor

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU General Lesser Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""
Example xml file:
<object class="ExporterMotor">
  <username>phiy</username>
  <exporter_address>wid30bmd2s:9001</exporter_address>
  <actuator_name>AlignmentY</actuator_name>
  <tolerance>1e-2</tolerance>
</object>
optional parameters, with default values. To be used only if the values
differ from the default ones.
  The position attribute is actuator_name+position_suffix
  <position_suffix>Position</position_suffix>
  The state attribute is actuator_name+state_suffix
  <state_suffix>State</state_suffix>
  Use the global application state instead of the motor state.
  <use_global_state>False</use_global_state>
"""

import math
import sys

from gevent import (
    Timeout,
    sleep,
)

from mxcubecore.Command.Exporter import Exporter
from mxcubecore.Command.exporter.ExporterStates import ExporterStates
from mxcubecore.HardwareObjects.abstract.AbstractMotor import AbstractMotor

__copyright__ = """ Copyright © 2019 by the MXCuBE collaboration """
__license__ = "LGPLv3+"


[docs]class ExporterMotor(AbstractMotor): """Motor using the Exporter protocol, based on AbstractMotor""" def __init__(self, name): super().__init__(name) self._motor_pos_suffix = None self._motor_state_suffix = None self._exporter = None self._exporter_address = None self.motor_position_chan = None self.motor_state_chan = None self.use_state = None
[docs] def init(self): """Initialise the motor""" super().init() self._motor_pos_suffix = self.get_property("position_suffix", "Position") self._motor_state_suffix = self.get_property("state_suffix", "State") self.use_state = self.get_property("use_global_state", False) self._exporter_address = self.get_property("exporter_address") _host, _port = self._exporter_address.split(":") self._exporter = Exporter(_host, int(_port)) self.motor_position_chan = self.add_channel( { "type": "exporter", "exporter_address": self._exporter_address, "name": "position", }, self.actuator_name + self._motor_pos_suffix, ) if self.motor_position_chan: self.get_value() self.motor_position_chan.connect_signal("update", self.update_value) if self.use_state: _name = "State" else: _name = self.actuator_name + self._motor_state_suffix self.motor_state_chan = self.add_channel( { "type": "exporter", "exporter_address": self._exporter_address, "name": "motor_state", }, _name, ) if self.motor_state_chan: self.motor_state_chan.connect_signal("update", self._update_state) self.update_state()
[docs] def get_state(self): """Get the motor state. Returns: (enum 'HardwareObjectState'): Motor state. """ try: _state = self.motor_state_chan.get_value().upper() self.specific_state = _state return ExporterStates[_state].value except (KeyError, AttributeError): return self.STATES.UNKNOWN
def _update_state(self, state): try: state = state.upper() state = ExporterStates[state].value except (AttributeError, KeyError): state = self.STATES.UNKNOWN return self.update_state(state) def _get_hwstate(self): """Get the hardware state, reported by the MD2 application. Returns: (string): The state. """ try: return self._exporter.read_property("HardwareState") except Exception: return "Ready" def _get_swstate(self): """Get the software state, reported by the MD2 application. Returns: (string): The state. """ return self._exporter.read_property("State") def _ready(self): """Get the "Ready" state - software and hardware. Returns: (bool): True if both "Ready", False otherwise. """ if ( self._get_swstate() == "Ready" and self._get_hwstate() == "Ready" and self.motor_state_chan.get_value() == "Ready" ): return True return False def _wait_ready(self, timeout=3): """Wait for the state to be "Ready". Args: timeout (float): waiting time [s]. Raises: RuntimeError: Execution timeout. """ with Timeout(timeout, RuntimeError("Execution timeout")): while not self._ready(): sleep(0.01)
[docs] def wait_move(self, timeout=20): """Wait until the end of move ended, using the application state. Args: timeout(float): Timeout [s]. Default value is 20 s """ self._wait_ready(timeout)
[docs] def wait_motor_move(self, timeout=20): """Wait until the end of move ended using the motor state. Args: timeout(float): Timeout [s]. Default value is 20 s Raises: RuntimeError: Execution timeout. """ with Timeout(timeout, RuntimeError("Execution timeout")): while self.get_state() != self.STATES.READY: sleep(0.01)
[docs] def get_value(self): """Get the motor position. Returns: (float): Motor position. """ _val = self.motor_position_chan.get_value() if _val is None or math.isnan(_val): self.log.debug("Value of %s is NaN" % self.actuator_name) _val = self._nominal_value self._nominal_value = _val return self._nominal_value
def __get_limits(self, cmd): """Returns motor low and high limits. Args: cmd (str): command name Returns: (tuple): two floats tuple (low limit, high limit). """ try: _low, _high = self._exporter.execute(cmd, (self.actuator_name,)) # inf is a problematic value, convert to sys.float_info.max if _low == float("-inf"): _low = -sys.float_info.max if _high == float("inf"): _high = sys.float_info.max return _low, _high except Exception: return self._nominal_limits
[docs] def get_limits(self): """Returns motor low and high limits. Args: cmd (str): command name Returns: (tuple): two floats tuple (low limit, high limit). """ self._nominal_limits = self.__get_limits("getMotorLimits") return self._nominal_limits
[docs] def get_dynamic_limits(self): """Returns motor low and high dynamic limits. Returns: (tuple): two floats tuple (low limit, high limit). """ return self.__get_limits("getMotorDynamicLimits")
def _retry_on_ex(self, fun, N, *args, **kwargs): # noqa: N803 """ """ for attempt in range(1, N + 1): # noqa: RET503 try: return fun(*args, **kwargs) except Exception as e: # noqa: PERF203 msg = f"Error when moving {self.actuator_name}, re-trying" self.log.exception(msg) if attempt == N: msg = f"Tried moving {self.actuator_name} {N} times and failed" raise RuntimeError(msg) from e sleep(1) def _retry_set_value(self, value): """Move motor to absolute value. Args: value (float): target value """ self.update_state(self.STATES.BUSY) self.motor_position_chan.set_value(value) def _set_value(self, value): """Move motor to absolute value. Args: value (float): target value """ self._retry_on_ex(self._retry_set_value, 5, value)
[docs] def abort(self): """Stop the motor movement immediately.""" if self.get_state() != self.STATES.UNKNOWN: self._exporter.execute("abort")
[docs] def home(self, timeout=None): """Homing procedure. Args: timeout (float): optional - timeout [s]. """ self._exporter.execute("startHomingMotor", (self.actuator_name,)) self._wait_ready(timeout)
[docs] def get_max_speed(self): """Get the motor maximum speed. Returns: (float): the maximum speed [unit/s]. """ return self._exporter.execute("getMotorMaxSpeed", (self.actuator_name,))