# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Lesser Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""Abstract Diffractometer class.
Initialises the username property and all the motors and nstate (discrete
positions) equipment, which are part of the diffractometer.
Certain number of roles are fixed and define a corresponding motor or nstate
actuator object. This allows to use them in a standard way by the MXCuBE
application (web or Qt).
There is also a convention of the direction of the motors:
- z axis is in the direction of gravity. Positive direction is upwards.
- y axis is perpendicular to the z axis and to the nominal beam direction.
Positive direction is so that x,y,z would form a right-handed coordinate system.
- x axis is orthogonal to the y and z axes (and approximately parallel to the beam).
Positive direction is from the sample towards the detector.
Here follows the list of the fixed roles and the description of the
corresponding objects, accessible via beamline.diffractometer hardware object.
1. Motor objects (roles) and their functionality:
omega - the rotation axis, independent of the orientation (up, down or side).
Positive direction is a right-handed rotation.
kappa - the rotation axis between the omega and kappa_phi axes.
Positive direction is a right-handed rotation. Not present on all goniostats.
kappa_phi - the rotation axis directly attached to the sample.
Positive direction is a right-handed rotation. Not present on all goniostats.
sampx - centring table x axis
sampy - centring table y axis
focus - alignment table x axis
phiy - alignment table y axis
phiz - alignment table z axis
sample_horizontal - Pseudomotor approximately along the y axis.
Generated by phiy or a combination of sampx, sampy depending on goniostat
sample_vertical - Pseudomotor approximately along the z axis.
Generated by phiz or a combination of sampx, sampy depending on goniostat
backlight - adjust the intensity of the back light
frontlight - adjust the intensity of the front light
kappa - the minikappa kappa axis
kappa_phi - the minikappa phi axis
2. Discrete (N) state equipment and its functionality:
zoom - zoom levels
fshutter - fast (ms) shutter - allow beam on the sample
beamstop - put in front of a detector to avoid the direct beam.
capillary - if present, a tube to reduce the scattering background.
backlightswitch - move the backlight on the level of the on-axis viewer
frontlightswitch - switch on/off the front light
fluo_detector - if present, actuator to move a fluorescence detector close to the sample
3. Goniometer head orientation - horizontal or vertical
horizontal - the sample holder is parallel to the ground
vertical - the sample holder is perpendicular to the ground
"""
import abc
import json
from enum import Enum, unique
from typing import Dict, List, Optional, Tuple, Union
from pathlib import Path
from pydantic.v1 import BaseModel, Field, ValidationError
from mxcubecore.BaseHardwareObjects import HardwareObject, HardwareObjectState
from mxcubecore import HardwareRepository as HWR
__copyright__ = """ Copyright © by the MXCuBE collaboration """
__license__ = "LGPLv3+"
@unique
class DiffractometerHead(Enum):
    """Diffractometer head types; each member value is the head's string name."""

    # Fallback member, returned when a head type cannot be identified.
    UNKNOWN = "Unknown"

    MINI_KAPPA = "MiniKappa"
    SMART_MAGNET = "SmartMagnet"
    PLATE = "Plate"
    SSX = "SSX"
    HARVESTER = "Harvester"
@unique
class DiffractometerPhase(Enum):
    """Diffractometer phases; each member value is the phase's string name."""

    # Fallback member, returned when a phase cannot be identified.
    UNKNOWN = "Unknown"

    CENTRE = "Centring"
    COLLECT = "DataCollection"
    SEE_BEAM = "BeamLocation"
    TRANSFER = "Transfer"
@unique
class DiffractometerConstraint(Enum):
    """Diffractometer constraint types; each member value is its string name."""

    # Fallback member, returned when a constraint cannot be identified.
    UNKNOWN = "Unknown"

    RELEASE = "Normal"
    INJECTOR = "Injector"
    STILL = "LockRotation"
@unique
class HolderTypeEnum(Enum):
    """Enumeration of chip holder geometry.

    Declared ``@unique`` so that a duplicated value is rejected at import
    time instead of silently becoming an alias of another member.
    """

    KNOWN_GEOMETRY = "known_geometry"
    FREE_GEOMETRY = "free_geometry"
@unique
class ChipShapeEnum(Enum):
    """Enumeration of chip holder shape.

    Declared ``@unique`` so that a duplicated value is rejected at import
    time instead of silently becoming an alias of another member.
    """

    RECTANGULAR = "RECTANGULAR"
    ELLIPTICAL = "ELLIPTICAL"
