Source code for mxcubecore.HardwareObjects.SOLEIL.PX1.PX1Environment

# -*- coding: utf-8 -*-

import logging
import time

import gevent

from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.Command.Tango import DeviceProxy
from mxcubecore.TaskUtils import task


class EnvironmentPhase:
    TRANSFER = 0
    CENTRING = 1
    COLLECT = 2
    DEFAULT = 3
    BEAMVIEW = 4
    FLUOX = 5
    MANUALTRANSFER = 6
    INPROGRESS = 7
    VISUSAMPLE = 8

    phasedesc = {
        "TRANSFER": TRANSFER,
        "CENTRING": CENTRING,
        "COLLECT": COLLECT,
        "DEFAULT": DEFAULT,
        "BEAMVIEW": BEAMVIEW,
        "FLUOX": FLUOX,
        "MANUALTRANSFER": MANUALTRANSFER,
        "INPROGRESS": INPROGRESS,
        "VISUSAMPLE": VISUSAMPLE,
    }

    @staticmethod
    def phase(phasename):
        return EnvironmentPhase.phasedesc.get(phasename, None)


class EnvironemntState:
    UNKNOWN, ON, RUNNING, ALARM, FAULT = (0, 1, 10, 13, 14)  # Like PyTango stated

    # TangoStates = {
    #    Unknown     = 0
    #    On          = 1
    #    Loaded      = 2
    #    Loading     = 3
    #    Unloading   = 4
    #    Selecting   = 5
    #    Scanning    = 6
    #    Resetting   = 7
    #    Charging    = 8
    #    Moving      = 9
    #    Running     = 10
    #    StandBy     = 11
    #    Disabled    = 12
    #    Alarm       = 13
    #    Fault       = 14
    #    Initializing= 15
    #    Closing     = 16
    #    Off         = 17
    # }

    statedesc = {ON: "ON", RUNNING: "RUNNING", ALARM: "ALARM", FAULT: "FAULT"}

    @staticmethod
    def tostring(state):
        return SampleChangerState.statedesc.get(state, "Unknown")


