Source code for mxcubecore.Command.Tango

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

import logging

import gevent
import gevent.event

try:
    import Queue as queue
except ImportError:
    import queue

import numpy

from mxcubecore import Poller
from mxcubecore.CommandContainer import (
    ChannelObject,
    CommandObject,
    ConnectionError,
)
from mxcubecore.dispatcher import saferef

gevent_version = list(map(int, gevent.__version__.split(".")))

log = logging.getLogger("HWR")

try:
    import tango
    from tango.gevent import DeviceProxy
except ImportError:
    logging.getLogger("HWR").warning("Tango support is not available.")


__copyright__ = """ Copyright © 2010 - 2020 by MXCuBE Collaboration """
__license__ = "LGPLv3+"

log = logging.getLogger("HWR")


[docs]class TangoCommand(CommandObject): def __init__(self, name, command, tangoname=None, username=None, **kwargs): CommandObject.__init__(self, name, username, **kwargs) self.command = command self.device_name = tangoname self.device = None def init_device(self): try: self.device = DeviceProxy(self.device_name) except tango.DevFailed: logging.getLogger("HWR").exception( "Failed to set-up tango command %s on %s device", self.name(), self.device_name, ) self.device = None else: try: self.device.ping() except tango.ConnectionFailed: self.device = None raise ConnectionError def __call__(self, *args, **kwargs): self.emit("commandBeginWaitReply", (str(self.name()),)) if self.device is None: # TODO: emit commandFailed # beware of infinite recursion with Sample Changer # (because of procedure exception cleanup...) self.init_device() try: tango_cmd_object = getattr(self.device, self.command) ret = tango_cmd_object( *args ) # eval('self.device.%s(*%s)' % (self.command, args)) except tango.DevFailed: logging.getLogger("HWR").exception( "Failed to run tango command %s on %s device", self.name(), self.device_name, ) except Exception: logging.getLogger("HWR").exception( "%s: an error occured when calling Tango command %s", str(self.name()), self.command, ) else: self.emit("commandReplyArrived", (ret, str(self.name()))) return ret self.emit("commandFailed", (-1, self.name())) def abort(self): pass def set_device_timeout(self, timeout): if self.device is None: self.init_device() self.device.set_timeout_millis(timeout)
[docs] def is_connected(self): return self.device is not None
def process_tango_events(): while not TangoChannel._tangoEventsQueue.empty(): try: ev = TangoChannel._tangoEventsQueue.get_nowait() except queue.Empty: break else: try: receiverCbRef = TangoChannel._eventReceivers[id(ev)] receiverCb = receiverCbRef() if receiverCb is not None: try: gevent.spawn(receiverCb, ev.event.attr_value.value) except AttributeError: logging.getLogger("HWR").exception("") except KeyError: logging.getLogger("HWR").exception("") class E: def __init__(self, event): self.event = event def _device_has_attribute(device: DeviceProxy, attribute_name: str) -> bool: """Check if a tango device has an attribute.""" try: device.attribute_query(attribute_name) except tango.DevFailed as ex: if ex.args[0].reason == "API_AttrNotFound": # query failed with 'attribute not found' error return False # unexpected exception, re-raise raise # attribute query was successful, thus we know the attribute exits return True
[docs]class TangoChannel(ChannelObject): _tangoEventsQueue = queue.Queue() _eventReceivers = {} _tangoEventsProcessingTimer = gevent.get_hub().loop.async_() # start Tango events processing timer _tangoEventsProcessingTimer.start(process_tango_events) def __init__( self, name, attribute_name, tangoname=None, username=None, polling=None, timeout=10000, **kwargs, ): ChannelObject.__init__(self, name, username, **kwargs) self.attribute_name = attribute_name self.device_name = tangoname self.device = None self.value = Poller.NotInitializedValue self.poller = None self.polling = polling self.polling_timer = None self.polling_events = False self.timeout = int(timeout) self.read_as_str = kwargs.get("read_as_str", False) self._device_initialized = gevent.event.Event() self.init_device() self.continue_init(None)
[docs] def stop_polling(self): """Stop polling the underlying Tango attribute. If this channel is currently polling its tango attribute, via 'attribute read' calls, stop polling. If no polling is active, this method does nothing. """ if self.poller is not None: self.poller.stop() self.poller = None
def init_poll_failed(self, e, poller_id): self._device_initialized.clear() logging.warning( "%s/%s (%s): could not complete init. " "(hint: device server is not running, or has to be restarted)", self.device_name, self.attribute_name, self.name(), ) self.init_poller = self.init_poller.restart(3000) def continue_init(self, _): if isinstance(self.polling, int): self.raw_device = DeviceProxy(self.device_name) self.poller = Poller.poll( self.poll, polling_period=self.polling, value_changed_callback=self.update, error_callback=self.poll_failed, ) else: if self.polling == "events": # try to register event try: self.polling_events = True self.device.subscribe_event( self.attribute_name, tango.EventType.CHANGE_EVENT, self, [], True, ) except Exception: logging.getLogger("HWR").exception("could not subscribe event") self._device_initialized.set() def init_device(self): try: self.device = DeviceProxy(self.device_name) except tango.DevFailed: self.imported = False logging.getLogger("HWR").exception( "Failed to set-up tango channel %s on %s device", self.name(), self.device_name, ) else: self.imported = True try: self.device.ping() except tango.ConnectionFailed: self.device = None raise ConnectionError else: self.device.set_timeout_millis(self.timeout) # check that the attribute exists (to avoid Abort in PyTango grrr) if not _device_has_attribute(self.device, self.attribute_name): logging.getLogger("HWR").error( "no attribute %s in Tango device %s", self.attribute_name, self.device_name, ) self.device = None def push_event(self, event): if ( event.attr_value is None or event.err or event.attr_value.quality != tango.AttrQuality.ATTR_VALID ): return ev = E(event) TangoChannel._eventReceivers[id(ev)] = saferef.safe_ref(self.update) TangoChannel._tangoEventsQueue.put(ev) TangoChannel._tangoEventsProcessingTimer.send() def poll(self): if self.read_as_str: value = self.raw_device.read_attribute( self.attribute_name, tango.DeviceAttribute.ExtractAs.String ).value else: value = self.raw_device.read_attribute(self.attribute_name).value return value def poll_failed(self, e, poller_id): self.emit("update", None) poller = Poller.get_poller(poller_id) if poller is not None: poller.restart(1000) def get_info(self): self._device_initialized.wait(timeout=3) return self.device.get_attribute_config(self.attribute_name)
[docs] def update(self, value=Poller.NotInitializedValue): # start with checking if we have a numpy array, as comparing # numpy.ndarray to Poller.NotInitializedValue raises a ValueError exception if isinstance(value, numpy.ndarray): value = value.tolist() elif value == Poller.NotInitializedValue: value = self.get_value() elif isinstance(value, tuple): value = list(value) self.value = value self.emit("update", value)
[docs] def get_value(self): if self.read_as_str: value = self.device.read_attribute( self.attribute_name, tango.DeviceAttribute.ExtractAs.String ).value else: value = self.device.read_attribute(self.attribute_name).value if isinstance(value, numpy.ndarray): if not numpy.array_equal(value, self.value): self.update(value) elif value != self.value: self.update(value) return value
def set_value(self, new_value): self.device.write_attribute(self.attribute_name, new_value)
[docs] def is_connected(self): return self.device is not None