import logging
import math
import time
import gevent
from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import (
HardwareObject,
HardwareObjectState,
)
from mxcubecore.HardwareObjects import (
MiniDiff,
sample_centring,
)
MICRODIFF = None
EXPORTER_TO_HWOBJ_STATE = {
"Fault": HardwareObjectState.FAULT,
"Ready": HardwareObjectState.READY,
"Running": HardwareObjectState.BUSY,
"Moving": HardwareObjectState.BUSY,
"Busy": HardwareObjectState.BUSY,
"Unknown": HardwareObjectState.BUSY,
"Offline": HardwareObjectState.OFF,
}
[docs]class Microdiff(MiniDiff.MiniDiff):
[docs] def init(self):
global MICRODIFF
MICRODIFF = self
self.phiMotor = self.get_object_by_role("phi")
self.exporter_addr = self.get_property("exporter_address")
self.update_state(HardwareObjectState.READY)
self.x_calib = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "x_calib",
},
"CoaxCamScaleX",
)
self.y_calib = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "y_calib",
},
"CoaxCamScaleY",
)
self.moveMultipleMotors = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "move_multiple_motors",
},
"SyncMoveMotors",
)
self.head_type = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "head_type",
},
"HeadType",
)
self.kappa_channel = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "kappa_enable",
},
"KappaIsEnabled",
)
self.phases = {
"Centring": 1,
"BeamLocation": 2,
"DataCollection": 3,
"Transfer": 4,
}
self.move_phase = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "move_to_phase",
},
"startSetPhase",
)
self.readPhase = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "read_phase",
},
"CurrentPhase",
)
self.state = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "state",
},
"State",
)
self.scanLimits = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "scan_limits",
},
"getOmegaMotorDynamicScanLimits",
)
if self.get_property("use_hwstate"):
self.hwstate_attr = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "hwstate",
},
"HardwareState",
)
else:
self.hwstate_attr = None
self.swstate_attr = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "swstate",
},
"State",
)
self.nb_frames = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "nbframes",
},
"ScanNumberOfFrames",
)
# raster scan attributes
self.scan_range = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "scan_range",
},
"ScanRange",
)
self.scan_exposure_time = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "exposure_time",
},
"ScanExposureTime",
)
self.scan_start_angle = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_angle",
},
"ScanStartAngle",
)
self.scan_detector_gate_pulse_enabled = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "detector_gate_pulse_enabled",
},
"DetectorGatePulseEnabled",
)
self.scan_detector_gate_pulse_readout_time = self.add_channel(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "detector_gate_pulse_readout_time",
},
"DetectorGatePulseReadoutTime",
)
self.abort_cmd = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "abort",
},
"abort",
)
self._move_sync_motors = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "move_sync_motors",
},
"startSimultaneousMoveMotors",
)
self.beam_position_horizontal = self.add_channel(
{"type": "exporter", "exporter_address": self.exporter_addr, "name": "bph"},
"BeamPositionHorizontal",
)
self.beam_position_vertical = self.add_channel(
{"type": "exporter", "exporter_address": self.exporter_addr, "name": "bpv"},
"BeamPositionVertical",
)
self.save_centring_positions = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "save_centring_positions",
},
"saveCentringPositions",
)
self.set_room_temperature_mode = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "set_room_temperature_mode",
},
"setRoomTemperatureMode",
)
self.prepare_ssx_grid_scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "prepare_ssx_grid_scan",
},
"prepareSSXGridScan",
)
self.start_ssx_scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_scan",
},
"startSSXGridScan",
)
self.start_still_ssx_scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_still_ssx_scan",
},
"startSSXStillScan",
)
self.prepare_ssx_grid_scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "prepare_ssx_grid_scan",
},
"prepareSSXGridScan",
)
self.get_ssx_scan_method = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "get_ssx_scan_method",
},
"getSSXScanMethod",
)
self.get_ssx_delay = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "get_ssx_delay",
},
"getSSXDeltaT",
)
self.prepare_ssx_line_scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "prepare_ssx_line_scan",
},
"prepareSSXLineScan",
)
self.start_ssx_line_scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"startSSXLineScan",
)
self.get_last_task_info = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "get_last_task_info",
},
"getLastTaskInfo",
)
#
# Args: doublex, doubley, doublez
# Retruns: 1 if success else other than 1
#
self.add_ssx_chip_calibration_fiducial = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"addSSXChipCalibrationFiducial",
)
self.add_ssx_chip_calibration_point = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"addSSXChipCalibrationPoint",
)
self.add_ssx_chip_calibration_point = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"addSSXChipCalibrationPoint",
)
self.add_ssx_chip_calibration_point = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"resetSSXChipCalibration",
)
self.get_ssx_chip_calibration_state = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"getSSXChipCalibrationState",
)
self.get_ssx_block_calibration_state = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"getSSXBlockCalibrationState",
)
self.start_ssx_all_block_calibration = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"startSSXAllBlockCalibration",
)
self.reset_ssx_all_block_calibration = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"resetSSXAllBlockCalibration",
)
self.ir_auto_focus = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_line_scan",
},
"doSSXAutoFocus",
)
self.start_ssx_all_block_calibration = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_ssx_all_block_calibration",
},
"startSSXAllBlockCalibration",
)
MiniDiff.MiniDiff.init(self)
self.centringPhiy.direction = -1
self.MOTOR_TO_EXPORTER_NAME = self.getMotorToExporterNames()
self.move_to_coord = self.move_to_beam
self.centringVertical = self.get_object_by_role("sample_vertical")
self.centringFocus = self.get_object_by_role("sample_focus")
self.saved_motor_position = {}
self.frontLight = self.get_object_by_role("FrontLight")
self.backLight = self.get_object_by_role("BackLight")
self.wait_ready = self._wait_ready
self.pixelsPerMmY, self.pixelsPerMmZ = self.getCalibrationData(None)
self.readPhase.connect_signal("update", self._update_value)
self.state.connect_signal("update", self._update_state)
HardwareObject.init(self)
self.handle_detector_cover = self.get_object_by_role("handle_detcover")
def use_position_for_calibration(self, data):
for _d in data:
if len(data[_d]) > 0:
x, y, z = data[_d]
logging.getLogger("HWR").info(
f"Setting fiducial {_d} to: ({x}, {y}, {z})"
)
self.add_ssx_chip_calibration_fiducial(x, y, z)
self.start_ssx_all_block_calibration()
res = 1
return res == 1
def _update_value(self, value=None):
if value is None:
value = self.get_current_phase()
self.emit("phaseChanged", (value))
def _update_state(self, value):
self.update_state(
EXPORTER_TO_HWOBJ_STATE.get(value, HardwareObjectState.UNKNOWN)
)
[docs] def abort(self):
self.abort_cmd()
return True
def getMotorToExporterNames(self):
MOTOR_TO_EXPORTER_NAME = {
"focus": self.focusMotor.get_property("actuator_name"),
"phi": self.phiMotor.get_property("actuator_name"),
"phiy": self.phiyMotor.get_property("actuator_name"),
"phiz": self.phizMotor.get_property("actuator_name"),
"sampx": self.sampleXMotor.get_property("actuator_name"),
"sampy": self.sampleYMotor.get_property("actuator_name"),
"zoom": "Zoom",
}
if self.in_kappa_mode:
MOTOR_TO_EXPORTER_NAME.update(
{"kappa": self.kappaMotor.get_property("actuator_name")}
)
MOTOR_TO_EXPORTER_NAME.update(
{"kappa_phi": self.kappaPhiMotor.get_property("actuator_name")}
)
return MOTOR_TO_EXPORTER_NAME
def save_current_motor_position(self):
motor_pos_dict = {
"focus": self.focusMotor.get_value(),
"phiy": self.phiyMotor.get_value(),
"phiz": self.phizMotor.get_value(),
"centring_focus": self.centringFocus.get_value(),
"centring_vertical": self.centringVertical.get_value(),
}
self.saved_motor_position = motor_pos_dict
def getCalibrationData(self, offset):
return (1.0 / self.x_calib.get_value(), 1.0 / self.y_calib.get_value())
def emitCentringSuccessful(self):
# check first if all the motors have stopped
self._wait_ready(30)
# do normal stuff
return MiniDiff.MiniDiff.emitCentringSuccessful(self)
def _ready(self):
if self.hwstate_attr:
if (
self.hwstate_attr.get_value() == "Ready"
and self.swstate_attr.get_value() == "Ready"
):
return True
else:
if self.swstate_attr.get_value() == "Ready":
return True
return False
def _wait_ready(self, timeout=None):
# None means infinite timeout
# <=0 means default timeout
if timeout is not None and timeout <= 0:
self.log.warning("DEBUG: Strange timeout value passed %s" % str(timeout))
timeout = 30
with gevent.Timeout(
timeout, RuntimeError("Timeout waiting for diffractometer to be ready")
):
while not self._ready():
time.sleep(0.5)
def open_detector_cover(self, timeout=10):
use = True
if self.handle_detector_cover:
try:
use = self.handle_detector_cover.get_value().value
except AttributeError:
use = False
logging.getLogger("user_level_log").info("Use detector cover open: %s" % use)
if use:
try:
detcover = self.get_object_by_role("controller").detcover
if detcover.state == "IN":
detcover.set_out(timeout)
except AttributeError:
logging.getLogger("user_level_log").exception(
"No detector cover configured"
)
def close_detector_cover(self, timeout=10):
use = True
if self.handle_detector_cover:
try:
use = self.handle_detector_cover.get_value().value
except AttributeError:
use = False
logging.getLogger("user_level_log").info("Use detector cover close: %s" % use)
if use:
try:
detcover = self.get_object_by_role("controller").detcover
if detcover.state == "OUT":
detcover.set_in(timeout)
except AttributeError:
logging.getLogger("user_level_log").exception(
"No detector cover configured"
)
def phase_prepare(self, phase):
if phase == "Centring":
try:
diffr = self.get_object_by_role("controller").diffractometer
diffr.prepare("centre")
except Exception:
self.log.exception("Cannot prepare centring")
[docs] def set_light_in(self):
"""Set the backlight in - used by the XMLRPC calls"""
self.log.info("Moving backlight in")
light_hwobj = self.get_object_by_role("BackLightSwitch")
light_hwobj.set_value(light_hwobj.VALUES.IN)
self._wait_ready(20)
[docs] def set_light_out(self):
"""Set the backlight out - used by the XMLRPC calls"""
self.log.info("Moving backlight out")
light_hwobj = self.get_object_by_role("BackLightSwitch")
light_hwobj.set_value(light_hwobj.VALUES.OUT)
self._wait_ready(20)
[docs] def set_phase(self, phase, wait=False, timeout=None):
"""Set the phase"""
_use_custom = self.get_property("use_custom_phase_script", False)
if not self._ready():
self.log.exception("MD not ready - phase not set.")
return
current_phase = self.get_current_phase()
msg = f"Current phase is {current_phase} and moving to {phase}"
logging.getLogger("user_level_log").info(msg)
if phase in self.phases:
if phase in ["BeamLocation", "Transfer"]:
self.close_detector_cover()
self.phase_prepare(phase)
if _use_custom and not self.in_plate_mode:
script = "ChangePhase_" + phase.lower()
msg = f"Changing phase to {phase}, using pmac script"
logging.getLogger("user_level_log").info(msg)
self.run_script(script)
self._wait_ready(600)
else:
self.move_phase(phase)
if wait:
timeout = timeout or 40
self._wait_ready(timeout)
def get_current_phase(self):
return self.readPhase.get_value()
def get_phase_list(self):
return list(self.phases.keys())
def move_sync_motors(self, motors_dict, wait=False, timeout=None):
in_kappa_mode = self.in_kappa_mode
argin = ""
# print "start moving motors =============", time.time()
if wait:
self._wait_ready()
for motor in motors_dict.keys():
position = motors_dict[motor]
if position is None:
continue
if not in_kappa_mode and motor in ("kappa", "kappa_phi"):
continue
name = self.MOTOR_TO_EXPORTER_NAME[motor]
argin += "%s=%0.3f;" % (name, position)
if not argin:
return
self._move_sync_motors(argin)
if wait:
time.sleep(0.1)
self._wait_ready()
# print "end moving motors =============", time.time()
def oscilScan(self, start, end, exptime, number_of_images, wait=False):
if self.in_plate_mode:
scan_speed = math.fabs(end - start) / exptime
low_lim, hi_lim = map(float, self.scanLimits(scan_speed))
if start < low_lim:
raise ValueError("Scan start below the allowed value %f" % low_lim)
elif end > hi_lim:
raise ValueError("Scan end above the allowed value %f" % hi_lim)
dead_time = HWR.beamline.detector.get_deadtime()
self.scan_detector_gate_pulse_enabled.set_value(True)
self.scan_detector_gate_pulse_readout_time.set_value(dead_time * 1000)
if self.get_property("md_set_number_of_frames", False):
self.nb_frames.set_value(number_of_images)
else:
self.nb_frames.set_value(1)
scan_params = "1\t%0.3f\t%0.3f\t%0.4f\t1" % (start, (end - start), exptime)
scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_scan",
},
"startScanEx",
)
scan(scan_params)
print("oscil scan started at ----------->", time.time())
if wait:
self._wait_ready(40 * 60) # Timeout of 40 min
def oscilScan4d(
self, start, end, exptime, number_of_images, motors_pos, wait=False
):
if self.in_plate_mode:
scan_speed = math.fabs(end - start) / exptime
low_lim, hi_lim = map(float, self.scanLimits(scan_speed))
if start < low_lim:
raise ValueError("Scan start below the allowed value %f" % low_lim)
elif end > hi_lim:
raise ValueError("Scan end above the allowed value %f" % hi_lim)
if self.get_property("md_set_number_of_frames", False):
self.nb_frames.set_value(number_of_images)
else:
self.nb_frames.set_value(1)
scan_params = "%0.3f\t%0.3f\t%f\t" % (start, (end - start), exptime)
scan_params += "%0.3f\t" % motors_pos["1"]["phiy"]
scan_params += "%0.3f\t" % motors_pos["1"]["phiz"]
scan_params += "%0.3f\t" % motors_pos["1"]["sampx"]
scan_params += "%0.3f\t" % motors_pos["1"]["sampy"]
scan_params += "%0.3f\t" % motors_pos["2"]["phiy"]
scan_params += "%0.3f\t" % motors_pos["2"]["phiz"]
scan_params += "%0.3f\t" % motors_pos["2"]["sampx"]
scan_params += "%0.3f" % motors_pos["2"]["sampy"]
scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_scan4d",
},
"startScan4DEx",
)
scan(scan_params)
print("helical scan started at ----------->", time.time())
if wait:
self._wait_ready(20 * 60) # timeout of 15 min
print("finished at ---------->", time.time())
def oscilScanMesh(
self,
start,
end,
exptime,
dead_time,
mesh_num_lines,
mesh_total_nb_frames,
mesh_center,
mesh_range,
wait=False,
):
self.scan_detector_gate_pulse_enabled.set_value(True)
dead_time = HWR.beamline.detector.get_deadtime()
self.scan_detector_gate_pulse_readout_time.set_value(dead_time * 1000)
self.move_motors(mesh_center.as_dict())
positions = self.get_positions()
params = "%0.3f\t" % (end - start)
params += "%0.3f\t" % -mesh_range["horizontal_range"]
params += "%0.3f\t" % mesh_range["vertical_range"]
params += "%0.3f\t" % start
params += "%0.3f\t" % positions["phiy"]
params += "%0.3f\t" % positions["phiz"]
params += "%0.3f\t" % positions["sampx"]
params += "%0.3f\t" % positions["sampy"]
params += "%d\t" % mesh_num_lines
params += "%d\t" % (mesh_total_nb_frames / mesh_num_lines)
params += "%0.6f\t" % (exptime / mesh_num_lines)
params += "%r\t" % True
params += "%r\t" % True
params += "%r\t" % self.get_property("use_fast_mesh", True)
scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_raster_scan",
},
"startRasterScanEx",
)
self._wait_ready(900) # timeout of 30 min
scan(params)
if wait:
# timeout of 30 min
self._wait_ready(1800)
def stillScan(self, pulse_duration, pulse_period, pulse_nb, wait=False):
scan_params = "%0.6f\t%0.6f\t%d" % (pulse_duration, pulse_period, pulse_nb)
scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "start_scan",
},
"startStillScan",
)
scan(scan_params)
print("still scan started at ----------->", time.time())
if wait:
self._wait_ready(1800) # timeout of 30 min
print("finished at ---------->", time.time())
[docs] def characterisation_scan(
self,
start,
scan_range,
nb_frames,
exptime,
nb_scans,
angle,
wait=False,
):
"""Do N scans continuously.
Args:
start (float): Position of omega for the first scan [deg].
scan_range (float): range for each scan [deg].
nb_frames (int): Frame numbers for each scan.
exptime (float): Total exposure time for each scan [s].
nb_scans (int): How many times a scan to be repeated.
angle (float): The angle between each scan [deg]. This number,
added to the last position of each scan and will
be the start position of the consequent scan.
wait (bool); Wait (True) or no (False) the end of the command.
"""
if self.in_plate_mode:
# to see if needed when plates
return
scan_params = "%d\t%0.3f\t%0.3f\t" % (nb_frames, start, scan_range)
scan_params += "%0.3f\t%d\t%0.3f" % (exptime, nb_scans, angle)
scan = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "characterisation_scan",
},
"startCharacterisationScanEx",
)
scan(scan_params)
print("characterisation scan started at ----------->", time.time())
if wait:
self._wait_ready(20 * 60) # timeout of 15 min
print("finished at ---------->", time.time())
@property
def in_plate_mode(self):
try:
return self.head_type.get_value() == "Plate"
except Exception:
return False
@property
def in_kappa_mode(self):
return (
self.head_type.get_value() == "MiniKappa" and self.kappa_channel.get_value()
)
[docs] def get_motors(self):
"""Get motor_name:Motor dictionary"""
return {
"phi": self.phiMotor,
"focus": self.focusMotor,
"phiy": self.phiyMotor,
"phiz": self.phizMotor,
"sampx": self.sampleXMotor,
"sampy": self.sampleYMotor,
"zoom": self.zoomMotor,
"kappa": self.kappaMotor if self.in_kappa_mode else None,
"kappa_phi": self.kappaPhiMotor if self.in_kappa_mode else None,
}
def get_positions(self):
pos = {
"phi": float(self.phiMotor.get_value()),
"focus": float(self.focusMotor.get_value()),
"phiy": float(self.phiyMotor.get_value()),
"phiz": float(self.phizMotor.get_value()),
"sampx": float(self.sampleXMotor.get_value()),
"sampy": float(self.sampleYMotor.get_value()),
"zoom": self.zoomMotor.get_value().value,
"kappa": (
float(self.kappaMotor.get_value()) if self.in_kappa_mode else None
),
"kappa_phi": (
float(self.kappaPhiMotor.get_value()) if self.in_kappa_mode else None
),
}
return pos
def move_motors(self, roles_positions_dict):
self.move_sync_motors(roles_positions_dict, wait=True)
def move_to_beam(self, x, y):
if not self.in_plate_mode:
MiniDiff.MiniDiff.move_to_beam(self, x, y)
else:
try:
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
self.centringVertical.set_value_relative(
self.centringPhiz.direction
* (y - beam_pos_y)
/ float(self.pixelsPerMmZ)
)
self.centringPhiy.set_value_relative(
self.centringPhiy.direction
* (x - beam_pos_x)
/ float(self.pixelsPerMmY)
)
except Exception:
logging.getLogger("user_level_log").exception(
"Microdiff: could not move to beam, aborting"
)
def start_manual_centring(self, sample_info=None):
self._wait_ready(5)
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
self.log.info("Starting centring procedure ...")
if self.in_plate_mode:
plateTranslation = self.get_object_by_role("plateTranslation")
cmd_set_plate_vertical = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "plate_vertical",
},
"setPlateVertical",
)
low_lim, high_lim = self.phiMotor.get_dynamic_limits()
self.current_centring_procedure = sample_centring.start_plate_1_click(
{
"phi": self.centringPhi,
"phiy": self.centringPhiy,
"sampx": self.centringSamplex,
"sampy": self.centringSampley,
"phiz": self.centringVertical,
"plateTranslation": plateTranslation,
},
self.pixelsPerMmY,
self.pixelsPerMmZ,
beam_pos_x,
beam_pos_y,
cmd_set_plate_vertical,
low_lim + 0.5,
high_lim - 0.5,
)
else:
self.current_centring_procedure = sample_centring.start(
{
"phi": self.centringPhi,
"phiy": self.centringPhiy,
"sampx": self.centringSamplex,
"sampy": self.centringSampley,
"phiz": self.centringPhiz,
},
self.pixelsPerMmY,
self.pixelsPerMmZ,
beam_pos_x,
beam_pos_y,
chi_angle=self.chiAngle,
)
self.current_centring_procedure.link(self.manualCentringDone)
[docs] def interrupt_and_accept_centring(self):
"""Used when plate. Kills the current 1 click centring infinite loop
and accepts fake centring - only save the motor positions
"""
self.current_centring_procedure.kill()
self.do_centring = False
self.start_centring_method(self, self.MANUAL3CLICK_MODE)
self.do_centring = True
[docs] def start_harvester_centring(self, computed_offset: tuple[float]):
"""used when Pin from Harvester"""
phiy_offset, centringFocus, centringTableVertical = computed_offset
motor_pos_dict = {
"kappa": float(
self["HacentringReferencePosition"].get_property("kappa_ref")
),
"kappa_phi": float(
self["HacentringReferencePosition"].get_property("phi_ref")
),
"phi": float(self["HacentringReferencePosition"].get_property("omega_ref")),
"phiy": self.phiyMotor.get_value() + phiy_offset,
}
self.move_motors(motor_pos_dict)
self.centringFocus.set_value_relative(centringFocus, None)
self.centringVertical.set_value_relative(centringTableVertical, None)
def getFrontLightLevel(self):
return self.frontLight.get_value()
def setFrontLightLevel(self, level):
return self.frontLight.set_value(level)
def getBackLightLevel(self):
return self.backLight.get_value()
def setBackLightLevel(self, level):
return self.backLight.set_value(level)
def get_beam_position(self):
return (
self.beam_position_horizontal.get_value(),
self.beam_position_vertical.get_value(),
)
def to_float(d):
for k, v in d.items():
try:
d[k] = float(v)
except Exception:
pass