# Diffractometer pydantic models
class CalibrationData(BaseModel):
    """Chip calibration model."""

    # Three reference corners, each a triple of floats defaulting to zeros.
    top_left: Tuple[float, float, float] = Field(
        default=[0, 0, 0],
        description="Top left corner motor position",
    )
    top_right: Tuple[float, float, float] = Field(
        default=[0, 0, 0],
        description="Top right corner motor position",
    )
    bottom_left: Tuple[float, float, float] = Field(
        default=[0, 0, 0],
        description="Bottom left corner motor position",
    )
class SampleHolderSectionModel(BaseModel):
    """Generic sample holder."""

    # Calibration of this section; optional.
    calibration_data: Optional[CalibrationData]

    # Placement and geometry of the blocks in the section.
    section_offset: Tuple[int, int] = Field(
        default=[0, 0],
        description="Block offset in grid layout system coordinates x, y",
    )
    block_size: Tuple[float, float] = Field(
        default=[15, 15],
        description="Block size horizontal, vertical in mm",
    )
    block_spacing: Tuple[float, float] = Field(
        default=[15, 15],
        description="Spacing between blocks horizontal, vertical in mm",
    )
    block_shape: ChipShapeEnum = ChipShapeEnum.RECTANGULAR

    # Grid dimensions and labels. The field names (including their spelling)
    # are the keys of the serialised model, so they are kept as they are.
    number_of_rows: int = Field(default=6, description="Numer of rows")
    number_of_collumns: int = Field(default=6, description="Numer of collumns")
    row_labels: List[str] = Field(default=[], description="Row lables")
    column_lables: List[str] = Field(default=[], description="Collumn lables")

    targets_per_block: Tuple[int, int] = Field(
        default=[20, 20],
        description="Targets per block dim1 and dim2",
    )
class ChipLayout(BaseModel):
    """Chip layout model."""

    # Head and holder identification.
    head_type: DiffractometerHead = DiffractometerHead.SSX
    holder_type: HolderTypeEnum = HolderTypeEnum.KNOWN_GEOMETRY
    holder_brand: str = Field(
        default="",
        description="Brand/make of sample holder",
    )
    holder_size: Tuple[float, float] = Field(
        default=[0, 0],
        description="Size of sample holder in mm horizontal and vertical",
    )

    # Required fields: the holder sections and the chip calibration.
    sections: List[SampleHolderSectionModel]
    calibration_data: CalibrationData
class GonioHeadConfiguration(BaseModel):
    # Top-level model of the chip definition: the selected layout name and
    # the layouts that can be selected, keyed by name.
    current: str = Field(default="", description="Selected chip layout")
    available: Dict[str, ChipLayout]
[docs]class AbstractDiffractometer(HardwareObject):
"""Abstract Diffractometer.
Attributes:
motors_hwobj_dict (dict): Motor hardware objects.
nstate_equipment_hwobj_dict (dict): N state hardware objects.
username (str): User name
head_type (DiffractometerHead Enum): Current head type
current_phase (DiffractometerPhase Enum): Current phase
current_constraint (DiffractometerConstraint Enum): Current constraint.
timeout (float): Default action timeout [s]
Emits:
valueChanged: ("valueChanged", (value,))
phaseChanged: ("phaseChanged", (state))
States:
HardwareObjectStates: READY, BUSY, FAULT
"""
__metaclass__ = abc.ABCMeta
def __init__(self, name):
    """Create the diffractometer with empty equipment tables.

    Args:
        name (str): Hardware object name; also the initial username.
    """
    super().__init__(name)
    self.username = name

    # {role: hardware_object} tables, filled in by init()
    self.motors_hwobj_dict = {}
    self.nstate_equipment_hwobj_dict = {}

    # current head type, phase and constraint - not known yet
    self.head_type = None
    self.current_phase = None
    self.current_constraint = None

    self.timeout = 3  # default timeout 3 s
    self.head_orientation = ""
    self.chip_definition_file = ""
def init(self):
    """Read the properties and register the configured equipment.

    Every role listed under ``config.motors`` and ``config.nstate_equipment``
    is resolved to its hardware object, stored in the corresponding
    dictionary and exposed as an attribute named after the role.
    """
    super().init()

    self.username = self.get_property("username") or self.username
    self.head_orientation = self.get_property("head_orientation")

    # motors
    for motor_role in self.config.motors:
        try:
            motor = self.get_object_by_role(motor_role)
            self.motors_hwobj_dict[motor_role] = motor
            setattr(self, motor_role, motor)
            self.connect(motor, "valueChanged", motor.update_value)
        except KeyError:
            self.log.warning("Diffractometer: No motors configured")

    # nstate (discrete positions) equipment
    for nstate_role in self.config.nstate_equipment:
        try:
            actuator = self.get_object_by_role(nstate_role)
            self.nstate_equipment_hwobj_dict[nstate_role] = actuator
            setattr(self, nstate_role, actuator)
            self.connect(actuator, "valueChanged", actuator.update_value)
        except KeyError:
            self.log.warning("No nstate (discrete positions) equipment configured")

    # chip definition: resolve the configured name through the repository
    configured_name = self.get_property("chip_definition_file", "")
    repository = HWR.get_hardware_repository()
    self.chip_definition_file = repository.find_in_repository(configured_name)
