# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
import logging
import gevent
from mxcubecore import HardwareRepository as HWR
from mxcubecore.dispatcher import dispatcher
from mxcubecore.queue_entry.base_queue_entry import (
QUEUE_ENTRY_STATUS,
BaseQueueEntry,
QueueAbortedException,
QueueExecutionException,
)
# Module metadata: attribution, license identifier and module category.
__credits__ = ["MXCuBE collaboration"]
__license__ = "LGPLv3+"
__category__ = "General"
class EnergyScanQueueEntry(BaseQueueEntry):
    """Queue entry that runs an energy scan through ``HWR.beamline.energy_scan``.

    The entry spawns the scan in a greenlet, listens to the status signals
    emitted by the energy scan hardware object and, once the scan has
    finished, runs chooch and copies the results into the data model and the
    first crystal of the sample.
    """

    def __init__(self, view=None, data_model=None):
        """Create the entry.

        :param view: view item associated with this entry
        :param data_model: energy scan data model executed by this entry
        """
        BaseQueueEntry.__init__(self, view, data_model)
        # Greenlet running ``start_energy_scan``; set by ``execute``.
        self.energy_scan_task = None
        # Set by ``energy_scan_failed``, checked in ``post_execute``.
        self._failed = False

    def __getstate__(self):
        """Return a copy of the instance dict with the scan task set to None."""
        state = dict(self.__dict__)
        state["energy_scan_task"] = None
        return state

    def __setstate__(self, d):
        """Restore the instance dict from ``d``."""
        self.__dict__.update(d)

    def _signal_handlers(self):
        """Return the (signal name, handler) pairs used by pre/post execute."""
        return (
            ("scanStatusChanged", self.energy_scan_status_changed),
            ("energyScanStarted", self.energy_scan_started),
            ("energyScanFinished", self.energy_scan_finished),
            ("energyScanFailed", self.energy_scan_failed),
        )

    def execute(self):
        """Start the energy scan and wait until the hardware object is ready.

        Does nothing beyond the base class ``execute`` when no energy scan
        hardware object is configured.
        """
        BaseQueueEntry.execute(self)

        energy_scan_hwobj = HWR.beamline.energy_scan
        if not energy_scan_hwobj:
            # Nothing to start and no ready_event to wait for.
            return

        energy_scan = self.get_data_model()
        self.get_view().setText(1, "Starting energy scan")

        if energy_scan.shape is not None:
            point = HWR.beamline.sample_view.get_shape(energy_scan.shape)
            energy_scan.centred_position = point.get_centred_position()

        sample_lims_id = energy_scan.get_sample_node().lims_id
        # No sample id, pass None to start_energy_scan
        if sample_lims_id == -1:
            sample_lims_id = None

        self.energy_scan_task = gevent.spawn(
            energy_scan_hwobj.start_energy_scan,
            energy_scan.element_symbol,
            energy_scan.edge,
            energy_scan.path_template.directory,
            energy_scan.path_template.get_prefix(),
            HWR.beamline.session.session_id,
            sample_lims_id,
            cpos=energy_scan.centred_position,
        )

        energy_scan_hwobj.ready_event.wait()
        energy_scan_hwobj.ready_event.clear()

    def pre_execute(self):
        """Reset the failure flag and connect the energy scan signals."""
        BaseQueueEntry.pre_execute(self)
        self._failed = False

        qc = self.get_queue_controller()
        for signal, handler in self._signal_handlers():
            qc.connect(HWR.beamline.energy_scan, signal, handler)

    def post_execute(self):
        """Disconnect the energy scan signals and report a failed scan.

        :raises QueueAbortedException: if ``energy_scan_failed`` was called
            during execution
        """
        BaseQueueEntry.post_execute(self)

        qc = self.get_queue_controller()
        for signal, handler in self._signal_handlers():
            qc.disconnect(HWR.beamline.energy_scan, signal, handler)

        if self._failed:
            raise QueueAbortedException("Queue stopped", self)

        self.get_view().set_checkable(False)

    def energy_scan_status_changed(self, msg):
        """Forward a scan status message to the user level log."""
        logging.getLogger("user_level_log").info(msg)

    def energy_scan_started(self, *args):
        """Log the start of the scan and mark the view as in progress."""
        logging.getLogger("user_level_log").info("Energy scan started.")
        self.get_view().setText(1, "In progress")

    def energy_scan_finished(self, scan_info):
        """Run chooch and store its results in the data model and the sample.

        Emits ``energy_scan_finished`` on the queue controller with
        ``(pk, ip, rm, sample)`` once the results are stored.

        :param scan_info: payload of the ``energyScanFinished`` signal
            (not used here)
        """
        self.get_view().setText(1, "Done")

        energy_scan = self.get_data_model()
        path_template = energy_scan.path_template

        (
            pk,
            fppPeak,
            fpPeak,
            ip,
            fppInfl,
            fpInfl,
            rm,
            chooch_graph_x,
            chooch_graph_y1,
            chooch_graph_y2,
            title,
        ) = HWR.beamline.energy_scan.do_chooch(
            energy_scan.element_symbol,
            energy_scan.edge,
            path_template.directory,
            path_template.get_archive_directory(),
            "%s_%d" % (path_template.get_prefix(), path_template.run_number),
        )

        # Trying to get the sample from the EnergyScan model instead of
        # through the view. Keeping the old way for backward compatibility.
        if energy_scan.sample:
            sample = energy_scan.sample
        else:
            sample = self.get_view().parent().parent().get_model()

        crystal_result = sample.crystals[0].energy_scan_result
        crystal_result.peak = pk
        crystal_result.inflection = ip
        crystal_result.first_remote = rm
        crystal_result.second_remote = None

        scan_result = energy_scan.result
        scan_result.pk = pk
        scan_result.fppPeak = fppPeak
        scan_result.fpPeak = fpPeak
        scan_result.ip = ip
        scan_result.fppInfl = fppInfl
        scan_result.fpInfl = fpInfl
        scan_result.rm = rm
        scan_result.chooch_graph_x = chooch_graph_x
        scan_result.chooch_graph_y1 = chooch_graph_y1
        scan_result.chooch_graph_y2 = chooch_graph_y2
        scan_result.title = title

        # Best effort: an AttributeError while fetching the scan data is
        # ignored on purpose.
        try:  # noqa: SIM105
            scan_result.data = HWR.beamline.energy_scan.get_scan_data()
        except AttributeError:
            pass

        if crystal_result.peak and crystal_result.inflection:
            logging.getLogger("user_level_log").info(
                "Energy scan: Result peak: %.4f, inflection: %.4f"
                % (crystal_result.peak, crystal_result.inflection)
            )

        self.get_view().setText(1, "Done")
        self._queue_controller.emit("energy_scan_finished", (pk, ip, rm, sample))

    def energy_scan_failed(self):
        """Mark the entry as failed, log the failure and raise.

        :raises QueueExecutionException: always
        """
        self._failed = True
        self.get_view().setText(1, "Failed")
        self.status = QUEUE_ENTRY_STATUS.FAILED
        logging.getLogger("user_level_log").error("Energy scan: failed")
        raise QueueExecutionException("Energy scan failed", self)

    def stop(self):
        """Cancel the running energy scan and abort the queue.

        :raises QueueAbortedException: always, after the scan was cancelled
        """
        BaseQueueEntry.stop(self)

        HWR.beamline.energy_scan.cancelEnergyScan()

        # Kill the greenlet spawned by ``execute``. This entry never defines
        # a ``centring_task``; ``energy_scan_task`` is the task it owns.
        if self.energy_scan_task is not None:
            self.energy_scan_task.kill(block=False)

        self.get_view().setText(1, "Stopped")
        logging.getLogger("queue_exec").info("Calling stop on: " + str(self))
        # this is to work around the remote access problem
        dispatcher.send("collect_finished")
        raise QueueAbortedException("Queue stopped", self)

    def get_type_str(self):
        """Return the display name of this entry type."""
        return "Energy scan"