Source code for mxcubecore.HardwareObjects.SOLEIL.PX1.PX1MiniDiff

import logging
import time

import gevent

from mxcubecore.HardwareObjects import sample_centring
from mxcubecore.HardwareObjects.GenericDiffractometer import GenericDiffractometer


[docs]class PX1MiniDiff(GenericDiffractometer): CENTRING_MOTORS_NAME = [ "phi", "phiz", "phiy", "sampx", "sampy", "kappa", "kappa_phi", "beam_x", "beam_y", "zoom", ]
[docs] def init(self): self.smargon = self.get_object_by_role("smargon") self.connect(self.smargon, "stateChanged", self.smargon_state_changed) self.lightarm_hwobj = self.get_object_by_role("lightarm") # self.centring_hwobj = self.get_object_by_role('centring') self.px1conf_ho = self.get_object_by_role("px1configuration") self.px1env_ho = self.get_object_by_role("px1environment") self.pixels_per_mm_x = 0 self.pixels_per_mm_y = 0 GenericDiffractometer.init(self) self.centring_methods = { GenericDiffractometer.CENTRING_METHOD_MANUAL: self.px1_manual_centring, GenericDiffractometer.CENTRING_METHOD_AUTO: self.start_automatic_centring, GenericDiffractometer.CENTRING_METHOD_MOVE_TO_BEAM: self.start_move_to_beam, }
def prepare_centring(self, timeout=20): self.px1env_ho.gotoSampleViewPhase() gevent.sleep(0.5) if self.px1env_ho.isPhaseVisuSample(): t0 = time.time() while True: env_state = self.px1env_ho.get_state() if env_state != "RUNNING" and self.px1env_ho.isPhaseVisuSample(): break if time.time() - t0 > timeout: self.log.debug("timeout sending supervisor to sample view phase") break gevent.sleep(0.1) self.lightarm_hwobj.adjustLightLevel() def smargon_state_changed(self, value): self.log.debug("smargon state changed") self.smargon_state = value self.emit("minidiffStateChanged", (value,))
[docs] def is_ready(self): return self.smargon.get_state() == "STANDBY"
# self.smargon_state = str(self.smargon_state_ch.get_value()) # return self.smargon_state == "STANDBY"
[docs] def get_pixels_per_mm(self): self.update_zoom_calibration() return GenericDiffractometer.get_pixels_per_mm(self)
def update_zoom_calibration(self): """ """ if "zoom" not in self.motor_hwobj_dict: # not initialized yet return zoom_motor = self.motor_hwobj_dict["zoom"] props = zoom_motor.getCurrentPositionProperties() if "pixelsPerMmZ" in props.keys() and "pixelsPerMmY" in props.keys(): self.pixels_per_mm_x = float(props["pixelsPerMmY"]) self.pixels_per_mm_y = float(props["pixelsPerMmZ"]) else: self.pixels_per_mm_x = 0 self.pixels_per_mm_y = 0 if "beamPositionX" in props.keys() and "beamPositionY" in props.keys(): self.beam_xc = float(props["beamPositionX"]) self.beam_yc = float(props["beamPositionY"]) if 0 not in [self.pixels_per_mm_x, self.pixels_per_mm_y]: self.emit( "pixelsPerMmChanged", ((self.pixels_per_mm_x, self.pixels_per_mm_y),) ) def px1_manual_centring(self, sample_info=None, wait_result=None): """ """ self.emit_progress_message("Manual 3 click centring...") self.log.debug( " starting manual 3 click centring. phiy is %s" % str(self.centring_phiy) ) centring_points = self.px1conf_ho.getCentringPoints() centring_phi_incr = self.px1conf_ho.getCentringPhiIncrement() centring_sample_type = self.px1conf_ho.getCentringSampleType() self.current_centring_procedure = sample_centring.px1_start( { "phi": self.centring_phi, "phiy": self.centring_phiy, "sampx": self.centring_sampx, "sampy": self.centring_sampy, "phiz": self.centring_phiz, }, self.pixels_per_mm_x, self.pixels_per_mm_y, self.beam_position[0], self.beam_position[1], n_points=centring_points, phi_incr=centring_phi_incr, sample_type=centring_sample_type, ) self.current_centring_procedure.link(self.centring_done)
[docs] def centring_done(self, centring_procedure): """ Descript. : """ self.log.debug("Diffractometer: centring procedure done.") try: motor_pos = centring_procedure.get() if isinstance(motor_pos, gevent.GreenletExit): raise motor_pos except Exception: logging.exception("Could not complete centring") self.emit_centring_failed() else: self.log.debug("Diffractometer: centring procedure done. %s" % motor_pos) for motor in motor_pos: position = motor_pos[motor] self.log.debug(" - motor is %s - going to %s" % (motor.id, position)) self.emit_progress_message("Moving sample to centred position...") self.emit_centring_moving() try: self.move_to_motors_positions(motor_pos, wait=True) except Exception: logging.exception("Could not move to centred position") self.emit_centring_failed() else: # done already by px1_center pass # if 3 click centring move -180 # if not self.in_plate_mode: # self.wait_device_ready() # self.motor_hwobj_dict['phi'].set_value_relative(-180, timeout=None) if ( self.current_centring_method == GenericDiffractometer.CENTRING_METHOD_AUTO ): self.emit("newAutomaticCentringPoint", motor_pos) self.centring_time = time.time() self.emit_centring_successful() self.emit_progress_message("") self.ready_event.set()
def move_to_motors_positions(self, motors_positions, wait=False): """ """ self.emit_progress_message("Moving to motors positions...") self.move_to_motors_positions_procedure = gevent.spawn( self.move_motors, motors_positions ) self.move_to_motors_positions_procedure.link(self.move_motors_done) if wait: self.wait_device_ready(10)
[docs] def move_omega_relative(self, relative_pos): omega_mot = self.motor_hwobj_dict.get("phi") omega_mot.set_value_relative(relative_pos, timeout=None)
[docs] def move_motors(self, motor_positions, timeout=15): """ Moves diffractometer motors to the requested positions :param motors_dict: dictionary with motor names or hwobj and target values. :type motors_dict: dict """ from mxcubecore.model.queue_model_objects import CentredPosition if isinstance(motor_positions, CentredPosition): motor_positions_copy = motor_positions.as_dict() else: # We do not want to modify the input dict motor_positions_copy = motor_positions.copy() self.log.debug("MiniDiff moving motors. %s" % str(motor_positions_copy)) self.wait_device_ready(timeout) self.log.debug(" now ready to move them") for motor in motor_positions_copy.keys(): position = motor_positions_copy[motor] if type(motor) in (str, unicode): motor_role = motor motor = self.motor_hwobj_dict.get(motor_role) del motor_positions_copy[motor_role] if None in (motor, position): continue motor_positions_copy[motor] = position self.log.debug(" / moving motor. %s to %s" % (motor.id, position)) self.wait_device_ready(timeout) try: motor.set_value(position, timeout=None) except Exception: self.log.debug( " / error moving motor on diffractometer. state is %s" % (self.smargon_state) ) self.log.exception("") self.wait_device_ready(timeout)