Source code for mxcubecore.model.queue_model_objects

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""
This module contains the objects that, combined, make up the data model.
Any object that inherits from TaskNode can be added to and handled by
the QueueModel.
"""

import ast
import copy
import logging
import os

from mxcubecore.model import queue_model_enumerables

try:
    from ruamel.yaml import YAML

    from mxcubecore.model import crystal_symmetry

    # If you want to write out copies of the file, use typ="rt" instead
    # pure=True uses yaml version 1.2, with fewer gotchas for strange type conversions
    yaml = YAML(typ="safe", pure=True)
    # The following are not needed for load, but define the default style.
    yaml.default_flow_style = False
    yaml.indent(mapping=2, sequence=4, offset=2)
except Exception:
    logging.getLogger("HWR").warning(
        "Cannot import dependencies needed for GPHL workflows - GPhL workflows might not work"
    )

from mxcubecore import HardwareRepository as HWR

__copyright__ = """ Copyright © 2010 - 2020 by MXCuBE Collaboration """
__license__ = "LGPLv3+"


class TaskNode(object):
    """
    Objects that inherit TaskNode can be added to and handled by
    the QueueModel object.

    A node keeps a reference to its parent and a list of children. Each
    node also owns a ``_names`` dictionary that maps a child name to the
    highest number registered for that name.
    """

    def __init__(self, task_data=None):
        """
        :param task_data: Value stored on the node and exposed read-only
                          through the ``task_data`` property.
        """
        self._children = []
        self._name = str()
        self._number = 0
        self._executed = False
        self._running = False
        self._parent = None
        self._names = {}
        self._enabled = True
        self._node_id = None
        self._requires_centring = True
        self._origin = None
        self._task_data = task_data

    @property
    def task_data(self):
        """The ``task_data`` value passed to the constructor."""
        return self._task_data

    def is_enabled(self):
        """
        :returns: True if enabled and False if disabled
        """
        return self._enabled

    def set_enabled(self, state):
        """
        Sets the enabled state, True represents enabled (executable)
        and false disabled (not executable).

        :param state: The state, True or False
        :type state: bool
        """
        self._enabled = state

    def get_children(self):
        """
        :returns: The children of this node.
        :rtype: List of TaskNode objects.
        """
        return self._children

    def get_parent(self):
        """
        :returns: The parent of this node.
        :rtype: TaskNode
        """
        return self._parent

    def set_name(self, name):
        """
        Sets the name.

        With a parent, the name is also registered in the parent's name
        table; without one only the node's own name is updated.

        :param name: The new name.
        :type name: str

        :returns: none
        """
        if self.get_parent():
            self._set_name(str(name))
        else:
            self._name = str(name)

    def set_origin(self, node_id):
        """
        Sets the origin of this node, the node id of the node, if any,
        that somehow generated this node.

        :param node_id: The node id that is the origin of this node

        :returns: none
        """
        self._origin = node_id

    def get_origin(self):
        """
        :returns: The node id that is the origin of this node.
        :rtype: int
        """
        return self._origin

    def set_number(self, number):
        """
        Sets the number of this node. The number can be used
        to give the task a unique number when for instance,
        the name is not unique for this node.

        :param number: number
        :type number: int
        """
        self._number = int(number)
        parent = self.get_parent()

        if parent:
            # Bump the run number for nodes with this name. The name may not
            # be registered yet (set_name() without a parent only stores the
            # name locally), so register it instead of raising KeyError.
            current = parent._names.get(self._name)
            if current is None or current < self._number:
                parent._names[self._name] = self._number

    def _set_name(self, name):
        """Store ``name`` and update the parent's counter for that name."""
        names = self.get_parent()._names

        if name in names:
            if names[name] < self._number:
                names[name] = self._number
            else:
                names[name] += 1
        else:
            names[name] = self._number

        self._name = name

    def get_name(self):
        """:returns: The name and number formatted as "<name> - <number>"."""
        return "%s - %i" % (self._name, self._number)

    def get_next_number_for_name(self, name):
        """
        :returns: One more than the number registered for ``name`` among
                  this node's children, or 1 if nothing (or 0) is registered.
        """
        return (self._names.get(name) or 0) + 1

    def get_full_name(self):
        """
        :returns: The names of this node and all its ancestors,
                  starting with this node and ending with the root.
        :rtype: list of str
        """
        name_list = [self.get_name()]
        parent = self._parent

        while parent:
            name_list.append(parent.get_name())
            parent = parent._parent

        return name_list

    def get_display_name(self):
        return self.get_name()

    def get_acq_parameters(self):
        return None

    def get_path_template(self):
        return None

    def get_files_to_be_written(self):
        return []

    def get_centred_positions(self):
        return []

    def set_centred_positions(self, cp):
        pass

    def is_executed(self):
        return self._executed

    def set_executed(self, executed):
        self._executed = executed

    def is_running(self):
        # IK maybe replace is_executed and is_running with state?
        return self._running

    def set_running(self, running):
        self._running = running

    def requires_centring(self):
        return self._requires_centring

    def set_requires_centring(self, state):
        self._requires_centring = state

    def get_root(self):
        """:returns: The topmost ancestor, or this node if it has no parent."""
        root = self

        while root._parent:
            root = root._parent

        return root

    def copy(self):
        """:returns: A deep copy of this node."""
        return copy.deepcopy(self)

    def __repr__(self):
        return "<%s object at %s>" % (self.__class__.__name__, hex(id(self)))

    def get_sample_node(self):
        """get Sample task node that this entry is executed on"""
        result = self

        while result is not None and not isinstance(result, Sample):
            result = result._parent

        return result

    def set_snapshot(self, snapshot):
        pass
class DelayTask(TaskNode):
    """Dummy task, for mock testing only"""

    def __init__(self, delay=10):
        """
        :param delay: Value stored on the ``delay`` attribute (default 10).
        """
        super().__init__()
        self._name = "Delay"
        self.delay = delay
class RootNode(TaskNode):
    """Node named "root" that also carries a total node counter."""

    def __init__(self):
        super().__init__()
        self._name = "root"
        self._total_node_count = 0
class TaskGroup(TaskNode):
    """Node that groups other tasks and carries group-level settings."""

    def __init__(self):
        super().__init__()
        self.lims_group_id = None
        self.interleave_num_images = None
        self.inverse_beam_num_images = None

    def set_name_from_task(self, task):
        """Name the group "Standard" when ``task`` is a DataCollection.

        Any other kind of task leaves the current name untouched.
        """
        if not isinstance(task, DataCollection):
            return

        self._name = "Standard"
class Sample(TaskNode):
    """Queue node holding the description of one sample.

    Stores identification and location fields, a list of crystals
    (initialised with a single ``Crystal``), default processing
    parameters and an energy scan result holder.
    """

    # (LIMS field name, attribute name) pairs copied from a LIMS sample to
    # both ``crystals[0]`` and ``processing_parameters``.
    _LIMS_CRYSTAL_FIELDS = (
        ("cellA", "cell_a"),
        ("cellAlpha", "cell_alpha"),
        ("cellB", "cell_b"),
        ("cellBeta", "cell_beta"),
        ("cellC", "cell_c"),
        ("cellGamma", "cell_gamma"),
        ("proteinAcronym", "protein_acronym"),
        ("crystalSpaceGroup", "space_group"),
    )

    def __init__(self):
        TaskNode.__init__(self)

        self.code = str()
        self.lims_code = str()
        self.holder_length = 22.0
        self.lims_id = -1
        self.name = str()
        self.lims_sample_location = -1
        self.lims_container_location = -1
        self.free_pin_mode = False
        self.loc_str = str()
        self.diffraction_plan = None

        # A pair <basket_number, sample_number>
        self.location = (None, None)
        self.lims_location = (None, None)
        self.container_code = None

        # Crystal information
        self.crystals = [Crystal()]
        self.processing_parameters = ProcessingParameters()
        self.processing_parameters.num_residues = 200
        self.processing_parameters.process_data = True
        self.processing_parameters.anomalous = False
        self.processing_parameters.pdb_code = None
        self.processing_parameters.pdb_file = str()

        self.energy_scan_result = EnergyScanResult()

    def __str__(self):
        return "<%s object at %s>" % (self.__class__.__name__, hex(id(self)))

    def _print(self):
        print(("sample: %s" % self.loc_str))

    def has_lims_data(self):
        """
        :returns: True if ``lims_id`` converts to an integer greater than -1,
                  False otherwise (including when it cannot be converted).
        :rtype: bool
        """
        try:
            return int(self.lims_id) > -1
        except (TypeError, ValueError):
            # None raises TypeError, a non-numeric string raises ValueError;
            # in both cases there is no usable LIMS id.
            return False

    def get_name(self):
        return self._name

    def get_display_name(self):
        """
        :returns: The node name if it is set; otherwise the session default
                  prefix, followed by the LIMS code in parentheses if any.
        :rtype: str
        """
        display_name = HWR.beamline.session.get_default_prefix(self)

        if self.lims_code:
            display_name += " (%s)" % self.lims_code

        if self.get_name():
            display_name = self.get_name()

        return display_name

    def init_from_sc_sample(self, sc_sample):
        """Initialise location and name from a sample changer entry.

        :param sc_sample: Indexable entry; items 1 and 2 form the location
                          and item 3 is used as the name unless it is "".
        """
        self.loc_str = str(sc_sample[1]) + ":" + str(sc_sample[2])
        self.location = (sc_sample[1], sc_sample[2])
        self.set_name(self.loc_str)

        if sc_sample[3] != "":
            self.set_name(sc_sample[3])
        else:
            self.set_name(self.loc_str)

    @staticmethod
    def _lims_value(lims_sample, key):
        """Read ``key`` from a LIMS sample given as an object or a mapping.

        Attribute access is used when the attribute exists, otherwise the
        value is looked up with ``lims_sample.get(key)``.
        """
        if hasattr(lims_sample, key):
            return getattr(lims_sample, key)
        return lims_sample.get(key)

    def init_from_lims_object(self, lims_sample):
        """Populate this sample from a LIMS sample record.

        :param lims_sample: Object exposing the LIMS fields as attributes,
                            or a mapping supporting ``.get(name)``.
        """
        crystal = self.crystals[0]

        for lims_key, attr_name in self._LIMS_CRYSTAL_FIELDS:
            value = self._lims_value(lims_sample, lims_key)
            setattr(crystal, attr_name, value)
            setattr(self.processing_parameters, attr_name, value)

        self.lims_code = self._lims_value(lims_sample, "code")
        self.holder_length = self._lims_value(lims_sample, "holderLength")
        self.lims_id = self._lims_value(lims_sample, "sampleId")
        self.name = str(self._lims_value(lims_sample, "sampleName"))

        if hasattr(lims_sample, "containerSampleChangerLocation") and hasattr(
            lims_sample, "sampleLocation"
        ):
            # Attribute style record: only overwrite the defaults when both
            # values are set (truthy).
            if (
                lims_sample.containerSampleChangerLocation
                and lims_sample.sampleLocation
            ):
                self.lims_sample_location = int(lims_sample.sampleLocation)
                self.lims_container_location = int(
                    lims_sample.containerSampleChangerLocation
                )
        else:
            try:
                self.lims_sample_location = int(lims_sample.get("sampleLocation"))
                self.lims_container_location = int(
                    lims_sample.get("containerSampleChangerLocation")
                )
            except Exception:
                # Best effort: keep whatever location values are already set,
                # but leave a trace instead of failing silently.
                logging.getLogger("HWR").debug(
                    "Could not read sample location from LIMS sample",
                    exc_info=True,
                )

        _lims = (self.lims_container_location, self.lims_sample_location)
        self.lims_location = _lims
        self.location = _lims
        self.loc_str = str(self.lims_location[0]) + ":" + str(self.lims_location[1])

        self.container_code = str(self._lims_value(lims_sample, "containerCode"))
        self.diffraction_plan = self._lims_value(lims_sample, "diffractionPlan")

        self.set_name(HWR.beamline.session.get_default_prefix(self))

    def set_from_dict(self, p):
        """Set sample and first-crystal fields from the mapping ``p``.

        Missing keys fall back to the defaults given in each ``get`` call.
        """
        self.code = p.get("code", "")
        self.lims_code = p.get("limsCode", "")
        self.holder_length = p.get("holderLength", 22.0)
        self.lims_id = p.get("limsID", -1)
        self.lims_sample_location = p.get("sampleLocation", -1)
        self.lims_container_location = p.get("containerSampleChangerLocation", -1)
        self.free_pin_mode = p.get("freePinMode", False)
        self.loc_str = p.get("locStr", "")
        self.diffraction_plan = p.get("diffractionPlan")
        self.name = p.get("sampleName", self.name)

        crystal = self.crystals[0]
        crystal.space_group = p.get("spaceGroup") or p.get("crystalSpaceGroup", "")
        crystal.cell_a = p.get("cellA", "")
        crystal.cell_alpha = p.get("cellAlpha", "")
        crystal.cell_b = p.get("cellB", "")
        crystal.cell_beta = p.get("cellBeta", "")
        crystal.cell_c = p.get("cellC", "")
        crystal.cell_gamma = p.get("cellGamma", "")
        crystal.protein_acronym = p.get("proteinAcronym", "")
        crystal.crystal_uuid = p.get("crystalUUID", "")

    def get_processing_parameters(self):
        """
        :returns: A new ProcessingParameters object carrying the space group,
                  cell and protein acronym of the first crystal.
        """
        crystal = self.crystals[0]
        processing_params = ProcessingParameters()

        processing_params.space_group = crystal.space_group
        processing_params.cell_a = crystal.cell_a
        processing_params.cell_alpha = crystal.cell_alpha
        processing_params.cell_b = crystal.cell_b
        processing_params.cell_beta = crystal.cell_beta
        processing_params.cell_c = crystal.cell_c
        processing_params.cell_gamma = crystal.cell_gamma
        processing_params.protein_acronym = crystal.protein_acronym

        return processing_params