def get_motors(self) -> dict:
    """Get the configured motors.

    Returns:
        A shallow copy of the {role: hardware_object} dictionary, so the
        caller cannot alter the internal table.
    """
    return dict(self.motors_hwobj_dict)
def get_nstate_equipment(self) -> dict:
    """Get the configured nstate (discrete positions) equipment.

    Returns:
        A shallow copy of the {role: hardware_object} dictionary, so the
        caller cannot alter the internal table.
    """
    return dict(self.nstate_equipment_hwobj_dict)
# -------- Motor Groups --------
def set_value_motors(
    self,
    motors_positions_dict: dict,
    simultaneous: bool = True,
    timeout: float | None = None,
):
    """Move specified motors to the requested positions.

    All the requested roles are resolved before any motor is moved, so an
    invalid role does not leave the diffractometer partially moved.

    Args:
        motors_positions_dict: Dictionary {motor_role: target_value}.
        simultaneous: Move the motors simultaneously (True - default) or
            one after the other.
        timeout: timeout [s],
            if timeout = 0: return at once and do not wait,
            if timeout is None: wait forever (default).
            For a simultaneous move it is passed to the wait of each motor.
    Raises:
        RuntimeError: A role does not correspond to an available motor.
            Exceptions raised by the motors while moving or waiting are
            propagated unchanged.
    """
    # use only the available motors
    available = self.get_motors()

    # resolve every role first: nothing moves if one of them is invalid
    moves = []
    for role, target in motors_positions_dict.items():
        try:
            moves.append((available[role], target))
        except KeyError as err:
            msg = f"Invalid motor name {role}"
            raise RuntimeError(msg) from err

    # a simultaneous move starts every motor without waiting for it
    start_timeout = 0 if simultaneous else timeout

    self.update_state(HardwareObjectState.BUSY)
    try:
        for motor, target in moves:
            motor.set_value(target, timeout=start_timeout)

        # wait for the end of move of all the motors, unless the caller
        # asked not to wait at all (timeout = 0)
        if simultaneous and timeout != 0:
            for motor, _target in moves:
                motor.wait_ready(timeout)
    finally:
        # do not leave the state forced to BUSY if a move or a wait failed
        self.update_state()
[docs] def get_value_motors(self, motors_list: list | None = None) -> dict:
"""Get the positions of diffractometer motors. If the motors_list is
empty, return the positions of all the available motors.
Args:
List of motor roles.
Returns:
Dictionary {motor_role: position}
"""
mot_pos_dict = {}
# use only the available motors
mot_hwobj_dict = self.get_motors()
if not motors_list:
for role, motor in mot_hwobj_dict.items():
try:
mot_pos_dict[role] = float(motor.get_value())
except TypeError:
msg = f"No value for {role}"
self.log.warning(msg)
return mot_pos_dict
for motor in motors_list:
try:
mot_pos_dict[str(motor)] = float(mot_hwobj_dict[motor].get_value())
except KeyError:
msg = f"Invalid motor name {motor}"
self.log.exception(msg)
except TypeError:
msg = f"No value for {motor}"
self.log.warning(msg)
return mot_pos_dict
[docs] def get_state_motors(self, motors_list: list | None = None) -> dict:
"""Get the state of diffractometer motors. If the motors_list is
empty, return the state of all the available motors.
Args:
List of motor roles.
Returns:
Dictionary {motor_role: state}
"""
mot_state_dict = {}
# use only the available motors
mot_hwobj_dict = self.get_motors()
motors_list = motors_list or mot_hwobj_dict.keys()
if not motors_list:
for role, motor in mot_hwobj_dict.items():
try:
mot_state_dict[role] = float(motor.get_state())
except TypeError:
msg = f"No value for {role}"
self.log.warning(msg)
return mot_state_dict
for motor in motors_list:
try:
mot_state_dict[str(motor)] = float(mot_hwobj_dict[motor].get_state())
except KeyError:
msg = f"Invalid motor name {motor}"
self.log.exception(msg)
except TypeError:
msg = f"No value for {motor}"
self.log.warning(msg)
return mot_state_dict
# -------- Head Type and Modes --------
@property
def in_plate_mode(self) -> bool:
    """True when the current head type is PLATE."""
    plate_head = DiffractometerHead.PLATE
    return self.head_type == plate_head
