Source code for mxcubecore.HardwareObjects.EMBL.EMBLMachineInfo

#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""
Hardware Object is used to get relevant machine information
(current, intensity, hutch temperature and humidity, and data storage disc
information). Value limits are included
"""

import logging
import os
import time

try:
    from urllib2 import urlopen
except ImportError:
    from urllib.request import urlopen

from collections import OrderedDict
from datetime import (
    datetime,
    timedelta,
)

from gevent import spawn

from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject

__credits__ = ["EMBL Hamburg"]
__license__ = "LGPLv3+"
__category__ = "General"


[docs]class EMBLMachineInfo(HardwareObject): """Displays actual information about the beamline""" def __init__(self, name): """OrderedDict is used to have a sorted items for display and directory like access when updating values """ HardwareObject.__init__(self, name) self.update_interval = None self.limits_dict = None self.hutch_temp_addr = None self.hutch_hum_addr = None self.hutch_temp = 0 self.hutch_hum = 0 self.overflow_alarm = None self.low_level_alarm = None self.state_text = "" self.ring_energy = None self.bunch_count = None self.flux_area = None self.last_transmission = None self.frontend_is_open = False self.undulator_gap = 9999 self.values_ordered_dict = OrderedDict() self.values_ordered_dict["current"] = { "value": 0, "value_str": "", "in_range": False, "title": "Machine current", "bold": True, } self.values_ordered_dict["machine_state"] = { "value": None, "in_range": False, "title": "Machine state", } self.values_ordered_dict["frontend_undulator"] = { "value": None, "in_range": True, "title": "Front End, undulator gap", } self.values_ordered_dict["temp_hum"] = { "value": "", "value_str": "", "in_range": None, "title": "Hutch temperature and humidity", } self.temp_hum_values = [None, None] self.temp_hum_in_range = [None, None] self.temp_hum_polling = None self.chan_mach_curr = None self.chan_mach_energy = None self.chan_bunch_count = None self.chan_frontend_status = None self.chan_undulator_gap = None self.chan_state_text = None self.chan_cryojet_in = None self.chan_sc_dewar_low_level_alarm = None self.chan_sc_dewar_overflow_alarm = None
[docs] def init(self): self.update_interval = int(self.get_property("updateIntervalS")) self.limits_dict = eval(self.get_property("limits")) self.hutch_temp_addr = self.get_property("hutchTempAddress") self.hutch_hum_addr = self.get_property("hutchHumAddress") self.chan_mach_curr = self.get_channel_object("machCurrent") self.chan_mach_curr.connect_signal("update", self.mach_current_changed) self.chan_state_text = self.get_channel_object("machStateText") self.chan_state_text.connect_signal("update", self.state_text_changed) # self.state_text_changed(self.chan_state_text.get_value()) self.chan_mach_energy = self.get_channel_object("machEnergy") self.chan_mach_energy.connect_signal("update", self.mach_energy_changed) self.chan_bunch_count = self.get_channel_object("machBunchCount") self.chan_bunch_count.connect_signal("update", self.bunch_count_changed) self.chan_frontend_status = self.get_channel_object("frontEndStatus") self.chan_frontend_status.connect_signal("update", self.frontend_status_changed) # self.frontend_status_changed(self.chan_frontend_status.get_value()) if HWR.beamline.flux is not None: self.connect(HWR.beamline.flux, "fluxInfoChanged", self.flux_info_changed) self.values_ordered_dict["flux"] = { "value": 1, "value_str": "Remeasure flux!", "in_range": False, "title": "Measured flux", } self.chan_undulator_gap = self.get_channel_object("chanUndulatorGap") if self.chan_undulator_gap is not None: self.chan_undulator_gap.connect_signal("update", self.undulator_gap_changed) self.undulator_gap_changed(self.chan_undulator_gap.get_value()) self.chan_cryojet_in = self.get_channel_object("cryojetIn", optional=True) if self.chan_cryojet_in is not None: self.values_ordered_dict["cryo"] = { "value": "???", "in_range": None, "title": "Cryoject in place", } self.cryojet_in_changed(self.chan_cryojet_in.get_value()) self.chan_cryojet_in.connect_signal("update", self.cryojet_in_changed) else: self.log.debug("MachineInfo: Cryojet channel not defined") self.chan_sc_dewar_low_level_alarm = self.get_channel_object( "scLowLevelAlarm", optional=True ) if self.chan_sc_dewar_low_level_alarm is not None: self.values_ordered_dict["sc"] = { "value": "Dewar level in range", "in_range": True, "title": "Sample changer", } self.chan_sc_dewar_low_level_alarm.connect_signal( "update", self.low_level_alarm_changed ) self.low_level_alarm_changed(self.chan_sc_dewar_low_level_alarm.get_value()) self.chan_sc_dewar_overflow_alarm = self.get_channel_object( "scOverflowAlarm", optional=True ) if self.chan_sc_dewar_overflow_alarm is not None: self.chan_sc_dewar_overflow_alarm.connect_signal( "update", self.overflow_alarm_changed ) if hasattr(HWR.beamline, "ppu_control"): self.values_ordered_dict["ppu"] = { "value": "- - -", "in_range": False, "title": "Files copied - pending - failed", } self.connect( HWR.beamline.ppu_control, "fileTranferStatusChanged", self.file_transfer_status_changed, ) self.chan_count_dropped = self.get_channel_object( "framesCountDropped", optional=True ) if self.chan_count_dropped is not None: self.values_ordered_dict["frames_dropped"] = { "value": "", "in_range": True, "title": "Frames dropped", } self.chan_count_dropped.connect_signal("update", self.count_dropped_changed) self.temp_hum_polling = spawn( self.get_temp_hum_values, self.get_property("updateIntervalS") ) self.re_emit_values()
[docs] def clear_gevent(self): """Clear gevent tasks :return: None """ self.temp_hum_polling.kill() if self.update_task: self.update_task.kill()
[docs] def cryojet_in_changed(self, value): """Updates cryojet status :param value: status :type value: bool :return: None """ self.values_ordered_dict["cryo"]["in_range"] = False self.values_ordered_dict["cryo"]["bold"] = True if value == 1: self.values_ordered_dict["cryo"]["value"] = " In place" self.values_ordered_dict["cryo"]["in_range"] = True self.values_ordered_dict["cryo"]["bold"] = False elif value == 0: self.values_ordered_dict["cryo"]["value"] = "NOT IN PLACE" else: self.values_ordered_dict["cryo"]["value"] = "Unknown" self.re_emit_values()
[docs] def mach_current_changed(self, value): """Method called if the machine current is changed :param value: new machine current :type value: float """ if ( self.values_ordered_dict["current"]["value"] is None or abs(self.values_ordered_dict["current"]["value"] - value) > 0.00001 ): self.values_ordered_dict["current"]["value"] = value self.values_ordered_dict["current"]["value_str"] = "%.1f mA" % value self.values_ordered_dict["current"]["in_range"] = value > 60.0 self.re_emit_values()
[docs] def state_text_changed(self, text): """Function called if machine state text is changed :param text: new machine state text :type text: string """ self.state_text = str(text) self.values_ordered_dict["machine_state"]["in_range"] = text != "Fehler" self.update_machine_state()
[docs] def mach_energy_changed(self, value): """Updates machine energy value :param value: machine energy :type value: float :return: None """ self.ring_energy = value self.update_machine_state()
[docs] def bunch_count_changed(self, value): """Bunch count changed""" self.bunch_count = value self.update_machine_state()
[docs] def frontend_status_changed(self, value): """ Update front end status :param value: :return: """ self.frontend_is_open = value[2] == 2 self.update_machine_state()
[docs] def undulator_gap_changed(self, value): """ Update undulator gaps :param value: float :return: """ if isinstance(value, (list, tuple)): value = value[0] self.undulator_gap = value / 1000
[docs] def update_machine_state(self): """Machine state assembly""" state_text = self.state_text if self.ring_energy is not None: state_text += "\n%.2f GeV " % self.ring_energy if self.bunch_count is not None: state_text += ", %d Bunches" % self.bunch_count self.values_ordered_dict["machine_state"]["value"] = state_text if not self.frontend_is_open or self.undulator_gap > 30: self.values_ordered_dict["frontend_undulator"]["in_range"] = False else: self.values_ordered_dict["frontend_undulator"]["in_range"] = True if self.frontend_is_open: self.values_ordered_dict["frontend_undulator"]["value_str"] = ( "Opened, %d mm" % self.undulator_gap ) else: self.values_ordered_dict["frontend_undulator"]["value_str"] = ( "Closed, %d mm" % self.undulator_gap ) self.re_emit_values()
[docs] def low_level_alarm_changed(self, value): """Low level alarm""" self.low_level_alarm = value self.update_sc_alarm()
[docs] def overflow_alarm_changed(self, value): """Overflow alarm""" self.overflow_alarm = value self.update_sc_alarm()
[docs] def file_transfer_status_changed(self, status): """ Updates info about file being transferred :param total: int :param pending: int :param failed: int :return: """ self.values_ordered_dict["ppu"]["value"] = "%d - %d - %d" % ( status[0], status[1], status[2], ) self.values_ordered_dict["ppu"]["in_range"] = status[2] == 0 if status[2] > 0: logging.getLogger("GUI").error( "Error in file transfer (%d files failed to copy)." % status[2] )
def count_dropped_changed(self, num_dropped): self.values_ordered_dict["frames_dropped"]["value"] = str(num_dropped) self.values_ordered_dict["frames_dropped"]["in_range"] = num_dropped == 0 if num_dropped > 0: logging.getLogger("GUI").error( "Error during the frame acquisition (in total %d frame(s) dropped)." % num_dropped )
[docs] def update_sc_alarm(self): """Sample changer alarm""" if self.low_level_alarm == 1: self.values_ordered_dict["sc"]["value"] = "Low level alarm!" self.values_ordered_dict["sc"]["in_range"] = False self.values_ordered_dict["sc"]["bold"] = True logging.getLogger("GUI").error( "Liquid nitrogen level in sample changer dewar is too low!" ) elif self.overflow_alarm: self.values_ordered_dict["sc"]["value"] = "Overflow alarm!" self.values_ordered_dict["sc"]["in_range"] = False self.values_ordered_dict["sc"]["bold"] = True logging.getLogger("GUI").error( "Liquid nitrogen overflow in sample changer dewar!" ) else: self.values_ordered_dict["sc"]["value"] = "Dewar level in range" self.values_ordered_dict["sc"]["in_range"] = True self.re_emit_values()
[docs] def flux_info_changed(self, flux_info): """Sets flux value""" if flux_info["measured"] is None: self.values_ordered_dict["flux"]["value"] = 0 self.values_ordered_dict["flux"]["value_str"] = ( "Beamline mode changed\nRemeasure flux!" ) self.values_ordered_dict["flux"]["in_range"] = False else: msg_str = "Flux: %.2E ph/s\n" % flux_info["measured"]["flux"] msg_str += "%d%% transmission, %dx%d beam" % ( flux_info["measured"]["transmission"], flux_info["measured"]["size_x"] * 1000, flux_info["measured"]["size_y"] * 1000, ) self.values_ordered_dict["flux"]["value"] = flux_info["measured"]["flux"] self.values_ordered_dict["flux"]["value_str"] = msg_str self.values_ordered_dict["flux"]["in_range"] = ( flux_info["measured"]["flux"] > 1e6 ) self.re_emit_values()
[docs] def re_emit_values(self): """Emits list of values""" self.emit("valuesChanged", self.values_ordered_dict)
[docs] def get_values(self): """Returns list of values""" return self.values_ordered_dict
[docs] def get_temp_hum_values(self, sleep_time): """Updates temperature and humidity values""" while True: temp = self.get_external_value(self.hutch_temp_addr) hum = self.get_external_value(self.hutch_hum_addr) if not None in (temp, hum): if abs(float(temp) - self.hutch_temp) > 0.1 or abs( float(hum) != self.hutch_hum > 1 ): self.hutch_temp = temp self.hutch_hum = hum self.values_ordered_dict["temp_hum"]["value"] = ( "%.1f C, %.1f %%" % (temp, hum) ) self.values_ordered_dict["temp_hum"]["in_range"] = ( temp < 25 and hum < 60 ) self.re_emit_values() time.sleep(sleep_time)
[docs] def get_current(self): """Returns machine current in mA""" return self.values_ordered_dict["current"]["value"]
[docs] def get_current_value(self): """Returns machine current in mA""" return self.values_ordered_dict["current"]["value"]
[docs] def get_message(self): """Returns synchrotron state text""" return self.state_text
[docs] def get_external_value(self, addr): """Extracts value from the given epics address. This is very specific implementation how to get a value from epics web tool. At first web address string is formed and then web page by urllib2 extracted. Page contains column with records. Then the last value is chosen as the last active value. :param addr: epics address :type addr: str :returns : float """ url_prefix = ( "http://cssweb.desy.de:8084/ArchiveViewer/archive" + "reader.jsp?DIRECTORY=%2Fdata7%2FChannelArchiver%" + "2FchannelReference2.kryo&PATTERN=&" ) end = datetime.now() start = end - timedelta(hours=24) url_date = "=on&STARTMONTH=%d&STARTDAY=%d&STARTYEAR=%d" % ( start.month, start.day, start.year, ) + "&STARTHOUR=%d&STARTMINUTE=%d&STARTSECOND=0" % (start.hour, start.minute) url_date = url_date + ( "&ENDMONTH=%d&ENDDAY=%d&ENDYEAR=%d" % (end.month, end.day, end.year) + "&ENDHOUR=%d&ENDMINUTE=%d&ENDSECOND=0" % (end.hour, (end.minute - 10)) ) url_date = ( url_date + "&COMMAND=GET&Y0=0&Y1=0&FORMAT=SPREADSHEET&" + "INTERPOL=0&NUMOFPOINTS=10" ) url_file = None last_value = None try: addr = addr.split(":") url_device = ( "NAMES=" + addr[0] + "%3A" + addr[1] + "%3A" + addr[2] + "%3A" + addr[3] ) url_device = ( url_device + "&FRAME2=1" + addr[0] + "%3A" + addr[1] + "%3A" + addr[2] + "%3A" + addr[3] ) url_device = ( url_device + "&NAMES2=&" + addr[0] + "%3A" + addr[1] + "%3A" + addr[2] + "%3A" + addr[3] ) final_url = url_prefix + url_device + url_date url_file = urlopen(final_url) for line in url_file: line_el = line.split() if line_el: if line_el[-1].isdigit: last_value = line_el[-1] last_value = float(last_value) except Exception: self.log.debug("MachineInfo: Unable to read epics values") finally: if url_file: url_file.close() return last_value
[docs] def get_ramdisk_size(self): """ Gets ramdisk size in bytes :return: total, free disc size in bytes and free disc in perc """ data_dir = "/ramdisk/" # p = '/' + data_dir.split('/')[1] # data_dir = str(p) if os.path.exists(data_dir): st = os.statvfs(data_dir) total = st.f_blocks * st.f_frsize free = st.f_bavail * st.f_frsize perc = st.f_bavail / float(st.f_blocks) return total, free, perc else: return None, None, None
[docs] def sizeof_fmt(self, num): """Returns disk space formatted in string""" try: for x in ["bytes", "KB", "MB", "GB"]: if num < 1024.0: return "%3.1f%s" % (num, x) num /= 1024.0 return "%3.1f%s" % (num, "TB") except Exception: return "???"