#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""
[Name]
ALBAMiniDiff
[Description]
Specific HwObj for M2D2 diffractometer @ ALBA
[Channels]
- N/A
[Commands]
- N/A
[Emitted signals]
- pixelsPerMmChanged
- phiMotorMoved
- stateChanged
- zoomMotorPredefinedPositionChanged
[Functions]
- None
[Included Hardware Objects]
- None
"""
import time
import gevent
from mxcubecore.HardwareObjects.GenericDiffractometer import (
DiffractometerState,
GenericDiffractometer,
)
from mxcubecore.model import queue_model_objects
__author__ = "Jordi Andreu"
__credits__ = ["MXCuBE collaboration"]
__version__ = "2.2."
__maintainer__ = "Jordi Andreu"
__email__ = "jandreu[at]cells.es"
__status__ = "Draft"
[docs]class ALBAMiniDiff(GenericDiffractometer):
"""
Specific diffractometer HwObj for XALOC beamline.
"""
def __init__(self, *args):
GenericDiffractometer.__init__(self, *args)
self.centring_hwobj = None
self.super_hwobj = None
[docs] def init(self):
self.calibration = self.get_object_by_role("calibration")
self.centring_hwobj = self.get_object_by_role("centring")
self.super_hwobj = self.get_object_by_role("beamline-supervisor")
if self.centring_hwobj is None:
self.log.debug("ALBAMinidiff: Centring math is not defined")
if self.super_hwobj is not None:
self.connect(
self.super_hwobj, "stateChanged", self.supervisor_state_changed
)
self.connect(
self.super_hwobj, "phaseChanged", self.supervisor_phase_changed
)
self.state_channel = self.get_channel_object("State")
self.connect(self.state_channel, "update", self.state_changed)
# This is not used
self.cmd_start_auto_focus = self.get_command_object("startAutoFocus")
self.phi_motor_hwobj = self.get_object_by_role("phi")
self.phiz_motor_hwobj = self.get_object_by_role("phiz")
self.phiy_motor_hwobj = self.get_object_by_role("phiy")
self.zoom_motor_hwobj = self.get_object_by_role("zoom")
self.focus_motor_hwobj = self.get_object_by_role("focus")
self.sample_x_motor_hwobj = self.get_object_by_role("sampx")
self.sample_y_motor_hwobj = self.get_object_by_role("sampy")
if self.phi_motor_hwobj is not None:
self.connect(
self.phi_motor_hwobj, "stateChanged", self.phi_motor_state_changed
)
self.connect(self.phi_motor_hwobj, "valueChanged", self.phi_motor_moved)
else:
self.log.error("ALBAMiniDiff: Phi motor is not defined")
if self.phiz_motor_hwobj is not None:
self.connect(
self.phiz_motor_hwobj, "stateChanged", self.phiz_motor_state_changed
)
self.connect(self.phiz_motor_hwobj, "valueChanged", self.phiz_motor_moved)
else:
self.log.error("ALBAMiniDiff: Phiz motor is not defined")
if self.phiy_motor_hwobj is not None:
self.connect(
self.phiy_motor_hwobj, "stateChanged", self.phiy_motor_state_changed
)
self.connect(self.phiy_motor_hwobj, "valueChanged", self.phiy_motor_moved)
else:
self.log.error("ALBAMiniDiff: Phiy motor is not defined")
if self.zoom_motor_hwobj is not None:
self.connect(
self.zoom_motor_hwobj, "valueChanged", self.zoom_position_changed
)
self.connect(
self.zoom_motor_hwobj,
"predefinedPositionChanged",
self.zoom_motor_predefined_position_changed,
)
self.connect(
self.zoom_motor_hwobj, "stateChanged", self.zoom_motor_state_changed
)
else:
self.log.error("ALBAMiniDiff: Zoom motor is not defined")
if self.sample_x_motor_hwobj is not None:
self.connect(
self.sample_x_motor_hwobj,
"stateChanged",
self.sampleX_motor_state_changed,
)
self.connect(
self.sample_x_motor_hwobj, "valueChanged", self.sampleX_motor_moved
)
else:
self.log.error("ALBAMiniDiff: Sampx motor is not defined")
if self.sample_y_motor_hwobj is not None:
self.connect(
self.sample_y_motor_hwobj,
"stateChanged",
self.sampleY_motor_state_changed,
)
self.connect(
self.sample_y_motor_hwobj, "valueChanged", self.sampleY_motor_moved
)
else:
self.log.error("ALBAMiniDiff: Sampx motor is not defined")
if self.focus_motor_hwobj is not None:
self.connect(self.focus_motor_hwobj, "valueChanged", self.focus_motor_moved)
GenericDiffractometer.init(self)
queue_model_objects.CentredPosition.set_diffractometer_motor_names(
"phi", "phiy", "phiz", "sampx", "sampy", "kappa"
)
[docs] def state_changed(self, state):
"""
Overrides method to map Tango ON state to Difractaometer State Ready.
@state: Tango state
"""
if str(state) == "ON":
state = DiffractometerState.tostring(DiffractometerState.Ready)
if state != self.current_state:
self.log.debug(
"ALBAMinidiff: State changed %s (was: %s)"
% (str(state), self.current_state)
)
self.current_state = state
self.emit("minidiffStateChanged", (self.current_state))
[docs] def getCalibrationData(self, offset=None):
"""
Get pixel size for OAV system
@offset: Unused
@return: 2-tuple float
"""
calibx, caliby = self.calibration.getCalibration()
return 1000.0 / caliby, 1000.0 / caliby
# return 1000./self.md2.CoaxCamScaleX, 1000./self.md2.CoaxCamScaleY
[docs] def get_pixels_per_mm(self):
"""
Returns the pixel/mm for x and y. Overrides GenericDiffractometer method.
"""
px_x, px_y = self.getCalibrationData()
return (px_x, px_y)
[docs] def update_pixels_per_mm(self, *args):
"""
Emit signal with current pixel/mm values.
"""
self.pixels_per_mm_x, self.pixels_per_mm_y = self.getCalibrationData()
self.emit("pixelsPerMmChanged", ((self.pixels_per_mm_x, self.pixels_per_mm_y),))
# TODO: looks quite bizarre.
[docs] def motor_positions_to_screen(self, centred_positions_dict):
"""
Convert motor positions contained in a dictionary to screen (canvas) positions.
Overrides GenericDiffractometer method.
@centered_positions_dict: dictionary to be converted
@return: position
"""
c = centred_positions_dict
# if self.head_type == GenericDiffractometer.HEAD_TYPE_MINIKAPPA:
# kappa = self.motor_hwobj_dict["kappa"]
# phi = self.motor_hwobj_dict["kappa_phi"]
xy = self.centring_hwobj.centringToScreen(c)
if xy is None:
return None
x = xy["X"] * self.pixels_per_mm_x + self.zoom_centre["x"]
y = xy["Y"] * self.pixels_per_mm_y + self.zoom_centre["y"]
# self.log.debug(" motor_positions_to_screen ")
# self.log.debug(" positions = %s " % str(centred_positions_dict))
# self.log.debug(" x,y = %s, %s " % (x,y))
return x, y
# TODO: Must be implemented correctly.
[docs] def get_centred_point_from_coord(self, x, y, return_by_names=None):
"""
Returns a dictionary with motors name and positions centred.
It is expected in start_move_to_beam and move_to_beam methods in
GenericDIffractometer HwObj.
@return: dict
"""
return {"omega": [200, 200]}
[docs] def getBeamInfo(self, update_beam_callback):
"""
Update beam info (position and shape) and execute callback.
@update_beam_callback: callback method passed as argument.
"""
calibx, caliby = self.calibration.getCalibration()
size_x = self.get_channel_object("beamInfoX").get_value() / 1000.0
size_y = self.get_channel_object("beamInfoY").get_value() / 1000.0
data = {"size_x": size_x, "size_y": size_y, "shape": "ellipse"}
update_beam_callback(data)
# TODO:Implement dynamically
[docs] def use_sample_changer(self):
"""
Overrides GenericDiffracometer method.
"""
return True
# TODO:Implement dynamically
@property
def in_plate_mode(self):
"""
Overrides GenericDiffracometer method.
"""
return False
# We are using the sample_centring module. this is not used anymore
[docs] def start_manual_centring(self, *args, **kwargs):
"""
Start manual centring. Overrides GenericDiffracometer method.
Prepares diffractometer for manual centring.
"""
if self.prepare_centring():
GenericDiffractometer.start_manual_centring(self, *args, **kwargs)
else:
self.log.info(" Failed to prepare diffractometer for centring")
self.invalidate_centring()
[docs] def start_auto_centring(self, *args, **kwargs):
"""
Start manual centring. Overrides GenericDiffracometer method.
Prepares diffractometer for manual centring.
"""
if self.prepare_centring():
GenericDiffractometer.start_auto_centring(self, *args, **kwargs)
else:
self.log.info(" Failed to prepare diffractometer for centring")
self.invalidate_centring()
[docs] def prepare_centring(self):
"""
Prepare beamline for to sample_view phase.
"""
if not self.is_sample_view_phase():
self.log.info(" Not in sample view phase. Asking supervisor to go")
success = self.go_sample_view()
if not success:
self.log.info("Cannot set SAMPLE VIEW phase")
return False
return True
# def manual_centring(self):
# """
# We are using the sample_centring module. this is not used anymore
# """
# self.centring_hwobj.initCentringProcedure()
# # self.head_type = self.chan_head_type.get_value()
# # Say diffractometer to go to SampleView phase
# # go to sample_view phase
# if not self.is_sample_view_phase():
# self.log.info(" Not in sample view phase. Asking supervisor to go")
# success = self.go_sample_view()
# if not success:
# self.log.info("Cannot set SAMPLE VIEW phase")
# return False
# phi_init_position = self.phi_motor_hwobj.get_value()
# for click in range(3):
# self.user_clicked_event = gevent.event.AsyncResult()
# x, y = self.user_clicked_event.get()
# self.centring_hwobj.appendCentringDataPoint(
# {"X": (x - self.zoom_centre['x'])/ self.pixels_per_mm_x,
# "Y": (y - self.zoom_centre['y'])/ self.pixels_per_mm_y})
# if self.in_plate_mode:
# dynamic_limits = self.phi_motor_hwobj.get_dynamic_limits()
# if click == 0:
# self.phi_motor_hwobj.set_value(dynamic_limits[0])
# elif click == 1:
# self.phi_motor_hwobj.set_value(dynamic_limits[1])
# else:
# if click < 2:
# self.phi_motor_hwobj.set_value_relative(-90, timeout=None)
# #self.log.info(" Returning phi to initial position %s" % phi_init_position)
# #self.phi_motor_hwobj.set_value(phi_init_position, timeout=None)
#
# return self.centring_hwobj.centeredPosition(return_by_name=False)
[docs] def is_sample_view_phase(self):
"""
Returns boolean by comparing the supervisor current phase and SAMPLE view phase.
@return: boolean
"""
return self.super_hwobj.get_current_phase().upper() == "SAMPLE"
[docs] def go_sample_view(self):
"""
Go to sample view phase.
"""
self.super_hwobj.go_sample_view()
while True:
super_state = str(self.super_hwobj.get_state()).upper()
if super_state != "MOVING":
self.log.debug(
"ALBAMinidiff: go_sample_view done . super_state is %s"
% super_state
)
return True
gevent.sleep(0.2)
[docs] def supervisor_state_changed(self, state):
"""
Emit stateChanged signal according to supervisor current state.
"""
return
self.current_state = state
self.emit("stateChanged", (self.current_state,))
# TODO: Review override current_state by current_phase
[docs] def supervisor_phase_changed(self, phase):
"""
Emit stateChanged signal according to supervisor current phase.
"""
# self.current_state = phase
self.emit("minidiffPhaseChanged", (phase,))
[docs] def phi_motor_moved(self, pos):
"""
Emit phiMotorMoved signal with position value.
"""
self.current_motor_positions["phi"] = pos
# self.emit_diffractometer_moved()
self.emit("phiMotorMoved", pos)
# self.emit('stateChanged', (self.current_motor_states["phi"], ))
[docs] def phi_motor_state_changed(self, state):
"""
Emit stateChanged signal with state value.
"""
self.current_motor_states["phi"] = state
self.emit("stateChanged", (state,))
def phiz_motor_moved(self, pos):
""" """
self.current_motor_positions["phiz"] = pos
# if time.time() - self.centring_time > 3.0:
# self.invalidate_centring()
# self.emit_diffractometer_moved()
[docs] def phiz_motor_state_changed(self, state):
"""
Emit stateChanged signal with state value.
"""
self.emit("stateChanged", (state,))
[docs] def phiy_motor_state_changed(self, state):
"""
Emit stateChanged signal with state value.
"""
self.emit("stateChanged", (state,))
def phiy_motor_moved(self, pos):
""" """
self.current_motor_positions["phiy"] = pos
# if time.time() - self.centring_time > 3.0:
# self.invalidate_centring()
# self.emit_diffractometer_moved()
[docs] def zoom_position_changed(self, value):
"""
Update positions after zoom changed.
@value: zoom position.
"""
self.update_pixels_per_mm()
self.current_motor_positions["zoom"] = value
self.refresh_omega_reference_position()
[docs] def zoom_motor_predefined_position_changed(self, position_name, offset):
"""
Update pixel size and emit signal.
"""
self.update_pixels_per_mm()
self.emit("zoomMotorPredefinedPositionChanged", (position_name, offset))
[docs] def zoom_motor_state_changed(self, state):
"""
Emit signal for motor zoom changed
@state: new state value to emit.
"""
self.emit("stateChanged", (state,))
def sampleX_motor_moved(self, pos):
""" """
self.current_motor_positions["sampx"] = pos
# if time.time() - self.centring_time > 3.0:
# self.invalidate_centring()
# self.emit_diffractometer_moved()
[docs] def sampleX_motor_state_changed(self, state):
"""
Emit stateChanged signal with state value.
"""
self.current_motor_states["sampx"] = state
self.emit("stateChanged", (state,))
def sampleY_motor_moved(self, pos):
""" """
self.current_motor_positions["sampy"] = pos
# if time.time() - self.centring_time > 3.0:
# self.invalidate_centring()
# self.emit_diffractometer_moved()
[docs] def sampleY_motor_state_changed(self, state):
"""
Emit stateChanged signal with state value.
"""
self.current_motor_states["sampy"] = state
self.emit("stateChanged", (state,))
def focus_motor_moved(self, pos):
""" """
self.current_motor_positions["focus"] = pos
# TODO: The command is not configured in the xml. Unused
def start_auto_focus(self):
self.cmd_start_auto_focus()
[docs] def move_omega(self, pos, velocity=None):
"""
Move omega to absolute position.
@pos: target position
"""
# turn it on
if velocity is not None:
self.phi_motor_hwobj.set_velocity(velocity)
self.phi_motor_hwobj.set_value(pos)
time.sleep(0.2)
# it should wait here
[docs] def move_omega_relative(self, relpos):
"""
Move omega to relative position.
@relpos: target relative position
"""
self.wait_device_ready()
self.phi_motor_hwobj.set_value_relative(relpos, timeout=None)
# TODO: define phases as enum members.
[docs] def set_phase(self, phase):
"""
General function to set phase by using supervisor commands.
"""
if phase == "Transfer":
self.super_hwobj.go_transfer()
elif phase == "Collect":
self.super_hwobj.go_collect()
elif phase == "BeamView":
self.super_hwobj.go_beam_view()
elif phase == "Centring":
self.super_hwobj.go_sample_view()
else:
self.log.warning(
"Diffractometer set_phase asked for un-handled phase: %s" % phase
)
def test_hwo(hwo):
print(hwo.get_phase_list())