@property
def in_kappa_mode(self) -> bool:
    """True when the current head type is MINI_KAPPA."""
    kappa_head = DiffractometerHead.MINI_KAPPA
    return self.head_type == kappa_head
@property
def in_chip_mode(self) -> bool:
    """True when the head type is SSX and the current constraint is STILL."""
    has_ssx_head = self.head_type == DiffractometerHead.SSX
    return has_ssx_head and self.current_constraint == DiffractometerConstraint.STILL
@property
def in_injector_mode(self) -> bool:
    """True when the head type is SSX and the current constraint is INJECTOR."""
    has_ssx_head = self.head_type == DiffractometerHead.SSX
    return (
        has_ssx_head
        and self.current_constraint == DiffractometerConstraint.INJECTOR
    )
@property
def head_enum(self):
    """The DiffractometerHead Enum class itself.

    Lets a caller reach the head types through the hardware object, when
    importing the Enum is not wished.
    """
    return DiffractometerHead
def get_chip_configuration(self) -> Union[GonioHeadConfiguration, None]:
    """Read and validate the chip configuration file.

    Returns:
        The configuration loaded from ``chip_definition_file``, or None if
        that file does not exist or its content fails the validation (the
        validation error is logged).
    """
    if not Path(self.chip_definition_file).is_file():
        return None

    with Path(self.chip_definition_file).open("r") as stream:
        raw_config = json.load(stream)

    try:
        return GonioHeadConfiguration(**raw_config)
    except ValidationError:
        msg = f"Validation error in {self.chip_definition_file}"
        self.log.exception(msg)
    return None
def set_head_configuration(self, str_data: str) -> None:
    """Write the chip configuration in the chip configuration json file.

    The configuration is validated before the file is opened for writing,
    so invalid data leaves the existing file content untouched. Nothing is
    done if the chip definition file does not exist.

    Args:
        str_data: JSON string containing the configuration.
    """
    data = json.loads(str_data)

    definition_path = Path(self.chip_definition_file)
    if not definition_path.is_file():
        return

    # Validate first: opening the file for writing truncates it, which
    # would destroy the current configuration if the new one is rejected.
    try:
        GonioHeadConfiguration(**data)
    except ValidationError:
        msg = f"Validation error in {self.chip_definition_file}"
        self.log.exception(msg)
        return

    with definition_path.open("w") as _f:
        _f.write(json.dumps(data, indent=4))
def set_chip_layout(self, layout_name: str) -> bool:
    """Choose the chip configuration layout.

    The current chip configuration is read with get_chip_configuration,
    its ``current`` entry is replaced and the result is written back with
    set_head_configuration.

    Args:
        layout_name: The layout name.
    Returns:
        True if the new configuration was passed on for writing, False if
        there is no valid chip configuration to update.
    """
    config = self.get_chip_configuration()
    if config is None:
        msg = f"No chip configuration available, cannot select {layout_name}"
        self.log.warning(msg)
        return False

    # Serialise through the model: a plain dict of it still holds Enum
    # members, which json.dumps cannot encode.
    updated = config.copy(update={"current": layout_name})
    self.set_head_configuration(updated.json())
    return True
# -------- Phases --------
def set_phase(self, value: DiffractometerPhase | str, timeout: float | None = None):
    """Set the diffractometer to the selected phase.

    Nothing is done when the requested phase resolves to UNKNOWN.

    Args:
        value: DiffractometerPhase or string value
        timeout: timeout [s],
            If timeout = 0: return at once and do not wait;
            if timeout is None: wait forever (default).
    """
    if isinstance(value, DiffractometerPhase):
        phase = value
    else:
        phase = self.value_to_enum(value, DiffractometerPhase)

    if phase == DiffractometerPhase.UNKNOWN:
        return

    self.update_state(HardwareObjectState.BUSY)
    self._set_phase(phase)
    if timeout != 0:
        self.wait_ready(timeout)
def _set_phase(self, value: DiffractometerPhase):
    """Hook with the specific implementation of the phase change.

    Does nothing here.

    Args:
        value: requested phase.
    """
def get_phase(self) -> DiffractometerPhase:
    """Get the current phase, as stored in ``current_phase``."""
    phase = self.current_phase
    return phase
