Source code for mxcubecore.HardwareObjects.MicrodiffInOut

import logging
import time
import warnings

from mxcubecore.BaseHardwareObjects import HardwareObject

"""
Use the exporter to set different MD2 actuators in/out.
If private_state not specified, True will be send to set in and False for out.
Example xml file:
<object class="MicrodiffInOut">
  <username>Scintilator</username>
  <exporter_address>wid30bmd2s:9001</exporter_address>
  <cmd_name>ScintillatorPosition</cmd_name>
  <private_state>{"PARK":"out", "SCINTILLATOR":"in"}</private_state>
  <use_hwstate>True</use_hwstate>
</object>
"""


[docs]class MicrodiffInOut(HardwareObject): def __init__(self, name): super().__init__(name) self.actuatorState = "unknown" self.username = "unknown" # default timeout - 5 sec self.timeout = 5 self.hwstate_attr = None
[docs] def init(self): self.cmd_name = self.get_property("cmd_name") self.username = self.get_property("username") self.cmd_attr = self.add_channel( {"type": "exporter", "name": "move"}, self.cmd_name ) self.cmd_attr.connect_signal("update", self.value_changed) self.statecmd_name = self.get_property("statecmd_name") if self.statecmd_name is None: self.statecmd_name = self.cmd_name self.state_attr = self.add_channel( {"type": "exporter", "name": "state"}, self.statecmd_name ) self.state_attr.connect_signal("update", self.value_changed) self.states = {True: "in", False: "out"} self.offset = self.get_property("offset", 0) if self.offset > 0: self.states = {self.offset: "out", self.offset - 1: "in"} states = self.get_property("private_state") if states: import ast self.states = ast.literal_eval(states) tt = self.get_property("timeout") if tt is None: warnings.warn( "%s: %s is not configured" % (self.__class__.__name__, "timeout") ) else: try: tt = float(tt) except TypeError: self.log.exception("") self.timeout = tt if self.get_property("use_hwstate"): self.hwstate_attr = self.add_channel( {"type": "exporter", "name": "hwstate"}, "HardwareState" ) self.swstate_attr = self.add_channel( {"type": "exporter", "name": "swstate"}, "State" ) self.moves = dict((self.states[k], k) for k in self.states) self.get_actuator_state(read=True)
def connect_notify(self, signal): if signal == "actuatorStateChanged": self.value_changed(self.state_attr.get_value()) def value_changed(self, value): self.actuatorState = self.states.get(value, "unknown") self.emit("actuatorStateChanged", (self.actuatorState,)) def _ready(self): if self.hwstate_attr: if ( self.hwstate_attr.get_value() == "Ready" and self.swstate_attr.get_value() == "Ready" ): return True else: if self.swstate_attr.get_value() == "Ready": return True return False def _wait_ready(self, timeout=None): timeout = timeout or self.timeout tt1 = time.time() while time.time() - tt1 < timeout: if self._ready(): break else: time.sleep(0.5) def get_actuator_state(self, read=False): if read is True: value = self.state_attr.get_value() self.actuatorState = self.states.get(value, "unknown") return self.actuatorState def actuatorIn(self, wait=True, timeout=None): if self._ready(): try: self.cmd_attr.set_value(self.moves["in"]) if wait: timeout = timeout or self.timeout self._wait_ready(timeout) self.value_changed(self.state_attr.get_value()) except Exception: logging.getLogger("user_level_log").error( "Cannot put %s in", self.username ) else: logging.getLogger("user_level_log").error( "Microdiff is not ready, will not put %s in", self.username ) def actuatorOut(self, wait=True, timeout=None): if self._ready(): try: self.cmd_attr.set_value(self.moves["out"]) if wait: timeout = timeout or self.timeout self._wait_ready(timeout) self.value_changed(self.state_attr.get_value()) except Exception: logging.getLogger("user_level_log").error( "Cannot put %s out", self.username ) else: logging.getLogger("user_level_log").error( "Microdiff is not ready, will not put %s out", self.username )