Source code for mxcubecore.HardwareObjects.CatsMaint

"""
CATS maintenance commands hardware object.

Functionality in addition to sample-transfer functionality: power control,
lid control, error-recovery commands, ...

Derived from Michael Hellmig's implementation for the BESSY CATS sample changer
 -add more controls, including Regulation Off, Gripper Dry/Open/Close, Reset Memory, Set Sample On Diff
 -add CATS dewar layout

Vicente Rey - add support for ISARA Model

"""

import logging
import time

import gevent
from PyTango import DeviceProxy

from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.TaskUtils import task

__author__ = "Jie Nan"
__credits__ = ["The MxCuBE collaboration"]

__email__ = "jie.nan@maxlab.lu.se"
__status__ = "Alpha"

TOOL_FLANGE, TOOL_UNIPUCK, TOOL_SPINE, TOOL_PLATE, TOOL_LASER, TOOL_DOUBLE_GRIPPER = (
    0,
    1,
    2,
    3,
    4,
    5,
)

TOOL_TO_STR = {
    "Flange": TOOL_FLANGE,
    "Unipuck": TOOL_UNIPUCK,
    "EMBL": TOOL_SPINE,
    "Rotat": TOOL_SPINE,
    "Plate": TOOL_PLATE,
    "Laser": TOOL_LASER,
    "Double": TOOL_DOUBLE_GRIPPER,
}