def get_phase_list(self) -> list:
    """Return the names of all the defined phases.

    The names "IN", "OUT" and "UNKNOWN" are left out of the list.
    """
    excluded = ["IN", "OUT", "UNKNOWN"]
    return [
        member.name for member in DiffractometerPhase if member.name not in excluded
    ]
@property
def get_phase_enum(self):
    """The DiffractometerPhase Enum class itself.

    Lets a caller reach the phases through the hardware object, when
    importing the Enum is not wished.
    """
    return DiffractometerPhase
def update_phase(self, value: DiffractometerPhase | None = None):
    """Update the current phase and emit phaseChanged if it changed.

    Args:
        value: DiffractometerPhase member. Optional, the result of
            get_phase is used when None. Anything which is not a
            DiffractometerPhase member is converted by member value.
    """
    phase = self.get_phase() if value is None else value
    if not isinstance(phase, DiffractometerPhase):
        phase = DiffractometerPhase(phase)

    if self.current_phase == phase:
        return
    self.current_phase = phase
    self.emit("phaseChanged", (phase.name,))
# -------- Constraints --------
def set_constraint(
    self, value: DiffractometerConstraint, timeout: float | None = None
):
    """Set the diffractometer to the selected constraint.

    Args:
        value: DiffractometerConstraint member. Any other value is first
            converted with value_to_enum.
        timeout: optional - timeout [s],
            if timeout = 0: return at once and do not wait,
            if timeout is None: wait forever (default).
    """
    constraint = (
        value
        if isinstance(value, DiffractometerConstraint)
        else self.value_to_enum(value, DiffractometerConstraint)
    )
    self._set_constraint(constraint)
    self._update_value(constraint, value_cmp=self.get_constraint())
    if timeout != 0:
        self.wait_ready(timeout)
def _set_constraint(self, value: DiffractometerConstraint):
    """Hook with the specific implementation of the constraint change.

    Does nothing here.

    Args:
        value: requested constraint.
    """
def get_constraint(self):
    """Get the current constraint, as stored in ``current_constraint``.

    Returns:
        (Enum): DiffractometerConstraint member.
    """
    constraint = self.current_constraint
    return constraint
@property
def get_constraint_enum(self):
    """The DiffractometerConstraint Enum class itself.

    Lets a caller reach the constraints through the hardware object, when
    importing the Enum is not wished.
    """
    return DiffractometerConstraint
# -------- data acquisition scans --------
def do_oscillation_scan(self, *args, **kwargs):
    """Do an oscillation scan.

    Raises:
        NotImplementedError: Always; there is no implementation here.
    """
    raise NotImplementedError
def do_line_scan(self, *args, **kwargs):
    """Do a line (helical) scan.

    Raises:
        NotImplementedError: Always; there is no implementation here.
    """
    raise NotImplementedError
def do_mesh_scan(self, *args, **kwargs):
    """Do a mesh scan.

    Raises:
        NotImplementedError: Always; there is no implementation here.
    """
    raise NotImplementedError
def do_still_scan(self, *args, **kwargs):
    """Do a zero oscillation acquisition.

    Raises:
        NotImplementedError: Always; there is no implementation here.
    """
    raise NotImplementedError
def do_characterisation_scan(self, *args, **kwargs):
    """Do characterisation.

    Raises:
        NotImplementedError: Always; there is no implementation here.
    """
    raise NotImplementedError
def _update_value(self, value=None, value_cmp=None):
"""Check if the value has changed. Emits signal valueChanged.
Args:
value: value of any type
value_cmp: Value to compare with.
"""
curr_value = None
if value_cmp and value is None:
curr_value = value_cmp
if value != curr_value:
self.emit("valueChanged", (value,))
# -------- auxiliary methods --------
def value_to_enum(self, value, which_enum):
    """Transform a value to a member of the given Enum.

    The member is searched, in this order: by member name, by member value
    and by presence of the value in a member value which is a tuple or a
    list.

    Args:
        value(str, int, float, tuple, list): member name or member value
        which_enum (Enum): The enum to be checked. It has to define an
            UNKNOWN member.
    Returns:
        (Enum): Enum member, corresponding to the value or UNKNOWN.
    """
    # By name. Item access on an Enum raises KeyError (not ValueError) for
    # an unknown name and TypeError for an unhashable value.
    try:
        return which_enum[value]
    except (KeyError, TypeError):
        pass

    # By value. Calling the Enum raises ValueError if no member matches.
    try:
        return which_enum(value)
    except ValueError:
        pass

    # As one of the items of a member value
    for member in which_enum:
        if isinstance(member.value, (tuple, list)) and value in member.value:
            return member
    return which_enum.UNKNOWN