Source code for mxcubecore.HardwareObjects.Marvin

#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

import logging
import os
import tempfile
import time
from datetime import datetime

import gevent

from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObjectState
from mxcubecore.HardwareObjects.abstract import AbstractSampleChanger
from mxcubecore.HardwareObjects.abstract.sample_changer import (
    Container,
    Sample,
)

POSITION_DESC = {
    "Park": "Parked",
    "PickDew": "Pick from dewar",
    "PutDew": "Put in the dewar",
    "DewToMD": "On the way from dewar to MD",
    "MD": "MD",
    "Dryer": "Dryer",
    "CTB": "Center to base",
    "BTC": "Base to center",
    "CTEject": "Center puck eject",
    "EjectTC": "Put to cener puck",
}

STATUS_DESC = {
    "idl": "Idle",
    "bsy": "Busy",
    "err": "Error",
    "opn": "Opened",
    "on": "On",
    "off": "Off",
}

STATUS_STR_DESC = {
    "Sys": "Controller",
    "Rob": "Robot",
    "Grp": "Gripper",
    "Lid": "Dewar Lid",
    "Mag": "MD smart magnet",
    "Cry": "Cryo stream position",
    "Gui": "Guillotine",
    "Trsf": "Sample transfer state",
    "MD": "MD transfer state",
    "CDor": "Robot cage door",
    "CPuck": "Central puck",
    "VDet": "Vial in gripper detected",
    "Dry": "Dry gripper routine",
    "Prgs": "Progress bar",
    "PSw": "Puck switches",
    "SDet": "Sample detected on MD",
    "RPos": "Robot positions",
    "SPNr": "Sample puck in operation",
    "Err": "Error",
    "LErr": "Last 5 errors",
    "LSPmnt": "Last sample mounted",
    "CMD": "Command in progress",
    "MntPos": "MD mounting position",
    "Vial": "Vial detected",
}

CMD_STR_DESC = {
    "IDL": "Idle",
    "Nxt": "Nxt ?",
    "Mnt": "Mount sample",
    "Dis": "Dismount sample",
    "Tst": "Test ?",
    "Dry": "Dry",
}

ERROR_STR_DESC = {
    0: "No Error",
    1: "Guillotine valve 1",
    2: "Guillotine valve 2",
    3: "Puck switches",
    4: "Gripper",
    5: "Air pressure",
    6: "Lid valve 1",
    7: "Lid valve 2",
    8: "Crash",
    9: "Magnet",
    10: "Transfer",
    11: "Communication with diffractometer",
}


