Source code for mxcubecore.HardwareObjects.Harvester

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU General Lesser Public License
#  along with MXCuBE.  If not, see <http://www.gnu.org/licenses/>.

"""
[Name] Harvester

[Description]
Harvester is use as a replacement of the Dewar sample storage
This hardware object is use in couple with a Sample changer.
Sample changer load the sample from Harvester instead of dewar.
It is compatible with the Crystal Direct Harvester 3.
It has some functionalities, like Harvest Sample, etc....

[Commands]

 - getSampleList : Get list of available sample from Harvester
 - Harvest : Harvest sample make it ready to load

 [Example xml file:]
    <object class="Harvester">
        <username>harvester</username>
        <exporter_address>wid30harvest:9001</exporter_address>
    </object>
-----------------------------------------------------------------
"""

from __future__ import annotations

import logging
from typing import List, Optional, Union

import gevent

from mxcubecore.BaseHardwareObjects import HardwareObject


[docs]class HarvesterState: """ Enumeration of Harvester states """ Unknown = 0 Initializing = 1 Ready = 2 Harvested = 3 Running = 4 Harvesting = 5 ContinueHarvesting = 6 # Disabled = 7 # Running = 8 # StandBy = 9 # Alarm = 10 # Fault = 11 STATE_DESC = { Initializing: "Initializing", Ready: "Ready", Harvested: "Waiting Sample Transfer", Running: "Running", Harvesting: "Harvesting 1 Crystals", ContinueHarvesting: "Finishing Harvesting", } @staticmethod def tostring(state): return HarvesterState.STATE_DESC.get(state, "Unknown")
[docs]class Harvester(HardwareObject): """ Harvester functionality The Harvester Class consists of methods that execute exporter commands this class communicate with the Crystal Direct Harvester Machine """ __TYPE__ = "Harvester" def __init__(self, name): super().__init__(name) self.timeout = 600 # default timeout # Internal variables ----------- self.calibration_state = False
[docs] def init(self): """Init""" self.exporter_addr = self.get_property("exporter_address") self.crims_upload_url = self.get_property("crims_upload_url") self.crims_upload_key = self.get_property("crims_upload_key")
[docs] def set_calibration_state(self, state: bool): """Set Calibration state Args: state (bool) : Whether a calibration procedure is on going """ self.calibration_state = state
def _wait_ready(self, timeout: Union[float, None] = None): """Wait Harvester to be ready Args: (timeout) : Whether to wait for a amount of time timeout is None wait forever, timeout <=0 use default timeout """ if timeout is not None and timeout <= 0: timeout = self.timeout err_msg = "Timeout waiting for Harvester to be ready" with gevent.Timeout(timeout, RuntimeError(err_msg)): while not self._ready(): logging.getLogger("user_level_log").info( "Waiting Harvester to be Ready" ) gevent.sleep(3) def _wait_sample_transfer_ready(self, timeout: Union[float, None] = None): """Wait Harvester to be ready to transfer a sample Args: timeout (second) : Whether to wait for a amount of time timeout is None wait forever, timeout <=0 use default timeout """ if timeout is not None and timeout <= 0: timeout = self.timeout err_msg = "Timeout waiting for Harvester to be ready to transfer" try: with gevent.Timeout(timeout, RuntimeError(err_msg)): while not self._ready_to_transfer(): logging.getLogger("user_level_log").info( "Waiting Harvester to be ready to transfer for 10 minutes" ) gevent.sleep(3) except RuntimeError as exc: # In case of timeout we as abort, park and trash self.abort() self.park() self._wait_ready(self.timeout) logging.getLogger("user_level_log").info("Trash current Sample") self.trash_sample() raise RuntimeError("Harvester failed to become ready in time") from exc def _execute_cmd_exporter(self, cmd, *args, **kwargs): """Exporter Command implementation Args: cmd (string) : command type args, kwargs (string): commands arguments, and command or attribute return : respond """ ret = None if args: args_str = "%s" % "\t".join(map(str, args)) if kwargs.pop("command", None): exp_cmd = self.add_command( { "type": "exporter", "exporter_address": self.exporter_addr, "name": "%s" % cmd, }, "%s" % cmd, ) if args: ret = exp_cmd(args_str) else: ret = exp_cmd() if kwargs.pop("attribute", None): exp_attr = self.add_channel( { "type": "exporter", "exporter_address": self.exporter_addr, "name": "%s" % cmd, }, "%s" % cmd[3:], ) if cmd.startswith("get"): return exp_attr.get_value() if cmd.startswith("set"): ret = exp_attr.set_value(args_str) return ret # ---------------------- State --------------------------------
[docs] def get_state(self) -> str: """Get the Harvester State Return (str): state "Ready, Running etc.." """ return self._execute_cmd_exporter("getState", attribute=True)
[docs] def get_status(self) -> str: """Get the Harvester Status Return (str): Status """ return self._execute_cmd_exporter("getStatus", attribute=True)
def _ready(self) -> str: """check whether the Harvester is READY Return (bool): True if Harvester is Ready otherwise False """ return self._execute_cmd_exporter("getState", attribute=True) == "Ready" def _busy(self) -> bool: """check whether the Harvester is BUSY Return (bool): True if Harvester is not Ready otherwise False """ return self._execute_cmd_exporter("getState", attribute=True) != "Ready" def _ready_to_transfer(self) -> bool: """check whether the Harvester is Waiting Sample Transfer Return (bool): True if Harvester is Waiting Sample Transfer otherwise False """ return ( self._execute_cmd_exporter("getStatus", attribute=True) == "Waiting Sample Transfer" )
[docs] def get_samples_state(self) -> List[str]: """Get the Harvester Samples State Return (List): list of crystal state "waiting_for_transfer, Running etc.." """ return self._execute_cmd_exporter("getSampleStates", attribute=True)
[docs] def get_current_crystal(self) -> str: """Get the Harvester current harvested crystal Return (str): the crystal uuid """ return self._execute_cmd_exporter("getCurrentSampleID", attribute=True)
[docs] def is_crystal_harvested(self, crystal_uuid: str) -> str: """Check Whether if the current crystal is harvested args: the crystal uuid Return (bool): True if the crystal is the current harvested crystal """ res = False in_list = crystal_uuid in self.get_crystal_uuids() if in_list: Current_SampleID = self.get_current_crystal() if crystal_uuid == Current_SampleID: res = True return res
[docs] def current_crystal_state(self, crystal_uuid: str) -> str: """get current crystal state Args: state (str) : Crystal uuid Return (str): State of the crystal uuid """ sample_states = self.get_samples_state() crystal_uuids = self.get_crystal_uuids() for index, x_tal in enumerate(crystal_uuids): if crystal_uuid == x_tal: return sample_states[index] return None
[docs] def check_crystal_state(self, crystal_uuid: str) -> Optional[str]: """Check whether if a Crystal is in pending_and_current or not Args (str) : Crystal uuid Return (str): status of the crystal_uuid pending / current """ sample_states = self.get_samples_state() crystal_uuids = self.get_crystal_uuids() for index, x_tal in enumerate(crystal_uuids): if crystal_uuid == x_tal and sample_states[index] == "waiting_for_transfer": return "pending_and_current" elif ( crystal_uuid != x_tal and sample_states[index] == "waiting_for_transfer" ): return "pending_not_current" else: return None
[docs] def get_crystal_uuids(self) -> List[str]: """Get the Harvester Sample List uuid Return (List): list of crystal by uuid from the current processing plan" """ harvester_crystal_list = self._execute_cmd_exporter( "getSampleList", attribute=True ) return harvester_crystal_list
[docs] def get_sample_names(self) -> List[str]: """Get the Harvester Sample List Name Return (List): list of crystal by names from the current processing plan" """ harvester_sample_names = self._execute_cmd_exporter( "getSampleNames", attribute=True ) return harvester_sample_names
[docs] def get_crystal_images_urls(self, crystal_uuid: str) -> str: """Get the Harvester Sample List Images Args (str) : Crystal uuid Return (str): image_url from current processing plan for current crystal uuid" """ crystal_images_url = self._execute_cmd_exporter( "getImageURL", crystal_uuid, command=True ) return crystal_images_url
[docs] def get_sample_acronyms(self) -> List[str]: """Get the Harvester Sample List by Acronyms Return (List): list of crystal by Acronyms from the current processing plan" """ harvester_sample_acronyms = self._execute_cmd_exporter( "getSampleAcronyms", attribute=True ) return harvester_sample_acronyms
# ------------------------------------------------------------------------------------
[docs] def abort(self) -> str: """Send Abort command Abort any current Harvester Actions """ return self._execute_cmd_exporter("abort", command=True)
[docs] def park(self) -> str: """Send Park command Park Harvester Actions """ return self._execute_cmd_exporter("park", command=True)
[docs] def harvest_crystal(self, crystal_uuid: str) -> str: """Harvester crystal Args (str) : Crystal uuid """ try: self._execute_cmd_exporter("harvestCrystal", crystal_uuid, command=True) return "Crystal Harvested properly" except Exception: logging.getLogger("user_level_log").warning( f"Warning: Could not harvest sample: {crystal_uuid}" ) return "Could not Harvest Crystal"
[docs] def transfer_sample(self) -> None: """Transfer the current Harvested Crystal""" return self._execute_cmd_exporter("startTransfer", command=True)
[docs] def trash_sample(self): """Trash the current Harvested Crystal""" return self._execute_cmd_exporter("trashSample", command=True)
# -----------------------------------------------------------------------------
[docs] def load_plate(self, plate_id: str) -> str: """Change Harvester current plate Args (str) : Plate ID Return (str) : current Plate ID """ self._execute_cmd_exporter("loadPlate", plate_id, command=True) return self._execute_cmd_exporter("getPlateID", attribute=True)
[docs] def get_plate_id(self) -> str: """get current plate ID Args: Return (str) : current Plate ID """ return self._execute_cmd_exporter("getPlateID", attribute=True)
[docs] def get_image_target_x(self, crystal_uuid: str) -> float: """Get the crystal images position x Args (str) : Crystal uuid Return (float): Crystal x coordinate in plate """ return self._execute_cmd_exporter("getImageTargetX", crystal_uuid, command=True)
[docs] def get_image_target_y(self, crystal_uuid: str) -> float: """Get the crystal images position Y Args (str) : Crystal uuid Return (float): Crystal Y coordinate in plate """ return self._execute_cmd_exporter("getImageTargetY", crystal_uuid, command=True)
[docs] def get_room_temperature_mode(self) -> bool: """get RoomTemperature Mode state Args (str) : Crystal uuid Return (bool): TemperatureMode , True if Room Temp else False """ return self._execute_cmd_exporter("getRoomTemperatureMode", attribute=True)
[docs] def set_room_temperature_mode(self, value: bool) -> bool: """Set Harvester temperature mode Args: (bool) set room temperature when true Return (bool): TemperatureMode """ self._execute_cmd_exporter("setRoomTemperatureMode", value, command=True) print("setting HA Room temperature to: %s" % value) return self.get_room_temperature_mode()
# -------------------- Calibrate Drift Shape offset ----------------------------
[docs] def get_last_sample_drift_offset_x(self) -> float: """Sample Offset X position when drifted Return (float): last pin drift offset x """ last_sample_drift_offset_x = self._execute_cmd_exporter( "getLastSampleDriftOffsetX", attribute=True ) return last_sample_drift_offset_x
[docs] def get_last_sample_drift_offset_y(self) -> float: """Sample Offset Y position when drifted Return (float): last pin drift offset y """ last_sample_drift_offset_y = self._execute_cmd_exporter( "getLastSampleDriftOffsetY", attribute=True ) return last_sample_drift_offset_y
[docs] def get_last_sample_drift_offset_z(self) -> float: """Sample Offset Z position when drifted Return (float): last pin drift offset z """ pin_last_drift_offset_z = self._execute_cmd_exporter( "getLastSampleDriftOffsetZ", attribute=True ) return pin_last_drift_offset_z
# ---------------------- Calibrate Cut Shape offset----------------------------
[docs] def get_last_pin_cut_shape_offset_x(self) -> float: """Pin shape Offset x position Return (float): last pin cut shape offset x """ pin_last_cut_shape_offset_x = self._execute_cmd_exporter( "getLastSampleCutShapeOffsetX", attribute=True ) return pin_last_cut_shape_offset_x
[docs] def get_last_pin_cut_shape_offset_y(self) -> float: """Pin shape Offset Y position Return (float): last pin cut shape offset y """ pin_last_cut_shape_offset_y = self._execute_cmd_exporter( "getLastSampleCutShapeOffsetY", attribute=True ) return pin_last_cut_shape_offset_y
# =============== Pin / Calibration -----------------------------
[docs] def load_calibrated_pin(self) -> None: """Load a Pin for Calibration Procedure""" return self._execute_cmd_exporter("loadCalibratedPin", command=True)
[docs] def store_calibrated_pin(self, x: float, y: float, z: float) -> None: """Store x , y , z offsets position to crystal direct machine after calibration procedure Args: (float) x, y, z offsets """ return self._execute_cmd_exporter("storePinToBeamOffset", x, y, z, command=True)
[docs] def get_calibrated_pin_offset(self) -> tuple[float]: """Get Stored x , y , z offsets position after calibration procedure return: (float) x, y, z offsets """ pin_to_beam_offset = self._execute_cmd_exporter( "getPinToBeamOffset", command=True ) return pin_to_beam_offset
[docs] def get_number_of_available_pin(self) -> int: """Get number of available pin return: (Integer) """ return self._execute_cmd_exporter("getNbRemainingPins", command=True)
[docs] def queue_harvest_sample( self, sample_loc_str, sample_uuid: str, current_queue_list: list[str] ) -> bool: """ While queue execution send harvest request current_queue_list : a build representation of the queue """ harvest_res = False current_queue_index = None try: current_queue_index = current_queue_list.index(sample_loc_str) except (ValueError, IndexError): current_queue_index = None wait_before_load = not self.get_room_temperature_mode() if self.get_number_of_available_pin() > 0: gevent.sleep(2) if current_queue_index == 0: logging.getLogger("user_level_log").info("Harvesting First Sample") harvest_res = self.harvest_sample_before_mount( sample_uuid, wait_before_load ) if harvest_res is False: # if sample could not be Harvest, but no exception is raised, let's skip the sample logging.getLogger("user_level_log").error( "Harvester could not Harvest sample, Stopping queue" ) else: logging.getLogger("user_level_log").info("checking last Harvesting") harvest_res = self.harvest_sample_before_mount( sample_uuid, wait_before_load ) if harvest_res is False: # if sample could not be Harvest, but no exception is raised, let's skip the sample logging.getLogger("user_level_log").error( "There is no more Pins in the Harvester, Stopping queue" ) elif self.get_number_of_available_pin() == 0 and self._ready_to_transfer(): logging.getLogger("user_level_log").warning( "Warning: Harvester pins is approaching to ZERO" ) logging.getLogger("user_level_log").warning( "Warning: Mounting last Sample, Queue will stop on next one" ) # in this case we just load the sample that is ready in the Harester harvest_res = True else: # raise Not enough pins available in the pin provider logging.getLogger("user_level_log").error( "There is no more Pins in the Harvester, Stopping queue" ) return harvest_res
[docs] def queue_harvest_next_sample(self, next_sample_loc_str: str, sample_uuid: str): """ While queue execution send harvest request on next sample of the queue list """ if next_sample_loc_str is not None and self.get_number_of_available_pin() > 0: logging.getLogger("user_level_log").info("Harvesting Next Sample") self._wait_ready(self.timeout) self.harvest_sample_before_mount(sample_uuid, False) else: logging.getLogger("user_level_log").warning( "Warning: Could not harvest next sample" )
[docs] def harvest_sample_before_mount( self, sample_uuid: str, wait_before_load: bool = False ) -> bool: """send harvest sample command Check and set the current state of the Harvester and the sample before Harvest Return (bool): whether the sample has been harvest then mount (True) or had and exception (False) """ res = None if sample_uuid: if self.get_status() == "Ready": try: if self.check_crystal_state(sample_uuid) == "pending_not_current": print(self.get_samples_state()) logging.getLogger("user_level_log").info( "Harvester:Trashing pending Sample" ) self.trash_sample() self._wait_ready(self.timeout) if ( self.current_crystal_state(sample_uuid) == "ready_to_execute" or self.current_crystal_state(sample_uuid) == "needs_repositionning" ): logging.getLogger("user_level_log").info("Harvesting started") self.harvest_crystal(sample_uuid) if wait_before_load: self._wait_sample_transfer_ready(self.timeout) res = True elif self.check_crystal_state(sample_uuid) == "pending_and_current": logging.getLogger("user_level_log").info( "Putting Harvester in Transfer Mode" ) self.transfer_sample() if wait_before_load: self._wait_sample_transfer_ready(self.timeout) res = True else: # logging.getLogger("user_level_log").info("ERROR: Sample Could not be Harvested (Harvester Ready, ) ") msg = self.get_status() logging.getLogger("user_level_log").exception( "ERROR: Sample Could not be Harvested" ) logging.getLogger("user_level_log").exception(msg) res = False return res except RuntimeError: return False elif self._ready_to_transfer(): try: if ( self.current_crystal_state(sample_uuid) == "waiting_for_transfer" ): logging.getLogger("user_level_log").info( "Sample Already Harvested, continue" ) res = True else: self.abort() self.park() self._wait_ready(self.timeout) logging.getLogger("user_level_log").info("Trash current Sample") self.trash_sample() self._wait_ready(self.timeout) if ( self.current_crystal_state(sample_uuid) == "ready_to_execute" or self.current_crystal_state(sample_uuid) == "needs_repositionning" ): logging.getLogger("user_level_log").info( "Harvesting started" ) self.harvest_crystal(sample_uuid) if wait_before_load: self._wait_sample_transfer_ready(self.timeout) res = True else: msg = self.get_status() logging.getLogger("user_level_log").info( "Warning: Sample Could not be Harvested Try Again" ) return self.harvest_sample_before_mount(sample_uuid) return res except RuntimeError: return False elif ( "Harvesting" in self.get_status() or self.get_status() == "Finishing Harvesting" ): logging.getLogger("user_level_log").info( "Warning: Harvesting In Progress Try Again" ) self._wait_sample_transfer_ready(self.timeout) return self.harvest_sample_before_mount(sample_uuid) else: msg = self.get_status() logging.getLogger("user_level_log").exception( "ERROR: Sample Could not be Harvested" ) logging.getLogger("user_level_log").exception(msg) # Try an abort and move to next sample self.abort() self.park() self._wait_ready(self.timeout) return False else: msg = self.get_status() logging.getLogger("user_level_log").exception("ERROR: No sample uuid Found") logging.getLogger("user_level_log").exception(msg) # Try an abort and move to next sample return False
[docs] def get_offsets_for_sample_centring(self) -> tuple[float]: """Calculate sample centring offsets based on Harvested pin shape pre-calculated offsets Return (tuple(float)): (phiy_offset, centringFocus, centringTableVertical) """ pin_to_beam = tuple(self.get_calibrated_pin_offset()) sample_drift_x = float(self.get_last_sample_drift_offset_x()) sample_drift_y = float(self.get_last_sample_drift_offset_y()) sample_drift_z = -float(self.get_last_sample_drift_offset_z()) pin_cut_shape_x = float(self.get_last_pin_cut_shape_offset_x()) pin_cut_shape_y = float(self.get_last_pin_cut_shape_offset_y()) phiy_offset = sample_drift_x - pin_cut_shape_x + float(pin_to_beam[1]) centringFocus = sample_drift_z + float(pin_to_beam[0]) centringTableVertical = sample_drift_y - pin_cut_shape_y + float(pin_to_beam[2]) return (phiy_offset, centringFocus, centringTableVertical)