import copy
import json
import logging
import math
import os
import time
import xml.etree.ElementTree as ET
from typing import Union
import gevent
import numpy
from pydantic import ValidationError
from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.HardwareObjects import sample_centring
from mxcubecore.HardwareObjects.GenericDiffractometer import GonioHeadConfiguration
from mxcubecore.model import queue_model_objects as qmo
from mxcubecore.TaskUtils import task
[docs]class MiniDiff(HardwareObject):
MANUAL3CLICK_MODE = "Manual 3-click"
CENTRING_METHOD_MANUAL = MANUAL3CLICK_MODE
C3D_MODE = "Computer automatic"
# MOVE_TO_BEAM_MODE = "Move to Beam"
def __init__(self, *args):
super().__init__(*args)
qmo.CentredPosition.set_diffractometer_motor_names(
"phi",
"focus",
"phiz",
"phiy",
"zoom",
"sampx",
"sampy",
"kappa",
"kappa_phi",
)
self.phiMotor = None
self.kappaMotor = None
self.kappaPhiMotor = None
self.phizMotor = None
self.phiyMotor = None
self.lightMotor = None
self.zoomMotor = None
self.sampleXMotor = None
self.sampleYMotor = None
self.lightWago = None
self.currentSampleInfo = None
self.aperture = None
self.cryostream = None
self.beamstop = None
self.capillary = None
self.pixelsPerMmY = None
self.pixelsPerMmZ = None
self.centredTime = 0
self.do_centring = True
self.chiAngle = 0.0
self.connect(self, "equipmentReady", self.equipmentReady)
self.connect(self, "equipmentNotReady", self.equipmentNotReady)
def if_role_set_attr(self, role_name):
obj = self.get_object_by_role(role_name)
if obj is not None:
setattr(self, role_name, obj)
[docs] def init(self):
self.centringMethods = {
MiniDiff.MANUAL3CLICK_MODE: self.start_manual_centring,
MiniDiff.C3D_MODE: self.start_auto_centring,
}
self._run_script = self.add_command(
{
"type": "exporter",
"exporter_address": self.exporter_addr,
"name": "runScript",
},
"runScript",
)
sample_centring.NUM_CENTRING_ROUNDS = self.get_property(
"num_centring_rounds", 1
)
self.cancel_centring_methods = {}
self.current_centring_procedure = None
self.currentCentringMethod = None
self.centringStatus = {"valid": False}
self.chiAngle = self.get_property("chi", 0)
try:
phiz_ref = self["centringReferencePosition"].get_property("phiz")
except:
phiz_ref = None
try:
phiy_ref = self["centringReferencePosition"].get_property("phiy")
except:
phiy_ref = None
self.phiMotor = self.get_object_by_role("phi")
self.phizMotor = self.get_object_by_role("phiz")
self.phiyMotor = self.get_object_by_role("phiy")
self.zoomMotor = self.get_object_by_role("zoom")
self.lightMotor = self.get_object_by_role("light")
self.focusMotor = self.get_object_by_role("focus")
self.sampleXMotor = self.get_object_by_role("sampx")
self.sampleYMotor = self.get_object_by_role("sampy")
self.kappaMotor = self.get_object_by_role("kappa")
self.kappaPhiMotor = self.get_object_by_role("kappa_phi")
self.centringPhi = sample_centring.CentringMotor(self.phiMotor, direction=-1)
self.centringPhiz = sample_centring.CentringMotor(
self.phizMotor, reference_position=phiz_ref
)
self.centringPhiy = sample_centring.CentringMotor(
self.phiyMotor, reference_position=phiy_ref
)
self.centringSamplex = sample_centring.CentringMotor(self.sampleXMotor)
self.centringSampley = sample_centring.CentringMotor(self.sampleYMotor)
roles_to_add = ["aperture", "beamstop", "cryostream", "capillary"]
for role in roles_to_add:
self.if_role_set_attr(role)
hwr = HWR.get_hardware_repository()
wl_prop = self.get_property("wagolight")
if wl_prop is not None:
try:
self.lightWago = hwr.get_hardware_object(wl_prop)
except Exception:
self.log.exception("")
if self.phiMotor is not None:
self.connect(self.phiMotor, "stateChanged", self.phiMotorStateChanged)
self.connect(self.phiMotor, "valueChanged", self.emit_diffractometer_moved)
else:
self.log.error(
"MiniDiff: phi motor is not defined in minidiff equipment %s",
str(self.name),
)
if self.phizMotor is not None:
self.connect(self.phizMotor, "stateChanged", self.phizMotorStateChanged)
self.connect(self.phizMotor, "valueChanged", self.phizMotorMoved)
self.connect(self.phizMotor, "valueChanged", self.emit_diffractometer_moved)
else:
self.log.error(
"MiniDiff: phiz motor is not defined in minidiff equipment %s",
str(self.name),
)
if self.phiyMotor is not None:
self.connect(self.phiyMotor, "stateChanged", self.phiyMotorStateChanged)
self.connect(self.phiyMotor, "valueChanged", self.phiyMotorMoved)
self.connect(self.phiyMotor, "valueChanged", self.emit_diffractometer_moved)
else:
self.log.error(
"MiniDiff: phiy motor is not defined in minidiff equipment %s",
str(self.name),
)
if self.zoomMotor is not None:
self.connect(
self.zoomMotor, "valueChanged", self.zoomMotorPredefinedPositionChanged
)
self.connect(
self.zoomMotor,
"predefinedPositionChanged",
self.zoomMotorPredefinedPositionChanged,
)
self.connect(self.zoomMotor, "stateChanged", self.zoomMotorStateChanged)
else:
self.log.error(
"MiniDiff: zoom motor is not defined in minidiff equipment %s",
str(self.name),
)
if self.sampleXMotor is not None:
self.connect(
self.sampleXMotor, "stateChanged", self.sampleXMotorStateChanged
)
self.connect(self.sampleXMotor, "valueChanged", self.sampleXMotorMoved)
self.connect(
self.sampleXMotor, "valueChanged", self.emit_diffractometer_moved
)
else:
self.log.error(
"MiniDiff: sampx motor is not defined in minidiff equipment %s",
str(self.name),
)
if self.sampleYMotor is not None:
self.connect(
self.sampleYMotor, "stateChanged", self.sampleYMotorStateChanged
)
self.connect(self.sampleYMotor, "valueChanged", self.sampleYMotorMoved)
self.connect(
self.sampleYMotor, "valueChanged", self.emit_diffractometer_moved
)
else:
self.log.error(
"MiniDiff: sampx motor is not defined in minidiff equipment %s",
str(self.name),
)
if HWR.beamline.sample_changer is None:
self.log.warning(
"MiniDiff: sample changer is not defined in minidiff equipment %s",
str(self.name),
)
else:
try:
self.connect(
HWR.beamline.sample_changer,
"sampleIsLoaded",
self.sampleChangerSampleIsLoaded,
)
except Exception:
self.log.exception(
"MiniDiff: could not connect to sample changer smart magnet"
)
if self.lightWago is not None:
self.connect(self.lightWago, "wagoStateChanged", self.wagoLightStateChanged)
else:
self.log.warning(
"MiniDiff: wago light is not defined in minidiff equipment %s",
str(self.name),
)
if self.aperture is not None:
self.connect(
self.aperture, "predefinedPositionChanged", self.apertureChanged
)
self.connect(self.aperture, "positionReached", self.apertureChanged)
# Agree on a correct method name, inconsistent arguments for move_to_beam, disabled temporarily
# self.move_to_coord = self.move_to_beam()
def set_rotation_axis_position(self, value: float):
self._set_rotation_axis_position(value, motor_name="phiz")
def _set_rotation_axis_position(self, value: float, motor_name="phiz"):
self.log.info(f"Setting rotation axis ({motor_name}) position to {value}")
fname = self._path
try:
self.log.info(f"Updating {fname}")
tree = ET.parse(fname)
motor_tag = (
tree.getroot()
.findall("centringReferencePosition")[0]
.findall(motor_name)
)
motor_tag.text = str(value)
tree.write(fname)
except:
self.log.info(f"Could not update {fname}")
# raise
else:
self.log.info(f"Wrote {fname}")
if motor_name == "phiz":
self.centringPhiz = sample_centring.CentringMotor(
self.phizMotor, reference_position=value
)
elif motor_name == "phiy":
self.centringPhiy = sample_centring.CentringMotor(
self.phiyMotor, reference_position=value
)
script_name = (
"Change_AlignmentZ" if motor_name == "phiz" else "Change_AlignmentY"
)
try:
self.log.info(f"Setting MD Alignment reference position")
print(f" script name {script_name} value {value}")
self.run_script(f"{script_name}, {value}")
except:
self.log.exception(f"Setting MD Alignment reference position failed")
raise
# Contained Objects
# NBNB Temp[orary hack - should be cleaned up together with configuration
@property
def omega(self):
"""omega motor object
Returns:
AbstractActuator
"""
return self.phiMotor
@property
def kappa(self):
"""kappa motor object
Returns:
AbstractActuator
"""
return self.kappaMotor
@property
def kappa_phi(self):
"""kappa_phi motor object
Returns:
AbstractActuator
"""
return self.kappaPhiMotor
@property
def centring_x(self):
"""centring_x motor object
Returns:
AbstractActuator
"""
return self.sampleXMotor
@property
def centring_y(self):
"""centring_y motor object
Returns:
AbstractActuator
"""
return self.sampleYMotor
@property
def alignment_x(self):
"""alignment_x motor object (also used as graphics.focus)
Returns:
AbstractActuator
"""
return self.focusMotor
@property
def alignment_y(self):
"""alignment_y motor object
Returns:
AbstractActuator
"""
return self.phiyMotor
@property
def alignment_z(self):
"""alignment_z motor object
Returns:
AbstractActuator
"""
return self.phizMotor
@property
def zoom(self):
"""zoom motor object
NBNB HACK TODO - ocnfigure this in graphics object
(which now calls this property)
Returns:
AbstractActuator
"""
return self.zoomMotor
def set_sample_info(self, sample_info):
self.currentSampleInfo = sample_info
def emit_diffractometer_moved(self, *args):
self.emit("diffractometerMoved", ())
[docs] def is_ready(self):
return self.get_state() and self.get_state().name == "READY"
def is_valid(self):
return (
self.sampleXMotor is not None
and self.sampleYMotor is not None
and self.zoomMotor is not None
and self.phiMotor is not None
and self.phizMotor is not None
and self.phiyMotor is not None
and HWR.beamline.sample_view.camera is not None
)
@property
def in_plate_mode(self):
return False
def apertureChanged(self, *args):
# will trigger minidiffReady signal for update of beam size in video
self.equipmentReady()
def equipmentReady(self):
self.emit("minidiffReady", ())
def equipmentNotReady(self):
self.emit("minidiffNotReady", ())
def wagoLightStateChanged(self, state):
pass
def phiMotorStateChanged(self, state):
self.emit("phiMotorStateChanged", (state,))
self.emit("minidiffStateChanged", (state,))
def phizMotorStateChanged(self, state):
self.emit("phizMotorStateChanged", (state,))
self.emit("minidiffStateChanged", (state,))
def phiyMotorStateChanged(self, state):
self.emit("phiyMotorStateChanged", (state,))
self.emit("minidiffStateChanged", (state,))
def getCalibrationData(self, offset):
if self.zoomMotor is not None:
for position in self.zoomMotor.get_property("positions", ()):
if abs(position["offset"] - offset) <= self.zoomMotor.delta:
calibrationData = position["calibrationData"]
return (
float(calibrationData.pixelsPerMmY) or 0,
float(calibrationData.pixelsPerMmZ) or 0,
)
return (None, None)
def get_pixels_per_mm(self):
self.pixelsPerMmY, self.pixelsPerMmZ = self.getCalibrationData(None)
return self.pixelsPerMmY, self.pixelsPerMmZ
def getBeamInfo(self, callback=None):
beam_info = HWR.beamline.beam.get_beam_info()
if callable(callback):
callback(beam_info)
return beam_info
def zoomMotorPredefinedPositionChanged(self, positionName, offset=None):
if not positionName:
return
self.pixelsPerMmY, self.pixelsPerMmZ = self.getCalibrationData(offset)
self.emit("zoomMotorPredefinedPositionChanged", (positionName, offset))
def zoomMotorStateChanged(self, state):
HWR.beamline.beam.re_emit_values()
self.emit("zoomMotorStateChanged", (state,))
self.emit("minidiffStateChanged", (state,))
def sampleXMotorStateChanged(self, state):
self.emit("sampxMotorStateChanged", (state,))
self.emit("minidiffStateChanged", (state,))
def sampleYMotorStateChanged(self, state):
self.emit("sampyMotorStateChanged", (state,))
self.emit("minidiffStateChanged", (state,))
def invalidateCentring(self):
if self.current_centring_procedure is None and self.centringStatus["valid"]:
self.centringStatus = {"valid": False}
self.emitProgressMessage("")
self.emit("centringInvalid", ())
def phizMotorMoved(self, pos):
if time.time() - self.centredTime > 1.0:
self.invalidateCentring()
def phiyMotorMoved(self, pos):
if time.time() - self.centredTime > 1.0:
self.invalidateCentring()
def sampleXMotorMoved(self, pos):
if time.time() - self.centredTime > 1.0:
self.invalidateCentring()
def sampleYMotorMoved(self, pos):
if time.time() - self.centredTime > 1.0:
self.invalidateCentring()
def sampleChangerSampleIsLoaded(self, state):
if time.time() - self.centredTime > 1.0:
self.invalidateCentring()
def _move_to_beam_chip(self, x, y):
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
dx = (x - beam_pos_x) / self.pixelsPerMmY
dy = (y - beam_pos_y) / self.pixelsPerMmZ
vertical_motor = self.get_object_by_role("phiy")
horizontal_motor = self.get_object_by_role("ssx_translation")
try:
vertical_motor.set_value_relative(dy)
horizontal_motor.set_value_relative(dx)
except Exception:
logging.getLogger("HWR").exception(
"MiniDiff: could not center to beam, aborting"
)
def _move_to_beam(self, x, y):
self.pixelsPerMmY, self.pixelsPerMmZ = self.getCalibrationData(
self.zoomMotor.get_value()
)
if None in (self.pixelsPerMmY, self.pixelsPerMmZ):
return 0, 0
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
dx = (x - beam_pos_x) / self.pixelsPerMmY
dy = (y - beam_pos_y) / self.pixelsPerMmZ
phi_angle = math.radians(
self.centringPhi.direction * self.centringPhi.get_value()
)
sampx = -self.centringSamplex.direction * self.centringSamplex.get_value()
sampy = self.centringSampley.direction * self.centringSampley.get_value()
phiy = self.centringPhiy.direction * self.centringPhiy.get_value()
phiz = self.centringPhiz.direction * self.centringPhiz.get_value()
rotMatrix = numpy.matrix(
[
[math.cos(phi_angle), -math.sin(phi_angle)],
[math.sin(phi_angle), math.cos(phi_angle)],
]
)
invRotMatrix = numpy.array(rotMatrix.I)
dsampx, dsampy = numpy.dot(numpy.array([0, dy]), invRotMatrix)
chi_angle = math.radians(self.chiAngle)
chiRot = numpy.matrix(
[
[math.cos(chi_angle), -math.sin(chi_angle)],
[math.sin(chi_angle), math.cos(chi_angle)],
]
)
sx, sy = numpy.dot(numpy.array([dsampx, dsampy]), numpy.array(chiRot))
sampx = sampx + sx
sampy = sampy + sy
phiy = phiy + dx
try:
self.centringSamplex.set_value(-sampx)
self.centringSampley.set_value(sampy)
self.centringPhiy.set_value(-phiy)
except Exception:
self.log.exception("MiniDiff: could not center to beam, aborting")
def _move_to_beam_plate(self, x, y):
try:
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
self.centringVertical.set_value_relative(
self.centringPhiz.direction
* (y - beam_pos_y)
/ float(self.pixelsPerMmZ)
)
self.centringPhiy.set_value_relative(
self.centringPhiy.direction
* (x - beam_pos_x)
/ float(self.pixelsPerMmY)
)
except Exception:
logging.getLogger("user_level_log").exception(
"Microdiff: could not move to beam, aborting"
)
def move_to_beam(self, x, y):
if self.get_head_configuration():
self._move_to_beam_chip(x, y)
elif self.in_plate_mode():
self._move_to_beam_plate(x, y)
else:
self._move_to_beam(x, y)
def getAvailableCentringMethods(self):
return self.centringMethods.keys()
def run_custom_centring_script(self, method, sample_info):
try:
if method == "Manual 3-click":
self.wait_ready(30)
fun = self.centringMethods[method]
else:
time.sleep(0.5)
self.wait_ready(60)
self.log.info("Using MD script for sample centring")
self.run_script("sample_centring")
time.sleep(0.5)
self.wait_ready(120)
# if the centring fails move to the next sample
try:
res_centring = self.get_last_task_info()
if (
res_centring[0].endswith("sample_centring.java")
and res_centring[6] == "-1"
):
self.log.exception("MiniDiff: problem while centring")
self.emitCentringFailed()
else:
self.log.info(
"MiniDiff: centring went fine with %s" % str(res_centring)
)
except:
self.log.exception("MD script for sample centring had a problem")
except KeyError as diag:
self.log.error("MiniDiff: unknown centring method (%s)" % str(diag))
self.emitCentringFailed()
else:
try:
if method == "Manual 3-click":
fun(sample_info)
else:
pass
except Exception:
self.log.exception("MiniDiff: problem while centring")
self.emitCentringFailed()
def run_standard_centring(self, method, sample_info):
self.emitCentringStarted(method)
try:
self.wait_ready(30)
fun = self.centringMethods[method]
except KeyError as diag:
self.log.error("MiniDiff: unknown centring method (%s)" % str(diag))
self.emitCentringFailed()
else:
try:
fun(sample_info)
except Exception:
self.log.exception("MiniDiff: problem while centring")
self.emitCentringFailed()
def start_centring_method(self, method, sample_info=None):
if not self.do_centring:
self.emitCentringStarted(method)
def fake_centring_procedure():
return {"motors": {}, "method": method, "valid": True}
self.current_centring_procedure = gevent.spawn(fake_centring_procedure)
self.emitCentringSuccessful()
return
if self.current_centring_procedure is not None:
self.log.error(
"MiniDiff: already in centring method %s" % self.currentCentringMethod
)
return
curr_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.centringStatus = {"valid": False, "startTime": curr_time}
_use_custom = self.get_property("use_custom_centring_script", False)
if _use_custom:
self.run_custom_centring_script(method, sample_info)
else:
self.run_standard_centring(method, sample_info)
def cancel_centring_method(self, reject=False):
if self.current_centring_procedure is not None:
try:
self.current_centring_procedure.kill(block=True)
except Exception:
self.log.exception("MiniDiff: problem aborting the centring method")
self.log.exception("MiniDiff: Centring canceled")
try:
fun = self.cancel_centring_methods[self.currentCentringMethod]
except KeyError as diag:
self.emitCentringFailed()
else:
try:
fun()
except Exception:
self.emitCentringFailed()
else:
self.emitCentringFailed()
self.emitProgressMessage("")
if reject:
self.reject_centring()
self.wait_ready(30)
def currentCentringMethod(self):
return self.currentCentringMethod
def save_current_position(self):
self.centringStatus["motors"] = self.get_positions()
self.accept_centring()
def start_manual_centring(self, sample_info=None):
self.log.info("Starting centring procedure ...")
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
self.wait_ready(5)
self.current_centring_procedure = sample_centring.start(
{
"phi": self.centringPhi,
"phiy": self.centringPhiy,
"sampx": self.centringSamplex,
"sampy": self.centringSampley,
"phiz": self.centringPhiz,
},
self.pixelsPerMmY,
self.pixelsPerMmZ,
beam_pos_x,
beam_pos_y,
chi_angle=self.chiAngle,
)
self.current_centring_procedure.link(self.manualCentringDone)
def motor_positions_to_screen(self, centred_positions_dict):
self.pixelsPerMmY, self.pixelsPerMmZ = self.getCalibrationData(
self.zoomMotor.get_value()
)
if None in (self.pixelsPerMmY, self.pixelsPerMmZ):
return 0, 0
phi_angle = math.radians(
self.centringPhi.direction * self.centringPhi.get_value()
)
sampx = self.centringSamplex.direction * (
centred_positions_dict["sampx"] - self.centringSamplex.get_value()
)
sampy = self.centringSampley.direction * (
centred_positions_dict["sampy"] - self.centringSampley.get_value()
)
phiy = self.centringPhiy.direction * (
centred_positions_dict["phiy"] - self.centringPhiy.get_value()
)
phiz = self.centringPhiz.direction * (
centred_positions_dict["phiz"] - self.centringPhiz.get_value()
)
rotMatrix = numpy.matrix(
[
math.cos(phi_angle),
-math.sin(phi_angle),
math.sin(phi_angle),
math.cos(phi_angle),
]
)
rotMatrix.shape = (2, 2)
invRotMatrix = numpy.array(rotMatrix.I)
dsx, dsy = (
numpy.dot(numpy.array([sampx, sampy]), invRotMatrix) * self.pixelsPerMmY
)
chi_angle = math.radians(self.chiAngle)
chiRot = numpy.matrix(
[
math.cos(chi_angle),
-math.sin(chi_angle),
math.sin(chi_angle),
math.cos(chi_angle),
]
)
chiRot.shape = (2, 2)
sx, sy = numpy.dot(numpy.array([0, dsy]), numpy.array(chiRot)) # .I))
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
x = sx + (phiy * self.pixelsPerMmY) + beam_pos_x
y = sy + (phiz * self.pixelsPerMmZ) + beam_pos_y
return float(x), float(y)
def get_centred_point_from_coord(self, x, y, return_by_names=None):
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
dx = (x - beam_pos_x) / self.pixelsPerMmY
dy = (y - beam_pos_y) / self.pixelsPerMmZ
self.pixelsPerMmY, self.pixelsPerMmZ = self.getCalibrationData(
self.zoomMotor.get_value()
)
if None in (self.pixelsPerMmY, self.pixelsPerMmZ):
return 0, 0
phi_angle = math.radians(self.centringPhi.get_value())
sampx = self.centringSamplex.get_value()
sampy = self.centringSampley.get_value()
phiy = self.centringPhiy.get_value()
phiz = self.centringPhiz.get_value()
rotMatrix = numpy.matrix(
[
math.cos(phi_angle),
-math.sin(phi_angle),
math.sin(phi_angle),
math.cos(phi_angle),
]
)
rotMatrix.shape = (2, 2)
invRotMatrix = numpy.array(rotMatrix.I)
dsampx, dsampy = numpy.dot(numpy.array([0, dy]), invRotMatrix)
sampx = sampx + dsampx
sampy = sampy + dsampy
phiy = phiy - dx
# chi_angle = math.radians(self.chiAngle)
# chiRot = numpy.matrix([math.cos(chi_angle), -math.sin(chi_angle),
# math.sin(chi_angle), math.cos(chi_angle)])
# chiRot.shape = (2,2)
# sx, sy = numpy.dot(numpy.array([0, dsy]),
# numpy.array(chiRot)) ))
return {
"phi": self.centringPhi.get_value(),
"phiz": float(phiz),
"phiy": float(phiy),
"sampx": float(sampx),
"sampy": float(sampy),
}
def manualCentringDone(self, manual_centring_procedure):
try:
motor_pos = manual_centring_procedure.get()
if isinstance(motor_pos, gevent.GreenletExit):
raise motor_pos
except Exception:
logging.exception("Could not complete manual centring")
self.emitCentringFailed()
else:
self.emitProgressMessage("Moving sample to centred position...")
self.emitCentringMoving()
try:
sample_centring.end()
except Exception:
logging.exception("Could not move to centred position")
self.emitCentringFailed()
# logging.info("EMITTING CENTRING SUCCESSFUL")
self.centredTime = time.time()
self.emitCentringSuccessful()
self.emitProgressMessage("")
def autoCentringDone(self, auto_centring_procedure):
self.emitProgressMessage("")
self.emit("newAutomaticCentringPoint", (-1, -1))
try:
res = auto_centring_procedure.get()
except Exception:
logging.error("Could not complete automatic centring")
logging.getLogger("user_level_log").info("Automatic loop centring failed")
self.emitCentringFailed()
self.reject_centring()
else:
if res is None:
logging.error("Could not complete automatic centring")
logging.getLogger("user_level_log").info(
"Automatic loop centring failed"
)
self.emitCentringFailed()
self.reject_centring()
else:
self.emitCentringSuccessful()
self.accept_centring()
logging.getLogger("user_level_log").info(
"Automatic loop centring successful"
)
def start_auto_centring(self, sample_info=None, loop_only=False):
beam_pos_x, beam_pos_y = HWR.beamline.beam.get_beam_position_on_screen()
self.set_phase("centring", wait=True)
self.wait_ready(30)
self.current_centring_procedure = sample_centring.start_auto(
HWR.beamline.sample_view,
{
"phi": self.centringPhi,
"phiy": self.centringPhiy,
"sampx": self.centringSamplex,
"sampy": self.centringSampley,
"phiz": self.centringPhiz,
},
self.pixelsPerMmY,
self.pixelsPerMmZ,
beam_pos_x,
beam_pos_y,
chi_angle=float(self.chiAngle),
msg_cb=self.emitProgressMessage,
new_point_cb=lambda point: self.emit("newAutomaticCentringPoint", point),
)
self.current_centring_procedure.link(self.autoCentringDone)
self.emitProgressMessage("Starting automatic centring procedure...")
@task
def moveToCentredPosition(self, centred_position):
return self.move_motors(centred_position.as_dict())
def image_clicked(self, x, y):
logging.getLogger("user_level_log").info(
"Centring click at, x: %s, y: %s" % (int(x), int(y))
)
sample_centring.user_click(x, y, True)
def emitCentringStarted(self, method):
self.currentCentringMethod = method
self.emit("centringStarted", (method, False))
logging.getLogger("user_level_log").info("Starting centring")
def accept_centring(self):
self.save_centring_positions()
self.centringStatus["valid"] = True
self.centringStatus["accepted"] = True
self.emit("centringAccepted", (True, self.get_centring_status()))
# save position in MD2 software
self.save_centring_positions()
self.log.info("DEBUG %s" % self.get_centring_status())
logging.getLogger("user_level_log").info("Centring successful")
def reject_centring(self):
if self.current_centring_procedure:
self.current_centring_procedure.kill()
self.centringStatus = {"valid": False}
self.emitProgressMessage("")
self.emit("centringAccepted", (False, self.get_centring_status()))
logging.getLogger("user_level_log").info("Centring cancelled")
def emitCentringMoving(self):
self.emit("centringMoving", ())
def emitCentringFailed(self):
self.centringStatus = {"valid": False}
method = self.currentCentringMethod
self.currentCentringMethod = None
self.current_centring_procedure = None
self.emit("centringFailed", (method, self.get_centring_status()))
def emitCentringSuccessful(self):
if self.current_centring_procedure is not None:
curr_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.centringStatus["endTime"] = curr_time
self.centringStatus["motors"] = self.get_positions()
centred_pos = self.current_centring_procedure.get()
for role in self.centringStatus["motors"]:
motor = self.get_object_by_role(role)
try:
self.centringStatus["motors"][role] = centred_pos[motor]
except KeyError:
continue
self.centringStatus["method"] = self.currentCentringMethod
self.centringStatus["valid"] = True
method = self.currentCentringMethod
self.emit("centringSuccessful", (method, self.get_centring_status()))
self.currentCentringMethod = None
self.current_centring_procedure = None
else:
self.log.debug(
"MiniDiff: trying to emit centringSuccessful outside of a centring"
)
def emitProgressMessage(self, msg=None):
# self.log.debug("%s: %s", self.name, msg)
self.emit("progressMessage", (msg,))
def get_centring_status(self):
return copy.deepcopy(self.centringStatus)
def get_positions(self):
return {
"phi": float(self.phiMotor.get_value()),
"focus": float(self.focusMotor.get_value()),
"phiy": float(self.phiyMotor.get_value()),
"phiz": float(self.phizMotor.get_value()),
"sampx": float(self.sampleXMotor.get_value()),
"sampy": float(self.sampleYMotor.get_value()),
"kappa": float(self.kappaMotor.get_value()) if self.kappaMotor else None,
"kappa_phi": (
float(self.kappaPhiMotor.get_value()) if self.kappaPhiMotor else None
),
"zoom": float(self.zoomMotor.get_value()),
}
def move_motors(self, roles_positions_dict):
motor = {
"phi": self.phiMotor,
"focus": self.focusMotor,
"phiy": self.phiyMotor,
"phiz": self.phizMotor,
"sampx": self.sampleXMotor,
"sampy": self.sampleYMotor,
"kappa": self.kappaMotor,
"kappa_phi": self.kappaPhiMotor,
"zoom": self.zoomMotor,
}
for role, pos in roles_positions_dict.items():
m = motor.get(role)
if None not in (m, pos):
m.set_value(pos)
# TODO: remove this sleep, the motors states should
# be MOVING since the beginning (or READY if move is
# already finished)
time.sleep(1)
while not all(
[m.get_state() == m.READY for m in motor.values() if m is not None]
):
time.sleep(0.1)
def take_snapshot(self, image_path_list: list) -> None:
if len(image_path_list) > 0 and self.get_current_phase() != "Centring":
use_custom_snapshot_routine = self.get_property(
"custom_snapshot_script_dir", False
)
if not use_custom_snapshot_routine:
self.set_phase("Centring", wait=True, timeout=200)
for image_path in image_path_list:
snapshot_index = image_path_list.index(image_path)
logging.getLogger("user_level_log").info(
f"Taking {snapshot_index + 1} sample snapshot(s)"
)
HWR.beamline.sample_view.save_snapshot(path=image_path)
# do not move 90 degrees if not needed
if not self.in_plate_mode and snapshot_index < len(image_path_list) - 1:
self.phiMotor.set_value_relative(90, timeout=5)
def snapshotsDone(self, snapshotsProcedure):
HWR.beamline.sample_view.camera.forceUpdate = False
try:
self.centringStatus["images"] = snapshotsProcedure.get()
except Exception:
self.log.exception("MiniDiff: could not take crystal snapshots")
self.emit("centringSnapshots", (False,))
self.emitProgressMessage("")
else:
self.emit("centringSnapshots", (True,))
self.emitProgressMessage("")
self.emitProgressMessage("Sample is centred!")
# self.emit('centringAccepted', (True,self.get_centring_status()))
def simulateAutoCentring(self, sample_info=None):
pass
[docs] def wait_ready(self, timeout=None):
pass
def get_head_configuration(self) -> Union[GonioHeadConfiguration, None]:
chip_def_fpath = self.get_property("chip_definition_file", "")
chip_def_fpath = HWR.get_hardware_repository().find_in_repository(
chip_def_fpath
)
data = None
if chip_def_fpath and os.path.isfile(chip_def_fpath):
with open(chip_def_fpath, "r") as _f:
chip_def = json.load(_f)
try:
data = GonioHeadConfiguration(**chip_def)
except ValidationError:
self.log.exception("Validation error in %s" % chip_def_fpath)
return data
def set_head_configuration(self, str_data: str) -> None:
data = json.loads(str_data)
chip_def_fpath = self.get_property("chip_definition_file", "")
chip_def_fpath = HWR.get_hardware_repository().find_in_repository(
chip_def_fpath
)
if chip_def_fpath and os.path.isfile(chip_def_fpath):
with open(chip_def_fpath, "w+") as _f:
try:
GonioHeadConfiguration(**data)
except ValidationError:
self.log.exception("Validation error in %s" % chip_def_fpath)
_f.write(json.dumps(data, indent=4))
def set_chip_layout(self, layout_name: str) -> True:
data = self.get_head_configuration().dict()
data["current"] = layout_name
self.set_head_configuration(json.dumps(data))
def run_script(self, script_cmd, wait=True):
self._run_script(script_cmd)
if wait:
self._wait_ready(300)