Source code for mxcubecore.HardwareObjects.MicroDiffractometer

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU General Lesser Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""Micro Diffractometer implementation of the AbstractDiffractometer class."""

import time

from gevent import Timeout, sleep

from mxcubecore import HardwareRepository as HWR
from mxcubecore.Command.Exporter import Exporter
from mxcubecore.Command.exporter.ExporterStates import ExporterStates
from mxcubecore.HardwareObjects import sample_centring
from mxcubecore.HardwareObjects.abstract.AbstractDiffractometer import (
    AbstractDiffractometer,
    DiffractometerConstraint,
    DiffractometerHead,
    DiffractometerPhase,
)

__copyright__ = """ Copyright © by the MXCuBE collaboration """
__license__ = "LGPLv3+"


[docs]class MicroDiffractometer(AbstractDiffractometer): """Microdiff with Exporter implementation of AbstractDiffractometer""" def __init__(self, name): super().__init__(name) self._exporter = None
[docs] def init(self): """Initialise the device""" exporter_address = self.get_property("exporter_address") _host, _port = exporter_address.split(":") self._exporter = Exporter(_host, int(_port)) self.head_type = self._head_type # add the custom commands for nam, cmd in self.get_property("commands").items(): _cmd = { "type": "exporter", "exporter_address": exporter_address, "name": nam, } setattr(self, nam, self.add_command(_cmd, cmd)) # add the custom channels for nam, attr in self.get_property("channels").items(): _attr = { "type": "exporter", "exporter_address": exporter_address, "name": nam, } setattr(self, nam, self.add_channel(_attr, attr)) super().init() self.update_state() # we must have global_state and phase_channel channels configured try: self.phase_channel.connect_signal("update", self.update_phase) self.global_state.connect_signal("update", self._update_state) except AttributeError: self.log.exception("global_state and phase_channel not configured!") sample_centring.NUM_CENTRING_ROUNDS = self.get_property( "num_centring_rounds", 1 )
[docs] def abort(self): """Immediately terminate action.""" self._exporter.execute("abort")
@property def _get_hwstate(self) -> str: """Get the hardware state, reported by the MD2 application. Returns: The state. """ try: return self._exporter.read_property("HardwareState") except AttributeError: return "Ready" @property def _get_swstate(self) -> str: """Get the software state, reported by the MD2 application. Returns: The state. """ return self.global_state.get_value() def _update_state(self, value): if isinstance(value, str): self.update_state(self.get_state()) else: self.update_state(value)
[docs] def get_state(self): """Get the diffractometer general state. Returns: (enum 'HardwareObjectState'): state """ try: return ExporterStates[self._get_swstate.upper()].value except ValueError: return self.STATES.UNKNOWN
@property def _ready(self) -> bool: """Get the "Ready" state - software and hardware. Returns: True if both "Ready", False otherwise. """ return self._get_swstate == "Ready" and self._get_hwstate == "Ready"
[docs] def wait_status_ready(self, timeout: float | None = None): """Wait timeout seconds until status is ready. Args: timeout: optional - timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). """ if timeout == 0: return with Timeout(timeout, RuntimeError("Timeout waiting for status ready")): while not self._ready: sleep(0.5)
[docs] def set_value_motors( self, motors_positions_dict: dict[str, float], simultaneous: bool = True, timeout: float | None = None, ): """Move specified motors to the requested positions. Args: motors_positions_dict (dict): Dictionary {motor_role: target_value}. simultaneous: Move the motors simultaneously (True - default) or not. timeout: optional - timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). Raises: TimeoutError: Timeout KeyError: The name does not correspond to an existing motor """ if not simultaneous: super().set_value_motors(motors_positions_dict, simultaneous, timeout) else: # prepare the command cmd = "" for role, pos in motors_positions_dict.items(): name = self.motors_hwobj_dict[role].actuator_name cmd += f"{name}={pos:0.3f};" self._exporter.execute("startSimultaneousMoveMotors", (cmd,)) self.wait_status_ready(timeout) self.update_state()
[docs] def get_value_motors(self, motors_list: [list | None] = None) -> dict[str, float]: """Get the positions of diffractometer motors. If the motors_list is empty, return the positions of all the availble motors. Args: motors_list: List of motor roles. Returns: Dictionary {role: position}. """ motors_positions_dict = super().get_value_motors(motors_list) if not self.in_kappa_mode: motors_positions_dict.update({"kappa": None, "kappa_phi": None}) return motors_positions_dict
[docs] def get_motors(self): """Get the dictionary of all the motors which can be used. Returns: Ddctionary {role: hardware_object}. """ def find_elem(ddict, val): """Find dictionary elemnt from motor actuator_name""" for role, hwobj in ddict.items(): if hwobj.actuator_name == val: return {role: hwobj} return {} motors = self._exporter.read_property("MotorStates") motors_dict = {} for mot in motors: mot_stat = mot.split("=") if mot_stat[1] not in ("Disable", "Unknown"): elem = find_elem(self.motors_hwobj_dict, mot_stat[0]) motors_dict.update(elem) return motors_dict
@property def _head_type(self) -> DiffractometerHead: """Get the head type.""" try: self.head_type = DiffractometerHead( self._exporter.read_property("HeadType") ) except ValueError: self.head_type = DiffractometerHead.UNKNOWN return self.head_type def _set_phase(self, value: DiffractometerPhase): """Specific implementation to set the diffractometer to selected phase. Args: value: requested phase. """ _use_custom = self.get_property("use_custom_phase_script") or False current_phase = self.get_phase() if value != current_phase: msg = f"Current phase is {current_phase} and moving to {value}" self.log.info(msg) # protect the detector or open the cover if detector cover defined # do not wait it to finish. det_cover = self.get_object_by_role("detector_cover") if det_cover: if value in (DiffractometerPhase.TRANSFER, DiffractometerPhase.SEE_BEAM): det_cover.set_value(det_cover.detector_cover.VALUES.CLOSE, timeout=0) if value == DiffractometerPhase.COLLECT: det_cover.set_value(det_cover.detector_cover.VALUES.OPEN, timeout=0) if _use_custom and not self.in_plate_mode: script = "ChangePhase_" + value.value.lower() msg = f"Changing phase to {value.value}, using pmac script" self.log.info(msg) self.run_custom_script(script) else: self._exporter.execute("startSetPhase", (value.value,)) self.wait_status_ready(timeout=600)
[docs] def get_phase(self) -> DiffractometerPhase: """Get the current phase.""" value = self.phase_channel.get_value() try: self.current_phase = DiffractometerPhase(value) except ValueError: self.current_phase = DiffractometerPhase.UNKNOWN return self.current_phase
def _set_constraint(self, value: DiffractometerConstraint): """Specific implementation to set the diffractometer to selected constraint Args: value: requested constraint. """ self._exporter.execute("startSetMode", (value.value,))
[docs] def get_constraint(self) -> DiffractometerConstraint: """Get the diffrractometer constraint type.""" value = self._exporter.read_property("CurrentMode") try: self.current_constraint = DiffractometerConstraint(value) except ValueError: self.current_constraint = DiffractometerConstraint.UNKNOWN return self.current_constraint
[docs] def check_scan_limits(self, start: float, end: float, exptime: float) -> bool: """Check if the scan parameters are within the limits Args: start: scan start position. end: scan end position. exptime: scan exposure time (total) [s]. Returns: True (parameters within the limits), False otherwise. """ if self.in_plate_mode: scan_speed = abs(end - start) / exptime llim, hlim = map( float, self._exporter.execute("getOmegaMotorDynamicScanLimits", (scan_speed,)), ) if start < llim: msg = f"Scan start below the allowed value {llim}" raise ValueError(msg) if end > hlim: msg = f"Scan end above the allowed value {hlim}" raise ValueError(msg) return True
[docs] def do_oscillation_scan( self, start: float, end: float, exptime: float, number_of_images: int = 1, timeout: float | None = None, ): """Do an oscillation scan on omega. Args: start: omega start position. end: omega end position. exptime: scan exposure time (total). number_of_images: Used if need to set number of frames. timeout: optional - timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). Raises: RuntimeError: Timeout waiting for status ready. ValueError: Scan parameters not within limits (if relevant). """ # check the scan limits self.check_scan_limits(start, end, exptime) # set the number of frames if not self.get_property("md_set_number_of_frames"): number_of_images = 1 self._exporter.write_property("ScanNumberOfFrames", number_of_images) scan_params = f"1\t{start:0.3f}\t{(end - start):0.3f}\t{exptime:0.3f}\t1" self._exporter.execute("startScanEx", (scan_params,)) self.wait_status_ready(timeout)
[docs] def do_line_scan( self, start: float, end: float, exptime: float, number_of_images: int, motors_pos: dict[str, dict], timeout: float | None = None, ): """Do helical (line) scan on omega. Args: start: scan start position. end: scan end position. exptime: scan exposure time (total). number_of_images: Used only if more tahn one frame needed. timeout: optional - timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). motors_pos: {"1": [centred position], "2": [centred position]} Raises: RuntimeError: Timeout waiting for status ready. ValueError: Scan parameters not within limits (if relevant). """ # check the scan limits self.check_scan_limits(start, end, exptime) if not self.get_property("md_set_number_of_frames"): number_of_images = 1 self._exporter.write_property("ScanNumberOfFrames", number_of_images) scan_params = f"{start:0.3f}\t{(end - start):0.3f}\t{exptime:0.3f}\t" if self.head_otientation == "vertical": _order = ["phiz", "phiy", "sampx", "sampy"] if self.head_otientation == "horizontal": _order = ["phiy", "phiz", "sampx", "sampy"] for name in _order: scan_params += f"{motors_pos['1'][name]:0.3f}\t" for name in _order: scan_params += f"{motors_pos['2'][name]:0.3f}\t" self._exporter.execute("startScan4DEx", (scan_params,)) self.wait_status_ready(timeout)
[docs] def do_mesh_scan( self, start: float, end: float, exptime: float, dead_time: float, nb_lines: int, nb_frames_total: int, grid_centre: list[tuple[str, float]], mesh_range: dict, timeout: float | None = None, ): """Do a mesh scan. Args: start: scan start position. end: scan end position. exptime: scan exposure time (total). dead_time: Dead time between the pulses. Detector dependant. nb_lines: Total number of lines. nb_frames_total: Total number of frames. grid_centre: List of tuples (motor_role, position). representing the centre of the mesh grid. mesh_range: Horizontal and vertical range. timeout: optional - timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). Raises: RuntimeError: Timeout waiting for status ready. """ # enable gate pulses self._exporter.write_property("DetectorGatePulseEnabled", value=True) # dead_time depends on the detector. We transform it to us dead_time = dead_time or HWR.beamline.detector.get_deadtime() * 1000 self._exporter.write_property("DetectorGatePulseReadoutTime", dead_time) grid_centre = grid_centre.as_dict() self.set_value_motors(grid_centre, simultaneous=True, timeout=timeout) scan_params = f"{(end - start):0.3f}\t" if self.head_otientation == "vertical": scan_params += f"{mesh_range['vertical_range']:0.3f}\t" scan_params += f"{-mesh_range['horizontal_range']:0.3f}\t" _order = ["phiz", "phiy", "sampx", "sampy"] if self.head_otientation == "horizontal": scan_params += f"{-mesh_range['horizontal_range']:0.3f}\t" scan_params += f"{mesh_range['vertical_range']:0.3f}\t" _order = ["phiy", "phiz", "sampx", "sampy"] scan_params += f"{start:0.3f}\t" for name in _order: for key, val in grid_centre.items(): if name == key: scan_params += f"{float(val):0.3f}\t" scan_params += f"{nb_lines}\t" scan_params += f"{nb_frames_total / nb_lines}\t" scan_params += f"{exptime / nb_lines}\t" scan_params += "True\tTrue\tTrue\t" self._exporter.execute("startRasterScanEx", (scan_params,)) self.wait_status_ready(timeout)
[docs] def do_still_scan( self, pulse_duration: float, pulse_period: float, nb_pulse: int, timeout: [None | float] = None, ): """Do a zero oscillation acquisition. Args: pulse_duration: Duration of the pulse sent to the detector. pulse_period: The period of the pulse sent to the detector. nb_pulse: Number of pulses to be sent. timeout: optional - timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). Raises: RuntimeError: Timeout waiting for status ready. """ scan_params = f"{pulse_duration:0.6f}\t{pulse_period:0.6f}\t{nb_pulse}" self._exporter.execute("startStillScan", (scan_params,)) self.wait_status_ready(timeout)
[docs] def do_characterisation_scan( self, start: float, scan_range: float, nb_frames: int, exptime: float, nb_scans: int, angle: float, timeout: float | None = None, ): """Do fast characterisation. Args: start: Position of omega for the first scan [deg]. scan_range: range for each scan [deg]. nb_frames: Frame numbers for each scan. exptime: Total exposure time for each scan [s]. nb_scans: How many times a scan to be repeated. angle: The angle between each scan [deg]. This number, added to the last position of each scan and will be the start position of the following scan. timeout (float): optional - timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). Raises: RuntimeError: Timeout waiting for status ready. """ if self.in_plate_mode: # to see if needed when plates return scan_params = f"{nb_frames}\t{start:0.3f}\t{scan_range:0.3f}\t" scan_params += f"{exptime:0.3f}\t{nb_scans}\t{angle:0.3f}" self._exporter.execute("startCharacterisationScanEx", (scan_params,)) if timeout: # min timeout is 20 min timeout = max(timeout, 20 * 60) self.wait_status_ready(timeout)
[docs] def get_pixels_per_mm(self) -> tuple[int, int]: """Get the pixel/mm values. Returns: (x ,y) [pixel/mm] """ x_calib = self._exporter.read_property("CoaxCamScaleX") y_calib = self._exporter.read_property("CoaxCamScaleY") return 1.0 / x_calib, 1.0 / y_calib
[docs] def get_beam_position(self) -> tuple: """Get the beam position defined in MD""" return ( self.beam_position_horizontal.get_value(), self.beam_position_vertical.get_value(), )
[docs] def run_custom_script(self, script_cmd: str, timeout: float | None = None): """Run custom script.""" self.run_script(script_cmd) if timeout != 0: # Wait for script to start before checking status, # can perhaps be improved to wait for ready-busy-ready ? time.sleep(0.4) self.wait_status_ready(timeout)