Source code for mxcubecore.HardwareObjects.mockup.MicrodiffInOutMockup

import logging
import time

from mxcubecore.BaseHardwareObjects import HardwareObject

"""
Use the exporter to set different MD2 actuators in/out.
If private_state not specified, True will be send to set in and False for out.
Example xml file:
<object class="MicrodiffInOut">
  <username>Scintilator</username>
  <exporter_address>wid30bmd2s:9001</exporter_address>
  <cmd_name>ScintillatorPosition</cmd_name>
  <private_state>{"PARK":"out", "SCINTILLATOR":"in"}</private_state>
  <use_hwstate>True</use_hwstate>
</object>
"""


[docs]class MicrodiffInOutMockup(HardwareObject): def __init__(self, name): super().__init__(name) self.actuatorState = "unknown" self.username = "unknown" # default timeout - 3 sec self.timeout = 3 self.hwstate_attr = None
[docs] def init(self): self.cmd_name = self.get_property("cmd_name") self.username = self.get_property("username") self.states = {True: "in", False: "out"} self.state_attr = False self.offset = self.get_property("offset", 0) if self.offset > 0: self.states = {self.offset: "out", self.offset - 1: "in"} states = self.get_property("private_state") if states: import ast self.states = ast.literal_eval(states) try: tt = float(self.get_property("timeout")) self.timeout = tt except Exception: self.log.exception("") self.moves = dict((self.states[k], k) for k in self.states)
def connect_notify(self, signal): if signal == "actuatorStateChanged": self.value_changed(self.state_attr) def value_changed(self, value): self.actuatorState = self.states.get(value, "unknown") self.emit("actuatorStateChanged", (self.actuatorState,)) def _ready(self): return True def _wait_ready(self, timeout=None): timeout = timeout or self.timeout tt1 = time.time() while time.time() - tt1 < timeout: if self._ready(): break else: time.sleep(0.5) def get_actuator_state(self, read=False): if read is True: value = self.state_attr self.actuatorState = self.states.get(value, "unknown") self.connect_notify("actuatorStateChanged") else: if self.actuatorState == "unknown": self.connect_notify("actuatorStateChanged") return self.actuatorState def actuatorIn(self, wait=True, timeout=None): if self._ready(): try: self.state_attr = self.moves["in"] if wait: timeout = timeout or self.timeout self._wait_ready(timeout) self.value_changed(self.state_attr) except Exception: logging.getLogger("user_level_log").error( "Cannot put %s in", self.username ) else: logging.getLogger("user_level_log").error( "Microdiff is not ready, will not put %s in", self.username ) def actuatorOut(self, wait=True, timeout=None): if self._ready(): try: self.state_attr = self.moves["out"] if wait: timeout = timeout or self.timeout self._wait_ready(timeout) self.value_changed(self.state_attr) except Exception: logging.getLogger("user_level_log").error( "Cannot put %s out", self.username ) else: logging.getLogger("user_level_log").error( "Microdiff is not ready, will not put %s out", self.username )