class Basket(TaskNode):
    """
    Class represents a basket in the tree. It has not task assigned.
    It represents a parent for samples with the same basket id.
    """

    def __init__(self):
        TaskNode.__init__(self)
        self.name = str()
        self.location = None
        self.free_pin_mode = False
        self.sample_list = []
        # Set by init_from_sc_basket(); defined here so that
        # get_is_present() is safe to call before initialisation.
        self._basket_object = None

    @property
    def is_present(self):
        return self.get_is_present()

    def init_from_sc_basket(self, sc_basket, name="Puck"):
        """Initialise location and name from a sample changer basket entry.

        :param sc_basket: Indexable entry; item 0 is the location used when
                          there is no basket object (or for rows), item 1 is
                          the basket object or None.
        :param name: Label used in the generated name; "Row" selects
                     letter-based naming.
        """
        self._basket_object = sc_basket[1]

        if name == "Row" or self._basket_object is None:
            self.location = sc_basket[0]

            if name == "Row":
                self.name = "%s %s" % (name, chr(65 + self.location))
            else:
                self.name = "%s %d" % (name, self.location)
        else:
            self.location = self._basket_object.get_coords()

            if len(self.location) == 2:
                # tuple() so that a list of coordinates formats as well;
                # "%" only unpacks tuples.
                self.name = "Cell %d, puck %d" % tuple(self.location)
            elif len(self.location) == 1:
                self.name = "%s %s" % (name, self.location[0])
            else:
                self.name = "%s %s" % (name, self.location)

    def get_name(self):
        return self.name

    def get_location(self):
        return self.location

    def get_is_present(self):
        """
        :returns: The ``present`` attribute of the basket object, or False
                  when no basket object has been set.
        """
        basket_object = getattr(self, "_basket_object", None)

        if basket_object is None:
            return False

        return basket_object.present

    def clear_sample_list(self):
        self.sample_list = []

    def add_sample(self, sample):
        self.sample_list.append(sample)

    def get_sample_list(self):
        return self.sample_list

    def get_display_name(self):
        """
        :returns: The basket name, followed in parentheses by the container
                  code of the first sample that has one.
        :rtype: str
        """
        display_name = self.name

        for sample in self.sample_list:
            if sample.container_code:
                display_name += " (%s)" % sample.container_code
                break

        return display_name
class DataCollection(TaskNode):
    """Queue node describing a data collection.

    Holds a list of acquisitions, a crystal, processing parameters and
    bookkeeping attributes for processing and reporting. Most accessors
    operate on the first acquisition in the list.
    """

    def __init__(
        self,
        acquisition_list=None,
        crystal=None,
        processing_parameters=None,
        name="",
        task_data=None,
    ):
        """
        :param acquisition_list: Acquisitions to use; defaults to a list
                                 with one new Acquisition.
        :param crystal: Crystal to use; defaults to a new Crystal.
        :param processing_parameters: Defaults to new ProcessingParameters.
        :param name: Node name.
        :param task_data: Passed on to TaskNode.
        """
        TaskNode.__init__(self, task_data=task_data)

        if not acquisition_list:
            acquisition_list = [Acquisition()]

        if not crystal:
            crystal = Crystal()

        if not processing_parameters:
            processing_parameters = ProcessingParameters()

        self.acquisitions = acquisition_list
        self.crystal = crystal
        self.processing_parameters = processing_parameters
        self.set_name(name)

        self.previous_acquisition = None
        self.experiment_type = queue_model_enumerables.EXPERIMENT_TYPE.NATIVE
        self.html_report = str()
        self.id = int()
        self.lims_group_id = None
        self.run_offline_processing = True
        self.run_online_processing = False
        self.grid = None
        self.shape = None
        self.online_processing_results = {"raw": None, "aligned": None}
        self.processing_msg_list = []
        self.workflow_id = None
        self.center_before_collect = False
        self.ispyb_group_data_collections = False

        # The 'workflow_parameters' attribute is used for passing parameters
        # from the automation workflows (BES) to ispyb-DRAC (see ICATLIMS.py)
        self.workflow_parameters = {}

    @staticmethod
    def set_processing_methods(processing_methods):
        DataCollection.processing_methods = processing_methods

    def as_dict(self):
        """
        :returns: The acquisition parameters of the first acquisition as a
                  dict, extended with path, naming and snapshot entries.
        :rtype: dict
        """
        acq = self.acquisitions[0]
        path_template = acq.path_template
        parameters = acq.acquisition_parameters
        centred_position = parameters.centred_position

        # copy() treats the centred position as possibly unset, so do not
        # dereference it unconditionally here.
        if centred_position is not None:
            snapshot = centred_position.snapshot_image
        else:
            snapshot = None

        return {
            **parameters.as_dict(),
            # The centred position is stored as an object reference, so it is
            # added here rather than by the parameters' own as_dict().
            "centred_position": centred_position,
            "prefix": path_template.get_prefix(),
            "run_number": path_template.run_number,
            "path": path_template.directory,
            "sample": str(self.crystal),
            "acquisitions": str(self.acquisitions),
            "acq_parameters": str(parameters),
            "snapshot": snapshot,
        }

    def set_experiment_type(self, exp_type):
        """Set the experiment type; MESH also disables the centring requirement."""
        self.experiment_type = exp_type

        if self.experiment_type == queue_model_enumerables.EXPERIMENT_TYPE.MESH:
            self.set_requires_centring(False)

    def is_fast_characterisation(self):
        return self.experiment_type == queue_model_enumerables.EXPERIMENT_TYPE.EDNA_REF

    def is_helical(self):
        return self.experiment_type == queue_model_enumerables.EXPERIMENT_TYPE.HELICAL

    def is_mesh(self):
        return self.experiment_type == queue_model_enumerables.EXPERIMENT_TYPE.MESH

    def is_still(self):
        return self.experiment_type == queue_model_enumerables.EXPERIMENT_TYPE.STILL

    def get_name(self):
        """:returns: "<prefix>_<run number>" of the first acquisition."""
        path_template = self.acquisitions[0].path_template
        return "%s_%i" % (path_template.get_prefix(), path_template.run_number)

    def is_collected(self):
        return self.is_executed()

    def set_collected(self, collected):
        return self.set_executed(collected)

    def set_comments(self, comments):
        self.acquisitions[0].acquisition_parameters.comments = comments

    def get_acq_parameters(self):
        return self.acquisitions[0].acquisition_parameters

    def get_path_template(self):
        return self.acquisitions[0].path_template

    def get_files_to_be_written(self):
        path_template = self.acquisitions[0].path_template
        return path_template.get_files_to_be_written()

    def get_centred_positions(self):
        """:returns: The centred position of every acquisition, in order."""
        return [acq.acquisition_parameters.centred_position for acq in self.acquisitions]

    def set_centred_positions(self, cp):
        self.acquisitions[0].acquisition_parameters.centred_position = cp

    def __str__(self):
        return "<%s object at %s>" % (self.__class__.__name__, hex(id(self)))

    def copy(self):
        """
        :returns: A deep copy of this node whose first centred position has
                  its snapshot image reset to None when the original has one.
        """
        new_node = copy.deepcopy(self)
        cpos = self.acquisitions[0].acquisition_parameters.centred_position

        if cpos and cpos.snapshot_image:
            acq_parameters = new_node.acquisitions[0].acquisition_parameters
            acq_parameters.centred_position.snapshot_image = None

        return new_node

    def get_point_index(self):
        """
        Descript. : Returns point index associated to the data collection
        Args.     :
        Return    : index (integer)
        """
        cp = self.get_centred_positions()
        return cp[0].get_index()

    def get_helical_point_index(self):
        """
        Descript. : Return indexes of points associated to the helical line
        Args.     :
        Return    : index (integer), index (integer)
        """
        cp = self.get_centred_positions()
        return cp[0].get_index(), cp[1].get_index()

    def set_grid_id(self, grid_id):
        """
        Descript. : Sets grid id associated to the data collection
        Args.     : grid_id (integer)
        Return    :
        """
        self.grid_id = grid_id

    def get_display_name(self):
        """
        Descript. : Returns display name depending from collection type
        Args.     :
        Return    : display_name (string)
        """
        if self.is_helical():
            start_index, end_index = self.get_helical_point_index()

            if None not in (start_index, end_index):
                display_name = "%s (Line %d:%d)" % (
                    self.get_name(),
                    start_index,
                    end_index,
                )
            else:
                display_name = self.get_name()
        elif self.is_mesh():
            display_name = "%s (%s)" % (self.get_name(), self.grid.get_display_name())
        else:
            if self.requires_centring():
                index = self.get_point_index()

                if index:
                    index = str(index)
                else:
                    index = "not defined"

                display_name = "%s (Point %s)" % (self.get_name(), index)
            else:
                display_name = self.get_name()

        return display_name

    def set_online_processing_results(self, raw, aligned):
        self.online_processing_results["raw"] = raw
        self.online_processing_results["aligned"] = aligned

    def get_online_processing_results(self):
        return self.online_processing_results

    def set_snapshot(self, snapshot):
        self.acquisitions[
            0
        ].acquisition_parameters.centred_position.snapshot_image = snapshot

    def add_processing_msg(self, time, method, status, msg):
        self.processing_msg_list.append((time, method, status, msg))
class ProcessingParameters(object):
    """Plain container for the parameters used by data processing."""

    def __init__(self):
        # Space group and unit cell
        self.space_group = 0
        self.cell_a = 0
        self.cell_alpha = 0
        self.cell_b = 0
        self.cell_beta = 0
        self.cell_c = 0
        self.cell_gamma = 0

        # Sample / processing options
        self.protein_acronym = ""
        self.num_residues = 200
        self.process_data = True
        self.anomalous = False
        self.pdb_code = None
        self.pdb_file = str()
        self.resolution_cutoff = 2.0

    def get_cell_str(self):
        """
        :returns: The cell as "a,b,c,alpha,beta,gamma".
        :rtype: str
        """
        cell = (
            self.cell_a,
            self.cell_b,
            self.cell_c,
            self.cell_alpha,
            self.cell_beta,
            self.cell_gamma,
        )
        return ",".join(str(value) for value in cell)
