Source code for mxcubecore.HardwareObjects.abstract.AbstractDiffractometer

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU General Lesser Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""Abstract Diffractometer class.
Initialises the username property and all the motors and nstate (discrete
positions) equipment, which are part of the diffractometer.
Certain number of roles are fixed and define a corresponding motor or nstate
actuator object. This allows to use them in a standard way by the MXCuBE
application (web or Qt).
There is also a convention of the direction of the motors:
- z axis is in the direction of gravity. Positive direction is upwards.
- y axis is perpendicular to the z axis and to the nominal beam direction.
  Positive direction is so that x,y,z would form a right-handed coordinate system.
- x axis is orthogonal to the y and z axes (and approximately parallel to the beam).
  Positive direction is from the sample towards the detector.


Here follows the list of the fixed roles and the description of the
corresponding objects, accessible via beamline.diffractometer hardware object.

1. Motor objects (roles)  and their functionality:
  omega - the rotation axis, independent of the orientation (up, down or side).
          Positive direction is a right-handed rotation.
  kappa - the rotation axis between the omega and kappa_phi axes.
          Positive direction is a right-handed rotation. Not present on all goniostats.
  kappa_phi - the rotation axis directly attached to the sample.
          Positive direction is a right-handed rotation. Not present on all goniostats.
  sampx - centring table x axis
  sampy - centring table y axis
  focus - alignment table x axis
  phiy - alignment table y axis
  phiz - alignment table z axis
  sample_horizontal - Pseudomotor approximately along the y axis.
         Generated by phiy or a combination of sampx, sampy depending on goniostat
  sample_vertical - Pseudomotor approximately along the z axis.
         Generated or phiz or a combination of sampx, sampy depending on goniostat
  backlight - adjust the intensity of the back light
  frontlight - adjust the intensity of the front light
  kappa - the minikappa kappa axis
  kappa_phi - the minikappa phi axis

2. Discrete (N) state equipment and its functionality:
zoom - zoom levels
fshutter - fast (ms) shutter - allow beam on the sample
beamstop - put in front of a detector to avoid the direct beam.
capillary - if present, a tube to reduce the scattering background.
backlightswitch - move the backlight on the level of the on-axis viewer
frontlightswitch - switch on/off the front light
fluo_detector - if present, actuator to move a fluorescence detector close to the sample

3. Goniometer head orientation - horizontal or vertical
 horizontal - the sample holder is parallel to the ground
 vertical - the sample holder is perpendicular to the ground
