Source code for mxcubecore.HardwareObjects.DESY.ValueStateChannel
# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
__copyright__ = """Copyright The MXCuBE Collaboration"""
__license__ = "LGPLv3+"
from mxcubecore.BaseHardwareObjects import HardwareObject
[docs]class ValueStateChannel(HardwareObject):
def __init__(self, name):
super().__init__(name)
[docs] def init(self):
self.yellow_states = self.green_states = self.red_states = None
self.yellow_limits = self.red_limits = None
self.current_value = -1
self.current_state = "ON"
self.chan_value = self.get_channel_object("value")
if self.chan_value is not None:
self.chan_value.connect_signal("update", self.value_changed)
self.chan_state = self.get_channel_object("state")
if self.chan_state is not None:
self.chan_state.connect_signal("update", self.state_changed)
self.update_delta = self.get_property("delta")
green_states = self.get_property("ok_states")
yellow_states = self.get_property("warning_states")
red_states = self.get_property("alarm_states")
yellow_limits = self.get_property("warning_limits")
red_limits = self.get_property("alarm_limits")
if green_states:
self.green_states = [
stat.strip().upper() for stat in green_states.split(",")
]
if yellow_states:
self.yellow_states = [
stat.strip().upper() for stat in yellow_states.split(",")
]
if red_states:
self.red_states = [stat.strip().upper() for stat in red_states.split(",")]
if yellow_limits:
lims = [float(lim.strip()) for lim in yellow_limits.split(",")]
if len(lims) == 2:
if lims[0] > lims[1]:
self.yellow_limits = lims[1], lims[0]
else:
self.yellow_limits = lims
if red_limits:
lims = [float(lim.strip()) for lim in red_limits.split(",")]
if len(lims) == 2:
if lims[0] > lims[1]:
self.red_limits = lims[1], lims[0]
else:
self.red_limits = lims
def state_changed(self, state=None):
if state is None:
state = self.chan_state.get_value()
self._state = str(state).upper()
self.update_status()
[docs] def value_changed(self, value=None):
"""
Event called when value has been changed
:param pos: float
:return:
"""
if value is None:
value = self.chan_value.get_value()
if value != self.current_value:
self.current_value = value
self.emit("valueChanged", (value))
self.update_status()
def update_status(self):
_state = "ON"
val = self.current_value
if self.red_limits:
llim, hlim = self.red_limits
if llim > val or hlim < val:
_state = "ALARM"
if _state != "ALARM" and self.yellow_limits:
llim, hlim = self.yellow_limits
if llim > val or hlim < val:
_state = "WARNING"
if self._red_states and self._state in self._red_states:
_state = "ALARM"
elif self._yellow_states and self._state in self._yellow_states:
_state = _state == "ALARM" and _state or "WARNING"
if _state != self.current_state:
self.current_state = _state
self.emit("stateChanged", _state)