# Source code for mxcubecore.HardwareRepository

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""Gives access to the Hardware Objects contained in the Hardware Repository database

The Hardware Repository database is a set of XML files describing devices, equipment
and procedures on a beamline. Each XML file represents a Hardware Object.
The Hardware Repository module provides access to these Hardware Objects, and manages
connections to the Control Software (Spec or Taco Device Servers).
"""

from __future__ import (
    absolute_import,
    print_function,
)

import importlib
import logging
import os
import sys
import time
import traceback
import weakref
from datetime import datetime
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Optional,
    Union,
)
from warnings import warn

from ruamel.yaml import YAML

from mxcubecore import (
    BaseHardwareObjects,
    HardwareObjectFileParser,
)
from mxcubecore.dispatcher import dispatcher
from mxcubecore.protocols_config import setup_commands_channels
from mxcubecore.utils.conversion import (
    make_table,
    string_types,
)

if TYPE_CHECKING:
    from mxcubecore.BaseHardwareObjects import HardwareObject

# If you want to write out copies of the file, use typ="rt" instead
# pure=True uses yaml version 1.2, with fewer gotchas for strange type conversions
yaml = YAML(typ="safe", pure=True)
# The following are not needed for load, but define the default style.
yaml.default_flow_style = False
yaml.indent(mapping=2, sequence=4, offset=2)


__copyright__ = """ Copyright © 2010 - 2020 by MXCuBE Collaboration """
__license__ = "LGPLv3+"


_instance = None
TIMERS = []

beamline = None
#
# Supported beamline configuration filenames.
# The first name in the list takes precedence.
#
BEAMLINE_CONFIG_FILES = ["beamline.yaml", "beamline_config.yml"]


def load_from_yaml(
    configuration_file,
    role,
    yaml_export_directory: Optional[Path] = None,
    _container=None,
    _table=None,
):
    """Load a YAML-configured object and, recursively, the objects it contains.

    The configuration file is looked up through the module-level repository
    instance. Its ``class`` entry names the class to import and instantiate,
    its ``configuration`` entry is passed to the object's ``HOConfig``, and its
    ``objects`` entry maps roles to the configuration files (``.yaml``/``.yml``
    or ``.xml``) of contained objects.

    For the top-level call (``_container`` is None) errors are re-raised, the
    module global ``beamline`` is set to the created object, and a summary
    table is printed at the end. For contained objects errors are recorded in
    the summary table and None is returned instead.

    Args:
        configuration_file (str): Configuration file name, relative to the
            repository lookup path.
        role (str): Role name of configured object, used as its name
        yaml_export_directory: If set, passed on to contained objects; for
            non-top-level calls that produced an object, a draft YAML
            configuration file is exported to this directory.
        _container (ConfiguredObject): Container object for recursive loading
        _table Optional[List]: Internal, collecting summary output

    Returns:
        The loaded object, or None if a contained object could not be loaded.

    Raises:
        RuntimeError: If the hardware repository has not been initialised.
        ValueError: At top level, if the file lacks a 'class' tag.
    """
    global beamline

    column_names = ("role", "Class", "file", "Time (ms)", "Comment")
    if _table is None:
        # This is the topmost call
        _table = []

    start_time = time.time()
    # msg0 doubles as the error flag: once it is non-empty, the remaining
    # loading stages below are skipped.
    msg0 = ""
    result = None
    class_name = "None"

    # Get full path for configuration file
    if _instance is None:
        raise RuntimeError("HardwareRepository has not been initialised")
    configuration_path = _instance.find_in_repository(configuration_file)
    if configuration_path is None:
        msg0 = "File not found"

    if not msg0:
        # Load the configuration file
        with open(configuration_path, "r") as fp0:
            configuration = yaml.load(fp0)

        class_import = configuration.pop("class", None)
        if not class_import:
            if _container:
                msg0 = "No 'class' tag"
            else:
                # at top level we want to get the actual error
                raise ValueError("%s file lacks 'class' tag" % configuration_file)

    if not msg0:
        module_name, class_name = class_import.rsplit(".", 1)
        # For "a.b.c" equivalent to absolute import of "from a.b import c"
        try:
            cls = getattr(importlib.import_module(module_name), class_name)
        except Exception as ex:
            if _container:
                msg0 = "Error importing class"
                class_name = class_import
                print(
                    "Encountered Exception (continuing):\n%s" % traceback.format_exc()
                )
            else:
                # at top level we want to get the actual error
                raise

    if not msg0:
        try:
            # instantiate object
            result = cls(name=role)
            result._hwobj_container = _container
        except Exception:
            if _container:
                msg0 = "Error instantiating %s" % cls.__name__
                print(
                    "Encountered Exception (continuing):\n%s" % traceback.format_exc()
                )
            else:
                # at top level we want to get the actual error
                raise

    if _container is None:
        # We are loading the beamline object into HardwareRepository
        # and want the link to be set before _init or content loading
        beamline = result

    if not msg0:
        try:
            config = configuration.pop("configuration", {})
            # Set configuration with non-object properties.
            result._config = result.HOConfig(**config)

            # Initialise object
            result._init()
        except Exception:
            if _container:
                msg0 = "Error in %s._init()" % cls.__name__
            else:
                # at top level we want to get the actual error
                raise

    if not msg0:
        # Recursively load contained objects (of any type that the system can support)
        objects = configuration.pop("objects", {})

        setup_commands_channels(result, configuration)

        if _container is None:
            # Top level: add a "start" row now; the row appended at the end of
            # this function then reports "Done loading contents".
            load_time = 1000 * (time.time() - start_time)
            msg1 = "Start loading contents:"
            _table.append(
                (role, class_name, configuration_file, "%.1d" % load_time, msg1)
            )
            msg0 = "Done loading contents"
        for role1, config_file in objects.items():
            fname, fext = os.path.splitext(config_file)
            if fext in (".yaml", ".yml"):
                # Registered in the repository under the extension-less name
                # with a leading slash.
                fname = f"/{fname}"
                hwobj = load_from_yaml(
                    config_file,
                    role=role1,
                    yaml_export_directory=yaml_export_directory,
                    _container=result,
                    _table=_table,
                )
                if hwobj:
                    # only add if we successfully loaded the object
                    _instance.hardware_objects[fname] = hwobj
            elif fext == ".xml":
                msg1 = ""
                time0 = time.time()
                class_name1 = "None"
                try:
                    hwobj = _instance.get_hardware_object(fname)
                    if hwobj is None:
                        msg1 = "No object loaded"
                    else:
                        class_name1 = hwobj.__class__.__name__
                        _attach_xml_objects(yaml_export_directory, result, hwobj, role1)
                except Exception as ex:
                    msg1 = "Loading error (%s)" % str(ex)
                load_time = 1000 * (time.time() - time0)
                _table.append(
                    (role1, class_name1, config_file, "%.1d" % load_time, msg1)
                )

    # NOTE(review): for the top-level call msg0 was set to "Done loading
    # contents" above, so this stage appears to run for contained objects
    # only - confirm whether the top-level object's init() is meant to be
    # called from here.
    if not msg0:
        if _container:
            if not hasattr(_container, role):
                warn(
                    f"load_from_yaml Class {_container.__class__.__name__} has no attribute {role}",
                    stacklevel=2,
                )
            _container._hwobj_by_role[role] = result
        try:
            # Initialise object
            result.init()
        except Exception:
            if _container:
                msg0 = "Error in %s.init()" % cls.__name__
            else:
                # at top level we want to get the actual error
                raise

    # Summary row for this object (msg0 is empty on success of a contained object)
    load_time = 1000 * (time.time() - start_time)
    _table.append((role, class_name, configuration_file, "%.1d" % load_time, msg0))

    if _container is None:
        print(make_table(column_names, _table))
    elif yaml_export_directory and result:
        _export_draft_config_file(yaml_export_directory, result)

    return result
def _export_draft_config_file(dest_dir: Path, hwobj):
    """Write a draft YAML configuration file for ``hwobj`` into ``dest_dir``.

    The file is named ``<hwobj.id>.yaml`` and contains the object's fully
    qualified class name, a role -> ``<id>.yaml`` mapping for its contained
    objects (if any), and the entries of ``hwobj.config.model_dump()`` that are
    not roles of contained objects.

    Args:
        dest_dir: Directory to write to; created if it does not exist.
        hwobj: Hardware object to export.
    """
    hwobj_class = hwobj.__class__
    result = {
        "class": "%s.%s" % (hwobj_class.__module__, hwobj_class.__name__),
    }

    objects_by_role = hwobj.objects_by_role
    if objects_by_role:
        objects = result["objects"] = {}
        for role, obj in objects_by_role.items():
            try:
                objects[role] = "%s.yaml" % obj.id
            except Exception:
                # Best effort: the role is left out of the draft, but say which
                # one (the log message used to be empty).
                logging.getLogger("HWR").exception(
                    "Could not determine YAML file name for object with role '%s'",
                    role,
                )

    # Contained objects are listed under "objects", not under "configuration".
    result["configuration"] = {
        tag: val
        for tag, val in hwobj.config.model_dump().items()
        if tag not in objects_by_role
    }

    file_name = "%s.yaml" % hwobj.id
    dest_dir.mkdir(parents=True, exist_ok=True)
    yaml.dump(result, Path(dest_dir, file_name))


def _attach_xml_objects(yaml_export_directory: Optional[Path], container, hwobj, role):
    """Recursively attach XML-configured object to container as role

    Sets the object's container and name, registers it in the container's
    role table, and does the same for every object in ``hwobj._objects_by_role``.
    If ``yaml_export_directory`` is set, a draft YAML configuration file is
    exported for each attached object.

    NBNB guard against duplicate objects
    """
    hwobj._hwobj_container = container
    hwobj._name = role
    container._hwobj_by_role[role] = hwobj

    for role2, hwobj2 in hwobj._objects_by_role.items():
        _attach_xml_objects(yaml_export_directory, hwobj, hwobj2, role2)

    if yaml_export_directory and hwobj:
        _export_draft_config_file(yaml_export_directory, hwobj)


def _convert_xml_property(hwobj):
    """Convert complex xml-configured object

    Returns:
        dict: The object's properties, plus one entry per name in
        ``hwobj._objects_names()`` holding the list of converted sub-objects.
    """
    result = dict(hwobj.get_properties())
    for tag in hwobj._objects_names():
        # NB this does NOT allow having HardwareObjects inside complex properties
        result[tag] = [_convert_xml_property(obj) for obj in hwobj._get_objects(tag)]
    return result


def _create_config_for_xml_hwobj(hwobj: "BaseHardwareObjects.HardwareObjectNode"):
    """
    Populate hwobj._config attribute for an HWOBJ loaded with XML configure file.

    This allows to access HWOBJ configuration uniformly for both YAML and XML
    configured objects, using its 'config' attribute.

    Sub-objects that are not contained hardware objects (i.e. whose tag is not
    in ``hwobj._objects_by_role``) are converted to plain dictionaries and set
    as attributes on the configuration: a single dictionary if there is exactly
    one, otherwise a list.
    """
    hwobj._config = hwobj.HOConfig(**hwobj.get_properties())

    objects_by_role = hwobj._objects_by_role
    for tag in hwobj._objects_names():
        if tag in objects_by_role:
            continue
        # Complex object, not contained hwobj
        objs = [_convert_xml_property(obj) for obj in hwobj._get_objects(tag)]
        setattr(hwobj.config, tag, objs[0] if len(objs) == 1 else objs)
def add_hardware_objects_dirs(ho_dirs):
    """Prepend existing directories to ``sys.path``.

    Each entry is made absolute; entries that are not existing directories are
    dropped. The remaining directories are inserted at the front of
    ``sys.path`` in the order given (the first entry ends up first), skipping
    any that are already on the path.

    Args:
        ho_dirs (list | tuple): Directory paths to add. Any other type
            (including a single string) is ignored.
    """
    if not isinstance(ho_dirs, (list, tuple)):
        return

    new_ho_dirs = [
        path for path in map(os.path.abspath, ho_dirs) if os.path.isdir(path)
    ]
    # Insert in reverse so that the original order is preserved at the front.
    for new_ho_dir in reversed(new_ho_dirs):
        if new_ho_dir not in sys.path:
            sys.path.insert(0, new_ho_dir)
def set_user_file_directory(user_file_directory):
    """Set the user file directory on the hardware object node class.

    Delegates to ``BaseHardwareObjects.HardwareObjectNode.set_user_file_directory``.

    Args:
        user_file_directory (str): absolute path to user file directory
    """
    node_class = BaseHardwareObjects.HardwareObjectNode
    node_class.set_user_file_directory(user_file_directory)
def init_hardware_repository(
    configuration_path: str,
    yaml_export_directory: Optional[Path] = None,
):
    """Initialise hardware repository - must be run at program start

    Creates the repository singleton, locates the beamline configuration file
    in the lookup path and loads it with role "beamline" into the module
    global ``beamline``.

    Args:
        configuration_path (str): PATHSEP-separated string of directories
            giving configuration file lookup path
        yaml_export_directory: if specified, loaded hardware objects
            configuration will be written to this directory, as YAML files

    Returns:
        None. If ``configuration_path`` is empty an error is logged and the
        repository is left uninitialised.

    Raises:
        RuntimeError: If the repository is already initialised, or if no
            beamline configuration file is found in the lookup path.
    """
    global _instance
    global beamline

    if _instance is not None or beamline is not None:
        raise RuntimeError(
            "init_hardware_repository called on already initialised repository"
        )

    if not configuration_path:
        logging.getLogger("HWR").error(
            "Unable to initialize hardware repository. No configuration path passed."
        )
        return

    # If configuration_path is a string of combined paths, split it up
    # and keep only the entries that exist.
    candidates = (
        os.path.abspath(os.path.expanduser(entry))
        for entry in configuration_path.split(os.path.pathsep)
    )
    lookup_path = [path for path in candidates if os.path.exists(path)]
    if lookup_path:
        configuration_path = lookup_path

    logging.getLogger("HWR").info("Hardware repository: %s", configuration_path)
    _instance = __HardwareRepositoryClient(configuration_path)
    _instance.connect()

    beamline_config_file = _instance.find_beamline_config_file()
    if beamline_config_file is None:
        # Without this check the '.endswith' call below fails with an opaque
        # AttributeError. Reset the singleton so that initialisation can be
        # retried once the configuration is fixed.
        _instance = None
        raise RuntimeError(
            "No beamline configuration file (%s) found in hardware repository: %s"
            % (", ".join(BEAMLINE_CONFIG_FILES), configuration_path)
        )

    if beamline_config_file.endswith(".yml"):
        warn(
            f"Config file '{beamline_config_file}' have deprecated extension 'yml', "
            "change to 'yaml'."
        )

    beamline = load_from_yaml(
        beamline_config_file,
        role="beamline",
        yaml_export_directory=yaml_export_directory,
    )
def uninit_hardware_repository():
    """Reset the module-level repository instance and beamline object to None."""
    global _instance
    global beamline

    beamline = None
    _instance = None
def get_hardware_repository():
    """Return the HardwareRepository singleton.

    Returns:
        HardwareRepository: The Singleton instance of HardwareRepository
            (in reality __HardwareRepositoryClient)

    Raises:
        RuntimeError: If the repository has not been initialised.
    """
    repository = _instance
    if repository is not None:
        return repository
    raise RuntimeError("The HardwareRepository has not been initialised")
class __HardwareRepositoryClient:
    """Hardware Repository class

    Keeps track of the loaded Hardware Objects (``hardware_objects``, a weak
    value dictionary keyed by object name), of the objects that failed to
    initialise (``invalid_hardware_objects``) and of per-object load
    statistics (``hwobj_info_list``).

    Warning -- should not be instantiated directly ; call the module's level
    get_hardware_repository() function instead
    """

    def __init__(self, server_address):
        """Constructor

        server_address needs to be the HWR server address (host:port) or
        a list of paths where to find XML files locally
        (when server is not in use)
        """
        self.server_address = server_address
        self.required_hardware_objects = {}
        self.xml_source = {}
        self.__connected = False
        self.server = None
        self.hwobj_info_list = []
        # Both containers are created by connect()
        self.invalid_hardware_objects = None
        self.hardware_objects = None

    def connect(self):
        """Create the object containers; does nothing if already connected."""
        if self.__connected:
            return
        try:
            self.invalid_hardware_objects = set()
            self.hardware_objects = weakref.WeakValueDictionary()
            self.server = None
        finally:
            self.__connected = True

    def find_beamline_config_file(self) -> Optional[str]:
        """Find beamline configuration file name.

        Look in the repository paths for beamline config file.

        Returns:
            The found config file name, or None if not found.
        """
        for file_name in BEAMLINE_CONFIG_FILES:
            found = self.find_in_repository(file_name)
            if found is not None:
                return Path(found).name
        return None

    def find_in_repository(self, relative_path):
        """Finds absolute path of a file or directory matching relativePath
        in one of the hardwareRepository directories

        Will work for any file or directory, but intended for configuration
        files that do NOT match the standard XML file system

        Returns:
            The absolute path of the first match, or None if nothing matches
            (or if a server is in use).
        """
        if self.server:
            logging.getLogger("HWR").error(
                "Cannot find file in repository - server is in use"
            )
            return None

        if relative_path.startswith(os.path.sep):
            relative_path = relative_path[1:]

        for xml_files_path in self.server_address:
            file_path = os.path.join(xml_files_path, relative_path)
            if os.path.exists(file_path):
                return os.path.abspath(file_path)
        return None

    def require(self, mnemonics_list):
        """Download a list of Hardware Objects in one go

        NOTE(review): the visible code only resets ``required_hardware_objects``
        and, when a server is set, builds the mnemonics string without sending
        it anywhere - this looks like a remnant of the server-based
        implementation.
        """
        self.required_hardware_objects = {}

        if not self.server:
            return

        try:
            mnemonics = ",".join([repr(mne) for mne in mnemonics_list])
        except Exception:
            logging.getLogger("HWR").exception(
                "Could not execute 'require' on Hardware Repository server"
            )

    def _load_hardware_object(self, hwobj_name=""):
        """
        Load a Hardware Object. Do NOT use externally,
        as this will mess up object tracking, signals, etc.

        A row (name, class module, load time, comment) is appended to
        ``hwobj_info_list`` for every attempt that is not a redirection.

        :param hwobj_name:  string name of the Hardware Object to load,
                            e.g. /motors/m0
        :return: the loaded Hardware Object, or None if it fails
        """
        comment = ""
        class_name = ""
        hwobj_instance = None
        xml_data = ""

        file_name = (
            hwobj_name[1:] if hwobj_name.startswith(os.path.sep) else hwobj_name
        )
        for xml_files_path in self.server_address:
            file_path = os.path.join(xml_files_path, file_name) + os.path.extsep + "xml"
            if os.path.exists(file_path):
                try:
                    with open(file_path, "r") as xml_file:
                        xml_data = xml_file.read()
                except Exception:
                    # Best effort: treated like a missing file below, but the
                    # reason is no longer silently dropped.
                    logging.getLogger("HWR").exception(
                        "Cannot read XML file %s", file_path
                    )
                break

        start_time = datetime.now()

        if xml_data:
            try:
                hwobj_instance = self.parse_xml(xml_data, hwobj_name)
                if isinstance(hwobj_instance, string_types):
                    # We have redirection to another file
                    # Enter in dictionaries also under original names
                    result = self._load_hardware_object(hwobj_instance)
                    if hwobj_name in self.invalid_hardware_objects:
                        self.invalid_hardware_objects.remove(hwobj_name)
                    self.hardware_objects[hwobj_name] = result
                    return result
            except Exception:
                comment = "Cannot parse xml"
                logging.getLogger("HWR").exception(
                    "Cannot parse XML file for Hardware Object %s", hwobj_name
                )
            else:
                if hwobj_instance is not None:
                    self.xml_source[hwobj_name] = xml_data
                    dispatcher.send("hardwareObjectLoaded", hwobj_name, self)

                    hwobj_instance.resolve_references()

                    try:
                        hwobj_instance._add_channels_and_commands()
                    except Exception:
                        logging.getLogger("HWR").exception(
                            "Error while adding commands and/or channels to Hardware Object %s",
                            hwobj_name,
                        )
                        comment = "Failed to add all commands and/or channels"

                    try:
                        _create_config_for_xml_hwobj(hwobj_instance)
                        hwobj_instance._init()
                        hwobj_instance.init()
                        class_name = str(hwobj_instance.__module__)
                    except Exception:
                        logging.getLogger("HWR").exception(
                            'Cannot initialize Hardware Object "%s"', hwobj_name
                        )
                        self.invalid_hardware_objects.add(hwobj_instance.name)
                        hwobj_instance = None
                        comment = "Failed to init class"
                    else:
                        # The object is valid now: clear any earlier failure
                        # mark, under either of its names. discard() does not
                        # raise when the entry is absent.
                        self.invalid_hardware_objects.discard(
                            hwobj_instance.load_name
                        )
                        self.invalid_hardware_objects.discard(hwobj_instance.name)
                        self.hardware_objects[hwobj_instance.load_name] = hwobj_instance
                else:
                    logging.getLogger("HWR").error(
                        "Failed to load Hardware object %s", hwobj_name
                    )
                    comment = "Loading failed"
        else:
            logging.getLogger("HWR").error(
                'Cannot load Hardware Object "%s" : file not found.', hwobj_name
            )

        time_delta = datetime.now() - start_time
        self.hwobj_info_list.append(
            (
                hwobj_name,
                class_name,
                # total_seconds() covers loads of a second or more;
                # .microseconds alone is only the sub-second component.
                "%d ms" % (time_delta.total_seconds() * 1000),
                comment,
            )
        )

        return hwobj_instance

    def discard_hardware_object(self, ho_name):
        """Remove a Hardware Object from the Hardware Repository

        Parameters :
          ho_name -- the name of the Hardware Object to remove

        Emitted signals :
          hardwareObjectDiscarded (<object name>) -- emitted when the object has been removed
        """
        self.hardware_objects.pop(ho_name, None)
        self.invalid_hardware_objects.discard(ho_name)
        self.required_hardware_objects.pop(ho_name, None)

        dispatcher.send("hardwareObjectDiscarded", ho_name, self)

    def parse_xml(self, xml_string, ho_name):
        """Load a Hardware Object from its XML string representation

        Parameters :
          xml_string -- the XML string
          ho_name -- the name of the Hardware Object to load (i.e. '/motors/m0')

        Return :
          the Hardware Object, or None if it fails
        """
        try:
            return HardwareObjectFileParser.parse_string(xml_string, ho_name)
        except Exception:
            logging.getLogger("HWR").exception(
                "Cannot parse Hardware Repository file %s", ho_name
            )
            return None

    def update(self, name, updatesList):
        """Send a list of updates for a Hardware Object to the server.

        Logs an error if there is no connected server.

        Args:
            name: Name of the Hardware Object.
            updatesList: Updates; sent as ``str(updatesList)``.
        """
        # TODO: update without HWR server
        if self.server is not None and self.server.isSpecConnected():
            self.server.send_msg_cmd_with_return(
                'xml_multiwrite("%s", "%s")' % (name, str(updatesList))
            )
        else:
            logging.getLogger("HWR").error(
                "Cannot update Hardware Object %s : not connected to server", name
            )

    def rewrite_xml(self, name, xml):
        """Send new XML for a Hardware Object to the server and cache it.

        Logs an error if there is no connected server.

        Args:
            name: Name of the Hardware Object.
            xml: New XML content; stored in ``xml_source`` on success.
        """
        # TODO: rewrite without HWR server
        if self.server is not None and self.server.isSpecConnected():
            self.server.send_msg_cmd_with_return(
                'xml_writefile("%s", %s)' % (name, repr(xml))
            )
            self.xml_source[name] = xml
        else:
            logging.getLogger("HWR").error(
                "Cannot update Hardware Object %s : not connected to server", name
            )

    def __getitem__(self, item):
        """Dictionary-style access to the repository.

        Args:
            item: "equipments", "procedures" or "devices" for the list of
                loaded objects of that kind; any other value is treated as a
                Hardware Object name.

        Returns:
            A list of Hardware Objects for the three special keys, otherwise
            the result of get_hardware_object() (which may be None).
        """
        if item == "equipments":
            return self.get_equipments()
        if item == "procedures":
            return self.get_procedures()
        if item == "devices":
            return self.get_devices()
        return self.get_hardware_object(item)

    def get_equipments(self):
        """Return the list of the currently loaded Equipment Hardware Objects"""
        return [
            hwobj
            for ho_name, hwobj in self.hardware_objects.items()
            if self.is_equipment(ho_name)
        ]

    def get_procedures(self):
        """Return the list of the currently loaded Procedures Hardware Objects"""
        return [
            hwobj
            for ho_name, hwobj in self.hardware_objects.items()
            if self.is_procedure(ho_name)
        ]

    def get_devices(self):
        """Return the list of the currently loaded Devices Hardware Objects"""
        return [
            hwobj
            for ho_name, hwobj in self.hardware_objects.items()
            if self.is_device(ho_name)
        ]

    def get_hardware_object(self, object_name: str) -> Union["HardwareObject", None]:
        """Return a Hardware Object given its name.

        If the object is not in the Hardware Repository, try to load it.
        A leading "/" is added to the name if missing.

        Args:
            object_name (str): The name of the Hardware Object

        Returns:
            Union[HardwareObject, None]: The required Hardware Object, or None
                if the name is empty, the object is marked invalid, or it
                could not be loaded.
        """
        if not object_name:
            return None

        if not object_name.startswith("/"):
            object_name = "/" + object_name

        try:
            if object_name in self.invalid_hardware_objects:
                return None
            if object_name in self.hardware_objects:
                return self.hardware_objects[object_name]
            return self._load_hardware_object(object_name)
        except TypeError:
            logging.getLogger("HWR").exception(
                "could not get Hardware Object %s", object_name
            )
            return None

    def get_equipment(self, equipment_name):
        """Return an Equipment given its name (see get_hardware_object())"""
        return self.get_hardware_object(equipment_name)

    def get_device(self, device_name):
        """Return a Device given its name (see get_hardware_object())"""
        return self.get_hardware_object(device_name)

    def get_procedure(self, procedure_name):
        """Return a Procedure given its name (see get_hardware_object())"""
        return self.get_hardware_object(procedure_name)

    def is_device(self, name):
        """Check if a Hardware Object is a Device

        Parameters :
          name -- name of the Hardware Object to test

        Return :
          True if the Hardware Object is a Device, False otherwise
        """
        try:
            return isinstance(self.hardware_objects[name], BaseHardwareObjects.Device)
        except Exception:
            return False

    def is_procedure(self, name):
        """Check if a Hardware Object is a Procedure

        Parameters :
          name -- name of the Hardware Object to test

        Return :
          True if the Hardware Object is a Procedure, False otherwise
        """
        try:
            return isinstance(
                self.hardware_objects[name], BaseHardwareObjects.Procedure
            )
        except Exception:
            return False

    def is_equipment(self, name):
        """Check if a Hardware Object is an Equipment

        Parameters :
          name -- name of the Hardware Object to test

        Return :
          True if the Hardware Object is an Equipment, False otherwise
        """
        try:
            return isinstance(
                self.hardware_objects[name], BaseHardwareObjects.Equipment
            )
        except Exception:
            return False

    def has_hardware_object(self, name):
        """Check if the Hardware Repository contains an object

        Parameters :
          name -- name of the Hardware Object

        Return :
          True if HardwareObject is loaded in the Hardware Repository, False otherwise
        """
        return name in self.hardware_objects

    def get_info(self, name):
        """Return a dictionary with information about the specified Hardware Object

        Parameters :
          name -- name of the Hardware Object

        Return :
          a dictionary containing information about the Hardware Object
          (empty if the object is not loaded)
        """
        try:
            hardware_obj = self.hardware_objects[name]
        except KeyError:
            return {}

        d = {
            "class": hardware_obj.__class__.__name__,
            "python module": sys.modules[hardware_obj.__module__].__file__,
        }

        if hasattr(hardware_obj, "is_ready"):
            d["is ready ?"] = str(hardware_obj.is_ready())

        if hasattr(hardware_obj, "get_commands"):
            # hardware object is a command container
            commands = d["commands"] = {}
            for cmd in hardware_obj.get_commands():
                # Commands are recognised by class name
                cmd_type = cmd.__class__.__name__
                if cmd_type == "SpecCommand":
                    commands[cmd.userName()] = {
                        "type": "spec",
                        "version": "%s:%s"
                        % (
                            cmd.connection.host,
                            cmd.connection.port or cmd.connection.scanname,
                        ),
                        "connected ?": "yes" if cmd.isSpecConnected() else "no",
                        "macro or function": str(cmd.command),
                    }
                elif cmd_type == "TacoCommand":
                    dd = {"type": "taco", "device": cmd.device_name}
                    try:
                        dd["imported ?"] = "yes" if cmd.device.imported else "no"
                    except Exception:
                        dd["imported ?"] = "no, invalid Taco device"
                    dd["device method"] = str(cmd.command)
                    commands[cmd.userName()] = dd
                elif cmd_type == "TangoCommand":
                    commands[cmd.userName()] = {
                        "type": "tango",
                        "device": cmd.device_name,
                        "imported ?": (
                            "no, invalid Tango device" if cmd.device is None else "yes"
                        ),
                        "device method": str(cmd.command),
                    }

            channels = d["channels"] = {}
            for chan in hardware_obj.get_channels():
                chan_type = chan.__class__.__name__
                if chan_type == "SpecChannel":
                    channels[chan.userName()] = {
                        "type": "spec",
                        "version": "%s:%s"
                        % (
                            chan.connection.host,
                            chan.connection.port or chan.connection.scanname,
                        ),
                        "connected ?": "yes" if chan.isSpecConnected() else "no",
                        "variable": str(chan.varName),
                    }
                elif chan_type == "TangoChannel":
                    channels[chan.userName()] = {
                        "type": "tango",
                        "device": chan.device_name,
                        "imported ?": (
                            "yes"
                            if chan.device is not None
                            else "no, invalid Tango device or attribute name"
                        ),
                        "attribute": str(chan.attribute_name),
                    }

        base_names = [klass.__name__ for klass in hardware_obj.__class__.__bases__]
        if "SpecMotorA" in base_names:
            d["spec version"] = hardware_obj.specVersion
            d["motor mnemonic"] = hardware_obj.specName
            try:
                d["connected ?"] = (
                    "yes" if hardware_obj.connection.isSpecConnected() else "no"
                )
            except Exception:
                d["connected ?"] = "no"

        if isinstance(hardware_obj, BaseHardwareObjects.DeviceContainer):
            children = d["children"] = {}
            for ho in hardware_obj.get_devices():
                try:
                    children[ho.load_name] = self.get_info(ho.load_name)
                except Exception:
                    continue

        return d

    def end_polling(self):
        """Stop all pollers

        Warning : should not be used directly (finalization purposes only)
        """
        return

    def close(self):
        """'close' the Hardware Repository

        Discards all Hardware Objects
        """
        self.end_polling()
        self.hardware_objects = weakref.WeakValueDictionary()

    def timerEvent(self, t_ev):
        """Call the function registered in ``TIMERS`` for the event's timer id.

        ``TIMERS`` entries are called to obtain the function; if that yields
        None the entry is deleted. All errors are logged, none propagate.
        """
        global TIMERS
        try:
            func_ref = TIMERS[t_ev.timerId()]
            func = func_ref()
            if func is None:
                # self.killTimer(t_ev.timerId())
                del TIMERS[t_ev.timerId()]
            else:
                try:
                    func()
                except Exception:
                    logging.getLogger("HWR").exception(
                        "an error occured while calling timer function"
                    )
        except Exception:
            logging.getLogger("HWR").exception("an error occured inside the timerEvent")

    def print_report(self):
        """Print a table of the load statistics collected in ``hwobj_info_list``.

        Column widths account for the header as well as the rows, so an empty
        report prints just the header instead of raising.
        """
        header = ("xml", "Class", "Load time", "Comment")
        rows = sorted(self.hwobj_info_list)

        longest_cols = [
            max(len(str(row[i])) for row in [header, *rows]) + 3
            for i in range(len(header))
        ]
        row_format = "| ".join("{:<%d}" % width for width in longest_cols)
        separator = "=" * sum(longest_cols)

        print("+", separator, "+")
        print("| %s" % row_format.format(*header))
        print("+", separator, "+")
        for row in rows:
            print("| %s" % row_format.format(*row))
        print("+", separator, "+")

    def reload_hardware_objects(self):
        """
        Reloads all modified modules.
        Package reimport is used to detect modified modules.
        Hardware objects that correspond to these modules:
        1. are disconnected from gui
        2. imported in this module and reimport is called
        3. connected back to the gui channels
        """
        # NOTE
        # reimport is supported for python 2.x and not by python 3.x
        # if needed a similar package for 3x could be used. In this case
        # code depends on a platform: platform.python_version()[0] > 2 ...

        # NB reload_hardware_objects does NOT work with beamline_opbject
        # and other yaml configs

        import reimport

        logger = logging.getLogger("HWR")
        modified_modules = reimport.modified()
        # Snapshot the weak dictionary so entries cannot vanish mid-iteration
        for hwobj in list(self.hardware_objects.values()):
            for item in modified_modules:
                if hwobj.__module__ != item:
                    continue

                try:
                    connections = hwobj.connect_dict
                    for sender in connections:
                        hwobj.disconnect(
                            sender,
                            connections[sender]["signal"],
                            connections[sender]["slot"],
                        )
                    logger.debug("HardwareRepository: %s disconnected from GUI", item)
                    hwobj.clear_gevent()
                except Exception:
                    logger.exception(
                        "HardwareRepository: Unable to disconnect hwobj %s", item
                    )
                    continue

                try:
                    # import_module replaces __import__(..., level=-1): a
                    # negative level is rejected by Python 3.
                    importlib.import_module(item)
                    reimport.reimport(item)
                    logger.debug("HardwareRepository: %s reloaded", item)
                except Exception:
                    logger.exception(
                        "HardwareRepository: Unable to reload module %s", item
                    )

                try:
                    for sender in connections:
                        hwobj.connect(
                            sender,
                            connections[sender]["signal"],
                            connections[sender]["slot"],
                        )
                    logger.debug("HardwareRepository: %s connected to GUI", item)
                except Exception:
                    logger.exception(
                        "HardwareRepository: Unable to connect hwobj %s", item
                    )

                try:
                    hwobj.init()
                    hwobj.re_emit_values()
                    logger.debug(
                        "HardwareRepository: %s initialized and updated", item
                    )
                except Exception:
                    logger.exception(
                        "HardwareRepository: Unable to initialize hwobj %s", item
                    )