import glob
import os
import time
from pathlib import Path
from typing import Tuple
from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects import Session
from mxcubecore.model import queue_model_objects
[docs]class ESRFSession(Session.Session):
def __init__(self, name):
Session.Session.__init__(self, name)
[docs] def init(self):
Session.Session.init(self)
self._use_acronym = self.get_property("use_acronym", True)
archive_base_directory = self["file_info"].get_property(
"archive_base_directory"
)
if archive_base_directory:
archive_folder = os.path.join(
self["file_info"].get_property("archive_folder"), time.strftime("%Y")
)
queue_model_objects.PathTemplate.set_archive_path(
archive_base_directory, archive_folder
)
def set_endstation_name(self, name: str) -> None:
name = name.lower().replace("-", "")
name = {
"id231": "id23eh1",
"id232": "id23eh2",
}.get(name, name)
self.log.info(f"Setting end-station name to {name}")
self._endstation_name = name
[docs] def get_full_paths(self, subdir: str, tag: str) -> Tuple[str, str]:
"""
Returns the full path to both image and processed data.
The path(s) returned will follow the convention:
<base_direcotry>/<subdir>/run_<NUMBER>_<tag>
Where NUMBER is a automatically sequential number and
base_directory the path returned by get_base_image/process_direcotry
:param subdir: subdirecotry
:param tag: tag for
:returns: Tuple with the full path to image and processed data
"""
subdir = Path(self._sanitize_subdir(subdir))
folders = glob.glob(
os.path.join(self.get_base_image_directory(), subdir) + "/run*"
)
runs = [0]
for folder in folders:
runs.append(int(folder.split("/")[-1].strip("run_").split("_")[0]))
run_num = max(runs) + 1
# Use the same sequence numbering for all tags/types, (We are not
# creating individual run number per tag)
full_path = os.path.join(
self.get_base_image_directory(), subdir, f"run_{run_num:02d}_{tag}/"
)
# Check collects in queue not yet collected
for pt in HWR.beamline.queue_model.get_path_templates():
if pt[1].directory.startswith(full_path[:-1]):
run_num += 1
full_path = os.path.join(
self.get_base_image_directory(), subdir, f"run_{run_num:02d}/"
)
full_path = os.path.join(
self.get_base_image_directory(), subdir, f"run_{run_num:02d}_{tag}/"
)
process_path = os.path.join(
self.get_base_process_directory(), subdir, f"run_{run_num:02d}_{tag}/"
)
return full_path, process_path
[docs] def get_default_subdir(self, sample_data: dict) -> str:
"""
Gets the default sub-directory based on sample information
Args:
sample_data: Lims sample dictionary
Returns:
Sub-directory path string
"""
subdir = ""
if isinstance(sample_data, dict):
sample_name = sample_data.get("sampleName", "")
protein_acronym = sample_data.get("proteinAcronym", "")
else:
sample_name = sample_data.name
protein_acronym = sample_data.crystals[0].protein_acronym
if self._use_acronym and protein_acronym:
subdir = "%s/%s-%s/" % (protein_acronym, protein_acronym, sample_name)
else:
subdir = "%s/" % sample_name
return subdir.replace(":", "-")
[docs] def get_image_directory(self, sub_dir=None):
"""
Returns the full path to images, using the name of each of
data_nodes parents as sub directories.
:param data_node: The data node to get additional
information from, (which will be added
to the path).
:type data_node: TaskNode
:returns: The full path to images.
:rtype: str
"""
data_path = self.get_base_image_directory()
if sub_dir:
run_num = 0
data_path = os.path.join(
self.get_base_image_directory(), sub_dir, f"run_{run_num}/"
)
while os.path.exists(data_path):
data_path = os.path.join(
self.get_base_image_directory(), sub_dir, f"run_{run_num}/"
)
run_num += 1
return data_path