[docs]class Marvin(AbstractSampleChanger.SampleChanger): __TYPE__ = "Marvin" def __init__(self, *args, **kwargs): super(Marvin, self).__init__(self.__TYPE__, False, *args, **kwargs) self._selected_sample = None self._selected_basket = None self._num_baskets = None self._status_list = [] self._state_string = None self._puck_switches = None self._centre_puck = None self._mounted_puck = None self._mounted_sample = None self._action_started = None self._progress = None self._veto = None self._sample_detected = None self._focusing_mode = None self._process_step_info = None self._command_list = None self._info_dict = {} self._in_error_state = False self._was_mount_error = False self._command_acknowledgement = False self.chan_status = None self.chan_sample_is_loaded = None self.chan_puck_switched = None self.chan_mounted_sample_puck = None self.chan_process_step_info = None self.cmd_mount_sample = None self.cmd_unmount_sample = None self.cmd_open_lid = None self.cmd_close_lid = None self.cmd_base_to_center = None self.cmd_center_to_base = None self.cmd_dry_gripper = None self.beam_focusing_hwobj = None
[docs] def init(self): self._puck_switches = 0 self._num_basket = self.get_property("numBaskets") if not self._num_basket: self._num_basket = 17 for i in range(self._num_basket): basket = Container.Basket(self, i + 1) self._add_component(basket) self.chan_mounted_sample_puck = self.get_channel_object("chanMountedSamplePuck") self.chan_mounted_sample_puck.connect_signal( "update", self.mounted_sample_puck_changed ) self.chan_process_step_info = self.get_channel_object( "chanProcessStepInfo", optional=True ) if self.chan_process_step_info is not None: self.chan_process_step_info.connect_signal( "update", self.process_step_info_changed ) self.chan_command_list = self.get_channel_object( "chanCommandList", optional=True ) if self.chan_command_list is not None: self.chan_command_list.connect_signal("update", self.command_list_changed) self.chan_puck_switches = self.get_channel_object("chanPuckSwitches") self.chan_puck_switches.connect_signal("update", self.puck_switches_changed) self.chan_status = self.get_channel_object("chanStatusList") self.chan_status.connect_signal("update", self.status_list_changed) self.chan_sample_is_loaded = self.get_channel_object("chanSampleIsLoaded") self.chan_sample_is_loaded.connect_signal( "update", self.sample_is_loaded_changed ) self.chan_veto = self.get_channel_object("chanVeto", optional=True) if self.chan_veto is not None: self.chan_veto.connect_signal("update", self.veto_changed) self.cmd_mount_sample = self.get_command_object("cmdMountSample") self.cmd_unmount_sample = self.get_command_object("cmdUnmountSample") self.cmd_open_lid = self.get_command_object("cmdOpenLid") self.cmd_close_lid = self.get_command_object("cmdCloseLid") self.cmd_base_to_center = self.get_command_object("cmdBaseToCenter") self.cmd_center_to_base = self.get_command_object("cmdCenterToBase") self.cmd_dry_gripper = self.get_command_object("cmdDryGripper") self.beam_focusing_hwobj = self.get_object_by_role("beam_focusing") if self.beam_focusing_hwobj is not None: self.connect( self.beam_focusing_hwobj, "focusingModeChanged", self.focusing_mode_changed, ) ( self._focusing_mode, beam_size, ) = self.beam_focusing_hwobj.get_active_focus_mode() self.focusing_mode_changed(self._focusing_mode, beam_size) else: self._focusing_mode = "P13mode" self._init_sc_contents() self._update_state() self._updateSCContents() self._update_loaded_sample() self.log_filename = self.get_property("log_filename") if self.log_filename is None: self.log_filename = os.path.join( tempfile.gettempdir(), "mxcube", "marvin.log" ) self.log.debug("Marvin log filename: %s" % self.log_filename) AbstractSampleChanger.SampleChanger.init(self) self.update_state(HardwareObjectState.READY) self.status_list_changed(self.chan_status.get_value()) self.puck_switches_changed(self.chan_puck_switches.get_value()) self.mounted_sample_puck_changed(self.chan_mounted_sample_puck.get_value()) self.sample_is_loaded_changed(self.chan_sample_is_loaded.get_value())
def get_status_str_desc(self): return STATUS_STR_DESC
[docs] def get_log_filename(self): """Returns log filename""" return self.log_filename
[docs] def run_test(self): """Test method mounts/dismounts samples""" samples_mounted = 0 for cycle in range(5): for sample_index in range(1, 11): logging.getLogger("GUI").info( "Sample changer: Mounting sample 1:%d" % sample_index ) self.load("1:%02d" % sample_index, wait=True) logging.getLogger("GUI").info( "Sample changer: Total mounts done: %d" % (samples_mounted + 1) ) samples_mounted += 1 gevent.sleep(1)
[docs] def puck_switches_changed(self, puck_switches): """Updates puck switches""" self._puck_switches = int(puck_switches) self._info_dict["puck_switches"] = int(puck_switches) self._updateSCContents()
[docs] def sample_is_loaded_changed(self, sample_detected): """Updates sample is loaded""" if self._sample_detected != sample_detected: if sample_detected: self.log.debug("Sample changer: sample re-appeared") else: self.log.debug("Sample changer: sample disappeared") self._sample_detected = sample_detected self._info_dict["sample_detected"] = sample_detected self._update_loaded_sample() self.update_info()
def wait_command_acknowledgement(self, timeout): with gevent.Timeout( timeout, Exception("Timeout waiting for command acknowldegement") ): self.log.debug("Sample changer: start waiting command acknowldegement") while not self._command_acknowledgement: gevent.sleep(0.05) self.log.debug("Sample changer: done waiting command acknowldegement") def wait_sample_to_disappear(self, timeout): with gevent.Timeout( timeout, Exception("Timeout waiting for sample to disappear") ): self.log.debug("Sample changer: start waiting sample to disappear") while self._sample_detected: if self._was_mount_error: self._was_mount_error = False return gevent.sleep(0.05) self.log.debug("Sample changer: done waiting sample to disappear") def wait_sample_to_appear(self, timeout): with gevent.Timeout(timeout, Exception("Timeout waiting for sample to appear")): self.log.debug("Sample changer: start waiting sample to appear") while not self._sample_detected: if self._was_mount_error: self._was_mount_error = False return gevent.sleep(0.05) self.log.debug("Sample changer: done waiting sample to appear") def wait_sample_on_gonio(self, timeout): # with gevent.Timeout(timeout, Exception("Timeout waiting for sample on gonio")): # while not self._sample_detected: # gevent.sleep(0.05) with gevent.Timeout(timeout, Exception("Timeout waiting for centring phase")): while ( HWR.beamline.diffractometer.get_current_phase() != HWR.beamline.diffractometer.PHASE_CENTRING ): if not self._is_device_busy(): return gevent.sleep(0.05) def is_sample_on_gonio(self): return self.chan_sample_is_loaded.get_value() # logging.getLogger("GUI").info("Sample on gonio check 1: %s" %first_try) # gevent.sleep(1.0) # second_try = self.chan_sample_is_loaded.get_value() # logging.getLogger("GUI").info("Sample on gonio check 2: %s" %second_try) # return first_try and second_try
[docs] def mounted_sample_puck_changed(self, mounted_sample_puck): """Updates mounted puck index""" mounted_sample = mounted_sample_puck[0] - 1 mounted_puck = mounted_sample_puck[1] - 1 if mounted_puck != self._mounted_puck: self._mounted_puck = mounted_puck if self._focusing_mode == "P13mode": self._info_dict["mounted_puck"] = mounted_puck else: self._info_dict["mounted_puck"] = mounted_puck + 1 self._updateSCContents() if mounted_sample != self._mounted_sample: self._mounted_sample = mounted_sample if self._focusing_mode == "P13mode": self._info_dict["mounted_sample"] = mounted_sample else: self._info_dict["mounted_sample"] = mounted_sample + 1 self._update_loaded_sample()
[docs] def veto_changed(self, status): """Veto changed callback. Used to wait for ready""" self._veto = status self._info_dict["veto"] = self._veto
[docs] def focusing_mode_changed(self, focusing_mode, beam_size): """Sets CRL combination based on the focusing mode""" self._focusing_mode = focusing_mode self._info_dict["focus_mode"] = self._focusing_mode
def process_step_info_changed(self, process_step_info): self._process_step_info = process_step_info.replace("\n", " ") self._command_acknowledgement = True if "error" in process_step_info.lower(): self._was_mount_error = True logging.getLogger("GUI").error( "Sample changer: %s" % self._process_step_info ) # GB: 20190304: this seemed to lock mxcube forever on any marvin error # self._in_error_state = True # self._set_state(AbstractSampleChanger.SampleChangerState.Alarm) else: logging.getLogger("GUI").info( "Sample changer: %s" % self._process_step_info ) self._info_dict["process_step"] = self._process_step_info def command_list_changed(self, cmd_list): self._command_list = cmd_list.replace("\n", "") logging.getLogger("GUI").info( "Sample changer: Last command - %s" % self._command_list ) self._info_dict["command_list"] = self._command_list def open_lid(self): self.cmd_open_lid(1) def close_lid(self): self.cmd_close_lid(1) def base_to_center(self): return # self.cmd_base_to_center(1) def center_to_base(self): return # self.cmd_center_to_base(1) def dry_gripper(self): self.cmd_dry_gripper(1)
[docs] def get_sample_properties(self): """Gets sample properties""" return (Container.Pin.__HOLDER_LENGTH_PROPERTY__,)
[docs] def assert_can_execute_task(self): return
def _do_update_info(self): """Updates the sample changers status: mounted pucks, state, currently loaded sample """ # self._update_state() # self._updateSCContents() # call this method if status string changed # self._update_loaded_sample() def _directly_update_selected_component(self, basket_no, sample_no): """Directly updates necessary sample""" basket = None sample = None if basket_no is not None and basket_no > 0 and basket_no <= self._num_basket: basket = self.get_component_by_address( Container.Basket.get_basket_address(basket_no) ) if ( sample_no is not None and sample_no > 0 and sample_no <= len(basket.get_sample_list()) ): sample = self.get_component_by_address( Container.Pin.get_sample_address(basket_no, sample_no) ) self._set_selected_component(basket) self._set_selected_sample(sample) def _do_select(self, component): """Selects a new component (basket or sample). Uses method >_directly_update_selected_component< to actually search and select the corrected positions. """ if type(component) in (Container.Pin, Sample.Sample): selected_basket_no = component.get_basket_no() selected_sample_no = component.get_index() + 1 elif isinstance(component, Container.Container) and ( component.get_type() == Container.Basket.__TYPE__ ): selected_basket_no = component.get_index() + 1 selected_sample_no = None self._directly_update_selected_component(selected_basket_no, selected_sample_no) def _do_scan(self, component, recursive): """Scans the barcode of a single sample, puck or recursively even the complete sample changer. Not implemented """ print("_do_scan TODO") def _do_load(self, sample=None): """Loads a sample on the diffractometer. Performs a simple put operation if the diffractometer is empty, and a sample exchange (unmount of old + mount of new sample) if a sample is already mounted on the diffractometer. """ # self._set_state(AbstractSampleChanger.SampleChangerState.Ready) log = logging.getLogger("GUI") if self._focusing_mode not in ( "Collimated", "Double", "Imaging", "TREXX", "P13mode", ): error_msg = "Focusing mode is undefined. Sample loading is disabled" log.error(error_msg) return # if self._focusing_mode in ("Collimated", "Double") and not self._centre_puck: # log.error("No center puck detected. Please do Base-to-Center with any puck.") # return if self._in_error_state: log.error( "Sample changer is in error state. " + "All commands are disabled." + "Fix the issue and reset sample changer in MXCuBE" ) return start_time = datetime.now() selected = self.get_selected_sample() if sample is not None: if sample != selected: self._do_select(sample) selected = self.get_selected_sample() else: if selected is not None: sample = selected else: raise Exception("No sample selected") basket_index = selected.get_basket_no() sample_index = selected.get_vial_no() # 1. Check if sample is on gonio. This should never happen # because if mount is requested and on gonio is sample then # first sample is dismounted if self._focusing_mode == "P13mode": if self.is_sample_on_gonio(): if selected == self.get_loaded_sample(): msg = ( "The sample " + str(self.get_loaded_sample().get_address()) + " is already loaded" ) raise Exception(msg) else: self._do_unload() msg = "Sample changer: Loading sample %d:%d" % ( int(basket_index), int(sample_index), ) log.warning(msg + " Please wait...") self.emit("progressInit", (msg, 100, False)) # 2. Set diffractometer transfer phase self.log.debug( "%s %s" % ( HWR.beamline.diffractometer.get_current_phase(), HWR.beamline.diffractometer.PHASE_TRANSFER, ) ) if ( HWR.beamline.diffractometer.get_current_phase() != HWR.beamline.diffractometer.PHASE_TRANSFER ): self.log.debug("set transfer") HWR.beamline.diffractometer.set_phase( HWR.beamline.diffractometer.PHASE_TRANSFER, 60.0 ) time.sleep(2) if ( HWR.beamline.diffractometer.get_current_phase() != HWR.beamline.diffractometer.PHASE_TRANSFER ): log.error( "Diffractometer is not in the transfer phase. " + "Sample will not be mounted" ) raise Exception("Unable to set Transfer phase") # self.log.debug("Sample changer: Closing guillotine...") # HWR.beamline.detector.close_cover() # self.log.debug("Sample changer: Guillotine closed") # 3. If necessary move detector to save position if self._focusing_mode == "P13mode": if HWR.beamline.detector.distance.get_value() < 399.0: log.info("Sample changer: Moving detector to save position...") self._veto = 1 HWR.beamline.detector.distance.set_value(400, timeout=45) time.sleep(1) self.waitVeto(20.0) log.info("Sample changer: Detector moved to save position") else: pass # self.log.debug("Sample changer: Closing guillotine...") # HWR.beamline.detector.close_cover() ##self.log.debug("Sample changer: Guillotine closed") # 4. Executed command and wait till device is ready if self._focusing_mode == "P13mode": self._execute_server_task( self.cmd_mount_sample, int(sample_index), int(basket_index) ) else: if ( self._focusing_mode == "Collimated" or self._focusing_mode == "Imaging" or self._focusing_mode == "TREXX" ): self._execute_server_task( self.cmd_mount_sample, int(sample_index), int(basket_index), 1 ) elif self._focusing_mode == "Double": self._execute_server_task( self.cmd_mount_sample, int(sample_index), int(basket_index), 3 ) self.emit("progressStop", ()) if self.is_sample_on_gonio(): log.info( "Sample changer: Sample %d:%d loaded" % (int(basket_index), int(sample_index)) ) if self._focusing_mode == "P13mode": HWR.beamline.diffractometer.set_phase( HWR.beamline.diffractometer.PHASE_CENTRING, 60.0 ) # HWR.beamline.diffractometer.close_kappa() else: log.error( "Sample changer: Failed to load sample %d:%d" % (int(basket_index), int(sample_index)) ) raise Exception("Sample not loaded!")
[docs] def load(self, sample=None, wait=True): """Load a sample""" # self._set_state(AbstractSampleChanger.SampleChangerState.Ready) if self._focusing_mode == "P13mode": AbstractSampleChanger.SampleChanger.load(self, sample, wait) else: sample = self._resolve_component(sample) self.assert_not_charging() return self._execute_task( AbstractSampleChanger.SampleChangerState.Loading, wait, self._do_load, sample, )
def _do_unload(self, sample_slot=None): """Unloads a sample from the diffractometer""" log = logging.getLogger("GUI") # self._set_state(AbstractSampleChanger.SampleChangerState.Ready) if self._focusing_mode not in ( "Collimated", "Double", "Imaging", "TREXX", "P13mode", ): error_msg = "Focusing mode is undefined. Sample loading is disabled" log.error(error_msg) return if self._in_error_state: log.error( "Sample changer is in error state. " + "All commands are disabled." + "Fix the issue and reset sample changer in MXCuBE" ) return if self._focusing_mode == "P13mode": sample_index = self._mounted_sample basket_index = self._mounted_puck else: sample_index = self._mounted_sample + 1 basket_index = self._mounted_puck + 1 msg = "Sample changer: Unloading sample %d:%d" % (basket_index, sample_index) log.warning(msg + ". Please wait...") self.emit("progressInit", (msg, 100, False)) if ( HWR.beamline.diffractometer.get_current_phase() != HWR.beamline.diffractometer.PHASE_TRANSFER ): HWR.beamline.diffractometer.set_phase( HWR.beamline.diffractometer.PHASE_TRANSFER, 60 ) if ( HWR.beamline.diffractometer.get_current_phase() != HWR.beamline.diffractometer.PHASE_TRANSFER ): log.error( "Diffractometer is not in the transfer phase. " + "Sample will not be mounted" ) raise Exception("Unable to set Transfer phase") # HWR.beamline.detector.close_cover() if self._focusing_mode == "P13mode": if HWR.beamline.detector.distance.get_value() < 399.0: log.info("Sample changer: Moving detector to save position ...") self._veto = 1 HWR.beamline.detector.distance.set_value(400, timeout=45) time.sleep(1) self.waitVeto(20.0) log.info("Sample changer: Detector moved to save position") else: pass # HWR.beamline.detector.close_cover() start_time = datetime.now() if self._focusing_mode == "P13mode": self._execute_server_task( self.cmd_unmount_sample, sample_index, basket_index ) else: if ( self._focusing_mode == "Collimated" or self._focusing_mode == "Imaging" or self._focusing_mode == "TREXX" ): self._execute_server_task( self.cmd_unmount_sample, sample_index, basket_index, 1 ) elif self._focusing_mode == "Double": self._execute_server_task( self.cmd_unmount_sample, sample_index, basket_index, 3 ) self.emit("progressStop", ()) if self.is_sample_on_gonio(): log.error( "Sample changer: Failed to unload sample %d:%d" % (basket_index, sample_index) ) raise Exception("Sample not unloaded!") else: log.info( "Sample changer: Sample %d:%d unloaded" % (basket_index, sample_index) )
[docs] def clear_basket_info(self, basket): """Clears information about basket""" # TODO return
def _do_change_mode(self, mode): """Changes the mode of sample changer""" return def _do_abort(self): """Aborts the sample changer""" return def _do_reset(self): """Clean all sample info, move sample to his position and move puck from center to base""" self._set_state(AbstractSampleChanger.SampleChangerState.Ready) self._init_sc_contents() self._in_error_state = False def _execute_server_task(self, method, *args): """Executes called cmd, waits until sample changer is ready and updates loaded sample info """ # self.wait_ready(60.0) self._state_string = "Bsy" self._progress = 5 arg_arr = [] for arg in args: arg_arr.append(arg) self.log.debug( "Sample changer: Sending cmd with arguments: %s..." % str(arg_arr) ) self._command_acknowledgement = False method(arg_arr) self.log.debug("Sample changer: Waiting ready...") self.wait_command_acknowledgement(5.0) self._action_started = True gevent.sleep(5) if method == self.cmd_mount_sample: # self.wait_sample_on_gonio(120.0) self._was_mount_error = False self.wait_sample_to_disappear(40.0) self.wait_sample_to_appear(60.0) else: self.wait_ready(120.0) self.log.debug("Sample changer: Ready") self.log.debug("Sample changer: Waiting veto...") self.waitVeto(20.0) self.log.debug("Sample changer: Veto ready") # if self._is_device_busy(): # raise Exception("Action finished to early. Sample changer is not ready!!!") self.sample_is_loaded_changed(self.chan_sample_is_loaded.get_value()) # self._update_state() self._update_loaded_sample() # self._set_state(AbstractSampleChanger.SampleChangerState.Ready) self.update_state(HardwareObjectState.READY) self._action_started = False def _update_state(self): state = self._read_state() if state == HardwareObjectState.BUSY and self._is_device_busy(self.get_state()): return self._set_state(state) def _read_state(self): """Converts state string to defined state""" state_converter = { "ALARM": HardwareObjectState.FAULT, "Err": HardwareObjectState.FAULT, "Idl": HardwareObjectState.READY, "Bsy": HardwareObjectState.BUSY, } return state_converter.get(self._state_string, HardwareObjectState.UNKNOWN) def _is_device_busy(self, state=None): """Checks whether Sample changer is busy""" if state is None: state = self._read_state() if self._progress >= 100 and state in ( HardwareObjectState.READY, HardwareObjectState.FAULT, ): return False else: return True def _is_device_ready(self): """Checks whether Sample changer is ready""" state = self._read_state() return state in (HardwareObjectState.READY,)
[docs] def wait_ready(self, timeout=None): """Waits until the sample changer is ready""" with gevent.Timeout(timeout, Exception("Timeout waiting for device ready")): while self._is_device_busy(): gevent.sleep(0.05)
def waitVeto(self, timeout=20): with gevent.Timeout(timeout, Exception("Timeout waiting for veto")): while self._veto == 1: self.veto_changed(self.chan_veto.get_value) gevent.sleep(0.1) def _update_selection(self): """Updates selected basked and sample""" basket = None sample = None try: basket_no = self._selected_basket if ( basket_no is not None and basket_no > 0 and basket_no <= self._num_basket ): basket = self.get_component_by_address( Container.Basket.get_basket_address(basket_no) ) sample_no = self._selected_sample if ( sample_no is not None and sample_no > 0 and sample_no <= Container.Basket.NO_OF_SAMPLES_PER_PUCK ): sample = self.get_component_by_address( Container.Pin.get_sample_address(basket_no, sample_no) ) except Exception: self.log.exception("") self._set_selected_component(basket) self._set_selected_sample(sample) def _update_loaded_sample(self): """ Updates loaded sample """ if ( self._sample_detected and self._mounted_sample > -1 and self._mounted_puck > -1 and self._centre_puck ): if self._focusing_mode == "P13mode": new_sample = self.get_component_by_address( Container.Pin.get_sample_address( self._mounted_puck, self._mounted_sample ) ) else: new_sample = self.get_component_by_address( Container.Pin.get_sample_address( self._mounted_puck + 1, self._mounted_sample + 1 ) ) else: new_sample = None if self.get_loaded_sample() != new_sample: old_sample = self.get_loaded_sample() if old_sample is not None: # there was a sample on the gonio loaded = False has_been_loaded = True old_sample._set_loaded(loaded, has_been_loaded) if new_sample is not None: self._update_sample_barcode(new_sample) loaded = True has_been_loaded = True new_sample._set_loaded(loaded, has_been_loaded) def _update_sample_barcode(self, sample): """ Updates the barcode of >sample< in the local database after scanning with the barcode reader. """ datamatrix = "NotAvailable" scanned = len(datamatrix) != 0 if not scanned: datamatrix = "----------" sample._set_info(sample.is_present(), datamatrix, scanned) def _init_sc_contents(self): """ Initializes the sample changer content with default values. """ basket_list = [("", 4)] * self._num_basket for basket_index in range(self._num_basket): basket = self.get_components()[basket_index] datamatrix = None present = scanned = True basket._set_info(present, datamatrix, scanned) # create temporary list with default sample information and indices sample_list = [] for basket_index in range(self._num_basket): for sample_index in range(10): sample_list.append( ( "", basket_index + 1, sample_index + 1, 1, Container.Pin.STD_HOLDERLENGTH, ) ) # write the default sample information into permanent Pin objects for spl in sample_list: sample = self.get_component_by_address( Container.Pin.get_sample_address(spl[1], spl[2]) ) datamatrix = None present = scanned = loaded = has_been_loaded = False sample._set_info(present, datamatrix, scanned) sample._set_loaded(loaded, has_been_loaded) sample._set_holder_length(spl[4]) def _updateSCContents(self): """ Updates sample changer content """ for basket_index in range(self._num_basket): basket = self.get_components()[basket_index] if self._focusing_mode == "P13mode": bsk_index = basket_index + 1 else: bsk_index = basket_index if ( (int(self._puck_switches) & pow(2, basket_index) > 0) or (self._mounted_puck == bsk_index) and self._centre_puck ): # f puck_switches & (1 << basket_index): # basket was mounted present = True scanned = False datamatrix = None else: # basket was removed present = False scanned = False datamatrix = None basket._set_info(present, datamatrix, scanned) # set the information for all dependent samples """ for sample_index in range(10): sample = self.get_component_by_address(Pin.get_sample_address(\ (basket_index + 1), (sample_index + 1))) present = sample.get_container().is_present() if present: datamatrix = '%d:%d - Not defined' % \ (bsk_index, sample_index) else: datamatrix = None datamatrix = None scanned = False sample._set_info(present, datamatrix, scanned) # forget about any loaded state in newly mounted or removed basket) loaded = _has_been_loaded = False sample._set_loaded(loaded, has_been_loaded) """ self._trigger_selection_changed_event() def status_list_changed(self, status_string): tmp_string = status_string.replace(" ", "") tmp_string1 = tmp_string.replace("On\r", "On") status_string = tmp_string1.replace("\rSys", ";Sys") self._status_list = status_string.split(";") for status in self._status_list: property_status_list = status.split(":") if len(property_status_list) < 2: continue prop_name = property_status_list[0] prop_value = property_status_list[1] if prop_name == "Rob": if ( self._state_string != prop_value and prop_value in ("Idl", "Bsy", "Err") and self._action_started ): self._state_string = prop_value self.log.debug( "Sample changer: status changed: %s" % self._state_string ) self._update_state() elif prop_name == "Prgs": try: if int(prop_value) != self._progress and self._action_started: self._progress = int(prop_value) self.emit("progressStep", self._progress) self._info_dict["progress"] = self._progress except Exception: self.log.exception("") elif prop_name == "CPuck": if prop_value == "1": centre_puck = True elif prop_value == "0": centre_puck = False if centre_puck != self._centre_puck: self._centre_puck = centre_puck self._info_dict["centre_puck"] = self._centre_puck self._updateSCContents() self._update_loaded_sample() elif prop_name == "Lid": self._info_dict["lid_opened"] = prop_value == "Opn" elif prop_name == "Err": logging.getLogger("GUI").error( "Sample changer: Error (%s)" % prop_value ) logging.getLogger("GUI").error("Details: ") for status in self._status_list: property_status_list = status.split(":") if len(property_status_list) < 2: continue prop_name = property_status_list[0] prop_value = property_status_list[1] if prop_name in STATUS_STR_DESC: logging.getLogger("GUI").error( " - %s: %s " % (STATUS_STR_DESC[prop_name], prop_value) ) self.emit("statusListChanged", self._status_list) self.emit("infoDictChanged", self._info_dict)
[docs] def force_emit_signals(self): self.emit("statusListChanged", self._status_list) self.emit("infoDictChanged", self._info_dict) self._trigger_info_changed_event() self._trigger_selection_changed_event()