#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""
[Name] SOLEILMachineInfo
based on EMBLMachineInfo
[Description]
Hardware Object is used to get relevant machine information
(current, intensity, hutch temperature and humidity, and data storage disc
information). Value limits are included
[Channels]
- chanMachCurr
- chanStateText
[Commands]
- cmdSetIntensResolution
- cmdSetIntensAcqTime
- cmdSetIntensRange
[Emitted signals]
- valuesChanged
- inRangeChanged
[Functions]
- mach_current_changed()
- machStateTextChanged()
- updateValues()
- setInitialIntens()
- setExternalValues()
[Included Hardware Objects]
Example Hardware Object XML file :
==================================
<object class="MachineInfo">
<updateIntervalS>120</updateIntervalS>
<discPath>/home</discPath>
<limits>{'current':90, 'temp': 25, 'hum': 60, 'intens': 0.1,
'discSizeGB': 20}</limits>
</object>
"""
import logging
import os
import time
from mxcubecore.BaseHardwareObjects import HardwareObject
__credits__ = ["SOLEIL", "EMBL Hamburg"]
__version__ = "2.3."
__category__ = "General"
[docs]class SOLEILMachineInfo(HardwareObject):
"""
Descript. : Displays actual information about the beeamline
"""
def __init__(self, name):
"""__init__"""
HardwareObject.__init__(self, name)
# Parameters
self.update_interval = None
self.limits_dict = None
self.hutch_temp_addr = None
self.hutch_hum_addr = None
self.hutch_temp = 0
self.hutch_hum = 0
self.overflow_alarm = None
self.low_level_alarm = 0
self.auto_refill = None
self.state_text = "Not updated yet"
self.ring_energy = None
self.bunch_count = None
self.flux_area = None
self.last_transmission = None
self.values_list = []
# Intensity current ranges
# Machine current, index = 0
temp_dict = {}
temp_dict["value"] = 0
temp_dict["value_str"] = ""
temp_dict["in_range"] = False
temp_dict["title"] = "Machine current"
temp_dict["bold"] = True
temp_dict["font"] = 14
# temp_dict['history'] = True
self.values_list.append(temp_dict)
# Machine state, index = 1
temp_dict = {}
temp_dict["value"] = None
temp_dict["in_range"] = True
temp_dict["title"] = "Machine state"
self.values_list.append(temp_dict)
# Hutch temperature, index = 2
temp_dict = {}
temp_dict["value"] = ""
temp_dict["value_str"] = ""
temp_dict["in_range"] = None
temp_dict["title"] = "Hutch temperature"
self.values_list.append(temp_dict)
# Remeasure flux, index = 3
temp_dict = {}
temp_dict["value"] = 1
temp_dict["value_str"] = "Remeasure flux!"
temp_dict["in_range"] = False
temp_dict["title"] = "Flux"
temp_dict["align"] = "left"
self.values_list.append(temp_dict)
# Cryo jet, index = 4
temp_dict = {}
temp_dict["value"] = "???"
temp_dict["in_range"] = None
temp_dict["title"] = "Cryostream"
self.values_list.append(temp_dict)
# Dewar level, index = 5
temp_dict = {}
temp_dict["value"] = "Dewar level in range"
temp_dict["in_range"] = True
temp_dict["title"] = "Sample changer"
self.values_list.append(temp_dict)
self.temp_hum_values = [None, None]
self.temp_hum_in_range = [None, None]
self.temp_hum_polling = None
self.chan_mach_curr = None
self.chan_mach_energy = None
self.chan_bunch_count = None
self.chan_state_text = None
self.chan_cryojet_in = None
self.chan_sample_temperature = None
self.chan_sc_dewar_low_level_alarm = None
self.chan_sc_dewar_overflow_alarm = None
self.ring_energy = 2.75
[docs] def init(self):
"""init"""
self.update_interval = int(self.get_property("updateIntervalS"))
self.limits_dict = eval(self.get_property("limits"))
self.chan_mach_curr = self.get_channel_object("machCurrent")
if self.chan_mach_curr is not None:
self.chan_mach_curr.connect_signal("update", self.mach_current_changed)
self.chan_filling_mode = self.get_channel_object("fillingMode")
if self.chan_filling_mode is not None:
self.chan_filling_mode.connect_signal("update", self.state_text_changed)
self.chan_state_text0 = self.get_channel_object("operatorMessage0")
if self.chan_state_text0 is not None:
self.chan_state_text0.connect_signal("update", self.state_text_changed)
self.chan_state_text1 = self.get_channel_object("operatorMessage1")
if self.chan_state_text1 is not None:
self.chan_state_text1.connect_signal("update", self.state_text_changed)
self.chan_state_text2 = self.get_channel_object("operatorMessage2")
if self.chan_state_text2 is not None:
self.chan_state_text2.connect_signal("update", self.state_text_changed)
self.chan_is_beam_usable = self.get_channel_object("isBeamUsable")
if self.chan_is_beam_usable is not None:
self.chan_is_beam_usable.connect_signal("update", self.state_text_changed)
self.chan_cryojet_in = self.get_channel_object("cryojetIn")
if self.chan_cryojet_in is not None:
self.cryojet_in_changed(self.chan_cryojet_in.get_value())
self.chan_cryojet_in.connect_signal("update", self.cryojet_in_changed)
else:
self.log.debug("MachineInfo: Cryojet channel not defined")
self.chan_sample_temperature = self.get_channel_object("sampleTemp")
if self.chan_sample_temperature is not None:
# self.chan_sample_temperature.connect_signal('update', self.cryojet_in_changed)
self.cryojet_in_changed(self.chan_cryojet_in.get_value())
self.chan_sc_auto_refill = self.get_channel_object("scAutoRefill")
if self.chan_sc_auto_refill is not None:
self.chan_sc_auto_refill.connect_signal(
"update", self.sc_autorefill_changed
)
self.sc_autorefill_changed(self.chan_sc_auto_refill.get_value())
self.chan_sc_dewar_low_level_alarm = self.get_channel_object("scLowLevelAlarm")
if self.chan_sc_dewar_low_level_alarm is not None:
self.chan_sc_dewar_low_level_alarm.connect_signal(
"update", self.low_level_alarm_changed
)
self.low_level_alarm_changed(self.chan_sc_dewar_low_level_alarm.get_value())
self.chan_sc_dewar_overflow_alarm = self.get_channel_object("scOverflowAlarm")
if self.chan_sc_dewar_overflow_alarm is not None:
self.chan_sc_dewar_overflow_alarm.connect_signal(
"update", self.overflow_alarm_changed
)
# self.chan_flux = self.get_channel_object('flux')
# if self.chan_flux is not None:
# self.chan_flux.connect_signal('update', self.flux_changed)
self.chan_temperature_exp = self.get_channel_object("temperatureExp")
if self.chan_temperature_exp is not None:
self.chan_temperature_exp.connect_signal("update", self.temperature_changed)
self.temperature_changed(self.chan_temperature_exp.get_value())
self.re_emit_values()
[docs] def clear_gevent(self):
self.temp_hum_polling.kill()
if self.update_task:
self.update_task.kill()
[docs] def cryojet_in_changed(self, value):
"""Cryojet in/out value changed"""
self.log.debug("cryojet_in_changed: %s" % value)
self.values_list[4]["in_range"] = False
self.values_list[4]["bold"] = True
if value == 0:
self.values_list[4]["value"] = " In place"
self.values_list[4]["in_range"] = True
self.values_list[4]["bold"] = False
elif value == 1:
self.values_list[4]["value"] = "NOT IN PLACE"
else:
self.values_list[4]["value"] = "Unknown"
if self.chan_sample_temperature is not None:
self.values_list[4]["value"] += (
"\n sample temperature: %.1f K"
% self.chan_sample_temperature.get_value()
)
else:
self.log.debug("chan_sample_temperature: %s" % self.chan_sample_temperature)
self.re_emit_values()
[docs] def mach_current_changed(self, value):
"""Method called if the machine current is changed
:param value: new machine current
:type value: float
"""
if (
self.values_list[0]["value"] is None
or abs(self.values_list[0]["value"] - value) > 0.00001
):
self.values_list[0]["value"] = value
self.values_list[0]["value_str"] = "%.1f mA" % value
self.values_list[0]["in_range"] = value > 60.0
self.re_emit_values()
[docs] def state_text_changed(self, text):
"""Function called if machine state text is changed
:param text: new machine state text
:type text: string
"""
# self.state_text = str(text)
# self.values_list[1]['in_range'] = text != "Fehler"
self.update_machine_state()
[docs] def update_machine_state(self):
"""Machine state assembly"""
filling_mode = self.chan_filling_mode.get_value()
state_text0 = self.chan_state_text0.get_value()
state_text1 = self.chan_state_text1.get_value()
state_text2 = self.chan_state_text2.get_value()
is_beam_usable = self.chan_is_beam_usable.get_value()
date_boundary_string = " :"
date_boundary = state_text1.find(date_boundary_string)
date = state_text1[:date_boundary]
state_text1 = state_text1[date_boundary + len(date_boundary_string) :]
state_text = "%s, %s\n" % (date, state_text0)
state_text += "electron energy: %.2f GeV, filling: %s\n" % (
self.ring_energy,
filling_mode,
)
state_text += "%s\n" % (state_text1,)
if state_text2 != " ":
state_text += "%s\n" % state_text2
if is_beam_usable:
self.values_list[1]["in_range"] = True
state_text += "Beam usable"
else:
self.values_list[1]["in_range"] = False
state_text += "Beam unusable"
self.values_list[1]["value"] = state_text
self.state_text = state_text
self.re_emit_values()
[docs] def low_level_alarm_changed(self, value):
"""Low level alarm"""
self.low_level_alarm = value
self.update_sc_alarm()
[docs] def overflow_alarm_changed(self, value):
"""Overflow alarm"""
self.overflow_alarm = value
self.update_sc_alarm()
def sc_autorefill_changed(self, value):
self.auto_refill = value
self.update_sc_alarm()
def file_transfer_status_changed(self, total, pending, failed):
self.values_list[-1]["value"] = "%d - %d - %d" % (total, pending, failed)
self.values_list[-1]["in_range"] = failed == 0
if failed > 0:
logging.getLogger("GUI").error(
"Error in file transfer (%d files failed to copy)." % failed
)
[docs] def update_sc_alarm(self):
"""Sample changer alarm"""
if self.low_level_alarm == 1:
self.values_list[5]["value"] = "Low level alarm!"
self.values_list[5]["in_range"] = False
self.values_list[5]["bold"] = True
# logging.getLogger("GUI").error("Liquid nitrogen " + \
# " level in sample changer dewar is too low!")
elif self.overflow_alarm:
self.values_list[5]["value"] = "Overflow alarm!"
self.values_list[5]["in_range"] = False
self.values_list[5]["bold"] = True
logging.getLogger("GUI").error(
"Liquid nitrogen " + "overflow in sample changer dewar!"
)
else:
self.values_list[5]["value"] = "Dewar level in range"
self.values_list[5]["in_range"] = True
self.log.error("chan_sc_auto_refill %s" % self.chan_sc_auto_refill.get_value())
if self.chan_sc_auto_refill.get_value() == 0:
self.values_list[5]["value"] += ", refill OFF"
else:
self.values_list[5]["value"] += ", refill ON"
self.re_emit_values()
[docs] def flux_changed(self, value, beam_info=None, transmission=None):
"""Sets flux value"""
if value is None:
value = -1
self.values_list[3]["value"] = value
msg_str = "Flux: %.2E ph/s" % value
# msg_str += "\n@ %.1f transmission , %d x %d beam" % (\
# transmission, beam_info['size_x'] * 1000, beam_info['size_y'] * 1000)
self.values_list[3]["value_str"] = msg_str
self.values_list[3]["in_range"] = value > 1e6
self.re_emit_values()
[docs] def re_emit_values(self):
"""Emits list of values"""
self.emit("valuesChanged", self.values_list)
[docs] def get_values(self):
"""Returns list of values"""
val = dict(self.values_list)
return val
[docs] def temperature_changed(self, value):
""" "Update hutch temperature"""
self.values_list[2]["value"] = "%.1f C" % value
self.values_list[2]["in_range"] = value < 25 # self.limits_dict['temp']
self.re_emit_values()
[docs] def get_temp_hum_values(self, sleep_time):
"""Updates temperature and humidity values"""
while True:
temp = self.get_external_value(self.hutch_temp_addr)
hum = self.get_external_value(self.hutch_hum_addr)
if not None in (temp, hum):
if abs(float(temp) - self.hutch_temp) > 0.1 or abs(
float(hum) != self.hutch_hum > 1
):
self.hutch_temp = temp
self.hutch_hum = hum
self.values_list[2]["value"] = "%.1f C, %.1f %%" % (temp, hum)
self.values_list[2]["in_range"] = temp < 25 and hum < 60
self.re_emit_values()
time.sleep(sleep_time)
[docs] def get_current(self):
"Returns current"
return self.values_list[0]["value"]
[docs] def get_current_value(self):
"""Returns current"""
return self.values_list[0]["value"]
[docs] def get_message(self):
"""Returns synchrotron state text"""
return self.state_text
def update_ramdisk_size(self, sleep_time):
while True:
total, free, perc = self.get_ramdisk_size()
if None in (total, free, perc):
txt = " Unable to read ramdisk size!"
self.values_list[-1]["in_range"] = False
else:
txt = " Total: %s\n Free: %s (%s)" % (
self.sizeof_fmt(total),
self.sizeof_fmt(free),
"{0:.0%}".format(perc),
)
self.values_list[-1]["in_range"] = free / 2**30 > 10
self.values_list[-1]["value"] = txt
self.re_emit_values()
time.sleep(sleep_time)
def get_ramdisk_size(self):
data_dir = "/ramdisk/"
if os.path.exists(data_dir):
st = os.statvfs(data_dir)
total = st.f_blocks * st.f_frsize
free = st.f_bavail * st.f_frsize
perc = st.f_bavail / float(st.f_blocks)
return total, free, perc
else:
return None, None, None
[docs] def sizeof_fmt(self, num):
"""Returns disk space formatted in string"""
try:
for x in ["bytes", "KB", "MB", "GB"]:
if num < 1024.0:
return "%3.1f%s" % (num, x)
num /= 1024.0
return "%3.1f%s" % (num, "TB")
except Exception:
return "???"