[docs]class PX1Environment(HardwareObject): def __init__(self, name): super().__init__(name) self.auth = None self.device = None
[docs] def init(self): self.device = DeviceProxy(self.get_property("tangoname")) try: self.state_chan = self.get_channel_object("State") self.state_chan.connect_signal("update", self.stateChanged) except KeyError: logging.getLogger().warning("%s: cannot report State", self.name()) try: self.chanAuth = self.get_channel_object("beamlineMvtAuthorized") self.chanAuth.connect_signal("update", self.setAuthorizationFlag) # state = self.state_chan.get_value() except KeyError: logging.getLogger().warning("%s: cannot report State", self.name()) try: self.usingCapillaryChannel = self.get_channel_object("usingCapillary") self.setUsingCapillary(True) except Exception: self.usingCapillaryChannel = None try: self.beamstopPositionChannel = self.get_channel_object("beamstopPosition") except Exception: self.beamstopPositionChannel = None if self.device is not None: self.set_is_ready(True) self.cmds = { EnvironmentPhase.TRANSFER: self.device.GoToTransfertPhase, EnvironmentPhase.CENTRING: self.device.GoToCentringPhase, EnvironmentPhase.COLLECT: self.device.GoToCollectPhase, EnvironmentPhase.DEFAULT: self.device.GoToDefaultPhase, # EnvironmentPhase.BEAMVIEW: self.device.GoToBeamViewPhase, EnvironmentPhase.FLUOX: self.device.GoToFluoXPhase, EnvironmentPhase.MANUALTRANSFER: self.device.GoToManualTransfertPhase, EnvironmentPhase.VISUSAMPLE: self.device.GoToVisuSamplePhase, }
# ---- begin state handling # def stateChanged(self, value): self.emit("StateChanged", (value,))
[docs] def get_state(self): state = str(self.state_chan.get_value()) return state
def isBusy(self, timeout=None): state = self.stateChan.get_value() return state not in [EnvironmentState.ON]
[docs] def wait_ready(self, timeout=None): self._waitState(["ON"], timeout)
def _waitState(self, states, timeout=None): if self.device is None: return _debut = time.time() with gevent.Timeout(timeout, Exception("Timeout waiting for device ready")): waiting = True while waiting: state = self.readState() if state in states: waiting = False gevent.sleep(0.05) # # ------- end state handling # logging.debug("PX1environment: end _waitState in %.1f sec" % (time.time() - _debut)) # ------- begin phase handling # def isPhaseTransfer(self): return self.device.readyForTransfert def isPhaseCollect(self): return self.readyForCollect() def isPhaseVisuSample(self): return self.device.readyForVisuSample def isPhaseFluoScan(self): return self.device.readyForFluoScan def readyForCentring(self): if self.device is not None: return self.device.readyForCentring else: return None def readyForCollect(self): if self.device is not None: return self.device.readyForCollect else: return None def readyForDefaultPosition(self): if self.device is not None: return self.device.readyForDefaultPosition else: return None def readyForFluoScan(self): if self.device is not None: return self.device.readyForFluoScan else: return None def readyForManualTransfer(self): if self.device is not None: return self.device.readyForManualTransfert else: return None def readyForTransfer(self): if self.device is not None: return self.device.readyForTransfert else: return None def readyForVisuSample(self): if self.device is not None: return self.device.readyForVisuSample else: return None def gotoPhase(self, phase): logging.debug("PX1environment.gotoPhase %s" % phase) cmd = self.cmds.get(phase, None) if cmd is not None: logging.debug("PX1environment.gotoPhase state %s" % self.readState()) cmd() else: return None def setPhase(self, phase, timeout=120): self.gotoPhase(phase) self.waitPhase(phase, timeout) def readPhase(self): if self.device is not None: phasename = self.device.currentPhase return EnvironmentPhase.phase(phasename) else: return None def get_current_phase(self): return self.device.currentPhase getCurrentPhase = get_current_phase def getPhase(self): if self.device is not None: phasename = self.device.currentPhase return phasename else: return None def waitPhase(self, phase, timeout=None): if self.device is None: return logging.debug("PX1environment: start waitPhase") _debut = time.time() n = 0 with gevent.Timeout( timeout, Exception("Timeout waiting for environment phase") ): waiting = True while waiting: n += 1 _phaseread = self.readPhase() if phase == _phaseread: waiting = False gevent.sleep(0.05) logging.debug( "PX1environment: end waitPhase in %.1f sec N= %d" % ((time.time() - _debut), n) ) # # ------- end phase handling def gotoCentringPhase(self): if not self.readyForCentring() or self.getPhase() != "CENTRING": self.get_command_object("GoToCentringPhase")() time.sleep(0.1) def gotoCollectPhase(self): if not self.readyForCollect() or self.getPhase() != "COLLECT": self.get_command_object("GoToCollectPhase")() time.sleep(0.1) def gotoLoadingPhase(self): if not self.readyForTransfer(): self.get_command_object("GoToTransfertPhase")() time.sleep(0.1) def gotoManualLoadingPhase(self): if not self.readyForTransfer(): self.get_command_object("GoToManualTransfertPhase")() time.sleep(0.1) def gotoSampleViewPhase(self): if not self.readyForVisuSample(): self.get_command_object("GoToVisuSamplePhase")() time.sleep(0.1) def gotoFluoScanPhase(self): if not self.readyForFluoScan(): self.get_command_object("GoToFluoScanPhase")() time.sleep(0.1) def setAuthorizationFlag(self, value): # make here the logic with eventually other permits (like hardware permit) if value != self.auth: self.log.debug( "PX1Environment. received authorization from cryotong: %s" % value ) self.auth = value self.emit("operationPermitted", value) def getUsingCapillary(self): if self.usingCapillaryChannel is not None: return self.usingCapillaryChannel.get_value() def setUsingCapillary(self, value): self.capillary_value = value gevent.spawn(self._setUsingCapillary) @task def _setUsingCapillary(self): if self.usingCapillaryChannel is not None: self.usingCapillaryChannel.set_value(self.capillary_value) def getBeamstopPosition(self): if self.beamstopPositionChannel is not None: return self.beamstopPositionChannel.get_value() def setBeamstopPosition(self, value): self.beamstop_position = value gevent.spawn(self._setBeamstopPosition) @task def _setBeamstopPosition(self): if self.beamstopPositionChannel is not None: self.beamstopPositionChannel.set_value(self.beamstop_position)
def test_hwo(hwo): t0 = time.time() print("PX1 Environment (state) ", hwo.get_state()) print(" phase is ", hwo.getCurrentPhase()) print(" beamstop pos is ", hwo.getBeamstopPosition()) # if not env.readyForTransfer(): # print "Going to transfer phase" # env.setPhase(EnvironmentPhase.TRANSFER) # print time.time() - t0 # print "done" # env.waitPhase(EnvironmentPhase.TRANSFER)