"""
Session hardware object.
Contains information regarding the current session and methods to
access and manipulate this information.
"""
import logging
import os
import re
import time
from pathlib import Path
from typing import Tuple
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.model.queue_model_objects import PathTemplate
# Default folder names. The raw and processed names are the initial values of
# the corresponding Session attributes; the archive name is the fallback used
# by Session.init() when the configuration gives no non-blank archive folder.
default_raw_data_folder = "RAW_DATA"
default_processed_data_folder = "PROCESSED_DATA"
default_archive_folder = "ARCHIVE"
class Session(HardwareObject):
    """Hardware object describing the current session.

    Holds the proposal identification, the user group and the base data
    directories, together with methods to read and update them.
    """

    def __init__(self, name):
        """Create the session with every field in its unconfigured state.

        :param name: Name forwarded to ``HardwareObject.__init__``.
        """
        HardwareObject.__init__(self, name)

        # Session and proposal identification.
        self.session_id = None
        self.proposal_id = None
        self.proposal_code = None
        self.proposal_number = None
        self.session_start_date = None
        self.user_group = ""

        # (code, number) pairs of the proposals treated as in-house.
        self.in_house_users = []

        # Precision used when the configuration supplies no usable value.
        self.default_precision = 5

        # Base directories; all unset until configured.
        self.base_directory = None
        self.base_process_directory = None
        self.base_archive_directory = None

        # Folder names start out as the module-level defaults.
        self.raw_data_folder_name = default_raw_data_folder
        self.processed_data_folder_name = default_processed_data_folder

        self._endstation_name: str = ""
@property
def synchrotron_name(self) -> str:
    """Synchrotron name, read from the ``synchrotron_name`` configuration entry."""
    configuration = self.config
    return configuration.synchrotron_name
@property
def beamline_name(self) -> str:
    """Beamline name, read from the ``beamline_name`` configuration entry."""
    configuration = self.config
    return configuration.beamline_name
@property
def endstation_name(self) -> str:
    """End-station name currently stored on the session."""
    current_name = self._endstation_name
    return current_name
def set_endstation_name(self, name: str) -> None:
    """Log the change, then store the new end-station name.

    :param name: The end-station name to store.
    """
    message = f"Setting end-station name to {name}"
    self.log.info(message)
    self._endstation_name = name
def init(self):
    """Configure the session from the ``file_info`` configuration section.

    Reads the folder names, the base directories, the list of in-house
    proposals, the numeric precision and the file template, and forwards
    them to ``set_base_data_directories`` and ``PathTemplate``. The
    end-station name is taken from the configuration last.

    :raises KeyError: If ``file_info`` lacks ``base_directory``,
        ``processed_data_base_directory`` or ``archive_base_directory``.
    """
    file_info = self.config.file_info

    # A missing or blank name keeps the value already set on the instance.
    # Names are stored stripped, the same way the archive folder is handled.
    raw_folder = (file_info.get("raw_data_folder_name") or "").strip()
    if raw_folder:
        self.raw_data_folder_name = raw_folder

    processed_folder = (file_info.get("processed_data_folder_name") or "").strip()
    if processed_folder:
        self.processed_data_folder_name = processed_folder

    archive_folder = (file_info.get("archive_folder") or "").strip()
    if not archive_folder:
        archive_folder = default_archive_folder

    # 'inhouse_users' is optional; without it no proposal is in-house.
    inhouse_users = self.get_property("inhouse_users")
    inhouse_proposals = inhouse_users.get("proposal", []) if inhouse_users else []
    self.in_house_users = [
        (proposal["code"], str(proposal["number"])) for proposal in inhouse_proposals
    ]

    self.set_base_data_directories(
        file_info["base_directory"],
        file_info["processed_data_base_directory"],
        file_info["archive_base_directory"],
        raw_folder=self.raw_data_folder_name,
        process_folder=self.processed_data_folder_name,
        archive_folder=archive_folder,
    )

    # int() raises TypeError (not ValueError) for None, which is what
    # .get() returns when the key is present but has no value.
    try:
        precision = int(file_info.get("precision", self.default_precision))
    except (TypeError, ValueError):
        precision = self.default_precision

    PathTemplate.set_precision(precision)
    PathTemplate.set_path_template_style(
        self.synchrotron_name, file_info.get("file_template")
    )

    self._endstation_name = self.config.endstation_name
def set_base_data_directories(
    self,
    base_directory,
    base_process_directory,
    base_archive_directory,
    raw_folder="RAW_DATA",
    process_folder="PROCESSED_DATA",
    archive_folder="ARCHIVE",
):
    """Store the base directories and folder names on the session.

    ``PathTemplate`` is only updated for directories that are not ``None``.

    :param base_directory: Base directory for data.
    :param base_process_directory: Base directory for processed data.
    :param base_archive_directory: Base directory for the archive.
    :param raw_folder: Name of the raw data folder.
    :param process_folder: Name of the processed data folder.
    :param archive_folder: Archive folder name handed to ``PathTemplate``.
    """
    (
        self.base_directory,
        self.base_process_directory,
        self.base_archive_directory,
    ) = (base_directory, base_process_directory, base_archive_directory)

    self.raw_data_folder_name, self.processed_data_folder_name = (
        raw_folder,
        process_folder,
    )

    data_root = self.base_directory
    if data_root is not None:
        PathTemplate.set_data_base_path(data_root)

    archive_root = self.base_archive_directory
    if archive_root is not None:
        PathTemplate.set_archive_path(archive_root, archive_folder)
