Source code for mxcubecore.HardwareObjects.FlexHCD

import base64
import pickle

import gevent
from PyTango.gevent import DeviceProxy

from mxcubecore.HardwareObjects.abstract.AbstractSampleChanger import (
    SampleChanger,
    SampleChangerState,
)
from mxcubecore.HardwareObjects.abstract.sample_changer.Container import (
    Container,
    Sample,
)
from mxcubecore.TaskUtils import task


class Pin(Sample):
    """A sample pin stored in a basket of a FlexHCD cell."""

    def __init__(self, basket, cell_no, basket_no, sample_no):
        address = Pin.get_sample_address(cell_no, basket_no, sample_no)
        super(Pin, self).__init__(basket, address, True)
        self._set_holder_length(22.0)
        self.present = True

    def get_basket_no(self):
        """Return the index of the containing basket, plus one."""
        basket = self.get_container()
        return basket.get_index() + 1

    def get_vial_no(self):
        """Return the index of this pin inside its basket, plus one."""
        own_index = self.get_index()
        return own_index + 1

    def get_cell_no(self):
        """Return the index of the cell holding this pin's basket, plus one."""
        cell = self.get_container().get_container()
        return cell.get_index() + 1

    def get_cell(self):
        """Return the container of this pin's basket."""
        basket = self.get_container()
        return basket.get_container()

    @staticmethod
    def get_sample_address(cell_number, basket_number, sample_number):
        """Build the ``cell:basket:NN`` address string of a sample.

        The sample number is zero padded to two digits.
        """
        return "%s:%s:%02d" % (cell_number, basket_number, sample_number)
class Basket(Container):
    """Puck container that creates its own Pin slots on construction."""

    __TYPE__ = "Puck"

    def __init__(self, container, cell_no, basket_no, unipuck=False):
        address = Basket.get_basket_address(cell_no, basket_no)
        super(Basket, self).__init__(self.__TYPE__, container, address, True)

        # 16 slots when the unipuck flag is truthy, 10 otherwise.
        slot_count = 16 if unipuck else 10
        for sample_no in range(1, slot_count + 1):
            self._add_component(Pin(self, cell_no, basket_no, sample_no))

        self.present = True

    @staticmethod
    def get_basket_address(cell_number, basket_number):
        """Build the ``cell:basket`` address string of a basket."""
        return "%s:%s" % (cell_number, basket_number)

    def get_cell_no(self):
        """Return the index of the containing cell, plus one."""
        cell = self.get_container()
        return cell.get_index() + 1

    def get_cell(self):
        """Return the container of this basket."""
        return self.get_container()

    def clear_info(self):
        """Ask the parent to reset this basket's info and signal the change."""
        parent = self.get_container()
        parent._reset_basket_info(self.get_index() + 1)
        parent._trigger_info_changed_event()
class Cell(Container):
    """Cell container holding three baskets."""

    __TYPE__ = "Cell"

    def __init__(self, container, number, sc3_pucks=True):
        address = Cell.get_cell_address(number)
        super(Cell, self).__init__(self.__TYPE__, container, address, True)
        self.present = True

        # With sc3_pucks the unipuck flag alternates with the cell number
        # (0 for odd cell numbers, 1 for even ones); otherwise every basket
        # is created with unipuck=True.
        unipuck = 1 - (number % 2) if sc3_pucks else True
        for basket_no in (1, 2, 3):
            self._add_component(Basket(self, number, basket_no, unipuck=unipuck))

    @staticmethod
    def get_cell_address(cell_number):
        """Return the address string of a cell (its number as text)."""
        return "%s" % (cell_number,)

    def _reset_basket_info(self, basket_no):
        """Do nothing; there is no per-basket info to reset."""
        return None

    def clear_info(self):
        """Ask the parent to reset this cell's info and signal the change."""
        parent = self.get_container()
        parent._reset_cell_info(self.get_index() + 1)
        parent._trigger_info_changed_event()

    def get_cell(self):
        """Return this cell itself."""
        return self
