Source code for mxcubecore.Command.Sardana

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.


"""Sardana Control System"""

from __future__ import absolute_import

import logging
import os

import gevent
from gevent.event import Event

from mxcubecore.dispatcher import saferef

try:
    import Queue as queue
except ImportError:
    import queue

gevent_version = list(map(int, gevent.__version__.split(".")))


from mxcubecore.CommandContainer import (
    ChannelObject,
    CommandObject,
    ConnectionError,
)

# This is a site specific module where some of the dependencies might not be capture by the ``pyproject.toml`` during installation

try:
    import PyTango
    from PyTango import (
        ConnectionFailed,
        DevFailed,
    )
except Exception:
    logging.getLogger("HWR").warning("Pytango is not available in this computer.")

try:
    import taurus
    from taurus import (
        Attribute,
        Device,
    )
except Exception:
    logging.getLogger("HWR").warning("Sardana is not available in this computer.")


__copyright__ = """ Copyright © 2010 - 2020 by MXCuBE Collaboration """
__license__ = "LGPLv3+"


def processSardanaEvents():
    while not SardanaObject._eventsQueue.empty():
        try:
            ev = SardanaObject._eventsQueue.get_nowait()
        except queue.Empty:
            break
        else:
            try:
                receiver_cb_ref = SardanaObject._eventReceivers[id(ev)]
                receiver_cb = receiver_cb_ref()
                if receiver_cb is not None:
                    try:
                        gevent.spawn(receiver_cb, ev)
                    except AttributeError:
                        logging.getLogger("HWR").exception("")
            except KeyError:
                logging.getLogger("HWR").exception("")


def wait_end_of_command(cmdobj):
    while (
        cmdobj.macrostate == SardanaMacro.RUNNING
        or cmdobj.macrostate == SardanaMacro.STARTED
    ):
        gevent.sleep(0.05)
    return cmdobj.door.result


def end_of_macro(macobj):
    macobj._reply_arrived_event.wait()


class AttributeEvent:
    def __init__(self, event):
        self.event = event


