# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Lesser Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""
[Name] Harvester
[Description]
Harvester mockup is used as a replacement of the Dewar sample storage.
This hardware object is used in conjunction with a Sample changer,
and it is compatible with the Crystal Direct Harvester 3.
It provides some functionalities, such as harvesting a sample.
[Commands]
- getSampleList : Get list of available sample from Harvester
- Harvest : load sample from Harvester
-----------------------------------------------------------------------
"""
import logging
import gevent
from mxcubecore.BaseHardwareObjects import HardwareObject
class HarvesterMockup(HardwareObject):
    """Mockup of the Harvester hardware object.

    Stands in for the hardware object that drives the Crystal Direct
    Harvester machine through exporter commands. Most query methods return
    fixed values; the exporter call a real device would issue is kept as a
    comment next to each of them.
    """

    __TYPE__ = "Harvester"

    def __init__(self, name):
        super().__init__(name)
        self.timeout = 3  # default timeout

        # Internal variables -----------
        self.calibration_state = False
        self.room_temperature_mode = False

    def init(self):
        """Read the exporter address from the ``exporter_address`` property."""
        self.exporter_addr = self.get_property("exporter_address")

    def set_calibration_state(self, state):
        """Set the calibration state.

        Args:
            state (bool): whether a calibration was performed
        """
        self.calibration_state = state

    def _wait_until(self, condition, waiting_msg, err_msg, timeout):
        """Poll ``condition`` until it is truthy or the timeout expires.

        Args:
            condition (callable): zero-argument callable polled on each pass
            waiting_msg (str): message sent to the user log while waiting
            err_msg (str): message of the RuntimeError raised on timeout
            timeout (second): None means wait forever, a value <= 0 means
                use the default ``self.timeout``

        Raises:
            RuntimeError: if ``condition`` is still falsy when the timeout expires
        """
        if timeout is not None and timeout <= 0:
            timeout = self.timeout

        with gevent.Timeout(timeout, RuntimeError(err_msg)):
            while not condition():
                logging.getLogger("user_level_log").info(waiting_msg)
                gevent.sleep(3)

    def _wait_ready(self, timeout=None):
        """Wait for the Harvester to be ready.

        Args:
            timeout (second): maximum time to wait. None means wait forever,
                a value <= 0 means use the default timeout

        Raises:
            RuntimeError: if the Harvester is not ready before the timeout
        """
        self._wait_until(
            self._ready,
            "Waiting Harvester to be Ready",
            "Timeout waiting for Harvester to be ready",
            timeout,
        )

    def _wait_sample_transfer_ready(self, timeout=None):
        """Wait for the Harvester to be ready to transfer a sample.

        Args:
            timeout (second): maximum time to wait. None means wait forever,
                a value <= 0 means use the default timeout

        Raises:
            RuntimeError: if the Harvester is not ready to transfer before
                the timeout
        """
        self._wait_until(
            self._ready_to_transfer,
            "Waiting Harvester to be ready to transfer",
            "Timeout waiting for Harvester to be ready to transfer",
            timeout,
        )

    def _execute_cmd_exporter(self, cmd, *args, **kwargs):
        """Run an exporter command or access an exporter attribute.

        Args:
            cmd (str): exporter command name, or ``get<Attr>`` / ``set<Attr>``
                for attribute access
            args: command arguments; they are joined with tabs into a single
                string before being sent
            kwargs: ``command=True`` to execute ``cmd`` as a command,
                ``attribute=True`` to read or write it as an attribute

        Return: the response of the command or attribute access, None if
            neither ``command`` nor ``attribute`` was requested

        Raises:
            ValueError: if a ``set...`` attribute is requested without a value
        """
        ret = None
        # Always bound, so the attribute branch below can never hit an
        # unbound local when no positional argument was given.
        args_str = "\t".join(map(str, args))

        if kwargs.pop("command", None):
            exp_cmd = self.add_command(
                {
                    "type": "exporter",
                    "exporter_address": self.exporter_addr,
                    "name": "%s" % cmd,
                },
                "%s" % cmd,
            )
            ret = exp_cmd(args_str) if args else exp_cmd()

        if kwargs.pop("attribute", None):
            exp_attr = self.add_channel(
                {
                    "type": "exporter",
                    "exporter_address": self.exporter_addr,
                    "name": "%s" % cmd,
                },
                # Channel is named after cmd without its 3-character prefix.
                "%s" % cmd[3:],
            )
            if cmd.startswith("get"):
                return exp_attr.get_value()
            if cmd.startswith("set"):
                if not args:
                    raise ValueError(
                        "Exporter attribute '%s' requires a value to set" % cmd
                    )
                ret = exp_attr.set_value(args_str)

        return ret

    # ---------------------- State --------------------------------

    def get_state(self):
        """Get the Harvester state.

        Return (str): state "Ready, Running etc.."; the mockup always
            returns "Ready"
        """
        return "Ready"

    def get_status(self):
        """Get the Harvester status.

        Return (str): status; the mockup always returns "Ready"
        """
        # return self._execute_cmd_exporter("getStatus", attribute=True)
        return "Ready"

    def _ready(self):
        """Check whether the Harvester is READY.

        Return (bool): True if the Harvester is ready otherwise False;
            the mockup always returns True
        """
        return True

    def _busy(self):
        """Check whether the Harvester is BUSY.

        Return (bool): True if the Harvester is not ready otherwise False
        """
        return not self._ready()

    def _ready_to_transfer(self):
        """Check whether the Harvester is waiting for a sample transfer.

        Return (bool): True if the Harvester is waiting for a sample transfer
            otherwise False; the mockup always returns True
        """
        # return (
        #     self._execute_cmd_exporter("getStatus", attribute=True)
        #     == "Waiting Sample Transfer"
        # )
        return True

    def get_samples_state(self):
        """Get the Harvester samples state.

        Return (List): list of crystal states
            "waiting_for_transfer, Running etc.."
        """
        # return self._execute_cmd_exporter("getSampleStates", command=True)
        return ["ready_to_execute", "needs_repositionning", "failed", "harvested"]

    def get_current_crystal(self):
        """Get the currently harvested crystal.

        Return (str): the crystal uuid; the mockup always returns None
        """
        # return self._execute_cmd_exporter("getCurrentSampleID", attribute=True)
        return None

    def is_crystal_harvested(self, crystal_uuid):
        """Check whether a crystal is the currently harvested crystal.

        Args:
            crystal_uuid (str): the crystal uuid

        Return (bool): True if the crystal is in the crystal list and is the
            currently harvested crystal
        """
        if crystal_uuid not in self.get_crystal_uuids():
            return False
        return crystal_uuid == self.get_current_crystal()

    def current_crystal_state(self, crystal_uuid):
        """Get the state of a crystal.

        Args:
            crystal_uuid (str): crystal uuid

        Return (str): state of the crystal, None if the uuid is not in the
            crystal list
        """
        sample_states = self.get_samples_state()
        crystal_uuids = self.get_crystal_uuids()

        # zip() stops at the shorter list, so a length mismatch between the
        # two lists cannot raise an IndexError.
        for x_tal, state in zip(crystal_uuids, sample_states):
            if crystal_uuid == x_tal:
                return state

        return None

    def check_crystal_state(self, crystal_uuid):
        """Check whether a crystal is pending and current or not.

        Args:
            crystal_uuid (str): crystal uuid

        Return (str): "pending_and_current" if this crystal is waiting for
            transfer, "pending_not_current" if only other crystals are
            waiting for transfer, None if no crystal is waiting for transfer
        """
        sample_states = self.get_samples_state()
        crystal_uuids = self.get_crystal_uuids()

        other_crystal_pending = False
        # Every crystal is examined; returning from an ``else`` inside the
        # loop would stop after the first entry of the list.
        for x_tal, state in zip(crystal_uuids, sample_states):
            if state != "waiting_for_transfer":
                continue
            if crystal_uuid == x_tal:
                return "pending_and_current"
            other_crystal_pending = True

        return "pending_not_current" if other_crystal_pending else None

    def get_crystal_uuids(self):
        """Get the Harvester sample list by uuid.

        Return (List): list of crystal uuids from the current processing plan
        """
        # harvester_crystal_list = self._execute_cmd_exporter(
        #     "getSampleList", attribute=True
        # )
        return [
            "c9ca5e0d-35fd-4ff9-9ad2-4de9fd47fd83",
            "94730c39-bf66-416f-ab97-f755e45f6a3a",
            "5d5dfc88-009d-4486-8526-33bf58c4d5d9",
            "46aca4a4-0c00-4685-b1e1-53071bdcb66d",
        ]

    def get_sample_names(self):
        """Get the Harvester sample list by name.

        Return (List): list of crystal names from the current processing plan
        """
        # harvester_sample_names = self._execute_cmd_exporter(
        #     "getSampleNames", attribute=True
        # )
        return ["TestSample1", "TestSample2", "TestSample3", "TestSample4"]

    def get_crystal_images_urls(self, crystal_uuid):
        """Get the image url of a crystal.

        Args:
            crystal_uuid (str): crystal uuid

        Return: crystal image url from the current processing plan; the
            mockup always returns an empty string
        """
        # crystal_images_url = self._execute_cmd_exporter(
        #     "getImageURL", crystal_uuid, command=True
        # )
        return ""

    def get_sample_acronyms(self):
        """Get the Harvester sample list by acronym.

        Return (List): list of crystal acronyms from the current processing plan
        """
        # harvester_sample_acronyms = self._execute_cmd_exporter(
        #     "getSampleAcronyms", attribute=True
        # )
        return ["cryoprotectant", "cryoprotectant", "cryoprotectant", "cryoprotectant"]

    # ------------------------------------------------------------------------------------

    def abort(self):
        """Send the abort command.

        Abort any current Harvester action; the mockup does nothing and
        returns None.
        """
        # return self._execute_cmd_exporter("abort", command=True)
        return None

    def harvest_crystal(self, crystal_uuid):
        """Harvest a crystal.

        Args:
            crystal_uuid (str): crystal uuid

        Return: the mockup always returns False
        """
        # return self._execute_cmd_exporter("harvestCrystal", crystal_uuid, command=True)
        return False

    def transfer_sample(self):
        """Transfer the current harvested crystal.

        Return: the mockup always returns False
        """
        # return self._execute_cmd_exporter("startTransfer", command=True)
        return False

    def trash_sample(self):
        """Trash the current harvested crystal.

        Return: the mockup always returns False
        """
        # return self._execute_cmd_exporter("trashSample", command=True)
        return False

    # -----------------------------------------------------------------------------

    def load_plate(self, plate_id):
        """Load a plate from the Harvester to the MD.

        Args:
            plate_id (str): plate ID
        """
        # return self._execute_cmd_exporter("loadPlate", plate_id, command=True)
        return None

    def get_plate_id(self):
        """Get the current plate ID.

        Return (str): current plate ID; the mockup always returns an
            empty string
        """
        # return self._execute_cmd_exporter("getPlateID", attribute=True)
        return ""

    def get_image_target_x(self, crystal_uuid):
        """Get the crystal image position x.

        Args:
            crystal_uuid (str): crystal uuid

        Return (float): crystal x coordinate in plate
        """
        # return self._execute_cmd_exporter("getImageTargetX", crystal_uuid, command=True)
        return 26.868617890692

    def get_image_target_y(self, crystal_uuid):
        """Get the crystal image position y.

        Args:
            crystal_uuid (str): crystal uuid

        Return (float): crystal y coordinate in plate
        """
        # return self._execute_cmd_exporter("getImageTargetY", crystal_uuid, command=True)
        return 50.122652377553

    def get_room_temperature_mode(self):
        """Get the room temperature mode state.

        Return (bool): True if room temperature else False; the mockup
            always returns True
        """
        # return self._execute_cmd_exporter("getRoomTemperatureMode", attribute=True)
        return True

    def set_room_temperature_mode(self, value):
        """Set the Harvester temperature mode.

        The mockup only prints the requested value; it does not store it.

        Args:
            value (bool): set room temperature when true

        Return (bool): the value reported by get_room_temperature_mode()
        """
        # self._execute_cmd_exporter("setRoomTemperatureMode", value, command=True)
        print("setting HA Room temperature to: %s" % value)
        return self.get_room_temperature_mode()

    # -------------------- Calibrate Drift Shape offset ----------------------------

    def get_last_sample_drift_offset_x(self):
        """Sample offset x position when drifted.

        Return (float): last pin drift offset x; the mockup returns None
        """
        # last_sample_drift_offset_x = self._execute_cmd_exporter(
        #     "getLastSampleDriftOffsetX", attribute=True
        # )
        return None

    def get_last_sample_drift_offset_y(self):
        """Sample offset y position when drifted.

        Return (float): last pin drift offset y; the mockup returns None
        """
        # last_sample_drift_offset_y = self._execute_cmd_exporter(
        #     "getLastSampleDriftOffsetY", attribute=True
        # )
        return None

    def get_last_sample_drift_offset_z(self):
        """Sample offset z position when drifted.

        Return (float): last pin drift offset z; the mockup returns None
        """
        # pin_last_drift_offset_z = self._execute_cmd_exporter(
        #     "getLastSampleDriftOffsetZ", attribute=True
        # )
        return None

    # ---------------------- Calibrate Cut Shape offset----------------------------

    def get_last_pin_cut_shape_offset_x(self):
        """Pin cut shape offset x position.

        Return (float): last pin cut shape offset x; the mockup returns None
        """
        # pin_last_cut_shape_offset_x = self._execute_cmd_exporter(
        #     "getLastSampleCutShapeOffsetX", attribute=True
        # )
        return None

    def get_last_pin_cut_shape_offset_y(self):
        """Pin cut shape offset y position.

        Return (float): last pin cut shape offset y; the mockup returns None
        """
        # pin_last_cut_shape_offset_y = self._execute_cmd_exporter(
        #     "getLastSampleCutShapeOffsetY", attribute=True
        # )
        return None

    # =============== Pin / Calibration -----------------------------

    def load_calibrated_pin(self):
        """Start the pin calibration procedure."""
        # return self._execute_cmd_exporter("loadCalibratedPin", command=True)
        return None

    def store_calibrated_pin(self, x, y, z):
        """Store the x, y, z offsets after the calibration procedure.

        Args:
            x, y, z (float): offsets
        """
        # return self._execute_cmd_exporter("storePinToBeamOffset", x , y , z, command=True)
        return None

    def get_calibrated_pin_offset(self):
        """Get the x, y, z offsets stored after the calibration procedure.

        Return: (float) x, y, z offsets; the mockup returns None
        """
        # pin_to_beam_offset = self._execute_cmd_exporter(
        #     "getPinToBeamOffset", command=True
        # )
        return None

    def get_number_of_available_pin(self):
        """Get the number of available pins.

        Return (int): the mockup always returns 1
        """
        # return self._execute_cmd_exporter("getNbRemainingPins", command=True)
        return 1

    @staticmethod
    def _offset_or_zero(value):
        """Convert an offset to float, treating a missing (None) offset as 0.0."""
        return 0.0 if value is None else float(value)

    def get_offsets_for_sample_centring(self):
        """Combine the stored offsets into offsets for sample centring.

        Offsets that are not available (None) are treated as 0.0, so the
        computation does not fail when a getter has nothing to report.

        Return (tuple): (phiy_offset, centring_focus, centring_table_vertical)
        """
        pin_to_beam = self.get_calibrated_pin_offset()
        if pin_to_beam is None:
            pin_to_beam = (0.0, 0.0, 0.0)
        else:
            pin_to_beam = tuple(pin_to_beam)

        sample_drift_x = self._offset_or_zero(self.get_last_sample_drift_offset_x())
        sample_drift_y = self._offset_or_zero(self.get_last_sample_drift_offset_y())
        # The z drift enters the focus offset with its sign inverted.
        sample_drift_z = -self._offset_or_zero(self.get_last_sample_drift_offset_z())

        pin_cut_shape_x = self._offset_or_zero(self.get_last_pin_cut_shape_offset_x())
        pin_cut_shape_y = self._offset_or_zero(self.get_last_pin_cut_shape_offset_y())

        phiy_offset = sample_drift_x - pin_cut_shape_x + float(pin_to_beam[1])
        centring_focus = sample_drift_z + float(pin_to_beam[0])
        centring_table_vertical = (
            sample_drift_y - pin_cut_shape_y + float(pin_to_beam[2])
        )

        return (phiy_offset, centring_focus, centring_table_vertical)