# Source code for mxcubecore.HardwareObjects.Session

"""
Session hardware object.

Contains information regarding the current session and methods to
access and manipulate this information.
"""

import logging
import os
import re
import time
from pathlib import Path
from typing import Tuple

from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.model.queue_model_objects import PathTemplate

# Fallback folder names. Session uses the raw/processed names as its initial
# values and falls back to the archive name when the configuration does not
# supply a non-empty one.
default_raw_data_folder = "RAW_DATA"
default_processed_data_folder = "PROCESSED_DATA"
default_archive_folder = "ARCHIVE"


class Session(HardwareObject):
    """
    Hardware object describing the current session.

    Keeps the proposal, session start date and user group, and builds the
    raw/processed data directory paths derived from them.
    """

    def __init__(self, name):
        HardwareObject.__init__(self, name)
        self.session_id = None
        self.proposal_code = None
        self.proposal_number = None
        self.proposal_id = None
        # List of (proposal_code, proposal_number) tuples, filled in by init().
        self.in_house_users = []
        self.session_start_date = None
        self.user_group = ""
        self.default_precision = 5
        self.base_directory = None
        self.base_process_directory = None
        self.base_archive_directory = None
        self._endstation_name: str = ""
        self.raw_data_folder_name = default_raw_data_folder
        self.processed_data_folder_name = default_processed_data_folder

    @property
    def synchrotron_name(self) -> str:
        """The ``synchrotron_name`` value of the configuration."""
        return self.config.synchrotron_name

    @property
    def beamline_name(self) -> str:
        """The ``beamline_name`` value of the configuration."""
        return self.config.beamline_name

    @property
    def endstation_name(self) -> str:
        """The end-station name currently set on this session."""
        return self._endstation_name

    def set_endstation_name(self, name: str) -> None:
        """
        Sets the end-station name used when building data paths.

        :param name: The new end-station name
        """
        self.log.info(f"Setting end-station name to {name}")
        self._endstation_name = name

    def init(self):
        """
        Reads the ``file_info`` configuration section and the optional
        ``inhouse_users`` property, then configures the base directories,
        the list of inhouse proposals and the PathTemplate settings.
        """
        file_info = self.config.file_info

        # Folder names are stored stripped; blank or missing values keep the
        # defaults assigned in __init__.
        raw_folder = (file_info.get("raw_data_folder_name") or "").strip()
        if raw_folder:
            self.raw_data_folder_name = raw_folder

        processed_folder = (
            file_info.get("processed_data_folder_name") or ""
        ).strip()
        if processed_folder:
            self.processed_data_folder_name = processed_folder

        archive_folder = (file_info.get("archive_folder") or "").strip()
        if not archive_folder:
            archive_folder = default_archive_folder

        # 'inhouse_users' is optional; without it no proposal is inhouse.
        inhouse_users = self.get_property("inhouse_users")
        proposals = inhouse_users.get("proposal", []) if inhouse_users else []
        self.in_house_users = [
            (prop["code"], str(prop["number"])) for prop in proposals
        ]

        self.set_base_data_directories(
            file_info["base_directory"],
            file_info["processed_data_base_directory"],
            file_info["archive_base_directory"],
            raw_folder=self.raw_data_folder_name,
            process_folder=self.processed_data_folder_name,
            archive_folder=archive_folder,
        )

        try:
            precision = int(file_info.get("precision", self.default_precision))
        except (TypeError, ValueError):
            # TypeError covers a key that is present but set to None.
            precision = self.default_precision

        PathTemplate.set_precision(precision)
        PathTemplate.set_path_template_style(
            self.synchrotron_name, file_info.get("file_template")
        )

        self._endstation_name = self.config.endstation_name

    def set_base_data_directories(
        self,
        base_directory,
        base_process_directory,
        base_archive_directory,
        raw_folder=default_raw_data_folder,
        process_folder=default_processed_data_folder,
        archive_folder=default_archive_folder,
    ):
        """
        Stores the base directories and folder names and forwards the data
        and archive base paths to PathTemplate.

        :param base_directory: Base path for data; passed to
                               PathTemplate.set_data_base_path unless None
        :param base_process_directory: Base path for processed data
        :param base_archive_directory: Base path for the archive; passed to
                                       PathTemplate.set_archive_path unless
                                       None
        :param raw_folder: Name of the raw data folder
        :param process_folder: Name of the processed data folder
        :param archive_folder: Name of the archive folder
        """
        self.base_directory = base_directory
        self.base_process_directory = base_process_directory
        self.base_archive_directory = base_archive_directory

        self.raw_data_folder_name = raw_folder
        self.processed_data_folder_name = process_folder

        if self.base_directory is not None:
            PathTemplate.set_data_base_path(self.base_directory)

        if self.base_archive_directory is not None:
            PathTemplate.set_archive_path(
                self.base_archive_directory, archive_folder
            )

    def get_base_data_directory(self):
        """
        Returns the base data directory taking the 'contextual'
        information into account, such as if the current user
        is inhouse.

        Inhouse layout:
        <base>/<endstation>/inhouse/<proposal>/<start date>
        Visitor layout:
        <base>/visitor/<proposal>/<endstation>/<start date>

        The session start date is used as stored; when it is not set,
        today's date formatted as YYYYMMDD is used instead.

        :returns: The base data path.
        :rtype: str
        """
        if self.session_start_date:
            start_time = self.session_start_date
        else:
            start_time = time.strftime("%Y%m%d")

        if self.is_inhouse():
            return os.path.join(
                self.base_directory,
                self.endstation_name,
                "inhouse",
                self.get_proposal(),
                start_time,
            )

        return os.path.join(
            self.base_directory,
            "visitor",
            self.get_proposal(),
            self.endstation_name,
            start_time,
        )

    def prepare_directories(self, _session):
        """
        Prepares directories required for the given session.

        This method is a placeholder intended to be overridden in
        subclasses to implement logic for creating or preparing
        directories. By default it performs no action and raises no error.

        :param _session: The session object containing session-specific
                         information.
        """

    def get_path_with_proposal_as_root(self, path: str) -> str:
        """
        Strips the beginning of the path so that it starts with
        the proposal folder as root

        :param path: The full path
        :returns: Path stripped so that it starts with proposal
        :raises IndexError: If the path does not contain the inhouse/visitor
                            root directory of the current session
        """
        if self.is_inhouse():
            directory = os.path.join(
                self.base_directory, self.endstation_name, "inhouse"
            )
        else:
            directory = os.path.join(self.base_directory, "visitor")

        # Split on the first occurrence only, so everything after the root is
        # kept even if the same string appears again further down the path.
        return path.split(directory, 1)[1]

    def get_base_image_directory(self):
        """
        :returns: The base path for images.
        :rtype: str
        """
        return os.path.join(
            self.get_base_data_directory(), self.raw_data_folder_name
        )

    def get_base_process_directory(self):
        """
        :returns: The base path for processed data.
        :rtype: str
        """
        return os.path.join(
            self.get_base_data_directory(), self.processed_data_folder_name
        )

    def _sanitize_subdir(self, sub_dir: str) -> str:
        """
        Returns a sanitized version of the subdir, removing any characters
        that are not allowed in the path as well as leading or multiple
        slashes.

        Colons are replaced by dashes; every other character outside
        ``A-Za-z0-9_/-`` is dropped.

        :param sub_dir: The subdir to sanitize
        :returns: The sanitized subdir.
        """
        sub_dir = sub_dir.replace(":", "-")
        sub_dir = re.sub(r"[^A-Za-z0-9_/-]", "", sub_dir)
        sub_dir = sub_dir.lstrip("/")

        return os.path.normpath(re.sub(r"/+", "/", sub_dir))

    def _resolve_subdir(self, base_path: str, sub_dir: str) -> str:
        """
        Joins a sanitized sub directory onto base_path and checks that the
        result is still located under base_path.

        :param base_path: The directory the result must stay within
        :param sub_dir: Sub directory relative to base_path, may be empty
        :returns: The absolute path of the resulting directory
        :raises PermissionError: If the result is not relative to base_path
        """
        base_directory = Path(os.path.abspath(base_path))

        if sub_dir:
            directory = Path(base_directory, self._sanitize_subdir(sub_dir))
        else:
            directory = base_directory

        if not directory.is_relative_to(base_directory):
            error_message = "Invalid subdirectory"
            logging.getLogger("user_level_log").error(error_message)
            raise PermissionError(error_message)

        return os.path.abspath(f"{directory}/")

    def get_image_directory(self, sub_dir: str = "") -> str:
        """
        Returns the full path to images

        :param sub_dir: sub directory relative to path returned
                        by get_base_image_directory
        :returns: The full path to images.
        :raises PermissionError: If the resulting directory is not located
                                 under the base image directory
        """
        return self._resolve_subdir(self.get_base_image_directory(), sub_dir)

    def get_process_directory(self, sub_dir: str = "") -> str:
        """
        Returns the full path to processed data,

        :param sub_dir: sub directory relative to path returned
                        by get_base_process_directory
        :returns: The full path to processed data.
        :raises PermissionError: If the resulting directory is not located
                                 under the base process directory
        """
        return self._resolve_subdir(self.get_base_process_directory(), sub_dir)

    def get_full_paths(self, subdir: str = "", tag: str = "") -> Tuple[str, str]:
        """
        Returns the full path to both image and processed data.

        The paths are those returned by get_image_directory and
        get_process_directory for the given subdir.

        :param subdir: subdirectory
        :param tag: accepted for interface compatibility; not used by this
                    implementation
        :returns: Tuple with the full path to image and processed data
        """
        return (
            self.get_image_directory(subdir),
            self.get_process_directory(subdir),
        )

    def get_default_prefix(self, sample_data_node=None, generic_name=False):
        """
        Returns the default prefix, using sample data such as the
        acronym as parts in the prefix.

        Without a sample data node the prefix is the proposal, or the
        placeholder "<acronym>-<name>" when generic_name is set.

        :param sample_data_node: The data node to get additional
                                 information from, (which will be
                                 added to the prefix).
        :type sample_data_node: Sample
        :param generic_name: Return a generic placeholder prefix when no
                             sample data node is given
        :type generic_name: bool

        :returns: The default prefix.
        :rtype: str
        """
        prefix = self.get_proposal()

        if sample_data_node:
            name = sample_data_node.name
            protein_acronym = sample_data_node.crystals[0].protein_acronym

            if protein_acronym and name:
                prefix = f"{protein_acronym}-{name}"
            elif protein_acronym:
                prefix = protein_acronym
            else:
                prefix = name or ""
        elif generic_name:
            prefix = "<acronym>-<name>"

        return prefix

    def get_default_subdir(self, sample_data: dict) -> str:
        """
        Gets the default sub-directory based on sample information

        :param sample_data: Lims sample dictionary (keys "sampleName" and
                            "proteinAcronym"), or an object exposing
                            ``name`` and ``crystals[0].protein_acronym``
        :returns: Sub-directory path string, with colons replaced by dashes
        """
        if isinstance(sample_data, dict):
            sample_name = sample_data.get("sampleName", "")
            protein_acronym = sample_data.get("proteinAcronym", "")
        else:
            sample_name = sample_data.name
            protein_acronym = sample_data.crystals[0].protein_acronym

        if protein_acronym:
            subdir = f"{protein_acronym}/{protein_acronym}-{sample_name}/"
        else:
            subdir = f"{sample_name}/"

        return subdir.replace(":", "-")

    def get_archive_directory(self):
        """
        :returns: The archive directory as reported by PathTemplate.
        """
        return PathTemplate.get_archive_directory()

    def get_proposal(self):
        """
        Builds the proposal name from the proposal code and number.

        The code "ifx" is written as "fx". The result is lower-cased and
        hyphens are removed, so the fallback is returned as "localuser".

        :returns: The proposal, "localuser" if no proposal is available
        :rtype: str
        """
        if self.proposal_code and self.proposal_number:
            # Map on a local copy: rewriting self.proposal_code here would
            # change the outcome of later is_inhouse() calls.
            code = "fx" if self.proposal_code == "ifx" else self.proposal_code
            proposal = f"{code}{self.proposal_number}"
        else:
            proposal = "local-user"

        return proposal.lower().replace("-", "")

    def is_inhouse(self, proposal_code=None, proposal_number=None):
        """
        Determines if a given proposal is considered to be
        inhouse. Arguments that are not given (or empty) default to the
        proposal of the current session.

        :param proposal_code: Proposal code
        :type proposal_code: str

        :param proposal_number: Proposal number
        :type proposal_number: str

        :returns: True if the proposal is inhouse, otherwise False.
        :rtype: bool
        """
        if not proposal_code:
            proposal_code = self.proposal_code

        if not proposal_number:
            proposal_number = self.proposal_number

        if (proposal_code, proposal_number) in self.in_house_users:
            return True

        # init() stores proposal numbers as strings, so also accept a
        # non-string number whose string form matches.
        return (
            proposal_number is not None
            and not isinstance(proposal_number, str)
            and (proposal_code, str(proposal_number)) in self.in_house_users
        )

    def get_inhouse_user(self):
        """
        :returns: The first configured inhouse user.
        :rtype: tuple (<proposal_code>, <proposal_number>)
        :raises IndexError: If no inhouse users are configured
        """
        return self.in_house_users[0]

    def set_session_start_date(self, start_date_str):
        """
        :param start_date_str: The session start date
        :type start_date_str: str
        """
        self.session_start_date = start_date_str

    def get_session_start_date(self):
        """
        :returns: The session start date
        :rtype: str
        """
        return self.session_start_date

    def set_user_group(self, group_name):
        """
        :param group_name: Name of user group; stored as str
        :type group_name: str
        """
        self.user_group = str(group_name)

    def get_group_name(self):
        """
        :returns: Name of user group
        :rtype: str
        """
        return self.user_group