Source code for mxcubecore.HardwareObjects.DESY.P11DetectorCover

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

__copyright__ = """Copyright The MXCuBE Collaboration"""
__license__ = "LGPLv3+"

import time
from enum import Enum

import gevent

from mxcubecore.HardwareObjects.abstract.AbstractShutter import AbstractShutter

__credits__ = ["DESY P11"]
__license__ = "LGPLv3+"
__category__ = "General"


[docs]class P11DetectorCover(AbstractShutter): """ P11DetectorCover define interface to Tango Detector Cover at DESY P11 Similar to P11 Shutter tango - but differs """ default_timeout = 4 move_time_min = 1 def __init__(self, name): super().__init__(name) self.simulation = False self.simulated_opened = False self.simulated_moving = False self.cmd_open = None self.cmd_close = None self.cmd_started = 0 self.chan_state_open = None self.chan_state_close = None self.cover_is_open = False self.cover_is_closed = False
[docs] def init(self): """Initialise the predefined values""" self._initialise_values() self.simulation = self.get_property("simulation") self.cmd_timeout = self.get_property("command_timeout", self.default_timeout) if not self.simulation: self.cmd_open = self.get_command_object("cmdOpen") self.cmd_close = self.get_command_object("cmdClose") self.chan_state_open = self.get_channel_object("chanStateOpen") self.chan_state_closed = self.get_channel_object("chanStateClosed") if self.chan_state_open is not None: self.chan_state_open.connect_signal("update", self.state_open_changed) else: self.log.error("Cannot get channel for shutter %s" % self.username) if self.chan_state_closed is not None: self.chan_state_closed.connect_signal( "update", self.state_closed_changed ) else: self.log.error("Cannot get channel for shutter %s" % self.username) self.state_open_changed(self.chan_state_open.get_value()) else: self.simulated_update() super().init()
def _initialise_values(self): """Add additional, known in advance states to VALUES""" values_dict = {item.name: item.value for item in self.VALUES} values_dict.update({"MOVING": "MOVING"}) self.VALUES = Enum("ValueEnum", values_dict)
[docs] def get_value(self): if self.simulation: return self.simulated_update() if self._nominal_value == self.VALUES.MOVING: if (time.time() - self.cmd_started) < self.move_time_min: return self.VALUES.MOVING self.cover_is_open = self.chan_state_open.get_value() self.cover_is_closed = self.chan_state_closed.get_value() return self.update_cover_state()
def _set_value(self, value): if self.simulation: if value == self.VALUES.OPEN: self.simulated_opened = 1 else: self.simulated_opened = 0 self.simulated_moving = True gevent.spawn(self.simul_do) return current_value = self.get_value() if current_value == self.VALUES.MOVING: self.log.error("Cannot move while moving") return if value != current_value: if value == self.VALUES.OPEN: self.cmd_open() elif value == self.VALUES.CLOSED: self.cmd_close() self.cmd_started = time.time() self.update_value(self.VALUES.MOVING) def simul_do(self): gevent.sleep(3) self.simulated_moving = False self.simulated_update() def is_moving(self): return self.get_value() == self.VALUES.MOVING
[docs] def state_open_changed(self, value): """Updates cover state when cover open value changes :param state: cover open state :type state: str :return: None """ self.cover_is_open = value self.update_cover_state()
[docs] def state_closed_changed(self, value): """Updates cover state when cover close value changes :param state: cover close state :type state: str :return: None """ self.cover_is_closed = value self.update_cover_state()
[docs] def update_cover_state(self): """Updates cover state :return: cover state as str """ if self.cover_is_open: value = self.VALUES.OPEN elif self.cover_is_closed: value = self.VALUES.CLOSED else: if time.time() - self.cmd_started > self.cmd_timeout: value = self.VALUES.UNKNOWN else: value = self.VALUES.MOVING self.update_value(value) return value
def simulated_update(self): if self.simulated_moving: value = self.VALUES.MOVING elif self.simulated_opened: value = self.VALUES.OPEN else: value = self.VALUES.CLOSED self.update_value(value) return value @property def is_closed(self): """Check if the shutter is closed. Returns: (bool): True if open, False otherwise. """ return self.get_value() == self.VALUES.CLOSED