# Source code for mxcubecore.HardwareObjects.EMBLFlexHarvester

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube.
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE.  If not, see <http://www.gnu.org/licenses/>.

"""FlexHarvester Linux Java implementation of the Flex sample changer for Harvester Use.
Example xml file:
<object class = "EMBLFlexHarvester">
  <username>Sample Changer</username>
  <exporter_address>lid231flex1:9001</exporter_address>
</object>
"""

from __future__ import annotations

import logging
import time
from typing import (
    Any,
    List,
)

import gevent

from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects.EMBLFlexHCD import EMBLFlexHCD
from mxcubecore.model import queue_model_objects as qmo
from mxcubecore.queue_entry.base_queue_entry import (
    CENTRING_METHOD,
    QueueExecutionException,
)
from mxcubecore.TaskUtils import task


class EMBLFlexHarvester(EMBLFlexHCD):
    """Hardware Object interface for the EMBL Flex Sample Changer with a Harvester.

    Inherits from EMBLFlexHCD and implements the Harvester interface: samples
    are harvested on request and then loaded onto the diffractometer through
    the ``loadSampleFromHarvester`` exporter command.
    """

    __TYPE__ = "Flex Sample Changer"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Harvester hardware object, resolved in init() from the "harvester" role.
        self._harvester_hwo = None
        # Value of the "pin_cleaning" property, forwarded to the load command.
        self.pin_cleaning = None
        # (cell, basket, vial) of the loaded sample, tracked locally.
        self._loaded_sample = None

    def init(self):
        """Read the configuration, resolve the harvester and initialise the base."""
        self.pin_cleaning = self.get_property("pin_cleaning")
        # (-1, -1, -1) means "no sample loaded yet".
        self._loaded_sample = (-1, -1, -1)
        self._harvester_hwo = self.get_object_by_role("harvester")
        super().init()

    def get_room_temperature_mode(self) -> bool:
        """Get the Harvester Room Temperature Mode"""
        return self._execute_cmd_exporter("getRoomTemperatureMode", attribute=True)

    def set_room_temperature_mode(self, value) -> bool:
        """Set the Harvester Room Temperature Mode

        Args:
            value: Mode value forwarded to the ``setRoomTemperatureMode`` command.

        Returns:
            The mode read back from the robot after the command was sent.
        """
        self._execute_cmd_exporter("setRoomTemperatureMode", value, command=True)
        logging.getLogger("user_level_log").info(
            "setting Robot Room temperature to %s", value
        )
        return self.get_room_temperature_mode()

    def mount_from_harvester(self):
        """Tell callers that samples are mounted from the Harvester.

        Returns:
            Always True for this hardware object.
        """
        return True

    def get_sample_list(self) -> List[Any]:
        """Get the sample list related to the Harvester content/processing plan.

        The samples of the base sample changer are enriched, by index, with
        the metadata reported by the Harvester (crystal UUID, state, name,
        image URL and target position, protein acronym).

        Returns:
            The enriched samples. The list is empty when the Harvester
            metadata cannot be retrieved or when no sample is reported.
        """
        sample_list = super().get_sample_list()
        present_sample_list = []

        # Retrieve metadata lists from harvester HWO
        try:
            ha_sample_lists = self._harvester_hwo.get_crystal_uuids()
            ha_sample_names = self._harvester_hwo.get_sample_names()
            ha_sample_states = self._harvester_hwo.get_samples_state()
            ha_sample_acronyms = self._harvester_hwo.get_sample_acronyms()
        except Exception as e:
            self.user_log.error(
                "Failed retrieving sample metadata from Harvester: %s", e
            )
            return present_sample_list

        # If no samples reported by Harvester
        if not ha_sample_lists or (
            len(ha_sample_lists) == 1 and ha_sample_lists[0] == "n"
        ):
            self.user_log.warning("No samples reported by Harvester")
            return present_sample_list

        # Protein acronym logic (fallback-safe): use one acronym per crystal
        # when the lists line up, otherwise the first acronym (or "") for all.
        acronym_per_sample = bool(ha_sample_acronyms) and len(
            ha_sample_acronyms
        ) == len(ha_sample_lists)
        fallback_acronym = ha_sample_acronyms[0] if ha_sample_acronyms else ""

        # Process each sample independently to avoid global failure
        for index, x_tal_uuid in enumerate(ha_sample_lists):
            try:
                sample = sample_list[index]
                sample_state = ha_sample_states[index]
                sample_name = ha_sample_names[index]
            except (IndexError, TypeError):
                # Harvester metadata and sample changer locations do not line
                # up for this crystal: skip it instead of failing the listing.
                self.user_log.warning(
                    "Incomplete Harvester metadata for crystal %s (index %d), "
                    "skipping",
                    x_tal_uuid,
                    index,
                )
                continue

            img_url = self._harvester_hwo.get_crystal_images_urls(x_tal_uuid)
            img_target_x = self._harvester_hwo.get_image_target_x(x_tal_uuid)
            img_target_y = self._harvester_hwo.get_image_target_y(x_tal_uuid)

            # Enrich sample object in MXCuBE
            sample.id = x_tal_uuid
            sample.state = sample_state
            sample._set_image_url(img_url)
            sample._set_image_x(img_target_x)
            sample._set_image_y(img_target_y)
            sample._name = sample_name
            sample.proteinAcronym = (
                ha_sample_acronyms[index] if acronym_per_sample else fallback_acronym
            )

            present_sample_list.append(sample)

        return present_sample_list

    def _hw_get_mounted_sample(self) -> str:
        """Get the currently mounted sample location string.

        The exporter does not know the loaded sample location, so it is taken
        from our own tracking.

        Returns:
            The location formatted as ``cell:basket:vial``, with the vial
            number zero-padded to two digits.
        """
        cell_no, basket_no, vial_no = self._loaded_sample[:3]
        return "%s:%s:%02d" % (cell_no, basket_no, vial_no)

    def get_loaded_sample(self):
        """
        Returns:
            (Sample) Currently loaded sample
        """
        return self.get_sample_with_address(self._loaded_sample)

    def _hw_get_mounted_crystal_id(self) -> str:
        """Get the currently mounted crystal UUID"""
        return self._execute_cmd_exporter("getMountedCrystalId", attribute=True)

    @task
    def load_a_pin_for_calibration(self):
        """Load a pin from the Harvester.

        Returns:
            True once the load command has finished and the robot reports a
            safe position; False if the command failed or a wait raised a
            RuntimeError (including the timeouts).
        """
        user_log = logging.getLogger("user_level_log")
        try:
            self.prepare_load()
            self.enable_power()

            load_task = gevent.spawn(
                self._execute_cmd_exporter,
                "loadSampleFromHarvester",
                self.pin_cleaning,
                command=True,
            )

            self._wait_busy(30)
            err_msg = "Timeout while waiting to sample to be loaded"
            with gevent.Timeout(600, RuntimeError(err_msg)):
                while not load_task.ready():
                    user_log.info("wait loading task")
                    gevent.sleep(2)

            # An exception raised inside the greenlet is not propagated to
            # us; without this check a failed command would look successful.
            if not load_task.successful():
                user_log.error("Loading command failed: %s", load_task.exception)
                return False

            with gevent.Timeout(600, RuntimeError(err_msg)):
                while True:
                    user_log.info("Wait Robot Safe position")
                    is_safe = self._execute_cmd_exporter(
                        "getRobotIsSafe", attribute=True
                    )
                    if is_safe:
                        break
                    gevent.sleep(2)
            return True
        except RuntimeError:
            return False

    def start_harvester_centring(self):
        """Start the automatic Harvester centring on the diffractometer.

        Raises:
            QueueExecutionException: If the offsets cannot be computed or the
                centring cannot be started.
        """
        try:
            dm = HWR.beamline.diffractometer

            logging.getLogger("user_level_log").info("Start Auto Harvesting Centring")

            computed_offset = HWR.beamline.harvester.get_offsets_for_sample_centring()
            dm.start_harvester_centring(computed_offset)

        except Exception as exc:
            logging.getLogger("user_level_log").exception(
                "Could not center sample, skipping"
            )
            raise QueueExecutionException(
                "Could not center sample, skipping", self
            ) from exc

    def _set_loaded_sample_and_prepare(self, loaded_sample_tup, previous_sample_tup):
        """Register the newly loaded sample and prepare its centring.

        Args:
            loaded_sample_tup: (cell, basket, vial) of the sample just loaded.
            previous_sample_tup: (cell, basket, vial) loaded before this load.

        Returns:
            True if the location is valid (contains no -1) and differs from
            the previous one, in which case the sample was registered and the
            centring started; False otherwise.
        """
        loaded_sample = self.get_sample_with_address(loaded_sample_tup)

        if -1 in loaded_sample_tup or loaded_sample_tup == previous_sample_tup:
            return False

        self._set_loaded_sample(loaded_sample)
        self._prepare_centring_task()

        if not self._harvester_hwo.get_room_temperature_mode():
            self.queue_harvest_next_sample(loaded_sample.get_address())

        # we expect CENTRING_METHOD to be None
        # NB: move this call to base_queue_entry mount_sample and add
        # Harvester Centring METHOD
        HWR.beamline.queue_manager.centring_method = CENTRING_METHOD.NONE
        self.start_harvester_centring()

        return True

    def _do_load(self, sample=None) -> bool:
        """Load a sample from the Harvester.

        Args:
            sample: Sample to load. It must provide ``get_address()``,
                ``get_cell_no()``, ``get_basket_no()`` and ``get_vial_no()``.

        Returns:
            True if the sample was loaded and registered as the new loaded
            sample, False otherwise.

        Raises:
            QueueExecutionException: If the Harvester could not harvest the
                sample.
        """
        # 1. Initial setup and harvesting
        harvesting_result = self.queue_harvest_sample(sample.get_address())
        if not harvesting_result:
            raise QueueExecutionException("Harvester could not Harvest sample", self)

        self._update_state()
        self._wait_ready(600)

        sample_uuid = self.get_sample_uuid(sample.get_address())
        previous_sample = self._loaded_sample

        # 2. Main loading sequence
        if not self._perform_loading_sequence(sample_uuid):
            return False

        # 3. Finalization
        loaded_sample = (
            sample.get_cell_no(),
            sample.get_basket_no(),
            sample.get_vial_no(),
        )
        # Flex/Exporter does not know the loaded sample location, we keep
        # track of it ourselves
        self._loaded_sample = loaded_sample

        return self._set_loaded_sample_and_prepare(loaded_sample, previous_sample)

    def _perform_loading_sequence(self, sample_uuid) -> bool:
        """Send the load command and wait until the sample is mounted.

        Args:
            sample_uuid: UUID of the crystal to load from the Harvester.

        Returns:
            True if the sample was loaded and the finalization succeeded,
            False on timeout, on a failed load command or on robot errors.
        """
        user_log = logging.getLogger("user_level_log")
        user_log.info("Start loading from harvester SAMPLE_UUID %s", sample_uuid)

        load_task = gevent.spawn(
            self._execute_cmd_exporter,
            "loadSampleFromHarvester",
            sample_uuid,
            self.pin_cleaning,
            command=True,
        )

        # Wait for sample changer to start activity
        try:
            _tt = time.time()
            self._wait_busy(300)
            logging.getLogger("HWR").info("Waited SC activity %s", time.time() - _tt)
        except Exception:
            user_log.error("ERROR While Waited SC activity to start")
            for msg in self.get_robot_exceptions():
                user_log.error(msg)
            raise

        # Wait for sample to be loaded
        err_msg = "Timeout while waiting to sample to be loaded"
        try:
            with gevent.Timeout(600, RuntimeError(err_msg)):
                while not load_task.ready():
                    user_log.info("Wait loading task")
                    loaded_sample = self._hw_get_mounted_crystal_id()

                    if loaded_sample == sample_uuid:
                        break

                    gevent.sleep(2)
        except RuntimeError:
            user_log.error(err_msg)
            return False

        # An exception raised inside the greenlet is not propagated to us;
        # without this check a failed command would be handled as a successful
        # load and the wrong location would be recorded as loaded.
        if load_task.ready() and not load_task.successful():
            user_log.error("Loading command failed: %s", load_task.exception)
            return False

        return self._finalize_loading_sequence()

    def _finalize_loading_sequence(self) -> bool:
        """Wait for the robot to be safe and check for robot exceptions.

        Returns:
            True if the robot reached a safe state and reported no exception
            other than those mentioning the "Pin Cleaning Station"; False
            otherwise.
        """
        user_log = logging.getLogger("user_level_log")

        # Wait for robot to be in safe state
        err_msg = "Timeout while waiting for robot to be in safe state"
        try:
            with gevent.Timeout(600, RuntimeError(err_msg)):
                while True:
                    is_safe = self._execute_cmd_exporter(
                        "getRobotIsSafe", attribute=True
                    )
                    if is_safe:
                        break
                    gevent.sleep(2)
        except RuntimeError:
            user_log.error(err_msg)
            return False

        # Check for robot exceptions
        for msg in self.get_robot_exceptions():
            if msg is not None and "Pin Cleaning Station" not in msg:
                user_log.error("ERROR While SC activity After Loaded Sample")
                logging.getLogger("HWR").error(msg)
                user_log.error(msg)
                return False

        return True

    def harvest_and_mount_sample(self, xtal_uuid: str, sample) -> bool:
        """Harvest a crystal and mount it.

        Args:
            xtal_uuid: UUID of the crystal to harvest.
            sample: Sample object passed to the load sequence.

        Returns:
            True if the crystal was harvested and then mounted, False if the
            load did not succeed or any step raised (the error is logged).
        """
        try:
            self._harvester_hwo.harvest_crystal(xtal_uuid)
            self._harvester_hwo._wait_sample_transfer_ready(None)
            loaded = self._do_load(sample)
        except Exception:
            logging.getLogger("user_level_log").exception("Could not Harvest Crystal")
            return False
        return bool(loaded)

    def queue_list(self) -> List[str]:
        """Build a list representation of the queue.

        Returns:
            The location strings (``loc_str``) of the Sample nodes found at
            the top level of the queue model.
        """
        root = HWR.beamline.queue_model.get_model_root()
        children = root if isinstance(root, list) else root.get_children()
        return [child.loc_str for child in children if isinstance(child, qmo.Sample)]

    def get_sample_uuid(self, sample_loc_str: str) -> str | None:
        """Get the UUID of a sample from the sample list.

        Args:
            sample_loc_str: Sample address or sample id to look up.

        Returns:
            The id of the first sample whose address or id matches, or None
            when there is no match or no location was given.
        """
        if sample_loc_str is None:
            # Nothing to look up: avoid querying the Harvester for no result.
            return None

        for s in self.get_sample_list():
            # it seems get_id() was broken
            if s.get_address() == sample_loc_str or s.id == sample_loc_str:
                return s.id

        return None

    def queue_harvest_sample(self, sample_loc_str: str) -> bool:
        """Send the harvest request for a sample during queue execution.

        Args:
            sample_loc_str: Location string of the sample to harvest.

        Returns:
            The result reported by the harvester hardware object.
        """
        current_queue = self.queue_list()
        sample_uuid = self.get_sample_uuid(sample_loc_str)
        return self._harvester_hwo.queue_harvest_sample(
            sample_loc_str, sample_uuid, current_queue
        )

    def queue_harvest_next_sample(self, sample_loc_str) -> None:
        """Send the harvest request for the sample following the given one.

        When the sample is not in the queue or is the last one, the harvester
        is called with ``None`` for both the location and the UUID.

        Args:
            sample_loc_str: Location string of the sample currently handled.
        """
        current_queue_list = self.queue_list()

        try:
            next_sample = current_queue_list[
                current_queue_list.index(sample_loc_str) + 1
            ]
        except (ValueError, IndexError):
            next_sample = None

        sample_uuid = self.get_sample_uuid(next_sample)
        self._harvester_hwo.queue_harvest_next_sample(next_sample, sample_uuid)