# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""Abstract hardware object for data collection."""
from __future__ import annotations
import abc
import collections
import errno
import logging
import os
import socket
import time
import gevent
import gevent.event
from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.TaskUtils import task
__credits__ = ["MXCuBE collaboration"]
BeamlineConfig = collections.namedtuple(
"BeamlineConfig",
[
"synchrotron_name",
"directory_prefix",
"default_exposure_time",
"minimum_exposure_time",
"detector_fileext",
"detector_type",
"detector_manufacturer",
"detector_model",
"detector_px",
"detector_py",
"detector_binning_mode",
"undulators",
"focusing_optic",
"monochromator_type",
"beam_divergence_vertical",
"beam_divergence_horizontal",
"polarisation",
"input_files_server",
],
)
[docs]class AbstractCollect(HardwareObject, object):
"""Abstract hardware object for data collection.
Define a sequence in which data collection is executed.
Emits:
collectReady (bool):
collectOscillationStarted (dict):
progressInit (tuple[str, int, bool]):
For example: ``("Collection", progress, bool)``.
collectOscillationFailed (dict):
collectOscillationFinished (dict):
progressStop:
"""
__metaclass__ = abc.ABCMeta
def __init__(self, name) -> None:
HardwareObject.__init__(self, name)
self.bl_config = BeamlineConfig(*[None] * 18)
self._collecting = False
self._error_msg = ""
self.exp_type_dict = {}
self.collection_id = None
self.data_collect_task = None
self.current_dc_parameters = None
self.current_lims_sample = {}
self.run_offline_processing = None
self.run_online_processing = None
self.ready_event = None
[docs] def init(self):
self.ready_event = gevent.event.Event()
beam_div_hor, beam_div_ver = HWR.beamline.beam.get_beam_divergence()
self.set_beamline_configuration(
synchrotron_name=HWR.beamline.session.synchrotron_name,
directory_prefix=self.get_property("directory_prefix"),
default_exposure_time=HWR.beamline.detector.get_property(
"default_exposure_time"
),
minimum_exposure_time=HWR.beamline.detector.get_property(
"minimum_exposure_time"
),
detector_fileext=HWR.beamline.detector.get_property("fileSuffix"),
detector_type=HWR.beamline.detector.get_property("type"),
detector_manufacturer=HWR.beamline.detector.get_property("manufacturer"),
detector_model=HWR.beamline.detector.get_property("model"),
detector_px=HWR.beamline.detector.get_property("px"),
detector_py=HWR.beamline.detector.get_property("py"),
detector_binning_mode=HWR.beamline.detector.get_binning_mode(),
undulators=self.get_property("undulators", []),
focusing_optic=self.get_property("focusing_optic"),
monochromator_type=self.get_property("monochromator"),
beam_divergence_vertical=beam_div_hor,
beam_divergence_horizontal=beam_div_ver,
polarisation=self.get_property("polarisation"),
input_files_server=self.get_property("input_files_server"),
)
[docs] def set_beamline_configuration(self, **configuration_parameters) -> None:
"""Sets beamline configuration
Args:
configuration_parameters (dict): Configuration parameters.
"""
self.bl_config = BeamlineConfig(**configuration_parameters)
[docs] def collect(self, owner, dc_parameters_list):
"""Main collection method.
It spawns "do_collect" as well as ready event handling.
Args:
owner (str): Owner instance.
dc_parameters_list (list[dict]):
List of dictionary containing all collection parameters.
"""
self.ready_event.clear()
self.current_dc_parameters = dc_parameters_list[0]
self.data_collect_task = gevent.spawn(self.do_collect, owner)
self.ready_event.wait()
self.ready_event.clear()
return self.data_collect_task
[docs] def do_collect(self, owner):
"""Actually collect the sequence.
Args:
owner (str): owner instance
"""
log = logging.getLogger("user_level_log")
log.info("Collection: Preparing to collect")
self.emit("collectReady", (False,))
self.emit(
"collectOscillationStarted",
(owner, None, None, None, self.current_dc_parameters, None),
)
self.emit("progressInit", ("Collection", 100, False))
self.collection_id = None
try:
# ----------------------------------------------------------------
# Prepare data collection
self.open_detector_cover()
self.open_safety_shutter()
self.open_fast_shutter()
# ----------------------------------------------------------------
# Store information in LIMS
self.current_dc_parameters["status"] = "Running"
self.current_dc_parameters["collection_start_time"] = time.strftime(
"%Y-%m-%d %H:%M:%S"
)
self.log.info("Collection parameters: %s" % str(self.current_dc_parameters))
log.info("Collection: Storing data collection in LIMS")
self.store_data_collection_in_lims()
log.info(
"Collection: Creating directories for raw images and processing files"
)
self.create_file_directories()
log.info("Collection: Getting sample info from parameters")
self.get_sample_info()
log.info("Collection: Storing sample info in LIMS")
self.store_sample_info_in_lims()
if all(
item is None for item in self.current_dc_parameters["motors"].values()
):
# No centring point defined
# create point based on the current position
current_diffractometer_position = (
HWR.beamline.diffractometer.get_positions()
)
for motor in self.current_dc_parameters["motors"].keys():
self.current_dc_parameters["motors"][motor] = (
current_diffractometer_position.get(motor)
)
# ----------------------------------------------------------------
# Move to the centered position and take crystal snapshots
log.info("Collection: Moving to centred position")
self.move_to_centered_position()
self.take_crystal_snapshots()
self.move_to_centered_position()
# ----------------------------------------------------------------
# Set data collection parameters
transmission = self.current_dc_parameters.get("transmission")
wavelength = self.current_dc_parameters.get("wavelength")
energy = self.current_dc_parameters.get("energy")
detector_distance = self.current_dc_parameters.get("detector_distance")
resolution = self.current_dc_parameters.get("resolution")
if transmission:
log.info(
"Collection: Setting transmission to %.2f",
transmission,
)
self.set_transmission(transmission)
if wavelength:
# Wavelength (not having a default) overrides energy
log.info("Collection: Setting wavelength to %.4f", wavelength)
energy = HWR.beamline.energy.calculate_energy(wavelength)
elif energy:
log.info("Collection: Setting energy to %.4f", energy)
wavelength = HWR.beamline.energy.calculate_wavelength(energy)
if energy:
self.set_energy(energy)
if detector_distance:
# detector_distance (not having a default) overrides resolution
resolution = HWR.beamline.resolution.distance_to_resolution(
detector_distance, wavelength
)
log.info(
"Collection: Setting detector distance to %.2f",
detector_distance,
)
self.set_resolution(resolution)
elif resolution:
log.info("Collection: Setting resolution to %.2f", resolution)
self.set_resolution(resolution)
# ----------------------------------------------------------------
# Site specific implementation of a data collection
# In order to call the hook with original parameters
# before update_data_collection_in_lims changes them
# TODO check why this happens
self.data_collection_hook()
# ----------------------------------------------------------------
# Store information in LIMS
log.info("Collection: Updating data collection in LIMS")
self.update_data_collection_in_lims()
except RuntimeError as e:
failed_msg = "Data collection failed!\n%s" % str(e)
self.collection_failed(failed_msg)
else:
self.collection_finished()
finally:
self.data_collection_cleanup()
[docs] def data_collection_cleanup(self):
"""
Method called when at end of data collection, successful or not.
"""
self.close_fast_shutter()
self.close_safety_shutter()
self.close_detector_cover()
[docs] def collection_failed(self, failed_msg=None):
"""Collection failed method"""
if not failed_msg:
failed_msg = "Data collection failed!"
self.current_dc_parameters["status"] = "Failed"
self.current_dc_parameters["comments"] = "%s\n%s" % (
failed_msg,
self._error_msg,
)
self.emit(
"collectOscillationFailed",
(
None,
False,
failed_msg,
self.current_dc_parameters.get("collection_id"),
None,
),
)
self.emit("progressStop", ())
self._collecting = False
self.update_data_collection_in_lims()
self.ready_event.set()
[docs] def collection_stopped(self):
"""Collection stopped method"""
self.current_dc_parameters["status"] = "Stopped"
self.current_dc_parameters["comments"] = "Stopped by the user"
self.emit("progressStop", ())
self._collecting = False
self.update_data_collection_in_lims()
self.ready_event.set()
if self.data_collect_task is not None:
self.data_collect_task.kill(block=False)
[docs] def collection_finished(self):
"""Collection finished behaviour"""
success_msg = "Data collection successful"
self.current_dc_parameters["status"] = success_msg
if self.current_dc_parameters["experiment_type"] != "Collect - Multiwedge":
self.update_data_collection_in_lims()
last_frame = self.current_dc_parameters["oscillation_sequence"][0][
"number_of_images"
]
if (
last_frame > 1
and self.current_dc_parameters["experiment_type"] != "Mesh"
):
# We do not store first and last frame for mesh
self.store_image_in_lims_by_frame_num(last_frame)
if (
self.current_dc_parameters["experiment_type"] in ("OSC", "Helical")
and self.current_dc_parameters["oscillation_sequence"][0]["offset"] == 0
and last_frame > 19
):
self.trigger_auto_processing("after", 0)
self.emit(
"collectOscillationFinished",
(
None,
True,
success_msg,
self.current_dc_parameters.get("collection_id"),
None,
self.current_dc_parameters,
),
)
self.emit("progressStop", ())
self._collecting = False
self.ready_event.set()
def store_image_in_lims_by_frame_num(self, frame_number):
pass
[docs] def stop_collect(self):
"""Stop data collection."""
if self.data_collect_task is not None:
self.data_collect_task.kill(block=False)
[docs] def open_detector_cover(self):
"""Open detector cover."""
[docs] def open_safety_shutter(self):
"""Open safety shutter."""
[docs] def open_fast_shutter(self):
"""Open fast shutter."""
[docs] def close_fast_shutter(self):
"""Close fast shutter."""
[docs] def close_safety_shutter(self):
"""Close safety shutter."""
[docs] def close_detector_cover(self):
"""Close detector cover."""
[docs] def set_transmission(self, value: float):
"""Set transmission.
Args:
value: Transmission value to set.
"""
[docs] def set_wavelength(self, value: float):
"""Set wavelength.
Args:
value: Wavelength value to set.
"""
[docs] def set_energy(self, value: float):
"""Set energy.
Args:
value: Energy value to set.
"""
[docs] def set_resolution(self, value: float):
"""Set resolution.
Args:
value: Resolution value to set.
"""
HWR.beamline.energy.wait_ready()
HWR.beamline.resolution.set_value(value)
def get_total_absorbed_dose(self):
return
[docs] def get_wavelength(self) -> float:
"""Get current wavelength.
Returns:
Current beamline wavelength [Ä].
"""
if HWR.beamline.energy is not None:
return HWR.beamline.energy.get_wavelength()
[docs] def get_detector_distance(self) -> float:
"""Get current detector distance.
Returns:
Current detector distance position [mm].
"""
if HWR.beamline.detector is not None:
return HWR.beamline.detector.distance.get_value()
[docs] def get_resolution(self) -> float:
"""Get current resolution.
Returns:
Current resolution [Ã…].
"""
if HWR.beamline.resolution is not None:
return HWR.beamline.resolution.get_value()
[docs] def get_transmission(self) -> float:
"""Get current beamline transmission.
Returns:
Current transmission.
"""
if HWR.beamline.transmission is not None:
return HWR.beamline.transmission.get_value()
[docs] def get_beam_size(self) -> tuple:
"""Get beam size.
Returns:
Beam width, and beam height.
"""
if HWR.beamline.beam is not None:
return HWR.beamline.beam.get_beam_size()
else:
return None, None
[docs] def get_slit_gaps(self) -> tuple:
"""Get slit gap distance.
Returns:
Horizontal gap, and vertical gap.
"""
if HWR.beamline.beam is not None:
return HWR.beamline.beam.get_slits_gap()
return None, None
[docs] def get_undulators_gaps(self) -> dict:
"""Get undulator gaps.
Returns:
Undulator gaps.
"""
return {}
# return HWR.beamline.energy.get_undulator_gaps()
[docs] def get_machine_current(self) -> float:
"""Get machine current.
Returns:
Machine current [mA].
"""
if HWR.beamline.machine_info:
return HWR.beamline.machine_info.get_current()
else:
return 0
[docs] def get_machine_message(self) -> str:
"""Get machine message.
Returns:
Current machine message if any.
"""
if HWR.beamline.machine_info:
return HWR.beamline.machine_info.get_message()
else:
return ""
[docs] def get_machine_fill_mode(self) -> str:
"""Get machine fill mode.
Returns:
Machine filling mode.
"""
if HWR.beamline.machine_info:
fill_mode = str(HWR.beamline.machine_info.get_message())
return fill_mode[:20]
else:
return ""
[docs] def get_measured_intensity(self) -> float:
"""Get beam intensity.
Returns:
Beam intensity.
"""
return
[docs] def get_cryo_temperature(self) -> float:
"""Get cryogenic temperature.
Returns:
Current cryo temperature [K].
"""
return
[docs] def create_file_directories(self):
"""Method to create directories for raw and processing files.
Directory names for xds, mosflm and hkl are created.
"""
self.create_directories(
self.current_dc_parameters["fileinfo"]["directory"],
self.current_dc_parameters["fileinfo"]["process_directory"],
self.current_dc_parameters["fileinfo"]["archive_directory"],
)
xds_directory, mosflm_directory, hkl2000_directory = self.prepare_input_files()
if xds_directory:
self.current_dc_parameters["xds_dir"] = xds_directory
[docs] def create_directories(self, *args):
"""Creates folders on disk.
Args:
args (list(str)): List of directories to create.
"""
for directory in args:
try:
os.makedirs(directory)
except os.error as e:
if e.errno != errno.EEXIST:
raise
[docs] def store_data_collection_in_lims(self):
"""Store current data collection information in LIMS database."""
lims = HWR.beamline.lims
if (
lims
and lims.is_connected()
and not self.current_dc_parameters["in_interleave"]
):
try:
self.current_dc_parameters["synchrotronMode"] = (
self.get_machine_fill_mode()
)
(
collection_id,
detector_id,
) = HWR.beamline.lims.store_data_collection(
self.current_dc_parameters, self.bl_config
)
self.current_dc_parameters["collection_id"] = collection_id
self.collection_id = collection_id
if detector_id:
self.current_dc_parameters["detector_id"] = detector_id
except BaseException:
self.log.exception("Could not store data collection in LIMS")
[docs] def update_data_collection_in_lims(self) -> None:
"""Update current data collection information in LIMS database."""
params = self.current_dc_parameters
if HWR.beamline.lims and not params["in_interleave"]:
params["flux"] = HWR.beamline.flux.get_value()
params["flux_end"] = params["flux"]
params["totalAbsorbedDose"] = self.get_total_absorbed_dose()
params["wavelength"] = HWR.beamline.energy.get_wavelength()
params["detectorDistance"] = HWR.beamline.detector.distance.get_value()
params["resolution"] = HWR.beamline.resolution.get_value()
params["transmission"] = HWR.beamline.transmission.get_value()
beam_centre_x, beam_centre_y = HWR.beamline.detector.get_beam_position()
pixel_x, pixel_y = HWR.beamline.detector.get_pixel_size()
params["xBeam"] = beam_centre_x * pixel_x
params["yBeam"] = beam_centre_y * pixel_y
und = self.get_undulators_gaps()
i = 1
for jj in self.bl_config.undulators:
key = jj.type
if key in und:
params["undulatorGap%d" % (i)] = und[key]
i += 1
params["resolutionAtCorner"] = HWR.beamline.resolution.get_value_at_corner()
beam_size_x, beam_size_y = HWR.beamline.beam.get_beam_size()
params["beamSizeAtSampleX"] = beam_size_x
params["beamSizeAtSampleY"] = beam_size_y
params["beamShape"] = HWR.beamline.beam.get_beam_shape()
hor_gap, vert_gap = self.get_slit_gaps()
params["slitGapHorizontal"] = hor_gap
params["slitGapVertical"] = vert_gap
try:
HWR.beamline.lims.update_data_collection(params)
except BaseException:
self.log.exception("Could not update data collection in LIMS")
[docs] def store_sample_info_in_lims(self):
"""Store current sample information in LIMS database."""
lims = HWR.beamline.lims
if (
lims
and lims.is_connected()
and not self.current_dc_parameters["in_interleave"]
):
HWR.beamline.lims.update_bl_sample(self.current_lims_sample)
[docs] def store_image_in_lims(
self, frame_number: int, motor_position_id: int | None = None
):
"""Store image information in LIMS database.
Args:
frame_number: Frame number of the image within the data collection.
motor_position_id: Motor position ID.
"""
lims = HWR.beamline.lims
if (
lims
and lims.is_connected()
and not self.current_dc_parameters["in_interleave"]
):
file_location = self.current_dc_parameters["fileinfo"]["directory"]
image_file_template = self.current_dc_parameters["fileinfo"]["template"]
filename = image_file_template % frame_number
lims_image = {
"dataCollectionId": self.current_dc_parameters.get("collection_id"),
"fileName": filename,
"fileLocation": file_location,
"imageNumber": frame_number,
"measuredIntensity": self.get_measured_intensity(),
"synchrotronCurrent": self.get_machine_current(),
"machineMessage": self.get_machine_message(),
"temperature": self.get_cryo_temperature(),
}
archive_directory = self.current_dc_parameters["fileinfo"][
"archive_directory"
]
if archive_directory:
jpeg_filename = "%s.jpeg" % os.path.splitext(image_file_template)[0]
thumb_filename = (
"%s.thumb.jpeg" % os.path.splitext(image_file_template)[0]
)
jpeg_file_template = os.path.join(
archive_directory, jpeg_filename
).replace("cbf.jpeg", "jpeg")
jpeg_thumbnail_file_template = os.path.join(
archive_directory, thumb_filename
).replace("cbf.thumb", "thumb")
jpeg_full_path = jpeg_file_template % frame_number
jpeg_thumbnail_full_path = jpeg_thumbnail_file_template % frame_number
lims_image["jpegFileFullPath"] = jpeg_full_path
lims_image["jpegThumbnailFileFullPath"] = jpeg_thumbnail_full_path
if motor_position_id:
lims_image["motorPositionId"] = motor_position_id
image_id = HWR.beamline.lims.store_image(lims_image)
return image_id
[docs] def update_lims_with_workflow(
self, workflow_id: int, grid_snapshot_filename: str
) -> None:
"""Update collection with information about workflow.
Args:
workflow_id: Workflow ID.
grid_snapshot_filename: Grid snapshot file path.
"""
lims = HWR.beamline.lims
if lims and lims.is_connected():
try:
self.current_dc_parameters["workflow_id"] = workflow_id
if grid_snapshot_filename:
self.current_dc_parameters["xtalSnapshotFullPath3"] = (
grid_snapshot_filename
)
HWR.beamline.lims.update_data_collection(self.current_dc_parameters)
except BaseException:
self.log.exception("Could not store data collection into ISPyB")
[docs] def get_sample_info(self) -> None:
"""Get current sample information in LIMS database.
Information is stored in the internal "current_dc_parameters" dictionary.
"""
sample_info = self.current_dc_parameters.get("sample_reference")
try:
sample_id = int(sample_info["blSampleId"])
except BaseException:
sample_id = None
self.current_dc_parameters["blSampleId"] = sample_id
if HWR.beamline.diffractometer.in_plate_mode:
# TODO store plate location in lims
pass
elif HWR.beamline.sample_changer:
try:
self.current_dc_parameters["actualSampleBarcode"] = (
HWR.beamline.sample_changer.getLoadedSample().getID()
)
self.current_dc_parameters["actualContainerBarcode"] = (
HWR.beamline.sample_changer.getLoadedSample().getContainer().getID()
)
logging.getLogger("user_level_log").info("Getting loaded sample coords")
basket, vial = HWR.beamline.sample_changer.getLoadedSample().getCoords()
self.current_dc_parameters["actualSampleSlotInContainer"] = vial
self.current_dc_parameters["actualContainerSlotInSC"] = basket
except BaseException:
self.current_dc_parameters["actualSampleBarcode"] = None
self.current_dc_parameters["actualContainerBarcode"] = None
else:
self.current_dc_parameters["actualSampleBarcode"] = None
self.current_dc_parameters["actualContainerBarcode"] = None
[docs] def move_to_centered_position(self) -> None:
"""Move motors to the centered position.
Move motors to the centered position
as stored in "current_dc_parameters" dictionary.
"""
positions_str = ""
for motor, position in self.current_dc_parameters["motors"].items():
if position:
if isinstance(motor, str):
positions_str += " %s=%f" % (motor, position)
else:
positions_str += " %s=%f" % (motor.getMotorMnemonic(), position)
self.current_dc_parameters["actualCentringPosition"] = positions_str
self.move_motors(self.current_dc_parameters["motors"])
@abc.abstractmethod
@task
def move_motors(self, motor_position_dict) -> None:
return
[docs] def take_crystal_snapshots(self) -> None:
"""Take crystal snapshots of the currently loaded sample."""
number_of_snapshots = self.current_dc_parameters["take_snapshots"]
if self.current_dc_parameters["experiment_type"] == "Mesh":
number_of_snapshots = 1
snapshot_directory = self.current_dc_parameters["fileinfo"]["archive_directory"]
if number_of_snapshots > 0 or self.current_dc_parameters.get("take_video"):
if not os.path.exists(snapshot_directory):
try:
self.create_directories(snapshot_directory)
except BaseException:
self.log.exception("Collection: Error creating snapshot directory")
if number_of_snapshots > 0 and not self.current_dc_parameters["in_interleave"]:
logging.getLogger("user_level_log").info(
"Collection: Taking %d sample snapshot(s)" % number_of_snapshots
)
for snapshot_index in range(number_of_snapshots):
snapshot_filename = os.path.join(
snapshot_directory,
"%s_%s_%s.snapshot.jpeg"
% (
self.current_dc_parameters["fileinfo"]["prefix"],
self.current_dc_parameters["fileinfo"]["run_number"],
(snapshot_index + 1),
),
)
self.current_dc_parameters[
"xtalSnapshotFullPath%i" % (snapshot_index + 1)
] = snapshot_filename
self._take_crystal_snapshot(snapshot_filename)
if number_of_snapshots > 1:
HWR.beamline.diffractometer.omega.set_value_relative(90)
if (
not HWR.beamline.diffractometer.in_plate_mode
and self.current_dc_parameters.get("take_video")
):
# Add checkbox to allow enable/disable creation of gif
logging.getLogger("user_level_log").info("Collection: Saving animated gif")
animation_filename = os.path.join(
snapshot_directory,
"%s_%s_animation.gif"
% (
self.current_dc_parameters["fileinfo"]["prefix"],
self.current_dc_parameters["fileinfo"]["run_number"],
),
)
self.current_dc_parameters["xtalSnapshotFullPath2"] = animation_filename
self._take_crystal_animation(animation_filename, duration_sec=1)
@abc.abstractmethod
@task
def _take_crystal_snapshot(self, snapshot_filename):
"""
Depends on gui version how this method is implemented.
In Qt3 diffractometer has a function,
In Qt4 graphics_manager is making crystal snapshots
"""
pass
def _take_crystal_animation(self, animation_filename, duration_sec=1):
"""Rotates sample by 360 and composes a gif file"""
pass
[docs] @abc.abstractmethod
def data_collection_hook(self):
"""
Descript. :
"""
pass
[docs] @abc.abstractmethod
def trigger_auto_processing(self, process_event, frame_number):
"""
Descript. :
"""
pass
[docs] def set_helical(self, arg):
"""
Descript. :
"""
pass
[docs] def set_helical_pos(self, arg):
"""
Descript. :
"""
pass
[docs] def set_fast_characterisation(self, arg):
"""
Descript. :
"""
pass
[docs] def setCentringStatus(self, status):
"""
Descript. :
"""
pass
def prepare_interleave(self, data_model, param_list):
self.current_dc_parameters = param_list[0]
self.current_dc_parameters["status"] = "Running"
self.current_dc_parameters["collection_start_time"] = time.strftime(
"%Y-%m-%d %H:%M:%S"
)
self.take_crystal_snapshots()
self.store_data_collection_in_lims()
self.current_dc_parameters["status"] = "Data collection successful"
self.update_data_collection_in_lims()
# specifies the next scan will be a mesh scan
def set_mesh(self, mesh_on):
self.mesh = mesh_on
[docs] def set_mesh_scan_parameters(
self,
num_lines: int,
total_nb_frames: int,
mesh_center_param: tuple,
mesh_range_param: tuple,
) -> None:
"""Set the mesh scan parameters."""
return
# self.mesh_num_lines = num_lines
# self.mesh_total_nb_frames = total_nb_frames
# self.mesh_range = mesh_range_param
# self.mesh_center = mesh_center_param
[docs] def adxv_notify(self, image_filename: str, image_num: int = 1):
"""
Notify ADXV of new image
Args:
image_filename: full path to image file
image_num: image number within image file to open (if it contains
multiple images i.e HDF5)
"""
self.log.info("ADXV notify '%s'", image_filename)
adxv_host = self.get_property("adxv_host", "localhost")
adxv_port = int(self.get_property("adxv_port", "8100"))
message = f"load_image {image_filename}\n slab {image_num}\n"
try:
with socket.create_connection((adxv_host, adxv_port)) as sock:
sock.sendall(message.encode("utf-8"))
except socket.timeout:
self.log.warning(
"ADXV: Timeout while connecting/sending for image '%s'", image_filename
)
except OSError:
self.log.exception("ADXV: Failed to load image '%s'", image_filename)