Source code for mxcubecore.HardwareObjects.LimaPilatusDetector

import os
import shlex
import subprocess
import time

import gevent
from PyTango import DeviceProxy

from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObjectState
from mxcubecore.CommandContainer import ConnectionError
from mxcubecore.HardwareObjects.abstract.AbstractDetector import AbstractDetector
from mxcubecore.TaskUtils import task


class LimaPilatusDetector(AbstractDetector):
    """Pilatus detector controlled through Lima and Pilatus Tango devices.

    The Tango device names are read from the ``lima_device`` and
    ``pilatus_device`` properties.  Acquisition parameters are written to
    Lima channels, and the image header assembled in
    :meth:`prepare_acquisition` / :meth:`set_detector_filenames` is sent with
    the ``set_image_header`` command when the acquisition is started.
    """

    def __init__(self, name):
        AbstractDetector.__init__(self, name)
        # Header fields: a dict after prepare_acquisition(); replaced by a list
        # of per-frame header strings by set_detector_filenames().
        self.header = {}
        # Formatted start angle ("%0.4f deg.") of each frame to be collected.
        self.start_angles = []

    def init(self):
        """Create the Tango channels and commands used by this object.

        Returns early, without creating anything, when either the
        ``lima_device`` or the ``pilatus_device`` property is missing.  On
        ``ConnectionError`` the state is set to FAULT and the error is logged.
        """
        AbstractDetector.init(self)

        lima_device = self.get_property("lima_device")
        pilatus_device = self.get_property("pilatus_device")
        if None in (lima_device, pilatus_device):
            return

        try:
            for channel_name in (
                "latency_time",
                "State",
                "acq_status",
                "acq_trigger_mode",
                "saving_mode",
                "acq_nb_frames",
                "acq_expo_time",
                "saving_directory",
                "saving_prefix",
                "saving_suffix",
                "saving_next_number",
                "saving_index_format",
                "saving_format",
                "saving_overwrite_policy",
                "saving_header_delimiter",
                "last_image_saved",
                "image_roi",
            ):
                self.add_channel(
                    {"type": "tango", "name": channel_name, "tangoname": lima_device},
                    channel_name,
                )

            for channel_name in ("fill_mode", "threshold"):
                self.add_channel(
                    {
                        "type": "tango",
                        "name": channel_name,
                        "tangoname": pilatus_device,
                    },
                    channel_name,
                )

            # Only some Pilatus servers expose "working_energy".
            pilatus_tg_device = DeviceProxy(pilatus_device)
            if hasattr(pilatus_tg_device, "working_energy"):
                self.add_channel(
                    {
                        "type": "tango",
                        "name": "working_energy",
                        "tangoname": pilatus_device,
                    },
                    "working_energy",
                )

            # Registered unconditionally: set_energy_threshold() always reads
            # and writes this channel.
            self.add_channel(
                {
                    "type": "tango",
                    "name": "energy_threshold",
                    "tangoname": pilatus_device,
                },
                "energy_threshold",
            )

            self.add_command(
                {"type": "tango", "name": "prepare_acq", "tangoname": lima_device},
                "prepareAcq",
            )
            self.add_command(
                {"type": "tango", "name": "start_acq", "tangoname": lima_device},
                "startAcq",
            )
            self.add_command(
                {"type": "tango", "name": "stop_acq", "tangoname": lima_device},
                "stopAcq",
            )
            self.add_command(
                {"type": "tango", "name": "reset", "tangoname": lima_device}, "reset"
            )
            self.add_command(
                {"type": "tango", "name": "set_image_header", "tangoname": lima_device},
                "SetImageHeader",
            )

            self.get_channel_object("image_roi").connect_signal(
                "update", self.roi_mode_changed
            )
            self.get_command_object("prepare_acq").set_device_timeout(10000)

            # "acq_satus" (sic) is the key produced by the status property.
            if self.status.get("acq_satus") == "READY":
                self.update_state(HardwareObjectState.READY)
            self._emit_status()
        except ConnectionError:
            self.update_state(HardwareObjectState.FAULT)
            self.log.error("Could not connect to detector %s", lima_device)
            self._emit_status()

    def has_shutterless(self):
        """Return True: this detector always reports shutterless support."""
        return True

    def wait_ready(self, timeout=3500):
        """Block until the ``acq_status`` channel reads ``"Ready"``.

        Args:
            timeout: Maximum time to wait, in seconds (gevent.Timeout units).

        Raises:
            RuntimeError: "Detector not ready" if the timeout expires.
        """
        with gevent.Timeout(timeout, RuntimeError("Detector not ready")):
            while self.get_channel_value("acq_status") != "Ready":
                # Cooperative sleep: a blocking time.sleep() would starve the
                # gevent hub and prevent the Timeout above from ever firing
                # when the standard library is not monkey-patched.
                gevent.sleep(1)

    def last_image_saved(self):
        """Return the ``last_image_saved`` channel value plus one.

        Returns 0 if the channel cannot be read or the value cannot be
        incremented.
        """
        try:
            return self.get_channel_object("last_image_saved").get_value() + 1
        except Exception:
            # Deliberate best effort: any failure is reported as "no image".
            return 0

    def get_deadtime(self):
        """Return the ``deadtime`` property converted to float."""
        return float(self.get_property("deadtime"))

    def roi_mode_changed(self, mode):
        """ROI mode change event.

        Stores the index of ``str(mode)`` in ``self.roi_modes_list`` as
        ``self.roi_mode`` and emits ``detectorRoiModeChanged`` with it.
        """
        self.roi_mode = self.roi_modes_list.index(str(mode))
        self.emit("detectorRoiModeChanged", (self.roi_mode,))

    def prepare_acquisition(
        self,
        take_dark,
        start,
        osc_range,
        exptime,
        npass,
        number_of_images,
        comment,
        mesh,
        mesh_num_lines,
    ):
        """Build the image header and configure Lima for an acquisition.

        The detector is reset (see :meth:`reset`) and waited for before any
        channel is written.

        Args:
            take_dark: Unused by this implementation.
            start: Start angle of the first image.
            osc_range: Angle increment per image.
            exptime: Exposure time written to ``acq_expo_time``.
            npass: Unused by this implementation.
            number_of_images: Number of frames to acquire.
            comment: Text stored in the ``file_comments`` header field.
            mesh: If truthy, the ``mesh_trigger_mode`` property selects the
                trigger mode; otherwise ``osc_trigger_mode`` does.  Both
                default to ``"EXTERNAL_GATE"``.
            mesh_num_lines: Unused by this implementation.
        """
        trigger_property = "mesh_trigger_mode" if mesh else "osc_trigger_mode"
        trigger_mode = self.get_property(trigger_property, "EXTERNAL_GATE")

        diffractometer_positions = HWR.beamline.diffractometer.get_positions()

        self.start_angles = [
            "%0.4f deg." % (start + osc_range * i) for i in range(number_of_images)
        ]

        # A missing or None motor position is written as the -9999 placeholder.
        kappa_phi = diffractometer_positions.get("kappa_phi", -9999)
        if kappa_phi is None:
            kappa_phi = -9999
        kappa = diffractometer_positions.get("kappa", -9999)
        if kappa is None:
            kappa = -9999

        self.header = {}
        self.header["file_comments"] = comment
        self.header["N_oscillations"] = number_of_images
        self.header["Oscillation_axis"] = "omega"
        self.header["Chi"] = "0.0000 deg."
        self.header["Phi"] = "%0.4f deg." % kappa_phi
        self.header["Kappa"] = "%0.4f deg." % kappa
        self.header["Alpha"] = "0.0000 deg."
        self.header["Polarization"] = HWR.beamline.collect.bl_config.polarisation
        self.header["Detector_2theta"] = "0.0000 deg."
        self.header["Angle_increment"] = "%0.4f deg." % osc_range
        self.header["Transmission"] = HWR.beamline.transmission.get_value()
        self.header["Flux"] = HWR.beamline.flux.get_value()
        self.header["Beam_xy"] = "(%.2f, %.2f) pixels" % self.get_beam_position()
        self.header["Detector_Voffset"] = "0.0000 m"
        self.header["Energy_range"] = "(0, 0) eV"
        # The distance value is divided by 1000 before being labelled "m".
        self.header["Detector_distance"] = "%f m" % (self.distance.get_value() / 1000.0)
        self.header["Wavelength"] = "%f A" % HWR.beamline.energy.get_wavelength()
        self.header["Trim_directory:"] = "(nil)"
        self.header["Flat_field:"] = "(nil)"
        self.header["Excluded_pixels:"] = " badpix_mask.tif"
        self.header["N_excluded_pixels:"] = "= 321"
        self.header["Threshold_setting"] = "%d eV" % self.get_channel_value("threshold")
        self.header["Count_cutoff"] = "1048500"
        self.header["Tau"] = "= 0 s"
        self.header["Exposure_period"] = "%f s" % (exptime + self.get_deadtime())
        self.header["Exposure_time"] = "%f s" % exptime

        self.reset()
        self.wait_ready()

        self.set_energy_threshold(HWR.beamline.energy.get_value())

        self.set_channel_value("acq_trigger_mode", trigger_mode)
        if self.get_property("set_latency_time", False):
            self.set_channel_value("latency_time", self.get_deadtime())
        self.set_channel_value("saving_mode", "AUTO_FRAME")
        self.set_channel_value("acq_nb_frames", number_of_images)
        self.set_channel_value("acq_expo_time", exptime)
        self.set_channel_value("saving_overwrite_policy", "OVERWRITE")

    def set_energy_threshold(self, energy):
        """Set the energy threshold.

        The value is written only when it differs from the current threshold
        by more than 0.1; the method then polls until the channel reflects
        it.  ``fill_mode`` is set to ``"ON"`` in every case.

        Args:
            energy (int): Energy [eV] or [keV]
        """
        min_energy = self.get_property("minE")

        # Some versions of the Lima Pilatus server take the energy argument in
        # keV, some in eV.  From minE we can set a conversion factor.
        factor = 1000 if min_energy > 100 else 1.0
        energy_threshold = self.get_channel_value("energy_threshold")

        # Check if the energy needs to be converted to eV.
        if energy < 100:
            energy *= factor

        # Never go below the configured minimum.
        if energy < min_energy:
            energy = min_energy

        if abs(energy_threshold - energy) > 0.1:
            self.set_channel_value("energy_threshold", energy)
            # NOTE(review): no timeout here -- this waits for as long as the
            # server takes to report the new threshold.
            while abs(self.get_channel_value("energy_threshold") - energy) > 0.1:
                time.sleep(1)

        self.set_channel_value("fill_mode", "ON")

    @task
    def set_detector_filenames(self, frame_number, start, filename):
        """Configure where and how Lima saves frames; build frame headers.

        Creates the saving directory on the ``control`` host over ssh, sets
        the Lima saving channels and replaces ``self.header`` (the dict built
        by :meth:`prepare_acquisition`) with a list holding one header string
        per entry of ``self.start_angles``.

        Args:
            frame_number: Value written to ``saving_next_number``.
            start: Unused by this implementation.
            filename: Path of an image file; its directory, prefix and
                extension are used to derive the saving parameters.
        """
        prefix, suffix = os.path.splitext(os.path.basename(filename))
        # Drop the last "_"-separated token of the base name.
        prefix = "_".join(prefix.split("_")[:-1]) + "_"

        dirname = os.path.dirname(filename)
        if dirname.startswith(os.path.sep):
            # Make the path relative so os.path.join() keeps the buffer root.
            dirname = dirname[len(os.path.sep) :]
        saving_directory = os.path.join(self.get_property("buffer", "/"), dirname)

        # Argument list instead of a shell string: nothing is interpreted by a
        # local shell, and the directory is quoted for the remote shell that
        # ssh hands the command to.
        subprocess.Popen(
            [
                "ssh",
                "%s@%s" % (os.environ["USER"], self.get_property("control")),
                "mkdir --parents %s" % shlex.quote(saving_directory),
            ],
            close_fds=True,
        ).wait()

        self.set_channel_value("saving_directory", saving_directory)
        self.set_channel_value("saving_prefix", prefix)
        self.set_channel_value("saving_suffix", suffix)
        self.set_channel_value("saving_next_number", frame_number)
        self.set_channel_value("saving_index_format", "%04d")
        self.set_channel_value("saving_format", "CBF")
        self.set_channel_value("saving_header_delimiter", ["|", ";", ":"])

        headers = []
        for i, start_angle in enumerate(self.start_angles):
            header = "\n%s\n" % self.get_property("serial")
            header += "# %s\n" % time.strftime("%Y/%b/%d %T")
            header += "\n%s\n" % self.get_property("sensor")
            header += "\n%s\n" % self.get_property("pixel_size")
            # Only the start angle differs between frames.
            self.header["Start_angle"] = start_angle
            for key, value in self.header.items():
                header += "# %s %s\n" % (key, value)
            headers.append("%d : array_data/header_contents|%s;" % (i, header))

        self.header = headers

    def start_acquisition(self):
        """Wait for the detector, send the header and start acquiring."""
        self.wait_ready()
        self.execute_command("stop_acq")
        self.execute_command("prepare_acq")
        self.execute_command("set_image_header", self.header)
        self.execute_command("start_acq")
        self._emit_status()

    def stop_acquisition(self):
        """Stop the acquisition, reset the device and wait until it is ready."""
        try:
            self.execute_command("stop_acq")
        except Exception:
            # Best effort: the reset below is issued even if stopping fails.
            pass

        time.sleep(1)
        self.execute_command("reset")
        self.wait_ready()
        self._emit_status()

    def reset(self):
        """Reset the detector; equivalent to :meth:`stop_acquisition`."""
        self.stop_acquisition()

    @property
    def status(self):
        """Dict holding the upper-cased acquisition status.

        The key is spelt ``"acq_satus"`` (sic); it is kept because
        :meth:`init` reads it and it is emitted with ``statusChanged``.  The
        value is ``"OFFLINE"`` when the channel cannot be read.
        """
        try:
            acq_status = self.get_channel_value("acq_status")
        except Exception:
            acq_status = "OFFLINE"

        return {"acq_satus": acq_status.upper()}

    def _emit_status(self):
        """Emit ``statusChanged`` with the current status dict."""
        self.emit("statusChanged", self.status)

    def recover_from_failure(self):
        """Run one internally triggered 0.5 s frame, then stop the detector.

        The trigger mode is overridden after :meth:`prepare_acquisition`
        because that method only selects a mode from the ``mesh`` flag.
        """
        self.prepare_acquisition(
            False,  # take_dark
            0,  # start
            0,  # osc_range
            0.5,  # exptime
            None,  # npass
            1,  # number_of_images
            "",  # comment
            False,  # mesh
            None,  # mesh_num_lines
        )
        # prepare_acquisition() cannot select this mode itself, and without it
        # the wait_ready() below would wait for an external trigger.
        self.set_channel_value("acq_trigger_mode", "INTERNAL_TRIGGER")
        self.start_acquisition()
        self.wait_ready()
        self.stop_acquisition()