"""

import abc
import json
from enum import Enum, unique
from typing import Dict, List, Optional, Tuple, Union
from pathlib import Path
from pydantic.v1 import BaseModel, Field, ValidationError

from mxcubecore.BaseHardwareObjects import HardwareObject, HardwareObjectState
from mxcubecore import HardwareRepository as HWR


__copyright__ = """ Copyright © by the MXCuBE collaboration """
__license__ = "LGPLv3+"


[docs]@unique class DiffractometerHead(Enum): """Enumeration diffractometer head types.""" UNKNOWN = "Unknown" MINI_KAPPA = "MiniKappa" SMART_MAGNET = "SmartMagnet" PLATE = "Plate" SSX = "SSX" HARVESTER = "Harvester"
[docs]@unique class DiffractometerPhase(Enum): """Enumeration diffractometer phases.""" UNKNOWN = "Unknown" CENTRE = "Centring" COLLECT = "DataCollection" SEE_BEAM = "BeamLocation" TRANSFER = "Transfer"
[docs]@unique class DiffractometerConstraint(Enum): """Enumeration diffractometer constraint types.""" UNKNOWN = "Unknown" RELEASE = "Normal" INJECTOR = "Injector" STILL = "LockRotation"
[docs]class HolderTypeEnum(Enum): """Enumeration of chip holder geometry.""" KNOWN_GEOMETRY = "known_geometry" FREE_GEOMETRY = "free_geometry"
[docs]class ChipShapeEnum(Enum): """Enumeration of chip holder shape.""" RECTANGULAR = "RECTANGULAR" ELLIPTICAL = "ELLIPTICAL"
# Diffractometer pydantic models
[docs]class CalibrationData(BaseModel): """Chip calibration model.""" top_left: Tuple[float, float, float] = Field( [0, 0, 0], description="Top left corner motor position" ) top_right: Tuple[float, float, float] = Field( [0, 0, 0], description="Top right corner motor position" ) bottom_left: Tuple[float, float, float] = Field( [0, 0, 0], description="Bottom left corner motor position" )
[docs]class SampleHolderSectionModel(BaseModel): """Generic sample holder.""" calibration_data: Optional[CalibrationData] section_offset: Tuple[int, int] = Field( [0, 0], description="Block offset in grid layout system coordinates x, y" ) block_size: Tuple[float, float] = Field( [15, 15], description="Block size horizontal, vertical in mm" ) block_spacing: Tuple[float, float] = Field( [15, 15], description="Spacing between blocks horizontal, vertical in mm" ) block_shape: ChipShapeEnum = ChipShapeEnum.RECTANGULAR number_of_rows: int = Field(6, description="Numer of rows") number_of_collumns: int = Field(6, description="Numer of collumns") row_labels: List[str] = Field([], description="Row lables") column_lables: List[str] = Field([], description="Collumn lables") targets_per_block: Tuple[int, int] = Field( [20, 20], description="Targets per block dim1 and dim2" )
[docs]class ChipLayout(BaseModel): """Chip layout model.""" head_type: DiffractometerHead = DiffractometerHead.SSX holder_type: HolderTypeEnum = HolderTypeEnum.KNOWN_GEOMETRY holder_brand: str = Field("", description="Brand/make of sample holder") holder_size: Tuple[float, float] = Field( [0, 0], description="Size of sample holder in mm horizontal and vertical" ) sections: List[SampleHolderSectionModel] calibration_data: CalibrationData
class GonioHeadConfiguration(BaseModel): current: str = Field("", description="Selected chip layout") available: Dict[str, ChipLayout]
[docs]class AbstractDiffractometer(HardwareObject): """Abstract Diffractometer. Attributes: motors_hwobj_dict (dict): Motor hardware objects. nstate_equipment_hwobj_dict (dict): N state hardware objects. username (str): User name head_type (DiffractometerHead Enum): Current head type current_phase (DiffractometerPhase Enum): Current phase current_constraint (DiffractometerConstraint Enum): Current constraint. timeout (float): Default action timeout [s] Emits: valueChanged: ("valueChanged", (value,)) phaseChanged: ("phaseChanged", (state)) States: HardwareObjectStates: READY, BUSY, FAULT """ __metaclass__ = abc.ABCMeta def __init__(self, name): super().__init__(name) self.motors_hwobj_dict = {} self.nstate_equipment_hwobj_dict = {} self.username = name self.current_phase = None self.head_type = None self.current_constraint = None self.timeout = 3 # default timeout 3 s self.chip_definition_file = "" self.head_orientation = ""
[docs] def init(self): """Initialise username property. Initialise the equipment, defined in the configuration file """ super().init() self.username = self.get_property("username") or self.username self.head_orientation = self.get_property("head_orientation") # motors for role in self.config.motors: try: _hobj = self.get_object_by_role(role) self.motors_hwobj_dict[role] = _hobj setattr(self, role, _hobj) self.connect(_hobj, "valueChanged", _hobj.update_value) except KeyError: self.log.warning("Diffractometer: No motors configured") # nstate (discrete positions) equipment for role in self.config.nstate_equipment: try: _hobj = self.get_object_by_role(role) self.nstate_equipment_hwobj_dict[role] = _hobj setattr(self, role, _hobj) self.connect(_hobj, "valueChanged", _hobj.update_value) except KeyError: self.log.warning("No nstate (discrete positions) equipment configured") # chip definition _fp = self.get_property("chip_definition_file", "") self.chip_definition_file = HWR.get_hardware_repository().find_in_repository( _fp )
[docs] def get_motors(self) -> dict: """Get the dictionary of all configured motors or the ones to use. Returns: Dictionary {role: hardware_object} """ return self.motors_hwobj_dict.copy()
[docs] def get_nstate_equipment(self) -> dict: """Get the dictionary of all the nstate (discrete positions) equipment. Returns: Dictionary {role: hardware_object} """ return self.nstate_equipment_hwobj_dict.copy()
# -------- Motor Groups --------
[docs] def set_value_motors( self, motors_positions_dict: dict, simultaneous: bool = True, timeout: float | None = None, ): """Move specified motors to the requested positions Args: motors_positions_dict: Dictionary {motor_role: target_value}. simultaneous: Move the motors simultaneousl (True - default) or not. timeout: timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). Raises: TimeoutError: Timeout KeyError: The name does not correspond to an existing motor """ # use only the available motors mot_hwobj_dict = self.get_motors() tout = timeout if simultaneous: tout = 0 self.update_state(HardwareObjectState.BUSY) for key, val in motors_positions_dict.items(): try: mot_hwobj_dict[key].set_value(val, timeout=tout) except KeyError as err: msg = f"Invalid motor name {key}" raise RuntimeError(msg) from err # wait for the end of move of all the motors, if needed if simultaneous: for key in motors_positions_dict: mot_hwobj_dict[key].wait_ready(timeout) self.update_state()
[docs] def get_value_motors(self, motors_list: list | None = None) -> dict: """Get the positions of diffractometer motors. If the motors_list is empty, return the positions of all the available motors. Args: List of motor roles. Returns: Dictionary {motor_role: position} """ mot_pos_dict = {} # use only the available motors mot_hwobj_dict = self.get_motors() if not motors_list: for role, motor in mot_hwobj_dict.items(): try: mot_pos_dict[role] = float(motor.get_value()) except TypeError: msg = f"No value for {role}" self.log.warning(msg) return mot_pos_dict for motor in motors_list: try: mot_pos_dict[str(motor)] = float(mot_hwobj_dict[motor].get_value()) except KeyError: msg = f"Invalid motor name {motor}" self.log.exception(msg) except TypeError: msg = f"No value for {motor}" self.log.warning(msg) return mot_pos_dict
[docs] def get_state_motors(self, motors_list: list | None = None) -> dict: """Get the state of diffractometer motors. If the motors_list is empty, return the state of all the available motors. Args: List of motor roles. Returns: Dictionary {motor_role: state} """ mot_state_dict = {} # use only the available motors mot_hwobj_dict = self.get_motors() motors_list = motors_list or mot_hwobj_dict.keys() if not motors_list: for role, motor in mot_hwobj_dict.items(): try: mot_state_dict[role] = float(motor.get_state()) except TypeError: msg = f"No value for {role}" self.log.warning(msg) return mot_state_dict for motor in motors_list: try: mot_state_dict[str(motor)] = float(mot_hwobj_dict[motor].get_state()) except KeyError: msg = f"Invalid motor name {motor}" self.log.exception(msg) except TypeError: msg = f"No value for {motor}" self.log.warning(msg) return mot_state_dict
# -------- Head Type and Modes -------- @property def in_plate_mode(self) -> bool: """Check if the head is a plate.""" return self.head_type == DiffractometerHead.PLATE @property def in_kappa_mode(self) -> bool: """Check if the head is MiniKappa.""" return self.head_type == DiffractometerHead.MINI_KAPPA @property def in_chip_mode(self) -> bool: """Check if there is chip configuration of the head.""" return ( self.head_type == DiffractometerHead.SSX and self.current_constraint == DiffractometerConstraint.STILL ) @property def in_injector_mode(self) -> bool: """Check if there is injector on the head.""" return ( self.head_type == DiffractometerHead.SSX and self.current_constraint == DiffractometerConstraint.INJECTOR ) @property def head_enum(self): """Get the diffractometer head Enum. Used when no import wished.""" return DiffractometerHead
[docs] def get_chip_configuration(self) -> Union[GonioHeadConfiguration, None]: """Get the chip configuration.""" data = None if Path(self.chip_definition_file).is_file(): with Path(self.chip_definition_file).open("r") as _f: chip_def = json.load(_f) try: data = GonioHeadConfiguration(**chip_def) except ValidationError: msg = f"Validation error in {self.chip_definition_file}" self.log.exception(msg) return data
[docs] def set_head_configuration(self, str_data: str) -> None: """Write the chip configuration in the chip configuration json file. Args: String containing the configuration. """ data = json.loads(str_data) if Path(self.chip_definition_file).is_file(): with Path(self.chip_definition_file).open("w+") as _f: try: GonioHeadConfiguration(**data) except ValidationError: msg = f"Validation error in {self.chip_definition_file}" self.log.exception(msg) else: _f.write(json.dumps(data, indent=4))
[docs] def set_chip_layout(self, layout_name: str) -> bool: """Choose the chip configuration layout. Args: The layout name. """ data = self.get_head_configuration().dict() data["current"] = layout_name self.set_head_configuration(json.dumps(data)) return True
# -------- Phases --------
[docs] def set_phase(self, value: DiffractometerPhase | str, timeout: float | None = None): """Sets diffractometer to selected phase. Args: value: DiffractometerPhase or string value timeout: timeout [s], If timeout = 0: return at once and do not wait; if timeout is None: wait forever (default). """ if not isinstance(value, DiffractometerPhase): value = self.value_to_enum(value, DiffractometerPhase) if value != DiffractometerPhase.UNKNOWN: self.update_state(HardwareObjectState.BUSY) self._set_phase(value) if timeout == 0: return self.wait_ready(timeout)
def _set_phase(self, value: DiffractometerPhase): """Specific implementation to set the diffractometer to selected phase Args: value: requested phase. """
[docs] def get_phase(self) -> DiffractometerPhase: """Get the current phase.""" return self.current_phase
[docs] def get_phase_list(self) -> list: """Return a list of all the defined phases.""" phase_list = [] for member in DiffractometerPhase: _nam = member.name if _nam not in ["IN", "OUT", "UNKNOWN"]: phase_list.append(_nam) return phase_list
@property def get_phase_enum(self): """Get the phase Enum. Used when no import wished.""" return DiffractometerPhase
[docs] def update_phase(self, value: DiffractometerPhase | None = None): """Update the phase value, Emit phaseChanged signal. Args: value: DiffractometerPhase member. Optional. """ if value is None: value = self.get_phase() if not isinstance(value, DiffractometerPhase): value = DiffractometerPhase(value) if self.current_phase != value: self.current_phase = value self.emit("phaseChanged", (value.name,))
# -------- Constraints --------
[docs] def set_constraint( self, value: DiffractometerConstraint, timeout: float | None = None ): """Sets diffractometer to selected constraint. Args: value: DiffractometerConstraint member. timeout: optional - timeout [s], if timeout = 0: return at once and do not wait, if timeout is None: wait forever (default). """ if isinstance(value, DiffractometerConstraint): constraint = value else: constraint = self.value_to_enum(value, DiffractometerConstraint) self._set_constraint(constraint) self._update_value(constraint, value_cmp=self.get_constraint()) if timeout == 0: return self.wait_ready(timeout)
def _set_constraint(self, value: DiffractometerConstraint): """Specific implementation to set the diffractometer to selected constraint. """
[docs] def get_constraint(self): """Get the current constraint Returns: (Enum): DiffractometerConstraint member. """ return self.current_constraint
@property def get_constraint_enum(self): """Get the constraints Enum. Used when no import wished.""" return DiffractometerConstraint # -------- data acquisition scans --------
[docs] def do_oscillation_scan(self, *args, **kwargs): """Do an oscillation scan.""" raise NotImplementedError
[docs] def do_line_scan(self, *args, **kwargs): """Do a line (helical) scan.""" raise NotImplementedError
[docs] def do_mesh_scan(self, *args, **kwargs): """Do a mesh scan.""" raise NotImplementedError
[docs] def do_still_scan(self, *args, **kwargs): """Do a zero oscillation acquisition.""" raise NotImplementedError
[docs] def do_characterisation_scan(self, *args, **kwargs): """Do characterisation.""" raise NotImplementedError
def _update_value(self, value=None, value_cmp=None): """Check if the value has changed. Emits signal valueChanged. Args: value: value of any type value_cmp: Value to compare with. """ curr_value = None if value_cmp and value is None: curr_value = value_cmp if value != curr_value: self.emit("valueChanged", (value,)) # -------- auxilarly methods --------
[docs] def value_to_enum(self, value, which_enum): """Tranform a value to Enum Args: value(str, int, float, tuple, list): value which_enum (Enum): The enum to be checked. Returns: (Enum): Enum member, corresponding to the value or UNKNOWN. """ try: return which_enum[value] except ValueError: for evar in which_enum: if isinstance(evar.value, (tuple, list)) and (value in evar.value): return evar return which_enum.UNKNOWN