Source code for mxcubecore.queue_entry.xrf_spectrum

#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""
XRF Spectrum queue implementation of pre_execute, execute and post_execute
"""

import logging

from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObjectState
from mxcubecore.queue_entry.base_queue_entry import (
    QUEUE_ENTRY_STATUS,
    BaseQueueEntry,
    QueueAbortedException,
    QueueExecutionException,
)

__credits__ = ["MXCuBE collaboration"]
__copyright__ = """ Copyright © by the MXCuBE collaboration """
__license__ = "LGPLv3+"


[docs]class XrfSpectrumQueueEntry(BaseQueueEntry): """XRF queue handler""" def __init__(self, view=None, data_model=None): super().__init__(view, data_model) self._failed = False def __getstate__(self) -> dict: d = dict(self.__dict__) d["xrf_spectrum_task"] = None return d def __setstate__(self, d: dict): self.__dict__.update(d)
[docs] def execute(self): """Execute""" super().execute() if HWR.beamline.xrf_spectrum is not None: xrf_spectrum = self.get_data_model() if xrf_spectrum.shape is not None: point = HWR.beamline.sample_view.get_shape(xrf_spectrum.shape) xrf_spectrum.centred_position = point.get_centred_position() self.get_view().setText(1, "Starting xrf spectrum") path_template = xrf_spectrum.path_template HWR.beamline.xrf_spectrum.start_spectrum( integration_time=xrf_spectrum.count_time, data_dir=xrf_spectrum.path_template.directory, archive_dir=xrf_spectrum.path_template.get_archive_directory(), prefix=f"{path_template.get_prefix()}_{path_template.run_number}", session_id=HWR.beamline.session.session_id, blsample_id=xrf_spectrum.sample.lims_id, cpos=xrf_spectrum.centred_position, ) HWR.beamline.xrf_spectrum._ready_event.wait() HWR.beamline.xrf_spectrum._ready_event.clear() else: logging.getLogger("user_level_log").info( "XRFSpectrum not defined in beamline setup" ) self.xrf_state_handler(HardwareObjectState.FAULT)
[docs] def pre_execute(self): """Pre-execution actions""" super().pre_execute() self._failed = False qctrl = self.get_queue_controller() qctrl.connect( HWR.beamline.xrf_spectrum, "xrfSpectrumStatusChanged", self.xrf_spectrum_status_changed, ) qctrl.connect(HWR.beamline.xrf_spectrum, "stateChanged", self.xrf_state_handler)
[docs] def post_execute(self): """Post-execution actions""" qctrl = self.get_queue_controller() qctrl.disconnect( HWR.beamline.xrf_spectrum, "xrfSpectrumStatusChanged", self.xrf_spectrum_status_changed, ) qctrl.disconnect( HWR.beamline.xrf_spectrum, "stateChanged", self.xrf_state_handler ) if self._failed: raise QueueAbortedException("Queue stopped", self) self.get_view().set_checkable(False) super().post_execute()
[docs] def xrf_spectrum_status_changed(self, msg: str): """xrfSpectrumStatusChanged handler. Args: msg: Message when ``xrfSpectrumStatusChanged`` emitted. """ logging.getLogger("user_level_log").info(msg)
[docs] def xrf_state_handler(self, state=None): """State handler - signal connected is stateChanged. Args: state (enum): HardwareObjectState enum member. Raises: QueueExecutionException: If procedure failed. """ state = state or HWR.beamline.xrf_spectrum.get_state() if state == HWR.beamline.xrf_spectrum.STATES.BUSY: logging.getLogger("user_level_log").info("XRF spectrum started.") self.get_view().setText(1, "In progress") if state == HWR.beamline.xrf_spectrum.STATES.READY: logging.getLogger("user_level_log").info("XRF spectrum finished.") self.get_view().setText(1, "Done") if state == HWR.beamline.xrf_spectrum.STATES.FAULT: self._failed = True self.get_view().setText(1, "Failed") self.status = QUEUE_ENTRY_STATUS.FAILED logging.getLogger("user_level_log").error("XRF spectrum failed.") raise QueueExecutionException("XRF spectrum failed", self)
def get_type_str(self) -> str: return "XRF spectrum"