Source code for mxcubecore.HardwareObjects.MAXIV.MachInfo

import logging
import math
import re

import gevent
from tango import DeviceProxy

from mxcubecore.HardwareObjects.abstract.AbstractMachineInfo import AbstractMachineInfo
from mxcubecore.utils.units import (
    A_to_mA,
    sec_to_hour,
)

# how often we refresh machine info
REFRESH_PERIOD_SEC = 30
CLEANR = re.compile("<.*?>")


log = logging.getLogger("HWR")


def cleanhtml(raw_html):
    cleantext = re.sub(CLEANR, "", raw_html)
    return cleantext


[docs]def catch_errors(func): """ run wrapped function, catching all exception If an exception as raised, log the exception and return 'unknown' """ def wrapper(*a, **kw): try: return func(*a, **kw) except Exception: log.exception("error fetching machine info") return "unknown" return wrapper
[docs]class MachInfo(AbstractMachineInfo): """ Machine info hardware object for MAXIV site. Provides to the user general information about the machine, such as ring status, current, operator message, etc. This hardware objects fetches the information from attributes of specified tango devices. Hardware object properties: mach_info (str): name of the machine status tango device current (str): name of the ring status tango device parameters (str): topics to export, see AbstractMachineInfo class for details """ def __init__(self, name): super().__init__(name) self.mach_info = None self.mach_curr = None
[docs] def init(self): super().init() self.mach_info = self._get_tango_device("mach_info") self.mach_curr = self._get_tango_device("current") gevent.spawn(self._refresh_ticker)
def _get_tango_device(self, property_name: str) -> DeviceProxy: dev_name = self.get_property(property_name) try: return DeviceProxy(dev_name) except Exception: log.exception(f"error connecting to machine info tango device {dev_name}") def _refresh_ticker(self): while True: self.update_value() gevent.sleep(REFRESH_PERIOD_SEC)
[docs] @catch_errors def get_current(self) -> str: current = A_to_mA(self.mach_curr.Current) return f"{current:.2f} mA"
@catch_errors def get_fillmode(self) -> str: return self.mach_info.R3Mode
[docs] @catch_errors def get_message(self) -> str: return self.mach_info.OperatorMessage
[docs] @catch_errors def get_lifetime(self) -> str: lifetime = self.mach_curr.Lifetime if math.isnan(lifetime): return "n/a" return f"{sec_to_hour(lifetime):.2f} h"
@catch_errors def get_injection(self) -> str: return self.mach_info.R3NextInjection @catch_errors def get_status(self) -> str: message = cleanhtml(self.mach_info.MachineMessage) message = message.replace("R1", " R1").replace("Linac", " Linac") return message