Source code for mxcubecore.HardwareObjects.BlissEnergy
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Lesser Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""
Energy and Wavelength with bliss.
Example yaml file:
- for tunable wavelength beamline:
.. code-block:: yaml
class: BlissEnergy.BlissEnergy
configuration:
username: Energy
objects:
controller: bliss.yml
energy_motor: energy_motor.yml
- for fixed wavelength beamline:
.. code-block:: yaml
class: BlissEnergy.BlissEnergy
configuration:
username: Energy
read_only: True
default_value: 12.8123
"""
import logging
import math
from gevent import spawn
from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObjectState
from mxcubecore.HardwareObjects.abstract.AbstractEnergy import AbstractEnergy
__copyright__ = """ Copyright © by the MXCuBE collaboration """
__license__ = "LGPLv3+"
[docs]class BlissEnergy(AbstractEnergy):
"""Energy and Wavelength with bliss."""
def __init__(self, name):
super().__init__(name)
self.energy_motor = None
self.controller = None
self._cmd_execution = None
[docs] def init(self):
"""Initialisation"""
super().init()
self.controller = self.get_object_by_role("controller")
self.energy_motor = self.get_object_by_role("energy_motor")
self.update_state(HardwareObjectState.READY)
if self.energy_motor:
self.update_state(self.energy_motor.get_state())
self.energy_motor.connect("valueChanged", self.update_value)
self.energy_motor.connect("stateChanged", self.update_state)
if self.read_only and not self.energy_motor:
# self._nominal_value = float(self.get_property("energy", 0))
try:
self._nominal_value = float(self.default_value)
except TypeError as err:
msg = "Energy not defined"
raise RuntimeError(msg) from err
[docs] def get_value(self):
"""Read the energy.
Returns:
(float): Energy [keV]
"""
if self.energy_motor:
self._nominal_value = self.energy_motor.get_value()
return self._nominal_value
[docs] def get_limits(self):
"""Return energy low and high limits.
Returns:
(tuple): two floats tuple (low limit, high limit) [keV].
"""
if not self.read_only:
self._nominal_limits = self.energy_motor.get_limits()
return self._nominal_limits
def _set_value(self, value):
"""Execute the sequence to move to an energy
Args:
value (float): target energy
"""
try:
defocus = HWR.beamline.beam.definer.defocused_beam
except AttributeError:
defocus = False
self.update_state(HardwareObjectState.BUSY)
try:
self.controller.change_energy(value, defocus=defocus)
except (AttributeError, RuntimeError):
self.energy_motor.set_value(value)
finally:
self.update_state(HardwareObjectState.READY)
[docs] def set_value(self, value, timeout=0):
"""Move energy to absolute position. Wait the move to finish.
Args:
value (float): target value.
timeout (float): optional - timeout [s],
If timeout == 0: return at once and do not wait
if timeout is None: wait forever.
Raises:
ValueError: Value not valid or attempt to set write only actuator.
"""
if self.read_only:
return
if self.validate_value(value):
current_value = self.get_value()
_delta = math.fabs(current_value - value)
if _delta < 0.001:
logging.getLogger("user_level_log").info(
"Energy: already at %g, not moving", value
)
return
logging.getLogger("user_level_log").info(
"Energy: moving energy to %g", value
)
if _delta > 0.02:
if timeout != 0:
self._set_value(value)
else:
self._cmd_execution = spawn(self._set_value, value)
else:
self.energy_motor.set_value(value, timeout=timeout)
else:
msg = f"Invalid value {value}"
raise ValueError(msg)
[docs] def abort(self):
"""Abort the procedure"""
if self._cmd_execution and not self._cmd_execution.ready():
self._cmd_execution.kill()