Source code for mxcubecore.HardwareObjects.mockup.EnergyScanMockup

import os
import time

import gevent
import gevent.event
from matplotlib.backends.backend_agg import FigureCanvasAgg
from matplotlib.figure import Figure

from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects.abstract.AbstractEnergyScan import AbstractEnergyScan
from mxcubecore.TaskUtils import cleanup

scan_test_data = [
    (10841.0, 20.0),
    (10842.0, 20.0),
    (10843.0, 20.0),
    (10844.0, 20.0),
    (10845.0, 20.0),
    (10846.0, 20.0),
    (10847.0, 20.0),
    (10848.0, 20.0),
    (10849.0, 20.0),
    (10850.0, 20.0),
    (10851.0, 20.0),
    (10852.0, 20.0),
    (10853.0, 20.0),
    (10854.0, 20.0),
    (10855.0, 20.0),
    (10856.0, 20.0),
    (10857.0, 20.0),
    (10858.0, 20.00),
    (10859.0, 20.0),
    (10860.0, 20.0),
    (10861.0, 20.1),
    (10862.0, 21.4),
    (10863.0, 30.4),
    (10864.9, 80.7),
    (10865.9, 299.0),
    (10866.7, 820.8),
    (10867.5, 2009.2),
    (10868.2, 4305.5),
    (10869.0, 8070.2),
    (10869.8, 13246.7),
    (10870.6, 19124.1),
    (10871.4, 24430.5),
    (10872.2, 27843.1),
    (10873.0, 28654.8),
    (10873.8, 27092.5),
    (10874.6, 24138.5),
    (10875.4, 20957.3),
    (10876.0, 18373.6),
    (10877.0, 16373.8),
    (10878.0, 15474.8),
    (10879.0, 15163.9),
    (10880.0, 15080.5),
    (10881.0, 15063.0),
    (10882.0, 15060.3),
    (10883.0, 15059.8),
    (10884.0, 15059.7),
    (10885.0, 15059.0),
    (10886.0, 15059.0),
    (10887.0, 15059.0),
    (10888.0, 15059.0),
    (10889.0, 15059.0),
    (10890.0, 15059.0),
    (10891.0, 15059.0),
    (10892.0, 15059.0),
    (10893.0, 15059.0),
    (10894.0, 15059.0),
    (10895.0, 15059.0),
    (10896.0, 15059.0),
    (10897.0, 15059.0),
    (10898.0, 15059.0),
    (10899.0, 15059.0),
    (10900.0, 15059.0),
    (10901.0, 15059.0),
    (10902.0, 15059.0),
    (10903.0, 15059.0),
    (10904.0, 15059.0),
    (10905.0, 15059.0),
    (10906.0, 15059.0),
    (10907.0, 15059.0),
    (10908.0, 15059.0),
    (10909.0, 15059.0),
    (10910.0, 15059.0),
]

chooch_graph_data = (
    (12628, 0, -5),
    (12629, 0, -5),
    (12630, 0, -5),
    (12631, 0, -5),
    (12632, 0, -5),
    (12633, 0, -5),
    (12634, 0, -5),
    (12635, 0, -5),
    (12636, 0, -5),
    (12637, 0, -5),
    (12638, 0, -5),
    (12639, 0, -5),
    (12640, 0, -5),
    (12641, 0, -6),
    (12642, 0, -6),
    (12643, 0, -6),
    (12644, 0, -6),
    (12645, 0, -6),
    (12646, 0, -6),
    (12647, 0, -6),
    (12648, 0, -6),
    (12649, 0, -7),
    (12650, 0, -7),
    (12651, 0, -7),
    (12652, 0, -8),
    (12653, 0, -8),
    (12654, 0, -8),
    (12655, 1, -9),
    (12655, 2, -9),
    (12656, 3, -10),
    (12657, 4, -9),
    (12658, 5, -9),
    (12659, 6, -8),
    (12659, 6, -7),
    (12660, 6, -6),
    (12661, 5, -5),
    (12662, 4, -5),
    (12663, 4, -5),
    (12664, 3, -5),
    (12665, 3, -5),
    (12666, 3, -5),
    (12667, 3, -5),
    (12668, 3, -5),
    (12669, 3, -5),
    (12670, 3, -5),
    (12671, 3, -5),
    (12672, 3, -5),
    (12673, 3, -5),
    (12674, 3, -5),
    (12675, 3, -5),
    (12676, 3, -5),
    (12677, 3, -5),
    (12678, 3, -5),
    (12679, 3, -5),
    (12680, 3, -5),
    (12681, 3, -5),
    (12682, 3, -5),
    (12683, 3, -5),
    (12684, 3, -4),
    (12685, 3, -4),
    (12686, 3, -4),
    (12687, 3, -4),
    (12688, 3, -4),
    (12689, 3, -4),
    (12690, 3, -4),
    (12691, 3, -4),
    (12692, 3, -4),
    (12693, 3, -4),
    (12694, 3, -4),
    (12695, 3, -4),
    (12696, 3, -4),
)


