Source code for mxcubecore.HardwareObjects.DataPublisher

#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
import json
from enum import (
    Enum,
    unique,
)

import gevent
import redis

from mxcubecore.BaseHardwareObjects import HardwareObject


@unique
class PlotType(Enum):
    """
    Plot kinds that a data source can declare as its default presentation.

    The member value is the string stored in the source description.
    """

    SCATTER = "scatter"
class PlotDim(Enum):
    """
    Dimensionality of the data delivered by a source.

    The member value is the integer stored in the source description.
    """

    ONE_D = 1
    TWO_D = 2
class DataType(Enum):
    """
    Value types a data source can publish.

    The member value is the string stored in the source description.
    """

    FLOAT = "float"
class FrameType(Enum):
    """
    Kinds of message frames exchanged on a data channel.

    The member value is what is placed in the "type" field of a frame.
    """

    # A frame carrying one data point
    DATA = "data"
    # Marks the beginning of an acquisition
    START = "start"
    # Marks the end of an acquisition
    STOP = "stop"
def one_d_data(x, y):
    """
    Build the payload for a single one dimensional data point.

    Args:
        x: Value along the x axis
        y: Value along the y axis

    Returns:
        (dict): The point, with the keys "x" and "y"
    """
    point = dict(x=x, y=y)
    return point
def two_d_data(x, y, z=float("nan")):
    """
    Convenience function for creating x, y, z data

    The function used to reference ``z`` without accepting it as an argument,
    so every call failed with a NameError. ``z`` is now a parameter; it has a
    default so that existing two argument calls keep working.

    Args:
        x: Value along the x axis
        y: Value along the y axis
        z: Value along the z axis, NaN when not given

    Returns:
        (dict): The point, with the keys "x", "y" and "z"
    """
    return {"x": x, "y": y, "z": z}