class Characterisation(TaskNode):
    """Queue node describing a characterisation.

    Wraps a reference DataCollection and a set of characterisation
    parameters; acquisition-related accessors delegate to the first
    acquisition of the reference collection.
    """

    def __init__(
        self, ref_data_collection=None, characterisation_parameters=None, name=""
    ):
        """
        :param ref_data_collection: Reference collection; defaults to a new
                                    DataCollection.
        :param characterisation_parameters: Defaults to new
                                            CharacterisationParameters.
        :param name: Node name.
        """
        TaskNode.__init__(self)

        if not characterisation_parameters:
            characterisation_parameters = CharacterisationParameters()

        if not ref_data_collection:
            ref_data_collection = DataCollection()

        self.reference_image_collection = ref_data_collection
        self.characterisation_parameters = characterisation_parameters
        self.set_name(name)

        self.html_report = None
        self.run_characterisation = True
        self.characterisation_software = None
        self.wait_result = None
        self.run_diffraction_plan = None
        self.auto_add_diff_plan = True
        self.diffraction_plan = []

    @staticmethod
    def set_char_compression(state):
        Characterisation.diff_plan_compression = state

    def get_name(self):
        return "%s_%i" % (self._name, self._number)

    def set_comments(self, comments):
        self.reference_image_collection.acquisitions[
            0
        ].acquisition_parameters.comments = comments

    def get_acq_parameters(self):
        return self.reference_image_collection.acquisitions[0].acquisition_parameters

    def get_path_template(self):
        return self.reference_image_collection.acquisitions[0].path_template

    def get_files_to_be_written(self):
        path_template = self.reference_image_collection.acquisitions[0].path_template
        return path_template.get_files_to_be_written()

    def get_centred_positions(self):
        return [
            self.reference_image_collection.acquisitions[
                0
            ].acquisition_parameters.centred_position
        ]

    def set_centred_positions(self, cp):
        self.reference_image_collection.acquisitions[
            0
        ].acquisition_parameters.centred_position = cp

    def copy(self):
        """
        :returns: A deep copy of this node whose reference collection is
                  replaced by ``reference_image_collection.copy()``.
        """
        new_node = copy.deepcopy(self)
        new_node.reference_image_collection = self.reference_image_collection.copy()
        return new_node

    def get_point_index(self):
        """
        Descript. : Returns point index associated to the data collection
        Args.     :
        Return    : index (integer)
        """
        cp = self.get_centred_positions()
        return cp[0].get_index()

    def get_display_name(self):
        """
        Descript. : Returns display name of the collection
        Args.     :
        Return    : display_name (string)
        """
        index = self.get_point_index()

        if index:
            index = str(index)
        else:
            index = "not defined"

        return "%s (Point - %s)" % (self.get_name(), index)

    def set_snapshot(self, snapshot):
        """Store ``snapshot`` on the centred position of the reference collection."""
        # The attribute is 'snapshot_image' (as read by DataCollection);
        # the previous spelling 'snaphot_image' stored it where nothing reads.
        self.reference_image_collection.acquisitions[
            0
        ].acquisition_parameters.centred_position.snapshot_image = snapshot
class CharacterisationParameters(object):
    """Container for the parameters of a characterisation."""

    # Attribute names exported by as_dict(), in output order.
    # 'path_template' is not part of the exported dictionary.
    _DICT_FIELDS = (
        "experiment_type",
        "use_aimed_resolution",
        "aimed_resolution",
        "use_aimed_multiplicity",
        "aimed_multiplicity",
        "aimed_i_sigma",
        "aimed_completness",
        "strategy_complexity",
        "strategy_program",
        "induce_burn",
        "use_permitted_rotation",
        "permitted_phi_start",
        "permitted_phi_end",
        "low_res_pass_strat",
        "max_crystal_vdim",
        "min_crystal_vdim",
        "max_crystal_vphi",
        "min_crystal_vphi",
        "space_group",
        "use_min_dose",
        "use_min_time",
        "min_dose",
        "min_time",
        "account_rad_damage",
        "auto_res",
        "opt_sad",
        "sad_res",
        "determine_rad_params",
        "burn_osc_start",
        "burn_osc_interval",
        "rad_suscept",
        "beta",
        "gamma",
    )

    def __init__(self):
        # Setting num_ref_images to EDNA_NUM_REF_IMAGES.NONE
        # will disable characterisation.
        self.path_template = PathTemplate()
        self.experiment_type = 0

        # Optimisation parameters
        self.use_aimed_resolution = bool()
        self.aimed_resolution = float()
        self.use_aimed_multiplicity = bool()
        self.aimed_multiplicity = int()
        self.aimed_i_sigma = float()
        self.aimed_completness = float()
        self.strategy_complexity = int()
        self.strategy_program = "Optimal"
        self.induce_burn = bool()
        self.use_permitted_rotation = bool()
        self.permitted_phi_start = float()
        self.permitted_phi_end = float()
        self.low_res_pass_strat = bool()

        # Crystal
        self.max_crystal_vdim = float()
        self.min_crystal_vdim = float()
        self.max_crystal_vphi = float()
        self.min_crystal_vphi = float()
        self.space_group = ""

        # Characterisation type
        self.use_min_dose = bool()
        self.use_min_time = bool()
        self.min_dose = float()
        self.min_time = float()
        self.account_rad_damage = bool()
        self.auto_res = bool()
        self.opt_sad = bool()
        self.sad_res = float()
        self.determine_rad_params = bool()
        self.burn_osc_start = float()
        self.burn_osc_interval = int()

        # Radiation damage model
        self.rad_suscept = float()
        self.beta = float()
        self.gamma = float()

    def as_dict(self):
        """
        :returns: The parameters as a dictionary keyed by attribute name.
                  Includes ``aimed_resolution`` alongside
                  ``use_aimed_resolution``.
        :rtype: dict
        """
        return {name: getattr(self, name) for name in self._DICT_FIELDS}

    def set_from_dict(self, params_dict):
        """Set every attribute that already exists from ``params_dict``.

        Keys that do not match an existing attribute are ignored.
        """
        for key, value in params_dict.items():
            if hasattr(self, key):
                setattr(self, key, value)

    def __repr__(self):
        return "<%s object at %s>" % (self.__class__.__name__, hex(id(self)))
class EnergyScan(TaskNode):
    """Queue node describing an energy scan."""

    def __init__(self, sample=None, path_template=None, cpos=None):
        """
        :param sample: Sample to scan; defaults to a new Sample.
        :param path_template: Defaults to a new PathTemplate.
        :param cpos: Centred position for the scan, or None.
        """
        TaskNode.__init__(self)
        self.element_symbol = None
        self.edge = None
        self.comments = None
        self.set_requires_centring(True)
        self.centred_position = cpos
        self.shape = None

        if not sample:
            self.sample = Sample()
        else:
            self.sample = sample

        if not path_template:
            self.path_template = PathTemplate()
        else:
            self.path_template = path_template

        self.result = EnergyScanResult()

    def get_run_number(self):
        return self.path_template.run_number

    def get_prefix(self):
        return self.path_template.get_prefix()

    def set_comments(self, comments):
        self.comments = comments

    def get_path_template(self):
        return self.path_template

    def set_scan_result_data(self, data):
        self.result.data = data

    def get_scan_result(self):
        return self.result

    def is_collected(self):
        return self.is_executed()

    def set_collected(self, collected):
        return self.set_executed(collected)

    def get_point_index(self):
        """:returns: Index of the centred position, or None if there is none."""
        if self.centred_position:
            return self.centred_position.get_index()

    def get_display_name(self):
        """
        :returns: Name followed by element, edge and point index
                  ("not defined" when there is no index).
        :rtype: str
        """
        index = self.get_point_index()

        if index:
            index = str(index)
        else:
            index = "not defined"

        return "%s (%s %s, Point - %s)" % (
            self.get_name(),
            self.element_symbol,
            self.edge,
            index,
        )

    def copy(self):
        """
        :returns: A deep copy of this node; a snapshot image on the centred
                  position is copied with its own ``copy()`` method.
        """
        new_node = copy.deepcopy(self)
        cpos = self.centred_position

        if cpos:
            snapshot_image = cpos.snapshot_image

            if snapshot_image:
                new_node.centred_position.snapshot_image = snapshot_image.copy()

        return new_node

    def set_snapshot(self, snapshot):
        """Store ``snapshot`` on the centred position, if there is one."""
        # centred_position defaults to None (see __init__); without a
        # position there is nowhere to attach the snapshot.
        if self.centred_position is not None:
            self.centred_position.snapshot_image = snapshot
class EnergyScanResult(object):
    """Container for the values produced by an energy scan."""

    def __init__(self):
        super().__init__()

        # Energies
        self.inflection = None
        self.peak = None
        self.first_remote = None
        self.second_remote = None

        # Raw data and where it is stored
        self.data_file_path = PathTemplate()
        self.data = []

        # Analysis values
        self.pk = None
        self.fppPeak = None
        self.fpPeak = None
        self.ip = None
        self.fppInfl = None
        self.fpInfl = None
        self.rm = None

        # Graph data
        self.chooch_graph_x = None
        self.chooch_graph_y1 = None
        self.chooch_graph_y2 = None
        self.title = None
class XRFSpectrum(TaskNode):
    """
    Class represents XRF spectrum task
    """

    def __init__(self, sample=None, path_template=None, cpos=None):
        """
        :param sample: Sample to measure; defaults to a new Sample.
        :param path_template: Defaults to a new PathTemplate.
        :param cpos: Centred position for the measurement, or None.
        """
        TaskNode.__init__(self)
        self.count_time = 1
        self.comments = None
        self.set_requires_centring(True)
        self.centred_position = cpos
        self.adjust_transmission = True
        self.shape = None

        if not sample:
            self.sample = Sample()
        else:
            self.sample = sample

        if not path_template:
            self.path_template = PathTemplate()
        else:
            self.path_template = path_template

        self.result = XRFSpectrumResult()

    def get_run_number(self):
        return self.path_template.run_number

    def get_prefix(self):
        return self.path_template.get_prefix()

    def get_path_template(self):
        return self.path_template

    def get_point_index(self):
        """:returns: Index of the centred position, or None if there is none."""
        if self.centred_position:
            return self.centred_position.get_index()

    def get_display_name(self):
        """
        :returns: Name followed by the point index
                  ("not defined" when there is no index).
        :rtype: str
        """
        index = self.get_point_index()

        if index:
            index = str(index)
        else:
            index = "not defined"

        return "%s (Point - %s)" % (self.get_name(), index)

    def set_count_time(self, count_time):
        self.count_time = count_time

    def is_collected(self):
        return self.is_executed()

    def set_collected(self, collected):
        return self.set_executed(collected)

    def set_comments(self, comments):
        self.comments = comments

    def get_spectrum_result(self):
        return self.result

    def copy(self):
        """
        :returns: A deep copy of this node; a snapshot image on the centred
                  position is copied with its own ``copy()`` method.
        """
        new_node = copy.deepcopy(self)
        cpos = self.centred_position

        if cpos:
            snapshot_image = cpos.snapshot_image

            if snapshot_image:
                new_node.centred_position.snapshot_image = snapshot_image.copy()

        return new_node

    def set_snapshot(self, snapshot):
        """Store ``snapshot`` on the centred position, if there is one.

        Overrides the no-op ``TaskNode.set_snapshot``; the method was
        previously only reachable under the misspelled name ``set_snaphot``.
        """
        # centred_position defaults to None (see __init__).
        if self.centred_position is not None:
            self.centred_position.snapshot_image = snapshot

    def set_snaphot(self, snapshot):
        """Misspelled alias of set_snapshot(), kept for existing callers."""
        self.set_snapshot(snapshot)
class XRFSpectrumResult(object):
    """Holder for the MCA data, calibration and configuration of a spectrum.

    All three fields are ``None`` until something assigns them.
    """

    def __init__(self):
        object.__init__(self)
        # Assigned left to right, so attribute order is data, calib, config.
        self.mca_data = self.mca_calib = self.mca_config = None
class XrayCentring(TaskNode):
    """X-ray centring task built from a reference collection and a line scan.

    The line scan is a copy of the reference data collection, switched to the
    HELICAL experiment type and given a second acquisition.
    """

    def __init__(self, ref_data_collection=None, crystal=None):
        """
        :param ref_data_collection: reference data collection; a new
            ``DataCollection`` is created when a falsy value is given
        :param crystal: crystal description; a new ``Crystal`` is created
            when a falsy value is given
        """
        TaskNode.__init__(self)
        self.set_requires_centring(False)

        ref_data_collection = ref_data_collection or DataCollection()
        crystal = crystal or Crystal()

        self.reference_image_collection = ref_data_collection

        # Derive the line collection from the reference collection.
        line = ref_data_collection.copy()
        self.line_collection = line
        line.set_experiment_type(queue_model_enumerables.EXPERIMENT_TYPE.HELICAL)
        line.run_online_processing = "XrayCentring"
        line.grid = None
        line.acquisitions.append(Acquisition())

        # Parameters and file prefix of the first acquisition of the line.
        line.acquisitions[0].acquisition_parameters.num_images = 100
        line.acquisitions[0].acquisition_parameters.num_lines = 1
        line_template = line.acquisitions[0].path_template
        line_template.base_prefix = "line_" + line_template.base_prefix

        self.crystal = crystal
        self.html_report = None

    def get_display_name(self):
        """Return the fixed display name of this task."""
        return "Xray centring"

    def get_path_template(self):
        """Return the path template of the first reference acquisition."""
        return self.reference_image_collection.acquisitions[0].path_template

    def get_files_to_be_written(self):
        """Return the files the first reference acquisition will write."""
        reference_template = self.reference_image_collection.acquisitions[
            0
        ].path_template
        return reference_template.get_files_to_be_written()

    def add_task(self, task):
        """Do nothing; sub-tasks are ignored by this node."""