[docs]class EnergyScanMockup(AbstractEnergyScan): def __init__(self, name): AbstractEnergyScan.__init__(self, name)
[docs] def init(self): self.ready_event = gevent.event.Event() self.energy_scan_parameters = {} self.result_value_emitter = None self.scan_data = [] self.thEdgeThreshold = 5 self.energy2WavelengthConstant = 12.3980 self.defaultWavelength = 0.976
def emit_result_values(self): for value_tuple in scan_test_data: x = value_tuple[0] y = value_tuple[1] if not (x == 0 and y == 0): # if x is in keV, transform into eV otherwise let it like it is # if point larger than previous point (for chooch) if len(self.scan_data) > 0: if x > self.scan_data[-1][0]: self.scan_data.append([(x < 1000 and x * 1000.0 or x), y]) else: self.scan_data.append([(x < 1000 and x * 1000.0 or x), y]) self.emit("scanNewPoint", (x < 1000 and x * 1000.0 or x), y) time.sleep(0.05) self.scanCommandFinished()
[docs] def execute_energy_scan(self, energy_scan_parameters): self.energy_scan_parameters["exposureTime"] = 0.01 print(self.cpos) if HWR.beamline.transmission is not None: self.energy_scan_parameters["transmissionFactor"] = ( HWR.beamline.transmission.get_value() ) else: self.energy_scan_parameters["transmissionFactor"] = None size_hor = None size_ver = None if HWR.beamline.beam is not None: size_hor, size_ver = HWR.beamline.beam.get_beam_size() size_hor = size_hor * 1000 size_ver = size_ver * 1000 self.energy_scan_parameters["beamSizeHorizontal"] = size_hor self.energy_scan_parameters["beamSizeVertical"] = size_ver self.energy_scan_parameters["startEnergy"] = 0 self.energy_scan_parameters["endEnergy"] = 0 self.energy_scan_parameters["fluorescenceDetector"] = "Mockup detector" self.scan_data = [] self.result_value_emitter = gevent.spawn(self.emit_result_values)
[docs] def do_chooch(self, elt, edge, scan_directory, archive_directory, prefix): """ Descript. : """ symbol = "_".join((elt, edge)) scan_file_prefix = os.path.join(scan_directory, prefix) archive_file_prefix = os.path.join(archive_directory, prefix) if os.path.exists(scan_file_prefix + ".raw"): i = 1 while os.path.exists(scan_file_prefix + "%d.raw" % i): i = i + 1 scan_file_prefix += "_%d" % i archive_file_prefix += "_%d" % i scan_file_raw_filename = os.path.extsep.join((scan_file_prefix, "raw")) archive_file_raw_filename = os.path.extsep.join((archive_file_prefix, "raw")) scan_file_efs_filename = os.path.extsep.join((scan_file_prefix, "efs")) archive_file_efs_filename = os.path.extsep.join((archive_file_prefix, "efs")) scan_file_png_filename = os.path.extsep.join((scan_file_prefix, "png")) archive_file_png_filename = os.path.extsep.join((archive_file_prefix, "png")) try: if not os.path.exists(scan_directory): os.makedirs(scan_directory) if not os.path.exists(archive_directory): os.makedirs(archive_directory) except Exception: self.log.exception( "EnergyScan: could not create energy scan result directory." ) self.store_energy_scan() self.emit("energyScanFailed", ()) return try: scan_file_raw = open(scan_file_raw_filename, "w") archive_file_raw = open(archive_file_raw_filename, "w") except Exception: self.log.exception( "EnergyScan: could not create energy scan result raw file" ) self.store_energy_scan() self.emit("energyScanFailed", ()) return else: scanData = [] for i in range(len(self.scan_data)): x = float(self.scan_data[i][0]) x = x < 1000 and x * 1000.0 or x y = float(self.scan_data[i][1]) scanData.append((x, y)) scan_file_raw.write("%f,%f\r\n" % (x, y)) archive_file_raw.write("%f,%f\r\n" % (x, y)) scan_file_raw.close() archive_file_raw.close() self.energy_scan_parameters["scanFileFullPath"] = str( scan_file_raw_filename ) pk = 7.519 ip = 7.516 rm = 7.54 fpPeak = -12.6 fppPeak = 20.7 fpInfl = -21.1 fppInfl = 11.9 comm = "Mockup results" self.energy_scan_parameters["edgeEnergy"] = 0.1 self.thEdge = self.energy_scan_parameters["edgeEnergy"] self.log.info( "th. Edge %s ; chooch results are pk=%f, ip=%f, rm=%f" % (self.thEdge, pk, ip, rm) ) self.energy_scan_parameters["peakEnergy"] = pk self.energy_scan_parameters["inflectionEnergy"] = ip self.energy_scan_parameters["remoteEnergy"] = rm self.energy_scan_parameters["peakFPrime"] = fpPeak self.energy_scan_parameters["peakFDoublePrime"] = fppPeak self.energy_scan_parameters["inflectionFPrime"] = fpInfl self.energy_scan_parameters["inflectionFDoublePrime"] = fppInfl self.energy_scan_parameters["comments"] = comm self.energy_scan_parameters["choochFileFullPath"] = scan_file_efs_filename self.energy_scan_parameters["filename"] = archive_file_raw_filename self.energy_scan_parameters["workingDirectory"] = archive_directory chooch_graph_x, chooch_graph_y1, chooch_graph_y2 = zip(*chooch_graph_data) chooch_graph_x = list(chooch_graph_x) for i in range(len(chooch_graph_x)): chooch_graph_x[i] = chooch_graph_x[i] / 1000.0 title = "%s %s %s\n%.4f %.2f %.2f\n%.4f %.2f %.2f" % ( "energy", "f'", "f''", pk, fpPeak, fppPeak, ip, fpInfl, fppInfl, ) fig = Figure(figsize=(15, 11)) ax = fig.add_subplot(211) ax.set_title("%s" % title) ax.grid(True) ax.plot(*(zip(*self.scan_data)), **{"color": "black"}) ax.set_xlabel("Energy") ax.set_ylabel("MCA counts") ax2 = fig.add_subplot(212) ax2.grid(True) ax2.set_xlabel("Energy") ax2.set_ylabel("") handles = [] handles.append(ax2.plot(chooch_graph_x, chooch_graph_y1, color="blue")) handles.append(ax2.plot(chooch_graph_x, chooch_graph_y2, color="red")) canvas = FigureCanvasAgg(fig) self.energy_scan_parameters["jpegChoochFileFullPath"] = str( archive_file_png_filename ) try: self.log.info( "Rendering energy scan and Chooch " + "graphs to PNG file : %s", scan_file_png_filename, ) canvas.print_figure(scan_file_png_filename, dpi=80) except Exception: self.log.exception("could not print figure") try: self.log.info( "Saving energy scan to archive " + "directory for ISPyB : %s", archive_file_png_filename, ) canvas.print_figure(archive_file_png_filename, dpi=80) except Exception: self.log.exception("could not save figure") self.store_energy_scan() self.log.info("<chooch> returning") self.emit( "choochFinished", ( pk, fppPeak, fpPeak, ip, fppInfl, fpInfl, rm, chooch_graph_x, chooch_graph_y1, chooch_graph_y2, title, ), ) return ( pk, fppPeak, fpPeak, ip, fppInfl, fpInfl, rm, chooch_graph_x, chooch_graph_y1, chooch_graph_y2, title, )
[docs] def get_elements(self): elements = [] try: for el in self.config.elements["element"]: elements.append({"symbol": el["symbol"], "energy": el["energy"]}) except IndexError: self.log.exception("") return elements
# # def getDefaultMadEnergies(self): # energies = [] # try: # for el in self["mad"]: # energies.append([float(el.energy), el.directory]) # except IndexError: # pass # return energies def is_connected(self): return True
[docs] def scanCommandStarted(self, *args): """ Descript. : """ print(self.energy_scan_parameters) title = "%s %s: %s %s" % ( self.energy_scan_parameters["sessionId"], self.energy_scan_parameters["blSampleId"], self.energy_scan_parameters["element"], self.energy_scan_parameters["edgeEnergy"], ) dic = { "xlabel": "energy", "ylabel": "counts", "scaletype": "normal", "title": title, } self.emit("scanStart", dic) self.emit("energyScanStarted", dic) self.energy_scan_parameters["startTime"] = time.strftime("%Y-%m-%d %H:%M:%S") self.scanning = True
[docs] def scanCommandFinished(self, *args): """ Descript. : """ with cleanup(self.ready_event.set): self.energy_scan_parameters["endTime"] = time.strftime("%Y-%m-%d %H:%M:%S") self.log.debug("Energy Scan: finished") self.scanning = False self.energy_scan_parameters["startEnergy"] = self.scan_data[-1][0] / 1000.0 self.energy_scan_parameters["endEnergy"] = self.scan_data[-1][1] / 1000.0 self.emit("energyScanFinished", self.energy_scan_parameters)
[docs] def get_scan_data(self): """ Descript. : """ return self.scan_data
[docs] def store_energy_scan(self): """ Descript. : """ if HWR.beamline.lims: db_status = HWR.beamline.lims.storeEnergyScan(self.energy_scan_parameters)