Source code for mxcubecore.HardwareObjects.ESRF.OxfordCryostream

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU General Lesser Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""Oxford Cryostream, controlled by bliss.
Example xml_ configuration:

.. code-block:: xml

 <object class="ESRF.OxfordCryostream">
   <username>Cryostream</username>
   <object role="controller" href="/bliss"/>
   <cryostat>cryostream</cryostat>
   <interval>120</interval>
   <object role="monitor_temperature" href="/monitor_temperature"/>
 </object>
"""

import logging
import sys

from gevent import (
    Timeout,
    sleep,
    spawn,
)

from mxcubecore.HardwareObjects.abstract.AbstractActuator import AbstractActuator

CRYO_STATUS = ["OFF", "SATURATED", "READY", "WARNING", "FROZEN", "UNKNOWN"]
PHASE_ACTION = {
    "RAMP": "ramp",
    "COOL": "cool",
    "HOLD": "hold",
    "PLAT": "plat",
    "PURGE": "purge",
    "END": "end",
}


[docs]class OxfordCryostream(AbstractActuator): """Control of the Oxford Cryostream model 700, 800 and 1000""" def __init__(self, name): super().__init__(name) self.temp = None self.temp_threshold = None self._monitor_obj = None self._timeout = 5 # [s] self._hw_ctrl = None self.ctrl = None self.interval = None def _do_polling(self): """Do endless polling with predefined interval""" while True: try: self.force_emit_signals() except Exception: sys.excepthook(*sys.exc_info()) sleep(self.interval)
[docs] def init(self): """Initialisation""" controller = self.get_object_by_role("controller") cryostat = self.get_property("cryostat") self.interval = self.get_property("interval", 10) try: self.ctrl = getattr(controller, cryostat) # for some reason the first reading of the cryo is giving timeout # just ignore it for ii in range(3): try: self.ctrl.input.read() break except BaseException as err: # BaseException needed because of bliss timeout exception msg = f"Cannot read cryostream: {err}. Retry {ii + 1}" self.log.warning(msg) spawn(self._do_polling) self._hw_ctrl = self.ctrl.controller._hw_controller except AttributeError as err: raise RuntimeError("Cannot use cryostream") from err self._monitor_obj = self.get_object_by_role("monitor_temperature") self.temp_threshold = self.get_property("temperature_threshold", 0.0)
[docs] def force_emit_signals(self): """Forces to emit all signals.""" self.emit("valueChanged", (self.get_value(),)) self.emit("stateChanged", (self.get_state(),))
[docs] def get_temperature(self): """Read the temperature. Returns: (float): The temperature [deg K] """ try: return self.ctrl.input.read() except Exception: # try to read again temp = self.ctrl.input.read() if temp is None: return 9999.0 return temp
[docs] def get_value(self): return self.get_temperature()
def _set_value(self, value=None): """Define the setpoint. Args: value(float): target temperature [deg K] """ if value is not None: self.ctrl.setpoint = value
[docs] def rampstate(self): """Read the state of the ramping. Returns: (str): Ramping state. """ return self.ctrl.is_ramping()
[docs] def start_action(self, phase="RAMP", target=None, rate=None): """Run phase action action. Args: phase(str): The phase action. Default value - RAMP target(float): Target temperature. rate:(float): Ramp rate. """ if phase in PHASE_ACTION: action = getattr(self._hw_ctrl, PHASE_ACTION[phase]) if rate: action(target, rate=rate) elif target: action(target) else: action()
[docs] def stop_action(self, phase="HOLD"): """Stop action. Args: phase(str): Phase action. """ if phase in PHASE_ACTION: getattr(self._hw_ctrl, PHASE_ACTION[phase])
[docs] def pause(self, execute=True): """Pause the ramping. Args: execute(bool): True to pause, False to resume. """ if execute: self._hw_ctrl.pause() else: self._hw_ctrl.resume()
[docs] def get_specific_state(self): """Read the state of the controller. Returns: (str): The state. """ try: return self._hw_ctrl.read_run_mode().upper() except (AttributeError, TypeError): return "UNKNOWN"
[docs] def get_static_parameters(self): """Get predefined parameters. Returns: {list): Predefimed parameters. """ return ["oxford", "K", "hour"]
[docs] def get_params(self): """Read from the controller. Returns: (list): [target_temperature, ramp_rate, phase, run_mode] """ target_temperature = self.ctrl.setpoint ramp_rate = self.ctrl.ramprate phase = self._hw_ctrl.read_phase().upper() run_mode = self._hw_ctrl.read_run_mode() self.temp = self.ctrl.input.read() return [target_temperature, ramp_rate, phase, run_mode]
[docs] def check_temperature(self, threshold=None): """Check if the temperature is under the threshold. Args: threshold (float): Temperature threshold (optional) Returns: (bool): True if under the threshold, False otherwise. """ threshold = threshold or self.temp_threshold logging.getLogger("user_level_log").info("Cryo temperature reading ...") cryo_temp = self.get_value() if cryo_temp > threshold: logging.getLogger("user_level_log").info("Cryo temperature too high ...") return False return True
[docs] def wait_temperature(self, threshold=None, timeout=None): """Wait until the temperature is under the threshold. Args: threshold (float): Temperature threshold (optional) timeout (float): optional - timeout [s], If timeout == 0: return at once and do not wait (default); if timeout is None: wait forever. """ if self._monitor_obj: try: check = self._monitor_obj.get_value().value except AttributeError: check = False if check is True: threshold = threshold or self.temp_threshold timeout = timeout or self._timeout with Timeout(timeout, RuntimeError("Temperature timeout")): while not self.check_temperature(threshold=threshold): sleep(0.5)