class XrayCentring2(TaskNode):
    """X-ray centring (2022 version)

    Contains all parameters necessary for X-ray centring
    This object is passed to the QueueEntry and HardwareObject
    Parameters not defined here must be set as defaults, somehow
    (transmission, grid step, ...)
    """

    def __init__(
        self, name=None, motor_positions=None, grid_size=None, workflow_parameters=None
    ):
        """
        :param name: (str) Task name - for queue display. Default to std. name
        :param motor_positions: (dict) Motor positions for centring
            (default to current). A copy is stored.
        :param grid_size: (tuple) grid_size_x, grid_size_y (in mm)
        :param workflow_parameters: (dict) stored as given (not copied);
            an empty dict is used when a falsy value is passed
        """
        TaskNode.__init__(self)
        self._centring_result = None
        self._motor_positions = motor_positions.copy() if motor_positions else {}
        self._grid_size = tuple(grid_size) if grid_size else None
        self._workflow_parameters = workflow_parameters if workflow_parameters else {}

        # NOTE(review): it is not known whether a path template is needed
        # here; if not, remove this and the access to it in
        # init_from_task_data
        self.path_template = PathTemplate()

        if name:
            self.set_name(name)

    def get_name(self):
        """Return the task name."""
        return self._name

    def get_motor_positions(self):
        """Return a copy of the motor positions dictionary."""
        return self._motor_positions.copy()

    def set_motor_positions(self, value):
        """Store a dict copy of ``value`` (empty dict for a falsy value)."""
        self._motor_positions = dict(value) if value else {}

    def get_grid_size(self):
        """Return the grid size tuple, or None if not set."""
        return self._grid_size

    def set_grid_size(self, value):
        """Store ``value`` as a tuple (None for a falsy value)."""
        self._grid_size = tuple(value) if value else None

    def get_centring_result(self):
        """Return the centring result (a CentredPosition or None)."""
        return self._centring_result

    def set_centring_result(self, value):
        """Store the centring result.

        :param value: CentredPosition or None
        :raises TypeError: if value is neither a CentredPosition nor None
        """
        if value is None or isinstance(value, CentredPosition):
            self._centring_result = value
        else:
            # The message used to name SampleCentring.centringResult, a
            # copy-paste leftover that pointed at the wrong class.
            raise TypeError(
                "XrayCentring2.centring_result must be a CentredPosition"
                " or None, was a %s" % value.__class__.__name__
            )

    def get_workflow_parameters(self):
        """Return the workflow parameters dictionary (not a copy)."""
        return self._workflow_parameters

    def init_from_task_data(self, sample_model, params):
        """Set parameters from task input dictionary.

        sample_model is required as this may be called before the object is enqueued
        params is a dictionary with structure determined by mxcubeweb usage

        :param sample_model: sample used to derive the default file prefix
        :param params: (dict) task parameters; recognised keys include
            "prefix", "subdir", "name", "motor_positions" and "grid_size"
        """
        # Set path template
        self.path_template.set_from_dict(params)
        # A missing "prefix" is treated like an empty one, in line with the
        # optional handling of "subdir" below.
        prefix = params.get("prefix")
        if prefix:
            self.path_template.base_prefix = prefix
        else:
            self.path_template.base_prefix = HWR.beamline.session.get_default_prefix(
                sample_model
            )

        self.path_template.num_files = 0

        subdir = params.get("subdir", "")
        self.path_template.directory = os.path.join(
            HWR.beamline.session.get_base_image_directory(), subdir
        )
        self.path_template.process_directory = os.path.join(
            HWR.beamline.session.get_base_process_directory(), subdir
        )

        # Set parameters from params dict
        if "name" in params:
            self.set_name(params["name"])
        if "motor_positions" in params:
            self.set_motor_positions(params["motor_positions"])
        if "grid_size" in params:
            self.set_grid_size(params["grid_size"])
class SampleCentring(TaskNode):
    """Manual 3 click centring

    kappa and kappa_phi settings are applied first, and assume that the
    beamline does have axes with exactly these names

    Other motor_positions are applied afterwards, but in random order.
    motor_positions override kappa and kappa_phi if both are set

    Since setting one motor can change the position of another
    (on ESRF ID30B setting kappa and kappa_phi changes the translation motors)
    the order is important.
    """

    def __init__(self, name=None, kappa=None, kappa_phi=None, motor_positions=None):
        """
        :param name: task name; only applied when truthy
        :param kappa: kappa setting, overridden by motor_positions["kappa"]
        :param kappa_phi: kappa_phi setting, overridden by
            motor_positions["kappa_phi"]
        :param motor_positions: (dict) further motor positions; copied
        """
        TaskNode.__init__(self)
        self._tasks = []
        remaining = motor_positions.copy() if motor_positions else {}
        self._other_motor_positions = remaining
        self._centring_result = None

        if name:
            self.set_name(name)

        # Entries in motor_positions win over the explicit arguments and are
        # removed from the "other" positions.
        self.kappa = remaining.pop("kappa", kappa)
        self.kappa_phi = remaining.pop("kappa_phi", kappa_phi)

    def add_task(self, task_node):
        """Append ``task_node`` to the list of tasks."""
        self._tasks.append(task_node)

    def get_tasks(self):
        """Return the list of tasks (not a copy)."""
        return self._tasks

    def get_name(self):
        """Return the task name."""
        return self._name

    def get_kappa(self):
        """Return the kappa setting."""
        return self.kappa

    def get_kappa_phi(self):
        """Return the kappa_phi setting."""
        return self.kappa_phi

    def get_other_motor_positions(self):
        """Return a copy of the motor positions other than kappa/kappa_phi."""
        return self._other_motor_positions.copy()

    def get_centring_result(self):
        """Return the centring result (a CentredPosition or None)."""
        return self._centring_result

    def set_centring_result(self, value):
        """Store the centring result; reject anything but CentredPosition/None."""
        if value is not None and not isinstance(value, CentredPosition):
            raise TypeError(
                "SampleCentring.centringResult must be a CentredPosition or None,"
                " was a %s" % value.__class__.__name__
            )
        self._centring_result = value
class OpticalCentring(TaskNode):
    """Optical automatic centring with lucid"""

    def __init__(self, user_confirms=False):
        """
        :param user_confirms: when truthy the task is named
            "... (user confirms)" and ``try_count`` is 3; otherwise
            ``try_count`` is 1
        """
        TaskNode.__init__(self)

        # Name and try count depend on the same flag, so set them together.
        if user_confirms:
            self.set_name("Optical automatic centring (user confirms)")
            self.try_count = 3
        else:
            self.set_name("Optical automatic centring")
            self.try_count = 1

    def add_task(self, task_node):
        """Do nothing; sub-tasks are ignored by this node."""

    def get_name(self):
        """Return the task name."""
        return self._name
class Acquisition(object):
    """One acquisition: a path template plus its acquisition parameters."""

    def __init__(self):
        object.__init__(self)
        self.path_template = PathTemplate()
        self.acquisition_parameters = AcquisitionParameters()

    def get_preview_image_paths(self):
        """Returns the full paths, including the filename, to preview/thumbnail
        images stored in the archive directory.

        One path is produced per image number, from ``first_image`` up to
        ``first_image + num_images - 1``.

        Returns:
            list: list of paths
        """
        first = self.acquisition_parameters.first_image
        count = self.acquisition_parameters.num_images

        # NOTE(review): PathTemplate.get_image_file_name currently ignores
        # its ``suffix`` argument, so "thumb.jpeg" has no effect - confirm
        # against the detector hardware object.
        return [
            os.path.join(
                self.path_template.get_archive_directory(),
                self.path_template.get_image_file_name(suffix="thumb.jpeg") % i,
            )
            for i in range(first, count + first)
        ]


class PathTemplate(object):
    """Describes where the files of an acquisition are written and how they
    are named.

    Site-wide settings (base directory, archive location, synchrotron name,
    precision) are stored as class attributes through the static setters.
    """

    @staticmethod
    def set_data_base_path(base_directory):
        """Store the absolute form of ``base_directory`` on the class."""
        # os.path.abspath returns path without trailing slash, if any
        # eg. '/data/' => '/data'.
        PathTemplate.base_directory = os.path.abspath(base_directory)

    @staticmethod
    def set_archive_path(archive_base_directory, archive_folder):
        """Store the archive base directory (absolute) and archive folder."""
        PathTemplate.archive_base_directory = os.path.abspath(archive_base_directory)
        PathTemplate.archive_folder = archive_folder

    @staticmethod
    def set_path_template_style(synchrotron_name, template=None):
        """Store the synchrotron name and optional template on the class."""
        PathTemplate.synchrotron_name = synchrotron_name
        PathTemplate.template = template

    @staticmethod
    def set_precision(precision):
        """Store the default precision on the class."""
        PathTemplate.precision = precision

    @staticmethod
    def interpret_path(path):
        """Split an image path into prefix path, run number and image number.

        The file name (without extension) is expected to end in
        ``_<run number>_<image number>``.

        :param path: (str) path to an image file
        :returns: tuple (prefix_path, run_number, img_number);
            ("", -1, -1) when the file name has fewer than two
            underscore-separated parts
        """
        dirname, fname = os.path.split(path)
        fname = os.path.splitext(fname)[0]
        fname_parts = fname.split("_")

        # A name without "_" yields a single part; unpacking it into two
        # values used to raise ValueError, which the IndexError handler did
        # not catch.
        if len(fname_parts) < 2:
            return "", -1, -1

        try:
            # Get run number and image number from path
            run_number, img_number = map(try_parse_int, fname_parts[-2:])
        except IndexError:
            return "", -1, -1

        # Get the prefix and filename part
        prefix = "_".join(fname_parts[:-2])
        prefix_path = os.path.join(dirname, prefix)
        return prefix_path, run_number, img_number

    def __init__(self):
        object.__init__(self)

        self.directory = str()
        self.process_directory = str()
        self.xds_dir = str()
        self.base_prefix = str()
        self.mad_prefix = str()
        self.reference_image_prefix = str()
        self.wedge_prefix = str()
        self.run_number = int()
        self.suffix = "h5"
        self.start_num = int()
        self.num_files = int()
        self.compression = False

        # Keep a class-level precision (see set_precision) if one was set.
        if not hasattr(self, "precision"):
            self.precision = str()

    def as_dict(self):
        """Return the template settings as a dictionary."""
        return {
            "directory": self.directory,
            "process_directory": self.process_directory,
            "xds_dir": self.xds_dir,
            "base_prefix": self.base_prefix,
            "mad_prefix": self.mad_prefix,
            "reference_image_prefix": self.reference_image_prefix,
            "wedge_prefix": self.wedge_prefix,
            "run_number": self.run_number,
            "suffix": self.suffix,
            "precision": self.precision,
            "start_num": self.start_num,
            "num_files": self.num_files,
            "compression": self.compression,
        }

    def set_from_dict(self, params_dict):
        """Set every existing attribute named by a key of ``params_dict``.

        Keys that do not match an existing attribute are ignored.
        """
        for key, value in params_dict.items():
            if hasattr(self, key):
                setattr(self, key, value)

    def get_prefix(self):
        """Return the file prefix.

        Built as ``[<reference_image_prefix>-]<base_prefix>[-<mad_prefix>]
        [_<wedge_prefix>]``; each optional part is added only when set.
        """
        prefix = self.base_prefix

        if self.mad_prefix:
            prefix = str(self.base_prefix) + "-" + str(self.mad_prefix)

        if self.reference_image_prefix:
            prefix = self.reference_image_prefix + "-" + prefix

        if self.wedge_prefix:
            prefix = prefix + "_" + self.wedge_prefix

        return prefix

    def get_image_file_name(self, suffix=None):
        """Return the image file name as given by the detector object.

        :param suffix: accepted for interface compatibility; currently unused
        """
        return HWR.beamline.detector.get_image_file_name(self)

    def get_image_path(self):
        """Return the image file name joined onto ``directory``."""
        return os.path.join(self.directory, self.get_image_file_name())

    def get_archive_directory(self):
        """
        Returns the archive directory, for longer term storage.
        synchrotron_name is set via static function called from session hwobj

        :rtype: str
        :returns: Archive directory
        """
        logger = logging.getLogger("HWR")
        folders = self.directory.split("/")

        if PathTemplate.synchrotron_name == "MAXLAB":
            archive_directory = self.directory
            archive_directory = archive_directory.replace(
                "/data/data1/visitor", "/data/ispyb"
            )
            archive_directory = archive_directory.replace(
                "/data/data1/inhouse", "/data/ispyb"
            )
            archive_directory = archive_directory.replace("/data/data1", "/data/ispyb")

        elif PathTemplate.synchrotron_name == "EMBL-HH":
            archive_directory = os.path.join(
                PathTemplate.archive_base_directory,
                PathTemplate.archive_folder,
                *folders[4:],
            )

        elif PathTemplate.synchrotron_name == "DESY":
            logger.debug(
                "PathTemplate (DESY) - (to be defined) directory is %s",
                self.directory,
            )
            archive_directory = HWR.beamline.session.get_archive_directory()

        elif PathTemplate.synchrotron_name == "ALBA":
            logger.debug("PathTemplate (ALBA) - directory is %s", self.directory)
            folders = self.directory.split(os.path.sep)
            user_dir = folders[5]
            session_date = folders[6]
            # Slicing past the end gives an empty list, so no guard is needed.
            more = folders[8:]
            archive_directory = os.path.join(
                PathTemplate.archive_base_directory, user_dir, session_date, *more
            )
            logger.debug(
                "PathTemplate (ALBA) - archive_directory is %s", archive_directory
            )

        else:
            # Work on the part of the directory below the data base path.
            directory = self.directory[len(PathTemplate.base_directory) :]
            folders = directory.split("/")

            if "visitor" in folders:
                endstation_name = folders[3]
                folders[1] = PathTemplate.archive_folder
                folders[3] = folders[2]
                folders[2] = endstation_name
            else:
                endstation_name = folders[1]
                folders[1] = PathTemplate.archive_folder
                folders[2] = endstation_name

            archive_directory = os.path.join(
                PathTemplate.archive_base_directory, *folders[1:]
            )

        return archive_directory

    def __eq__(self, path_template):
        """Two templates are equal when prefix and normalised directory match."""
        lh_dir = os.path.normpath(self.directory)
        rh_dir = os.path.normpath(path_template.directory)
        return self.get_prefix() == path_template.get_prefix() and lh_dir == rh_dir

    def intersection(self, rh_pt):
        """Return True if the two templates would write overlapping files.

        :param rh_pt: PathTemplate to compare with
        """
        # Only do the intersection if there is possibility for
        # Collision, that is directories are the same.
        if not ((self == rh_pt) and (self.run_number == rh_pt.run_number)):
            return False

        return self.start_num < (
            rh_pt.start_num + rh_pt.num_files
        ) and rh_pt.start_num < (self.start_num + self.num_files)

    def get_files_to_be_written(self):
        """Return the full paths of all files this template will produce.

        The image file name is used as a ``%`` template for the file number.
        """
        file_name_template = self.get_image_file_name()
        return [
            os.path.join(self.directory, file_name_template % i)
            for i in range(self.start_num, self.start_num + self.num_files)
        ]

    def get_first_and_last_file(self):
        """Return the first and last file as given by the detector object."""
        return HWR.beamline.detector.get_first_and_last_file(self)

    def is_part_of(self, path_template):
        """Return True if the file range of ``path_template`` lies entirely
        within the file range of this template (same prefix, directory and
        run number).
        """
        if not (self == path_template and self.run_number == path_template.run_number):
            return False

        return (
            path_template.start_num >= self.start_num
            and path_template.num_files + path_template.start_num
            <= self.num_files + self.start_num
        )

    def copy(self):
        """Return a deep copy of this template."""
        return copy.deepcopy(self)


