Source code for mxcubecore.HardwareObjects.EMBL.TINEMotor

#
#  Project name: MXCuBEv
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""TINEMotor class defines motor in the TINE control system"""

import gevent

from mxcubecore.BaseHardwareObjects import HardwareObjectState
from mxcubecore.HardwareObjects.abstract.AbstractMotor import AbstractMotor

__credits__ = ["EMBL Hamburg"]
__license__ = "LGPLv3+"
__category__ = "Motor"


TINE_STATE_DICT = {
    0: HardwareObjectState.READY,
    "ready": HardwareObjectState.READY,
    "busy": HardwareObjectState.BUSY,
    "error": HardwareObjectState.FAULT,
}


[docs]class TINEMotor(AbstractMotor): """TINEMotor class defines motor in the TINE control system""" def __init__(self, name): AbstractMotor.__init__(self, name) self.chan_position = None self.chan_state = None self.chan_limits = None self.cmd_set_position = None self.cmd_stop_axis = None self.cmd_set_online = None self.step_limits = None
[docs] def init(self): """Connects to all Tine channels and commands""" self.chan_limits = self.get_channel_object("axisLimits", optional=True) if self.chan_limits is not None: self.chan_limits.connect_signal("update", self.update_limits) self.update_limits(self.chan_limits.get_value()) else: try: if self.get_property("default_limits"): self.update_limits(eval(self.get_property("default_limits"))) except Exception: self.log.exception("") self.chan_position = self.get_channel_object("axisPosition") if self.chan_position is not None: self.chan_position.connect_signal("update", self.update_tine_value) self.update_value(self.chan_position.get_value()) self.chan_state = self.get_channel_object("axisState", optional=True) if self.chan_state is not None: self.chan_state.connect_signal("update", self.update_tine_state) self.cmd_set_position = self.get_command_object("setPosition") if self.cmd_set_position: self.cmd_set_position.connect("connected", self.connected) self.cmd_set_position.connect("disconnected", self.disconnected) self.cmd_stop_axis = self.get_command_object("stopAxis") if self.cmd_stop_axis: self.cmd_stop_axis.connect("connected", self.connected) self.cmd_stop_axis.connect("disconnected", self.disconnected) self.cmd_set_online = self.get_command_object("setOnline") # NBNB TODO change config from 'epsilon' to 'tolerance'? self._tolerance = self.get_property("epsilon") try: self.step_limits = eval(self.get_property("stepLimits")) except Exception: self.log.exception("")
[docs] def connected(self): """ Sets ready :return: """ self.update_state(HardwareObjectState.READY)
[docs] def disconnected(self): """ Sets not ready :return: """ self.update_state(HardwareObjectState.OFF)
[docs] def connect_notify(self, signal): """ :param signal: signal :type signal: signal """ if self.connected(): if signal == "stateChanged": self.update_state() elif signal == "limitsChanged": self.update_limits() elif signal == "valueChanged": self.update_value()
[docs] def get_step_limits(self): """Returns step limits""" return self.step_limits
# def get_position(self): # return self.chan_position.get_value()
[docs] def get_value(self): return self.chan_position.get_value()
[docs] def get_state(self): """Get HardwareObject state""" # NNBNB TODO map channel states to all HardwareObject states # TODO add treatment of specific_states if self.chan_state is None: return HardwareObjectState.READY else: return self.convert_state(self.chan_state.get_value())
def convert_state(self, tine_state): if type(tine_state) in (tuple, list): tine_state = tine_state[0] return TINE_STATE_DICT.get(tine_state, HardwareObjectState.UNKNOWN)
[docs] def abort(self): """Stops motor movement""" self.cmd_stop_axis()
def _set_value(self, value): """ Main move method :param value: float :return: """ if self.chan_state is not None: self.update_state(self.STATES.BUSY) self.chan_state.set_old_value("moving") if self.cmd_set_position: self.cmd_set_position(value) else: self.chan_position.set_value(value)
[docs] def update_tine_value(self, value=None): """Updates motor position""" if type(value) in (list, tuple): value = value[0] super(TINEMotor, self).update_value(value)
def update_tine_state(self, value): value = self.convert_state(value) super(TINEMotor, self).update_state(value)
[docs] def get_motor_mnemonic(self): """ Returns motor mnemonic :return: """ return "TINEMotor"
[docs] def enable_motor(self): """ Enables motor """ if self.cmd_set_online: self.cmd_set_online(1) gevent.sleep(2)
[docs] def disable_motor(self): """ Disables motor """ if self.cmd_set_online: self.cmd_set_online(0) gevent.sleep(2)