class FlexHCD(SampleChanger):
    """Sample changer hardware object for the Flex HCD robot.

    Two transports are supported. When the ``exporter_address`` property is
    set, commands and attributes go through exporter commands/channels
    (``_execute_cmd_exporter``). Otherwise ``flex.<cmd>(...)`` strings are
    evaluated on the Tango device named by the ``tango_device`` property
    (``_execute_cmd``).
    """

    __TYPE__ = "HCD"

    def __init__(self, *args, **kwargs):
        super(FlexHCD, self).__init__(self.__TYPE__, True, *args, **kwargs)

    def init(self):
        """Create the eight cells, connect to the robot and read its state."""
        sc3_pucks = self.get_property("sc3_pucks", True)

        for i in range(8):
            cell = Cell(self, i + 1, sc3_pucks)
            self._add_component(cell)

        self.robot = self.get_property("tango_device")
        if self.robot:
            self.robot = DeviceProxy(self.robot)

        self.exporter_addr = self.get_property("exporter_address")

        if self.exporter_addr:
            # NOTE: swstate_attr only exists in exporter mode.
            self.swstate_attr = self.add_channel(
                {
                    "type": "exporter",
                    "exporter_address": self.exporter_addr,
                    "name": "swstate",
                },
                "State",
            )

        self.controller = self.get_object_by_role("controller")
        self.prepareLoad = self.get_command_object("moveToLoadingPosition")
        self.timeout = 3
        self.gripper_types = {
            -1: "No Gripper",
            1: "UNIPUCK",
            2: "MINISPINE",
            3: "FLIPPING",
            4: "UNIPUCK_DOUBLE",
            5: "PLATE",
        }

        SampleChanger.init(self)
        # self._set_state(SampleChangerState.Disabled)
        self._update_selection()
        self.state = self._read_state()

    @task
    def prepare_load(self):
        """Put the hutch in loading configuration.

        Uses the controller's ``hutch_actions(enter=True)`` when a controller
        is configured, the ``moveToLoadingPosition`` command object otherwise.
        """
        if self.controller:
            self.controller.hutch_actions(enter=True)
        else:
            self.prepareLoad()

    @task
    def _prepare_centring_task(self):
        """Run the post-loading actions that prepare for centring."""
        if self.controller:
            self.controller.hutch_actions(enter=False, sc_loading=True)
        else:
            gevent.sleep(2)
            self.get_command_object("unlockMinidiffMotors")(wait=True)
            self.get_command_object("prepareCentring")(wait=True)

    def prepare_centring(self):
        """Start the centring preparation task."""
        self._prepare_centring_task()

    def get_sample_properties(self):
        """Return the tuple of sample property names (holder length only)."""
        return (Pin.__HOLDER_LENGTH_PROPERTY__,)

    def get_basket_list(self):
        """Return every Basket found in the cells of this sample changer."""
        # put here only the baskets that exist, not all the possible ones
        return [
            basket
            for cell in self.get_components()
            for basket in cell.get_components()
            if isinstance(basket, Basket)
        ]

    def _do_change_mode(self, *args, **kwargs):
        """Do nothing; mode changes are not implemented here."""
        return

    def _do_update_info(self):
        """Refresh the sample changer state."""
        # self._update_selection()
        self._update_state()

    def _do_scan(self, component, recursive=True, saved={"barcodes": None}):
        """Do nothing; scanning is not implemented here.

        The ``saved`` default is kept for interface compatibility; it is
        never read or mutated.
        """
        return

    def _execute_cmd(self, cmd, *args, **kwargs):
        """Evaluate ``flex.<cmd>(<args>)`` on the Tango device and wait for it.

        :param cmd: name of the command, appended to ``flex.``
        :param args: positional arguments, serialised with ``repr``
        :param timeout: keyword only; seconds to wait for completion, ``None``
            (default) waits forever
        :returns: the unpickled result, or ``None`` when the robot returns an
            empty result
        :raises RuntimeError: when the command does not finish within
            ``timeout``
        :raises Exception: whatever exception instance the robot returned as
            the command result
        """
        timeout = kwargs.pop("timeout", None)
        if args:
            cmd_str = "flex.%s(%s)" % (cmd, ",".join(map(repr, args)))
        else:
            cmd_str = "flex.%s()" % cmd

        cmd_id = self.robot.eval(cmd_str)
        if not cmd_id:
            # A falsy command id is retried once.
            cmd_id = self.robot.eval(cmd_str)

        with gevent.Timeout(
            timeout, RuntimeError("Timeout while executing %s" % repr(cmd_str))
        ):
            while not self.robot.is_finished(cmd_id):
                gevent.sleep(0.2)

        res = self.robot.get_result(cmd_id)
        if res:
            # SECURITY NOTE: pickle.loads executes arbitrary code if the
            # payload is attacker controlled; this is only acceptable as long
            # as the Tango device is a trusted peer.
            res = pickle.loads(base64.b64decode(res))
            if isinstance(res, Exception):
                raise res
            return res
        return None

    def _execute_cmd_exporter(self, cmd, *args, **kwargs):
        """Run an exporter command, or read/write an exporter attribute.

        :param cmd: exporter command or attribute accessor name
        :param args: positional arguments, joined with tabs after ``repr``
        :param command: keyword only; truthy to call ``cmd`` as a command
        :param attribute: keyword only; truthy to use ``cmd`` as an attribute
            accessor (``get...`` returns the value immediately, ``set...``
            writes the joined arguments)
        :param timeout: keyword only; passed to ``_wait_ready`` (default 900)
        :returns: the attribute value for ``get...``, otherwise the value
            returned by the command / ``set_value`` (``None`` by default)
        :raises ValueError: when a ``set...`` attribute is used without value
        :raises RuntimeError: when the sample changer is not ready in time
        """
        ret = None
        timeout = kwargs.pop("timeout", 900)
        args_str = "\t".join(map(repr, args)) if args else None

        if kwargs.pop("command", None):
            exp_cmd = self.add_command(
                {
                    "type": "exporter",
                    "exporter_address": self.exporter_addr,
                    "name": cmd,
                },
                cmd,
            )
            ret = exp_cmd(args_str) if args else exp_cmd()

        if kwargs.pop("attribute", None):
            exp_attr = self.add_channel(
                {
                    "type": "exporter",
                    "exporter_address": self.exporter_addr,
                    "name": cmd,
                },
                # The channel is addressed without the 3-letter get/set prefix.
                cmd[3:],
            )
            if cmd.startswith("get"):
                return exp_attr.get_value()
            if cmd.startswith("set"):
                if not args:
                    raise ValueError("%s requires a value to set" % cmd)
                ret = exp_attr.set_value(args_str)

        self._wait_ready(timeout=timeout)
        return ret

    def _assert_ready(self):
        """Raise RuntimeError when the exporter reports a non-ready state.

        Nothing is checked when no exporter address is configured.
        """
        if self.exporter_addr and not self._ready():
            raise RuntimeError("Sample changer is busy cant mount/unmount")

    def _ready(self):
        """Return True when the exporter ``State`` channel reads "Ready"."""
        return self.swstate_attr.get_value() == "Ready"

    def _wait_ready(self, timeout=None):
        """Block until ``_ready()`` is true.

        :param timeout: seconds; ``None`` waits forever, a value <= 0 selects
            the default ``self.timeout``
        :raises RuntimeError: on timeout
        """
        err_msg = "Timeout waiting for sample changer to be ready"
        # None means infinite timeout <=0 means default timeout
        if timeout is not None and timeout <= 0:
            timeout = self.timeout
        with gevent.Timeout(timeout, RuntimeError(err_msg)):
            while not self._ready():
                gevent.sleep(0.5)

    def _do_select(self, component):
        """Move the dewar to the cell of ``component`` and refresh selection.

        :param component: a Cell, Basket or Pin
        :raises TypeError: for any other kind of component
        """
        if isinstance(component, Cell):
            cell_pos = component.get_index() + 1
        elif isinstance(component, (Basket, Pin)):
            cell_pos = component.get_cell_no()
        else:
            raise TypeError(
                "Cannot select %r: expected a Cell, Basket or Pin" % (component,)
            )

        if self.exporter_addr:
            self._execute_cmd_exporter("moveDewar", cell_pos, command=True)
        else:
            self._execute_cmd("moveDewar", cell_pos)

        self._update_selection()

    def _load_state_on_gonio(self):
        """Return True when the robot load state contains "on_gonio"."""
        if self.exporter_addr:
            loading_state = self._execute_cmd_exporter(
                "getCurrentLoadSampleState", attribute=True
            )
        else:
            loading_state = str(
                self._execute_cmd("get_robot_cache_variable", "LoadSampleStatus")
            )
        return "on_gonio" in loading_state

    def _robot_is_safe(self):
        """Return a truthy value when the robot reports a safe position."""
        if self.exporter_addr:
            return self._execute_cmd_exporter("getRobotIsSafe", attribute=True)
        return (
            self._execute_cmd("get_robot_cache_variable", "data:dioRobotIsSafe")
            == "true"
        )

    def _wait_sample_on_gonio(self, robot_task, sample, safe_timeout, log_wait=False):
        """Poll a running robot task until the sample is on the gonio.

        :param robot_task: greenlet running the load command
        :param sample: sample marked as loaded once it is on the gonio
        :param safe_timeout: seconds allowed for the robot to become safe
        :param log_wait: log the wait message at info level before waiting
        :returns: True when the sample reached the gonio and the robot is
            safe, False when the task finished first
        :raises RuntimeError: when the robot is not safe within
            ``safe_timeout``
        """
        err_msg = "Timeout waiting for sample changer to be in safe position"
        while not robot_task.ready():
            if self._load_state_on_gonio():
                self._set_loaded_sample(sample)
                with gevent.Timeout(safe_timeout, RuntimeError(err_msg)):
                    if log_wait:
                        # NOTE: logged before the wait starts, not on timeout.
                        self.log.info(err_msg)
                    while not self._robot_is_safe():
                        gevent.sleep(0.5)
                return True
            gevent.sleep(2)
        return False

    def chained_load(self, old_sample, sample):
        """Unload ``old_sample`` and load ``sample`` in one robot sequence.

        In exporter mode only ``loadSample`` is issued for ``sample``;
        otherwise ``chainedUnldLd`` is issued with both sample positions.

        :returns: True when the sample ends up on the gonio, else the result
            of ``_check_pin_on_gonio``
        :raises RuntimeError: when the changer is busy or the robot does not
            reach a safe position within 60 seconds
        """
        self._assert_ready()
        if self.exporter_addr:
            unload_load_task = gevent.spawn(
                self._execute_cmd_exporter,
                "loadSample",
                sample.get_cell_no(),
                sample.get_basket_no(),
                sample.get_vial_no(),
                command=True,
            )
        else:
            unload_load_task = gevent.spawn(
                self._execute_cmd,
                "chainedUnldLd",
                [
                    old_sample.get_cell_no(),
                    old_sample.get_basket_no(),
                    old_sample.get_vial_no(),
                ],
                [sample.get_cell_no(), sample.get_basket_no(), sample.get_vial_no()],
            )

        # Give the robot some time to start before polling its state.
        gevent.sleep(10)

        if self._wait_sample_on_gonio(unload_load_task, sample, 60, log_wait=True):
            return True

        self.log.info("unload load task done")
        for msg in self.get_robot_exceptions():
            self.log.error(msg)

        return self._check_pin_on_gonio()

    def _check_pin_on_gonio(self):
        """Check whether a pin is on the gonio and act on the result.

        :returns: True (after starting the centring preparation) when a pin
            is detected, False (after resetting the loaded sample) otherwise
        """
        if self.exporter_addr:
            _on_gonio = self._execute_cmd_exporter("pin_on_gonio", command=True)
        else:
            _on_gonio = self._execute_cmd("pin_on_gonio")

        if _on_gonio:
            # finish the loading actions
            self._prepare_centring_task()
            return True

        self.log.info("reset loaded sample")
        self._reset_loaded_sample()
        # if self.controller:
        #    self.controller.hutch_actions(release_interlock=True)
        return False

    def reset_loaded_sample(self):
        """Reset the loaded position on the robot and locally."""
        if self.exporter_addr:
            self._execute_cmd_exporter("resetLoadedPosition", command=True)
        else:
            self._execute_cmd("reset_loaded_position")
        self._reset_loaded_sample()

    def get_robot_exceptions(self):
        """Return an iterable of robot exception messages.

        Exporter mode reports nothing (the ``getRobotExceptions`` attribute
        read is disabled). An empty robot answer is turned into an empty
        list so that callers can always iterate over the result.
        """
        if self.exporter_addr:
            # Disabled:
            # return self._execute_cmd_exporter('getRobotExceptions',
            #                                   attribute=True)
            return ""
        return self._execute_cmd("getRobotExceptions") or []

    @task
    def load(self, sample):
        """Load ``sample``, then prepare centring when loading succeeded.

        Robot exceptions are logged whether or not loading raised.

        :returns: the value returned by ``SampleChanger.load``
        """
        self.prepare_load(wait=True)
        self.enable_power()

        try:
            res = SampleChanger.load(self, sample)
        finally:
            for msg in self.get_robot_exceptions():
                self.log.error(msg)

        if res:
            self.prepare_centring()

        return res

    @task
    def unload_sample(
        self,
        holderLength,
        sample_id=None,
        sample_location=None,
        successCallback=None,
        failureCallback=None,
    ):
        """Unload the sample found at ``sample_location``.

        :param sample_location: ``(cell, basket, sample)`` triple; the other
            parameters are accepted but not used
        :raises RuntimeError: when the changer is busy (exporter mode)
        """
        self._assert_ready()
        cell_no, basket_no, vial_no = sample_location
        sample = self.get_component_by_address(
            Pin.get_sample_address(cell_no, basket_no, vial_no)
        )
        return self.unload(sample)

    @task
    def unload(self, sample):
        """Unload ``sample``, or the currently loaded sample when falsy.

        Robot exceptions are logged whether or not unloading raised.
        """
        self.prepare_load(wait=True)
        self.enable_power()

        if not sample:
            sample = self.get_loaded_sample().get_address()

        try:
            SampleChanger.unload(self, sample)
        finally:
            for msg in self.get_robot_exceptions():
                self.log.error(msg)

    def get_gripper(self):
        """Return the name of the mounted gripper, "?" when unknown."""
        if self.exporter_addr:
            gripper_type = self._execute_cmd_exporter(
                "get_gripper_type", attribute=True
            )
        else:
            gripper_type = self._execute_cmd("get_gripper_type")

        return self.gripper_types.get(gripper_type, "?")

    def get_available_grippers(self):
        """Return the names of the supported grippers.

        Best effort: any failure yields ``[-1]`` instead of raising.
        """
        try:
            if self.exporter_addr:
                ret = sorted(
                    self._execute_cmd_exporter("getSupportedGrippers", attribute=True)
                )
            else:
                ret = [1, 3]  # self._execute_cmd("get_supported_grippers")
            grippers = [self.gripper_types[gripper] for gripper in ret]
        except Exception:
            grippers = [-1]

        return grippers

    @task
    def change_gripper(self, gripper=None):
        """Change the gripper.

        In exporter mode a truthy ``gripper`` is passed to ``setGripper``;
        otherwise ``changeGripper`` is called without argument.
        """
        self.prepare_load(wait=True)
        self.enable_power()

        if self.exporter_addr:
            if gripper:
                self._execute_cmd_exporter("setGripper", gripper, command=True)
            else:
                self._execute_cmd_exporter("changeGripper", command=True)
        else:
            self._execute_cmd("changeGripper")

    @task
    def home(self):
        """Send the ``homeClear`` command to the robot."""
        self.prepare_load(wait=True)
        self.enable_power()

        if self.exporter_addr:
            self._execute_cmd_exporter("homeClear", command=True)
        else:
            self._execute_cmd("homeClear")

    @task
    def enable_power(self):
        """Send ``enablePower(1)``; a no-op in exporter mode."""
        if not self.exporter_addr:
            self._execute_cmd("enablePower", 1)

    @task
    def defreeze(self):
        """Send the ``defreezeGripper`` command to the robot."""
        self.prepare_load(wait=True)
        self.enable_power()

        if self.exporter_addr:
            self._execute_cmd_exporter("defreezeGripper", command=True)
        else:
            self._execute_cmd("defreezeGripper")

    def _do_load(self, sample=None):
        """Load ``sample`` and report whether it ended up on the gonio.

        :returns: True when the sample is on the gonio or reported as loaded
            by the robot, else the result of ``_check_pin_on_gonio``
        :raises RuntimeError: when the robot does not reach a safe position
            within 20 seconds
        """
        self._update_state()

        position = (
            sample.get_cell_no(),
            sample.get_basket_no(),
            sample.get_vial_no(),
        )
        if self.exporter_addr:
            load_task = gevent.spawn(
                self._execute_cmd_exporter, "loadSample", *position, command=True
            )
        else:
            load_task = gevent.spawn(self._execute_cmd, "loadSample", *position)

        # Give the robot some time to start before polling its state.
        gevent.sleep(10)

        if self._wait_sample_on_gonio(load_task, sample, 20):
            return True

        if self.exporter_addr:
            loaded_sample = self._execute_cmd_exporter(
                "get_loaded_sample", attribute=True
            )
        else:
            loaded_sample = self._execute_cmd("get_loaded_sample")

        if loaded_sample == (
            sample.get_cell_no(),
            sample.get_basket_no(),
            sample.get_vial_no(),
        ):
            self._set_loaded_sample(sample)
            return True

        return self._check_pin_on_gonio()

    def _do_unload(self, sample=None):
        """Unload ``sample`` and report whether the robot is now empty.

        :returns: True when the robot reports ``(-1, -1, -1)`` as loaded
            sample afterwards, False otherwise
        :raises RuntimeError: when another sample than ``sample`` is loaded
        """
        loaded_sample = self.get_loaded_sample()
        if loaded_sample is not None and loaded_sample != sample:
            raise RuntimeError("Cannot unload another sample")

        if self.exporter_addr:
            self._execute_cmd_exporter(
                "unloadSample",
                sample.get_cell_no(),
                sample.get_basket_no(),
                sample.get_vial_no(),
                command=True,
            )
            loaded_sample = self._execute_cmd_exporter(
                "getLoadedSample", attribute=True
            )
        else:
            self._execute_cmd(
                "unloadSample",
                sample.get_cell_no(),
                sample.get_basket_no(),
                sample.get_vial_no(),
            )
            loaded_sample = self._execute_cmd("get_loaded_sample")

        if loaded_sample == (-1, -1, -1):
            self._reset_loaded_sample()
            if self.controller:
                self.controller.hutch_actions(release_interlock=True)
            return True

        return False

    def _do_abort(self):
        """Send the ``abort`` command to the robot."""
        if self.exporter_addr:
            self._execute_cmd_exporter("abort", command=True)
        else:
            self._execute_cmd("abort")

    def _do_reset(self):
        """Enter hutch loading configuration (if any) and send ``homeClear``."""
        if self.controller:
            self.controller.hutch_actions(enter=True)

        if self.exporter_addr:
            self._execute_cmd_exporter("homeClear", command=True)
        else:
            self._execute_cmd("homeClear")

    def clear_basket_info(self, basket):
        """Delegate to ``_reset_basket_info``."""
        return self._reset_basket_info(basket)

    def _reset_basket_info(self, basket):
        """Do nothing; there is no per-basket info to reset."""
        pass

    def clear_cell_info(self, cell):
        """Delegate to ``_reset_cell_info``."""
        return self._reset_cell_info(cell)

    def _reset_cell_info(self, cell):
        """Do nothing; there is no per-cell info to reset."""
        pass

    def _update_state(self):
        """Read the robot state and publish it; Unknown when reading fails."""
        # TODO: a defreezing check ("isDefreezing") used to be planned for
        # the non-exporter mode; see if the command exists for exporter.
        try:
            state = self._read_state()
        except Exception:
            state = SampleChangerState.Unknown

        self._set_state(state)

    def is_sequencer_ready(self):
        """Return True when ``moveToLoadingPosition`` reports spec as ready.

        Always True when no such command object is configured.
        """
        if self.prepareLoad:
            cmdobj = self.get_command_object
            return all(
                cmd.isSpecReady() for cmd in (cmdobj("moveToLoadingPosition"),)
            )
        return True

    def _read_state(self):
        """Read the robot state and map it to a ``SampleChangerState``.

        Unrecognised state names map to ``SampleChangerState.Unknown``.
        """
        # should read state from robot
        if self.exporter_addr:
            state = self.swstate_attr.get_value().upper()
        else:
            state = "RUNNING" if self._execute_cmd("robot.isBusy") else "STANDBY"
            # An idle robot still counts as moving while the sequencer is busy.
            if state == "STANDBY" and not self.is_sequencer_ready():
                state = "RUNNING"

        state_converter = {
            "ALARM": SampleChangerState.Alarm,
            "FAULT": SampleChangerState.Fault,
            "RUNNING": SampleChangerState.Moving,
            "READY": SampleChangerState.Ready,
            "STANDBY": SampleChangerState.Ready,
        }
        return state_converter.get(state, SampleChangerState.Unknown)

    def _is_device_busy(self, state=None):
        """Return True when ``state`` (read if None) is not an idle state."""
        if state is None:
            state = self._read_state()

        return state not in (
            SampleChangerState.Ready,
            SampleChangerState.Loaded,
            SampleChangerState.Alarm,
            SampleChangerState.Disabled,
            SampleChangerState.Fault,
            SampleChangerState.StandBy,
        )

    def _is_device_ready(self):
        """Return True when the current state is Ready or Charging."""
        state = self._read_state()
        return state in (SampleChangerState.Ready, SampleChangerState.Charging)

    def _wait_device_ready(self, timeout=None):
        """Block until ``_is_device_ready()``.

        :param timeout: seconds, ``None`` waits forever
        :raises Exception: on timeout
        """
        with gevent.Timeout(timeout, Exception("Timeout waiting for device ready")):
            while not self._is_device_ready():
                gevent.sleep(0.01)

    def _update_selection(self):
        """Synchronise the selected cell and loaded sample with the robot.

        In exporter mode the selected cell is taken from the loaded sample
        position; otherwise it is read with ``get_cell_position``.
        """
        if self.exporter_addr:
            sample_cell, sample_puck, sample = self._execute_cmd_exporter(
                "get_loaded_sample", attribute=True
            )
            cell = sample_cell
        else:
            cell, _puck = self._execute_cmd("get_cell_position")
            sample_cell, sample_puck, sample = self._execute_cmd("get_loaded_sample")

        for c in self.get_components():
            if cell == c.get_index() + 1:
                self._set_selected_component(c)
                break

        # find sample
        loaded_coords = (sample_cell, sample_puck, sample)
        for s in self.get_sample_list():
            if s.get_coords() == loaded_coords:
                self._set_loaded_sample(s)
                self._set_selected_sample(s)
                return

        # No sample matches the robot's answer: clear every loaded flag.
        for s in self.get_sample_list():
            s._set_loaded(False)
        self._set_selected_sample(None)

    def prepare_hutch(self, **kwargs):
        """Send the user and robot port values to the robot.

        A no-op in exporter mode.

        :param user_port: value passed to ``robot.user_port``, skipped when
            missing or None
        :param robot_port: value passed to ``robot.robot_port``, skipped when
            missing or None
        """
        if self.exporter_addr:
            return

        user_port = kwargs.get("user_port")
        robot_port = kwargs.get("robot_port")

        # The port value must be passed as an argument so that its repr ends
        # up in the evaluated command; embedding the variable *name* in the
        # command string would send the literal text to the robot.
        if user_port is not None:
            self._execute_cmd("robot.user_port", user_port)

        if robot_port is not None:
            self._execute_cmd("robot.robot_port", robot_port)