"""Class for cameras connected to Lima Tango Device Servers
Example configuration:
----------------------
<object class="TangoLimaVideo">
<username>Prosilica 1350C</username>
<tangoname>id23/limaccd/minidiff2</tangoname>
<bpmname>id23/limabeamviewer/minidiff2</bpmname>
<interval>15</interval>
<video_mode>RGB24</video_mode>
</object>
If video mode is not specified, BAYER_RG16 is used by default.
"""
import io
import struct
import time
import gevent
import PyTango
from PIL import Image
from PyTango.gevent import DeviceProxy
from mxcubecore import BaseHardwareObjects
def poll_image(lima_tango_device, video_mode, FORMATS):
img_data = lima_tango_device.video_last_image
hfmt = ">IHHqiiHHHH"
hsize = struct.calcsize(hfmt)
_, _, _, _, width, height, _, _, _, _ = struct.unpack(hfmt, img_data[1][:hsize])
raw_data = img_data[1][hsize:]
_from, _to = FORMATS.get(video_mode, (None, None))
if _from and _to:
img = Image.frombuffer(_from, (height, width), raw_data, "raw", _from, 0, 1)
img_bytes = io.BytesIO()
img.save(img_bytes, format=_to)
img = img.tobytes()
else:
img = raw_data
return img, width, height
[docs]class TangoLimaVideo(BaseHardwareObjects.HardwareObject):
def __init__(self, name):
super().__init__(name)
self.__polling = None
self._video_mode = None
self._last_image = (0, 0, 0)
# Dictionary containing conversion information for a given
# video_mode. The camera video mode is the key and the first
# index of the tuple contains the corresponding PIL mapping
# and the second the desried output format. The image is
# passed on as it is of the video mode is not in the dictionary
self._FORMATS = {
"RGB8": ("L", "BMP"),
"RGB24": ("RGB", "BMP"),
"RGB32": ("RGBA", "BMP"),
"NO_CONVERSION": (None, None),
}
[docs] def init(self):
self.device = None
try:
self._video_mode = self.get_property("video_mode", "RGB24")
self.device = DeviceProxy(self.get_property("tangoname"))
# try a first call to get an exception if the device
# is not exported
self.device.ping()
except PyTango.DevFailed as traceback:
last_error = traceback[-1]
self.log.error("%s: %s", str(self.name()), last_error.desc)
self.device = BaseHardwareObjects.Null()
else:
if self.get_property("exposure_time"):
self._sleep_time = float(self.get_property("exposure_time"))
elif self.get_property("interval"):
self._sleep_time = float(self.get_property("interval")) / 1000.0
if self.get_property("control_video", "True"):
self.log.info("MXCuBE controlling video")
if self.device.video_live:
self.device.video_live = False
self.device.video_mode = self._video_mode
self.set_exposure(self._sleep_time)
self.device.video_live = True
else:
self.log.info("MXCuBE NOT controlling video")
self.update_state(BaseHardwareObjects.HardwareObjectState.READY)
def get_last_image(self):
return poll_image(self.device, self._video_mode, self._FORMATS)
def _do_polling(self, sleep_time):
lima_tango_device = self.device
while True:
data, width, height = poll_image(
lima_tango_device, self._video_mode, self._FORMATS
)
self._last_image = data, width, height
self.emit("imageReceived", data, width, height, False)
time.sleep(sleep_time)
def connect_notify(self, signal):
if signal == "imageReceived":
if self.__polling is None:
self.__polling = gevent.spawn(
self._do_polling, self.device.video_exposure
)
def get_width(self):
return self.device.image_width
def get_height(self):
return self.device.image_height
def set_live(self, mode):
curr_state = self.device.video_live
if mode:
if not curr_state:
self.device.video_live = True
else:
if curr_state:
self.device.video_live = False
def set_exposure(self, exposure):
self.device.video_exposure = exposure