Source code for mxcubecore.HardwareObjects.ESRF.ESRFSmallXrayCentring

# encoding: utf-8
#
# This file is part of MXCuBE.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE.  If not, see <https://www.gnu.org/licenses/>.
""" """

from __future__ import (
    absolute_import,
    division,
    print_function,
    unicode_literals,
)

import json
import logging
import os
import time

import requests

from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects.abstract.AbstractXrayCentring import (
    AbstractXrayCentring,
)

__copyright__ = """ Copyright © 2016 - 2022 by MXCuBE Collaboration """
__license__ = "LGPLv3+"
__author__ = "rhfogh"
__date__ = "25/03/2022"
__category__ = "General"


[docs]class ESRFSmallXrayCentring(AbstractXrayCentring):
[docs] def execute(self): """Executes the BES workflow SmallXrayCentring""" logging.getLogger("HWR").debug("Executes SmallXrayCentring workflow") workflow_name = "SmallXrayCentring" bes_host = HWR.beamline.workflow.get_property("bes_host") bes_port = HWR.beamline.workflow.get_property("bes_port") task_group_node_id = self._data_collection_group._node_id dict_parameters = json.loads(json.dumps(HWR.beamline.workflow.dict_parameters)) dict_parameters["sample_node_id"] = task_group_node_id dict_parameters["end_workflow_in_mxcube"] = False dict_parameters["workflow_id"] = HWR.beamline.xml_rpc_server.workflow_id dict_parameters["workflow_parameters"] = ( self._queue_entry.get_data_model().get_workflow_parameters() ) logging.getLogger("HWR").info("Starting workflow {0}".format(workflow_name)) logging.getLogger("HWR").info( "Starting a workflow on http://%s:%d/BES" % (bes_host, bes_port) ) bes_url = "http://{0}:{1}".format(bes_host, bes_port) start_URL = os.path.join(bes_url, "RUN", workflow_name) logging.getLogger("HWR").info("BES start URL: %r" % start_URL) response = requests.post(start_URL, json=dict_parameters) if response.status_code == 200: request_id = response.text logging.getLogger("HWR").info( "Workflow started, request id: %r" % request_id ) else: logging.getLogger("HWR").error("Workflow didn't start!") logging.getLogger("HWR").error(response.text) status_url = os.path.join(bes_url, "STATUS", str(request_id)) logging.getLogger("HWR").info("STATUS URL: %r" % status_url) start_time = time.time() max_time = 600 # Max five minutes finished = False timed_out = False while not timed_out and not finished: response = requests.get(status_url) if response.text == "STARTED": logging.getLogger("HWR").info("Workflow still running...") time.sleep(1) elif response.text == "FINISHED": logging.getLogger("HWR").info("Workflow finished!") finished = True if time.time() > start_time + max_time: logging.getLogger("HWR").info("Workflow timed out!") timed_out = True