class AcquisitionParameters(object):
    """Parameters of a single acquisition."""

    def __init__(self):
        object.__init__(self)

        self.first_image = int()
        self.num_images = int()
        self.osc_start = float()
        self.osc_range = float()
        self.osc_total_range = float()
        self.offset = float()
        self.kappa = float()
        self.kappa_phi = float()
        self.exp_time = float()
        self.num_passes = int()
        self.num_lines = 1
        self.energy = float()
        self.centred_position = CentredPosition()
        self.resolution = float()
        # detector_distance used if resolution is 0 or None
        self.detector_distance = float()
        self.transmission = float()
        self.inverse_beam = False
        self.shutterless = False
        self.take_snapshots = True
        self.take_video = False
        self.take_dark_current = True
        self.skip_existing_images = False
        self.detector_binning_mode = str()
        self.detector_roi_mode = str()
        self.detector_mode_list = self.get_detector_mode_list()
        self.induce_burn = False
        self.mesh_range = ()
        self.cell_counting = "zig-zag"
        self.mesh_center = "top-left"
        self.cell_spacing = (0, 0)
        self.mesh_snapshot = None
        self.comments = ""
        self.in_queue = False
        self.in_interleave = None
        self.sub_wedge_size = 10
        self.disable_processing = False
        self.num_triggers = int()
        self.num_images_per_trigger = int()
        self.hare_num = 1

    def set_from_dict(self, params_dict):
        """Set every existing attribute named by a key of ``params_dict``.

        ``centred_position`` is updated in place through its own
        ``set_from_dict`` instead of being replaced.
        """
        for key, value in params_dict.items():
            if not hasattr(self, key):
                continue
            if key == "centred_position":
                self.centred_position.set_from_dict(value)
            else:
                setattr(self, key, value)

    def as_dict(self):
        """Return the parameters as a dictionary.

        NOTE(review): ``centred_position`` and ``hare_num`` are not part of
        the returned dictionary - confirm this is intended.
        """
        return {
            "first_image": self.first_image,
            "num_images": self.num_images,
            "osc_start": self.osc_start,
            "osc_range": self.osc_range,
            "osc_total_range": self.osc_total_range,
            "offset": self.offset,
            "kappa": self.kappa,
            "kappa_phi": self.kappa_phi,
            "exp_time": self.exp_time,
            "num_passes": self.num_passes,
            "num_lines": self.num_lines,
            "energy": self.energy,
            "resolution": self.resolution,
            "detector_distance": self.detector_distance,
            "transmission": self.transmission,
            "inverse_beam": self.inverse_beam,
            "shutterless": self.shutterless,
            "take_snapshots": self.take_snapshots,
            "take_video": self.take_video,
            "take_dark_current": self.take_dark_current,
            "skip_existing_images": self.skip_existing_images,
            "detector_binning_mode": self.detector_binning_mode,
            "detector_roi_mode": self.detector_roi_mode,
            "detector_mode_list": self.detector_mode_list,
            "induce_burn": self.induce_burn,
            "mesh_range": self.mesh_range,
            "mesh_snapshot": self.mesh_snapshot,
            "comments": self.comments,
            "in_queue": self.in_queue,
            "in_interleave": self.in_interleave,
            "num_triggers": self.num_triggers,
            "num_images_per_trigger": self.num_images_per_trigger,
            "cell_counting": self.cell_counting,
            "mesh_center": self.mesh_center,
            "cell_spacing": self.cell_spacing,
            "sub_wedge_size": self.sub_wedge_size,
            "disable_processing": self.disable_processing,
        }

    def copy(self):
        """Return a deep copy of these parameters."""
        return copy.deepcopy(self)

    def get_detector_mode_list(self):
        """Return the detector "roi_mode_list" property parsed as a literal.

        :returns: the parsed value, or an empty list if an AttributeError
            is raised while reading the property
        """
        try:
            return ast.literal_eval(
                HWR.beamline.detector.get_property("roi_mode_list", "[]")
            )
        except AttributeError:
            return []


class XrayImagingParameters(object):
    """Parameters of an X-ray imaging acquisition (flat field, sample
    offsets and camera settings)."""

    def __init__(self):
        object.__init__(self)

        self.ff_num_images = 30
        self.ff_pre = False
        self.ff_post = False
        self.ff_apply = False
        self.ff_ssim_enabled = False

        self.sample_offset_a = 0.0
        self.sample_offset_b = -1.0
        self.sample_offset_c = 0.0

        self.camera_trigger = True
        self.camera_live_view = False
        self.camera_hw_binning = 0
        self.camera_hw_roi = 0
        self.camera_write_data = True

        self.detector_distance = float()

    def copy(self):
        """Return a deep copy of these parameters."""
        return copy.deepcopy(self)

    def as_dict(self):
        """Return the parameters as a dictionary.

        NOTE(review): ``camera_hw_binning`` and ``camera_hw_roi`` are not
        part of the returned dictionary - confirm this is intended.
        """
        return {
            "ff_num_images": self.ff_num_images,
            "ff_pre": self.ff_pre,
            "ff_post": self.ff_post,
            "ff_apply": self.ff_apply,
            "ff_ssim_enabled": self.ff_ssim_enabled,
            "sample_offset_a": self.sample_offset_a,
            "sample_offset_b": self.sample_offset_b,
            "sample_offset_c": self.sample_offset_c,
            "camera_trigger": self.camera_trigger,
            "camera_live_view": self.camera_live_view,
            "camera_write_data": self.camera_write_data,
            "detector_distance": self.detector_distance,
        }


class Crystal(object):
    """Crystal description: space group, unit cell and identifiers."""

    def __init__(self):
        object.__init__(self)
        self.space_group = 0
        self.cell_a = 0
        self.cell_alpha = 0
        self.cell_b = 0
        self.cell_beta = 0
        self.cell_c = 0
        self.cell_gamma = 0
        self.protein_acronym = ""
        self.crystal_uuid = ""

        # MAD energies
        self.energy_scan_result = EnergyScanResult()

    def set_from_dict(self, params_dict):
        """Set every existing attribute named by a key of ``params_dict``.

        Keys that do not match an existing attribute are ignored.
        """
        for key, value in params_dict.items():
            if hasattr(self, key):
                setattr(self, key, value)
class CentredPosition(object):
    """
    Class that represents a centred position.
    Can also be initialized with a mxcube motor dict
    which simply is a dictionary with the motornames and
    their corresponding values.

    The set of motors that take part in ``as_dict``, ``as_str`` and
    comparison is the class-wide ``DIFFRACTOMETER_MOTOR_NAMES``.
    """

    MOTOR_POS_DELTA = 1e-4
    DIFFRACTOMETER_MOTOR_NAMES = []

    # Equality is tolerance based, so instances are deliberately unhashable.
    # (This is what Python already does implicitly when only __eq__ is
    # defined; stated here to make it explicit.)
    __hash__ = None

    @staticmethod
    def set_diffractometer_motor_names(*names):
        """Set the class-wide motor names (stored as a tuple)."""
        CentredPosition.DIFFRACTOMETER_MOTOR_NAMES = names[:]

    def __init__(self, motor_dict=None):
        """
        :param motor_dict: (dict) optional motor name -> position mapping;
            every entry is set as an attribute
        """
        self.snapshot_image = None
        self.centring_method = True
        self.index = None
        self.motor_pos_delta = CentredPosition.MOTOR_POS_DELTA

        # Every configured motor exists as an attribute, None when unknown.
        for motor_name in CentredPosition.DIFFRACTOMETER_MOTOR_NAMES:
            setattr(self, motor_name, None)

        if motor_dict is not None:
            for motor_name, position in motor_dict.items():
                setattr(self, motor_name, position)

    def as_dict(self):
        """Return the positions of the configured motors as a dictionary."""
        return {
            motor_name: getattr(self, motor_name)
            for motor_name in CentredPosition.DIFFRACTOMETER_MOTOR_NAMES
        }

    def set_from_dict(self, params_dict):
        """Set every existing attribute named by a key of ``params_dict``.

        Keys that do not match an existing attribute are ignored.
        """
        for key, value in params_dict.items():
            if hasattr(self, key):
                setattr(self, key, value)

    def as_str(self):
        """Return ``"<motor>: <abs(position)> "`` for every motor with a value.

        Motors whose position is None are left out. A position of exactly 0
        is a real value and is included (it used to be dropped by a
        truthiness test).
        """
        parts = []
        for motor_name in CentredPosition.DIFFRACTOMETER_MOTOR_NAMES:
            position = getattr(self, motor_name)
            if position is not None:
                parts.append("%s: %.3f " % (motor_name, abs(position)))
        return "".join(parts)

    def __repr__(self):
        return str(self.as_dict())

    def __eq__(self, cpos):
        """Compare all configured motors within ``motor_pos_delta``.

        A motor matches when both positions are None, or when both are set
        and differ by at most ``motor_pos_delta``. A motor that is missing
        on ``cpos``, or set on only one side, makes the positions unequal.
        """
        for motor_name in CentredPosition.DIFFRACTOMETER_MOTOR_NAMES:
            self_pos = getattr(self, motor_name)
            if not hasattr(cpos, motor_name):
                return False
            cpos_pos = getattr(cpos, motor_name)

            if self_pos == cpos_pos is None:
                continue
            if None in (self_pos, cpos_pos):
                return False
            if not abs(self_pos - cpos_pos) <= self.motor_pos_delta:
                return False
        return True

    def __ne__(self, cpos):
        return not (self == cpos)

    def set_index(self, index):
        """Store the index of this position."""
        self.index = index

    def set_motor_pos_delta(self, delta):
        """Set the tolerance used when comparing positions."""
        self.motor_pos_delta = delta

    def get_index(self):
        """Return the index of this position."""
        return self.index

    def get_kappa_value(self):
        """Return the ``kappa`` attribute.

        Raises AttributeError if no ``kappa`` attribute was ever set.
        """
        return self.kappa

    def get_kappa_phi_value(self):
        """Return the ``kappa_phi`` attribute.

        Raises AttributeError if no ``kappa_phi`` attribute was ever set.
        """
        return self.kappa_phi
