Source code for mxcubecore.HardwareObjects.LNLS.BlueskyHttpServer

from mxcubecore.BaseHardwareObjects import HardwareObject


[docs]class BlueskyHttpServer(HardwareObject): def __init__(self, name): HardwareObject.__init__(self, name)
[docs] def init(self): super().init() self.api = self.get_command_object("bluesky_http_server") self.api.username = "mnc-data"
def execute_plan(self, plan_name, kwargs=None): response = self.api.execute_plan(plan_name=plan_name, kwargs=kwargs) response_content = response.json() if response_content["success"]: try: self.api.monitor_manager_state("executing_queue") self.api.monitor_manager_state("idle") except TimeoutError: self.log.exception("The Bluesky plan has timed out!") else: self.log.exception(response_content["msg"]) def monitor_bluesky_state(self, response, state): response_content = response.json() if response_content["success"]: try: self.api.monitor_manager_state(state) except TimeoutError: self.log.exception("The Bluesky action has timed out!") else: self.log.exception(response_content["msg"]) def pause(self, option="deferred"): response = self.api.pause_plan(option) self.monitor_bluesky_state(response, state="paused")
[docs] def abort(self): if self.api.status()["manager_state"] == "executing_queue": self.pause(option="immediate") response = self.api.abort_plan() self.monitor_bluesky_state(response, state="idle")
def resume(self): response = self.api.resume_plan() self.monitor_bluesky_state(response, state="executing_queue")