Source code for mxcubecore.HardwareObjects.abstract.AbstractSampleChanger

#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""

--------------------------------------------
Description
--------------------------------------------

AbstractSampleChanger is a base class to help in the implementation of
Hardware Objects for SampleChangers following the
"SampleChanger Standard Interface".

If this class is used as base class a standard class is then provided for
its use by generic bricks or by MXCuBE itself.  This class exposes the
following API for bricks and MXCuBE:

--------------------------------------------
SampleChanger - Standard Interface
--------------------------------------------

Sample Changer States
----------------------

    SampleChangerState.Unknown
    SampleChangerState.Ready
    SampleChangerState.Loaded
    SampleChangerState.Loading
    SampleChangerState.Unloading
    SampleChangerState.Selecting
    SampleChangerState.Scanning
    SampleChangerState.Resetting
    SampleChangerState.Charging
    SampleChangerState.Moving
    SampleChangerState.ChangingMode
    SampleChangerState.StandBy
    SampleChangerState.Disabled
    SampleChangerState.Alarm
    SampleChangerState.Fault
    SampleChangerState.Initializing
    SampleChangerState.Closing

Commands
----------------------

load()
unload()
select()
abort()
change_mode()

get_state()
get_status()
is_ready()
wait_ready()
has_loaded_sample()
get_loaded_sample()

Specifying sample locations
-----------------------------
The sample model in a sample changer is based
in the model:
   SampleChanger
      Container
         [Container...]
            Sample

Typically for a sample changer with Pucks and Sample
there is a single level for Container. Specifying
a sample location will consist in giving the puck (basket)
number followed by the sample number. In the location
example `3:5` the fifth sample in the third puck is specified.

For other more complex constructions (for example for
a plate manipulator) each nested container will be specified
until getting to the sample:

In the example for a location in a plate manipulator like `1:5:2`
the location specifies first plate well, fifth drop, second crystal.


Events emitted
----------------------

SampleChanger.STATE_CHANGED_EVENT
SampleChanger.STATUS_CHANGED_EVENT
SampleChanger.INFO_CHANGED_EVENT
SampleChanger.LOADED_SAMPLE_CHANGED_EVENT
SampleChanger.SELECTION_CHANGED_EVENT
SampleChanger.TASK_FINISHED_EVENT
SampleChanger.PROGRESS_MESSAGE

Tools for SC Classes
----------------------

- useUpdateTimer (xml property):
   This property can accept a boolean value (True/False)

   If this property is set the HardwareObject will
   poll itself for state changes, information change and
   other needed values.

   Include a line like `<useUpdateTimer>True</useUpdateTimer`
   in the xml file



--------------------------------------------
How to implement derived SC Classes
--------------------------------------------