class Workflow(TaskNode):
    """Queue task for a workflow; does not require centring."""

    def __init__(self):
        TaskNode.__init__(self)
        self.path_template = PathTemplate()
        self._type = ""
        self.set_requires_centring(False)
        self.lims_id = None

    def set_type(self, workflow_type):
        """Store the workflow type."""
        self._type = workflow_type

    def get_type(self):
        """Return the workflow type (empty string until set)."""
        return self._type

    def get_path_template(self):
        """Return the path template of this workflow."""
        return self.path_template
class GphlWorkflow(TaskNode):
    def __init__(self):
        """Set up the workflow attributes and their defaults.

        Defaults are applied in three steps: literal values below, then the
        "defaults" section of the GPhL workflow hardware object settings
        (via set_from_dict), and finally beamline acquisition defaults and
        detector limits for exposure_time and image_width.
        """
        TaskNode.__init__(self)

        workflow_hwobj = HWR.beamline.gphl_workflow

        # Workflow start attributes
        self.path_template = PathTemplate()
        self._name = str()
        self.strategy_settings = {}
        self.shape = str()
        self.initial_strategy = str()
        # From strategy. characterisation or diffractcal
        self.maximum_dose_budget = 20.0
        self.decay_limit = 25
        self.characterisation_budget_fraction = 0.05
        # string. Only active mode currently is 'MASSIF1'
        self.automation_mode = None
        # Automation mode acquisition parameters. Replace UI queried values
        # multiple dictionaries, in acquisition order (characterisation, then main)
        self.auto_acq_parameters = [{}]

        # Pre-strategy attributes
        # Set in def set_pre_strategy_params(
        self.input_space_group = str()
        self.space_group = str()
        self.crystal_classes = ()
        self._cell_parameters = ()
        self.crystal_thickness = 0.0  # Minimum dimension of crystal, in micron
        self.detector_setting = None  # from 'resolution' parameter or defaults
        self.aimed_resolution = None  # from 'resolution' parameter or defaults
        self.wavelengths = ()  # from 'energies' parameters
        self.use_cell_for_processing = False
        self.strategy_variant = None  # from 'strategy' Used for acquisition
        self.strategy_options = {}
        self.relative_rad_sensitivity = 1.0
        self.processing_macro = None

        # Directory containing SPOT.XDS file
        # For cases where characterisation and XDS processing are done
        # before workflow is started
        self.init_spot_dir = None

        # Pre-collection attributes
        # Attributes for workflow
        self.exposure_time = 0.0
        self.image_width = 0.0
        self.wedge_width = 0.0
        self.transmission = 0.0
        self.repetition_count = 1
        self.snapshot_count = 2
        self.reflecting_range_esd = None
        self.recentring_mode = "sweep"
        self.reference_reflection_files = []
        # TEST attribute - if true collection is skipped. Set also if init_spot_dir
        self.skip_collection = False

        # Workflow interleave order (string).
        # Slowest changing first, characters 'g' (Goniostat position);
        # 's' (Scan number), 'b' (Beam wavelength), 'd' (Detector position)
        self.interleave_order = "gs"  # from workflow strategy

        # Internal attributes - set only by program
        self.beamstop_setting = None  # Not currently set or used
        self.goniostat_translations = ()
        self.current_rotation_id = None
        self.characterisation_done = False
        self.characterisation_dose = 0.0
        self.acquisition_dose = 0.0
        self.strategy_length = 0.0

        # Factor to account for transmission not being uniform
        self.dose_correction_factor = 1.0

        # Workflow attributes - for passing to LIMS (conf Olof Svensson)
        self.workflow_parameters = {}

        # Centring handling and MXCuBE-side flow
        self.set_requires_centring(False)

        # Overwrite the literal defaults above with the configured ones;
        # only keys matching an existing attribute are applied.
        self.set_from_dict(workflow_hwobj.config.settings["defaults"])

        # Set missing values from BL defaults and limits.
        # NB cannot be done till after all HO are initialised.
        bl_defaults = HWR.beamline.get_default_acquisition_parameters().as_dict()
        exposure_time = self.exposure_time or bl_defaults.get("exp_time", 0)
        # Never go below the first detector exposure time limit (0 if unset).
        self.exposure_time = max(
            exposure_time, HWR.beamline.detector.get_exposure_time_limits()[0] or 0
        )
        self.image_width = self.image_width or bl_defaults.get("osc_range", 0.1)
[docs] def parameter_summary(self): """Main parameter summary, for output purposes""" summary = {"strategy": self.strategy_name} for tag in ( "automation_mode", "init_spot_dir", "exposure_time", "image_width", "strategy_length", "transmission", "input_space_group", "space_group", "crystal_classes", "_cell_parameters", "crystal_thickness", "use_cell_for_processing", "relative_rad_sensitivity", "aimed_resolution", "repetition_count", "initial_strategy", "strategy_variant", "strategy_options", ): summary[tag] = getattr(self, tag) summary["wavelengths"] = tuple(x.wavelength for x in self.wavelengths) summary["resolution"] = self.detector_setting.resolution summary["orgxy"] = self.detector_setting.orgxy summary["strategy_variant"] = self.strategy_options.get("variant", "not set") summary["orientation_count"] = len(self.goniostat_translations) summary["characterisation_dose"] = self.characterisation_dose summary["dose_per_repetition"] = self.acquisition_dose summary["total_radiation_dose"] = ( summary["dose_per_repetition"] * summary["repetition_count"] + summary["characterisation_dose"] ) summary["total_dose_budget"] = self.recommended_dose_budget() return summary
def set_from_dict(self, params_dict):
    """Copy values from params_dict onto attributes that already exist.

    Keys that do not name an existing attribute of the object are ignored.
    """
    for attr_name, attr_value in params_dict.items():
        if not hasattr(self, attr_name):
            continue
        setattr(self, attr_name, attr_value)
def set_pre_strategy_params(
    self,
    space_group="",
    crystal_classes=(),
    cell_parameters=(),
    resolution=None,
    energies=(),
    strategy="",
    strategy_options=None,
    init_spot_dir=None,
    relative_rad_sensitivity=None,
    use_cell_for_processing=None,
    crystal_thickness=None,
    reference_reflection_files=None,
    processing_macro=None,
    processing_macro_url=None,
    **unused,
):
    """Store the parameters that are set before strategy calculation.

    Most falsy arguments leave the corresponding attribute unchanged;
    init_spot_dir is always overwritten.

    :param space_group: (str) key into crystal_symmetry.SPACEGROUP_MAP; the
        ``name`` of the matching entry is stored as self.space_group
    :param crystal_classes: (tuple(str)) stored as a tuple; if empty, derived
        from the space group when one is available
    :param cell_parameters: (tuple(float)) passed to self.cell_parameters
    :param resolution: (Optional[float]) used, with self.aimed_resolution as
        fallback, to create the detector setting when none exists yet
    :param energies: (tuple(float)) replaces the current wavelength list
    :param strategy: (str) strategy variant name
    :param strategy_options: (dict) merged over the default strategy options
    :param init_spot_dir: (str) always stored, also when None
    :param relative_rad_sensitivity: (float)
    :param use_cell_for_processing: (bool) stored unless None
    :param crystal_thickness: (float)
    :param reference_reflection_files: (list(str)) stored as a new list
    :param processing_macro: (str) stored with a "macro:" prefix
    :param processing_macro_url: (str) used only if processing_macro is falsy
    :param unused: (dict) additional keyword arguments, ignored
    :return: None
    :raises ValueError: if space_group is not in SPACEGROUP_MAP, if the number
        of energies does not fit the energy tags, or if no wavelength is set
    """
    from mxcubecore.HardwareObjects.Gphl import GphlMessages

    if space_group:
        group_info = crystal_symmetry.SPACEGROUP_MAP.get(space_group)
        if group_info is None:
            raise ValueError(
                "Invalid space group %s, not in crystal_symmetry.SPACEGROUP_MAP"
                % space_group
            )
        space_group = group_info.name
        self.space_group = space_group
    else:
        space_group = self.space_group
    if space_group == "None":
        # Temporary fix - this should not happen
        # 20240926 Rasmus Fogh and Olof Svensson
        space_group = None

    if crystal_classes:
        self.crystal_classes = tuple(crystal_classes)
    elif space_group:
        group_entry = crystal_symmetry.SPACEGROUP_MAP[space_group]
        self.crystal_classes = (group_entry.crystal_class,)

    if cell_parameters:
        self.cell_parameters = cell_parameters
    if crystal_thickness:
        self.crystal_thickness = crystal_thickness

    configured_order = self.strategy_settings.get("interleave_order")
    if configured_order:
        self.interleave_order = configured_order

    # NB this is an internal dictionary. DO NOT MODIFY
    settings = HWR.beamline.gphl_workflow.config.settings

    if energies:
        # Energies *reset* existing list, and there must be at least one
        if self.characterisation_done:
            energy_tags = self.strategy_settings.get(
                "beam_energy_tags", (settings["default_beam_energy_tag"],)
            )
        elif self.wftype == "diffractcal":
            energy_tags = ("Main",)
        else:
            energy_tags = ("Characterisation",)
        if len(energies) not in (len(energy_tags), 1):
            raise ValueError(
                "Number of energies %s do not match available slots %s"
                % (energies, energy_tags)
            )
        new_wavelengths = []
        for index, energy in enumerate(energies):
            role = energy_tags[index]
            new_wavelengths.append(
                GphlMessages.PhasingWavelength(
                    wavelength=HWR.beamline.energy.calculate_wavelength(energy),
                    role=role,
                )
            )
        self.wavelengths = tuple(new_wavelengths)
    if not self.wavelengths:
        raise ValueError("Value for energy missing. Coding error?")

    first_wavelength = self.wavelengths[0].wavelength
    if self.detector_setting is None:
        resolution = resolution or self.aimed_resolution
        if resolution:
            distance = HWR.beamline.resolution.resolution_to_distance(
                resolution, first_wavelength
            )
            distance_limits = HWR.beamline.detector.distance.get_limits()
            if None in distance_limits:
                # Fallback limits used when the hardware reports none
                distance_limits = (150, 500)
            # Clamp the distance to the limits and recompute the resolution
            if distance < distance_limits[0]:
                distance = distance_limits[0]
                resolution = HWR.beamline.resolution.distance_to_resolution(
                    distance
                )
            elif distance > distance_limits[1]:
                distance = distance_limits[1]
                resolution = HWR.beamline.resolution.distance_to_resolution(
                    distance
                )
            orgxy = HWR.beamline.detector.get_beam_position(
                distance, first_wavelength
            )
            self.detector_setting = GphlMessages.BcsDetectorSetting(
                resolution, orgxy=orgxy, Distance=distance
            )

    maximum_chi = settings["maximum_chi"]
    chi_from_limits = HWR.beamline.gphl_workflow.derive_maximum_chi()
    if chi_from_limits:
        maximum_chi = min(maximum_chi, chi_from_limits)

    # NOTE(review): "clip_kappa" is filled from the "angular_tolerance"
    # setting - confirm this is intended and not a copy/paste slip
    self.strategy_options = {
        "strategy_type": self.strategy_type,
        "angular_tolerance": settings["angular_tolerance"],
        "clip_kappa": settings["angular_tolerance"],
        "maximum_chi": maximum_chi,
    }
    for tag in ("allow_duplicate_orientations", "delphi_block", "stratcal_step"):
        if tag in settings:
            self.strategy_options[tag] = settings[tag]
    if strategy_options:
        self.strategy_options.update(strategy_options)

    strategy_variant = (
        strategy
        or self.strategy_options.get("variant")
        or self.strategy_settings["variants"][0]
    )
    if self.characterisation_done:
        self.strategy_options["variant"] = strategy_variant
        self.strategy_variant = strategy_variant
    elif self.wftype == "diffractcal":
        self.strategy_options["variant"] = strategy_variant
        self.strategy_variant = strategy_variant
        self.initial_strategy = strategy_variant
    elif self.wftype != "transcal":
        # This must be characterisation - here we do not accept defaults
        self.initial_strategy = (
            strategy or settings["characterisation_strategies"][0]
        )

    # NB init_spot_dir must be re-set every time, hence no if test
    self.init_spot_dir = init_spot_dir

    if relative_rad_sensitivity:
        self.relative_rad_sensitivity = relative_rad_sensitivity
    if use_cell_for_processing is not None:
        self.use_cell_for_processing = use_cell_for_processing
    if reference_reflection_files:
        self.reference_reflection_files = list(reference_reflection_files)

    # Processing macro
    if processing_macro:
        self.processing_macro = "macro:" + processing_macro
    elif processing_macro_url:
        self.processing_macro = processing_macro_url
