Source code for mxcubecore.Command.Exporter

# -*- coding: utf-8 -*-
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU General Lesser Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""Exporter, ExporterChannel and ExporterCommand implementation"""

import logging

import gevent
from gevent.queue import Queue

from mxcubecore.CommandContainer import (
    ChannelObject,
    CommandObject,
)

from .exporter import ExporterClient
from .exporter.StandardClient import PROTOCOL

__copyright__ = """ Copyright © 2019 by the MXCuBE collaboration """
__license__ = "LGPLv3+"

EXPORTER_CLIENTS = {}


[docs]def start_exporter(address, port, timeout=3, retries=1): """Start the exporter""" global EXPORTER_CLIENTS if (address, port) not in EXPORTER_CLIENTS: client = Exporter(address, port, timeout) EXPORTER_CLIENTS[(address, port)] = client client.start() return client return EXPORTER_CLIENTS[(address, port)]
[docs]class Exporter(ExporterClient.ExporterClient, object): """Exporter class""" STATE_EVENT = "State" STATUS_EVENT = "Status" VALUE_EVENT = "Value" POSITION_EVENT = "Position" MOTOR_STATES_EVENT = "MotorStates" STATE_READY = "Ready" STATE_INITIALIZING = "Initializing" STATE_STARTING = "Starting" STATE_RUNNING = "Running" STATE_MOVING = "Moving" STATE_CLOSING = "Closing" STATE_REMOTE = "Remote" STATE_STOPPED = "Stopped" STATE_COMMUNICATION_ERROR = "Communication Error" STATE_INVALID = "Invalid" STATE_OFFLINE = "Offline" STATE_ALARM = "Alarm" STATE_FAULT = "Fault" STATE_UNKNOWN = "Unknown" def __init__(self, address, port, timeout=3, retries=1): super(Exporter, self).__init__(address, port, PROTOCOL.STREAM, timeout, retries) self.started = False self.callbacks = {} self.events_queue = Queue() self.events_processing_task = None
[docs] def start(self): """Start"""
[docs] def stop(self): """Stop""" self.disconnect()
[docs] def execute(self, *args, **kwargs): """Execute""" ret = ExporterClient.ExporterClient.execute(self, *args, **kwargs) return self._to_python_value(ret)
[docs] def get_state(self): """Read the state""" return self.execute("getState")
[docs] def read_property(self, *args, **kwargs): """Read a property""" ret = ExporterClient.ExporterClient.read_property(self, *args, **kwargs) return self._to_python_value(ret)
[docs] def reconnect(self): """Reconnect""" return
[docs] def on_disconnected(self): """Actions on disconnect"""
[docs] def register(self, name, cb): """Register to a callback""" if callable(cb): self.callbacks.setdefault(name, []).append(cb) if not self.events_processing_task: self.events_processing_task = gevent.spawn(self.process_events_from_queue)
def _to_python_value(self, value): """Convert exporter value to python one Args: value (str): String from the exporter """ if value is None: return value if "\x1f" in value: value = self.parse_array(value) try: value = list(map(int, value)) except (TypeError, ValueError): try: value = list(map(float, value)) except (TypeError, ValueError): pass else: if value == "false": value = False elif value == "true": value = True else: try: value = int(value) except (TypeError, ValueError): try: value = float(value) except (TypeError, ValueError): pass return value
[docs] def on_event(self, name, value, timestamp): """Put the event in the queue Args: name: Name value: Value timestamp: Timestamp """ self.events_queue.put((name, value))
[docs] def process_events_from_queue(self): """Process events from the queue""" while True: try: name, value = self.events_queue.get() except Exception: return for cb in self.callbacks.get(name, []): try: cb(self._to_python_value(value)) except Exception: msg = "Exception while executing callback {} for event {}".format( cb, name ) logging.exception(msg) continue
[docs]class ExporterCommand(CommandObject): """Command implementation for Exporter""" def __init__( self, name, command, username=None, address=None, port=None, timeout=3, **kwargs ): CommandObject.__init__(self, name, username, **kwargs) self.command = command self.__exporter = start_exporter(address, port, timeout) msg = "Attaching Exporter command: {} {}".format(address, name) logging.getLogger("HWR").debug(msg) def __call__(self, *args, **kwargs): self.emit("commandBeginWaitReply", (str(self.name()),)) try: ret = self.__exporter.execute(self.command, args, kwargs.get("timeout", -1)) except Exception: self.emit("commandFailed", (-1, self.name())) raise else: self.emit("commandReplyArrived", (ret, str(self.name()))) return ret
[docs] def abort(self): """Abort"""
[docs] def get_state(self): """FRead the state Returns: (str): State """ return self.__exporter.get_state()
[docs] def is_connected(self): """Check if connected. Returns: (bool): True if connected """ return self.__exporter.is_connected()
[docs]class ExporterChannel(ChannelObject): """Channel implementation for Exporter""" def __init__( self, name, attribute_name, username=None, address=None, port=None, timeout=3, **kwargs, ): ChannelObject.__init__(self, name, username, **kwargs) self.__exporter = start_exporter(address, port, timeout) self.attribute_name = attribute_name self.value = None self.__exporter.register(attribute_name, self.update) msg = "Attaching Exporter channel: {} {} ".format(address, name) logging.getLogger("HWR").debug(msg) self.update()
[docs] def update(self, value=None): """Emit signal update when value changed""" value = value or self.get_value() if isinstance(value, tuple): value = list(value) self.value = value self.emit("update", value)
[docs] def get_value(self): """Get the value Returns: (str): The value """ value = self.__exporter.read_property(self.attribute_name) return value
[docs] def set_value(self, value): """Set a value Args: value (str): Value to set """ self.__exporter.write_property(self.attribute_name, value)
[docs] def is_connected(self): """Check if connected. Returns: (bool): True if connected. """ return self.__exporter.is_connected()