Source code for mxcubecore.HardwareObjects.XMLRPCServer

"""
XMLRPC-Server that makes it possible to access core features of MXCuBE like
the queue from external applications. The Server is implemented as a
hardware object and is configured with an XML-file. See the example
configuration XML for more information.
"""

import atexit
import inspect
import json
import logging
import pkgutil
import socket
import sys
import time
import types
import xml
from functools import reduce

import gevent
import jsonpickle

from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.HardwareObjects.SecureXMLRpcRequestHandler import (
    SecureXMLRpcRequestHandler,
)

if sys.version_info > (3, 0):
    from xmlrpc.server import SimpleXMLRPCServer
else:
    from SimpleXMLRPCServer import SimpleXMLRPCServer


__author__ = "Marcus Oskarsson, Matias Guijarro"
__copyright__ = "Copyright 2012, ESRF"
__credits__ = ["MxCuBE collaboration"]

__version__ = ""
__maintainer__ = "Marcus Oskarsson"
__email__ = "marcus.oscarsson@esrf.fr"
__status__ = "Draft"


[docs]class XMLRPCServer(HardwareObject): def __init__(self, name): HardwareObject.__init__(self, name) self.host = None self.port = None self.all_interfaces = None self.enforceUseOfToken = None self.wokflow_in_progress = True self.xmlrpc_prefixes = set() self.current_entry_task = None self.host = None self.use_token = True atexit.register(self.close) self.gphl_workflow_status = None
[docs] def init(self): """ Method inherited from HardwareObject, called by framework-2. """ self.all_interfaces = self.get_property("all_interfaces", False) # Listen on all interfaces if <all_interfaces>True</all_interfaces> # otherwise only on the interface corresponding to socket.gethostname() if self.all_interfaces: host = "" else: host = socket.gethostname() self.host = host self.port = self.get_property("port") self.use_token = self.get_property("use_token", True) try: self.open() except Exception: self.log.exception("Can't start XML-RPC server")
def close(self): try: self.xmlrpc_server_task.kill() self._server.server_close() del self._server except AttributeError: self.log.exception("") def open(self): # The value of the member self.port is set in the xml configuration # file. The initialization is done by the baseclass HardwareObject. if hasattr(self, "_server"): return self.xmlrpc_prefixes = set() if self.use_token: self._server = SimpleXMLRPCServer( (self.host, int(self.port)), requestHandler=SecureXMLRpcRequestHandler, logRequests=False, allow_none=True, ) else: self._server = SimpleXMLRPCServer( (self.host, int(self.port)), logRequests=False, allow_none=True ) msg = "XML-RPC server listening on: %s:%s" % (self.host, self.port) self.log.info(msg) self._connect_gphl_workflow_finished() self._server.register_introspection_functions() self._server.register_function(self.start_queue) self._server.register_function(self.log_message) self._server.register_function(self.is_queue_executing) self._server.register_function(self.queue_execute_entry_with_id) self._server.register_function(self.queue_set_workflow_lims_id) self._server.register_function(self.shape_history_get_grid) self._server.register_function(self.shape_history_set_grid_data) self._server.register_function(self.beamline_setup_read) self._server.register_function(self.get_default_path_template) self._server.register_function(self.get_default_acquisition_parameters) self._server.register_function(self.get_diffractometer_positions) self._server.register_function(self.get_resolution_limits) self._server.register_function(self.move_diffractometer) self._server.register_function(self.save_snapshot) self._server.register_function(self.save_multiple_snapshots) self._server.register_function(self.save_twelve_snapshots_script) self._server.register_function(self.cryo_temperature) self._server.register_function(self.flux) self._server.register_function(self.check_for_beam) self._server.register_function(self.set_beam_size) self._server.register_function(self.get_beam_size) self._server.register_function(self.get_available_beam_size) self._server.register_function(self.set_aperture) self._server.register_function(self.get_aperture) self._server.register_function(self.get_aperture_list) self._server.register_function(self.get_cp) self._server.register_function(self.save_current_pos) self._server.register_function(self.anneal) self._server.register_function(self.open_dialog) self._server.register_function(self.workflow_end) self._server.register_function(self.dozor_batch_processed) self._server.register_function(self.dozor_status_changed) self._server.register_function(self.processing_status_changed) self.image_num = 0 self._server.register_function(self.get_image_num, "get_image_num") self._server.register_function(self.set_zoom_level) self._server.register_function(self.get_zoom_level) self._server.register_function(self.centre_beam) self._server.register_function(self.get_gphl_workflow_status) self._server.register_function(self.add_xray_centring) self._server.register_function(self.add_gphl_workflow) self._server.register_function(self.clear_ispyb_client_group_id) self._server.register_function(self.set_characterisation_result) self._server.register_function(self.set_rotation_axis_position) self._server.register_function(self.get_current_cd_crystal_id) # Register functions from modules specified in <apis> element apis = self.get_property("apis", {}) for api in apis.get("api"): recurse = api.get("recurse", True) self._register_module_functions(api.get("module"), recurse=recurse) self.xmlrpc_server_task = gevent.spawn(self._server.serve_forever) self.beamcmds_hwobj = self.get_object_by_role("beamcmds") def anneal(self, time): cryoshutter_hwobj = self.get_object_by_role("cryoshutter") try: cryoshutter_hwobj.getCommandObject("anneal")(time) except Exception as ex: self.log.exception(str(ex)) raise return True def _add_to_queue(self, task, set_on=True): """ Adds the TaskNode objects contained in the list of TaskNodes passed in <task>. The TaskNodes are marked as activated in the queue if <set_on> is True and to inactivated if False. :param task: TaskNode object to add to queue :type parent: TaskNode :param set_on: Mark TaskNode as activated if True and as inactivated if false. :type set_on: bool :returns: True on success otherwise False :rtype: bool """ # The exception is re raised so that it will # be sent to the client. try: self.emit("add_to_queue", (task, None, set_on)) except Exception as ex: self.log.exception(str(ex)) raise return True
[docs] def start_queue(self): """ Starts the queue execution. :returns: True on success otherwise False :rtype: bool """ try: self.emit("start_queue") except Exception as ex: self.log.exception(str(ex)) raise return True
[docs] def log_message(self, message, level="info"): """ Logs a message in the user_level_log of MxCuBE, normally displayed at the bottom of the MxCuBE window. :param message: The message to log :type parent: str :param message: The log level, one of the strings: 'info'. 'warning', 'error' :type parent: str :returns: True on success otherwise False :rtype: bool """ status = True if level == "info": logging.getLogger("user_level_log").info(message) elif level == "warning": logging.getLogger("user_level_log").warning(message) elif level == "error": logging.getLogger("user_level_log").error(message) else: status = False return status
def _model_add_child(self, parent_id, child): """ Adds the model node task to parent_id. :param parent_id: The id of the parent. :type parent_id: int :param child: The TaskNode object to add. :type child: TaskNode :returns: The id of the added TaskNode object. :rtype: int """ try: node_id = HWR.beamline.queue_model.add_child_at_id(parent_id, child) except Exception as ex: self.log.exception(str(ex)) raise return node_id def _model_get_node(self, node_id): """ :returns the TaskNode object with the node id <node_id> :rtype: TaskNode """ try: node = HWR.beamline.queue_model.get_node(node_id) except Exception as ex: self.log.exception(str(ex)) raise return node
[docs] def queue_execute_entry_with_id(self, node_id, use_async=False): """ Execute the entry that has the model with node id <node_id>. :param node_id: The node id of the model to find. :type node_id: int """ try: model = HWR.beamline.queue_model.get_node(node_id) entry = HWR.beamline.queue_manager.get_entry_with_model(model) if entry: self.current_entry_task = HWR.beamline.queue_manager.execute_entry( entry, use_async=use_async ) except Exception as ex: self.log.exception(str(ex)) raise return True
[docs] def queue_set_workflow_lims_id(self, node_id, lims_id): """ Set lims id of workflow node with id <node_id> :param node_id: The node id of the workflow node :type node_id: int :param lims_id: The lims id :type lims_id: int """ try: model = HWR.beamline.queue_model.get_node(node_id) model.lims_id = lims_id except Exception as ex: self.log.exception(str(ex)) raise else: return True
[docs] def is_queue_executing(self, node_id=None): """ :returns: True if the queue is executing otherwise False :rtype: bool """ try: return HWR.beamline.queue_manager.is_executing(node_id) except Exception as ex: self.log.exception(str(ex)) raise
def queue_status(self): pass
[docs] def shape_history_get_grid(self, sid): """ :param sid: Shape id :returns: Grid with id <sid> :rtype: dict Format of the returned dictionary: {'id': id, 'dx_mm': float, 'dy_mm': float, 'steps_x': int, 'steps_y': int, 'x1': float, 'y1': float, 'angle': float} """ grid_dict = HWR.beamline.sample_view.get_shape(sid).as_dict() grid_dict.update({"result": None}) return grid_dict
def shape_history_set_grid_data(self, key, result_data, data_file_path=None): if isinstance(result_data, list): result = {} for result in result_data.items(): # int_based_result is not defined int_based_result[int(result[0])] = result[1] else: result = result_data HWR.beamline.sample_view.set_grid_data(key, result, data_file_path) return True
[docs] def get_cp(self): """ :returns: a json encoded list with all centred positions """ cplist = [] points = HWR.beamline.sample_view.get_points() for point in points: cp = point.get_centred_positions()[0].as_dict() cplist.append(cp) json_cplist = json.dumps(cplist) return json_cplist
def _getattr_from_path(self, obj, attr, delim="/"): """Recurses through an attribute chain to get the attribute.""" return reduce(getattr, attr.split(delim), obj) def beamline_setup_read(self, path): value = None if path.strip("/").endswith("default-acquisition-parameters"): value = jsonpickle.encode(self.get_default_acquisition_parameters()) elif path.strip("/").endswith("default-path-template"): value = jsonpickle.encode(self.get_default_path_template()) else: try: path = path[1:] if path[0] == "/" else path ho = self._getattr_from_path(HWR, path) value = ho.get_value() except: self.log.exception("Could no get %s " % str(path)) return value def get_default_path_template(self): return HWR.beamline.get_default_path_template() def get_default_acquisition_parameters(self): return HWR.beamline.get_default_acquisition_parameters() def workflow_set_in_progress(self, state): if state: self.wokflow_in_progress = True else: self.wokflow_in_progress = False def get_resolution_limits(self): return HWR.beamline.resolution.get_limits() def get_diffractometer_positions(self): return HWR.beamline.sample_view.get_positions() def move_diffractometer(self, roles_positions_dict): HWR.beamline.diffractometer.set_value_motors(roles_positions_dict) return True def save_twelve_snapshots_script(self, path): path = path[14:] # NBNB: Temporary fix, to be addressed in calling code self.log.info("Taking 6 snapshots to be saved in %s " % str(path)) HWR.beamline.diffractometer.run_custom_script("Take6Snapshots, " + path) def save_multiple_snapshots(self, path_list, show_scale=False): self.log.info("Taking snapshot %s " % str(path_list)) try: for angle, path in path_list: HWR.beamline.diffractometer.omega.set_value(angle) # give some time to get the snapshot time.sleep(1) HWR.beamline.diffractometer.wait_status_ready() self.save_snapshot(path, show_scale, handle_light=False) except Exception as ex: self.log.exception("Could not take snapshot %s " % str(ex)) def save_snapshot(self, imgpath, show_scale=False, handle_light=True): res = True self.log.info("Taking snapshot %s " % str(imgpath)) try: if show_scale: HWR.beamline.sample_view.save_snapshot(imgpath) else: HWR.beamline.sample_view.save_snapshot(imgpath, overlay=False, bw=False) except Exception as ex: self.log.exception("Could not take snapshot %s " % str(ex)) res = False finally: pass return res
[docs] def save_current_pos(self): """ Saves the current position as a centered position. """ self.log.debug("Saving position via XMLRPC") HWR.beamline.sample_view.centring_done() HWR.beamline.sample_view.accept_centring() return True
def cryo_temperature(self): return HWR.beamline.cryo.get_value() def flux(self): flux = HWR.beamline.flux.get_value() if flux is None: flux = 0 return float(flux) def check_for_beam(self): return HWR.beamline.flux.is_beam()
[docs] def set_beam_size(self, size): """Set the beam size. Args: size (list): Width, height or (str): Size label. """ HWR.beamline.beam.set_value(size) return True
[docs] def get_beam_size(self): """Get the beam size [um], its shape and label. Returns: (tuple): (width, height, shape, label), with types (float, float, str, str) """ return HWR.beamline.beam.get_value_xml()
[docs] def get_available_beam_size(self): """Get the available predefined beam sizes. Returns: (dict): Dictionary with list of available beam size labels and the corresponding size (width,height) tuples. {"label": [str, str, ...], "size": [(w,h), (w,h), ...]} """ return HWR.beamline.beam.get_defined_beam_size()
def set_aperture(self, pos_name): HWR.beamline.beam.set_value(pos_name) return True def get_aperture(self): return HWR.beamline.beam.get_value()[-1] def get_aperture_list(self): return HWR.beamline.beam.get_available_size()["values"]
[docs] def open_dialog(self, dict_dialog): """ Opens the workflow dialog in mxCuBE. This call blocks util the dialog is ended by the user. """ return_map = {} workflow_hwobj = HWR.beamline.workflow if workflow_hwobj is not None: return_map = workflow_hwobj.open_dialog(dict_dialog) self.emit("open_dialog", dict_dialog) return return_map
[docs] def workflow_end(self): """ Notify the workflow HO that the workflow has finished. """ workflow_hwobj = HWR.beamline.workflow if workflow_hwobj is not None: workflow_hwobj.workflow_end()
def dozor_batch_processed(self, dozor_batch_dict): HWR.beamline.online_processing.batch_processed(dozor_batch_dict) def dozor_status_changed(self, status): HWR.beamline.online_processing.set_processing_status(status) def processing_status_changed(self, collection_id, method, status, msg=""): for queue_entry in HWR.beamline.queue_model.get_all_dc_queue_entries(): data_model = queue_entry.get_data_model() if data_model.id == collection_id: prefix = data_model.acquisitions[0].path_template.get_image_file_name() prefix = prefix.replace("%05d", "#####") if status in ("started", "success"): logging.getLogger("user_level_log").info( "EDNA %s: processing of data collection %s %s %s" % (method, prefix, status, msg) ) elif status == "failed": logging.getLogger("user_level_log").error( "EDNA %s: processing of data collection %s %s %s" % (method, prefix, status, msg) ) queue_entry.add_processing_msg( str(time.strftime("%Y-%m-%d %H:%M:%S")), method, status, msg ) def image_taken(self, image_num): self.image_num = image_num def get_image_num(self): return self.image_num
[docs] def set_zoom_level(self, pos): """ Sets the zoom to a pre-defined level. """ zoom = HWR.beamline.diffractometer.zoom zoom.set_value(zoom.value_to_enum(pos))
[docs] def get_zoom_level(self): """ Returns the zoom level. """ return HWR.beamline.diffractometer.zoom.get_value().value
def _register_module_functions(self, module_name, recurse=True, prefix=""): log = logging.getLogger("HWR") # log.info("Registering functions in module %s with XML-RPC server" % module_name) if module_name not in sys.modules: __import__(module_name) module = sys.modules[module_name] if not hasattr(module, "xmlrpc_prefix"): log.error( ( 'Module %s has no attribute "xmlrpc_prefix": cannot ' + "register its functions. Skipping" ) % module_name ) else: prefix += module.xmlrpc_prefix if len(prefix) > 0 and prefix[-1] != "_": prefix += "_" if prefix in self.xmlrpc_prefixes: msg = "Prefix %s already used: cannot register for module %s" % ( prefix, module_name, ) log.error(msg) raise Exception(msg) self.xmlrpc_prefixes.add(prefix) for f in inspect.getmembers(module, inspect.isfunction): if f[0][0] != "_": xmlrpc_name = prefix + f[0] # log.info( # "Registering function %s.%s as XML-RPC function %s" # % (module_name, f[1].__name__, xmlrpc_name) # ) # Bind method to this XMLRPCServer instance but don't set attribute # This is sufficient to register it as an xmlrpc function. bound_method = types.MethodType(f[1], self) self._server.register_function(bound_method, xmlrpc_name) # TODO: Still need to test with deeply-nested modules, in particular that # modules and packages are both handled correctly in complex cases. if recurse and hasattr(module, "__path__"): sub_modules = pkgutil.walk_packages(module.__path__) try: sub_module = next(sub_modules) self._register_module_functions( module_name + "." + sub_module[1], recurse=False, prefix=prefix ) except StopIteration: pass def set_token(self, token): SecureXMLRpcRequestHandler.setReferenceToken(token) def clear_ispyb_client_group_id(self): HWR.beamline.lims.group_id = None def set_characterisation_result(self, characterisation_result): HWR.beamline.characterisation.characterisationResult = ( xml.sax.saxutils.unescape(characterisation_result) )
[docs] def add_xray_centring(self, parent_node_id, **centring_parameters): """Add Xray centring to queue.""" from mxcubecore.model import queue_model_objects as qmo xc_model = qmo.XrayCentring2(**centring_parameters) child_id = HWR.beamline.queue_model.add_child_at_id(parent_node_id, xc_model) return child_id
[docs] def add_gphl_workflow(self, parent_node_id, task_dict, workflow_id): """Add GPhL workflow to queue.""" self.workflow_id = workflow_id from mxcubecore.model import queue_model_objects as qmo gphl_model = qmo.GphlWorkflow() parent_model = HWR.beamline.queue_model.get_node(int(parent_node_id)) sample_model = parent_model.get_sample_node() gphl_model.init_from_task_data(sample_model, task_dict) child_id = HWR.beamline.queue_model.add_child_at_id(parent_node_id, gphl_model) self.gphl_workflow_status = "RUNNING" self._connect_gphl_workflow_finished() return child_id
def _connect_gphl_workflow_finished(self): gphl_workflow = HWR.beamline.gphl_workflow try: self.disconnect( gphl_workflow, "gphl_workflow_finished", self._async_job_completed, ) except KeyError: pass self.connect( gphl_workflow, "gphl_workflow_finished", self._async_job_completed, ) def _async_job_completed(self, job_status): self.gphl_workflow_status = job_status def get_gphl_workflow_status(self): return self.gphl_workflow_status def set_rotation_axis_position(self, value: float): HWR.beamline.sample_view.set_rotation_axis_position(value)
[docs] def centre_beam(self): """ Centers the beam using the beamcmds hardware object. """ actions = HWR.beamline.beamline_actions.get_object_by_role("controller") actions.centrebeam()
def get_current_cd_crystal_id(self): return HWR.beamline.harvester.get_current_crystal_id()