import binascii
import copy
import logging
import os
import subprocess
import time
from typing import List
from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects.abstract.AbstractCharacterisation import (
AbstractCharacterisation,
)
from mxcubecore.HardwareObjects.SecureXMLRpcRequestHandler import (
SecureXMLRpcRequestHandler,
)
from mxcubecore.HardwareObjects.XSDataCommon import (
XSDataAngle,
XSDataBoolean,
XSDataDouble,
XSDataFlux,
XSDataImage,
XSDataInteger,
XSDataLength,
XSDataSize,
XSDataString,
XSDataTime,
XSDataWavelength,
)
from mxcubecore.HardwareObjects.XSDataMXCuBEv1_4 import (
XSDataInputMXCuBE,
XSDataMXCuBEDataSet,
XSDataResultMXCuBE,
)
from mxcubecore.model import queue_model_enumerables as qme
from mxcubecore.model import queue_model_objects as qmo
# from edna_test_data import EDNA_DEFAULT_INPUT
# from edna_test_data import EDNA_TEST_DATA
[docs]class EDNACharacterisation(AbstractCharacterisation):
def __init__(self, name) -> None:
super(EDNACharacterisation, self).__init__(name)
self.result = None
self.edna_default_file = None
self.start_edna_command = None
[docs] def init(self) -> None:
self.start_edna_command = self.get_property("edna_command")
self.edna_default_file = self.get_property("edna_default_file")
fp = HWR.get_hardware_repository().find_in_repository(self.edna_default_file)
if fp is None:
fp = self.edna_default_file
if not os.path.exists(fp):
raise ValueError("File %s not found in repository" % fp)
with open(fp, "r") as f:
self.edna_default_input = "".join(f.readlines())
def _modify_strategy_option(self, diff_plan, strategy_option) -> None:
"""Method for modifying the diffraction plan 'strategyOption' entry"""
if diff_plan.getStrategyOption() is None:
new_strategy_option = strategy_option
else:
new_strategy_option = (
diff_plan.getStrategyOption().getValue() + " " + strategy_option
)
diff_plan.setStrategyOption(XSDataString(new_strategy_option))
def _run_edna(
self, input_file, results_file, process_directory
) -> XSDataResultMXCuBE:
"""Starts EDNA"""
msg = "Starting EDNA characterisation using xml file %s" % input_file
logging.getLogger("queue_exec").info(msg)
self.characterisationResult = None
args = (self.start_edna_command, input_file, results_file, process_directory)
# subprocess.call("%s %s %s %s" % args, shell=True)
p = subprocess.Popen(
"%s %s %s %s --verbose --debug" % args,
shell=True,
stdin=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
do_continue = True
self.result = None
start_time = time.time()
TIME_OUT = 120
while do_continue:
if self.characterisationResult is not None:
logging.getLogger("queue_exec").info(
"Received characterisation results via XMLRPC"
)
self.result = XSDataResultMXCuBE.parseString(
self.characterisationResult
)
do_continue = False
elif p.poll() is not None:
do_continue = False
elif time.time() - start_time > TIME_OUT:
do_continue = False
if do_continue:
logging.getLogger("queue_exec").info(
"Waiting for characterisation results..."
)
time.sleep(1)
if self.result is None and os.path.exists(results_file):
self.result = XSDataResultMXCuBE.parseFile(results_file)
return self.result
[docs] def get_html_report(self, edna_result) -> str:
"""
Returns the path to the html result report generated by the characterisation software.
Args:
output (EDNAResult) EDNAResult object
"""
html_report = None
try:
html_report = str(edna_result.getHtmlPage().getPath().getValue())
except AttributeError:
pass
return html_report
[docs] def characterise(self, edna_input) -> str:
"""
Runs Characterisation and returns the results.
Args:
input (EDNAInput) EDNA input object
"""
self.processing_done_event.set()
self.prepare_input(edna_input)
path = edna_input.process_directory
# if there is no data collection id, the id will be a random number
# this is to give a unique number to the EDNA input and result files;
# something more clever might be done to give a more significant
# name, if there is no dc id.
try:
dc_id = edna_input.getDataCollectionId().getValue()
except Exception:
dc_id = id(edna_input)
token = self.generate_new_token()
edna_input.token = XSDataString(token)
if hasattr(edna_input, "process_directory"):
edna_input_file = os.path.join(path, "EDNAInput_%s.xml" % dc_id)
edna_input.exportToFile(edna_input_file)
edna_results_file = os.path.join(path, "EDNAOutput_%s.xml" % dc_id)
if not os.path.isdir(path):
os.makedirs(path)
else:
raise RuntimeError("No process directory specified in edna_input")
self.result = self._run_edna(edna_input_file, edna_results_file, path)
self.processing_done_event.clear()
return self.result
[docs] def dc_from_output(
self, edna_result, reference_image_collection
) -> List[qmo.DataCollection]:
data_collections = []
crystal = copy.deepcopy(reference_image_collection.crystal)
ref_proc_params = reference_image_collection.processing_parameters
processing_parameters = copy.deepcopy(ref_proc_params)
try:
char_results = edna_result.getCharacterisationResult()
edna_strategy = char_results.getStrategyResult()
collection_plan = edna_strategy.getCollectionPlan()[0]
wedges = collection_plan.getCollectionStrategy().getSubWedge()
except Exception:
pass
else:
try:
resolution = (
collection_plan.getStrategySummary().getResolution().getValue()
)
resolution = round(resolution, 3)
except AttributeError:
resolution = None
try:
transmission = (
collection_plan.getStrategySummary().getAttenuation().getValue()
)
transmission = round(transmission, 2)
except AttributeError:
transmission = None
try:
screening_id = edna_result.getScreeningId().getValue()
except AttributeError:
screening_id = None
for i in range(0, len(wedges)):
wedge = wedges[i]
exp_condition = wedge.getExperimentalCondition()
goniostat = exp_condition.getGoniostat()
beam = exp_condition.getBeam()
acq = qmo.Acquisition()
acq.acquisition_parameters = (
HWR.beamline.get_default_acquisition_parameters()
)
acquisition_parameters = acq.acquisition_parameters
acquisition_parameters.centred_position = (
reference_image_collection.acquisitions[
0
].acquisition_parameters.centred_position
)
acq.path_template = HWR.beamline.get_default_path_template()
# Use the same path template as the reference_collection
# and update the members the needs to be changed. Keeping
# the directories of the reference collection.
ref_pt = reference_image_collection.acquisitions[0].path_template
acq.path_template = copy.deepcopy(ref_pt)
acq.path_template.directory = "/".join(
ref_pt.directory.split("/")[0:-2]
)
acq.path_template.wedge_prefix = "w" + str(i + 1)
acq.path_template.reference_image_prefix = str()
if resolution:
acquisition_parameters.resolution = resolution
if transmission:
acquisition_parameters.transmission = transmission
if screening_id:
acquisition_parameters.screening_id = screening_id
try:
acquisition_parameters.osc_start = (
goniostat.getRotationAxisStart().getValue()
)
except AttributeError:
pass
try:
acquisition_parameters.osc_end = (
goniostat.getRotationAxisEnd().getValue()
)
except AttributeError:
pass
try:
acquisition_parameters.osc_range = (
goniostat.getOscillationWidth().getValue()
)
except AttributeError:
pass
try:
num_images = int(
abs(
acquisition_parameters.osc_end
- acquisition_parameters.osc_start
)
/ acquisition_parameters.osc_range
)
acquisition_parameters.first_image = 1
acquisition_parameters.num_images = num_images
acq.path_template.num_files = num_images
acq.path_template.start_num = 1
except AttributeError:
pass
try:
acquisition_parameters.transmission = (
beam.getTransmission().getValue()
)
except AttributeError:
pass
try:
acquisition_parameters.energy = round(
(123984.0 / beam.getWavelength().getValue()) / 10000.0, 4
)
except AttributeError:
pass
try:
acquisition_parameters.exp_time = beam.getExposureTime().getValue()
except AttributeError:
pass
dc = qmo.DataCollection([acq], crystal, processing_parameters)
data_collections.append(dc)
return data_collections
[docs] def get_default_characterisation_parameters(self) -> qmo.CharacterisationParameters:
"""
Returns the default parameters
"""
edna_input = XSDataInputMXCuBE.parseString(self.edna_default_input)
diff_plan = edna_input.getDiffractionPlan()
edna_sample = edna_input.getSample()
char_params = qmo.CharacterisationParameters()
char_params.experiment_type = qme.EXPERIMENT_TYPE.OSC
# Optimisation parameters
char_params.use_aimed_resolution = False
try:
char_params.aimed_resolution = diff_plan.getAimedResolution().getValue()
except Exception:
char_params.aimed_resolution = None
char_params.use_aimed_multiplicity = False
try:
char_params.aimed_i_sigma = (
diff_plan.getAimedIOverSigmaAtHighestResolution().getValue()
)
char_params.aimed_completness = diff_plan.getAimedCompleteness().getValue()
except Exception:
char_params.aimed_i_sigma = None
char_params.aimed_completness = None
char_params.strategy_complexity = 0
char_params.induce_burn = False
char_params.use_permitted_rotation = False
char_params.permitted_phi_start = 0.0
char_params.permitted_phi_end = 360
char_params.low_res_pass_strat = False
# Crystal
char_params.max_crystal_vdim = edna_sample.getSize().getY().getValue()
char_params.min_crystal_vdim = edna_sample.getSize().getZ().getValue()
char_params.max_crystal_vphi = 90
char_params.min_crystal_vphi = 0.0
char_params.space_group = ""
# Characterisation type
char_params.use_min_dose = True
char_params.use_min_time = False
char_params.min_dose = 30.0
char_params.min_time = 0.0
char_params.account_rad_damage = True
char_params.auto_res = True
char_params.opt_sad = False
char_params.sad_res = 0.5
char_params.determine_rad_params = False
char_params.burn_osc_start = 0.0
char_params.burn_osc_interval = 3
# Radiation damage model
char_params.rad_suscept = edna_sample.getSusceptibility().getValue()
char_params.beta = 1
char_params.gamma = 0.06
return char_params
def generate_new_token(self) -> str:
# See: https://wyattbaldwin.com/2014/01/09/generating-random-tokens-in-python/
token = binascii.hexlify(os.urandom(5)).decode("utf-8")
SecureXMLRpcRequestHandler.setReferenceToken(token)
return token