#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""EMBLEnergy"""
import logging
import gevent
from mxcubecore.HardwareObjects.abstract.AbstractEnergy import AbstractEnergy
__credits__ = ["EMBL Hamburg"]
__license__ = "LGPLv3+"
__category__ = "General"
[docs]class EMBLEnergy(AbstractEnergy):
"""
Defines interface to the Tine energy server
"""
def __init__(self, name):
AbstractEnergy.__init__(self, name)
self.ready_event = None
self.undulator_gaps = ()
self.ctrl_bytes = None
self.bragg_break_status = None
self.do_beam_alignment = False
self.delta = 0
self._tunable = True
self._energy_value = None
self._wavelength_value = None
self._energy_limits = ()
self._moving = None
self.chan_energy = None
self.chan_limit_low = None
self.chan_limit_high = None
self.chan_status = None
self.chan_undulator_gaps = None
self.chan_status_bragg_break = None
self.cmd_set_energy = None
self.cmd_energy_ctrl_byte = None
self.cmd_set_break_bragg = None
self.cmd_release_break_bragg = None
self.cmd_reset_perp = None
[docs] def init(self):
self.ready_event = gevent.event.Event()
self.cmd_set_energy = self.get_command_object("cmdSetEnergy")
self.cmd_energy_ctrl_byte = self.get_command_object("cmdEnergyCtrlByte")
self.cmd_set_break_bragg = self.get_command_object("cmdSetBreakBragg")
self.cmd_release_break_bragg = self.get_command_object("cmdReleaseBreakBragg")
self.cmd_reset_perp = self.get_command_object("cmdResetPerp")
self.chan_energy = self.get_channel_object("chanEnergy")
if self.chan_energy is not None:
self.chan_energy.connect_signal("update", self.energy_position_changed)
self.chan_limit_low = self.get_channel_object("chanLimitLow", optional=True)
if self.chan_limit_low is not None:
self.chan_limit_low.connect_signal("update", self.energy_limits_changed)
self.chan_limit_high = self.get_channel_object("chanLimitHigh", optional=True)
if self.chan_limit_high is not None:
self.chan_limit_high.connect_signal("update", self.energy_limits_changed)
self.chan_status = self.get_channel_object("chanStatus")
if self.chan_status is not None:
self.chan_status.connect_signal("update", self.energy_state_changed)
self.chan_undulator_gaps = self.get_channel_object(
"chanUndulatorGap", optional=True
)
if self.chan_undulator_gaps is not None:
self.chan_undulator_gaps.connect_signal(
"update", self.undulator_gaps_changed
)
self.chan_status_bragg_break = self.get_channel_object("chanStatusBraggBreak")
if self.chan_status_bragg_break is not None:
self.chan_status_bragg_break.connect_signal(
"update", self.bragg_break_status_changed
)
try:
self._default_energy = self.get_property("defaultEnergy")
except Exception:
self.log.warning("Energy: no default energy defined")
try:
self._energy_limits = eval(self.get_property("staticLimits"))
except Exception:
self._energy_limits = (None, None)
self.ctrl_bytes = eval(self.get_property("ctrlBytes"))
if not self.chan_energy:
self.energy_position_changed(self._default_energy * 1000)
[docs] def set_do_beam_alignment(self, state):
"""
Enables/disable beam alignment after changing the energy
:param state: boolean
:return:
"""
self.do_beam_alignment = state
[docs] def get_value(self):
"""
Returns current energy in keV
:return: float
"""
value = self._default_energy
if self.chan_energy is not None:
try:
value = self.chan_energy.get_value()
return value[0] / 1000
except Exception:
self.log.exception("Energy: could not read current energy")
return None
return value
[docs] def get_limits(self):
"""
Returns energy limits as list of two floats
:return: (float, float)
"""
if self.chan_limit_low is not None and self.chan_limit_high is not None:
try:
self._energy_limits = (
self.chan_limit_low.get_value(),
self.chan_limit_high.get_value(),
)
except Exception:
self.log.exception("Energy: could not read energy limits")
return self._energy_limits
[docs] def get_wavelength_limits(self):
"""
Returns wavelength limits as list of two floats
:return: (float, float)
"""
lims = None
self._energy_limits = self.getEnergyLimits()
if self._energy_limits is not None:
lims = (12.3984 / self._energy_limits[1], 12.3984 / self.en_lims[0])
return lims
[docs] def move_energy_started(self):
"""
Emits moveEnergyStarted signal
:return:
"""
self.emit("moveEnergyStarted", ())
[docs] def move_energy_failed(self):
"""
Emits moveEnergyFailedsignal
:return:
"""
self._moving = False
self.emit("moveEnergyFailed", ())
[docs] def move_energy_aborted(self):
"""
Emits moveEnergyFailed signal
:return:
"""
self._moving = False
self.emit("moveEnergyFailed", ())
[docs] def move_energy_finished(self, result):
"""
Emits moveEnergyFinished signal
:param result:
:return:
"""
self._moving = False
self.emit("moveEnergyFinished", ())
[docs] def check_limits(self, value):
"""
Checks given value if it is within limits
"""
self.log.info("Checking the move limits")
result = False
if self._energy_limits[0] <= value <= self.en_lims[1]:
self.log.info("Limits ok")
result = True
else:
logging.getLogger("GUI").info("Energy: Requested value is out of limits")
return result
#
# def start_move_wavelength(self, value, wait=True):
# """
# Starts wavelength change
# :param value: float
# :param wait: boolean
# :return:
# """
# self.log.info("Moving wavelength to (%s)" % value)
# return self.move_energy(12.3984 / value, wait)
# # return self.startMoveEnergy(value, wait)
[docs] def cancel_move_energy(self):
"""
Cancels energy change
:return:
"""
logging.getLogger("user_level_log").info("Energy: Cancel move")
# self.moveEnergy.abort()
[docs] def set_value(self, energy, wait=True):
"""
Sets energy in keV
"""
# gevent.spawn(self.move_energy_task(energy))
self.move_energy_task(energy)
[docs] def move_energy_task(self, energy):
"""
Actual energy change task
:param energy: in keV, float
:return:
"""
current_en = self.get_value()
pos = abs(current_en - energy)
self.delta = pos
if pos < 0.001:
self.emit("stateChanged", ("ready",))
else:
# if energy <= 6:
# self.cmd_energy_ctrl_byte(self.ctrl_bytes[0])
# else:
# self.cmd_energy_ctrl_byte(self.ctrl_bytes[1])
if self.cmd_energy_ctrl_byte is not None:
if pos > 0.1:
# p13 63, p14 15
self.cmd_energy_ctrl_byte(self.ctrl_bytes[1])
else:
self.cmd_energy_ctrl_byte(self.ctrl_bytes[0])
self._moving = pos
self.release_break_bragg()
gevent.sleep(2)
if self.cmd_set_energy:
logging.getLogger("GUI").info("Energy: Moving to %.2f keV", energy)
self.emit("statusInfoChanged", "Moving to %.2f keV" % energy)
self.cmd_set_energy(energy)
else:
# Mockup mode
self.energy_position_changed([energy * 1000])
[docs] def energy_position_changed(self, pos):
"""
Event called when energy value has been changed
:param pos: float
:return:
"""
# self.moveEnergyCmdFinished(True)
if isinstance(pos, (list, tuple)):
pos = pos[0]
value = pos / 1000
if self._nominal_value is None or abs(value - self._nominal_value) > 1e-3:
self.update_value(value)
[docs] def energy_limits_changed(self, limits):
"""
Updates energy limits
:param limits: (float, float)
:return:
"""
self.update_limits(self.get_limits())
[docs] def energy_state_changed(self, state):
"""
Updates energy status
:param state: int
:return:
"""
# logging.getLogger('HWR').info("Energy: State changed to %s" % str(state))
self.energy_server_check_for_errors(state)
state = int(state[0])
if state == 0:
if self._moving:
self._moving = False
self.set_break_bragg()
if self.cmd_reset_perp is not None:
self.log.info("Energy: Perp reset sent")
self.cmd_reset_perp()
self.move_energy_finished(0)
self.update_state(self.STATES.READY)
# self.emit("stateChanged", "ready")
self.emit("statusInfoChanged", "")
if self.do_beam_alignment and self.delta > 0.1:
self.emit("beamAlignmentRequested")
self.delta = 0
elif state == 1:
self.move_energy_started()
self.update_state(self.STATES.BUSY)
# self.emit("stateChanged", "busy")
[docs] def wait_ready(self, timeout=20):
"""
Waits till energy change is done
:param timeout: sec in int
:return:
"""
super(EMBLEnergy, self).wait_ready(timeout=20)
[docs] def energy_server_check_for_errors(self, state):
"""
Displays error message if the energy change fails
:param state: list of ints
:return:
"""
if state[0] == 1.0 or state[1] == 63:
return
elems = ["hdm1", "hdm2", "roll", "undulator", "bragg", "perp"]
message = "Energy: Error, setting energy failed on motors: "
bits = [int(state[1]) >> i & 1 for i in range(5, -1, -1)]
# logging.getLogger('GUI').error("%s"%bits)
for i in range(6):
if bits[i] == 0:
message = "%s %s" % (message, elems[i])
logging.getLogger("GUI").error(message)
self.emit("statusInfoChanged", message)
[docs] def bragg_break_status_changed(self, status):
"""
Updates status of bragg breaks
:param status:
:return:
"""
self.bragg_break_status = status
[docs] def re_emit_values(self):
"""
Reemits signals
:return:
"""
self.emit("energyChanged", (self._energy_value, self._wavelength_value))
self.emit("valueChanged", (self._energy_value,))
[docs] def undulator_gaps_changed(self, value):
"""
Updates undulator gaps
:param value:
:return:
"""
if isinstance(value, (list, tuple)):
self.undulator_gaps = value[0]
else:
self.undulator_gaps = value
[docs] def get_undulator_gaps(self):
"""
Returns undulator gaps
:return:
"""
if self.chan_undulator_gaps:
self.undulator_gaps_changed(self.chan_undulator_gaps.get_value())
return self.undulator_gaps
[docs] def set_break_bragg(self):
"""
Sets bragg breaks
:return:
"""
if self.chan_status_bragg_break.get_value() != 0:
logging.getLogger("GUI").warning("Energy: Setting bragg brake...")
self.emit("statusInfoChanged", "Setting Bragg break...")
gevent.sleep(3)
self.wait_ready()
gevent.sleep(1)
self.wait_ready()
self.log.info("Energy: Set bragg break cmd send")
self.cmd_set_break_bragg(1)
gevent.sleep(2)
if self.chan_status_bragg_break is not None:
self.log.warning("Energy: Waiting for break set (first try) ...")
with gevent.Timeout(
20, Exception("Energy: Timeout waiting for break set")
):
while self.chan_status_bragg_break.get_value() != 0:
gevent.sleep(0.1)
gevent.sleep(3)
self.log.warning("Waiting for break set (second try) ...")
with gevent.Timeout(20, Exception("Timeout waiting for break set")):
while self.chan_status_bragg_break.get_value() != 0:
gevent.sleep(0.1)
else:
gevent.sleep(10)
self.emit("statusInfoChanged", "Bragg break set")
logging.getLogger("GUI").info("Energy: Bragg brake set")
else:
logging.getLogger("GUI").info("Energy: Bragg brake already set")
[docs] def release_break_bragg(self):
"""
Release bragg breaks
:return:
"""
if self.chan_status_bragg_break.get_value() != 1:
logging.getLogger("GUI").warning("Energy: Releasing bragg brake...")
self.emit("statusInfoChanged", "Releasing Bragg break...")
self.cmd_release_break_bragg(1)
gevent.sleep(2)
if self.chan_status_bragg_break is not None:
logging.getLogger("GUI").warning(
"Energy: Waiting for brake released..."
)
with gevent.Timeout(
20, Exception("Energy: Timeout waiting for break release")
):
while self.chan_status_bragg_break.get_value() != 1:
gevent.sleep(0.1)
else:
logging.getLogger("GUI").info(
"Energy: Sleep 10 sec before brake released..."
)
gevent.sleep(10)
logging.getLogger("GUI").info("Energy: Bragg brake released")
self.emit("statusInfoChanged", "Bragg break released")
else:
logging.getLogger("GUI").info("Energy: Bragg brake already released")
self.emit("statusInfoChanged", "Bragg break released")