# Source code for mxcubecore.HardwareObjects.DESY.P11Shutter

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

# Module-level copyright and license metadata.
__copyright__ = """Copyright The MXCuBE Collaboration"""
__license__ = "LGPLv3+"

import time
import urllib
import urllib.request
from enum import (
    Enum,
    unique,
)

import gevent

from mxcubecore.HardwareObjects.abstract.AbstractShutter import AbstractShutter

# Module metadata. NOTE(review): __license__ repeats, with the same value, the
# assignment already made near the top of this file.
__credits__ = ["DESY P11"]
__license__ = "LGPLv3+"
__category__ = "General"


class P11Shutter(AbstractShutter):
    """Interface to the Tango shutters at DESY P11.

    In normal operation the shutter is driven by web requests (one URL for
    opening, one for closing) and its state is read from the ``chanState``
    channel.  When the ``simulation`` property is set, opening and closing
    are mere software flags and no hardware is contacted.
    """

    default_timeout = 6

    @unique
    class BaseValueEnum(Enum):
        """Defines only the compulsory values."""

        OPEN = "OPEN"
        CLOSED = "CLOSED"
        MOVING = "MOVING"
        UNKNOWN = "UNKNOWN"

    VALUES = BaseValueEnum

    def __init__(self, name):
        """Create the hardware object with its default (unconfigured) state.

        :param name: name passed on unchanged to ``AbstractShutter``
        """
        super().__init__(name)

        # Simulation flags; the real value of ``simulation`` is read in init().
        self.simulation = False
        self.simulated_opened = True
        self.simulated_moving = False

        # Web request URLs, filled in by init() when not simulating.
        self.url_open = None
        self.url_close = None

        # Timestamp (time.time()) of the last open/close request sent.
        self.cmd_started = 0

        # Channel object delivering the shutter state (may stay None).
        self.chan_state = None

        # Greenlet running the current simulated move, if any.
        self.t1 = None

    def init(self):
        """Initialise the predefined values and read the configuration."""
        # if simulation is set - open and close will be mere software flags
        self.simulation = self.get_property("simulation")

        self._initialise_values()

        if not self.simulation:
            url_base = self.get_property("base_url")
            dev_open = self.get_property("device_open")
            dev_close = self.get_property("device_close")

            self.url_open = "{}&deviceName={}".format(url_base, dev_open)
            self.url_close = "{}&deviceName={}".format(url_base, dev_close)

            self.chan_state = self.get_channel_object("chanState")

            if self.chan_state is not None:
                self.chan_state.connect_signal("update", self.update_shutter_state)

            # Safe even without a channel: the state is then reported as UNKNOWN.
            self.update_shutter_state()
        else:
            self.simulated_update()

        self.update_state(self.STATES.READY)
        super().init()

    def _initialise_values(self):
        """Add additional, known in advance states to VALUES"""
        values_dict = {item.name: item.value for item in self.VALUES}
        # MOVING is already part of BaseValueEnum; the update only guarantees
        # its presence should VALUES be overridden with an enum lacking it.
        values_dict.update({"MOVING": "MOVING"})
        self.VALUES = Enum("ValueEnum", values_dict)

    def get_value(self):
        """Refresh and return the current shutter value.

        :return: member of ``self.VALUES``
        """
        if self.simulation:
            return self.simulated_update()
        return self.update_shutter_state()

    def _set_value(self, value):
        """Open or close the shutter.

        :param value: ``self.VALUES.OPEN`` or ``self.VALUES.CLOSED``; any
            other value is logged and ignored.
        """
        if value == self.VALUES.OPEN:
            open_it = True
        elif value == self.VALUES.CLOSED:
            open_it = False
        else:
            self.log.debug(" ### setting wrong value for shutter %s", value)
            return

        if self.simulation:
            self.simulated_opened = open_it
            self.simulated_moving = True
            # The simulated move runs in a greenlet; the callbacks only log.
            self.t1 = gevent.spawn(self.simul_do)
            self.t1.link(self.do_finish)
            self.t1.link_exception(self.do_finish_exc)
        else:
            if open_it:
                self.do_open()
            else:
                self.do_close()
            self.cmd_started = time.time()

        self.log.debug(" ### setting value value for shutter done")

    def _send_request(self, url, timeout):
        """Send the web request for *url* and discard the reply.

        :param url: URL to request
        :param timeout: timeout in seconds handed to ``urlopen``
        """
        # The reply is read in full but not interpreted; the context manager
        # makes sure the HTTP response is closed afterwards.
        with urllib.request.urlopen(url, None, timeout) as response:
            response.readlines()

    def do_open(self, timeout=3):
        """Request the shutter to open via ``self.url_open``.

        :param timeout: web request timeout in seconds
        """
        self.log.debug(" OPENING SHUTTER (web request)")
        self._send_request(self.url_open, timeout)
        self.log.debug(" OPENING SHUTTER (web request) retured")

    def do_close(self, timeout=3):
        """Request the shutter to close via ``self.url_close``.

        :param timeout: web request timeout in seconds
        """
        self.log.debug(" CLOSING SHUTTER (web request)")
        self._send_request(self.url_close, timeout)
        self.log.debug(" CLOSING SHUTTER (web request) retured")

    def simul_do(self):
        """Simulate a shutter move: wait one second, then publish the value."""
        self.log.debug("### starting simulated shutter move")
        gevent.sleep(1)
        self.simulated_moving = False
        self.log.debug("### updating simulated shutter")
        self.simulated_update()
        self.log.debug("### ending simulated shutter move")

    def do_finish(self, t=None):
        """Callback linked to the simulated move greenlet; only logs.

        :param t: argument supplied by the greenlet link (unused)
        """
        self.log.debug("### simulated finished")

    def do_finish_exc(self, exc=None):
        """Callback linked to a failure of the simulated move; only logs.

        :param exc: argument supplied by the greenlet link (unused)
        """
        self.log.debug("### simulated finished with exception")

    def update_shutter_state(self, state=None):
        """Updates shutter state

        :param state: state as delivered by the ``chanState`` channel; when
            None it is read from the channel.
        :return: shutter state as a member of ``self.VALUES``
        """
        if state is None:
            if self.chan_state is None:
                # No state channel configured: nothing can be read, so
                # report UNKNOWN instead of failing on a None channel.
                self.log.warning(" P11SHUTTER state channel not available")
                value = self.VALUES.UNKNOWN
                self.update_value(value)
                return value
            state = self.chan_state.get_value()

        self.log.debug(" SHUTTER state changed")

        # A first element equal to 3 is treated as open; everything else is
        # reported as closed (MOVING / UNKNOWN are not derived from the
        # channel value here).
        if state[0] == 3:
            self.log.debug(" P11SHUTTER IS OPEN")
            value = self.VALUES.OPEN
        else:
            self.log.debug(" P11SHUTTER IS CLOSED")
            value = self.VALUES.CLOSED

        self.update_value(value)
        self.log.debug(" update shutter state done")
        return value

    def simulated_update(self):
        """Publish and return the value derived from the simulation flags.

        :return: MOVING while a simulated move is in progress, otherwise
            OPEN or CLOSED according to ``simulated_opened``.
        """
        if self.simulated_moving:
            value = self.VALUES.MOVING
        elif self.simulated_opened:
            value = self.VALUES.OPEN
        else:
            value = self.VALUES.CLOSED

        self.update_value(value)
        return value