# Source code for mxcubecore.HardwareObjects.DESY.P11Session

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

__copyright__ = """Copyright The MXCuBE Collaboration"""
__license__ = "LGPLv3+"

import glob
import json
import logging
import os
import time
from configparser import ConfigParser
from datetime import date

import yaml

from mxcubecore.HardwareObjects.Session import Session

PATH_BEAMTIME = "/gpfs/current"
PATH_COMMISSIONING = "/gpfs/commissioning"
PATH_FALLBACK = "/gpfs/local"


class P11Session(Session):
    """Session hardware object that resolves P11 beamtime data locations.

    The object looks for a beamtime metadata JSON file below ``/gpfs``,
    extracts the beamline / beamtime identifiers and the online-analysis
    (SSH and Slurm) settings from it, and configures the base data
    directories of the parent ``Session`` accordingly.
    """

    default_archive_folder = "raw"

    def init(self):
        """Read the configuration and set up the data directories."""
        super().init()

        self.settings_file = self.get_property("p11_settings_file")
        self.operation_mode = self.get_property("mode")
        self.beamtime_info = {}

        parser = ConfigParser()
        parser.read(self.settings_file)
        # NOTE(review): the configured name is looked up (so a missing
        # [general]/file_name entry still raises KeyError) but it is then
        # unconditionally replaced by "mxcube" - confirm this is intended.
        self.session_file_name = parser["general"]["file_name"]
        self.session_file_name = "mxcube"

        if self.session_start_date:
            # Keep only the date part and drop the dashes -> YYYYMMDD.
            self.start_time = self.session_start_date.split(" ")[0].replace("-", "")
        else:
            self.start_time = time.strftime("%Y%m%d")

        self.info_set_defaults()

        # Try to locate the metadata file and process it.
        self.beamtime_metadata_file = self.locate_metadata_file()
        if self.beamtime_metadata_file:
            (
                self.beamline,
                self.beamtime,
                self.remote_data_dir,
                self.user_name,
                self.user_sshkey,
                self.slurm_reservation,
                self.slurm_partition,
                self.slurm_node,
            ) = self.parse_metadata_file(self.beamtime_metadata_file)
        else:
            # Fall back to local paths if no metadata is found.
            # NOTE(review): base_directory is only selected in this branch and
            # before read_beamtime_info() runs - verify against Session.init().
            self.log.debug("Falling back to local directory for saving data.")
            self.select_base_directory("local")

        if self.is_beamtime_open():
            self.read_beamtime_info()
        elif self.is_commissioning_open():
            # NOTE(review): the returned info is discarded here.
            self.read_commissioning_info()

        self.set_base_data_directories(
            self.base_directory,
            self.base_directory,
            self.base_directory,
            raw_folder=self.raw_data_folder_name,
            process_folder=self.processed_data_folder_name,
            archive_folder=self.default_archive_folder,
        )

    def info_set_defaults(self):
        """Reset the beamtime info to "no beamtime, local fallback path"."""
        self.beamtime_info["beamtimeId"] = None
        self.beamtime_info["proposalType"] = None
        self.beamtime_info["proposalId"] = None
        self.beamtime_info["rootPath"] = PATH_FALLBACK

    def is_beamtime_open(self):
        """Check whether the raw data folder of the current beamtime is writable.

        An error is sent to the "GUI" logger when it is not.

        :returns: True if the beamtime raw data folder is a writable directory.
        :rtype: bool
        """
        self.log.debug("=========== CHECKING IF BEAMTIME ID IS OPEN... ============")

        # Evaluate the filesystem check once and reuse it for log and result.
        is_open = self.is_writable_dir(
            os.path.join(PATH_BEAMTIME, self.raw_data_folder_name)
        )
        if is_open:
            self.log.debug(
                "=========== BEAMTIME IS OPEN (/gpfs/current exists) ============"
            )
        else:
            logging.getLogger("GUI").error(
                "No beamtime ID is open (check /gpfs/current)"
            )
        return is_open

    def is_commissioning_open(self):
        """
        :returns: True if the commissioning raw data folder is a writable
                  directory.
        :rtype: bool
        """
        return self.is_writable_dir(
            os.path.join(PATH_COMMISSIONING, self.raw_data_folder_name)
        )

    def is_writable_dir(self, folder):
        """
        :param folder: Path to check.
        :returns: True if ``folder`` is an existing directory the process may
                  write to.
        :rtype: bool
        """
        return os.path.isdir(folder) and os.access(folder, os.F_OK | os.W_OK)

    def _open_beamtime_field(self, key):
        """Return ``beamtime_info[key]`` if a beamtime is open, else None."""
        if not self.is_beamtime_open():
            return None
        return self.get_beamtime_info()[key]

    def get_current_beamtime_id(self):
        """
        :returns: The "beamtimeId" entry, or None when no beamtime is open.
        """
        return self._open_beamtime_field("beamtimeId")

    def get_current_proposal_code(self):
        """
        :returns: The "proposalType" entry, or None when no beamtime is open.
        """
        return self._open_beamtime_field("proposalType")

    def get_current_proposal_number(self):
        """
        :returns: The "proposalId" entry, or None when no beamtime is open.
        """
        return self._open_beamtime_field("proposalId")

    def get_beamtime_info(self):
        """
        :returns: The beamtime info dictionary (not a copy).
        :rtype: dict
        """
        return self.beamtime_info

    def read_beamtime_info(self):
        """Merge every readable ``beamtime-metadata*`` file into the info.

        When the beamtime path does not exist, the root path falls back to
        the local path instead.
        """
        self.log.debug("=========== READING BEAMTIME INFO ============")

        if not os.path.exists(PATH_BEAMTIME):
            self.log.debug(f"No beamtime ID is open, using local path {PATH_FALLBACK}.")
            self.beamtime_info["rootPath"] = PATH_FALLBACK
            return

        # The context manager closes the directory handle deterministically.
        with os.scandir(PATH_BEAMTIME) as entries:
            for entry in entries:
                if not (
                    entry.is_file() and entry.name.startswith("beamtime-metadata")
                ):
                    continue
                # Parse each file once and reuse the result.
                info = self.read_load_info(entry.path)
                self.log.debug(f"BEAMTIME INFO from {entry.path} is " + str(info))
                if info is not None:
                    self.beamtime_info.update(info)
                    self.beamtime_info["rootPath"] = PATH_BEAMTIME

    def read_commissioning_info(self):
        """Load the first ``commissioning-metadata*`` file found.

        :returns: The parsed file content, or None if no such file exists or
                  it could not be read.
        """
        with os.scandir(PATH_COMMISSIONING) as entries:
            fname = next(
                (
                    entry.path
                    for entry in entries
                    if entry.is_file()
                    and entry.name.startswith("commissioning-metadata")
                ),
                None,
            )
        if fname is None:
            return None
        return self.read_load_info(fname)

    def read_load_info(self, filename):
        """Parse a JSON file.

        :param filename: Path of the JSON file.
        :returns: The decoded JSON content, or None if the file is missing or
                  does not contain valid JSON.
        """
        try:
            with open(filename, encoding="utf-8") as file:
                return json.loads(file.read())
        except (ValueError, FileNotFoundError):
            return None

    def select_base_directory(self, mode="beamtime"):
        """Set the base directory from the current beamtime info root path.

        :param mode: Currently unused; kept for interface compatibility.
        """
        self.base_directory = self.beamtime_info["rootPath"]

    def get_base_data_directory(self):
        """
        :returns: The base data path.
        :rtype: str
        """
        return self.base_directory

    def get_base_image_directory(self):
        """
        :returns: The base path for images.
        :rtype: str
        """
        return os.path.join(self.get_base_data_directory(), self.raw_data_folder_name)

    def get_base_process_directory(self):
        """
        :returns: The base path for processed data.
        :rtype: str
        """
        return os.path.join(
            self.get_base_data_directory(), self.processed_data_folder_name
        )

    def get_archive_directory(self):
        """
        :returns: The base path for archived data.
        :rtype: str
        """
        return os.path.join(self.get_base_data_directory(), self.default_archive_folder)

    def path_to_ispyb(self, path):
        """Return ``path`` unchanged.

        :param path: Data path.
        :returns: The same ``path`` that was passed in.
        """
        ispyb_template = self["file_info"].get_property("ispyb_directory_template")
        bid = self.beamtime_info["beamtimeId"]
        year = date.today().year
        # NOTE(review): the formatted ISPyB path is computed but not returned;
        # confirm whether ``ispyb_path`` was meant to be the result.
        ispyb_path = ispyb_template.format(beamtime_id=bid, year=year)  # noqa: F841
        return path

    def locate_metadata_file(self, root_dir="/gpfs"):
        """Search ``root_dir`` and its sub-directories for the metadata file.

        Sub-directories whose path ends with "local" are skipped. The lookup
        only succeeds when exactly one ``*metadata*.json`` file is found.

        :param root_dir: Directory to scan.
        :returns: Path of the unique metadata file, or None if ``root_dir``
                  cannot be listed or the file is missing or ambiguous.
        """
        try:
            candidates = [
                os.path.join(root_dir, entry) for entry in os.listdir(root_dir)
            ]
        except OSError as err:
            # Fall back if the root directory cannot be listed.
            self.log.debug(f"Could not list {root_dir}: {err}")
            self.log.debug("Root directory does not exist: " + str(root_dir))
            return None

        beamtime_dirs = [
            path
            for path in candidates
            if os.path.isdir(path) and not path.endswith("local")
        ]
        self.log.debug(f"Scanning directories: {beamtime_dirs}")

        metadata_files = []
        for curr_dir in beamtime_dirs + [root_dir]:
            curr_dir_metadata_files = glob.glob("{0}/*metadata*.json".format(curr_dir))
            metadata_files.extend(curr_dir_metadata_files)
            self.log.debug(
                f"Found metadata files in {curr_dir}: {curr_dir_metadata_files}"
            )

        if len(metadata_files) != 1:
            self.log.debug(
                "Unique metadata JSON file not found. Falling back to /gpfs/local."
            )
            return None

        return metadata_files[0]

    def parse_metadata_file(self, metadatafile_path):
        """Extract beamtime and online-analysis settings from a metadata file.

        :param metadatafile_path: Path of the metadata JSON file.
        :returns: Tuple ``(beamline, beamtime, core data dir, user name,
                  ssh key path, slurm reservation, slurm partition,
                  slurm node)``.
        :rtype: tuple
        :raises RuntimeError: If the file cannot be parsed or any of the
                              required values is missing or empty.
        """
        beamline = ""
        beamtime = ""
        coredatadir = ""
        temp_user_name = ""
        temp_user_sshkeyfile = ""
        slurm_reservation = ""
        slurm_partition = ""
        reserved_nodes = []

        with open(metadatafile_path, "r") as mdfile:
            try:
                md = yaml.safe_load(mdfile)

                if "beamline" in md:
                    beamline = str(md["beamline"])

                if "beamtimeId" in md:
                    beamtime = str(md["beamtimeId"])
                elif "id" in md:
                    beamtime = str(md["id"])

                if "corePath" in md:
                    coredatadir = str(md["corePath"])

                if "onlineAnalysis" in md:
                    online = md["onlineAnalysis"]
                    temp_user_name = str(online["userAccount"])
                    temp_user_sshkeyfile = str(online["sshPrivateKeyPath"])
                    slurm_reservation = str(online["slurmReservation"])
                    slurm_partition = str(online["slurmPartition"])
                    reserved_nodes = online["reservedNodes"]
            except Exception as err:
                # Malformed content, wrong top-level type or missing
                # onlineAnalysis keys all end up here.
                raise RuntimeError(
                    "JSON parsing of metadata file failed", metadatafile_path
                ) from err

        # Checked in order; the first missing value decides the error raised.
        required = (
            (beamline, "Beamline ID not found"),
            (beamtime, "Beamtime ID not found"),
            (coredatadir, "Data location on remote filesystem unknown"),
            (temp_user_name, "Temporary account for online analysis unknown "),
            (temp_user_sshkeyfile, "SSH key for online analysis account not found"),
            (slurm_reservation, "Slurm reservation for online analysis not found"),
            (slurm_partition, "Slurm partition for online analysis not found"),
            (reserved_nodes, "Reserved node(s) for online analysis not found"),
        )
        for value, message in required:
            if not value:
                raise RuntimeError(message, metadatafile_path)

        # The key path is joined onto the metadata file's directory.
        temp_user_sshkeyfile = os.path.join(
            os.path.dirname(metadatafile_path), temp_user_sshkeyfile
        )
        slurm_node = str(reserved_nodes[0])

        return (
            beamline,
            beamtime,
            coredatadir,
            temp_user_name,
            temp_user_sshkeyfile,
            slurm_reservation,
            slurm_partition,
            slurm_node,
        )

    def get_beamtime_metadata(self, root_dir="/gpfs"):
        """Locate and parse the metadata file below ``root_dir``.

        :param root_dir: Directory to scan for the metadata file.
        :returns: The tuple produced by ``parse_metadata_file``, or None if
                  the file is missing or cannot be parsed.
        """
        try:
            metadata_file = self.locate_metadata_file(root_dir)
            if metadata_file is not None:
                return self.parse_metadata_file(metadata_file)
        except Exception:
            # Best effort: failures are reported through the log below.
            pass

        self.log.debug("Metadata file can not be parsed. Check if beamtime is open.")
        return None

    def get_ssh_command(self):
        """Build the SSH command line for the online-analysis node.

        Uses the user name, key file and Slurm node stored on this object.

        :returns: The ssh command with options, key, user and host.
        :rtype: str
        """
        ssh_opts_general = "-o BatchMode=yes -o CheckHostIP=no -o StrictHostKeyChecking=no -o GSSAPIAuthentication=no -o GSSAPIDelegateCredentials=no -o PasswordAuthentication=no -o PubkeyAuthentication=yes -o PreferredAuthentications=publickey -o ConnectTimeout=10"
        ssh_opts_key = f"-i {self.user_sshkey}"
        ssh_opts_user = f"-l {self.user_name}"
        ssh_opts_host = self.slurm_node

        return (
            f"/usr/bin/ssh {ssh_opts_general} {ssh_opts_key} "
            f"{ssh_opts_user} {ssh_opts_host}"
        )

    def get_sbatch_command(
        self, jobname_prefix="onlineanalysis", logfile_path="/dev/null"
    ):
        """Build the sbatch command line for an online-analysis job.

        :param jobname_prefix: Prefix of the job name; beamline and beamtime
                               are appended to it.
        :param logfile_path: Value passed to ``--output``.
        :returns: The sbatch command with partition, reservation, job name
                  and output options.
        :rtype: str
        """
        job_name = f"{jobname_prefix}_{self.beamline}_{self.beamtime}"
        sbatch_opts = (
            f"--partition={self.slurm_partition} "
            f"--reservation={self.slurm_reservation} "
            f"--job-name={job_name} "
            f"--output={logfile_path}"
        )
        return f"/usr/bin/sbatch {sbatch_opts}"