def get_base_data_directory(self):
    """
    Build the base data directory for the current proposal.

    The layout depends on whether the current user is in-house:

    * in-house: ``<base>/<endstation>/inhouse/<proposal>/<start>``
    * visitor:  ``<base>/visitor/<proposal>/<endstation>/<start>``

    ``<start>`` is the stored session start date, used as is, or
    today's date formatted as ``%Y%m%d`` when no start date is set.

    :returns: The base data path.
    :rtype: str
    """
    start_time = self.session_start_date or time.strftime("%Y%m%d")

    if self.is_inhouse():
        components = (
            self.base_directory,
            self.endstation_name,
            "inhouse",
            self.get_proposal(),
            start_time,
        )
    else:
        components = (
            self.base_directory,
            "visitor",
            self.get_proposal(),
            self.endstation_name,
            start_time,
        )

    return os.path.join(*components)
def prepare_directories(self, _session):
    """
    Prepare the directories required for the given session.

    Placeholder meant to be overridden in subclasses that need to create
    or prepare directories. This default implementation does nothing and
    raises no error, so callers can invoke it unconditionally.

    Args:
        _session: The session object containing session-specific
            information (unused here).
    """
    return None
def get_path_with_proposal_as_root(self, path: str) -> str:
    """
    Strip the beginning of the path so that it starts with the
    proposal folder as root.

    The stripped prefix is ``<base>/<endstation>/inhouse`` for in-house
    users and ``<base>/visitor`` otherwise.

    :param path: The full path.
    :returns: Everything that follows the first occurrence of the prefix.
    :raises IndexError: If the prefix does not occur in ``path``.
    """
    if self.is_inhouse():
        directory = os.path.join(
            self.base_directory, self.endstation_name, "inhouse"
        )
    else:
        directory = os.path.join(self.base_directory, "visitor")

    # Split on the first occurrence only: a plain split()[1] drops the rest
    # of the path whenever the prefix text shows up again further down.
    _, matched, remainder = path.partition(directory)
    if not matched:
        # Same exception type the previous split(...)[1] produced.
        raise IndexError(f"Path {path!r} does not contain {directory!r}")
    return remainder
def get_base_image_directory(self):
    """
    Join the base data directory with the raw data folder name.

    :returns: The base path for images.
    :rtype: str
    """
    data_root = self.get_base_data_directory()
    return os.path.join(data_root, self.raw_data_folder_name)
def get_base_process_directory(self):
    """
    Join the base data directory with the processed data folder name.

    :returns: The base path for processed data.
    :rtype: str
    """
    data_root = self.get_base_data_directory()
    return os.path.join(data_root, self.processed_data_folder_name)
def _sanitize_subdir(self, sub_dir: str) -> str:
"""
Returns a sanitized version of the subdir, removing any characters that
are not allowed in the path as well as leading or multiple slashes.
:param sub_dir: The subdir to sanitize
:returns: The sanitized subdir.
"""
sub_dir = sub_dir.replace(":", "-")
sub_dir = re.sub(r"[^A-Za-z0-9_/-]", "", sub_dir)
sub_dir = sub_dir.lstrip("/")
return os.path.normpath(re.sub(r"/+", "/", sub_dir))
def get_image_directory(self, sub_dir: str = "") -> str:
    """
    Return the full path to images.

    :param sub_dir: sub directory relative to the path returned by
                    get_base_image_directory; it is sanitized before use
    :returns: The full, absolute path to images.
    :raises PermissionError: If the resulting path is not located under
                             the base image directory.
    """
    image_root = Path(os.path.abspath(self.get_base_image_directory()))
    target = Path(image_root, self._sanitize_subdir(sub_dir)) if sub_dir else image_root

    if target.is_relative_to(image_root):
        return os.path.abspath(f"{target}/")

    error_message = "Invalid subdirectory"
    logging.getLogger("user_level_log").error(error_message)
    raise PermissionError(error_message)
def get_process_directory(self, sub_dir: str = "") -> str:
    """
    Return the full path to processed data.

    :param sub_dir: sub directory relative to the path returned by
                    get_base_process_directory; it is sanitized before use
    :returns: The full, absolute path to processed data.
    :raises PermissionError: If the resulting path is not located under
                             the base process directory.
    """
    process_root = Path(os.path.abspath(self.get_base_process_directory()))
    target = (
        Path(process_root, self._sanitize_subdir(sub_dir)) if sub_dir else process_root
    )

    if target.is_relative_to(process_root):
        return os.path.abspath(f"{target}/")

    error_message = "Invalid subdirectory"
    logging.getLogger("user_level_log").error(error_message)
    raise PermissionError(error_message)
