Source code for mxcubecore.HardwareObjects.TangoMotor

#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""
TangoMotor class defines motor in the Tango control system (used and tested in DESY/P11
"""

import gevent

from mxcubecore.HardwareObjects.abstract.AbstractMotor import AbstractMotor

__credits__ = ["DESY P11"]
__license__ = "LGPLv3+"
__category__ = "Motor"


[docs]class TangoMotor(AbstractMotor): """TangoMotor class defines motor in the Tango control system""" default_polling = 500 def __init__(self, name): AbstractMotor.__init__(self, name) self.chan_position = None self.chan_state = None self.chan_limits = None self.cmd_set_position = None self.cmd_stop_axis = None self.cmd_set_online = None self.latest_position = None self.auto_on = False self.cmd_on = None self.cmd_calibrate = None self.step_limits = None
[docs] def init(self): """Connects to all Tango channels and commands""" self.polling = self.get_property("polling", TangoMotor.default_polling) self.actuator_name = self.get_property("actuator_name", self.name()) self._tolerance = self.get_property("tolerance", 1e-3) self.is_simulation = self.get_property("simulation", False) self.auto_on = self.get_property("auto_on", False) if self.auto_on: self.log.debug("AUTO_ON is set for motor %s" % self.name()) if self.is_simulation: self.simulated_pos = 0.0 self.set_ready() return self.chan_position = self.get_channel_object("axisPosition", optional=True) if self.chan_position is None: self.chan_position = self.add_channel( { "type": "tango", "name": "axisPosition", "tangoname": self.tangoname, "polling": self.polling, }, "Position", ) if self.chan_position is not None: self.chan_position.connect_signal("update", self.update_value) self.update_value(self.chan_position.get_value()) self.chan_state = self.get_channel_object("axisState", None) if self.chan_state is None: self.chan_state = self.add_channel( { "type": "tango", "name": "axisState", "tangoname": self.tangoname, "polling": self.polling, }, "State", ) if self.chan_state is not None: self.chan_state.connect_signal("update", self.motor_state_changed) self.chan_limits = self.get_channel_object("axisLimits", optional=True) if self.chan_limits is not None: self.chan_limits.connect_signal("update", self.update_limits) self.update_limits(self.chan_limits.get_value()) else: try: if self.get_property("default_limits"): self.update_limits(eval(self.get_property("default_limits"))) except Exception: self.log.exception("") self.cmd_stop = self.get_command_object("stopAxis") if self.cmd_stop is None: self.cmd_stop = self.add_command( { "type": "tango", "name": "stopAxis", "tangoname": self.tangoname, }, "Stop", ) self.cmd_on = self.get_command_object("onCmd") if self.cmd_on is None: self.cmd_on = self.add_command( { "type": "tango", "name": "onCmd", "tangoname": self.tangoname, }, "On", ) self.chan_velocity = self.get_channel_object("velocity", optional=True) self.cmd_calibrate = self.get_command_object("calibrate") # update values self.motor_state_changed() self.update_value()
[docs] def connectNotify(self, signal): """ :param signal: signal :type signal: signal """ if signal == "stateChanged": self.motor_state_changed() elif signal == "limitsChanged": self.update_limits() elif signal == "valueChanged": self.update_value()
[docs] def get_value(self): if self.is_simulation: return self.simulated_pos value = self.chan_position.get_value() return value
[docs] def get_velocity(self): if self.chan_velocity is not None: return self.chan_velocity.get_value()
[docs] def set_velocity(self, value): if self.chan_velocity is not None: self.chan_velocity.set_value(value)
def motstate_to_state(self, motstate): motstate = str(motstate) if motstate == "ON": state = self.STATES.READY elif motstate == "MOVING": state = self.STATES.BUSY elif motstate == "FAULT": state = self.STATES.FAULT elif motstate == "OFF": state = self.STATES.OFF else: state = self.STATES.UNKNOWN return state def motor_state_changed(self, state=None): if state is None: state = self.chan_state.get_value() self.update_state(self.motstate_to_state(state)) def set_ready(self): self.update_state(self.STATES.READY) def is_moving(self): return (self.get_state() == self.STATES.BUSY) or ( self.get_state() == self.SPECIFIC_STATES.MOVING )
[docs] def abort(self): """Stops motor movement""" self.cmd_stop()
def calibrate(self, value): self.cmd_calibrate(value) def _set_value(self, value): """ Main move method :param value: float :return: """ self.log.debug("TangoMotor.py - Moving motor %s to %s" % (self.id, value)) if self.is_simulation: self.simulated_pos = value else: self.start_moving() self.chan_position.set_value(value) def start_moving(self): self.motor_state_changed("MOVING") if self.auto_on: state = str(self.chan_state.get_value()) if state == "OFF": self.cmd_on() # ensure that the state is updated at least once after the polling time # in case we miss the state update gevent.spawn(self._update_state) def _update_state(self): gevent.sleep(0.5) motor_state = self.chan_state.get_value() self.log.debug( " reading motor state for %s is %s" % (self.id, str(motor_state)) ) self.motor_state_changed(motor_state)
[docs] def update_value(self, value=None): """Updates motor position""" if value is None: value = self.get_value() self.latest_value = value super(TangoMotor, self).update_value(value)
[docs] def get_motor_mnemonic(self): """ Returns motor mnemonic :return: """ return "TangoMotor"