[docs]class SardanaObject(object): """Sardana Object""" _eventsQueue = queue.Queue() _eventReceivers = {} if gevent_version < [1, 3, 0]: _eventsProcessingTimer = getattr(gevent.get_hub().loop, "async")() else: _eventsProcessingTimer = gevent.get_hub().loop.async_() # start Sardana events processing timer _eventsProcessingTimer.start(processSardanaEvents) def object_listener(self, *args): ev = AttributeEvent(args) # NBNB self.update not defined SardanaObject._eventReceivers[id(ev)] = saferef.safe_ref(self.update) SardanaObject._eventsQueue.put(ev) SardanaObject._eventsProcessingTimer.send()
[docs]class SardanaMacro(CommandObject, SardanaObject, ChannelObject): """Sardana macro""" macroStatusAttr = None INIT, STARTED, RUNNING, DONE = range(4) def __init__(self, name, macro, doorname=None, username=None, **kwargs): super(SardanaMacro, self).__init__(name, username, **kwargs) self._reply_arrived_event = Event() self.macro_format = macro self.doorname = doorname self.door = None self.id_result = -1 self.macrostate = SardanaMacro.INIT self.doorstate = None self.t0 = 0 self.init_device() def init_device(self): self.door = Device(self.doorname) self.door.set_timeout_millis(10000) self.doorstate = self.door.state.name.upper() if self.macroStatusAttr is None: self.macroStatusAttr = self.door.getAttribute("State") self.macroStatusAttr.addListener(self.object_listener) def result_callback(self, received_event): val = received_event.attr_value.value if val is not None: self.emit("macroResultUpdated", received_event.attr_value.value) def __call__(self, *args, **kwargs): self._reply_arrived_event.clear() self.result = None wait = kwargs.get("wait", False) if self.door is None: self.init_device() logging.getLogger("HWR").debug( "Executing sardana macro: %s" % self.macro_format ) logging.getLogger("HWR").debug( " args=%s / kwargs=%s" % (str(args), str(kwargs)) ) try: fullcmd = self.macro_format + " " + " ".join([str(a) for a in args]) except Exception: logging.getLogger("HWR").info( " - Wrong format for macro arguments. Macro is %s / args are (%s)" % (self.macro_format, str(args)) ) logging.getLogger("HWR").exception("") return try: import time self.t0 = time.time() if self.doorstate in ["ON", "ALARM", "READY"]: self.id_result = self.door.subscribe_event( "Result", PyTango.EventType.CHANGE_EVENT, self.result_callback ) self.door.runMacro(fullcmd.split()) self.macrostate = SardanaMacro.STARTED self.emit("commandBeginWaitReply", (str(self.name()),)) else: logging.getLogger("HWR").error( "%s. Cannot execute. Door is not READY", str(self.name()) ) self.emit("commandFailed", (-1, self.name())) except TypeError: logging.getLogger("HWR").error( "%s. Cannot properly format macro code. Format is: %s, args are %s", str(self.name()), self.macro_format, str(args), ) self.emit("commandFailed", (-1, self.name())) except DevFailed as error_dict: logging.getLogger("HWR").error( "%s: Cannot run macro. %s", str(self.name()), error_dict ) self.emit("commandFailed", (-1, self.name())) except AttributeError as error_dict: logging.getLogger("HWR").error( "%s: MacroServer not running?, %s", str(self.name()), error_dict ) self.emit("commandFailed", (-1, self.name())) except Exception: logging.getLogger("HWR").exception( "%s: an error occured when calling Tango command %s", str(self.name()), self.macro_format, ) self.emit("commandFailed", (-1, self.name())) if wait: logging.getLogger("HWR").debug("... start waiting...") t = gevent.spawn(end_of_macro, self) t.get() logging.getLogger("HWR").debug("... end waiting...") return
[docs] def update(self, event): """update the macro command status: ``commandCanExecute``, ``commandReady``, ``commandNotReady``, ``commandReplyArrive``, ``commandReplyAbort`` and ``commandFailed``""" data = event.event[2] try: if not isinstance(data, PyTango.DeviceAttribute): # Events different than a value changed on attribute. # Taurus sends an event with attribute info. return # Handling macro state changed event doorstate = str(data.value) logging.getLogger("HWR").debug( "doorstate changed. it is %s" % str(doorstate) ) if doorstate != self.doorstate: self.doorstate = doorstate self.emit("commandCanExecute", (self.can_execute(),)) if doorstate in ["ON", "ALARM"]: self.emit("commandReady", ()) else: self.emit("commandNotReady", ()) if self.macrostate == SardanaMacro.STARTED and doorstate == "RUNNING": self.macrostate = SardanaMacro.RUNNING elif self.macrostate == SardanaMacro.RUNNING and ( doorstate in ["ON", "ALARM"] ): logging.getLogger("HWR").debug("Macro execution finished") self.macrostate = SardanaMacro.DONE self.result = self.door.result self.emit("commandReplyArrived", (self.result, str(self.name()))) if doorstate == "ALARM": self.emit("commandAborted", (str(self.name()),)) self._reply_arrived_event.set() elif ( self.macrostate == SardanaMacro.DONE or self.macrostate == SardanaMacro.INIT ): # already handled in the general case above pass else: logging.getLogger("HWR").debug("Macroserver state changed") self.emit("commandFailed", (-1, str(self.name()))) except ConnectionFailed: logging.getLogger("HWR").debug("Cannot connect to door %s" % self.doorname) self.emit("commandFailed", (-1, str(self.name()))) except Exception: logging.getLogger("HWR").exception("SardanaMacro / event handling problem.") self.emit("commandFailed", (-1, str(self.name())))
def abort(self): if self.door is not None: logging.getLogger("HWR").debug("SardanaMacro / aborting macro") self.door.abortMacro()
[docs] def is_connected(self): return self.door is not None
def can_execute(self): return self.door is not None and (self.doorstate in ["ON", "ALARM"])
[docs]class SardanaCommand(CommandObject): """SardanaCommand""" def __init__(self, name, command, taurusname=None, username=None, **kwargs): CommandObject.__init__(self, name, username, **kwargs) self.command = command self.taurusname = taurusname self.device = None def init_device(self): try: self.device = Device(self.taurusname) except DevFailed as traceback: last_error = traceback[-1] logging.getLogger("HWR").error( "%s: %s", str(self.name()), last_error["desc"] ) self.device = None else: try: self.device.ping() except ConnectionFailed: self.device = None raise ConnectionError def __call__(self, *args, **kwargs): self.emit("commandBeginWaitReply", (str(self.name()),)) if self.device is None: self.init_device() try: cmdObject = getattr(self.device, self.command) ret = cmdObject(*args) except DevFailed as error_dict: logging.getLogger("HWR").error( "%s: Tango, %s", str(self.name()), error_dict ) except Exception: logging.getLogger("HWR").exception( "%s: an error occured when calling Tango command %s", str(self.name()), self.command, ) else: self.emit("commandReplyArrived", (ret, str(self.name()))) return ret self.emit("commandFailed", (-1, self.name())) def abort(self): pass
[docs] def is_connected(self): return self.device is not None
[docs]class SardanaChannel(ChannelObject, SardanaObject): """Creates a Sardana Channel""" def __init__( self, name, attribute_name, username=None, uribase=None, polling=None, **kwargs ): super(SardanaChannel, self).__init__(name, username, **kwargs) class ChannelInfo(object): def __init__(self): super(ChannelInfo, self).__init__() self.attribute_name = attribute_name self.model = os.path.join(uribase, attribute_name) self.attribute = None self.value = None self.polling = polling self.info = ChannelInfo() self.info.minval = None self.info.maxval = None self.init_device() def init_device(self): try: self.attribute = Attribute(self.model) except DevFailed as traceback: self.imported = False logging.getLogger("HWR").exception("") return # read information try: if int(taurus.Release.version[0]) == 3: ranges = self.attribute.getConfig().getRanges() if ranges is not None and ranges[0] != "Not specified": self.info.minval = float(ranges[0]) if ranges is not None and ranges[-1] != "Not specified": self.info.maxval = float(ranges[-1]) elif int(taurus.Release.version[0]) > 3: # taurus 4 and beyond minval, maxval = self.attribute.getRange() if minval is not None: self.info.minval = minval.magnitude if maxval is not None: self.info.maxval = maxval.magnitude except Exception: logging.getLogger("HWR").info("info initialized. Cannot get limits") logging.getLogger("HWR").exception("") # prepare polling # if the polling value is a number set it as the taurus polling period if self.polling: if isinstance(self.polling, int): self.attribute.changePollingPeriod(self.polling) self.attribute.addListener(self.object_listener)
[docs] def get_value(self): return self._read_value()
def set_value(self, new_value): self._write_value(new_value) def _write_value(self, new_value): self.attribute.write(new_value) def _read_value(self): value = self.attribute.read().value return value def get_info(self): try: limits = self.attribute.getLimits() self.info.minval = limits[0].magnitude self.info.maxval = limits[1].magnitude except Exception: logging.getLogger("HWR").exception("") return self.info
[docs] def update(self, event): data = event.event[2] try: new_value = data.value if new_value is None: new_value = self.get_value() if isinstance(new_value, tuple): new_value = list(new_value) self.value = new_value self.emit("update", self.value) except AttributeError: # No value in data... this is probably a connection error logging.getLogger("HWR").exception("")
[docs] def is_connected(self): return self.attribute is not None
def channel_listener(self, *args): ev = AttributeEvent(args) SardanaChannel._eventReceivers[id(ev)] = saferef.safe_ref(self.update) SardanaChannel._eventsQueue.put(ev) SardanaChannel._eventsProcessingTimer.send()