Source code for mxcubecore.HardwareObjects.mockup.PlateManipulatorMockup

#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""
[Name] PlateManipulatorMockup

[Description]
Plate manipulator hardware object is used to use diffractometer in plate mode.
It is compatible with md2, md3 diffractometers. Class is based on
SampleChanger, so it has all the sample changed functionalities, like
mount, unmount sample (in this case move to plate position).
Plate is organized in rows and columns. Each cell (Cell) contains drop (Drop).
Each drop could contain several crystals (Xtal). If CRIMS is available then
each drop could have several crystals.

[Channels]

[Commands]

[Emitted signals]

 - emitted signals defined in SampleChanger class

[Included Hardware Objects]
-----------------------------------------------------------------------
| name            | signals          | functions
-----------------------------------------------------------------------
-----------------------------------------------------------------------
"""

import logging
import time

from mxcubecore.HardwareObjects.abstract import AbstractSampleChanger
from mxcubecore.HardwareObjects.abstract.sample_changer import (
    Container,
    Crims,
    Sample,
)


[docs]class Xtal(Sample.Sample): __NAME_PROPERTY__ = "Name" __LOGIN_PROPERTY__ = "Login" def __init__(self, drop, index): super(Xtal, self).__init__(drop, Xtal._get_xtal_address(drop, index), False) self._drop = drop self._index = index self._set_image_x(None) self._set_image_y(None) self._set_image_url(None) self._set_name(None) self._set_login(None) self._set_info_url(None) self._set_info(False, False, False) self._set_loaded(False, False) self.present = True def _set_name(self, value): self._set_property(self.__NAME_PROPERTY__, value) def _set_login(self, value): self._set_property(self.__LOGIN_PROPERTY__, value) def get_login(self): return self.get_property(self.__LOGIN_PROPERTY__) def get_drop(self): return self._drop def get_cell(self): return self.get_drop().get_cell()
[docs] def get_basket_no(self): """ In this cas we assume a drop is a basket or puck """ return self.get_drop().get_index() + 1
[docs] def get_cell_no(self): """ In this cas we assume a well in the row is a cell """ return self.get_cell().get_row_index() + 1
@staticmethod def _get_xtal_address(drop, index): return str(drop.get_address()) + "-" + str(index)
[docs] def get_index(self): """ Descript. : Sample index is calculated relative to the row (Basket) In this case we assume that in drop is one xtal This should be changed to various num of xtals in the drop """ cell_index = self.get_cell().get_index() drops_in_cell_num = self.get_cell().get_drops_no() drop_index = self._drop.get_index() return cell_index * drops_in_cell_num + drop_index
[docs] def get_container(self): return self.get_cell().get_container()
def get_name(self): return "%s%d:%d" % ( self.get_cell().get_row_chr(), self.get_cell().get_index() + 1, self._drop.get_index() + 1, )
[docs]class Drop(Container.Container): __TYPE__ = "Drop" def __init__(self, cell, drops_num): super(Drop, self).__init__( self.__TYPE__, cell, Drop._get_drop_address(cell, drops_num), False ) self._cell = cell self._drops_num = drops_num @staticmethod def _get_drop_address(cell, drop_num): return str(cell.get_address()) + ":" + str(drop_num) def get_cell(self): return self._cell def get_well_no(self): return self.get_index() + 1
[docs] def is_loaded(self): """ Returns if the sample is currently loaded for data collection :rtype: bool """ sample = self.get_sample() return sample.is_loaded()
[docs] def get_sample(self): """ In this cas we assume that there is one crystal per drop """ sample = self.get_components() return sample[0]
[docs]class Cell(Container.Container): __TYPE__ = "Cell" def __init__(self, row, row_chr, col_index, drops_num): Container.Container.__init__( self, self.__TYPE__, row, Cell._get_cell_address(row_chr, col_index), False ) self._row = row self._row_chr = row_chr self._col_index = col_index self._drops_num = drops_num for drop_index in range(self._drops_num): drop = Drop(self, drop_index + 1) self._add_component(drop) xtal = Xtal(drop, drop.get_number_of_components()) drop._add_component(xtal) self._transient = True def get_row(self): return self._row def get_row_chr(self): return self._row_chr def get_row_index(self): return ord(self._row_chr.upper()) - ord("A") def get_col(self): return self._col_index def get_drops_no(self): return self._drops_num @staticmethod def _get_cell_address(row, col): return str(row) + str(col)
class PlateManipulatorMockup(AbstractSampleChanger.SampleChanger): """ """ __TYPE__ = "PlateManipulator" def __init__(self, *args, **kwargs): super(PlateManipulatorMockup, self).__init__( self.__TYPE__, False, *args, **kwargs ) self.plate_label = None self.num_cols = None self.num_rows = None self.num_drops = None self.current_phase = None self.reference_pos_x = None self.timeout = 3 # default timeout self.plate_location = None self.crims_url = None self.crims_user_agent = None self.plate_barcode = None self.harvester_key = None def init(self): """ Descript. : """ self.num_cols = self.get_property("numCols") self.num_rows = self.get_property("numRows") self.num_drops = self.get_property("numDrops") self.crims_url = self.get_property("crimsWsRoot") self.crims_user_agent = self.get_property("crimsUserAgent") self.plate_barcode = self.get_property("PlateBarcode") self.plate_label = self.get_property("plateLabel") self.harvester_key = self.get_property("harvesterKey") self.reference_pos_x = self.get_property("referencePosX") if not self.reference_pos_x: self.reference_pos_x = 0.5 self._init_sc_contents() AbstractSampleChanger.SampleChanger.init(self) self.update_state(self.STATES.READY) def _read_state(self): return "ready" def _ready(self): return True def _on_state_changed(self, state): """ Descript. : state change callback. Based on diffractometer state sets PlateManipulatorMockup state. """ SampleChangerState = AbstractSampleChanger.SampleChangerState if state is None: self._set_state(SampleChangerState.Unknown) else: if state == "Alarm": self._set_state(SampleChangerState.Alarm) elif state == "Fault": self._set_state(SampleChangerState.Fault) elif state == "Moving" or state == "Running": self._set_state(SampleChangerState.Moving) elif state == "Ready": if self.current_phase == "Transfer": self._set_state(SampleChangerState.Charging) elif self.current_phase == "Centring": self._set_state(SampleChangerState.Ready) else: self._set_state(SampleChangerState.StandBy) elif state == "Initializing": self._set_state(SampleChangerState.Initializing) def _init_sc_contents(self): """ Descript. : Initializes content of plate. """ if self.num_rows is None: return self._set_info(False, None, False) self._clear_components() for row in range(self.num_rows): # row is like a basket basket = Container.Basket(self, row + 1, samples_num=0, name="Row") present = True datamatrix = "" scanned = False basket._set_info(present, datamatrix, scanned) self._add_component(basket) for col in range(self.num_cols): cell = Cell(basket, chr(65 + row), col + 1, self.num_drops) basket._add_component(cell) self._set_state(AbstractSampleChanger.SampleChangerState.Ready) def _do_abort(self): """ Descript. : """ self._abort() def _do_change_mode(self, mode): """ Descript. : """ if mode == AbstractSampleChanger.SampleChangerMode.Charging: self._set_phase("Transfer") elif mode == AbstractSampleChanger.SampleChangerMode.Normal: self._set_phase("Centring") def _do_load(self, sample=None): """ Descript. : """ selected = self.get_selected_sample() if sample is None: sample = self.get_selected_sample() if sample is not None: if sample != selected: self._do_select(sample) self._set_loaded_sample(sample) def load(self, sample=None, wait=True): comp = self._resolve_component(sample) coords = comp.get_coords() res = self._load_sample(coords) if res: self._set_loaded_sample(comp) comp._set_loaded(True, True) return res def _load_sample(self, sample_location=None): """ Descript. : function to move to plate location. Location is estimated by sample location and reference positions. """ row = sample_location[0] - 1 col = (sample_location[1] - 1) / self.num_drops drop = sample_location[1] - self.num_drops * col pos_y = float(drop) / (self.num_drops + 1) self.plate_location = [row, col, self.reference_pos_x, pos_y] col += 1 cell = self.get_component_by_address("%s%d" % (chr(65 + row), col)) drop = cell.get_component_by_address("%s%d:%d" % (chr(65 + row), col, drop)) new_sample = drop.get_sample() old_sample = self.get_loaded_sample() new_sample = drop.get_sample() if old_sample != new_sample: self.update_state(self.STATES.BUSY) msg = "Moving to position %s:%d" % ((chr(65 + row), col)) logging.getLogger("user_level_log").warning( "Plate Manipulator: %s Please wait..." % msg ) self.emit("progressInit", (msg, 100)) for step in range(50): self.emit("progressStep", step * 2) time.sleep(0.02) self.emit("progressStop", ()) if old_sample is not None: old_sample._set_loaded(False, True) if new_sample is not None: new_sample._set_loaded(True, True) self.update_info() logging.getLogger("user_level_log").info("Plate Manipulator: Sample loaded") self.update_state(self.STATES.READY) def _do_unload(self, sample_slot=None): """ Descript. : """ self._reset_loaded_sample() self._on_state_changed("Ready") def _do_reset(self): """ Descript. : """ self._reset(False) self._wait_device_ready() def _do_scan(self, component, recursive): """ Descript. : """ if not isinstance(component, PlateManipulatorMockup): raise Exception("Not supported") self._initializeData() if self.get_token() is None: raise Exception("No plate barcode defined") self._load_data(self.get_token()) def _do_select(self, component): """ Descript. : """ pos_x = self.reference_pos_x pos_y = 0.5 if isinstance(component, Xtal): self._select_sample( component.get_cell().get_row_index(), component.get_cell().get_col() - 1, component.get_drop().get_well_no() - 1, ) self._set_selected_sample(component) component.get_container()._set_selected(True) component.get_container().get_container()._set_selected(True) elif isinstance(component, Crims.CrimsXtal): col = component.Column - 1 row = ord(component.Row.upper()) - ord("A") pos_x = component.offsetX pos_y = component.offsetY cell = self.get_component_by_address( Cell._get_cell_address(component.Row, component.Column) ) drop = self.get_component_by_address( Drop._get_drop_address(cell, component.Shelf) ) drop._set_selected(True) drop.get_container()._set_selected(True) elif isinstance(component, Drop): self._select_sample( component.get_cell().get_row_index(), component.get_cell().get_col() - 1, component.get_well_no() - 1, ) component._set_selected(True) component.get_container().get_container()._set_selected(True) elif isinstance(component, Cell): self._select_sample(component.get_row_index(), component.get_col() - 1, 0) component._set_selected(True) elif isinstance(component, list): row = component[0] col = component[1] if len(component > 2): pos_x = component[2] pos_y = component[3] cell = self.get_component_by_address(Cell._get_cell_address(row, col)) cell._set_selected(True) else: raise Exception("Invalid selection") self._reset_loaded_sample() self._wait_device_ready() def _load_data(self, barcode): processing_plan = Crims.get_processing_plan( barcode, self.crims_url, self.crims_user_agent, self.harvester_key ) if processing_plan is None: msg = "No information about plate with barcode %s found in CRIMS" % barcode logging.getLogger("user_level_log").error(msg) else: msg = "Information about plate with barcode %s found in CRIMS" % barcode logging.getLogger("user_level_log").info(msg) self._set_info(True, processing_plan.plate.barcode, True) for x in processing_plan.plate.xtal_list: cell = self.get_component_by_address( Cell._get_cell_address(x.row, x.column) ) cell._set_info(True, "", True) drop = self.get_component_by_address( Drop._get_drop_address(cell, x.shelf) ) drop._set_info(True, "", True) xtal = Xtal(drop, drop.get_number_of_components()) xtal._set_info(True, x.pin_id, True) xtal._set_image_url(x.image_url) xtal._set_image_x(x.offset_x) xtal._set_image_y(x.offset_y) xtal._set_login(x.login) xtal._set_name(x.sample) xtal._set_info_url(x.summary_url) drop._add_component(xtal) return processing_plan def _do_update_info(self): """ Descript. : """ self._update_loaded_sample() def _update_loaded_sample(self): """Updates plate location""" old_sample = self.get_loaded_sample() # plate_location = None # if self.chan_plate_location is not None: # plate_location = self.chan_plate_location.get_value() if self.plate_location is not None: new_sample = self.get_sample(self.plate_location) if old_sample != new_sample: if old_sample is not None: # there was a sample on the gonio loaded = False has_been_loaded = True old_sample._set_loaded(loaded, has_been_loaded) if new_sample is not None: # self._update_sample_barcode(new_sample) loaded = True has_been_loaded = True new_sample._set_loaded(loaded, has_been_loaded) def get_sample(self, plate_location): row = int(plate_location[0]) col = int(plate_location[1]) y_pos = float(plate_location[3]) drop_index = abs(y_pos * self.num_drops) + 1 if drop_index > self.num_drops: drop_index = self.num_drops cell = self.get_component_by_address("%s%d" % (chr(65 + row), col + 1)) if cell: drop = cell.get_component_by_address( "%s%d:%d" % (chr(65 + row), col + 1, drop_index) ) return drop.get_sample() def get_sample_list(self): """ Descript. : This is ugly """ sample_list = [] for basket in self.get_components(): if isinstance(basket, Container.Basket): for cell in basket.get_components(): if isinstance(cell, Cell): for drop in cell.get_components(): sample_list.append(drop.get_sample()) return sample_list def is_mounted_sample(self, sample_location): row = sample_location[0] - 1 col = (sample_location[1] - 1) / self.num_drops drop = sample_location[1] - self.num_drops * col pos_y = float(drop) / (self.num_drops + 1) sample = self.get_sample((row, col, drop, pos_y)) return sample.loaded def get_plate_info(self): """ Descript. : returns dict with plate info """ plate_info_dict = {} plate_info_dict["num_cols"] = self.num_cols plate_info_dict["num_rows"] = self.num_rows plate_info_dict["num_drops"] = self.num_drops plate_info_dict["plate_label"] = self.plate_label or "Demo plate label" plate_info_dict["plate_barcode"] = self.plate_barcode or "" return plate_info_dict def get_plate_location(self): return self.plate_location def change_plate_barcode(self, barcode): if self._load_data(barcode): self.plate_barcode = barcode return True else: raise Exception("barcode unknown") def sync_with_crims(self): return self._load_data(self.plate_barcode) def re_emit_values(self): return