Source code for mxcubecore.HardwareObjects.DESY.P11AlbulaView

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

__copyright__ = """Copyright The MXCuBE Collaboration"""
__license__ = "LGPLv3+"

import sys
import time

from PIL import Image

from mxcubecore.BaseHardwareObjects import HardwareObject

sys.path.append("/opt/dectris/albula/4.0/python/")


try:
    from dectris import albula
except:
    print("albula module not found")


[docs]class P11AlbulaView(HardwareObject): default_interval = 0.5 # secs stoptimer = -1.0
[docs] def init(self): self.alive = False self.stream = False self.viewer = None self.subframe = None interval = self.get_property("update_interval") self.interval = interval is not None and interval or self.default_interval self.mask_4m = None self.mask_16m = None mask_path_4m = self.get_property("mask_4m") mask_path_16m = self.get_property("mask_16m") self.eigerMonitor = None self.eigerThread = None if parent is not None and hasattr(parent, "eigerThread"): try: prop = parent.eigerThread.proxyMonitor.get_property("Host")["Host"] except: pass else: self.eigerMonitor = albula.DEigerMonitor(prop[0]) self.eigerThread = parent.eigerThread try: mask_tif_4m = Image.open(mask_path_4m) mask_tif_16m = Image.open(mask_path_16m) except: pass else: mask_array_4m = np.asarray(mask_tif_4m) mask_array_16m = np.asarray(mask_tif_16m) self.mask_4m = albula.DImage(mask_array_4m) self.mask_16m = albula.DImage(mask_array_16m)
def start(self, path=None, filetype=None, interval=None, stream=False): if filetype is not None: self.filetype = filetype if interval is not None: self.interval = interval self.stream = stream self.stoptimer = -1 # HDF5 self.eigerThread.setMonitorMode("enabled") if stream: self.eigerThread.setMonitorDiscardNew(False) else: self.eigerThread.clearMonitorBuffer() self.eigerThread.setMonitorDiscardNew(True) super().start()
[docs] def stop(self, interval=0.0): if self.stoptimer < 0.0 and interval > 0.0: self.stoptimer = interval return print("Live view thread: Stopping thread") self.alive = False self.wait() # waits until run stops on his own
def check_app(self): try: self.subframe.setOffsetX(0) except BaseException as e: self.subframe = None if self.subframe is None: try: self.viewer.enableClose() except BaseException as e: self.viewer = None if self.viewer is None: self.viewer = albula.openMainFrame() if self.subframe is None: self.subframe = self.viewer.openSubFrame() def run(self): self.alive = True while self.alive: # hdf5 wavelength = 12398.4 / energy try: energy = self.eigerThread.photonEnergy detector_distance = self.eigerThread.proxyEiger.read_attribute( "DetectorDistance" ).value beam_center_x = self.eigerThread.proxyEiger.read_attribute( "BeamCenterX" ).value beam_center_y = self.eigerThread.proxyEiger.read_attribute( "BeamCenterY" ).value except: detector_distance = 1.0 beam_center_x = 2074 beam_center_y = 2181 # get latest file from receiver timestamp = time.time() if self.stream: try: img = self.eigerMonitor.next() except: time.sleep(0.1) if self.stoptimer > 0: self.stoptimer -= 0.1 if self.stoptimer <= 0.0: self.stoptimer = -1.0 self.alive = False continue else: try: img = self.eigerMonitor.monitor() self.eigerThread.clearMonitorBuffer() except: print(sys.exc_info()) continue # display image img.optionalData().set_wavelength(wavelength) img.optionalData().set_detector_distance(detector_distance) img.optionalData().set_beam_center_x(beam_center_x) img.optionalData().set_beam_center_y(beam_center_y) img.optionalData().set_x_pixel_size(0.000075) img.optionalData().set_y_pixel_size(0.000075) if self.eigerMask16M is not None and img.width() == 4148: img.optionalData().set_pixel_mask(mask_16m) elif self.eigerMask4M is not None and img.width() == 2068: img.optionalData().set_pixel_mask(mask_4m) self.check_app() self.subframe.hideResolutionRings() self.subframe.loadImage(img) self.subframe.showResolutionRings() # wait interval / check if stop requested interval = time.time() - timestamp if self.stoptimer > 0: self.stoptimer -= interval if self.stoptimer <= 0.0: self.stoptimer = -1.0 self.alive = False if self.stream: time.sleep(0.1) continue # sleep for self.interval and check stop in between while interval < self.interval and self.alive: if self.stoptimer > 0.0: self.stoptimer -= 0.05 if self.stoptimer <= 0.0: self.stoptimer = -1.0 self.alive = False time.sleep(0.05) interval += 0.05 # end hdf5 print("Live view thread: Thread for Live view died") self.alive = False if interval is not None: self.interval = interval
if __name__ == "__main__": lv = LiveView() lv.start() time.sleep(200) lv.stop()