"""LimaEigerDetector Class
Lima Tango Device Server implementation of the Dectris Eiger2 Detector.
"""
import logging
import math
import os
import time
import gevent
from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects.abstract.AbstractDetector import AbstractDetector
from mxcubecore.model.queue_model_objects import PathTemplate
from mxcubecore.TaskUtils import task
[docs]class LimaEigerDetector(AbstractDetector):
def __init__(self, name):
AbstractDetector.__init__(self, name)
self.binning_mode = 1
[docs] def init(self):
AbstractDetector.init(self)
self.header = {}
self._images_per_file = self.get_property("images_per_file", 100)
lima_device = self.get_property("lima_device")
eiger_device = self.get_property("eiger_device")
for channel_name in (
"acq_status",
"acq_trigger_mode",
"saving_mode",
"acq_nb_frames",
"acq_expo_time",
"saving_directory",
"saving_prefix",
"saving_suffix",
"saving_next_number",
"saving_index_format",
"saving_format",
"saving_overwrite_policy",
"last_image_saved",
"saving_frame_per_file",
"saving_managed_mode",
):
self.add_channel(
{"type": "tango", "name": channel_name, "tangoname": lima_device},
channel_name,
)
for channel_name in ("photon_energy",):
self.add_channel(
{"type": "tango", "name": channel_name, "tangoname": eiger_device},
channel_name,
)
self.add_command(
{"type": "tango", "name": "prepare_acq", "tangoname": lima_device},
"prepareAcq",
)
self.add_command(
{"type": "tango", "name": "start_acq", "tangoname": lima_device}, "startAcq"
)
self.add_command(
{"type": "tango", "name": "stop_acq", "tangoname": lima_device}, "stopAcq"
)
self.add_command(
{"type": "tango", "name": "reset", "tangoname": lima_device}, "reset"
)
self.add_channel(
{"type": "tango", "name": "saving_common_header", "tangoname": lima_device},
"saving_common_header",
)
self.get_command_object("prepare_acq").init_device()
self.get_command_object("prepare_acq").device.set_timeout_millis(5 * 60 * 1000)
self.get_channel_object("photon_energy").init_device()
# Be sure that the video_live is off.
self.add_channel(
{"type": "tango", "name": "video_live", "tangoname": lima_device},
"video_live",
)
self.get_channel_object("video_live").set_value(False)
self.update_state(self.STATES.READY)
[docs] def has_shutterless(self):
return True
[docs] def wait_ready(self, timeout=30):
acq_status_chan = self.get_channel_object("acq_status")
if acq_status_chan.get_value() != "Ready":
self.update_state(self.STATES.BUSY)
with gevent.Timeout(timeout, RuntimeError("Detector not ready")):
while acq_status_chan.get_value() != "Ready":
time.sleep(1)
self.update_state(self.STATES.READY)
[docs] def last_image_saved(self):
return self.get_channel_object("last_image_saved").get_value() + 1
def get_deadtime(self):
return float(self.get_property("deadtime"))
[docs] def prepare_acquisition(
self,
take_dark,
start,
osc_range,
exptime,
npass,
number_of_images,
comment,
mesh,
mesh_num_lines,
):
"""
diffractometer_positions = HWR.beamline.diffractometer.get_positions()
self.start_angles = list()
for i in range(number_of_images):
self.start_angles.append("%0.4f deg." % (start + osc_range * i))
self.header["file_comments"] = comment
self.header["N_oscillations"] = number_of_images
self.header["Oscillation_axis"] = "omega"
self.header["Chi"] = "0.0000 deg."
try:
self.header["Phi"] = "%0.4f deg." % diffractometer_positions.get(
"kappa_phi", -9999
)
self.header["Kappa"] = "%0.4f deg." % diffractometer_positions.get(
"kappa", -9999
)
except Exception:
self.header["Phi"] = "0.0000 deg."
self.header["Kappa"] = "0.0000 deg."
self.header["Alpha"] = "0.0000 deg."
self.header["Polarization"] = HWR.beamline.collect.bl_config.polarisation
self.header["Detector_2theta"] = "0.0000 deg."
self.header["Angle_increment"] = "%0.4f deg." % osc_range
self.header["Transmission"] = HWR.beamline.transmission.get_value()
self.header["Flux"] = HWR.beamline.flux.get_value()
self.header["Detector_Voffset"] = "0.0000 m"
self.header["Energy_range"] = "(0, 0) eV"
self.header["Trim_directory:"] = "(nil)"
self.header["Flat_field:"] = "(nil)"
self.header["Excluded_pixels:"] = " badpix_mask.tif"
self.header["N_excluded_pixels:"] = "= 321"
self.header["Threshold_setting"] = (
"%d eV" % self.get_channel_object("photon_energy").get_value()
)
self.header["Count_cutoff"] = "1048500"
self.header["Tau"] = "= 0 s"
self.header["Exposure_period"] = "%f s" % (exptime + self.get_deadtime())
self.header["Exposure_time"] = "%f s" % exptime
"""
self.stop()
self.wait_ready()
self.get_channel_object("video_live").set_value(False)
beam_x, beam_y = self.get_beam_position()
header_info = [
"beam_center_x=%s" % (beam_x),
"beam_center_y=%s" % (beam_y),
"detector_distance=%s"
% (HWR.beamline.detector.distance.get_value() / 1000.0),
"omega_start=%0.4f" % start,
"omega_increment=%0.4f" % osc_range,
"wavelength=%s" % HWR.beamline.energy.get_wavelength(),
]
self.get_channel_object("saving_common_header").set_value(header_info)
if mesh:
"""
self.get_channel_object("acq_trigger_mode").set_value("EXTERNAL_TRIGGER_SEQUENCES")
self.get_channel_object("acq_nb_sequences").set_value(mesh_num_lines)
"""
logging.getLogger("user_level_log").info(
"Preparing detector for mesh EXTERNAL_TRIGGER_MULTI"
)
self.get_channel_object("acq_trigger_mode").set_value(
"EXTERNAL_TRIGGER_MULTI"
)
else:
logging.getLogger("user_level_log").info(
"Preparing detector for oscillation EXTERNAL_TRIGGER"
)
self.set_channel_value("acq_trigger_mode", "EXTERNAL_TRIGGER")
self.get_channel_object("saving_frame_per_file").set_value(
min(self._images_per_file, number_of_images)
)
# 'MANUAL', 'AUTO_FRAME', 'AUTO_SEQUENCE
self.get_channel_object("saving_mode").set_value("AUTO_FRAME")
logging.getLogger("user_level_log").info(
"Acq. nb frames = %d", number_of_images
)
self.get_channel_object("acq_nb_frames").set_value(number_of_images)
self.get_channel_object("acq_expo_time").set_value(exptime)
# 'ABORT', 'OVERWRITE', 'APPEND'
self.get_channel_object("saving_overwrite_policy").set_value("OVERWRITE")
# 'SOFTWARE', 'HARDWARE'
self.get_channel_object("saving_managed_mode").set_value("HARDWARE")
def set_energy_threshold(self, energy):
minE = self.get_property("minE")
if energy < minE:
energy = minE
working_energy_chan = self.get_channel_object("photon_energy")
working_energy = working_energy_chan.get_value() / 1000.0
if math.fabs(working_energy - energy) > 0.1:
egy = int(energy * 1000.0)
working_energy_chan.set_value(egy)
@task
def set_detector_filenames(self, frame_number, start, filename):
prefix, suffix = os.path.splitext(os.path.basename(filename))
prefix = "_".join(prefix.split("_")[:-1]) + "_"
dirname = os.path.dirname(filename)
if dirname.startswith(os.path.sep):
dirname = dirname[len(os.path.sep) :]
saving_directory = os.path.join(self.get_property("buffer"), dirname)
self.wait_ready()
self.get_channel_object("saving_directory").set_value(saving_directory)
self.get_channel_object("saving_prefix").set_value(
prefix + "%01d" % frame_number
)
self.get_channel_object("saving_suffix").set_value(suffix)
self.get_channel_object("saving_format").set_value("HDF5")
[docs] def start_acquisition(self):
self.wait_ready()
logging.getLogger("user_level_log").info("Preparing acquisition.")
self.get_command_object("prepare_acq")()
logging.getLogger("user_level_log").info("Detector ready, continuing")
self.get_command_object("start_acq")()
[docs] def stop_acquisition(self):
self.update_state(self.STATES.BUSY)
try:
self.get_command_object("stop_acq")()
except Exception:
self.log.exception("")
time.sleep(1)
self.get_command_object("reset")()
self.wait_ready()
self.update_state(self.STATES.READY)
def reset(self):
self.stop_acquisition()
return True
@property
def status(self):
try:
acq_status = self.get_channel_value("acq_status")
except Exception:
acq_status = "OFFLINE"
status = {"acq_satus": acq_status.upper()}
return status
def _emit_status(self):
self.emit("statusChanged", self.status)
def recover_from_failure(self):
pass
def get_image_file_name(self, pt, suffix=None):
pt.precision = 1
template = "%s_%s_%%" + str(pt.precision) + "d_master.%s"
if suffix:
file_name = template % (pt.get_prefix(), pt.run_number, suffix)
else:
file_name = template % (pt.get_prefix(), pt.run_number, pt.suffix)
if pt.compression:
file_name = "%s.gz" % file_name
return file_name
[docs] def get_first_and_last_file(self, pt: PathTemplate) -> tuple:
"""
Get complete path to first and last image
Args:
Path template parameter
Returns:
Tuple containing first and last image path (first, last)
"""
start_num = int(math.ceil(pt.start_num / 100))
end_num = int(math.ceil((pt.start_num + pt.num_files - 1) / 100))
return (pt.get_image_path() % start_num, pt.get_image_path() % end_num)
[docs] def get_actual_file_path(self, master_file_path: str, image_number: int) -> tuple:
"""
Get file path to image with the given image number <image_number> and
the master h5 file <master_file_path>
Args:
first_file_path: Path to master h5 file
image_number: image number (absolute)
Returns:
A tuple [image file path, image_number (relative to file)]
"""
result_data_path = "_".join(master_file_path.split("_")[0:-2])
img_number = int(image_number) % self._images_per_file
start_file_number = int(int(image_number) / self._images_per_file) + 1
result_data_path += "_data_%06d.h5" % start_file_number
return result_data_path, img_number