import ast
import importlib
import logging
import operator
import sys
import gevent
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.CommandContainer import (
ARGUMENT_TYPE_LIST,
TWO_STATE_COMMAND_T,
CommandObject,
)
from mxcubecore.TaskUtils import task
from mxcubecore.utils.conversion import camel_to_snake
[docs]class ControllerCommand(CommandObject):
def __init__(self, name, cmd=None, username=None, klass=None):
CommandObject.__init__(self, name, username)
self.log = logging.getLogger("HWR")
if not cmd:
self._cmd = klass()
else:
self._cmd = cmd
self._cmd_execution = None
self.type = "CONTROLLER"
if self.name() == "Anneal":
self.add_argument("Time [s]", "float")
if self.name() == "Test":
self.add_argument("combo test", "combo", [{"value1": 0, "value2": 1}])
[docs] def is_connected(self):
return True
@task
def __call__(self, *args, **kwargs):
"""Call the command"""
self.emit("commandBeginWaitReply", (str(self.name()),))
self._cmd_execution = gevent.spawn(self._cmd, *args, **kwargs)
self._cmd_execution.link(self._cmd_done)
def _cmd_done(self, cmd_execution):
"""Handle the command execution.
Args:
(obj): Command execution greenlet.
"""
try:
try:
res = cmd_execution.get()
res = res if res else ""
except Exception:
self.log.exception("%s: execution failed", str(self.name()))
self.emit("commandFailed", (str(self.name()),))
else:
if isinstance(res, gevent.GreenletExit):
# command aborted
self.emit("commandFailed", (str(self.name()),))
else:
self.emit("commandReplyArrived", (str(self.name()), res))
finally:
self.emit("commandReady", (str(self.name()), ""))
[docs] def abort(self):
"""Abort the execution."""
if self._cmd_execution and not self._cmd_execution.ready():
self._cmd_execution.kill()
def value(self):
return None
[docs]class HWObjActuatorCommand(CommandObject):
"""Class for two state hardware objects"""
def __init__(self, name, hwobj):
super().__init__(name)
self._hwobj = hwobj
self.type = TWO_STATE_COMMAND_T
self.argument_type = ARGUMENT_TYPE_LIST
self._hwobj.connect("valueChanged", self._cmd_done)
self._running = False
def _get_action(self):
"""Return which action has to be executed.
Return:
(str): The name of the command
"""
values = [v.name for v in self._hwobj.VALUES]
values.remove("UNKNOWN")
values.remove(self._hwobj.get_value().name)
return self._hwobj.VALUES[values[0]]
@task
def __call__(self, *args, **kwargs):
"""Execute the action.
Args: None
Kwargs: None
"""
self._running = True
self.emit("commandBeginWaitReply", (str(self.name()),))
value = self._get_action()
self._hwobj.set_value(value, timeout=60)
def _cmd_done(self, state):
"""Handle the command execution.
Args:
(obj): Command execution greenlet.
"""
gevent.sleep(1)
try:
res = self._hwobj.get_value().name
except Exception:
self.emit("commandFailed", (str(self.name()),))
else:
if isinstance(res, gevent.GreenletExit):
self.emit("commandFailed", (str(self.name()),))
elif self._running:
self.emit("commandReplyArrived", (str(self.name()), res))
self._running = False
[docs] def value(self):
"""Return the current command value.
Return:
(str): The value as a string
"""
value = "UNKNOWN"
if hasattr(self._hwobj, "get_value"):
value = self._hwobj.get_value()
try:
return value.name
except AttributeError:
return value
return value
[docs]class AnnotatedCommand(CommandObject):
def __init__(self, beamline_action_ho, name, cmd_name):
self._beamline_action_ho = beamline_action_ho
self._name = name
self._cmd_name = cmd_name
self._value = ""
self._last_result = None
self._messages = []
self.task = None
def get_value(self):
return self._value
def set_last_result(self, result):
self._last_result = result
@property
def cmd_name(self):
return self._cmd_name
[docs]class BeamlineActions(HardwareObject):
def __init__(self, name):
HardwareObject.__init__(self, name)
self._annotated_commands = []
self._annotated_command_dict = {}
self._command_list = []
self._current_command = None
def _get_command_object_class(self, path_str):
parts = path_str.split(".")
_module_name = "mxcubecore." + ".".join(parts[:-1])
_cls_name = parts[-1]
self._annotated_commands.append(_cls_name)
# assume import from current module if only class name given (no module)
if len(parts) == 1:
_cls = getattr(sys.modules[__name__], _cls_name)
else:
_mod = importlib.import_module(_module_name)
_cls = getattr(_mod, _cls_name)
return _cls
[docs] def init(self): # noqa: C901
command_list = self.get_property("commands")
if isinstance(command_list, str):
command_list = ast.literal_eval(command_list.strip().replace("\n", ""))
for command in command_list:
action = command["command"].split(".")[-1]
attrname = camel_to_snake(action)
if command.get("disabled", False):
self.log.warning(f"Action {attrname} is disabled in the configuration.")
continue
if hasattr(self, attrname):
msg = f"Action {action} already exists"
self.log.warning(msg)
continue
if command["type"] == "annotated":
try:
_cls = self._get_command_object_class(command["command"])
except Exception:
self.log.exception(f"failed to load annotated action {action}")
else:
fname = camel_to_snake(_cls.__name__)
_cls_inst = _cls(self, fname.replace("_", " ").title(), fname)
self._annotated_command_dict[fname] = _cls_inst
setattr(self, attrname, getattr(_cls_inst, fname))
self._exports_config_list.append(attrname)
elif command["type"] == "controller":
try:
cmd = operator.attrgetter(command["command"])(self)
_cmd_obj = ControllerCommand(command["name"], cmd, command["name"])
except AttributeError:
try:
_cls = self._get_command_object_class(command["command"])
_cmd_obj = ControllerCommand(
command["name"], None, command["name"], klass=_cls
)
except Exception:
self.log.exception(f"failed to load controller action {action}")
continue
self._command_list.append(_cmd_obj)
setattr(self, attrname, _cmd_obj)
elif command["type"] == "actuator":
try:
cmd = operator.attrgetter(command["command"])
_cmd_obj = HWObjActuatorCommand(command["name"], cmd)
self._command_list.append(_cmd_obj)
except AttributeError:
pass
super().init()
def get_annotated_command(self, name):
return self._annotated_command_dict[name]
def get_annotated_commands(self):
return list(self._annotated_command_dict.values())
def _execute_annotated_command(self, name, args):
cmd = getattr(self, name, None)
self._annotated_command_dict[name].emit("commandBeginWaitReply", name)
_model = self.pydantic_model[name](**args)
_all_child_models = []
for _key in _model.dict().keys():
_all_child_models.append(getattr(_model, _key))
_t = gevent.spawn(cmd, *_all_child_models)
_t.link(self._command_done)
self._current_command = cmd.__self__
self._current_command.task = _t
[docs] def get_commands(self):
return self._command_list
def _execute_command(self, name, args):
try:
cmds = self.get_commands()
except Exception:
cmds = []
for cmd in cmds:
if cmd.name() == name:
try:
cmd.emit("commandBeginWaitReply", name)
logging.getLogger("user_level_log").info(
"Starting %s(%s)",
cmd.name(),
", ".join(map(str, args)),
)
cmd(*args)
except Exception as ex:
err = str(sys.exc_info()[1])
raise Exception(str(err)) from ex
[docs] def execute_command(self, name, args):
cmd = getattr(self, name, None)
if cmd:
self._execute_annotated_command(name, args)
else:
self._execute_command(name, args)
def _command_done(self, greenlet):
cmd_obj = self._annotated_command_dict[self._current_command.cmd_name]
result = ""
try:
result = greenlet.get()
except Exception:
self.log.exception("%s: execution failed", self._current_command.cmd_name)
cmd_obj.emit("commandFailed", (self._current_command.cmd_name,))
else:
self._current_command.set_last_result(result)
if isinstance(result, gevent.GreenletExit):
# command aborted
cmd_obj.emit("commandFailed", (self._current_command.cmd_name,))
result = ""
else:
cmd_obj.emit(
"commandReplyArrived", (self._current_command.cmd_name, result)
)
finally:
cmd_obj.emit(
"commandReady",
(self._current_command.cmd_name, result),
)
self._current_command = None
def abort_command(self, name):
if (
self._current_command
and self._current_command.cmd_name in self._annotated_command_dict
):
self._annotated_command_dict[self._current_command.cmd_name]
self._current_command.task.kill()
else:
for cmd in self.get_commands():
if cmd.name() == name:
cmd.abort()
break