class DataPublisher(HardwareObject):
    """
    DataPublisher handles data publishing

    Sources are registered with a description that is stored in Redis. Frames
    (start, data, stop) are published on a per source Redis channel, a
    background task listens to those channels, stores the published points in
    Redis lists and emits the signals "start", "data" and "end".
    """

    # Prefix of the Redis channel used by a source, the source id is appended
    _CHANNEL_PREFIX = "HWR_DP_NEW_DATA_POINT_"

    def __init__(self, name):
        super(DataPublisher, self).__init__(name)
        # Redis client, created in init()
        self._r = None
        # Background task running _handle_messages, spawned in init()
        self._subsribe_task = None

    def init(self):
        """
        FWK2 Init method

        Creates the Redis client from the properties "host", "port" and "db"
        and spawns the task listening for published frames, unless one has
        already been spawned.
        """
        super(DataPublisher, self).init()

        rhost = self.get_property("host", "localhost")
        rport = self.get_property("port", 6379)
        rdb = self.get_property("db", 11)

        self._r = redis.Redis(
            host=rhost, port=rport, db=rdb, encoding="utf-8", decode_responses=True
        )

        if not self._subsribe_task:
            self._subsribe_task = gevent.spawn(self._handle_messages)

    def _handle_messages(self):
        """
        Listens for published data and handles the data.

        Runs until the subscription ends. A failure while handling one
        message is logged and does not stop the listener.
        """
        pubsub = self._r.pubsub(ignore_subscribe_messages=True)
        pubsub.psubscribe("%s*" % self._CHANNEL_PREFIX)

        # The descriptions of active sources for fast access
        # while publishing data
        active_source_desc = {}

        for message in pubsub.listen():
            if not message:
                continue

            try:
                self._handle_message(message, active_source_desc)
            except Exception:
                # Deliberately broad, one bad frame must not kill the listener
                msg = "Could not parse data in %s" % message
                self.log.exception(msg)

    def _handle_message(self, message, active_source_desc):
        """
        Handle a single message received from the pub/sub subscription

        Args:
            message (dict): Message as delivered by the Redis pub/sub listener,
                            the keys "channel" and "data" are used
            active_source_desc (dict): Descriptions of the sources that are
                                       currently running, keyed by channel
                                       name. Updated in place.
        """
        redis_channel = message["channel"]
        # Strip the prefix instead of splitting on "_", so that ids which
        # themselves contain underscores are kept intact
        _id = redis_channel[len(self._CHANNEL_PREFIX):]
        data = json.loads(message["data"])
        frame_type = data["type"]

        if frame_type == FrameType.START.value:
            self._update_description(_id, {"running": True})
            # Clear previous data so that we are not accumulating
            # with previously published data
            self._clear_data(_id)
            self.emit("start", self.get_description(_id, include_data=True)[0])
            active_source_desc[redis_channel] = self._get_description(_id)
        elif frame_type == FrameType.STOP.value:
            self._update_description(_id, {"running": False})
            self.emit("end", self.get_description(_id, include_data=True)[0])
            # A stop frame can arrive for a source that this listener never
            # saw starting, that is not an error
            active_source_desc.pop(redis_channel, None)
        elif frame_type == FrameType.DATA.value:
            desc = active_source_desc.get(redis_channel)

            if desc is None:
                # No start frame seen, so there is no description telling how
                # the point should be stored
                self.log.error(
                    "Received data for source %s which has not been started" % _id
                )
                return

            self.emit("data", {"id": _id, "data": data["data"]})
            self._append_data(_id, data["data"], desc)
        else:
            msg = "Unknown frame type %s" % message
            self.log.error(msg)

    def _remove_available(self, _id):
        """
        Remove source with _id from list of available sources,
        does nothing if there is no such source

        Args:
            _id (str): The id of the source to remove
        """
        sources = self._get_available()
        sources.pop(_id, None)
        self._r.set("HWR_DP_PUBLISHERS", json.dumps(sources))

    def _add_avilable(self, _id):
        """
        Add source with _id to list of available sources

        Args:
            _id (str): The id of the source to add
        """
        sources = self._get_available()
        sources[_id] = "%s%s" % (self._CHANNEL_PREFIX, _id)
        self._r.set("HWR_DP_PUBLISHERS", json.dumps(sources))

    def _get_available(self):
        """
        Returns:
            (dict): Where the key is the id of source and the value
                    the channel name to publish data to. Empty if
                    no source has been made available.
        """
        sources = self._r.get("HWR_DP_PUBLISHERS")
        return json.loads(sources) if sources else {}

    def _set_description(self, _id, desc):
        """
        Sets the description of source with _id to desc

        Args:
            _id (str): The id of the source to describe
            desc (dict): As created by register, with the keys
                "id", "name", "axis_labels", "channel", "data_type",
                "data_dim", "plot_type", "sample_rate", "content_type",
                "range", "meta" and "running"
        """
        self._r.set("HWR_DP_%s_DESCRIPTION" % _id, json.dumps(desc))

    def _get_description(self, _id):
        """
        Return the description of source with _id

        Args:
            _id (str): The id of the source

        Returns:
            (dict): The description as stored by _set_description

        Raises:
            TypeError: If nothing is stored for _id, since there
                       is then nothing to decode
        """
        return json.loads(self._r.get("HWR_DP_%s_DESCRIPTION" % _id))

    def _update_description(self, _id, data):
        """
        Update the description of source with _id with data

        Args:
            _id (str): The id of the source to update
            data (dict): with key, value pairs to update
        """
        desc = self._get_description(_id)
        desc.update(data)
        self._set_description(_id, desc)

    def _append_data(self, _id, data, desc):
        """
        Append data to source with _id, NaN is stored for
        a coordinate that is missing in data

        Args:
            _id (str): The id of the source to append to
            data (dict): x, y, (z) data to append
            desc (dict): Publisher description, "data_dim" decides
                         if z is stored
        """
        self._r.rpush("HWR_DP_%s_DATA_X" % _id, data.get("x", float("nan")))
        self._r.rpush("HWR_DP_%s_DATA_Y" % _id, data.get("y", float("nan")))

        if desc["data_dim"] > 1:
            self._r.rpush("HWR_DP_%s_DATA_Z" % _id, data.get("z", float("nan")))

    def _clear_data(self, _id):
        """
        Clear data of source with _id

        Args:
            _id (str): The id of the source to clear
        """
        desc = self._get_description(_id)

        self._r.delete("HWR_DP_%s_DATA_X" % _id)
        self._r.delete("HWR_DP_%s_DATA_Y" % _id)

        if desc["data_dim"] > 1:
            self._r.delete("HWR_DP_%s_DATA_Z" % _id)

    def _publish(self, _id, data):
        """
        Publish a frame on the channel of source with _id

        Args:
            _id (str): The id of the source to publish to
            data (dict): The frame, with the keys "type" and "data"
        """
        self._r.publish("%s%s" % (self._CHANNEL_PREFIX, _id), json.dumps(data))

    def register(
        self,
        _id,
        name,
        channel,
        axis_labels=None,
        data_type=DataType.FLOAT,
        data_dim=PlotDim.ONE_D,
        plot_type=PlotType.SCATTER,
        content_type="",
        sample_rate=0.5,
        _range=(None, None),
        meta=None,
    ):
        """
        Register a source, stores its description and adds it to
        the list of available sources. The source is not running
        until start is called.

        Args:
            _id (str): The id of the source
            name (str): Name of the source
            channel (str): Stored in the description under "channel"
            axis_labels (list): Labels of the axes, ["x", "y", "z"] when omitted
            data_type (DataType): Type of the published values
            data_dim (PlotDim): Dimension of the published data
            plot_type (PlotType): Default plot
            content_type (str): Stored in the description under "content_type"
            sample_rate (float): Stored in the description under "sample_rate"
            _range (tuple): (min, max)
            meta (dict): Stored in the description under "meta",
                         empty when omitted

        Returns:
            (str): The id of the registered source
        """
        # Created per call, so that the defaults are never shared between calls
        if axis_labels is None:
            axis_labels = ["x", "y", "z"]

        if meta is None:
            meta = {}

        plot_description = {
            "id": _id,
            "name": name,
            "axis_labels": axis_labels,
            "channel": channel,
            "data_type": data_type.value,
            "data_dim": data_dim.value,
            "plot_type": plot_type.value,
            "sample_rate": sample_rate,
            "content_type": content_type,
            "range": _range,
            "meta": meta,
            "running": False,
        }

        self._set_description(_id, plot_description)
        self._add_avilable(_id)

        return _id

    def pub(self, _id, data):
        """
        Publish a data point for source with _id

        Args:
            _id (str): The id of the source
            data (dict): x, y, (z) data, see one_d_data and two_d_data
        """
        self._publish(_id, {"type": FrameType.DATA.value, "data": data})

    def start(self, _id):
        """
        Publish a start frame for source with _id

        Args:
            _id (str): The id of the source
        """
        self._publish(_id, {"type": FrameType.START.value, "data": {}})

    def stop(self, _id):
        """
        Mark source with _id as not running and publish a stop frame

        Args:
            _id (str): The id of the source
        """
        self._update_description(_id, {"running": False})
        self._publish(_id, {"type": FrameType.STOP.value, "data": {}})

    def _describe(self, _id, include_data):
        """
        Return the description of one source, optionally with its data

        Args:
            _id (str): The id of the source
            include_data (bool): Add the stored data under the key "values"

        Returns:
            (dict): The description
        """
        desc = self._get_description(_id)

        if include_data:
            desc.update({"values": self.get_data(_id)})

        return desc

    def get_description(self, _id=None, include_data=False):
        """
        Return the description of source with _id, or of all
        available sources when no _id is given

        Args:
            _id (str): The id of the source, all sources if omitted or empty
            include_data (bool): Add the stored data to each description,
                                 under the key "values"

        Returns:
            (list): Of descriptions (dict), with a single item when _id is given
        """
        if _id:
            return [self._describe(_id, include_data)]

        return [
            self._describe(source_id, include_data)
            for source_id in self._get_available()
        ]

    def get_data(self, _id):
        """
        Return the data stored for source with _id

        Args:
            _id (str): The id of the source

        Returns:
            (dict): With the keys "x", "y" and, for sources with more
                    than one dimension, "z". Each value is the list of
                    stored items as returned by Redis.
        """
        desc = self._get_description(_id)

        data = {
            "x": self._r.lrange("HWR_DP_%s_DATA_X" % _id, 0, -1),
            "y": self._r.lrange("HWR_DP_%s_DATA_Y" % _id, 0, -1),
        }

        if desc["data_dim"] > 1:
            data["z"] = self._r.lrange("HWR_DP_%s_DATA_Z" % _id, 0, -1)

        return data