# encoding: utf-8
"""Workflow connection, interfacing to external workflow engine
using py4j and Abstract Beamline Interface messages
License:
This file is part of MXCuBE.
MXCuBE is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
MXCuBE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with MXCuBE. If not, see <https://www.gnu.org/licenses/>.
"""
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
import logging
import os
import signal
import socket # Do NOT remove - see 'origsocket' below
import subprocess
import sys
import time
import uuid
from urllib.parse import urlparse
from py4j import (
clientserver,
java_gateway,
)
from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import (
ConfiguredObject,
HardwareObject,
)
from mxcubecore.HardwareObjects.Gphl import GphlMessages
from mxcubecore.utils import conversion
# NB this is patching the original socket module in to avoid the
# monkeypatched version we get from gevent - that causes errors.
# It depends on knowing where in py4j socket is imported
# Hacky, but the best solution to making py4j and gevent compatible
origsocket = sys.modules.pop("socket")
_origsocket = sys.modules.pop("_socket")
import socket # noqa E402, F811 Needed as part of workaround for socket/oy4j connection
java_gateway.socket = socket
clientserver.socket = socket
sys.modules["socket"] = origsocket
sys.modules["_socket"] = _origsocket
del origsocket
del _origsocket
__copyright__ = """ Copyright © 2016 - 2019 by Global Phasing Ltd. """
__license__ = "LGPLv3+"
__author__ = "Rasmus H Fogh"
[docs]class GphlWorkflowConnection(HardwareObject):
"""
This HO acts as a gateway to the Global Phasing workflow engine.
"""
[docs] class HOConfig(ConfiguredObject.HOConfig):
"""Temporary replacement for Pydantic class
Required during transition, as long as we do not have the fields defined"""
# Defaults - should be replaced by proper Pydantic
software_paths = {}
software_properties = {}
directory_locations = {}
gphl_subdir = "GPHL"
gphl_persistname = "persistence"
ssh_options = {}
connection_parameters = {}
def __init__(self, name):
super().__init__(name)
# Py4J gateway to external workflow program
self._gateway = None
self.jvm_imports_checked = False
# ID for current workflow calculation
self._enactment_id = None
# Queue for communicating with MXCuBE HardwareObject
self.workflow_queue = None
self._await_result = None
self._running_process = None
self.collect_emulator_process = None
self.update_state(self.STATES.UNKNOWN)
[docs] def init(self):
super().init()
# Adapt connections if we are running via ssh
if self.config.ssh_options:
self.config.connection_parameters["python_address"] = socket.gethostname()
# Adapt paths and properties to use directory_locations
locations = self.config.directory_locations
installdir = locations["GPHL_INSTALLATION"]
paths = self.config.software_paths
properties = self.config.software_properties
for tag, val in paths.items():
val2 = val.format(**locations)
if not os.path.isabs(val2):
val2 = HWR.get_hardware_repository().find_in_repository(val)
if val2 is None:
raise ValueError("File path %s not recognised" % val)
paths[tag] = val2
paths["GPHL_INSTALLATION"] = locations["GPHL_INSTALLATION"]
paths["runworkflow"] = "%s/ASTRAWorkflows/bin/runworkflow" % installdir
for tag, val in properties.items():
val2 = val.format(**locations)
if not os.path.isabs(val2):
val2 = HWR.get_hardware_repository().find_in_repository(val)
if val2 is None:
raise ValueError("File path %s not recognised" % val)
paths[tag] = properties[tag] = val2
# Set master location, based on known release directory structure
properties["co.gphl.wf.bin"] = os.path.join(
locations["GPHL_INSTALLATION"], "exe"
)
if "GPHL_XDS_PATH" in paths:
properties["co.gphl.wf.xds.bin"] = os.path.join(
paths["GPHL_XDS_PATH"], "xds_par"
)
self.update_state(self.STATES.OFF)
[docs] def to_java_time(self, time_in):
"""Convert time in seconds since the epoch (python time) to Java time value"""
return self._gateway.jvm.java.lang.Long(int(time_in * 1000))
[docs] def get_executable(self, name):
"""Get location of executable binary for program called 'name'"""
tag = "co.gphl.wf.%s.bin" % name
result = self.config.software_paths.get(tag)
if not result:
result = os.path.join(
self.config.software_paths["GPHL_INSTALLATION"], "exe", name
)
return result
[docs] def get_bdg_licence_dir(self, name):
"""Get directory containing specific licence file (if any)
for program called 'name'"""
tag = "co.gphl.wf.%s.bdg_licence_dir" % name
result = self.config.software_paths.get(tag)
return result
def open_connection(self):
if self._gateway is None:
self.log.debug("Opening GPhL connection")
else:
return
params = self.config.connection_parameters
python_parameters = {}
val = params.get("python_address")
if val is not None:
python_parameters["address"] = val
val = params.get("python_port")
if val is not None:
python_parameters["port"] = val
java_parameters = {"auto_convert": True}
val = params.get("java_address")
if val is not None:
java_parameters["address"] = val
val = params.get("java_port")
if val is not None:
java_parameters["port"] = val
self.log.debug(
"Opening GPhL connection: %s ",
(", ".join("%s:%s" % tt0 for tt0 in sorted(params.items()))),
)
self._gateway = clientserver.ClientServer(
java_parameters=clientserver.JavaParameters(**java_parameters),
python_parameters=clientserver.PythonParameters(**python_parameters),
python_server_entry_point=self,
)
def start_workflow(self, workflow_queue, workflow_model_obj):
# NBNB All command line option values are put in quotes (repr) when
# the workflow is invoked remotely through ssh.
if self.get_state() == self.STATES.UNKNOWN:
self.log.warning(
"GphlWorkflowConnection not correctly initialised - check for errors"
)
elif self.get_state() != self.STATES.OFF:
# NB, for now workflow is started as the connection is made,
# so we are never in state 'ON'/STANDBY
raise RuntimeError("Workflow is already running, cannot be started")
self.jvm_imports_checked = False
# Cannot be done in init, where the api.sessions link is not yet ready
self.config.software_paths["GPHL_WDIR"] = os.path.join(
HWR.beamline.session.get_base_process_directory(), self.config.gphl_subdir
)
strategy_settings = workflow_model_obj.strategy_settings
wf_settings = HWR.beamline.gphl_workflow.config.settings
ssh_options = self.config.ssh_options
in_shell = bool(ssh_options)
if in_shell:
ssh_options = ssh_options.copy()
host = ssh_options.pop("Host")
command_list = ["ssh"]
if "ConfigFile" in ssh_options:
command_list.extend(("-F", ssh_options.pop("ConfigFile")))
for tag, val in sorted(ssh_options.items()):
command_list.extend(("-o", "%s=%s" % (tag, val)))
command_list.append(host)
else:
command_list = []
runworkflow_opts = []
command_list.append(self.config.software_paths["runworkflow"])
# # HACK - debug options REMOVE!
# import socket
# sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# sock.connect(("8.8.8.8", 80))
# ss0 = "-agentlib:jdwp=transport=dt_socket,address=%s:8050,server=y,suspend=y"
# command_list.append(ss0 % sock.getsockname()[0])
for tag, val in sorted(wf_settings.get("invocation_properties", {}).items()):
runworkflow_opts.extend(
conversion.java_property(tag, val, quote_value=in_shell)
)
init_spot_dir = workflow_model_obj.init_spot_dir
if init_spot_dir:
runworkflow_opts.extend(
conversion.java_property("co.gphl.wf.initSpotDir", init_spot_dir)
)
# We must get hold of the options here, as we need wdir for a property
workflow_options = dict(strategy_settings.get("options", {}))
calibration_name = workflow_options.get("calibration")
if calibration_name:
# Expand calibration base name - to simplify identification.
workflow_options["calibration"] = "%s_%s" % (
calibration_name,
workflow_model_obj.get_name(),
)
if workflow_model_obj.wftype in ("acquisition", "diffractcal"):
workflow_options["strategy"] = workflow_model_obj.initial_strategy
path_template = workflow_model_obj.get_path_template()
if "prefix" in workflow_options:
workflow_options["prefix"] = path_template.base_prefix
workflow_options["wdir"] = self.config.software_paths["GPHL_WDIR"]
workflow_options["persistname"] = self.config.gphl_persistname
# Set the workflow root subdirectory parameter from the base image directory
image_root = os.path.abspath(HWR.beamline.session.get_base_image_directory())
if strategy_settings["wftype"] != "transcal":
workflow_options["appdir"] = (
HWR.beamline.session.get_base_process_directory()
)
rootsubdir = path_template.directory[len(image_root) :]
if rootsubdir.startswith(os.path.sep):
rootsubdir = rootsubdir[1:]
if rootsubdir:
workflow_options["rootsubdir"] = rootsubdir
# Hardcoded - location for log output
runworkflow_opts.extend(
conversion.java_property(
"co.gphl.wf.wdir", workflow_options["wdir"], quote_value=in_shell
)
)
command_list.append(strategy_settings["wf_selection"])
for keyword, value in wf_settings.get("workflow_properties", {}).items():
command_list.extend(
conversion.java_property(keyword, value, quote_value=in_shell)
)
for keyword, value in self.config.software_properties.items():
command_list.extend(
conversion.java_property(keyword, value, quote_value=in_shell)
)
for keyword, value in workflow_options.items():
command_list.extend(
conversion.command_option(keyword, value, quote_value=in_shell)
)
wdir = workflow_options.get("wdir")
# NB this creates the appdir as well (wdir is within appdir)
if not os.path.isdir(wdir):
try:
os.makedirs(wdir)
except Exception:
# No need to raise error - program will fail downstream
self.log.error("Could not create GPhL working directory: %s", wdir)
for ss0 in command_list:
ss0 = ss0.rsplit("=", maxsplit=1)[-1]
if ss0.startswith("/") and "*" not in ss0 and not os.path.exists(ss0):
self.log.warning("File does not exist : %s", ss0)
self.log.info("GPhL execute :\n%s", " ".join(command_list))
# Get environmental variables
envs = os.environ.copy()
# # Trick to allow unauthorised account (e.g. ESRF: opid30) to run GPhL programs
# # Any value is OK, just setting it is enough.
# envs["AutoPROCWorkFlowUser"] = "1"
# These env variables are needed in some cases for wrapper scripts
# Specifically for the stratcal wrapper.
envs["GPHL_INSTALLATION"] = self.config.software_paths["GPHL_INSTALLATION"]
GPHL_XDS_PATH = self.config.software_paths.get("GPHL_XDS_PATH")
if GPHL_XDS_PATH:
envs["GPHL_XDS_PATH"] = GPHL_XDS_PATH
GPHL_CCP4_PATH = self.config.software_paths.get("GPHL_CCP4_PATH")
if GPHL_CCP4_PATH:
envs["GPHL_CCP4_PATH"] = GPHL_CCP4_PATH
GPHL_AUTOPROC_PATH = self.config.software_paths.get("GPHL_AUTOPROC_PATH")
if GPHL_AUTOPROC_PATH:
envs["GPHL_AUTOPROC_PATH"] = GPHL_AUTOPROC_PATH
GPHL_MINICONDA_PATH = self.config.software_paths.get("GPHL_MINICONDA_PATH")
if GPHL_MINICONDA_PATH:
envs["GPHL_MINICONDA_PATH"] = GPHL_MINICONDA_PATH
if runworkflow_opts:
envs["RUNWORKFLOW_OPTS"] = " ".join(runworkflow_opts)
self.log.debug("Executing GPhL workflow, in environment %s", envs)
try:
self._running_process = subprocess.Popen(command_list, env=envs)
except Exception:
logging.getLogger().exception("Error in spawning workflow application")
raise
self.workflow_queue = workflow_queue
logging.getLogger("py4j.clientserver").setLevel(logging.WARNING)
self.update_state(self.STATES.READY)
self.log.debug(
"GPhL workflow pid, returncode : %s, %s"
% (self._running_process.pid, self._running_process.returncode)
)
def workflow_ended(self):
if self.get_state() == self.STATES.OFF:
# No workflow to abort
return
self.log.debug("GPhL workflow ended")
self.update_state(self.STATES.OFF)
if self._await_result is not None:
# We are awaiting an answer - give an abort
self._await_result.append((GphlMessages.BeamlineAbort(), None))
time.sleep(0.2)
elif self._running_process is not None:
self._running_process = None
# NBNB TODO how do we close down the workflow if there is no answer pending?
self._enactment_id = None
self.workflow_queue = None
self._await_result = None
xx0 = self.collect_emulator_process
if xx0 is not None:
self.collect_emulator_process = "ABORTED"
try:
if xx0.poll() is None:
xx0.send_signal(signal.SIGINT)
time.sleep(3)
if xx0.poll() is None:
xx0.terminate()
time.sleep(9)
if xx0.poll() is None:
xx0.kill()
except Exception:
self.log.info(
"Exception while terminating external workflow process %s", xx0
)
self.log.info("Error was:", exc_info=True)
def close_connection(self):
self.log.debug("GPhL Close connection ")
xx0 = self._gateway
self._gateway = None
if xx0 is not None:
try:
# Exceptions 'can easily happen' (py4j docs)
# Without raise_exception exceptions in the first part of the shutddown
# will be caught and the rest of the shutdown will continue.
# which is what we want.
# xx0.shutdown(raise_exception=True)
xx0.shutdown()
except Exception:
self.log.debug("Exception during py4j gateway shutdown. Ignored")
[docs] def abort_workflow(self, message=None):
"""Abort workflow - may be called from controller in any state"""
self.log.info("Aborting workflow: %s", message)
logging.getLogger("user_level_log").info("Aborting workflow ...")
if self._await_result is not None:
# Workflow waiting for answer - send abort
self._await_result = [(GphlMessages.BeamlineAbort(), None)]
# Shut down hardware object
que = self.workflow_queue
if que is None:
self.workflow_ended()
else:
# If the queue is running,
# workflow_ended will be called from post_execute
que.put_nowait(StopIteration)
[docs] def processText(self, py4j_message):
"""Receive and process info message from workflow server
Return goes to server
NB Called from external java) workflow"""
xx0 = self._decode_py4j_message(py4j_message)
message_type = xx0.message_type
payload = xx0.payload
correlation_id = xx0.correlation_id
if not payload:
self.log.warning("GPhL Empty or unparsable information message. Ignored")
elif self.workflow_queue is not None:
# Could happen if we have ended the workflow
self.workflow_queue.put_nowait(
(message_type, payload, correlation_id, None)
)
[docs] def processMessage(self, py4j_message):
"""Receive and process message from workflow server
Return goes to server
NB Called from external java) workflow"""
if self.get_state() is self.STATES.OFF:
return None
if not self.jvm_imports_checked:
# We need to use dir to check for the presence or absence of an imported
# class. hasattr/getattr don't do what is needed here, because if the
# attribute name is not present java_gateway receives proto.SUCCESS_PACKAGE
# from the Java side and instantiates a JavaPackage with the name, even
# though no such package exists in the JVM. See:
# https://github.com/py4j/py4j/blob/cb9e392d8fc5bec6b99a612e2911017900061628/py4j-python/src/py4j/java_gateway.py#L1748
# This looks like a py4j bug on the Java side but needs more investigation.
# The Py4jMessage class is used as the indicator class here: we assume that
# if it has been imported, all other unqualified Java classnames used via
# the default JVM view have been imported, otherwise none of them have been
# imported and we need to do it here.
if "Py4jMessage" not in dir(self._gateway.jvm):
for qualified_class_name in (
"co.gphl.sdcp.astra.service.py4j.Py4jMessage",
"co.gphl.beamline.v2_unstable.instrumentation.CentringStatus",
"co.gphl.beamline.v2_unstable.domain_types.CrystalClass",
"co.gphl.beamline.v2_unstable.domain_types.ChemicalElement",
"co.gphl.beamline.v2_unstable.domain_types.AbsorptionEdge",
):
java_gateway.java_import(self._gateway.jvm, qualified_class_name)
self.log.warning(
"Importing required unqualified class names from the JVM explicitly"
)
self.log.warning(
"Please consider upgrading the GPhL workflow application"
)
self.jvm_imports_checked = True
xx0 = self._decode_py4j_message(py4j_message)
message_type = xx0.message_type
payload = xx0.payload
correlation_id = xx0.correlation_id
if self._enactment_id is None:
# NB this should be made less primitive
# once we are past direct function calls
self._enactment_id = xx0.enactment_id
elif not payload:
self.log.error(
"GPhL message lacks payload - sending 'Abort' to external workflow"
)
return self._response_to_server(
GphlMessages.BeamlineAbort(), correlation_id
)
if message_type in ("SubprocessStarted", "SubprocessStopped"):
if self.workflow_queue is not None:
# Could happen if we have ended the workflow
self.workflow_queue.put_nowait(
(message_type, payload, correlation_id, None)
)
self.log.debug("Subprocess start/stop - return None")
return None
elif message_type in (
"RequestConfiguration",
"GeometricStrategy",
"CollectionProposal",
"ChooseLattice",
"RequestCentring",
"ObtainPriorInformation",
"PrepareForCentring",
):
# Requests:
self._await_result = []
self.update_state(self.STATES.BUSY)
if self.workflow_queue is None:
# Could be None if we have ended the workflow
return self._response_to_server(
GphlMessages.BeamlineAbort(), correlation_id
)
else:
self.workflow_queue.put_nowait(
(message_type, payload, correlation_id, self._await_result)
)
while not self._await_result:
time.sleep(0.1)
result, correlation_id = self._await_result.pop(0)
if self.get_state() == self.STATES.BUSY:
self.update_state(self.STATES.READY)
self._await_result = None
if result is StopIteration:
result = GphlMessages.BeamlineAbort()
self.workflow_queue.put_nowait(
(
"WorkflowAborted",
GphlMessages.WorkflowAborted(),
correlation_id,
None,
)
)
self.workflow_ended()
else:
self.log.debug(
"GPhL - response=%s messageId=%s"
% (result.__class__.__name__, correlation_id)
)
return self._response_to_server(result, correlation_id)
elif message_type in ("WorkflowAborted", "WorkflowCompleted", "WorkflowFailed"):
if self.workflow_queue is not None:
# Could happen if we have ended the workflow
self.workflow_queue.put_nowait(
(message_type, payload, correlation_id, None)
)
self.workflow_ended()
self.log.debug("Aborting - return None")
return None
else:
self.log.error("GPhL Unknown message type: %s - aborting", message_type)
return self._response_to_server(
GphlMessages.BeamlineAbort(), correlation_id
)
# Conversion to Python
def _decode_py4j_message(self, py4j_message):
"""Extract messageType and convert py4J object to python object"""
# Determine message type
message_type = py4j_message.getPayloadClass().getSimpleName()
if message_type.endswith("Impl"):
message_type = message_type[:-4]
if message_type == "RequestConfiguration":
xx0 = py4j_message.getPayload().getEnactmentId()
enactment_id = xx0 and xx0.toString()
else:
enactment_id = None
xx0 = py4j_message.getCorrelationId()
correlation_id = xx0 and xx0.toString()
if message_type == "String":
payload = py4j_message.getPayload()
else:
self.log.debug(
"GPhL incoming: message=%s, jobId=%s, messageId=%s"
% (message_type, enactment_id, correlation_id)
)
converterName = "_%s_to_python" % message_type
try:
# determine converter function
converter = getattr(self, converterName)
except AttributeError:
self.log.error(
"GPhL Message type %s not recognised (no %s function)"
% (message_type, converterName)
)
payload = None
else:
try:
# Convert to Python objects
payload = converter(py4j_message.getPayload())
except NotImplementedError:
self.log.error(
"Processing of GPhL message %s not implemented", message_type
)
payload = None
return GphlMessages.ParsedMessage(
message_type, payload, enactment_id, correlation_id
)
def _RequestConfiguration_to_python(self, py4jRequestConfiguration):
py4jWorkflowVersion = py4jRequestConfiguration.getWorkflowVersion()
workflowVersion = self._SimpleVersion_to_string(py4jWorkflowVersion)
metadata = py4jWorkflowVersion.getBuildmetadata()
if metadata:
# NB buildtime is not used now, but could be in later versions
# NB a human-readable buildtime is part of the metadata
# buildtime = py4jWorkflowVersion.getBuildTime()
if not py4jWorkflowVersion.isClean():
metadata += "-dirty"
workflowVersion = f"{workflowVersion}+{metadata}"
abiVersion = self._SimpleVersion_to_string(
py4jRequestConfiguration.getAbiVersion()
)
return GphlMessages.RequestConfiguration(workflowVersion, abiVersion)
def _SimpleVersion_to_string(self, py4jSimpleVersion):
parts = [
str(py4jSimpleVersion.getMajor()),
str(py4jSimpleVersion.getMinor()),
str(py4jSimpleVersion.getPatch()),
]
xx0 = py4jSimpleVersion.getPrerelease()
result = ".".join(parts)
if xx0:
result += xx0
return result
def _ObtainPriorInformation_to_python(self, py4jObtainPriorInformation):
return GphlMessages.ObtainPriorInformation()
def _PrepareForCentring_to_python(self, py4jPrepareForCentring):
return GphlMessages.PrepareForCentring()
def _GeometricStrategy_to_python(self, py4jGeometricStrategy):
uuidString = py4jGeometricStrategy.getId().toString()
sweeps = frozenset(
self._Sweep_to_python(x) for x in py4jGeometricStrategy.getSweeps()
)
beamSetting = py4jGeometricStrategy.getDefaultBeamSetting()
if beamSetting:
beamSetting = self._BeamSetting_to_python(beamSetting)
else:
beamSetting = None
detectorSetting = py4jGeometricStrategy.getDefaultDetectorSetting()
if detectorSetting:
detectorSetting = self._DetectorSetting_to_python(detectorSetting)
else:
detectorSetting = None
reflectingRangeEsd = None
try:
if py4jGeometricStrategy.isSetReflectingRangeEsd():
reflectingRangeEsd = py4jGeometricStrategy.getReflectingRangeEsd()
except Exception: # noqa S110
# Temporary fix, pending upgrading of the GPhL workflow
# NB the error raised is likely Py4JError - but we do not care
pass
return GphlMessages.GeometricStrategy(
# isInterleaved=py4jGeometricStrategy.isInterleaved(),
isUserModifiable=py4jGeometricStrategy.isUserModifiable(),
allowedWidths=py4jGeometricStrategy.getAllowedWidths(),
sweepOffset=py4jGeometricStrategy.getSweepOffset(),
sweepRepeat=py4jGeometricStrategy.getSweepRepeat(),
defaultWidthIdx=py4jGeometricStrategy.getDefaultWidthIdx(),
defaultBeamSetting=beamSetting,
defaultDetectorSetting=detectorSetting,
sweeps=sweeps,
reflectingRangeEsd=reflectingRangeEsd,
id_=uuid.UUID(uuidString),
)
def _SubprocessStarted_to_python(self, py4jSubprocessStarted):
return GphlMessages.SubprocessStarted(name=py4jSubprocessStarted.getName())
def _SubprocessStopped_to_python(self, py4jSubprocessStopped):
return GphlMessages.SubprocessStopped()
def _ChooseLattice_to_python(self, py4jChooseLattice):
# NB the functions return different types, so toString is needed in only once
indexingFormat = py4jChooseLattice.getIndexingFormat().toString()
indexingHeader = py4jChooseLattice.getIndexingHeader()
inputCell = py4jChooseLattice.getUserProvidedCell()
userProvidedCell = self._UnitCell_to_python(inputCell) if inputCell else None
return GphlMessages.ChooseLattice(
indexingSolutions=tuple(
self._IndexingSolution_to_python(sol)
for sol in py4jChooseLattice.getIndexingSolutions()
),
indexingFormat=indexingFormat,
indexingHeader=indexingHeader,
priorCrystalClasses=tuple(
ccl.toString() for ccl in py4jChooseLattice.getPriorCrystalClasses()
),
priorSpaceGroup=py4jChooseLattice.getPriorSpaceGroup(),
priorSpaceGroupString=py4jChooseLattice.getPriorSpaceGroupString(),
userProvidedCell=userProvidedCell,
)
def _CollectionProposal_to_python(self, py4jCollectionProposal):
uuidString = py4jCollectionProposal.getId().toString()
strategy = self._GeometricStrategy_to_python(
py4jCollectionProposal.getStrategy()
)
text_type = conversion.text_type
id2Sweep = dict((text_type(x.id_), x) for x in strategy.sweeps)
scans = []
for py4jScan in py4jCollectionProposal.getScans():
sweep = id2Sweep[py4jScan.getSweep().getId().toString()]
scans.append(self._Scan_to_python(py4jScan, sweep))
return GphlMessages.CollectionProposal(
relativeImageDir=py4jCollectionProposal.getRelativeImageDir(),
strategy=strategy,
scans=scans,
id_=uuid.UUID(uuidString),
)
def __WorkflowDone_to_python(self, py4jWorkflowDone, cls):
Issue = GphlMessages.Issue
issues = []
for py4jIssue in py4jWorkflowDone.getIssues():
component = py4jIssue.getComponent()
message = py4jIssue.getMessage()
code = py4jIssue.getCode()
issues.append(Issue(component=component, message=message, code=code))
return cls(issues=issues)
def _WorkflowCompleted_to_python(self, py4jWorkflowCompleted):
return self.__WorkflowDone_to_python(
py4jWorkflowCompleted, GphlMessages.WorkflowCompleted
)
def _WorkflowAborted_to_python(self, py4jWorkflowAborted):
return self.__WorkflowDone_to_python(
py4jWorkflowAborted, GphlMessages.WorkflowAborted
)
def _WorkflowFailed_to_python(self, py4jWorkflowFailed):
return self.__WorkflowDone_to_python(
py4jWorkflowFailed, GphlMessages.WorkflowFailed
)
def _RequestCentring_to_python(self, py4jRequestCentring):
goniostatRotation = self._GoniostatRotation_to_python(
py4jRequestCentring.getGoniostatRotation()
)
return GphlMessages.RequestCentring(
currentSettingNo=py4jRequestCentring.getCurrentSettingNo(),
totalRotations=py4jRequestCentring.getTotalRotations(),
goniostatRotation=goniostatRotation,
)
def _GoniostatRotation_to_python(self, py4jGoniostatRotation, isSweepSetting=False):
if py4jGoniostatRotation is None:
return None
uuidString = py4jGoniostatRotation.getId().toString()
axisSettings = py4jGoniostatRotation.getAxisSettings()
if isSweepSetting:
scanAxis = py4jGoniostatRotation.getScanAxis()
result = GphlMessages.GoniostatSweepSetting(
id_=uuid.UUID(uuidString), scanAxis=scanAxis, **axisSettings
)
else:
result = GphlMessages.GoniostatRotation(
id_=uuid.UUID(uuidString), **axisSettings
)
py4jGoniostatTranslation = py4jGoniostatRotation.getTranslation()
if py4jGoniostatTranslation:
translationAxisSettings = py4jGoniostatTranslation.getAxisSettings()
translationUuidString = py4jGoniostatTranslation.getId().toString()
# Next line creates Translation and links it to Rotation
GphlMessages.GoniostatTranslation(
id_=uuid.UUID(translationUuidString),
rotation=result,
**translationAxisSettings,
)
return result
def _BeamstopSetting_to_python(self, py4jBeamstopSetting):
if py4jBeamstopSetting is None:
return None
uuidString = py4jBeamstopSetting.getId().toString()
axisSettings = py4jBeamstopSetting.getAxisSettings()
return GphlMessages.BeamstopSetting(id_=uuid.UUID(uuidString), **axisSettings)
def _DetectorSetting_to_python(self, py4jDetectorSetting):
if py4jDetectorSetting is None:
return None
uuidString = py4jDetectorSetting.getId().toString()
axisSettings = py4jDetectorSetting.getAxisSettings()
return GphlMessages.DetectorSetting(id_=uuid.UUID(uuidString), **axisSettings)
def _BeamSetting_to_python(self, py4jBeamSetting):
if py4jBeamSetting is None:
return None
uuidString = py4jBeamSetting.getId().toString()
return GphlMessages.BeamSetting(
id_=uuid.UUID(uuidString), wavelength=py4jBeamSetting.getWavelength()
)
def _GoniostatSweepSetting_to_python(self, py4jGoniostatSweepSetting):
return self._GoniostatRotation_to_python(
py4jGoniostatSweepSetting, isSweepSetting=True
)
def _UnitCell_to_python(self, py4jUnitCell):
cell_params = tuple(py4jUnitCell.getLengths()) + tuple(py4jUnitCell.getAngles())
return GphlMessages.UnitCell(*cell_params)
def _IndexingSolution_to_python(self, py4jIndexingSolution):
return GphlMessages.IndexingSolution(
bravaisLattice=py4jIndexingSolution.getBravaisLattice(),
cell=self._UnitCell_to_python(py4jIndexingSolution.getCell()),
isConsistent=py4jIndexingSolution.isConsistent(),
latticeCharacter=py4jIndexingSolution.getLatticeCharacter(),
qualityOfFit=py4jIndexingSolution.getQualityOfFit(),
)
def _Sweep_to_python(self, py4jSweep):
# NB scans are not set - where scans are present in a message,
# the link is set from the Scan side.
uuidString = py4jSweep.getId().toString()
return GphlMessages.Sweep(
goniostatSweepSetting=self._GoniostatSweepSetting_to_python(
py4jSweep.getGoniostatSweepSetting()
),
detectorSetting=self._DetectorSetting_to_python(
py4jSweep.getDetectorSetting()
),
beamSetting=self._BeamSetting_to_python(py4jSweep.getBeamSetting()),
start=py4jSweep.getStart(),
width=py4jSweep.getWidth(),
beamstopSetting=self._BeamstopSetting_to_python(
py4jSweep.getBeamstopSetting()
),
sweepGroup=py4jSweep.getSweepGroup(),
id_=uuid.UUID(uuidString),
)
def _ScanExposure_to_python(self, py4jScanExposure):
uuidString = py4jScanExposure.getId().toString()
return GphlMessages.ScanExposure(
time=py4jScanExposure.getTime(),
transmission=py4jScanExposure.getTransmission(),
id_=uuid.UUID(uuidString),
)
def _ScanWidth_to_python(self, py4jScanWidth):
uuidString = py4jScanWidth.getId().toString()
return GphlMessages.ScanWidth(
imageWidth=py4jScanWidth.getImageWidth(),
numImages=py4jScanWidth.getNumImages(),
id_=uuid.UUID(uuidString),
)
def _Scan_to_python(self, py4jScan, sweep):
uuidString = py4jScan.getId().toString()
return GphlMessages.Scan(
width=self._ScanWidth_to_python(py4jScan.getWidth()),
exposure=self._ScanExposure_to_python(py4jScan.getExposure()),
imageStartNum=py4jScan.getImageStartNum(),
start=py4jScan.getStart(),
sweep=sweep,
filenameParams=py4jScan.getFilenameParams(),
id_=uuid.UUID(uuidString),
)
# Conversion to Java
def _payload_to_java(self, payload):
"""Convert Python payload object to java"""
payloadType = payload.__class__.__name__
if payloadType == "ConfigurationData":
return self._ConfigurationData_to_java(payload)
if payloadType == "BeamlineAbort":
return self._BeamlineAbort_to_java(payload)
if payloadType == "ReadyForCentring":
return self._ReadyForCentring_to_java(payload)
if payloadType == "SampleCentred":
return self._SampleCentred_to_java(payload)
if payloadType == "CollectionDone":
# self.test_lattice_selection()
return self._CollectionDone_to_java(payload)
if payloadType == "SelectedLattice":
return self._SelectedLattice_to_java(payload)
if payloadType == "CentringDone":
return self._CentringDone_to_java(payload)
if payloadType == "PriorInformation":
return self._PriorInformation_to_java(payload)
raise ValueError(
"Payload %s not supported for conversion to java" % payloadType
)
def _response_to_server(self, payload, correlation_id):
"""Create py4j message from py4j wrapper and current ids"""
if correlation_id is not None:
correlation_id = self._gateway.jvm.java.util.UUID.fromString(correlation_id)
py4j_payload = self._payload_to_java(payload)
try:
response = self._gateway.jvm.Py4jMessage(py4j_payload, correlation_id)
except Exception:
self.abort_workflow(
message="Error creating Java message (%s) to send to workflow"
% py4j_payload.getClass().getSimpleName()
)
return None
else:
return response
def _CentringDone_to_java(self, centringDone):
jvm = self._gateway.jvm
return jvm.astra.messagebus.messages.information.CentringDoneImpl(
jvm.CentringStatus.valueOf(centringDone.status),
self.to_java_time(centringDone.timestamp),
self._GoniostatTranslation_to_java(centringDone.goniostatTranslation),
)
def _ConfigurationData_to_java(self, configurationData):
jvm = self._gateway.jvm
return jvm.astra.messagebus.messages.information.ConfigurationDataImpl(
self._gateway.jvm.java.io.File(configurationData.location)
)
def _ReadyForCentring_to_java(self, readyForCentring):
return (
self._gateway.jvm.astra.messagebus.messages.control.ReadyForCentringImpl()
)
def _PriorInformation_to_java(self, priorInformation):
jvm = self._gateway.jvm
buildr = jvm.astra.messagebus.messages.information.PriorInformationImpl.Builder(
jvm.java.util.UUID.fromString(
conversion.text_type(priorInformation.sampleId)
)
)
xx0 = priorInformation.sampleName
if xx0:
buildr = buildr.sampleName(xx0)
xx0 = priorInformation.rootDirectory
if xx0:
buildr = buildr.rootDirectory(xx0)
buildr = buildr.userProvidedInfo(
self._UserProvidedInfo_to_java(priorInformation.userProvidedInfo)
)
return buildr.build()
def _SampleCentred_to_java(self, sampleCentred):
cls = self._gateway.jvm.astra.messagebus.messages.information.SampleCentredImpl
# if sampleCentred.interleaveOrder:
result = cls(
float(sampleCentred.imageWidth),
int(sampleCentred.wedgeWidth),
float(sampleCentred.exposure),
float(sampleCentred.transmission),
list(sampleCentred.interleaveOrder),
list(self._PhasingWavelength_to_java(x) for x in sampleCentred.wavelengths),
self._BcsDetectorSetting_to_java(sampleCentred.detectorSetting),
sampleCentred.repetition_count,
)
beamstopSetting = sampleCentred.beamstopSetting
if beamstopSetting is not None:
result.setBeamstopSetting(self._BeamstopSetting_to_java(beamstopSetting))
translationSettings = sampleCentred.goniostatTranslations
if translationSettings:
result.setGoniostatTranslations(
list(self._GoniostatTranslation_to_java(x) for x in translationSettings)
)
return result
def _CollectionDone_to_java(self, collectionDone):
jvm = self._gateway.jvm
proposalId = jvm.java.util.UUID.fromString(
conversion.text_type(collectionDone.proposalId)
)
centrings = set(
self._GoniostatTranslation_to_java(translation)
for translation in collectionDone.centrings
)
scanIdMap = {}
for item in collectionDone.scanIdMap.items():
scanIdMap[jvm.java.util.UUID.fromString(conversion.text_type(item[0]))] = (
jvm.java.util.UUID.fromString(conversion.text_type(item[1]))
if item[1]
else None
)
return jvm.astra.messagebus.messages.information.CollectionDoneImpl(
proposalId,
collectionDone.status,
collectionDone.procWithLatticeParams,
scanIdMap,
centrings,
)
def _SelectedLattice_to_java(self, selectedLattice):
jvm = self._gateway.jvm
builder = jvm.astra.messagebus.messages.information.SelectedLatticeImpl.Builder(
self._IndexingSolution_to_java(selectedLattice.solution)
)
builder = builder.strategyDetectorSetting(
self._BcsDetectorSetting_to_java(selectedLattice.strategyDetectorSetting)
)
builder = builder.strategyWavelength(
self._PhasingWavelength_to_java(selectedLattice.strategyWavelength)
)
builder = builder.strategyControl(selectedLattice.strategyControl)
builder = builder.userSpaceGroup(selectedLattice.userSpaceGroup)
crystal_classes = selectedLattice.userCrystalClasses
if crystal_classes:
ccset = set(
jvm.CrystalClass.fromStringList(self.toJStringArray(crystal_classes))
)
builder = builder.userCrystalClasses(ccset)
urlstrings = selectedLattice.referenceReflectionFiles
for urlstring in urlstrings:
urltpl = urlparse(urlstring)
host = urltpl.hostname
port = urltpl.port or None
if host:
builder = builder.referenceFile(
urltpl.scheme, urltpl.hostname, port, urltpl.path
)
elif urltpl:
builder = builder.referenceFile(urltpl.scheme, urltpl.path)
return builder.build()
def _IndexingSolution_to_java(self, indexingSolution):
jvm = self._gateway.jvm
cell = indexingSolution.cell
cell = cell and self._UnitCell_to_java(cell)
result = jvm.astra.messagebus.messages.information.IndexingSolutionImpl(
indexingSolution.bravaisLattice,
indexingSolution.latticeCharacter,
indexingSolution.isConsistent,
indexingSolution.qualityOfFit,
cell,
)
return result
def _BeamlineAbort_to_java(self, beamlineAbort):
return (
self._gateway.jvm.astra.messagebus.messages.instructions.BeamlineAbortImpl()
)
def _UserProvidedInfo_to_java(self, userProvidedInfo):
jvm = self._gateway.jvm
if userProvidedInfo is None:
return None
builder = (
jvm.astra.messagebus.messages.information.UserProvidedInfoImpl.Builder()
)
for scatterer in userProvidedInfo.scatterers:
builder = builder.addScatterer(self._AnomalousScatterer_to_java(scatterer))
crystal_classes = userProvidedInfo.crystalClasses
if crystal_classes:
ccset = set(
jvm.CrystalClass.fromStringList(self.toJStringArray(crystal_classes))
)
builder = builder.crystalClasses(ccset)
xx0 = userProvidedInfo.spaceGroup
if xx0:
builder = builder.spaceGroup(xx0)
xx0 = userProvidedInfo.spaceGroupString
if xx0:
builder = builder.spaceGroupString(xx0)
xx0 = userProvidedInfo.cell
if xx0 is not None:
builder = builder.cell(self._UnitCell_to_java(xx0))
if userProvidedInfo.expectedResolution:
builder = builder.expectedResolution(
float(userProvidedInfo.expectedResolution)
)
xx0 = userProvidedInfo.isAnisotropic
if xx0 is not None:
builder = builder.anisotropic(xx0)
return builder.build()
def _AnomalousScatterer_to_java(self, anomalousScatterer):
jvm = self._gateway.jvm
if anomalousScatterer is None:
return None
element = jvm.ChemicalElement.valueOf(anomalousScatterer.element)
edge = jvm.AbsorptionEdge.valueOf(anomalousScatterer.edge)
return jvm.astra.messagebus.messages.domain_types.AnomalousScattererImpl(
element, edge
)
def _UnitCell_to_java(self, unitCell):
if unitCell is None:
return None
lengths = [float(x) for x in unitCell.lengths]
angles = [float(x) for x in unitCell.angles]
return self._gateway.jvm.astra.messagebus.messages.domain_types.UnitCellImpl(
lengths[0], lengths[1], lengths[2], angles[0], angles[1], angles[2]
)
def _PhasingWavelength_to_java(self, phasingWavelength):
jvm = self._gateway.jvm
if phasingWavelength is None:
return None
javaUuid = self._gateway.jvm.java.util.UUID.fromString(
conversion.text_type(phasingWavelength.id_)
)
return jvm.astra.messagebus.messages.information.PhasingWavelengthImpl(
javaUuid, float(phasingWavelength.wavelength), phasingWavelength.role
)
def _BcsDetectorSetting_to_java(self, bcsDetectorSetting):
jvm = self._gateway.jvm
if bcsDetectorSetting is None:
return None
orgxy = bcsDetectorSetting.orgxy
# Need (temporarily?) because there is a primitive array, not a list,
# on the other side
orgxy_array = self._gateway.new_array(jvm.double, 2)
orgxy_array[0] = orgxy[0]
orgxy_array[1] = orgxy[1]
axisSettings = dict(
((x, float(y)) for x, y in bcsDetectorSetting.axisSettings.items())
)
javaUuid = jvm.java.util.UUID.fromString(
conversion.text_type(bcsDetectorSetting.id_)
)
return jvm.astra.messagebus.messages.instrumentation.BcsDetectorSettingImpl(
float(bcsDetectorSetting.resolution), orgxy_array, axisSettings, javaUuid
)
def _GoniostatTranslation_to_java(self, goniostatTranslation):
jvm = self._gateway.jvm
if goniostatTranslation is None:
return None
gts = goniostatTranslation
javaUuid = jvm.java.util.UUID.fromString(conversion.text_type(gts.id_))
javaRotationId = jvm.java.util.UUID.fromString(
conversion.text_type(gts.requestedRotationId)
)
axisSettings = dict(((x, float(y)) for x, y in gts.axisSettings.items()))
newRotation = gts.newRotation
if newRotation:
if isinstance(newRotation, GphlMessages.GoniostatSweepSetting):
javaNewRotation = self._GoniostatSweepSetting_to_java(newRotation)
else:
javaNewRotation = self._GoniostatRotation_to_java(newRotation)
return (
jvm.astra.messagebus.messages.instrumentation.GoniostatTranslationImpl(
axisSettings, javaUuid, javaRotationId, javaNewRotation
)
)
else:
return (
jvm.astra.messagebus.messages.instrumentation.GoniostatTranslationImpl(
axisSettings, javaUuid, javaRotationId
)
)
def _GoniostatRotation_to_java(self, goniostatRotation):
jvm = self._gateway.jvm
if goniostatRotation is None:
return None
grs = goniostatRotation
javaUuid = jvm.java.util.UUID.fromString(conversion.text_type(grs.id_))
axisSettings = dict(((x, float(y)) for x, y in grs.axisSettings.items()))
# Long problematic, but now fixed (on both sides)
return jvm.astra.messagebus.messages.instrumentation.GoniostatRotationImpl(
axisSettings, javaUuid
)
def _GoniostatSweepSetting_to_java(self, goniostatSweepSetting):
"""Not currently in use, as you cannot replace SweepSettings,
but may come back in if something changes"""
jvm = self._gateway.jvm
if goniostatSweepSetting is None:
return None
gss = goniostatSweepSetting
javaUuid = jvm.java.util.UUID.fromString(conversion.text_type(gss.id_))
axisSettings = dict(((x, float(y)) for x, y in gss.axisSettings.items()))
return jvm.astra.messagebus.messages.instrumentation.GoniostatSweepSettingImpl(
axisSettings, javaUuid, goniostatSweepSetting.scanAxis
)
def _BeamstopSetting_to_java(self, beamStopSetting):
jvm = self._gateway.jvm
if beamStopSetting is None:
return None
javaUuid = jvm.java.util.UUID.fromString(
conversion.text_type(beamStopSetting.id_)
)
axisSettings = dict(
((x, float(y)) for x, y in beamStopSetting.axisSettings.items())
)
return jvm.astra.messagebus.messages.instrumentation.BeamstopSettingImpl(
axisSettings, javaUuid
)
[docs] def toJStringArray(self, arr):
"""Modified from
https://stackoverflow.com/questions/61230680/pyspark-py4j-create-java-string-array
"""
jarr = self._gateway.new_array(self._gateway.jvm.java.lang.String, len(arr))
for ind, val in enumerate(arr):
jarr[ind] = val
return jarr
class Java:
implements = ["co.gphl.py4j.PythonListener"]