Source code for mxcubecore.HardwareObjects.mockup.ActuatorMockup
# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""
Mixin superclass for all mock actuators
Should be put as the first superclass,
e.g. class MotorMockup(ActuatorMockup, AbstractMotor):
"""
import random
import time
import gevent
from mxcubecore.HardwareObjects.abstract import AbstractActuator
__copyright__ = """ Copyright © 2010-2020 by the MXCuBE collaboration """
__license__ = "LGPLv3+"
[docs]class ActuatorMockup(AbstractActuator.AbstractActuator):
"""Mock Motor implementation"""
def __init__(self, name):
super().__init__(name)
self.__move_task = None
[docs] def init(self):
"""Initialisation method"""
super().init()
self.update_state(self.STATES.READY)
def _move(self, value):
"""Simulated value change - override as needed
Must set specific_state as needed, take a non-zero amount of time
call update_value for intermediate positions
and return the final value (in case it does not match the input value)
Args:
value : target actuator value
Returns:
final actuator value (may differ from target value)
"""
time.sleep(random.uniform(0.1, 1.0))
return value
[docs] def get_value(self):
"""Read the actuator position.
Returns:
float: Actuator position.
"""
return self._nominal_value
[docs] def set_value(self, value, timeout=0):
"""
Set actuator to absolute value.
This is NOT the recommended way, but for technical reasons
overriding is necessary in this particular case
Args:
value (float): target value
timeout (float): optional - timeout [s],
If timeout == 0: return at once and do not wait (default);
if timeout is None: wait forever.
Raises:
ValueError: Value not valid or attempt to set read-only actuator.
RuntimeError: Timeout.
"""
if self.read_only:
raise ValueError("Attempt to set value for read-only Actuator")
if self.validate_value(value):
self.update_state(self.STATES.BUSY)
if timeout or timeout is None:
with gevent.Timeout(
timeout, RuntimeError(f"Motor {self.username} timed out")
):
new_value = self._move(value)
self._set_value(new_value)
else:
self.__move_task = gevent.spawn(self._move, value)
self.__move_task.link(self._callback)
else:
raise ValueError(f"Invalid value {value}; limits are {self.get_limits()}")
[docs] def abort(self):
"""Immediately halt movement. By default self.stop = self.abort"""
if self.__move_task is not None:
self.__move_task.kill()
self.update_state(self.STATES.READY)
def _callback(self, move_task):
value = move_task.get()
if not isinstance(value, gevent.GreenletExit):
self._set_value(value)
def _set_value(self, value):
"""
Implementation of specific set actuator logic.
Args:
value (float): target value
"""
self.update_value(value)
self.update_state(self.STATES.READY)