import logging
import os
import shutil
from mxcubecore import HardwareRepository as HWR
from mxcubecore.TaskUtils import task
from .ESRFMultiCollect import ESRFMultiCollect
class MD2MultiCollect(ESRFMultiCollect):
    """Multi-collect hardware object for a beamline equipped with an MD2.

    Extends ``ESRFMultiCollect`` and routes motor moves, scans and phase
    changes through ``HWR.beamline.diffractometer``.
    """

    def __init__(self, name):
        """Initialise the base class and clear the fast-characterisation flag."""
        ESRFMultiCollect.__init__(self, name)
        self.fast_characterisation = None

    @task
    def data_collection_hook(self, data_collect_parameters):
        """Run the base-class hook, then apply MD2 specific settings.

        Copies ``data_collect_parameters["shutterless"]`` onto the detector
        and, when the sample changer exposes ``get_crystal_id``, stores its
        result under ``data_collect_parameters["comment"]`` (the dict is
        modified in place).
        """
        ESRFMultiCollect.data_collection_hook(self, data_collect_parameters)
        self._detector.shutterless = data_collect_parameters["shutterless"]

        sample_changer = HWR.beamline.sample_changer
        if hasattr(sample_changer, "get_crystal_id"):
            data_collect_parameters["comment"] = sample_changer.get_crystal_id()

    @task
    def get_beam_size(self):
        """Return the first two items of the beam value as ``(width, height)``."""
        _width, _height, _, _ = HWR.beamline.beam.get_value()
        return _width, _height

    @task
    def get_slit_gaps(self):
        """Return ``(None, None)``; no slit gaps are reported by this class."""
        # NOTE(review): the looked-up object is not used; the call is kept
        # in case the lookup itself has side effects - confirm and remove.
        self.get_object_by_role("controller")
        return (None, None)

    @task
    def get_beam_shape(self):
        """Return the ``name`` of the third item of the beam value."""
        return HWR.beamline.beam.get_value()[2].name

    def get_resolution_at_corner(self):
        """Return the resolution at the detector corner."""
        return HWR.beamline.resolution.get_value_at_corner()

    @staticmethod
    def ready(*motors):
        """Return True when none of the given motors is moving.

        Declared static so that calling it on an instance does not pass the
        instance itself in as the first "motor".
        """
        return not any(m.motorIsMoving() for m in motors)

    @task
    def move_motors(self, motors_to_move_dict):
        """Move the diffractometer motors simultaneously.

        The ``kappa``, ``kappa_phi`` and ``zoom`` entries are ignored. The
        input dictionary is left untouched.
        """
        # We do not want to modify the input dict
        motor_positions_copy = motors_to_move_dict.copy()
        for tag in ("kappa", "kappa_phi", "zoom"):
            motor_positions_copy.pop(tag, None)

        diffr = HWR.beamline.diffractometer
        diffr.set_value_motors(motor_positions_copy, simultaneous=True)

    @task
    def take_crystal_snapshots(self, number_of_snapshots, image_path_list=None):
        """Ask the sample view to take acquisition snapshots.

        Args:
            number_of_snapshots: Not used by this implementation.
            image_path_list: Paths forwarded to ``take_acq_snapshot``;
                an empty list is forwarded when omitted.
        """
        # A fresh list per call avoids sharing a mutable default argument.
        if image_path_list is None:
            image_path_list = []
        HWR.beamline.sample_view.take_acq_snapshot(image_path_list)

    def do_prepare_oscillation(self, *args, **kwargs):
        """Set the lights and start moving the MD2 to the collect phase.

        Positional and keyword arguments are accepted but not used.
        """
        diffr = HWR.beamline.diffractometer
        # send again the command as MD2 software only handles one
        # centered position!!
        # has to be where the motors are and before changing the phase
        # diffr.get_command_object("save_centring_positions")()

        # switch on the front light
        diffr.frontlightswitch.set_value(diffr.frontlightswitch.VALUES.IN)
        # diffr.frontlight.set_value(2)
        diffr.backlightswitch.disable()

        # move to DataCollection phase
        logging.getLogger("user_level_log").info("Moving MD2 to DataCollection")
        # AB next line to speed up the data collection
        # NOTE(review): get_phase_enum is read as an attribute, not called -
        # confirm it is a property on the diffractometer object.
        diffr.set_phase(diffr.get_phase_enum.COLLECT, timeout=0)

    @task
    def data_collection_cleanup(self):
        """Wait for the diffractometer, close the shutter, re-enable backlight."""
        HWR.beamline.diffractometer.wait_status_ready(10)
        self.close_fast_shutter()
        HWR.beamline.diffractometer.backlightswitch.enable()

    @task
    def oscil(self, start, end, exptime, number_of_images, wait=True):
        """Run one scan on the diffractometer.

        The scan type is selected, in this order, from ``self.helical``,
        ``self.mesh`` and ``self.fast_characterisation``; when none is set a
        plain oscillation scan is executed.

        Args:
            start: Scan start value, forwarded to the diffractometer.
            end: Scan end value, forwarded to the diffractometer.
            exptime: Exposure time, forwarded to the diffractometer.
            number_of_images: Number of images (helical and plain scans).
            wait: Not used inside this method body.
        """
        diffr = HWR.beamline.diffractometer
        # make sure the diffractometer is ready to do the scan
        diffr.wait_status_ready(100)

        if self.helical:
            diffr.do_line_scan(start, end, exptime, number_of_images, self.helical_pos)
        elif self.mesh:
            det = HWR.beamline.detector
            # The property keys below are spelled exactly as they are read
            # at runtime; they must match the hardware object configuration.
            latency_time = det.get_property("latecy_time_mesh") or det.get_deadtime()
            sequence_trigger = self.get_property("lima_sequnce_trigger_mode") or False

            if sequence_trigger:
                msg = "Using LIMA sequnce trigger mode for Eiger"
                self.log.info(msg)
                # In sequence trigger mode the number of lines is passed
                # where the total number of frames would otherwise go.
                mesh_total_nb_frames = self.mesh_num_lines
            else:
                mesh_total_nb_frames = self.mesh_total_nb_frames

            diffr.do_mesh_scan(
                start,
                end,
                exptime,
                latency_time,
                self.mesh_num_lines,
                mesh_total_nb_frames,
                self.mesh_center,
                self.mesh_range,
            )
        elif self.fast_characterisation:
            # Fixed characterisation layout used by this class.
            nb_frames = 10
            nb_scan = 4
            angle = 90
            # Both the exposure time and the range are scaled by 10
            # (presumably the number of frames per scan - TODO confirm).
            exptime *= 10
            scan_range = (end - start) * 10
            diffr.do_characterisation_scan(
                start,
                scan_range,
                nb_frames,
                exptime,
                nb_scan,
                angle,
            )
        else:
            diffr.do_oscillation_scan(start, end, exptime, number_of_images)

    @task
    def prepare_acquisition(
        self, take_dark, start, osc_range, exptime, npass, number_of_images, comment=""
    ):
        """Prepare the detector for the next acquisition.

        For a fast characterisation the number of images is multiplied by 40.
        The external-gate argument passed to the detector is truthy when a
        mesh scan or a fast characterisation is selected.
        """
        if self.fast_characterisation:
            # presumably 10 frames x 4 scans, as used in oscil - TODO confirm
            number_of_images *= 40

        ext_gate = self.mesh or self.fast_characterisation
        self._detector.prepare_acquisition(
            take_dark,
            start,
            osc_range,
            exptime,
            npass,
            number_of_images,
            comment,
            ext_gate,
            self.mesh_num_lines,
        )

    def open_fast_shutter(self):
        """Set the fast shutter to its OPEN value."""
        fs = HWR.beamline.fast_shutter
        fs.set_value(fs.VALUES.OPEN)

    def close_fast_shutter(self):
        """Set the fast shutter to its CLOSED value."""
        fs = HWR.beamline.fast_shutter
        fs.set_value(fs.VALUES.CLOSED)

    def set_helical(self, helical_on):
        """Select (truthy) or deselect (falsy) a helical scan for ``oscil``."""
        self.helical = helical_on

    def set_helical_pos(self, helical_oscil_pos):
        """Store the positions passed to the line scan by ``oscil``."""
        self.helical_pos = helical_oscil_pos

    def set_mesh(self, mesh_on):
        """Specify whether the next scan will be a mesh scan."""
        self.mesh = mesh_on

    def set_mesh_scan_parameters(
        self, num_lines, total_nb_frames, mesh_center_param, mesh_range_param
    ):
        """Store the mesh scan parameters used by the next mesh scan.

        Args:
            num_lines: Number of lines of the mesh.
            total_nb_frames: Total number of frames of the mesh.
            mesh_center_param: Mesh centre, forwarded to ``do_mesh_scan``.
            mesh_range_param: Mesh range, forwarded to ``do_mesh_scan``.

        Inverting the scan direction is not supported yet.
        """
        self.mesh_num_lines = num_lines
        self.mesh_total_nb_frames = total_nb_frames
        self.mesh_range = mesh_range_param
        self.mesh_center = mesh_center_param

    def set_fast_characterisation(self, value=False):
        """Select (truthy) or deselect (falsy) the fast characterisation scan."""
        self.fast_characterisation = value

    def get_cryo_temperature(self):
        """Return 0; no cryo temperature is read by this class."""
        return 0

    @task
    def prepare_intensity_monitors(self):
        """Do nothing; there is no intensity monitor to prepare here."""
        return

    def get_beam_centre(self):
        """Return ``[x, y]``: detector beam position times the pixel size."""
        pixel_x, pixel_y = HWR.beamline.detector.get_pixel_size()
        bcx, bcy = HWR.beamline.detector.get_beam_position()
        return [bcx * pixel_x, bcy * pixel_y]

    @task
    def write_input_files(self, datacollection_id):
        """Copy the geometry correction files, then write the input files.

        ``x_geo_corr.cbf.bz2`` and ``y_geo_corr.cbf.bz2`` are copied from the
        ``template_file_directory`` property into the parent of both the XDS
        directory and the raw data input file directory, unless a file of
        that name already exists there. The copy is best effort: any failure
        is logged and the base-class implementation is still called.

        Returns:
            Whatever ``ESRFMultiCollect.write_input_files`` returns.
        """
        # copy *geo_corr.cbf* files to process directory
        try:
            process_dir = os.path.join(self.xds_directory, "..")
            raw_process_dir = os.path.join(self.raw_data_input_file_dir, "..")
            template_dir = self.get_property("template_file_directory")

            for target_dir in (process_dir, raw_process_dir):
                for filename in ("x_geo_corr.cbf.bz2", "y_geo_corr.cbf.bz2"):
                    dest = os.path.join(target_dir, filename)
                    if os.path.exists(dest):
                        continue
                    shutil.copyfile(os.path.join(template_dir, filename), dest)
        except Exception:
            logging.exception("Exception happened while copying geo_corr files")

        return ESRFMultiCollect.write_input_files(self, datacollection_id)