"""
CATS maintenance mockup.
"""
import logging
import time
import gevent
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.TaskUtils import task
__author__ = "Mikel Eguiraun"
__credits__ = ["The MxCuBE collaboration"]
TOOL_FLANGE, TOOL_UNIPUCK, TOOL_SPINE, TOOL_PLATE, TOOL_LASER, TOOL_DOUBLE_GRIPPER = (
0,
1,
2,
3,
4,
5,
)
TOOL_TO_STR = {
"Flange": TOOL_FLANGE,
"Unipuck": TOOL_UNIPUCK,
"Rotat": TOOL_SPINE,
"Plate": TOOL_PLATE,
"Laser": TOOL_LASER,
"Double": TOOL_DOUBLE_GRIPPER,
}
[docs]class CatsMaintMockup(HardwareObject):
__TYPE__ = "CATS"
NO_OF_LIDS = 3
"""
Actual implementation of the CATS Sample Changer, MAINTENANCE COMMANDS ONLY
BESSY BL14.1 installation with 3 lids
"""
def __init__(self, name):
super().__init__(name)
self._state = "READY"
self._running = 0
self._powered = 0
self._toolopen = 0
self._message = "Nothing to report"
self._regulating = 0
self._lid1state = 0
self._lid2state = 0
self._lid3state = 0
self._charging = 0
self._currenttool = 1
[docs] def init(self):
try:
self.cats_model = self.cats_device.read_attribute("CatsModel").value
except Exception:
self.cats_model = "CATS"
def get_current_tool(self):
return self._currenttool
################################################################################
def _do_abort(self):
"""
Launch the "abort" trajectory on the CATS Tango DS
:returns: None
:rtype: None
"""
pass
def _do_reset(self):
"""
Launch the "reset" command on the CATS Tango DS
:returns: None
:rtype: None
"""
pass
def _do_dry_gripper(self):
"""
Launch the "dry" command on the CATS Tango DS
:returns: None
:rtype: None
"""
pass
def _do_set_on_diff(self, sample):
"""
Launch the "setondiff" command on the CATS Tango DS, an example of sample value is 2:05
:returns: None
:rtype: None
"""
if sample is None:
raise Exception("No sample selected")
else:
str_tmp = str(sample)
sample_tmp = str_tmp.split(":")
# calculate CATS specific lid/sample number
lid = (int(sample_tmp[0]) - 1) / 3 + 1
puc_pos = ((int(sample_tmp[0]) - 1) % 3) * 10 + int(sample_tmp[1])
argin = [str(lid), str(puc_pos), "0"]
logging.getLogger().info("to SetOnDiff %s", argin)
# self._execute_server_task(self._cmdSetOnDiff,argin)
def _do_power_state(self, state=False):
"""
Switch on CATS power if >state< == True, power off otherwise
:returns: None
:rtype: None
"""
self._powered = state
self._update_powered_state(state)
def _do_enable_regulation(self):
"""
Switch on CATS regulation
:returns: None
:rtype: None
"""
self._regulating = True
self._update_regulation_state(True)
def _do_disable_regulation(self):
"""
Switch off CATS regulation
:returns: None
:rtype: None
"""
self._regulating = False
self._update_regulation_state(False)
def _do_lid1_state(self, state=True):
"""
Opens lid 1 if >state< == True, closes the lid otherwise
:returns: None
:rtype: None
"""
self._lid1state = state
self._update_lid1_state(state)
def _do_lid2_state(self, state=True):
"""
Opens lid 2 if >state< == True, closes the lid otherwise
:returns: None
:rtype: None
"""
self._lid2state = state
self._update_lid2_state(state)
def _do_lid3_state(self, state=True):
"""
Opens lid 3 if >state< == True, closes the lid otherwise
:returns: None
:rtype: None
"""
self._lid3state = state
self._update_lid3_state(state)
######################### PROTECTED #########################
def _execute_task(self, wait, method, *args):
ret = self._run(method, *args)
if wait:
return ret.get()
else:
return ret
@task
def _run(self, method, *args):
exception = None
ret = None
try:
ret = method(*args)
except Exception as ex:
exception = ex
if exception is not None:
raise exception
return ret
######################### PRIVATE #########################
def _update_running_state(self, value):
self._running = value
self.emit("runningStateChanged", (value,))
self._update_global_state()
def _update_powered_state(self, value):
self._powered = value
self.emit("powerStateChanged", (value,))
self._update_global_state()
def _update_tool_state(self, value):
self._toolopen = value
self.emit("toolStateChanged", (value,))
self._update_global_state()
def _update_message(self, value):
self._message = value
self.emit("messageChanged", (value,))
self._update_global_state()
def _update_regulation_state(self, value):
self._regulating = value
self.emit("regulationStateChanged", (value,))
self._update_global_state()
def _update_state(self, value):
self._state = value
self._update_global_state()
def _update_lid1_state(self, value):
self._lid1state = value
self.emit("lid1StateChanged", (value,))
self._update_global_state()
def _update_lid2_state(self, value):
self._lid2state = value
self.emit("lid2StateChanged", (value,))
self._update_global_state()
def _update_lid3_state(self, value):
self._lid3state = value
self.emit("lid3StateChanged", (value,))
self._update_global_state()
def _update_operation_mode(self, value):
self._charging = not value
def _update_global_state(self):
state_dict, cmd_state, message = self.get_global_state()
self.emit("globalStateChanged", (state_dict, cmd_state, message))
[docs] def get_global_state(self):
"""
Update clients with a global state that
contains different:
- first param (state_dict):
collection of state bits
- second param (cmd_state):
list of command identifiers and the
status of each of them True/False
representing whether the command is
currently available or not
- message
a message describing current state information
as a string
"""
_ready = str(self._state) in ("READY", "ON")
if self._running:
state_str = "MOVING"
elif not (self._powered) and _ready:
state_str = "DISABLED"
elif _ready:
state_str = "READY"
else:
state_str = str(self._state)
state_dict = {
"toolopen": self._toolopen,
"powered": self._powered,
"running": self._running,
"regulating": self._regulating,
"lid1": self._lid1state,
"lid2": self._lid2state,
"lid3": self._lid3state,
"state": state_str,
}
cmd_state = {
"PowerOn": (not self._powered) and _ready,
"PowerOff": (self._powered) and _ready,
"regulon": (not self._regulating) and _ready,
"openlid1": (not self._lid1state) and self._powered and _ready,
"closelid1": self._lid1state and self._powered and _ready,
"dry": (not self._running) and self._powered and _ready,
"soak": (not self._running) and self._powered and _ready,
"home": (not self._running) and self._powered and _ready,
"back": (not self._running) and self._powered and _ready,
"safe": (not self._running) and self._powered and _ready,
"clear_memory": True,
"reset": True,
"abort": self._running,
}
message = self._message
return state_dict, cmd_state, message
[docs] def get_cmd_info(self):
"""return information about existing commands for this object
the information is organized as a list
with each element contains
[ cmd_name, display_name, category ]
"""
""" [cmd_id, cmd_display_name, nb_args, cmd_category, description ] """
cmd_list = [
[
"Power",
[
["PowerOn", "PowerOn", "Switch Power On"],
["PowerOff", "PowerOff", "Switch Power Off"],
["regulon", "Regulation On", "Swich LN2 Regulation On"],
],
],
[
"Lid",
[
["openlid1", "Open Lid", "Open Lid"],
["closelid1", "Close Lid", "Close Lid"],
],
],
[
"Actions",
[
["home", "Home", "Actions", "Home (trajectory)"],
["dry", "Dry", "Actions", "Dry (trajectory)"],
["soak", "Soak", "Actions", "Soak (trajectory)"],
],
],
[
"Recovery",
[
[
"clear_memory",
"Clear Memory",
"Clear Info in Robot Memory "
" (includes info about sample on Diffr)",
],
["reset", "Reset Message", "Reset Cats State"],
["back", "Back", "Reset Cats State"],
["safe", "Safe", "Reset Cats State"],
],
],
["Abort", [["abort", "Abort", "Abort Execution of Command"]]],
]
return cmd_list
def _execute_server_task(self, method, *args):
task_id = method(*args)
ret = None
# introduced wait because it takes some time before the attribute PathRunning is set
# after launching a transfer
# after setting refresh in the Tango DS to 0.1 s a wait of 1s is enough
time.sleep(1.0)
while str(self._chnPathRunning.get_value()).lower() == "true":
gevent.sleep(0.1)
ret = True
return ret
def send_command(self, cmd_name, args=None):
lid = 1
toolcal = 0
tool = self.get_current_tool()
if cmd_name in ["dry", "safe", "home"]:
if tool is not None:
args = [tool]
else:
raise Exception("Cannot detect type of TOOL in Cats. Command ignored")
if cmd_name == "soak":
if tool in [TOOL_DOUBLE_GRIPPER, TOOL_UNIPUCK]:
args = [str(tool), str(lid)]
else:
raise Exception("Can SOAK only when UNIPUCK tool is mounted")
if cmd_name == "back":
if tool is not None:
args = [tool, toolcal]
else:
raise Exception("Cannot detect type of TOOL in Cats. Command ignored")
if cmd_name == "PowerOn":
self._do_power_state(True)
if cmd_name == "PowerOff":
self._do_power_state(False)
if cmd_name == "regulon":
self._do_enable_regulation()
if cmd_name == "reguloff":
self._do_disable_regulation()
if cmd_name == "openlid1":
self._do_lid1_state(True)
if cmd_name == "closelid1":
self._do_lid1_state(False)
return True
def test_hwo(hwo):
print((hwo.get_current_tool()))