import enum
import time
from gevent import Timeout
from mxcubecore.BaseHardwareObjects import (
HardwareObjectState,
)
from mxcubecore.HardwareObjects.abstract.AbstractMotor import AbstractMotor
"""
Interface to Sardana motor objects.

``taurusname`` is the only mandatory property.

.. code-block:: xml

    <object class="SardanaMotor">
        <taurusname>dmot01</taurusname>
        <username>Dummy</username>
        <actuator_name>dummy_motor</actuator_name>
        <threshold>0.005</threshold>
        <move_threshold>0.005</move_threshold>
        <interval>2000</interval>
    </object>
"""
class SardanaMotorState(enum.Enum):
    """Map motor state names onto ``HardwareObjectState`` values.

    Each member is named after a motor state and its value is the
    ``HardwareObjectState`` that state is translated to.

    Several members share the same value, so ``enum`` treats the later
    ones as aliases of the first member with that value (e.g. ``ON`` is an
    alias of ``READY``).  Look-up by name, ``SardanaMotorState["ON"].value``,
    still returns the expected ``HardwareObjectState``.  The declaration
    order is significant because it decides which name is the canonical
    one, so do not reorder the members.
    """

    READY = HardwareObjectState.READY
    ON = HardwareObjectState.READY
    OFF = HardwareObjectState.OFF
    MOVING = HardwareObjectState.BUSY
    STANDBY = HardwareObjectState.READY
    FAULT = HardwareObjectState.FAULT
    INIT = HardwareObjectState.BUSY
    RUNNING = HardwareObjectState.BUSY
    ALARM = HardwareObjectState.WARNING
    DISABLE = HardwareObjectState.OFF
    UNKNOWN = HardwareObjectState.UNKNOWN
    INVALID = HardwareObjectState.FAULT
