Source code for mxcubecore.HardwareObjects.EMBL.EMBLXRFSpectrum
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
import logging
import gevent
from mxcubecore.HardwareObjects.abstract.AbstractXRFSpectrum import AbstractXRFSpectrum
__credits__ = ["EMBL Hamburg"]
__license__ = "LGPLv3+"
__category__ = "Task"
[docs]class EMBLXRFSpectrum(AbstractXRFSpectrum):
def __init__(self, name):
AbstractXRFSpectrum.__init__(self, name)
self.ready_event = None
self.spectrum_running = None
self.spectrum_info = None
self.config_filename = None
self.spectrum_data = None
self.mca_calib = None
self.chan_spectrum_status = None
self.chan_spectrum_consts = None
self.chan_scan_error = None
self.cmd_spectrum_start = None
self.cmd_adjust_transmission = None
[docs] def init(self):
self.ready_event = gevent.event.Event()
self.cmd_spectrum_start = self.get_command_object("cmdSpectrumStart")
self.cmd_adjust_transmission = self.get_command_object("cmdAdjustTransmission")
self.chan_spectrum_status = self.get_channel_object("chanSpectrumStatus")
self.chan_spectrum_status.connect_signal("update", self.spectrum_status_update)
self.chan_spectrum_consts = self.get_channel_object("chanSpectrumConsts")
self.chan_scan_error = self.get_channel_object("chanSpectrumError")
self.chan_scan_error.connect_signal("update", self.scan_error_update)
self.config_filename = self.get_property("configFile")
self.write_in_raw_data = False
self.mca_calib = [
x / 1000.0 for x in self.chan_spectrum_consts.get_value()
] # converted from eV to keV
[docs] def can_spectrum(self):
"""
Returns True if one can run spectrum command
:return:
"""
return True
[docs] def execute_spectrum_command(self, count_sec, filename, adjust_transmission=True):
"""Sends start command"""
try:
self.cmd_spectrum_start((count_sec, adjust_transmission))
except Exception:
logging.getLogger().exception("XRFSpectrum: problem in starting spectrum")
self.emit(
"xrfSpectrumStatusChanged", ("Error problem in starting spectrum",)
)
self.spectrum_command_aborted()
[docs] def spectrum_status_update(self, status):
"""Controls execution"""
if self.spectrum_running:
if status == "scaning":
self.log.info("XRF spectrum in progress...")
elif status == "ready":
if self.spectrum_running:
self.spectrum_data = list(self.cmd_spectrum_start.get())
# self.mca_calib = self.chan_spectrum_consts.get_value()[::-1]
self.spectrum_command_finished()
self.log.info("XRF spectrum finished")
elif status == "aborting":
if self.spectrum_running:
self.spectrum_command_aborted()
self.log.info("XRF spectrum aborted!")
elif status == "error":
self.spectrum_command_failed()
self.log.error("XRF spectrum failed!")
[docs] def cancel_spectrum(self, *args):
"""Cancels acquisition"""
if self.spectrum_running:
self.ready_event.set()
[docs] def adjust_transmission(self):
"""Adjusts transmission before executing XRF spectrum"""
if self.cmd_adjust_transmission is not None:
self.cmd_adjust_transmission()
[docs] def scan_error_update(self, error_msg):
"""Prints error message
:param error_msg: error message
:type error_msg: str
:return: None
"""
if len(error_msg) > 0:
# and self.spectrum_running:
logging.getLogger("GUI").error("XRF spectrum: %s" % error_msg)