Source code for mxcubecore.HardwareObjects.Energy

import logging
import math

import gevent

from mxcubecore.BaseHardwareObjects import HardwareObject

"""
Example xml file:
  - for tunable wavelength beamline:
<object class="Energy">
  <object href="/energy" role="energy"/>
  <object href="/bliss" role="controller"/>
  <tunable_energy>True</tunable_energy>
</object>
The energy should have methods get_value, get_limits and move.
If used, the controller should have method moveEnergy.

  - for fixed wavelength beamline:
<object class="Energy">
  <default_energy>12.8123</tunable_energy>
</object>
"""


[docs]class Energy(HardwareObject):
[docs] def init(self): self.ready_event = gevent.event.Event() self.energy_motor = None self.tunable = False self.moving = None self.default_en = None self.ctrl = None self.en_lims = [] try: self.energy_motor = self.get_object_by_role("energy") except KeyError: self.log.warning("Energy: error initializing energy motor") try: self.default_en = self.get_property("default_energy") except KeyError: self.log.warning("Energy: no default energy") try: self.tunable = self.get_property("tunable_energy") except KeyError: self.log.warning("Energy: will set to fixed energy") try: self.ctrl = self.get_object_by_role("controller") except KeyError: self.log.info("No controller used") if self.energy_motor is not None: self.energy_motor.connect("valueChanged", self.energyPositionChanged) self.energy_motor.connect("stateChanged", self.energyStateChanged)
def is_connected(self): return True def get_value(self): if self.energy_motor is not None: try: return self.energy_motor.get_value() except Exception: self.log.exception("EnergyHO: could not read current energy") return None return self.default_en def get_wavelength(self): current_en = self.get_current_energy() if current_en: return 12.3984 / current_en return None def get_limits(self): self.log.debug("Get energy limits") if not self.tunable: energy = self.get_value() return (energy, energy) if self.energy_motor is not None: try: self.en_lims = self.energy_motor.get_limits() return self.en_lims except Exception: self.log.exception("EnergyHO: could not read energy motor limits") return None return None def get_wavelength_limits(self): self.log.debug("Get wavelength limits") if not self.tunable: return None self.en_lims = self.get_limits() if self.en_lims: lims = (12.3984 / self.en_lims[1], 12.3984 / self.en_lims[0]) return lims # def start_move_energy(self, value, wait=True): # if not self.tunable: # return False # # try: # value = float(value) # except (TypeError, ValueError) as diag: # logging.getLogger("user_level_log").error( # "Energy: invalid energy (%s)" % value # ) # return False # # current_en = self.get_current_energy() # if current_en: # if math.fabs(value - current_en) < 0.001: # self.moveEnergyCmdFinished(True) # if self.checkLimits(value) is False: # return False # # self.moveEnergyCmdStarted() # # def change_egy(): # try: # self.set_value(value, wait=True) # except Exception: # sys.excepthook(*sys.exc_info()) # self.moveEnergyCmdFailed() # else: # self.moveEnergyCmdFinished(True) # # if wait: # change_egy() # else: # gevent.spawn(change_egy) def moveEnergyCmdStarted(self): self.moving = True self.emit("moveEnergyStarted", ()) def moveEnergyCmdFailed(self): self.moving = False self.emit("moveEnergyFailed", ()) def moveEnergyCmdAborted(self): pass def moveEnergyCmdFinished(self, result): self.moving = False self.emit("moveEnergyFinished", ()) def checkLimits(self, value): self.log.debug("Checking the move limits") if self.get_limits(): if value >= self.en_lims[0] and value <= self.en_lims[1]: self.log.info("Limits ok") return True logging.getLogger("user_level_log").info("Requested value is out of limits") return False # def start_move_wavelength(self, value, wait=True): # self.log.info("Moving wavelength to (%s)" % value) # return self.startMoveEnergy(12.3984 / value, wait) def cancelMoveEnergy(self): logging.getLogger("user_level_log").info("Cancel move") self.moveEnergy.abort() def set_value(self, energy, wait=True): current_en = self.get_value() pos = math.fabs(current_en - energy) if pos < 0.001: logging.getLogger("user_level_log").info( "Energy: already at %g, not moving", energy ) else: logging.getLogger("user_level_log").info( "Energy: moving energy to %g", energy ) if pos > 0.02: try: if self.ctrl: self.ctrl.change_energy(energy) else: self.execute_command("moveEnergy", energy, wait=True) except RuntimeError as AttributeError: self.energy_motor.set_value(energy) else: self.energy_motor.set_value(energy) def energyPositionChanged(self, pos): wl = 12.3984 / pos if wl: self.emit("energyChanged", (pos, wl)) self.emit("valueChanged", (pos,)) def energyStateChanged(self, state): self.emit("stateChanged", (state,))