Source code for mxcubecore.HardwareObjects.EMBL.EMBLFlux

#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

import logging
from copy import deepcopy

import gevent
import numpy
from scipy.interpolate import interp1d

from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects.abstract.AbstractFlux import AbstractFlux

__credits__ = ["EMBL Hamburg"]
__category__ = "General"

# Photodiode calibration (A/W, per the name) tabulated against beam energy
# (presumably keV -- confirm against the energy hardware object).
# interp1d interpolates linearly between the knots and, with its default
# settings, raises ValueError for energies outside the tabulated 4-30 range.
diode_calibration_amp_per_watt = interp1d(
    [4.0, 6.0, 8.0, 10.0, 12.0, 12.5, 15.0, 16.0, 20.0, 30.0],
    [0.2267, 0.2116, 0.1405, 0.086, 0.0484, 0.0469, 0.0289, 0.0240, 0.01248, 0.00388],
)

# Absorption coefficient of air (1/m, per the name) as a function of beam
# energy.  Each row is one (energy, coefficient) knot; zip() regroups the rows
# into the separate x and y sequences that interp1d expects.
air_absorption_coeff_per_meter = interp1d(
    *zip(
        (4.0, 9.19440446),
        (4.2, 7.94983601),
        (4.4, 6.91804807),
        (4.6, 6.10374226),
        (4.8, 5.32906528),
        (5.0, 4.71308953),
        (5.4, 3.73630655),
        (5.8, 3.00942560),
        (6.2, 2.45767288),
        (6.6, 2.0317802),
        (7.0, 1.69805057),
        (8.0, 1.12911273),
        (9.2, 0.73628084),
        (11.8, 0.34554261),
        (14.4, 0.19176669),
        (17.0, 0.12030697),
        (19.6, 0.08331135),
        (22.2, 0.06203213),
        (24.8, 0.04926173),
        (27.4, 0.04114024),
        (30, 0.0357374),
    )
)

# Transmitted fraction through the carbon window (per the name) as a function
# of beam energy.  Rows are (energy, transmission) knots; zip() splits them
# into the x and y sequences for interp1d.
carbon_window_transmission = interp1d(
    *zip(
        (4.0, 0.74141),
        (6.6, 0.93863),
        (9.2, 0.97775),
        (11.8, 0.98946),
        (14.4, 0.99396),
        (17.0, 0.99599),
        (19.6, 0.99701),
        (22.2, 0.99759),
        (24.8, 0.99793),
        (27.4, 0.99815),
        (30, 0.99828),
    )
)

# # Replaced by AbstractFlux.get_dose_rate_per_photon_per_mmsq (same values)
#
# dose_rate_per_10to14_ph_per_mmsq = interp1d(
#     [4.0, 6.6, 9.2, 11.8, 14.4, 17.0, 19.6, 22.2, 24.8, 27.4, 30.0],
#     [
#         459000.0,
#         162000.0,
#         79000.0,
#         45700.0,
#         29300.0,
#         20200.0,
#         14600.0,
#         11100.0,
#         8610.0,
#         6870.0,
#         5520.0,
#     ],
# )


