import logging
import os
import time
import gevent
from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.HardwareObjects.abstract.AbstractMultiCollect import (
AbstractMultiCollect,
)
from mxcubecore.TaskUtils import task
[docs]class MultiCollectMockup(AbstractMultiCollect, HardwareObject):
def __init__(self, name):
AbstractMultiCollect.__init__(self)
HardwareObject.__init__(self, name)
self._centring_status = None
self.ready_event = None
self.actual_frame_num = 0
[docs] def execute_command(self, command_name, *args, **kwargs):
return
[docs] def init(self):
self.setControlObjects(
diffractometer=HWR.beamline.diffractometer,
sample_changer=HWR.beamline.sample_changer,
lims=HWR.beamline.lims,
safety_shutter=HWR.beamline.safety_shutter,
machine_current=HWR.beamline.machine_info,
cryo_stream=HWR.beamline.cryo,
energy=HWR.beamline.energy,
resolution=HWR.beamline.resolution,
detector_distance=HWR.beamline.detector.distance,
transmission=HWR.beamline.transmission,
undulators=HWR.beamline.config.undulators,
flux=HWR.beamline.flux,
detector=HWR.beamline.detector,
beam_info=HWR.beamline.beam,
)
self.emit("collectConnected", (True,))
self.emit("collectReady", (True,))
@task
def loop(self, owner, data_collect_parameters_list):
failed_msg = "Data collection failed!"
failed = True
collections_analyse_params = []
self.emit("collectReady", (False,))
self.emit("collectStarted", (owner, 1))
for data_collect_parameters in data_collect_parameters_list:
logging.debug("collect parameters = %r", data_collect_parameters)
failed = False
data_collect_parameters["status"] = "Data collection successful"
(
osc_id,
sample_id,
sample_code,
sample_location,
) = self.update_oscillations_history(data_collect_parameters)
self.emit(
"collectOscillationStarted",
(
owner,
sample_id,
sample_code,
sample_location,
data_collect_parameters,
osc_id,
),
)
for image in range(
data_collect_parameters["oscillation_sequence"][0]["number_of_images"]
):
time.sleep(
data_collect_parameters["oscillation_sequence"][0]["exposure_time"]
)
self.emit("collectImageTaken", image)
data_collect_parameters["status"] = "Running"
data_collect_parameters["status"] = "Data collection successful"
self.emit(
"collectOscillationFinished",
(
owner,
True,
data_collect_parameters["status"],
"12345",
osc_id,
data_collect_parameters,
),
)
self.emit(
"collectEnded",
owner,
not failed,
failed_msg if failed else "Data collection successful",
)
self.log.info("data collection successful in loop")
self.emit("collectReady", (True,))
@task
def data_collection_hook(self, data_collect_parameters):
return
def do_prepare_oscillation(self, start, end, exptime, npass):
self.actual_frame_num = 0
@task
def oscil(self, start, end, exptime, npass):
return
@task
def data_collection_cleanup(self):
return
@task
def close_fast_shutter(self):
return
@task
def open_fast_shutter(self):
return
@task
def move_motors(self, motor_position_dict):
return
@task
def open_safety_shutter(self):
return
def safety_shutter_opened(self):
return
@task
def close_safety_shutter(self):
return
@task
def prepare_intensity_monitors(self):
return
def prepare_acquisition(
self, take_dark, start, osc_range, exptime, npass, number_of_images, comment=""
):
return
def set_detector_filenames(
self, frame_number, start, filename, jpeg_full_path, jpeg_thumbnail_full_path
):
return
def prepare_oscillation(self, start, osc_range, exptime, npass):
return (start, start + osc_range)
def do_oscillation(self, start, end, exptime, shutterless, npass, first_frame):
gevent.sleep(exptime)
def start_acquisition(self, exptime, npass, first_frame):
return
def write_image(self, last_frame):
self.actual_frame_num += 1
return
def last_image_saved(self):
return self.actual_frame_num
def stop_acquisition(self):
return
def reset_detector(self):
return
@task
def write_input_files(self, collection_id):
return
def get_wavelength(self):
return
def get_undulators_gaps(self):
return []
def get_resolution_at_corner(self):
return
def get_beam_size(self):
return None, None
def get_slit_gaps(self):
return None, None
def get_beam_shape(self):
return
def get_machine_current(self):
if self.bl_control.machine_current is not None:
return self.bl_control.machine_current.get_current()
else:
return 0
def get_machine_message(self):
if self.bl_control.machine_current is not None:
return self.bl_control.machine_current.get_message()
else:
return ""
def get_machine_fill_mode(self):
if self.bl_control.machine_current is not None:
return self.bl_control.machine_current.get_fill_mode()
else:
""""""
def get_cryo_temperature(self):
if self.bl_control.cryo_stream is not None:
return self.bl_control.cryo_stream.getTemperature()
def get_current_energy(self):
return
def get_beam_centre(self):
return None, None
def get_beamline_configuration(self, *args):
return self.bl_config._asdict()
def is_connected(self):
return True
[docs] def is_ready(self):
return True
def sample_changer_HO(self):
return self.bl_control.sample_changer
def diffractometer(self):
return self.bl_control.diffractometer
def sanity_check(self, collect_params):
return
def set_brick(self, brick):
return
def directory_prefix(self):
return self.bl_config.directory_prefix
def store_image_in_lims(self, frame, first_frame, last_frame):
return True
def get_oscillation(self, oscillation_id):
return self.oscillations_history[oscillation_id - 1]
def sample_accept_centring(self, accepted, centring_status):
self.sample_centring_done(accepted, centring_status)
def set_centring_status(self, centring_status):
self._centring_status = centring_status
def get_oscillations(self, session_id):
return []
def set_helical(self, helical_on):
return
def set_helical_pos(self, helical_oscil_pos):
return
def get_archive_directory(self, directory):
archive_dir = os.path.join(directory, "archive")
return archive_dir
@task
def generate_image_jpeg(self, filename, jpeg_path, jpeg_thumbnail_path):
pass