"""

import abc
import logging

from gevent import (
    Timeout,
    sleep,
)

from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.HardwareObjects.abstract.sample_changer.Container import Container
from mxcubecore.TaskUtils import task as dtask


[docs]class SampleChangerState: """ Enumeration of sample changer states """ Unknown = 0 Ready = 1 Loaded = 2 Loading = 3 Unloading = 4 Selecting = 5 Scanning = 6 Resetting = 7 Charging = 8 Moving = 9 ChangingMode = 10 StandBy = 11 Disabled = 12 Alarm = 13 Fault = 14 Initializing = 15 Closing = 16 STATE_DESC = { Ready: "Ready", Loaded: "Loaded", Alarm: "Alarm", Charging: "Charging", Disabled: "Disabled", Fault: "Fault", Loading: "Loading", Resetting: "Resetting", Scanning: "Scanning", Selecting: "Selecting", Unloading: "Unloading", Moving: "Moving", ChangingMode: "Changing Mode", StandBy: "StandBy", Initializing: "Initializing", Closing: "Closing", }
[docs] @staticmethod def tostring(state): """Convert state to string""" return SampleChangerState.STATE_DESC.get(state, "Unknown")
[docs]class SampleChangerMode: """ Enumeration of sample changer operating modes """ Unknown = 0 Normal = 1 Charging = 8 Disabled = 11
[docs]class SampleChanger(Container, HardwareObject): """ Abstract base class for sample changers """ __metaclass__ = abc.ABCMeta # ######################## EVENTS ######################### STATE_CHANGED_EVENT = "stateChanged" STATUS_CHANGED_EVENT = "statusChanged" INFO_CHANGED_EVENT = "infoChanged" LOADED_SAMPLE_CHANGED_EVENT = "loadedSampleChanged" SELECTION_CHANGED_EVENT = "selectionChanged" TASK_FINISHED_EVENT = "taskFinished" CONTENTS_UPDATED_EVENT = "contentsUpdated" PROGRESS_MESSAGE = "progress_message" def __init__(self, type_, scannable, name): super().__init__(type_, None, type_, scannable) HardwareObject.__init__(self, name) self.state = -1 self.status = "" self._progress_message = "" self._set_state(SampleChangerState.Unknown) self.task = None self.task_proc = None self.task_error = None self._transient = False self._token = None self._timer_update_inverval = 5 # interval in periods of 100 ms self._timer_update_counter = 0 self.use_update_timer = None
[docs] def init(self): use_update_timer = self.get_property("useUpdateTimer", True) msg = f"SampleChanger: Using update timer is {use_update_timer}" self.log.info(msg) if use_update_timer: task1s = self.__timer_1s_task(wait=False) task1s.link(self._on_timer_1s_exit) updateTask = self.__update_timer_task(wait=False) updateTask.link(self._on_timer_update_exit) self.use_update_timer = use_update_timer self.update_info()
def _on_timer_1s_exit(self, task): logging.warning("Exiting Sample Changer 1s timer task") def _on_timer_update_exit(self, task): logging.warning("Exiting Sample Changer update timer task") @dtask def __timer_1s_task(self, *args): while True: sleep(1.0) try: if self.is_enabled(): self._on_timer_1s() except Exception: pass @dtask def __update_timer_task(self, *args): while True: sleep(1) try: if self.is_enabled(): self._timer_update_counter += 1 if self._timer_update_counter >= self._timer_update_counter: self._on_timer_update() self._timer_update_counter = 0 except Exception: pass # ######################## TIMER ######################### def _set_timer_update_interval(self, value): self._timer_update_inverval = value def _on_timer_update(self): # if not self.is_executing_task(): self.update_info() def _on_timer_1s(self): pass # ####################### HardwareObject ####################### def connect_notify(self, signal): msg = f"connect_notify {signal}" logging.getLogger().info(msg) # ######################## PUBLIC #########################
[docs] def get_state(self): """ Returns: (SampleChangerState): Current sample changer state """ return self.state
[docs] def get_status(self): """ Returns: (str) String representation of current state :rtype: str """ return self.status
[docs] def get_contents_as_dict(self) -> dict: """ Build and return the hierarchical structure of the sample changer contents. Returns: dict: A nested dictionary describing the sample changer contents. { "name": <str>, # root sample changer address "room_temperature_mode": <bool> (optional), "children": [ { "name": <str>, # element address "status": <str>, # "Loaded", "Used", "Present", or "" "id": <str>, # element ID "selected": <bool>, # whether element is selected "children": [ ... ] # nested elements, same structure }, ... ] } """ contents = {"name": self.get_address()} if hasattr(self, "get_room_temperature_mode"): contents["room_temperature_mode"] = self.get_room_temperature_mode() for element in self.get_components(): if element.is_present(): self._add_element(contents, element) return contents
def _get_status(self, element) -> str: """ Determine the status string for a sample changer element. Args: element: The element object to check. Returns: str: One of: - "Loaded": element is a leaf and currently loaded - "Used": element is a leaf and has been loaded before - "Present": element is present in the changer - "": element is not present """ if element.is_leaf(): if element.is_loaded(): return "Loaded" if element.has_been_loaded(): return "Used" return "" return "Present" if element.is_present() else "" def _get_id(self, element) -> str: """ Get the unique identifier (ID or token) for a sample changer element. Args: element: The element to get the ID for. Returns: str: The token (if root sample changer and available), the element ID (if available), or an empty string. """ if element == self: token = element.get_token() return token if token else "" return element.get_id() or "" def _add_element(self, parent, element) -> dict: """ Recursively add an element and its children into the contents dictionary. Args: parent (dict): The parent dictionary to append the element to. element: The element object to add. """ new_element = { "name": element.get_address(), "status": self._get_status(element), "id": self._get_id(element), "selected": element.is_selected(), "state": element.state, "puck_barcode": element.puck_barcode, "sample_barcode": element.sample_barcode, "puck_type": element.puck_type, } parent.setdefault("children", []).append(new_element) if not element.is_leaf(): for child in element.get_components(): self._add_element(new_element, child) @property def progress_message(self) -> str: """ Returns: Current progress message """ self._progress_message
[docs] def set_progress_message(self, message: str) -> None: """ Set progress message describing the current sample changer activity. Args: message: string describing current sample changer activity. """ self._progress_message = message self._trigger_progress_message(message)
[docs] def get_task_error(self): """ Returns: (str): Description of the error of last executed task (or None if success) """ return self.task_error
[docs] def is_ready(self): """ Returns: (bool): True if the state corresponds to READY. """ return self.state in ( SampleChangerState.Ready, SampleChangerState.Loaded, SampleChangerState.Charging, SampleChangerState.StandBy, )
[docs] def wait_ready(self, timeout=None): """Wait for current sample changer operation to finish. Blocks for timeout seconds or forever if timeout is None Args: timeout (int): timeout [s]. Raises: (Exception): If operation lasts longer than the timeout. """ with Timeout(timeout, RuntimeError("Timeout waiting ready")): while not self.is_ready(): sleep(0.5)
[docs] def is_normal_state(self): """ Returns: (bool): True if state is not in the 'NOT USABLE' states list. """ return self.state not in ( SampleChangerState.Disabled, SampleChangerState.Alarm, SampleChangerState.Fault, SampleChangerState.Unknown, )
[docs] def is_enabled(self): """ Returns: (boolean): True if sample changer is enabled otherwise False """ return self.state != SampleChangerState.Disabled
[docs] def assert_enabled(self): """ Raises: (Exception): If sample changer is disabled. """ if not self.is_enabled(): raise Exception("Sample Changer is disabled")
[docs] def assert_not_charging(self): """ Raises: (Exception): If sample changer is not charging """ if self.state == SampleChangerState.Charging: raise Exception("Sample Changer is in Charging mode")
[docs] def assert_can_execute_task(self): """ Raises: (Exception): If sample changer cannot execute a task """ if not self.is_ready(): raise Exception( "Cannot execute task: bad state (" + SampleChangerState.tostring(self.state) + ")" )
[docs] def is_task_finished(self): """ Returns: (str): Description of the error of last executed task (or None if success). """ return self.is_ready() or ( (not self.is_normal_state()) and (self.state != SampleChangerState.Unknown) )
[docs] def is_executing_task(self): """ Returns: (str): Description of the error of last executed task (or None if success). """ return self.task is not None
[docs] def wait_task_finished(self, timeout=None): """ Wait for currently running task to finish. """ with Timeout(timeout, RuntimeError("Timeout waiting end of task")): while not self.is_task_finished(): sleep(0.1)
[docs] def get_loaded_sample(self): """ Returns: (Sample) Currently loaded sample """ for smp in self.get_sample_list(): if smp.is_loaded(): return smp return None
[docs] def has_loaded_sample(self): """ Returns: (boolean): True if a sample is loaded False otherwise """ return self.get_loaded_sample() is not None
[docs] def is_mounted_sample(self, sample_location): """Check if the sample is mounted. Args: sample_location: Sample location to check. Returns: (bool): True if mounted. """ try: return self.get_loaded_sample().get_coords() == sample_location except AttributeError: return False
[docs] def abort(self): """ Aborts current task and puts device in safe state """ self._do_abort() if self.task_proc is not None: self.task_proc.join(1.0) if self.task_proc is not None: self.task_proc.kill(Exception("Task aborted")) self.task = None self.task_proc = None self.task_error = None
[docs] def update_info(self): """ Update sample changer sample information, currently loaded sample and emits infoChanged and loadedSampleChanged when loaded sample have changed """ former_loaded = self.get_loaded_sample() self._do_update_info() if self._is_dirty(): self._trigger_info_changed_event() loaded = self.get_loaded_sample() if loaded != former_loaded: if ( (loaded is None) or (former_loaded is None) or (loaded.get_address() != former_loaded.get_address()) ): self._trigger_loaded_sample_changed_event(loaded) self._reset_dirty()
def is_transient(self): return self._transient def _set_transient(self, value): self._transient = value def get_token(self): return self._token def set_token(self, token): self._token = token
[docs] def get_sample_properties(self): """ Returns: (tuple): With sample properties defined in Sample """ return ()
# ######################## TASKS #########################
[docs] def change_mode(self, mode, wait=True): """Change the mode (SC specific, imply change of the State) Args: mode (int): Modes: Unknown = 0 Normal = 1 Charging = 2 Disabled = 3 wait (boolean): True to block until mode changed is completed, False otherwise Rerturns: (Object): Value returned by _execute_task either a Task or result of the operation. """ if mode == SampleChangerMode.Unknown: return if mode == self.get_state(): return if self.get_state() == SampleChangerState.Disabled: self._set_state(SampleChangerState.Unknown) self.update_info() elif mode == SampleChangerMode.Disabled: self._set_state(SampleChangerState.Disabled) return self._execute_task( SampleChangerState.ChangingMode, wait, self._do_change_mode, mode )
@dtask def scan(self, component=None, recursive=False): """Scan component or list of components for presence. Args: component (Component): Root component to start scan from. Sample changer root is used if None is passed. (recursive) (boolean): Recurse down the component structure if True, scan only component otherwise. Rerturns: (Object): Value returned by _execute_task either a Task or result of the operation. """ obj_list = [] if isinstance(component, list): for comp in component: obj_list.append(self._scan_one(comp, recursive)) return obj_list return self._scan_one(component, recursive) def _scan_one(self, component, recursive): """Scan component or list of components for samples. Args: component (Component): Root component to start scan from. Sample changer root is used if None is passed (recursive) (boolean): Recurse down the component structure if True. scan only component otherwise. Rerturns: (Object): Value returned by _execute_task either a Task or result of the operation """ self.assert_not_charging() if component is None: component = self component = self._resolve_component(component) component.assert_is_scannable() return self._execute_task( SampleChangerState.Scanning, True, self._do_scan, component, recursive )
[docs] def select(self, component, wait=True): """ Select a component. Args: component (Component): Component to select wait (boolean): True to wait for selection to complete otherwise False Rerturns: (Object): Value returned by _execute_task either a Task or result of the operation """ component = self._resolve_component(component) ret = self._execute_task( SampleChangerState.Selecting, wait, self._do_select, component ) self._trigger_selection_changed_event() return ret
[docs] def chained_load(self, sample_to_unload, sample_to_load): """ Chain the unload of a sample with a load. Args: sample_to_unload (tuple): sample address on the form (component1, ... ,component_N-1, component_N) sample_to_load (tuple): sample address on the form (component1, ... ,component_N-1, component_N) (Object): Value returned by _execute_task either a Task or result of the operation """ self.unload(sample_to_unload) self.wait_ready(timeout=10) return self.load(sample_to_load)
[docs] def load(self, sample=None, wait=True): """ Load a sample. Args: sample (tuple): sample address on the form (component1, ... ,component_N-1, component_N) wait (boolean): True to wait for load to complete False otherwise Returns (Object): Value returned by _execute_task either a Task or result of the operation """ sample = self._resolve_component(sample) self.assert_not_charging() # Do a chained load in this case if self.has_loaded_sample(): # Do a chained load in this case if (sample is None) or (sample == self.get_loaded_sample()): raise Exception( "The sample " + str(self.get_loaded_sample().get_address()) + " is already loaded" ) return self.chained_load(self.get_loaded_sample(), sample) return self._execute_task( SampleChangerState.Loading, wait, self._do_load, sample )
[docs] def unload(self, sample_slot=None, wait=True): """ Unload sample to location sample_slot, unloads to the same slot as it was loaded from if None is passed Args: sample_slot (tuple): sample address on the form (component1, ... ,component_N-1, component_N) wait: If True wait for unload to finish otherwise return immediately Returns: (Object): Value returned by _execute_task either a Task or result of the operation """ sample_slot = self._resolve_component(sample_slot) self.assert_not_charging() # In case we have manually mounted we can command an unmount if not self.has_loaded_sample(): raise Exception("No sample is loaded") return self._execute_task( SampleChangerState.Unloading, wait, self._do_unload, sample_slot )
[docs] def reset(self, wait=True): """ Reset the sample changer. wait: If True wait for reset to finish otherwise return immediately Returns: (Object): Value returned by _execute_task either a Task or result of the operation """ return self._execute_task(SampleChangerState.Resetting, wait, self._do_reset)
def _load(self, sample=None): self._do_load(sample) def _unload(self, sample_slot=None): self._do_unload(sample_slot) def _resolve_component(self, component): if component is not None and isinstance(component, str): comp = self.get_component_by_address(component) if comp is None: raise Exception(f"Invalid component: {component}") return comp return component # ######################## ABSTRACTS ######################### @abc.abstractmethod def _do_abort(self): """ Aborts current task and puts device in safe state """ return @abc.abstractmethod def _do_update_info(self): return @abc.abstractmethod def _do_change_mode(self, mode): return @abc.abstractmethod def _do_scan(self, component, recursive): return @abc.abstractmethod def _do_select(self, component): return @abc.abstractmethod def _do_load(self, sample): return @abc.abstractmethod def _do_unload(self, sample_slot=None): return @abc.abstractmethod def _do_reset(self): return # ######################## PROTECTED ######################### def _execute_task(self, task, wait, method, *args): self.assert_can_execute_task() msg = f"Start {SampleChangerState.tostring(task)}" logging.debug(msg) self.task = task self.task_error = None self._set_state(task) ret = self._run(task, method, wait=False, *args) self.task_proc = ret ret.link(self._on_task_ended) if wait: return ret.get() return ret @dtask def _run(self, task, method, *args): """ method(self,*arguments) exception=None try: while !_is_task_finished(state): time.sleep(0.1) exception=_getTaskException(state) finally: _trigger_task_finished_event(state,exception) self._set_state(SampleChangerState.Ready) """ exception = None ret = None try: ret = method(*args) except Exception as ex: exception = ex # if self.get_state()==self.task: # self._set_state(SampleChangerState.Ready) self.update_info() task = self.task self.task = None self.task_proc = None self._trigger_task_finished_event(task, ret, exception) if exception is not None: self._on_task_failed(task, exception) raise exception return ret def _on_task_failed(self, task, exception): """What to do when task failed""" def _on_task_ended(self, task): """What to do when task ended normally""" try: msg = f"Task ended. Return value: {task.get()}" logging.debug(msg) except Exception as err: msg = f"Error while executing sample changer task: {err}" logging.error(msg) def _set_state(self, state=None, status=None): """Set the state""" if (state is not None) and (self.state != state): former = self.state self.state = state if status is None: status = SampleChangerState.tostring(state) self._trigger_state_changed_event(former) if (status is not None) and (self.status != status): self.status = status self._trigger_status_changed_event() def _reset_loaded_sample(self): for smp in self.get_sample_list(): smp._set_loaded(False) self._trigger_loaded_sample_changed_event(None) def _set_loaded_sample(self, sample): loaded_sample = self.get_loaded_sample() for s in self.get_sample_list(): if s != sample: s._set_loaded(False) else: if loaded_sample != s: s._set_loaded(True) if sample != loaded_sample: self._trigger_loaded_sample_changed_event(sample) def _set_selected_sample(self, sample): cur = self.get_selected_sample() if cur != sample: Container._set_selected_sample(self, sample) self._trigger_selection_changed_event() def _set_selected_component(self, component): cur = self.get_selected_component() if cur != component: Container._set_selected_component(self, component) self._trigger_selection_changed_event() def trigger_progress_message(self, message: str): self.emit(self.PROGRESS_MESSAGE, (message,)) # ######################## PRIVATE ######################### def _trigger_state_changed_event(self, former): self.emit(self.STATE_CHANGED_EVENT, (self.state, former)) def _trigger_status_changed_event(self): self.emit(self.STATUS_CHANGED_EVENT, (str(self.status),)) def _trigger_loaded_sample_changed_event(self, sample): self.emit(self.LOADED_SAMPLE_CHANGED_EVENT, (sample,)) def _trigger_selection_changed_event(self): self.emit(self.SELECTION_CHANGED_EVENT, ()) def _trigger_info_changed_event(self): self.emit(self.INFO_CHANGED_EVENT, ()) def _trigger_sample_info_changed_event(self, sample): self.emit(self.INFO_CHANGED_EVENT, (sample)) def _trigger_task_finished_event(self, task, ret, exception): self.emit(self.TASK_FINISHED_EVENT, (task, ret, exception)) def _trigger_contents_updated_event(self): self.emit(self.CONTENTS_UPDATED_EVENT)