Source code for mxcubecore.HardwareObjects.abstract.ISPyBAbstractLims

import logging
import warnings
from typing import List

import gevent

from mxcubecore import HardwareRepository as HWR
from mxcubecore.HardwareObjects.abstract.AbstractLims import AbstractLims
from mxcubecore.HardwareObjects.abstract.ISPyBDataAdapter import ISPyBDataAdapter
from mxcubecore.model.lims_session import (
    Lims,
)


[docs]class ISPyBAbstractLIMS(AbstractLims): """ Web-service client for ISPyB. """ def __init__(self, name): super().__init__(name) self.ldapConnection = None self.pyispyb = None self._translations = {} self.authServerType = None self.loginTranslate = None self.base_result_url = None self.login_ok = False
[docs] def init(self): super().init() self.pyispyb = self.get_object_by_role("pyispyb") self.icat_client = self.get_object_by_role("icat_client") self.samples = [] self.authServerType = self.get_property("authServerType") or "ldap" if self.authServerType == "ldap": # Initialize ldap self.ldapConnection = self.get_object_by_role("ldapServer") if self.ldapConnection is None: self.log.debug("LDAP Server is not available") self.loginTranslate = self.get_property("loginTranslate", default_value=True) # ISPyB Credentials self.ws_root = self.get_property("ws_root") self.ws_username = self.get_property("ws_username") or None self.ws_password = str(self.get_property("ws_password")) or None self.proxy_address = self.get_property("proxy_address") if self.proxy_address: self.proxy = {"http": self.proxy_address, "https": self.proxy_address} else: self.proxy = {} base_result_url = self.get_property("base_result_url") if base_result_url and isinstance(base_result_url, str): self.base_result_url = base_result_url.strip() else: warnings.warn( "%s.%s missing or misconfigured: %s" % (self.__class__.__name__, "base_result_url", base_result_url) ) self.adapter = self._create_data_adapter() self.log.debug("[ISPYB] Proxy address: %s" % self.proxy) # Add the proposal codes defined in the configuration xml file # to a directory. Used by translate() if hasattr(HWR.beamline.session, "proposals"): for proposal in HWR.beamline.session["proposals"]: code = proposal.code self._translations[code] = {} try: self._translations[code]["ldap"] = proposal.ldap except AttributeError: self.log.exception("") try: self._translations[code]["ispyb"] = proposal.ispyb except AttributeError: self.log.exception("") try: self._translations[code]["gui"] = proposal.gui except AttributeError: self.log.exception("")
def _create_data_adapter(self) -> ISPyBDataAdapter: return ISPyBDataAdapter( self.ws_root.strip(), self.proxy, self.ws_username, self.ws_password, self.beamline_name, )
[docs] def get_lims_name(self) -> List[Lims]: return [ Lims( name="ISPyB", description="Information System for protein Crystallographic Beamlines", ) ]
[docs] def get_user_name(self): raise NotImplementedError
[docs] def get_full_user_name(self): raise NotImplementedError
[docs] def is_user_login_type(self): raise NotImplementedError
[docs] def store_beamline_setup(self, session_id, bl_config): self.adapter.store_beamline_setup(session_id, bl_config)
def _translate(self, code, what): try: translated = self._translations[code][what] except KeyError: translated = code return translated
[docs] def echo(self): if not self.adapter._shipping: msg = "Error in echo: Could not connect to server." logging.getLogger("ispyb_client").warning(msg) raise Exception("Error in echo: Could not connect to server.") try: self.adapter._shipping._shipping.service.echo() return True except Exception as e: logging.getLogger("ispyb_client").error(str(e)) return False
def ldap_login(self, login_name, psd): warnings.warn( ( "Using Authenticator from ISPyBClient is deprecated," "use Authenticator to authenticate separately and then login to ISPyB" ), DeprecationWarning, ) return self.ldapConnection.authenticate(login_name, psd) def ispyb_login(self, login_name, psd): raise NotImplementedError
[docs] def store_data_collection(self, mx_collection, bl_config=None): return self._store_data_collection(mx_collection, bl_config)
[docs] def update_data_collection(self, mx_collection): return self._update_data_collection(mx_collection)
[docs] def finalize_data_collection(self, mx_collection): # Also upload the same data to icat if icat_client is available if self.icat_client: self.icat_client.store_data_collection(mx_collection) return self._update_data_collection(mx_collection)
def _store_data_collection(self, mx_collection, bl_config=None): return self.adapter.store_data_collection(mx_collection, bl_config) def _update_data_collection(self, mx_collection): return self.adapter._update_data_collection(mx_collection) def update_bl_sample(self, bl_sample): return self.adapter.update_bl_sample(bl_sample)
[docs] def store_image(self, image_dict): self.adapter.store_image(image_dict)
def find_sample_by_sample_id(self, sample_id): for sample in self.samples: try: if str(sample.get("limsID")) == str(sample_id): self.log.debug("Sample found by limsID=%s" % (sample_id)) return sample except (TypeError, KeyError): self.log.exception("") return None
[docs] def get_samples(self, lims_name): self.samples = [] if self.session_manager.active_session is not None: self.samples = self.adapter.get_samples( self.session_manager.active_session.proposal_id ) self.log.debug( "get_samples. %s samples retrieved. proposal_id=%s lims_name=%s" % ( len(self.samples), self.session_manager.active_session.proposal_id, lims_name, ) ) return self.samples
[docs] def create_session(self, proposal_id: str): self.log.debug("create_session. proposal_id=%s" % proposal_id) session_manager = self.adapter.create_session(proposal_id, self.beamline_name) self.log.debug("Session created. proposal_id=%s" % proposal_id) return session_manager
[docs] def store_energy_scan(self, energyscan_dict): return self.adapter.store_energy_scan(energyscan_dict)
def associate_bl_sample_and_energy_scan(self, entry_dict): return self.adapter.associate_bl_sample_and_energy_scan(entry_dict) def get_data_collection(self, data_collection_id): return self.adapter.get_data_collection(self, data_collection_id) def get_session(self, session_id): return self.adapter.get_session(session_id)
[docs] def store_xfe_spectrum(self, xfespectrum_dict): return self.adapter.store_xfe_spectrum(xfespectrum_dict)
def is_connected(self): return self.login_ok def isInhouseUser(self, proposal_code, proposal_number): for proposal in self["inhouse"]: if proposal_code == proposal.code: if str(proposal_number) == str(proposal.number): return True return False def _store_data_collection_group(self, group_data): return self.adapter._store_data_collection_group(group_data)
[docs] def store_workflow(self, *args, **kwargs): try: return self._store_workflow(*args, **kwargs) except gevent.GreenletExit: raise except Exception: logging.exception("Could not store workflow") return None, None, None
[docs] def store_robot_action(self, robot_action_dict): return self.adapter.store_robot_action(robot_action_dict)
def create_mx_collection(self, collection_parameters): self.icat_client.create_mx_collection(collection_parameters) def create_ssx_collection( self, data_path, collection_parameters, beamline_parameters, extra_lims_values ): self.icat_client.create_ssx_collection( data_path, collection_parameters, beamline_parameters, extra_lims_values )