class EMBLFlux(AbstractFlux):
    """Flux hardware object for the EMBL beamlines.

    Measures the photon flux from the photodiode current
    (``measure_flux_task``), keeps the measured results and rescales the
    selected one whenever the transmission or the beam size changes
    (``update_flux_value``).  Every change is published through the
    ``fluxInfoChanged`` signal as ``{"measured": ..., "current": ...}``.
    """

    def __init__(self, name):
        AbstractFlux.__init__(self, name)

        # Result dict selected for the active aperture, the list of all
        # result dicts, and the selected result rescaled to the present
        # transmission / beam size.
        self.measured_flux_dict = None
        self.measured_flux_list = None
        self.current_flux_dict = None
        self.flux_value = 0

        self.intensity_value = None
        self.flux_record_status = None
        self.origin_flux_value = None
        self.origin_beam_info = None
        self.origin_transmission = None
        # Last beam info received through "beamInfoChanged"; stays None until
        # the first signal arrives, so readers must check for None.
        self.beam_info = None
        # True while measure_flux_task is stepping through apertures, so that
        # aperture signals raised by the measurement itself are ignored.
        self.measuring = False
        self.transmission_value = None

        # Channels and commands are resolved in init().
        self.chan_intens_range = None
        self.chan_intens_mean = None
        self.chan_flux_transmission = None
        self.chan_flux_status = None
        self.chan_flux_message = None
        self.cmd_set_intens_acq_time = None
        self.cmd_set_intens_range = None
        self.cmd_set_intens_resolution = None
        self.cmd_flux_record = None

    def init(self):
        """Reads config xml, initiates all necessary hwobj, channels and cmds"""
        super(EMBLFlux, self).init()

        self.chan_intens_mean = self.get_channel_object("intensMean")
        self.chan_intens_mean.connect_signal("update", self.intens_mean_changed)
        self.chan_intens_range = self.get_channel_object("intensRange")
        self.chan_flux_transmission = self.get_channel_object("fluxTransmission")

        self.cmd_set_intens_resolution = self.get_command_object(
            "setIntensResolution"
        )
        self.cmd_set_intens_acq_time = self.get_command_object("setIntensAcqTime")
        self.cmd_set_intens_range = self.get_command_object("setIntensRange")
        self.cmd_flux_record = self.get_command_object("fluxRecord")

        self.connect(
            HWR.beamline.transmission, "valueChanged", self.transmission_changed
        )

        self.chan_flux_status = self.get_channel_object("fluxStatus")
        self.chan_flux_status.connect_signal("update", self.flux_status_changed)
        self.chan_flux_message = self.get_channel_object("fluxMessage")
        self.chan_flux_message.connect_signal("update", self.flux_message_changed)

        self.connect(HWR.beamline.beam, "beamInfoChanged", self.beam_info_changed)
        self.connect(
            HWR.beamline.beam.aperture,
            "diameterIndexChanged",
            self.aperture_diameter_changed,
        )
        if HWR.beamline.beam.definer is not None:
            self.connect(
                HWR.beamline.beam.definer,
                "focusingModeChanged",
                self.focusing_mode_changed,
            )

        self.init_flux_values()

    def init_flux_values(self):
        """Load previously recorded flux values, one per aperture diameter.

        Does nothing except log a GUI error when the flux status channel
        reports no valid flux.
        """
        if not self.chan_flux_status.get_value():
            logging.getLogger("GUI").error(
                "No valid flux value available. Please remeasure flux!"
            )
            return

        flux_values = self.cmd_flux_record.get()
        flux_transmission = self.chan_flux_transmission.get_value()
        aperture_diameter_list = HWR.beamline.beam.aperture.get_diameter_size_list()

        self.measured_flux_list = []
        for index, flux_value in enumerate(flux_values):
            # Aperture diameters are divided by 1000 (presumably micron -> mm;
            # the measurement report multiplies sizes by 1000 again).
            diameter = aperture_diameter_list[index] / 1000.0
            self.measured_flux_list.append(
                {
                    "flux": flux_value,
                    "transmission": flux_transmission,
                    "size_x": diameter,
                    "size_y": diameter,
                }
            )

    def flux_message_changed(self, message):
        """Log any non-empty flux-record message as a GUI error."""
        # Equality, not identity: "is not" against a literal depends on string
        # interning and raises a SyntaxWarning on recent Python versions.
        if message != "":
            logging.getLogger("GUI").error("Flux-record message: %s" % message)

    def flux_status_changed(self, status):
        """Reset the flux when the status goes from valid to invalid."""
        if not status and self.flux_record_status:
            logging.getLogger("GUI").error(
                "Flux value invalidated. Please remeasure flux!"
            )
            self.reset_flux()
        self.flux_record_status = status

    def aperture_diameter_changed(self, index, size):
        """Updates flux if the aperture diameter has been changed"""
        if self.measured_flux_list and not self.measuring:
            # With one measurement per aperture, switch to the matching one.
            if len(self.measured_flux_list) > 1:
                self.measured_flux_dict = self.measured_flux_list[index]
            self.update_flux_value()

    def beam_info_changed(self, beam_info):
        """Updates flux value if the beam size changes"""
        self.beam_info = beam_info
        self.update_flux_value()

    def transmission_changed(self, transmission):
        """Updates flux value if the transmission has been changed"""
        self.transmission_value = transmission
        self.update_flux_value()

    def intens_mean_changed(self, value):
        """Signal handler for the mean intensity channel; intentionally a no-op."""
        pass

    def focusing_mode_changed(self, mode, size):
        """Invalidate the flux because the focusing mode changed."""
        logging.getLogger("GUI").warning(
            "Beamline focus mode changed. Please remeasure flux!"
        )
        self.reset_flux()

    def reset_flux(self):
        """Forget all measurements and publish the empty state."""
        self.current_flux_dict = None
        self.measured_flux_dict = None
        self.measured_flux_list = []
        self.emit(
            "fluxInfoChanged",
            {"measured": self.measured_flux_dict, "current": self.current_flux_dict},
        )

    def get_value(self):
        """Returns flux value as float"""
        if self.current_flux_dict is not None:
            return self.current_flux_dict["flux"]
        # No flux known: callers get 1 rather than None or 0.
        return 1

    def update_flux_value(self):
        """Recompute the current flux from the selected measurement.

        The flux is scaled linearly with the transmission and, when only a
        single measurement exists, with the beam area.  Both corrections
        compound.  Nothing happens until a measurement and a transmission
        value are both available.
        """
        if self.measured_flux_dict is None or self.transmission_value is None:
            return

        measured = self.measured_flux_dict
        current = deepcopy(measured)

        # Transmissions are compared as truncated integers, so sub-percent
        # differences do not trigger a rescale.
        if int(self.transmission_value) != int(measured["transmission"]):
            current["flux"] = (
                measured["flux"] * self.transmission_value / measured["transmission"]
            )
            current["transmission"] = self.transmission_value

        # Area scaling only applies when there is a single measurement, and
        # only once a beam info has been received.
        if len(self.measured_flux_list) == 1 and self.beam_info is not None:
            origin_area = measured["size_x"] * measured["size_y"]
            current_area = self.beam_info["size_x"] * self.beam_info["size_y"]
            if origin_area != current_area:
                current["size_x"] = self.beam_info["size_x"]
                current["size_y"] = self.beam_info["size_y"]
                # Scale the (possibly transmission-corrected) flux so that the
                # transmission correction above is not thrown away.
                current["flux"] = current["flux"] * current_area / origin_area

        self.current_flux_dict = current
        self.emit(
            "fluxInfoChanged",
            {
                "measured": self.measured_flux_dict,
                "current": self.current_flux_dict,
            },
        )

    def measure_flux(self, wait=True):
        """Start the flux measurement in a separate greenlet."""
        gevent.spawn(self.measure_flux_task, wait)

    def measure_flux_task(self, wait=True):
        """Measure the flux, publish the result and restore the beamline state.

        On "P13" one measurement is taken per aperture diameter; on other
        beamlines a single measurement is taken.  ``wait`` is accepted for
        interface compatibility and is not used.
        """
        if not self._measurement_allowed():
            return

        is_p13 = HWR.beamline.session.beamline_name == "P13"
        max_frame_rate = 1 / HWR.beamline.detector.get_exposure_time_limits()[0]
        current_phase = HWR.beamline.diffractometer.current_phase
        current_transmission = HWR.beamline.transmission.get_value()
        current_aperture_index = HWR.beamline.beam.aperture.get_diameter_index()

        self.measuring = True
        try:
            self.emit("progressInit", "Measuring flux. Please wait...", 10, True)
            self._prepare_measurement()

            self.measured_flux_list = []
            if is_p13:
                self._measure_all_apertures()
                max_frame_rate = 25
            else:
                self.emit("progressStep", 5, "Measuring the intensity")
                current_aperture_index = 0
                self._measure_single()

            self.emit("progressStep", 10, "Restoring original state")
            self._report_results(max_frame_rate)

            self.measured_flux_dict = self.measured_flux_list[current_aperture_index]
            self.current_flux_dict = self.measured_flux_list[current_aperture_index]
            self.emit(
                "fluxInfoChanged",
                {
                    "measured": self.measured_flux_dict,
                    "current": self.current_flux_dict,
                },
            )
        except Exception:
            # Close the progress dialog before propagating the failure.
            self.emit("progressStop", ())
            raise
        finally:
            # Always cleared, otherwise aperture changes would be ignored
            # forever after a failed measurement.
            self.measuring = False

        # Restoring previous states ------------------------------------------
        HWR.beamline.transmission.set_value(current_transmission)
        HWR.beamline.diffractometer.set_phase(current_phase)
        HWR.beamline.diffractometer.wait_ready(10)
        if is_p13:
            HWR.beamline.beam.aperture.set_diameter_index(current_aperture_index)
        self.emit("progressStop", ())

    def _measurement_allowed(self):
        """Return True if the beamline state permits a flux measurement.

        Logs a GUI error and returns False when the safety shutter is not
        opened, the detector cover is not closed, or (on "P14") the detector
        distance is above 501.
        """
        if not HWR.beamline.safety_shutter.is_opened():
            self.print_log(
                "GUI", "error", "Unable to measure flux! Safety shutter is closed."
            )
            return False

        if not HWR.beamline.detector.is_cover_closed():
            self.print_log(
                "GUI", "error", "Unable to measure flux! Detector cover is open."
            )
            return False

        if (
            HWR.beamline.session.beamline_name == "P14"
            and HWR.beamline.detector.distance.get_value() > 501
        ):
            self.print_log(
                "GUI",
                "error",
                "Detector is too far away for flux measurements. "
                "Move to 500 mm or closer.",
            )
            return False

        return True

    def _prepare_measurement(self):
        """Set full transmission and put the photodiode in the beam path."""
        # Set transmission to 100%
        self.emit("progressStep", 1, "Setting transmission to 100%")
        HWR.beamline.transmission.set_value(100, timeout=20)

        # Close the fast shutter
        HWR.beamline.fast_shutter.close(wait=True)
        self.log.debug("Measure flux: Fast shutter closed")
        gevent.sleep(0.2)
        HWR.beamline.diffractometer.wait_ready(10)

        # NOTE(review): the messages say the backlight is moved OUT but the
        # call is move_in() -- confirm which one is intended.
        self.log.info("Measure flux: Moving backlight out...")
        self.emit("progressStep", 1, "Moving backlight out")
        HWR.beamline.back_light.move_in()
        self.log.debug("Measure flux: Backlight moved out")

        # Check beamstop position
        if HWR.beamline.beamstop.get_value() == "BEAM":
            self.emit("progressStep", 2, "Moving beamstop OFF")
            HWR.beamline.beamstop.set_position("OFF")
            HWR.beamline.diffractometer.wait_ready(30)
            self.log.info("Measure flux: Beamstop moved off")

        # Check scintillator position
        if HWR.beamline.diffractometer.get_scintillator_position() == "SCINTILLATOR":
            self.emit("progressStep", 3, "Setting the photodiode")
            HWR.beamline.diffractometer.set_scintillator_position("PHOTODIODE")
            gevent.sleep(1)
            HWR.beamline.diffractometer.wait_ready(30)
            self.log.debug("Measure flux: Scintillator set to photodiode")

    def _measure_all_apertures(self):
        """Measure the flux for every aperture diameter and record the values."""
        aperture = HWR.beamline.beam.aperture
        aperture.set_in()
        HWR.beamline.diffractometer.wait_ready(30)
        aperture.set_diameter_index(0)
        # NOTE(review): openShutter/closeShutter here vs open/close elsewhere
        # on the same fast_shutter object -- confirm both APIs exist.
        HWR.beamline.fast_shutter.openShutter(wait=True)

        # NOTE(review): init_flux_values uses get_diameter_size_list();
        # confirm get_diameter_list() returns the same sizes.
        for index, diameter_size in enumerate(aperture.get_diameter_list()):
            self.emit(
                "progressStep",
                4 + index,
                "Measuring flux with %d micron aperture" % diameter_size,
            )
            aperture.set_diameter_index(index)
            HWR.beamline.diffractometer.wait_ready(10)

            gevent.sleep(1)
            intens_value = self.chan_intens_mean.get_value(force=True)
            self.log.info("Measured current: %s" % intens_value)
            # Constant offset added to the first reading (presumably a
            # baseline correction -- confirm; a previous value was 2.780e-6).
            intensity_value = intens_value[0] + 1.860e-5
            self.measured_flux_list.append(self.get_flux_result(intensity_value))
            gevent.sleep(1)

        HWR.beamline.fast_shutter.closeShutter(wait=True)

        # Recording the values is best effort: a failure is logged and the
        # measurement result is still used.
        try:
            self.cmd_flux_record([_x["flux"] for _x in self.measured_flux_list])
            gevent.sleep(2)
        except Exception:
            self.log.exception("")

    def _measure_single(self):
        """Take one flux measurement with the current aperture."""
        HWR.beamline.fast_shutter.open(wait=True)
        self.log.debug("Measure flux: Fast shutter opened")

        gevent.sleep(0.5)
        intens_value = self.chan_intens_mean.get_value()

        HWR.beamline.fast_shutter.close(wait=True)
        self.log.debug("Measure flux: Fast shutter closed")

        # Constant offset added to the first reading (presumably a baseline
        # correction -- confirm).
        intensity_value = intens_value[0] + 2.780e-6
        self.measured_flux_list.append(self.get_flux_result(intensity_value))

    def _report_results(self, max_frame_rate):
        """Print the measurement table to the GUI log.

        Rows after the first are printed as errors, with the raw intensity
        appended, when the intensity is below 1e-6 or the flux does not
        decrease from the previous row (or from the first to the last row).
        """
        self.print_log("GUI", "info", "Flux measurement results:")
        self.print_log(
            "GUI",
            "info",
            "Beam size | Flux (ph/s) | "
            + "Dose rate (KGy/s) | Time to reach 20 MGy (s) | "
            + "Number of frames @ %d Hz" % max_frame_rate,
        )

        results = self.measured_flux_list
        for index, item in enumerate(results):
            msg = " * %d x %d | %1.1e | %1.1e | %.1f | %d" % (
                item["size_x"] * 1000,
                item["size_y"] * 1000,
                item["flux"],
                item["dose_rate"],
                item["time_to_reach_limit"],
                item["frames_to_reach_limit"],
            )

            suspicious = False
            if index > 0:
                low_value = item["intensity"] < 1e-6
                out_of_range = (
                    results[0]["flux"] <= results[-1]["flux"]
                    or results[index - 1]["flux"] <= item["flux"]
                )
                suspicious = low_value or out_of_range

            if suspicious:
                msg += " (intensity: %1.1e)" % item["intensity"]
                self.print_log("GUI", "error", msg)
            else:
                self.print_log("GUI", "info", msg)

    def get_flux_result(self, intensity_value):
        """Convert a diode intensity reading into a flux result dict.

        :param intensity_value: diode reading (already offset-corrected by
            the caller)
        :return: dict with keys ``energy``, ``detector_distance``, ``size_x``,
            ``size_y``, ``transmission``, ``intensity``, ``flux``,
            ``dose_rate``, ``time_to_reach_limit``, ``frames_to_reach_limit``
            and ``max_frame_rate``
        """
        energy = HWR.beamline.energy.get_value()
        detector_distance = HWR.beamline.detector.distance.get_value()
        beam_size = HWR.beamline.beam.get_beam_size()
        transmission = HWR.beamline.transmission.get_value()

        # Attenuation along the air path; the distance is divided by 1000 to
        # match the per-meter coefficient (distance presumably in mm).
        air_trsm = numpy.exp(
            -air_absorption_coeff_per_meter(energy) * detector_distance / 1000.0
        )
        carb_trsm = carbon_window_transmission(energy)

        # 0.624151e16 is presumably the J -> keV conversion (1 / electron
        # charge, scaled to keV), turning beam power into photons per second.
        flux = (
            0.624151
            * 1e16
            * intensity_value
            / diode_calibration_amp_per_watt(energy)
            / energy
            / air_trsm
            / carb_trsm
        )
        # NOTE(review): unexplained correction factor kept from the original.
        flux = flux * 1.8

        dose_rate = (
            1e-3  # * 1e-14
            * self.get_dose_rate_per_photon_per_mmsq(energy)
            * flux
            / beam_size[0]
            / beam_size[1]
        )

        max_frame_rate = 1 / HWR.beamline.detector.get_exposure_time_limits()[0]
        # Dose limit in dose_rate units; the GUI report labels it "20 MGy".
        dose_limit = 20000.0

        return {
            "energy": energy,
            "detector_distance": detector_distance,
            "size_x": beam_size[0],
            "size_y": beam_size[1],
            "transmission": transmission,
            "intensity": intensity_value,
            "flux": flux,
            "dose_rate": dose_rate,
            "time_to_reach_limit": dose_limit / dose_rate,
            "frames_to_reach_limit": int(max_frame_rate * dose_limit / dose_rate),
            "max_frame_rate": max_frame_rate,
        }