[docs]class CatsMaint(HardwareObject): __TYPE__ = "CATS" NO_OF_LIDS = 3 """ Actual implementation of the CATS Sample Changer, MAINTENANCE COMMANDS ONLY BESSY BL14.1 installation with 3 lids """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._state = None self._running = None self._powered = None self._toolopen = None self._message = None self._regulating = None self._lid1state = None self._lid2state = None self._lid3state = None self._charging = None
[docs] def init(self): self.cats_device = DeviceProxy(self.tangoname) try: self.cats_model = self.cats_device.read_attribute("CatsModel").value except Exception: self.cats_model = "CATS" if self.is_isara(): self.nb_of_lids = 1 else: self.nb_of_lids = 3 self._chnState = self.add_channel( { "type": "tango", "name": "_chnState", "tangoname": self.tangoname, "polling": 1000, }, "State", ) self._chnPathRunning = self.add_channel( { "type": "tango", "name": "_chnPathRunning", "tangoname": self.tangoname, "polling": 1000, }, "PathRunning", ) self._chnPowered = self.add_channel( { "type": "tango", "name": "_chnPowered", "tangoname": self.tangoname, "polling": 1000, }, "Powered", ) self._chnMessage = self.add_channel( { "type": "tango", "name": "_chnMessage", "tangoname": self.tangoname, "polling": 1000, }, "Message", ) self._chnToolOpenClose = self.add_channel( { "type": "tango", "name": "_chnToolOpenClose", "tangoname": self.tangoname, "polling": 1000, }, "di_ToolOpen", ) self._chnLN2Regulation = self.add_channel( { "type": "tango", "name": "_chnLN2Regulation", "tangoname": self.tangoname, "polling": 1000, }, "LN2Regulating", ) self._chnBarcode = self.add_channel( { "type": "tango", "name": "_chnBarcode", "tangoname": self.tangoname, "polling": 1000, }, "Barcode", ) self._chnLid1State = self.add_channel( { "type": "tango", "name": "_chnLid1State", "tangoname": self.tangoname, "polling": 1000, }, "di_Lid1Open", ) self._chnLid1State.connect_signal("update", self._update_lid1_state) if self.nb_of_lids > 1: self._chnLid2State = self.add_channel( { "type": "tango", "name": "_chnLid2State", "tangoname": self.tangoname, "polling": 1000, }, "di_Lid2Open", ) self._chnLid2State.connect_signal("update", self._update_lid2_state) if self.nb_of_lids > 2: self._chnLid3State = self.add_channel( { "type": "tango", "name": "_chnLid3State", "tangoname": self.tangoname, "polling": 1000, }, "di_Lid3Open", ) self._chnLid3State.connect_signal("update", self._update_lid3_state) self._chnState.connect_signal("update", self._update_state) self._chnPathRunning.connect_signal("update", self._update_running_state) self._chnPowered.connect_signal("update", self._update_powered_state) self._chnToolOpenClose.connect_signal("update", self._update_tool_state) self._chnMessage.connect_signal("update", self._update_message) self._chnLN2Regulation.connect_signal("update", self._update_regulation_state) self._chnBarcode.connect_signal("update", self._updateBarcode) self._chnCurrentTool = self.add_channel( {"type": "tango", "name": "_chnCurrentTool", "tangoname": self.tangoname}, "Tool", ) self._cmdPowerOn = self.add_command( {"type": "tango", "name": "_cmdPowerOn", "tangoname": self.tangoname}, "powerOn", ) self._cmdPowerOff = self.add_command( {"type": "tango", "name": "_cmdPowerOff", "tangoname": self.tangoname}, "powerOff", ) self._cmdOpenTool = self.add_command( {"type": "tango", "name": "_cmdOpenTool", "tangoname": self.tangoname}, "opentool", ) self._cmdCloseTool = self.add_command( {"type": "tango", "name": "_cmdCloseTool", "tangoname": self.tangoname}, "closetool", ) self._cmdMagnetOn = self.add_command( {"type": "tango", "name": "_cmdMagnetOn", "tangoname": self.tangoname}, "magnetOn", ) self._cmdMagnetOff = self.add_command( {"type": "tango", "name": "_cmdMagnetOff", "tangoname": self.tangoname}, "magnetOff", ) # LIDs self._cmdOpenLid1 = self.add_command( {"type": "tango", "name": "_cmdOpenLid1", "tangoname": self.tangoname}, "openlid1", ) self._cmdCloseLid1 = self.add_command( {"type": "tango", "name": "_cmdCloseLid1", "tangoname": self.tangoname}, "closelid1", ) if self.nb_of_lids > 1: self._cmdOpenLid2 = self.add_command( {"type": "tango", "name": "_cmdOpenLid1", "tangoname": self.tangoname}, "openlid2", ) self._cmdCloseLid2 = self.add_command( {"type": "tango", "name": "_cmdCloseLid1", "tangoname": self.tangoname}, "closelid2", ) if self.nb_of_lids > 2: self._cmdOpenLid3 = self.add_command( {"type": "tango", "name": "_cmdOpenLid1", "tangoname": self.tangoname}, "openlid3", ) self._cmdCloseLid3 = self.add_command( {"type": "tango", "name": "_cmdCloseLid1", "tangoname": self.tangoname}, "closelid3", ) self._cmdRegulOn = self.add_command( {"type": "tango", "name": "_cmdRegulOn", "tangoname": self.tangoname}, "regulon", ) self._cmdRegulOff = self.add_command( {"type": "tango", "name": "_cmdRegulOff", "tangoname": self.tangoname}, "reguloff", ) self._cmdToolOpen = self.add_command( {"type": "tango", "name": "_cmdToolOpen", "tangoname": self.tangoname}, "opentool", ) self._cmdToolClose = self.add_command( {"type": "tango", "name": "_cmdToolClose", "tangoname": self.tangoname}, "closetool", ) # Paths self._cmdAbort = self.add_command( {"type": "tango", "name": "_cmdAbort", "tangoname": self.tangoname}, "abort" ) self._cmdDry = self.add_command( {"type": "tango", "name": "_cmdDry", "tangoname": self.tangoname}, "dry" ) self._cmdSafe = self.add_command( {"type": "tango", "name": "_cmdSafe", "tangoname": self.tangoname}, "safe" ) self._cmdHome = self.add_command( {"type": "tango", "name": "_cmdHome", "tangoname": self.tangoname}, "home" ) self._cmdSoak = self.add_command( {"type": "tango", "name": "_cmdSoak", "tangoname": self.tangoname}, "soak" ) self._cmdBack = self.add_command( {"type": "tango", "name": "_cmdBack", "tangoname": self.tangoname}, "back" ) self._cmdCalibration = self.add_command( {"type": "tango", "name": "_cmdCalibration", "tangoname": self.tangoname}, "toolcalibration", ) self._cmdClearMemory = self.add_command( {"type": "tango", "name": "_cmdClearMemory", "tangoname": self.tangoname}, "clear_memory", ) self._cmdReset = self.add_command( {"type": "tango", "name": "_cmdReset", "tangoname": self.tangoname}, "reset" ) self._cmdResetParameters = self.add_command( { "type": "tango", "name": "_cmdResetParameters", "tangoname": self.tangoname, }, "reset_parameters", ) self._cmdRecoverFailure = self.add_command( { "type": "tango", "name": "_cmdRecoverFailure", "tangoname": self.tangoname, }, "recoverFailure", ) self._cmdResetMotion = self.add_command( {"type": "tango", "name": "_cmdResetMotion", "tangoname": self.tangoname}, "resetmotion", ) self._cmdSetOnDiff = self.add_command( {"type": "tango", "name": "_cmdSetOnDiff", "tangoname": self.tangoname}, "setondiff", ) self._cmdSetOnTool = self.add_command( {"type": "tango", "name": "_cmdSetOnTool", "tangoname": self.tangoname}, "settool", ) self._cmdSetOnTool2 = self.add_command( {"type": "tango", "name": "_cmdSetOnTool2", "tangoname": self.tangoname}, "settool2", ) self.state_actions = { "power": { "in_open": self._cmdPowerOn, "out_close": self._cmdPowerOff, "state": self._chnPowered, } }
def is_isara(self): return self.cats_model == "ISARA" def is_cats(self): return self.cats_model != "ISARA" def get_current_tool(self): current_value = self._chnCurrentTool.get_value() tool = TOOL_TO_STR.get(current_value, None) return tool ################################################################################
[docs] def back_traj(self): """ Moves a sample from the gripper back into the dewar to its logged position. """ return self._execute_task(False, self._do_back)
[docs] def safe_traj(self): """ Safely Moves the robot arm and the gripper to the home position """ return self._execute_task(False, self._do_safe)
def _do_abort(self): """ Launch the "abort" trajectory on the CATS Tango DS :returns: None :rtype: None """ self._cmdAbort() def _do_home(self): """ Launch the "abort" trajectory on the CATS Tango DS :returns: None :rtype: None """ tool = self.get_current_tool() self._cmdHome(tool) def _do_reset(self): """ Launch the "reset" command on the CATS Tango DS :returns: None :rtype: None """ self.log.debug("CatsMaint. doing reset") return self._cmdReset() def _do_reset_memory(self): """ Launch the "reset memory" command on the CATS Tango DS :returns: None :rtype: None """ self._cmdClearMemory() time.sleep(1) self._cmdResetParameters() time.sleep(1) def _do_reset_motion(self): """ Launch the "reset_motion" command on the CATS Tango DS :returns: None :rtype: None """ self._cmdResetMotion() def _do_recover_failure(self): """ Launch the "recoverFailure" command on the CATS Tango DS :returns: None :rtype: None """ self._cmdRecoverFailure() def _do_calibration(self): """ Launch the "toolcalibration" command on the CATS Tango DS :returns: None :rtype: None """ tool = self.get_current_tool() self._cmdCalibration(tool) def _do_open_tool(self): """ Launch the "opentool" command on the CATS Tango DS :returns: None :rtype: None """ self._cmdOpenTool() def _do_close_tool(self): """ Launch the "closetool" command on the CATS Tango DS :returns: None :rtype: None """ self._cmdCloseTool() def _do_dry_gripper(self): """ Launch the "dry" command on the CATS Tango DS :returns: None :rtype: None """ tool = self.get_current_tool() self._cmdDry(tool) def _do_set_on_diff(self, sample): """ Launch the "setondiff" command on the CATS Tango DS, an example of sample value is 2:05 :returns: None :rtype: None """ if sample is None: raise Exception("No sample selected") else: str_tmp = str(sample) sample_tmp = str_tmp.split(":") # calculate CATS specific lid/sample number lid = (int(sample_tmp[0]) - 1) / 3 + 1 puc_pos = ((int(sample_tmp[0]) - 1) % 3) * 10 + int(sample_tmp[1]) argin = [str(lid), str(puc_pos), "0"] logging.getLogger().info("to SetOnDiff %s", argin) self._execute_server_task(self._cmdSetOnDiff, argin) def _do_back(self): """ Launch the "back" trajectory on the CATS Tango DS :returns: None :rtype: None """ tool = self.get_current_tool() argin = [str(tool), "0"] # to send string array with two arg... self._execute_server_task(self._cmdBack, argin) def _do_safe(self): """ Launch the "safe" trajectory on the CATS Tango DS :returns: None :rtype: None """ argin = self.get_current_tool() self._execute_server_task(self._cmdSafe, argin) def _do_power_state(self, state=False): """ Switch on CATS power if >state< == True, power off otherwise :returns: None :rtype: None """ self.log.debug(" running power state command ") if state: self._cmdPowerOn() else: self._cmdPowerOff() self.do_state_action("power", state) def _do_enable_regulation(self): """ Switch on CATS regulation :returns: None :rtype: None """ self._cmdRegulOn() def _do_disable_regulation(self): """ Switch off CATS regulation :returns: None :rtype: None """ self._cmdRegulOff() def _do_lid1_state(self, state=True): """ Opens lid 1 if >state< == True, closes the lid otherwise :returns: None :rtype: None """ if state: self._execute_server_task(self._cmdOpenLid1) else: self._execute_server_task(self._cmdCloseLid1) def _do_lid2_state(self, state=True): """ Opens lid 2 if >state< == True, closes the lid otherwise :returns: None :rtype: None """ if state: self._execute_server_task(self._cmdOpenLid2) else: self._execute_server_task(self._cmdCloseLid2) def _do_lid3_state(self, state=True): """ Opens lid 3 if >state< == True, closes the lid otherwise :returns: None :rtype: None """ if state: self._execute_server_task(self._cmdOpenLid3) else: self._execute_server_task(self._cmdCloseLid3) def _do_magnet_on(self): self._execute_server_task(self._cmdMagnetOn) def _do_magnet_off(self): self._execute_server_task(self._cmdMagnetOff) def _do_tool_open(self): self._execute_server_task(self._cmdToolOpen) def _do_tool_close(self): self._execute_server_task(self._cmdToolClose) # ######################## PROTECTED ######################### def _execute_task(self, wait, method, *args): ret = self._run(method, wait=False, *args) if wait: return ret.get() else: return ret @task def _run(self, method, *args): exception = None ret = None try: ret = method(*args) except Exception as ex: exception = ex if exception is not None: raise exception return ret # ######################## PRIVATE ######################### def _update_running_state(self, value): self._running = value self.emit("runningStateChanged", (value,)) self._update_global_state() def _update_powered_state(self, value): self._powered = value self.emit("powerStateChanged", (value,)) self._update_global_state() def _update_tool_state(self, value): self._toolopen = value self.emit("toolStateChanged", (value,)) self._update_global_state() def _update_message(self, value): self._message = value self.emit("messageChanged", (value,)) self._update_global_state() def _update_regulation_state(self, value): self._regulating = value self.emit("regulationStateChanged", (value,)) self._update_global_state() def _updateBarcode(self, value): self._barcode = value self.emit("barcodeChanged", (value,)) def _update_state(self, value): self._state = value self._update_global_state() def _update_lid1_state(self, value): self._lid1state = value self.emit("lid1StateChanged", (value,)) self._update_global_state() def _update_lid2_state(self, value): self._lid2state = value self.emit("lid2StateChanged", (value,)) self._update_global_state() def _update_lid3_state(self, value): self._lid3state = value self.emit("lid3StateChanged", (value,)) self._update_global_state() def _update_operation_mode(self, value): self._charging = not value def _update_global_state(self): state_dict, cmd_state, message = self.get_global_state() self.emit("globalStateChanged", (state_dict, cmd_state, message))
[docs] def get_global_state(self): """ Update clients with a global state that contains different: - first param (state_dict): collection of state bits - second param (cmd_state): list of command identifiers and the status of each of them True/False representing whether the command is currently available or not - message a message describing current state information as a string """ _ready = str(self._state) in ("READY", "ON") if self._running: state_str = "MOVING" elif not (self._powered) and _ready: state_str = "DISABLED" elif _ready: state_str = "READY" else: state_str = str(self._state) state_dict = { "toolopen": self._toolopen, "powered": self._powered, "running": self._running, "regulating": self._regulating, "lid1": self._lid1state, "lid2": self._lid2state, "lid3": self._lid3state, "state": state_str, } cmd_state = { "powerOn": (not self._powered) and _ready, "powerOff": (self._powered) and _ready, "regulon": (not self._regulating) and _ready, "openlid1": (not self._lid1state) and self._powered and _ready, "closelid1": self._lid1state and self._powered and _ready, "dry": (not self._running) and self._powered and _ready, "soak": (not self._running) and self._powered and _ready, "home": (not self._running) and self._powered and _ready, "back": (not self._running) and self._powered and _ready, "safe": (not self._running) and self._powered and _ready, "clear_memory": True, "reset": True, "abort": True, } message = self._message return state_dict, cmd_state, message
[docs] def get_cmd_info(self): """return information about existing commands for this object the information is organized as a list with each element contains [ cmd_name, display_name, category ] """ """ [cmd_id, cmd_display_name, nb_args, cmd_category, description ] """ cmd_list = [ [ "Power", [ ["powerOn", "PowerOn", "Switch Power On"], ["powerOff", "PowerOff", "Switch Power Off"], ["regulon", "Regulation On", "Swich LN2 Regulation On"], ], ], [ "Lid", [ ["openlid1", "Open Lid", "Open Lid"], ["closelid1", "Close Lid", "Close Lid"], ], ], [ "Actions", [ ["home", "Home", "Actions", "Home (trajectory)"], ["dry", "Dry", "Actions", "Dry (trajectory)"], ["soak", "Soak", "Actions", "Soak (trajectory)"], ], ], [ "Recovery", [ [ "clear_memory", "Clear Memory", "Clear Info in Robot Memory " " (includes info about sample on Diffr)", ], ["reset", "Reset Message", "Reset Cats State"], ["back", "Back", "Reset Cats State"], ["safe", "Safe", "Reset Cats State"], ], ], ["Abort", [["abort", "Abort", "Abort Execution of Command"]]], ] return cmd_list
def _execute_server_task(self, method, *args): task_id = method(*args) ret = None # introduced wait because it takes some time before the attribute PathRunning is set # after launching a transfer # after setting refresh in the Tango DS to 0.1 s a wait of 1s is enough time.sleep(1.0) while str(self._chnPathRunning.get_value()).lower() == "true": gevent.sleep(0.1) ret = True return ret def send_command(self, cmd_name, args=None): lid = 1 toolcal = 0 tool = self.get_current_tool() if cmd_name in ["dry", "safe", "home"]: if tool is not None: args = [tool] else: raise Exception("Cannot detect type of TOOL in Cats. Command ignored") if cmd_name == "soak": if tool in [TOOL_DOUBLE, TOOL_UNIPUCK]: args = [str(tool), str(lid)] else: raise Exception("Can SOAK only when UNIPUCK tool is mounted") if cmd_name == "back": if tool is not None: args = [tool, toolcal] else: raise Exception("Cannot detect type of TOOL in Cats. Command ignored") cmd = getattr(self.cats_device, cmd_name) try: if args is not None: if len(args) > 1: ret = cmd(map(str, args)) else: ret = cmd(*args) else: ret = cmd() return ret except Exception as exc: self.log.exception("") raise
def test_hwo(hwo): print((hwo.get_current_tool()))