Source code for mxcubecore.HardwareObjects.ALBA.ALBAPilatus

#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

import time

from PyTango.gevent import DeviceProxy

from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.HardwareObjects.abstract.AbstractDetector import AbstractDetector

__author__ = "Vicente Rey"
__credits__ = ["ALBA"]
__version__ = "2.3."
__category__ = "General"


[docs]class ALBAPilatus(AbstractDetector, HardwareObject): """Detector class. Contains all information about detector - states are 'OK', and 'BAD' - status is busy, exposing, ready, etc. - physical property is RH for pilatus, P for rayonix """ def __init__(self, name): AbstractDetector.__init__(self) HardwareObject.__init__(self, name) self.distance_motor_hwobj = None self.default_distance = None self.default_distance_limits = None self.default_latency_time = 0.003 self.exp_time_limits = None self.headers = {}
[docs] def init(self): self.distance_motor_hwobj = self.get_object_by_role("distance_motor") self.devname = self.get_property("tangoname") try: self.latency_time = float(self.get_property("latency_time")) except Exception: self.latency_time = None if self.latency_time is None: self.log.debug( "Cannot obtain latency time from Pilatus XML. Using %s" % self.default_latency_time ) self.latency_time = self.default_latency_time self.devspecific = self.get_property("device_specific") exp_time_limits = self.get_property("exposure_limits") self.exp_time_limits = map(float, exp_time_limits.strip().split(",")) self.device = DeviceProxy(self.devname) self.device_specific = DeviceProxy(self.devspecific) self.device.set_timeout_millis(30000) self.beamx_chan = self.get_channel_object("beamx") self.beamy_chan = self.get_channel_object("beamy")
[docs] def start_acquisition(self): self.device.startAcq()
[docs] def stop_acquisition(self): self.device.abortAcq()
def wait_move_distance_done(self): self.distance_motor_hwobj.wait_end_of_move() def get_threshold(self): return self.device_specific.threshold def get_threshold_gain(self): return self.device_specific.threshold_gain
[docs] def has_shutterless(self): """Return True if has shutterless mode""" return True
[docs] def get_beam_position(self, distance=None, wavelength=None): """Returns beam center coordinates""" # NBNB TODO check if pixels or mm, and adjust code # Should be pixels beam_x = 0 beam_y = 0 try: beam_x = self.beamx_chan.get_value() beam_y = self.beamy_chan.get_value() except Exception: self.log.exception("") return beam_x, beam_y
def get_manufacturer(self): return self.get_property("manufacturer") return "Dectris" def get_model(self): return self.get_property("model") def get_detector_type(self): return self.get_property("type") def get_default_exposure_time(self): return self.get_property("default_exposure_time") def get_minimum_exposure_time(self): return self.get_property("minimum_exposure_time")
[docs] def get_exposure_time_limits(self): """Returns exposure time limits as list with two floats""" return self.exp_time_limits
def get_file_suffix(self): return self.get_property("file_suffix")
[docs] def get_pixel_size(self): return self.get_property("px"), self.get_property("py")
# methods for data collection def set_energy_threshold(self): eugap_ch = self.get_channel_object("eugap") try: currentenergy = eugap_ch.get_value() except Exception: currentenergy = 12.6 det_energy = self.get_threshold() # threshold = det_energy / 2. # limitenergy = threshold / 0.8 if round(currentenergy, 6) < 7.538: currentenergy = 7.538 kev_diff = abs(det_energy - currentenergy) if kev_diff > 1.2: self.log.debug( "programming energy_threshold on pilatus to: %s" % currentenergy ) # if self.wait_standby(): # self.device_specific.energy_threshold = currentenergy def get_latency_time(self): return self.latency_time def wait_standby(self, timeout=300): t0 = time.time() while self.device_specific.cam_state == "STANDBY": if time.time() - t0 > timeout: print("timeout waiting for Pilatus to be on STANDBY") return False time.sleep(0.1) return True
[docs] def prepare_acquisition(self, dcpars): self.set_energy_threshold() # self.wait_standby() osc_seq = dcpars["oscillation_sequence"][0] file_pars = dcpars["fileinfo"] basedir = file_pars["directory"] prefix = "%s_%s_" % (file_pars["prefix"], file_pars["run_number"]) first_img_no = osc_seq["start_image_number"] nb_frames = osc_seq["number_of_images"] exp_time = osc_seq["exposure_time"] fileformat = "CBF" trig_mode = "EXTERNAL_TRIGGER" # latency_time = 0.003 self.log.debug( " Preparing detector (dev=%s) for data collection" % self.devname ) self.log.debug(" /saving directory: %s" % basedir) self.log.debug(" /prefix : %s" % prefix) self.log.debug(" /saving_format : %s" % fileformat) self.log.debug(" /trigger_mode : %s" % trig_mode) self.log.debug(" /acq_nb_frames : %s" % nb_frames) self.log.debug(" /acq_expo_time : %s" % str(exp_time - self.latency_time)) self.log.debug(" /latency_time : %s" % self.latency_time) self.device.write_attribute("saving_mode", "AUTO_FRAME") self.device.write_attribute("saving_directory", basedir) self.device.write_attribute("saving_prefix", prefix) self.device.write_attribute("saving_format", fileformat) # set ROI and header in limaserver # TODO TrigList = [ "INTERNAL_TRIGGER", "EXTERNAL_TRIGGER", "EXTERNAL_TRIGGER_MULTI", "EXTERNAL_GATE", "EXTERNAL_START_STOP", ] self.device.write_attribute("acq_trigger_mode", trig_mode) self.device.write_attribute("acq_expo_time", exp_time - self.latency_time) self.device.write_attribute("latency_time", self.latency_time) return True
def prepare_collection(self, nb_frames, first_img_no): self.log.debug( "ALBAPilatus. preparing collection. nb_images: %s, first_no: %s" % (nb_frames, first_img_no) ) self.device.write_attribute("acq_nb_frames", nb_frames) self.device.write_attribute("saving_next_number", first_img_no) self.device.prepareAcq() return True def start_collection(self): self.start_acquisition() def stop_collection(self): self.stop_acquisition() def set_image_headers(self, image_headers, angle_info): nb_images = image_headers["nb_images"] angle_inc = image_headers["Angle_increment"] start_angle = image_headers["Start_angle"] startangles_list = [] ang_start, ang_inc, spacing = angle_info for i in range(nb_images): startangles_list.append("%0.4f deg." % (ang_start + spacing * i)) headers = [] for i, sa in enumerate(startangles_list): header = ( "_array_data.header_convention PILATUS_1.2\n" "# Detector: PILATUS 6M, S/N 60-0108, Alba\n" "# %s\n" "# Pixel_size 172e-6 m x 172e-6 m\n" "# Silicon sensor, thickness 0.000320 m\n" % time.strftime("%Y/%b/%d %T") ) # Acquisition values (headers dictionary) but overwrites start angle image_headers["Start_angle"] = sa for key, value in image_headers.items(): if key == "nb_images": continue header += "# %s %s\n" % (key, value) headers.append("%d : array_data/header_contents|%s;" % (i, header)) self.device.write_attribute("saving_header_delimiter", ["|", ";", ":"]) self.device.resetCommonHeader() self.device.resetFrameHeaders() self.device.setImageHeader(headers)
def test_hwo(hwo): print("Detector Distance is: ", hwo.distance.get_value()) print(" Beam X: %s / Beam Y: %s" % hwo.get_beam_position()) # print("going to 490 : ", hwo.distance.set_value(490))