# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
__copyright__ = """Copyright The MXCuBE Collaboration"""
__license__ = "LGPLv3+"
import gevent
from mxcubecore.Command.Tango import DeviceProxy
from mxcubecore.HardwareObjects.abstract.AbstractDetector import AbstractDetector
[docs]class P11EigerDetector(AbstractDetector):
def __init__(self, *args):
AbstractDetector.__init__(self, *args)
[docs] def init(self):
AbstractDetector.init(self)
self.eiger_devname = self.get_property("eiger_device")
self.filewriter_name = self.get_property("filewriter_device")
self._roi_mode = self.get_property("roi_mode", "disabled")
# self.log.debug("EIGER - device name is : %s" % self.eiger_devname)
# self.log.debug("EIGER - filewriter name is : %s" % self.filewriter_name)
# self.log.debug("EIGER - detector distance is : %s" % self._distance_motor_hwobj)
# self.log.debug("EIGER - shutterless : %s" % self.has_shutterless())
self.eiger_dev = DeviceProxy(self.eiger_devname)
self.writer_dev = DeviceProxy(self.filewriter_name)
# Get the detector distance device
self.detector_tower_dev = DeviceProxy(
self.get_property("detector_tower_device")
)
self.dcm_energy_dev = DeviceProxy(self.get_property("dcm_energy_device"))
self.chan_status = self.get_channel_object("_status")
self.chan_status.connect_signal("update", self.status_changed)
self._exposure_time_limits = eval(self.get_property("exposure_time_limits"))
self._beam_centre = eval(self.get_property("beam_centre"))
self.originx = self._beam_centre[0]
self.originy = self._beam_centre[1]
self.log.debug("EIGER - beamcenter: %s" % str(self._beam_centre))
self.log.debug("EIGER - originx: %s" % str(self.originx))
self.log.debug("EIGER - originy: %s" % str(self.originy))
self.inited = False
def init_acquisition(self):
# set parameters like exts, images_per_file, etc...
# self.roi_mode = "4M"
# self.roi_mode = "disabled"
self.set_eiger_enum("TriggerMode", "exts")
self.set_eiger_enum("RoiMode", self._roi_mode)
self.set_eiger_enum("Compression", "bslz4")
self.set_writer_enum("Mode", "enabled")
# self.writer_dev.write_attribute("NImagesPerFile", 1000)
self.inited = True
[docs] def has_shutterless(self):
return True
[docs] def prepare_acquisition(self, *args, **kwargs):
"""
Prepares detector for acquisition
"""
self.log.debug("EIGER - prepare_acquisition")
# Sets the metadata for the header
self.set_metadata()
[docs] def get_radius(self, distance=None):
# a proper calculation should be done here
# this value comes from crystalControlMaxwell hardcoded value to
# estimate resolution
return 311 / 2.0
def get_eiger_detector_distance(self):
# Get current detector distance from detector tower server
det_tower_distance = self.detector_tower_dev.read_attribute(
"DetectorDistance"
).value
self.log.debug("EIGER: DetectorTower distance is : %s" % det_tower_distance)
return det_tower_distance
def set_eiger_detector_distance(self):
# Set detector distance in the Detector server for the header.
# It does not set the actual detector distance!
# in mm from detector tower server
current_det_tower_distance = self.get_eiger_detector_distance()
# in meters for the detector header info
self.eiger_dev.write_attribute(
"DetectorDistance", float(current_det_tower_distance / 1000.0)
) # in m
# self.eiger_dev.write_attribute("DetectorDistance", float(current_det_tower_distance)) #in mkm
# TODO add if needed
def get_eiger_beam_center(self):
return True
def set_eiger_beam_center(self):
# ==== Setting the beam center ======================
# Beam center is depending on the detector distance as for the CrystalControl.
# Emulate of the same hardcoded config parameters as before.
# originx and originy (set in eiger.xml) are the X and Y of the beam mark at the detector distance of 160 mm.
# Beam center params after calibration in October 2022 from CC:
current_det_tower_distance = self.get_eiger_detector_distance()
corrected_originx = (
self.originx + 63.5 * (current_det_tower_distance - 160) / 1000.0
)
corrected_originy = (
self.originy - 14.2 * (current_det_tower_distance - 160) / 1000.0
)
self.eiger_dev.write_attribute("BeamCenterX", float(corrected_originx))
self.eiger_dev.write_attribute("BeamCenterY", float(corrected_originy))
self.log.debug(
"EIGER - current beamcenter X and Y: %f, %f at detector distance %f mm"
% (corrected_originx, corrected_originy, current_det_tower_distance)
)
# TODO add if needed
def get_eiger_photon_energy(self):
return True
# FIXME: Add the same conditions as for CC
def set_eiger_photon_energy(self):
# Sets photon energy in the detector server for the header
current_dcm_energy = float(self.dcm_energy_dev.read_attribute("Position").value)
if current_dcm_energy > 5500:
self.eiger_dev.write_attribute("PhotonEnergy", float(current_dcm_energy))
else:
self.eiger_dev.write_attribute("PhotonEnergy", float(5500))
def set_eiger_start_angle(self, arg):
# Sets Detector start angle for the header
arg = float(arg)
self.eiger_dev.write_attribute("OmegaStart", arg)
def set_eiger_angle_increment(self, arg):
# Sets detector angle increment
arg = float(arg)
self.eiger_dev.write_attribute("OmegaIncrement", arg)
def prepare_common(self, exptime, filepath):
if not self.inited:
self.init_acquisition()
self.eiger_dev.write_attribute("CountTime", float(exptime))
self.eiger_dev.write_attribute("FrameTime", float(exptime))
self.eiger_dev.write_attribute("TriggerStartDelay", 0.003)
if filepath.startswith("/gpfs"):
filepath = filepath[len("/gpfs") :]
self.writer_dev.write_attribute("NamePattern", filepath)
# Sets the metadata for the header
self.set_metadata()
def prepare_characterisation(self, exptime, number_of_images, angle_inc, filepath):
self.writer_dev.write_attribute(
"NImagesPerFile", 1
) # To write one image per characterisation.
self.prepare_common(exptime, filepath)
self.eiger_dev.write_attribute("Nimages", 1) # Number of images per trigger
self.log.debug(
"Eiger. preparing characterization. Number of triggers is: %d"
% number_of_images
)
self.eiger_dev.write_attribute("Ntrigger", int(number_of_images))
def prepare_std_collection(self, exptime, number_of_images, filepath):
self.prepare_common(exptime, filepath)
self.eiger_dev.write_attribute("Nimages", int(number_of_images))
self.eiger_dev.write_attribute("Ntrigger", 1)
self.writer_dev.write_attribute("NImagesPerFile", 1000) # Default
self.writer_dev.write_attribute("ImageNrStart", 1)
def set_metadata(self):
self.set_eiger_detector_distance() # set detector distance for the header
self.set_eiger_beam_center() # set detector beam center for the header
self.set_eiger_photon_energy() # set detector photon energy
[docs] def start_acquisition(self):
self.eiger_dev.Arm()
self.wait_ready()
[docs] def stop_acquisition(self):
self.eiger_dev.Abort()
gevent.sleep(1)
self.eiger_dev.Disarm()
self.wait_ready()
[docs] def wait_ready(self, timeout=30):
with gevent.Timeout(timeout, RuntimeError("timeout waiting detector ready")):
while self.chan_status.get_value().lower() not in ["ready", "idle"]:
gevent.sleep(2)
def status_changed(self, status):
self.log.debug("P11EigerDetector - status changed. now is %s" % status)
[docs] def get_beam_position(self, distance=None, wavelength=None):
return self._beam_centre
# managing enum device server attributes
def set_eiger_enum(self, attr, value):
dev = self.eiger_dev
self.set_attr_enum(dev, attr, value)
def set_writer_enum(self, attr, value):
dev = self.writer_dev
self.set_attr_enum(dev, attr, value)
def set_attr_enum(self, dev, attr, value):
values = list(dev.get_attribute_config(attr).enum_labels)
no = values.index(value)
if no >= 0:
print("writing no %s for value %s (%s)" % (no, value, str(values)))
dev.write_attribute(attr, no)
else:
self.log.error(
"Trying to write invalid value %s for attribute %s" % (value, attr)
)
def get_eiger_enum(self, attr):
dev = self.eiger_dev
return self.get_attr_enum(dev, attr)
def get_writer_enum(self, attr):
dev = self.writer_dev
return self.get_attr_enum(dev, attr)
def get_attr_enum(self, dev, attr):
values = list(dev.get_attribute_config(attr).enum_labels)
no = dev.read_attribute(attr).value
return values[no]
def get_eiger_name_pattern(self):
return self.writer_dev.NamePattern
def get_latest_local_master_image_name(self):
latest_image = self.get_eiger_name_pattern()
latest_image = f"/gpfs{latest_image}_master.h5"
return latest_image