def set_pre_acquisition_params(
    self,
    exposure_time=None,
    image_width=None,
    wedge_width=None,
    transmission=None,
    repetition_count=None,
    snapshot_count=None,
    recentring_mode=None,
    energies=(),
    skip_collection=False,
    **unused,
):
    """Store the parameters that are set before acquisition starts.

    Falsy arguments leave the corresponding attribute unchanged.

    Args:
        exposure_time: stored as float
        image_width: stored as float
        wedge_width: stored as float
        transmission: stored as float
        repetition_count: stored as int
        snapshot_count: stored as int
        recentring_mode: stored unchanged
        energies: energies whose wavelengths are appended to the existing
            wavelength list, provided they exactly fill the free energy tags
        skip_collection: if truthy, self.skip_collection is set to True
        **unused: additional keyword arguments, ignored

    Raises:
        ValueError: if the resulting number of wavelengths does not match
            the number of energy tags
    """
    from mxcubecore.HardwareObjects.Gphl import GphlMessages

    # NB this is an internal dictionary. DO NOT MODIFY
    settings = HWR.beamline.gphl_workflow.config.settings

    converted_settings = (
        ("exposure_time", exposure_time, float),
        ("image_width", image_width, float),
        ("wedge_width", wedge_width, float),
        ("transmission", transmission, float),
        ("repetition_count", repetition_count, int),
        ("snapshot_count", snapshot_count, int),
    )
    for attr_name, raw_value, convert in converted_settings:
        if raw_value:
            setattr(self, attr_name, convert(raw_value))
    if recentring_mode:
        self.recentring_mode = recentring_mode

    energy_tags = (settings["default_beam_energy_tag"],)
    if self.characterisation_done:
        energy_tags = self.strategy_settings.get("beam_energy_tags", energy_tags)

    if energies:
        # Energies are *added* to existing list
        all_wavelengths = list(self.wavelengths)
        used_slots = len(all_wavelengths)
        if len(energies) == len(energy_tags) - used_slots:
            for index, energy in enumerate(energies):
                role = energy_tags[index + used_slots]
                all_wavelengths.append(
                    GphlMessages.PhasingWavelength(
                        wavelength=HWR.beamline.energy.calculate_wavelength(energy),
                        role=role,
                    )
                )
            self.wavelengths = tuple(all_wavelengths)

    if len(self.wavelengths) != len(energy_tags):
        raise ValueError(
            "Number of energies: %s do not match slots %s"
            % (len(self.wavelengths), energy_tags)
        )

    if skip_collection:
        self.skip_collection = True
def init_from_task_data(self, sample_model, params):
    """Initialise the workflow from queue task data.

    sample_model is required as this may be called before the object is
    enqueued. params is a dictionary with structure determined by mxcubeweb
    usage.

    NB params is modified in place: "auto_acq_parameters" is popped,
    "use_cell_for_processing" is popped when a space group is set, and in
    TEST_FROM_FILE automation mode it is updated from the "task" entry of
    the YAML file named by the GPHL_TEST_INPUT environment variable.

    :param sample_model: sample node; its first crystal and its diffraction
        plan are read
    :param params: (dict) task parameters; "strategy_name" is mandatory
    :raises ValueError: if the strategy name is unknown, or if init_spot_dir
        is given outside automation mode or without exposure_time and
        image_width
    """
    from mxcubecore.HardwareObjects.Gphl import GphlMessages

    if self.automation_mode == "TEST_FROM_FILE":
        fname = os.getenv("GPHL_TEST_INPUT")
        # os.getenv returns None when the variable is unset and
        # os.path.isfile(None) raises TypeError, so check for a value first
        if fname and os.path.isfile(fname):
            with open(fname, "r", encoding="utf-8") as fp0:
                task = yaml.load(fp0)["task"]
            print(task)
            params.update(task["parameters"])
        else:
            print("WARNING no GPHL_TEST_INPUT found. test using default values")

    # Set attributes directly from params
    self.strategy_settings = HWR.beamline.gphl_workflow.workflow_strategies.get(
        params["strategy_name"]
    )
    if not self.strategy_settings:
        raise ValueError(
            "No GPhL workflow strategy named %s found" % params["strategy_name"]
        )
    self.shape = params.get("shape", "")
    for tag in (
        "decay_limit",
        "maximum_dose_budget",
        "characterisation_budget_fraction",
        "crystal_thickness",
    ):
        value = params.get(tag)
        if value:
            setattr(self, tag, value)

    # For external workflow parameters (conf. Olof Svensson)
    workflow_parameters = params.get("workflow_parameters")
    if workflow_parameters:
        self.workflow_parameters.update(workflow_parameters)

    # NB settings is an internal attribute DO NOT MODIFY
    settings = HWR.beamline.gphl_workflow.config.settings

    # Auto acquisition parameters: a two-element list, holding independent
    # copies of the first and last configured dictionaries, overridden by
    # the first and last dictionaries passed in params
    acq_param_settings = settings.get("auto_acq_parameters") or [{}]
    auto_acq_parameters = [
        copy.deepcopy(acq_param_settings[0]),
        copy.deepcopy(acq_param_settings[-1]),
    ]
    self.auto_acq_parameters = auto_acq_parameters
    # Treat a missing, None or empty value alike, so the indexing below is safe
    new_acq_params = params.pop("auto_acq_parameters", None) or [{}]
    auto_acq_parameters[0].update(new_acq_params[0])
    auto_acq_parameters[-1].update(new_acq_params[-1])

    if "automation_mode" in params:
        self.automation_mode = params["automation_mode"]

    # Set automation switches and basic acquisition parameters
    if new_acq_params[0].get("init_spot_dir"):
        # Characterisation is pre-acquired
        if not self.automation_mode:
            raise ValueError("init_spot_dir setting only valid in automation mode")
        if (
            "exposure_time" not in new_acq_params[0]
            or "image_width" not in new_acq_params[0]
        ):
            raise ValueError(
                "Parameters 'exposure_time', and 'image_width' are mandatory "
                "when 'init_spot_dir' is set"
            )

    self.transmission = HWR.beamline.transmission.get_value()

    # Path template and prefixes
    base_prefix = params.get("prefix") or HWR.beamline.session.get_default_prefix(
        sample_model
    )
    self.path_template.base_prefix = base_prefix
    self.set_name(base_prefix)
    self.path_template.suffix = (
        params.get("suffix") or HWR.beamline.detector.config.fileSuffix
    )
    self.path_template.num_files = 0
    self.path_template.directory = os.path.join(
        HWR.beamline.session.get_base_image_directory(), params.get("subdir", "")
    )
    self.path_template.process_directory = os.path.join(
        HWR.beamline.session.get_base_process_directory(),
        params.get("subdir", ""),
    )

    # Set crystal parameters from sample node
    crystal = sample_model.crystals[0]
    cell = (
        crystal.cell_a,
        crystal.cell_b,
        crystal.cell_c,
        crystal.cell_alpha,
        crystal.cell_beta,
        crystal.cell_gamma,
    )
    if all(cell):
        self.cell_parameters = cell
    self.protein_acronym = crystal.protein_acronym
    self.space_group = self.input_space_group = crystal.space_group
    use_cell_for_processing = False
    if self.input_space_group:
        # Only set use_cell_for_processing to True if spacegroup is set
        use_cell_for_processing = params.pop("use_cell_for_processing", False)
    self.use_cell_for_processing = use_cell_for_processing

    # Set to current wavelength for now - nothing else available
    wavelength = HWR.beamline.energy.get_wavelength()

    # Set parameters from diffraction plan
    plan = sample_model.diffraction_plan
    if plan:

        def plan_value(tag):
            """Return plan entry 'tag', whether plan is an object or a dict."""
            # It is not clear if diffraction_plan is a dict or an object,
            # and if so which kind. The dict lookup is only attempted when
            # attribute access fails, so objects without 'get' work too.
            try:
                return getattr(plan, tag)
            except AttributeError:
                getter = getattr(plan, "get", None)
                return getter(tag) if callable(getter) else None

        # NB if 'val' is ever not None but zero we still want to skip it
        val = plan_value("radiationSensitivity")
        if val:
            self.relative_rad_sensitivity = val

        val = plan_value("aimedResolution")
        if val:
            self.aimed_resolution = val

        if self.automation_mode:
            val = plan_value("exposureTime")
            if val:
                self.auto_acq_parameters[-1]["exposure_time"] = val

            val = plan_value("oscillationRange")
            if val:
                self.auto_acq_parameters[-1]["image_width"] = val

            val = plan_value("energy")
            if val:
                # Clamp the planned energy to the beamline energy limits
                energy_limits = HWR.beamline.energy.get_limits()
                val = max(val, energy_limits[0])
                val = min(val, energy_limits[1])
                nrg1 = self.auto_acq_parameters[-1].get("energy")
                if not nrg1:
                    self.auto_acq_parameters[-1]["energy"] = val
                # NOTE(review): this reads index -1 again, so nrg0 is always
                # truthy here and the else branch cannot run; index 0 may
                # have been intended - confirm before changing
                nrg0 = self.auto_acq_parameters[-1].get("energy")
                if nrg0:
                    val = nrg0
                else:
                    self.auto_acq_parameters[0]["energy"] = val
                wavelength = HWR.beamline.energy.calculate_wavelength(val)

    if self.wftype == "diffractcal":
        energy_tag = "Main"
    else:
        energy_tag = "Characterisation"
    self.wavelengths = (
        GphlMessages.PhasingWavelength(wavelength=wavelength, role=energy_tag),
    )
# Parameters for start of workflow
def get_path_template(self):
    """Return the path template of this task (required to conform to TaskNode)."""
    return self.path_template


@property
def wftype(self):
    """Workflow type, the "wftype" entry of the strategy settings.

    E.g. "acquisition", "diffractcal" or "transcal".
    """
    settings = self.strategy_settings
    return settings["wftype"]


@property
def wfname(self):
    """Workflow full name, the "wfname" entry of the strategy settings.

    E.g. "GPhL Diffractometer calibration".
    """
    settings = self.strategy_settings
    return settings["wfname"]


@property
def strategy_type(self):
    """Strategy type, the "strategy_type" entry of the strategy settings.

    E.g. "native", "phasing", "diffractcal" or "transcal".
    """
    settings = self.strategy_settings
    return settings["strategy_type"]


@property
def strategy_name(self):
    """Strategy full name, the "title" entry of the strategy settings.

    E.g. "Two-wavelength MAD".
    """
    settings = self.strategy_settings
    return settings["title"]


# Run name equal to base_prefix
def get_name(self):
    """Return the stored run name (required to conform to TaskNode)."""
    return self._name
def set_name(self, value):
    """Store value as the name returned by get_name."""
    self._name = value
# Cell parameters - sequence of six floats (a,b,c,alpha,beta,gamma)
@property
def cell_parameters(self):
    """Unit cell parameters; a tuple of six floats, or an empty tuple."""
    return self._cell_parameters


@cell_parameters.setter
def cell_parameters(self, value):
    # The stored value is cleared first, so it is empty both for a falsy
    # input and after a rejected input
    self._cell_parameters = ()
    if not value:
        return
    if len(value) != 6:
        raise ValueError("invalid value for cell_parameters: %s" % str(value))
    self._cell_parameters = tuple(float(item) for item in value)