def get_full_paths(self, subdir: str = "", tag: str = "") -> Tuple[str, str]:
    """
    Return the full paths to both image and processed data.

    Each path is the one returned by get_image_directory and
    get_process_directory respectively for the given sub-directory.

    :param subdir: subdirectory passed on to both directory getters
    :param tag: accepted but not used by this implementation
    :returns: Tuple with the full path to image and processed data
    """
    image_path = self.get_image_directory(subdir)
    process_path = self.get_process_directory(subdir)
    return image_path, process_path
def get_default_prefix(self, sample_data_node=None, generic_name=False):
    """
    Return the default prefix.

    With a sample node the prefix is ``<acronym>-<name>``, falling back to
    whichever of the two is available, or an empty string when neither is.
    Without a sample node it is the ``<acronym>-<name>`` placeholder when
    ``generic_name`` is set, otherwise the proposal.

    :param sample_data_node: The data node to take the name and the
                             protein acronym of its first crystal from.
    :type sample_data_node: Sample
    :param generic_name: Return the placeholder when no sample is given.
    :type generic_name: bool
    :returns: The default prefix.
    :rtype: str
    """
    proposal = self.get_proposal()

    if not sample_data_node:
        return "<acronym>-<name>" if generic_name else proposal

    name = sample_data_node.name
    protein_acronym = sample_data_node.crystals[0].protein_acronym

    if not protein_acronym:
        return name or ""
    if name:
        return f"{protein_acronym}-{name}"
    return protein_acronym
def get_default_subdir(self, sample_data: dict) -> str:
    """
    Gets the default sub-directory based on sample information.

    The result is ``<acronym>/<acronym>-<name>/`` when a protein acronym
    is available and ``<name>/`` otherwise, with colons replaced by
    hyphens.

    Args:
        sample_data: Lims sample dictionary (keys ``sampleName`` and
            ``proteinAcronym``), or an object exposing ``name`` and
            ``crystals[0].protein_acronym``.

    Returns:
        Sub-directory path string
    """
    if isinstance(sample_data, dict):
        sample_name = sample_data.get("sampleName")
        protein_acronym = sample_data.get("proteinAcronym")
    else:
        sample_name = sample_data.name
        protein_acronym = sample_data.crystals[0].protein_acronym

    # A key that is present with a None value (or an unset attribute) must
    # not end up as the literal text "None" in the directory name.
    if sample_name is None:
        sample_name = ""

    if protein_acronym:
        subdir = f"{protein_acronym}/{protein_acronym}-{sample_name}/"
    else:
        subdir = f"{sample_name}/"

    return subdir.replace(":", "-")
def get_archive_directory(self):
    """Return the archive directory as reported by ``PathTemplate``."""
    archive_directory = PathTemplate.get_archive_directory()
    return archive_directory
def get_proposal(self):
    """
    Build the proposal string from the proposal code and number.

    A proposal code of ``ifx`` is rewritten to ``fx`` on the instance
    before it is used. The result is lower-cased and stripped of hyphens.

    :returns: The proposal, 'localuser' if the proposal code or number
              is not available
    :rtype: str
    """
    if not (self.proposal_code and self.proposal_number):
        proposal = "local-user"
    else:
        if self.proposal_code == "ifx":
            self.proposal_code = "fx"
        proposal = f"{self.proposal_code}{self.proposal_number}"

    return proposal.lower().replace("-", "")
def is_inhouse(self, proposal_code=None, proposal_number=None):
    """
    Tell whether a proposal is listed among the in-house users.

    An argument that is missing or falsy is replaced by the matching
    value of the current session.

    :param proposal_code: Proposal code
    :type proposal_code: str
    :param proposal_number: Proposal number
    :type proposal_number: str
    :returns: True if the proposal is inhouse, otherwise False.
    :rtype: bool
    """
    code = proposal_code or self.proposal_code
    number = proposal_number or self.proposal_number
    return (code, number) in self.in_house_users
def get_inhouse_user(self):
    """
    Return the first entry of the in-house users list.

    :returns: The current inhouse user.
    :rtype: tuple (<proposal_code>, <proposal_number>)
    """
    inhouse_users = self.in_house_users
    return inhouse_users[0]
def set_session_start_date(self, start_date_str):
    """
    Store the session start date on the session, unchanged.

    :param start_date_str: The session start date
    :type start_date_str: str
    """
    new_start_date = start_date_str
    self.session_start_date = new_start_date
def get_session_start_date(self):
    """
    Return the stored session start date.

    :returns: The session start date
    :rtype: str
    """
    stored_start_date = self.session_start_date
    return stored_start_date
def set_user_group(self, group_name):
    """
    Store the user group name, converted to ``str``.

    :param group_name: Name of user group
    :type group_name: str
    """
    group_as_text = str(group_name)
    self.user_group = group_as_text
def get_group_name(self):
    """
    Return the stored user group name.

    :returns: Name of user group
    :rtype: str
    """
    group_name = self.user_group
    return group_name