class SardanaMotor(AbstractMotor):
    """Motor hardware object driven through Sardana channels and commands.

    The object is configured from properties read in :meth:`init`:
    ``taurusname`` (required), ``actuator_name``, ``threshold``,
    ``move_threshold`` and ``interval``.
    """

    # Suffixes appended to ``actuator_name`` to build channel/command names.
    suffix_position = "Position"
    suffix_state = "State"
    suffix_stop = "Stop"
    suffix_velocity = "Velocity"
    suffix_acceleration = "Acceleration"

    def __init__(self, name):
        super().__init__(name)
        # Channels and commands are created in init(); declare them all here
        # so the attributes exist even when init() has not run or has failed.
        self.stop_command = None
        self.position_channel = None
        self.state_channel = None
        self.velocity_channel = None
        self.acceleration_channel = None
        self.taurusname = None
        self.motor_position = 0.0
        # Defaults used when the corresponding property is not configured.
        self.threshold_default = 0.0018
        self.move_threshold_default = 0.0
        self.polling_default = "events"
        self.limit_upper = None
        self.limit_lower = None
        self.static_limits = (-1e4, 1e4)
        self.limits = (None, None)

    def _sardana_config(self, suffix, polled=False):
        """Build the attribute dict for a Sardana channel or command.

        Args:
            suffix: One of the ``suffix_*`` class attributes; appended to
                ``actuator_name`` to form the channel/command name.
            polled: When True, add the configured ``polling`` value.

        Returns:
            dict suitable for ``add_channel`` / ``add_command``.
        """
        config = {
            "type": "sardana",
            "name": self.actuator_name + suffix,
            "taurusname": self.taurusname,
        }
        if polled:
            config["polling"] = self.polling
        return config

    def init(self):
        """Read the configuration and create the channels and commands.

        Raises:
            RuntimeError: If the ``taurusname`` property is not defined.
        """
        super().init()

        self.taurusname = self.get_property("taurusname")
        if not self.taurusname:
            raise RuntimeError("Undefined property taurusname")

        self.actuator_name = self.get_property("actuator_name")
        # Fall back to the object's own name when the property is missing;
        # the channel names below are built by concatenating onto it.
        if not self.actuator_name:
            self.log.info(
                "Undefined property actuator_name in xml. Applying name during instance creation."
            )
            self.actuator_name = self.name

        self.threshold = self.get_property("threshold", self.threshold_default)
        self.log.debug(
            "Motor {0} threshold = {1}".format(self.actuator_name, self.threshold)
        )

        self.move_threshold = self.get_property(
            "move_threshold", self.move_threshold_default
        )
        self.log.debug(
            "Motor {0} move_threshold = {1}".format(
                self.actuator_name, self.move_threshold
            )
        )

        self.polling = self.get_property("interval", self.polling_default)
        self.log.debug(
            "Motor {0} polling = {1}".format(self.actuator_name, self.polling)
        )

        self.stop_command = self.add_command(
            self._sardana_config(SardanaMotor.suffix_stop), "Stop"
        )
        # Only position and state carry the polling setting.
        self.position_channel = self.add_channel(
            self._sardana_config(SardanaMotor.suffix_position, polled=True),
            "Position",
        )
        self.state_channel = self.add_channel(
            self._sardana_config(SardanaMotor.suffix_state, polled=True),
            "State",
        )
        self.velocity_channel = self.add_channel(
            self._sardana_config(SardanaMotor.suffix_velocity), "Velocity"
        )
        self.acceleration_channel = self.add_channel(
            self._sardana_config(SardanaMotor.suffix_acceleration),
            "Acceleration",
        )

        self.position_channel.connect_signal("update", self.update_value)
        self.state_channel.connect_signal("update", self._update_state)

        self.limits = self.get_limits()
        self.update_state()
        self.update_value()

    def get_state(self) -> HardwareObjectState:
        """Get the motor state.

        The raw value read from the state channel is stored in
        ``specific_state`` and its ``name`` is translated through
        :class:`SardanaMotorState`.

        Returns:
            Motor state, or ``STATES.UNKNOWN`` when the raw state has no
            ``name`` attribute or the name is not a known member.
        """
        try:
            _state = self.state_channel.get_value()
            self.specific_state = _state
            return SardanaMotorState[_state.name].value
        except (KeyError, AttributeError):
            return self.STATES.UNKNOWN

    def _update_state(self, state):
        """Translate a state received on the "update" signal and publish it.

        Args:
            state: State name; upper-cased before look-up.  Values without
                an ``upper`` method, or unknown names, map to
                ``STATES.UNKNOWN``.

        Returns:
            Whatever ``update_state`` returns.
        """
        try:
            state = state.upper()
            state = SardanaMotorState[state].value
        except (AttributeError, KeyError):
            state = self.STATES.UNKNOWN
        return self.update_state(state)

    def is_ready(self) -> bool:
        """
        Returns:
            True if the motor is ready
        """
        return self.get_state() == HardwareObjectState.READY

    def wait_ready(self, timeout=None):
        """Poll every 0.1 s until the motor is ready.

        Args:
            timeout: Passed to ``gevent.Timeout`` as its seconds argument.

        Raises:
            RuntimeError: Installed as the exception of the
                ``gevent.Timeout`` guarding the polling loop.
        """
        with Timeout(timeout, RuntimeError("Timeout waiting for status ready")):
            while not self.is_ready():
                time.sleep(0.1)

    def is_moving(self) -> bool:
        """
        Returns:
            True if the motor is currently moving
        """
        return self.get_state() == HardwareObjectState.BUSY

    def wait_end_of_move(self, timeout: float = None) -> None:
        """Wait until the motor stops.

        Args:
            timeout: Passed to ``gevent.Timeout`` as its seconds argument.
                No exception is supplied, so the ``Timeout`` itself is
                what gets raised on expiry.
        """
        with Timeout(timeout):
            # Wait a bit to ensure the motor started moving
            # 0.1 empirically obtained
            time.sleep(0.1)
            while self.is_moving():
                time.sleep(0.1)

    def get_limits(self):
        """Return the motor limits taken from the position channel info.

        The limits are also cached in ``_nominal_limits``.

        Returns:
            ``(minval, maxval)`` from ``position_channel.info``, or
            ``(None, None)`` if they cannot be read.  ``static_limits`` is
            not used as a fallback.
        """
        try:
            self._nominal_limits = (
                self.position_channel.info.minval,
                self.position_channel.info.maxval,
            )
            return self._nominal_limits
        except Exception:
            # Best effort: any failure to read the limits means "unknown".
            return (None, None)

    def get_value(self):
        """Read the current position from the position channel.

        The value is also cached in ``motor_position``.
        """
        self.motor_position = self.position_channel.get_value()
        return self.motor_position

    def _set_value(self, value):
        """Move to the given position by writing it to the position channel."""
        self.position_channel.set_value(value)

    def stop(self):
        """Stop the motor by invoking the stop command."""
        self.stop_command()

    def get_velocity(self):
        """Return the velocity channel value, or None if it cannot be read."""
        try:
            return self.velocity_channel.get_value()
        except Exception:
            return None

    def set_velocity(self, value):
        """Write ``value`` to the velocity channel."""
        self.velocity_channel.set_value(value)

    def get_acceleration(self):
        """Return the acceleration channel value, or None if it cannot be read."""
        try:
            return self.acceleration_channel.get_value()
        except Exception:
            return None