@property
def total_strategy_length(self):
    """Total strategy length for a single repetition, counting all wavelengths.

    Once characterisation is done, the strategy length is multiplied by the
    number of beam energy tags in the strategy settings, if there are any.
    """
    length = self.strategy_length
    tags = self.strategy_settings.get("beam_energy_tags")
    if tags and self.characterisation_done:
        length *= len(tags)
    return length
def calc_maximum_dose(self, energy=None, exposure_time=None, image_width=None):
    """Dose at transmission=100 for given energy, exposure time and image width

    The strategy length is taken from self.total_strategy_length

    Args:
        energy Optional[float]: Energy in keV; defaults to the energy
            calculated from the first wavelength in self.wavelengths
        exposure_time Optional[float]: Value in s; defaults to self.exposure_time
        image_width Optional[float]: Value in deg; defaults to self.image_width

    Returns:
        float: Maximum dose in MGy, or 0 (after logging a warning) if any
            of the values needed for the calculation is missing or zero
    """
    if not energy:
        energy = HWR.beamline.energy.calculate_energy(
            self.wavelengths[0].wavelength
        )
    dose_rate = HWR.beamline.gphl_workflow.maximum_dose_rate(energy)
    if not exposure_time:
        exposure_time = self.exposure_time
    if not image_width:
        image_width = self.image_width
    total_strategy_length = self.total_strategy_length

    if not (dose_rate and exposure_time and image_width and total_strategy_length):
        template = (
            "WARNING: Dose could not be calculated from:\n"
            " energy:%s keV, total_strategy_length:%s deg, exposure_time:%s s, "
            "image_width:%s deg, dose_rate: %s"
        )
        msg = template % (
            energy,
            total_strategy_length,
            exposure_time,
            image_width,
            dose_rate,
        )
        for logger_name in ("HWR", "user_level_log"):
            logging.getLogger(logger_name).warning(msg)
        return 0

    return (
        dose_rate
        * total_strategy_length
        * self.dose_correction_factor
        * exposure_time
        / image_width
    )
def recommended_dose_budget(self, resolution=None):
    """Get resolution-dependent dose budget using current configuration

    :param resolution: (float) Target resolution (in A), defaults to the
        resolution of the current detector setting
    :return: the result of the workflow's resolution2dose_budget for this
        resolution and the current decay_limit, maximum_dose_budget and
        relative_rad_sensitivity
    :raises ValueError: if no resolution is passed in and none is available
        from the detector setting
    """
    if not resolution:
        # detector_setting may still be None; in that case fall through to
        # the ValueError below instead of failing with an AttributeError
        detector_setting = self.detector_setting
        if detector_setting is not None:
            resolution = detector_setting.resolution
    if not resolution:
        raise ValueError("No resolution set to calculate dose budget")
    return HWR.beamline.gphl_workflow.resolution2dose_budget(
        resolution,
        decay_limit=self.decay_limit,
        maximum_dose_budget=self.maximum_dose_budget,
        relative_rad_sensitivity=self.relative_rad_sensitivity,
    )
class XrayImaging(TaskNode):
    """Task node for an X-ray imaging experiment with a single acquisition."""

    def __init__(self, xray_imaging_params, acquisition=None, crystal=None, name=""):
        """Set up the node; a missing acquisition or crystal is created empty."""
        TaskNode.__init__(self)

        self.xray_imaging_parameters = xray_imaging_params
        acquisition = acquisition or Acquisition()
        crystal = crystal or Crystal()

        self.acquisitions = [acquisition]
        self.processing_parameters = ProcessingParameters()
        self.crystal = crystal
        self.set_name(name)

        # TODO use IMAGING
        self.experiment_type = queue_model_enumerables.EXPERIMENT_TYPE.NATIVE
        self.set_requires_centring(True)

        self.run_offline_processing = False
        self.run_online_processing = False
        self.lims_group_id = None

    def get_name(self):
        """Return "<prefix>_<run number>" taken from the path template."""
        template = self.get_path_template()
        return "%s_%i" % (template.get_prefix(), template.run_number)

    def get_path_template(self):
        """Return the path template of the first acquisition."""
        first_acquisition = self.acquisitions[0]
        return first_acquisition.path_template

    def get_files_to_be_written(self):
        """Return the files the first acquisition's path template will write."""
        return self.get_path_template().get_files_to_be_written()
def addXrayCentring(parent_node, **centring_parameters):
    """Create an XrayCentring2 node and add it to the queue under parent_node.

    The keyword arguments are passed on to the XrayCentring2 constructor.
    Returns the new node.
    """
    centring_node = XrayCentring2(**centring_parameters)
    queue_model = HWR.beamline.queue_model
    queue_model.add_child(parent_node, centring_node)
    return centring_node
# # Collect hardware object utility function. #
def to_collect_dict(data_collection, sample, centred_pos=None):
    """Convert a data collection into the parameter list used for collection.

    Only the first acquisition of data_collection is used.

    :param data_collection: data collection node to convert
    :param sample: sample node; provides LIMS id, name and protein acronym
    :param centred_pos: optional centred position; provides the position
        name and the motor positions
    :returns: a list holding a single dictionary with the acquisition, file,
        sample reference, processing and oscillation parameters
    """
    acquisition = data_collection.acquisitions[0]
    acq_params = acquisition.acquisition_parameters
    proc_params = data_collection.processing_parameters
    template = acquisition.path_template

    file_info = {
        "directory": template.directory,
        "prefix": template.get_prefix(),
        "run_number": template.run_number,
        "archive_directory": template.get_archive_directory(),
        "process_directory": template.process_directory,
        "template": template.get_image_file_name(),
        "compression": template.compression,
    }

    sample_reference = {
        "spacegroup": proc_params.space_group,
        "cell": proc_params.get_cell_str(),
        "blSampleId": sample.lims_id,
        "sample_name": sample.name,
        "acronym": sample.crystals[0].protein_acronym,
    }

    oscillation = {
        "exposure_time": acq_params.exp_time,
        "kappaStart": acq_params.kappa,
        "phiStart": acq_params.kappa_phi,
        "start_image_number": acq_params.first_image,
        "number_of_images": acq_params.num_images,
        "offset": acq_params.offset,
        "start": acq_params.osc_start,
        "range": acq_params.osc_range,
        "number_of_passes": acq_params.num_passes,
        "number_of_lines": acq_params.num_lines,
        "mesh_range": acq_params.mesh_range,
        "num_triggers": acq_params.num_triggers,
        "num_images_per_trigger": acq_params.num_images_per_trigger,
    }

    if centred_pos is None:
        position_name = None
        motors = {}
    else:
        position_name = centred_pos.get_index()
        motors = centred_pos.as_dict()

    experiment_type = queue_model_enumerables.EXPERIMENT_TYPE_STR[
        data_collection.experiment_type
    ]

    collect_params = {
        "comments": acq_params.comments,
        "take_video": acq_params.take_video,
        "take_snapshots": acq_params.take_snapshots,
        "fileinfo": file_info,
        "in_queue": acq_params.in_queue,
        "in_interleave": acq_params.in_interleave,
        "detector_binning_mode": acq_params.detector_binning_mode,
        "detector_roi_mode": acq_params.detector_roi_mode,
        "shutterless": acq_params.shutterless,
        "sessionId": data_collection.lims_session_id,
        "do_inducedraddam": acq_params.induce_burn,
        "sample_reference": sample_reference,
        "processing": str(proc_params.process_data and True),
        "processing_offline": data_collection.run_offline_processing,
        "processing_online": data_collection.run_online_processing,
        "residues": proc_params.num_residues,
        "dark": acq_params.take_dark_current,
        "detector_distance": acq_params.detector_distance,
        "resolution": acq_params.resolution or None,
        "transmission": acq_params.transmission,
        "energy": acq_params.energy,
        "oscillation_sequence": [oscillation],
        "group_id": data_collection.lims_group_id,
        "EDNA_files_dir": template.process_directory,
        "xds_dir": template.xds_dir,
        "anomalous": proc_params.anomalous,
        "experiment_type": experiment_type,
        "skip_images": acq_params.skip_existing_images,
        "position_name": position_name,
        "motors": motors,
        "ispyb_group_data_collections": data_collection.ispyb_group_data_collections,
        "workflow_parameters": data_collection.workflow_parameters,
    }
    return [collect_params]
def create_subwedges(total_num_images, sw_size, osc_range, osc_start):
    """
    Creates n subwedges where n = total_num_images // sw_size.
    Images left over after the last complete subwedge are not included.

    :param total_num_images: The total number of images
    :type total_num_images: int

    :param sw_size: Number of images in each subwedge
    :type sw_size: int

    :param osc_range: Oscillation range for each image
    :type osc_range: double

    :param osc_start: The start angle/offset of the oscillation
    :type osc_start: double

    :returns: List of tuples with the format:
              (start image number, number of images, oscillation start)
    """
    # Floor division: true division gives a float, which range() rejects
    number_of_subwedges = int(total_num_images // sw_size)

    return [
        (
            sw_num * sw_size + 1,
            sw_size,
            osc_start + (osc_range * sw_size * sw_num),
        )
        for sw_num in range(number_of_subwedges)
    ]
def create_inverse_beam_sw(num_images, sw_size, osc_range, osc_start, run_number):
    """
    Creates subwedges for inverse beam, and interleaves the result.
    Wedges W1 and W2 are created 180 degrees apart, the result is
    interleaved and given on the form:
    (W1_1, W2_1), ... (W1_n-1, W2_n-1), (W1_n, W2_n)

    :param num_images: The total number of images
    :type num_images: int

    :param sw_size: Number of images in each subwedge
    :type sw_size: int

    :param osc_range: Oscillation range for each image
    :type osc_range: double

    :param osc_start: The start angle/offset of the oscillation
    :type osc_start: double

    :param run_number: Run number for the first wedge (W1), the run number
                       of the second wedge will be run_number + 1.

    :returns: A list of tuples containing the subwedges.
              The tuples are on the form:
              (start_image, num_images, osc_start, run_number)

    :rtype: List [(...), (...)]
    """
    forward_wedges = create_subwedges(num_images, sw_size, osc_range, osc_start)
    inverse_wedges = create_subwedges(
        num_images, sw_size, osc_range, 180 + osc_start
    )

    # Interleave subwedges, tagging each with the run number of its wedge
    interleaved = []
    for forward, inverse in zip(forward_wedges, inverse_wedges):
        interleaved.append(forward + (run_number,))
        interleaved.append(inverse + (run_number + 1,))
    return interleaved
def create_interleave_sw(interleave_list, num_images, sw_size):
    """
    Creates subwedges for interleaved collection.
    For each subwedge index in turn, one subwedge is created per collection
    in interleave_list, so that the subwedges of the collections alternate.

    :param interleave_list: list of interleaved items; the acquisition
                            parameters are read from the first acquisition
                            of each item's "data_model"
    :type interleave_list: list of dict

    :param num_images: number of images of first collection. The number of
                       subwedge indices is int(num_images / sw_size)
    :type num_images: int

    :param sw_size: Number of images in each subwedge
    :type sw_size: int

    :returns: A list of dictionaries describing the subwedges, with keys
              "collect_index", "collect_first_image", "collect_num_images",
              "sw_index", "sw_first_image", "sw_actual_size",
              "sw_osc_start" and "sw_osc_range"

    :rtype: List [{...}, {...}]
    """
    wedges = []
    for sw_index in range(int(num_images / sw_size)):
        images_before = sw_index * sw_size
        for collect_index, item in enumerate(interleave_list):
            params = item["data_model"].acquisitions[0].acquisition_parameters
            osc_start = params.osc_start
            osc_range = params.osc_range
            first_image = params.first_image
            image_count = params.num_images

            # Skip collections that have no images left for this index
            if images_before > image_count:
                continue

            actual_size = sw_size
            if sw_size > image_count - (sw_index + 1) * sw_size > 0:
                actual_size = image_count % sw_size

            wedges.append(
                {
                    "collect_index": collect_index,
                    "collect_first_image": first_image,
                    "collect_num_images": image_count,
                    "sw_index": sw_index,
                    "sw_first_image": first_image + images_before,
                    "sw_actual_size": actual_size,
                    "sw_osc_start": osc_start + osc_range * sw_index * sw_size,
                    "sw_osc_range": osc_range * actual_size,
                }
            )
    return wedges
def try_parse_int(n):
    """Return n converted to int, or -1 if it cannot be converted.

    Both unparsable values (e.g. "abc") and values of a type that int()
    does not accept (e.g. None) give -1.
    """
    try:
        return int(n)
    except (TypeError, ValueError):
        # int() raises TypeError, not ValueError, for None and similar types
        return -1