# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
__copyright__ = """ Copyright © 2010 - 2020 by MXCuBE Collaboration """
__license__ = "LGPLv3+"
import logging
import xml.sax
from xml.sax.handler import ContentHandler
from mxcubecore import BaseHardwareObjects
CURRENT_XML = None
new_objects_classes = {
"equipment": BaseHardwareObjects.Equipment,
"device": BaseHardwareObjects.Device,
"procedure": BaseHardwareObjects.Procedure,
}
[docs]def parse(filename, name):
"""[summary]
Args:
filename ([type]): [description]
name ([type]): [description]
Returns:
[type]: [description]
"""
cur_handler = HardwareObjectHandler(name)
global CURRENT_XML
try:
xml_file = open(filename)
CURRENT_XML = xml_file.read()
except Exception:
CURRENT_XML = None
xml.sax.parse(filename, cur_handler)
return cur_handler.get_hardware_object()
[docs]def parse_string(xml_hardware_object, name):
"""[summary]
Args:
xml_hardware_object ([type]): [description]
name ([type]): [description]
Returns:
[type]: [description]
"""
global CURRENT_XML
CURRENT_XML = xml_hardware_object
cur_handler = HardwareObjectHandler(name)
xml.sax.parseString(str.encode(xml_hardware_object), cur_handler)
return cur_handler.get_hardware_object()
[docs]def load_module(hardware_object_name):
"""[summary]
Args:
hardware_object_name ([type]): [description]
Returns:
[type]: [description]
"""
return __import__(hardware_object_name, globals(), locals(), [""])
[docs]def instanciate_class(module_name, class_name, object_name):
"""[summary]
Args:
module_name ([type]): [description]
class_name ([type]): [description]
object_name ([type]): [description]
Returns:
[type]: [description]
"""
module = load_module(module_name)
if module is None:
return
else:
try:
class_obj = getattr(module, class_name)
except AttributeError:
logging.getLogger("HWR").error(
"No class %s in module %s", class_name, module_name
)
else:
# check the XML
if module.__doc__ is not None and CURRENT_XML is not None:
i = module.__doc__.find("template:")
if i >= 0:
xml_template = module.__doc__[i + 10 :]
xml_structure_retriever = XmlStructureRetriever()
xml.sax.parseString(CURRENT_XML, xml_structure_retriever)
current_structure = xml_structure_retriever.get_structure()
xml_structure_retriever = XmlStructureRetriever()
xml.sax.parseString(xml_template, xml_structure_retriever)
template_structure = xml_structure_retriever.get_structure()
if not template_structure == current_structure:
logging.getLogger("HWR").error(
"%s: XML file does not match the %s class template"
% (object_name, class_name)
)
return
try:
new_instance = class_obj(object_name)
except Exception:
logging.getLogger("HWR").exception(
"Cannot instanciate class %s", class_name
)
else:
return new_instance
[docs]class HardwareObjectHandler(ContentHandler):
def __init__(self, name):
"""[summary]
Args:
name ([type]): [description]
"""
ContentHandler.__init__(self)
self.name = name
self.class_error = False
self.objects = []
self.reference = ""
self.property = ""
self.element_is_a_reference = False
self.element_role = None
self.buffer = ""
self.path = ""
self.previous_path = ""
self.hwr_import_reference = None
[docs] def get_hardware_object(self):
"""[summary]
Returns:
[type]: [description]
"""
if self.hwr_import_reference is not None:
return self.hwr_import_reference
elif len(self.objects) == 1:
return self.objects[0]
[docs] def startElement(self, name, attrs):
"""[summary]
Args:
name ([type]): [description]
attrs ([type]): [description]
"""
if self.class_error:
return
self.buffer = ""
if len(self.objects) == 0:
object_name = self.name
else:
object_name = name
assert not self.element_is_a_reference
self.element_role = None
self.property = ""
self.command = {}
self.channel = {}
#
# determine path to the new object
#
self.path += "/" + str(name) + "[%d]"
i = self.previous_path.rfind("[")
if i >= 0 and self.path[:-4] == self.previous_path[:i]:
object_index = int(self.previous_path[i + 1 : -1]) + 1
else:
object_index = 1 # XPath indexes begin at 1
self.path %= object_index
_attrs = attrs
attrs = {}
for k in list(_attrs.keys()):
v = str(_attrs[k])
if v == "None":
attrs[str(k)] = None
else:
try:
attrs[str(k)] = int(v)
except Exception:
try:
attrs[str(k)] = float(v)
except Exception:
if v == "False":
attrs[str(k)] = False
elif v == "True":
attrs[str(k)] = True
else:
attrs[str(k)] = v
if name == "hwr_import":
self.hwr_import_reference = attrs["href"]
if "role" in attrs:
self.element_role = attrs["role"]
if name == "device":
# maybe we have to add the DeviceContainer mix-in class to each node of
# the Hardware Object hierarchy
i = len(self.objects) - 1
while i >= 0 and not isinstance(
self.objects[i], BaseHardwareObjects.DeviceContainer
):
# newClass = new.classobj("toto", (self.objects[i].__class__,) + self.objects[i].__class__.__bases__ + (BaseHardwareObjects.DeviceContainer, ), {})
# TODO replace deprecated DeviceContainerNode with a different class
self.objects[i].__class__ = BaseHardwareObjects.DeviceContainerNode
i -= 1
#
# is element a reference to another hardware object ?
#
ref = "hwrid" in attrs and attrs["hwrid"] or "href" in attrs and attrs["href"]
if ref:
self.element_is_a_reference = True
self.reference = str(ref)
if self.reference.startswith("../"):
self.reference = "/".join(
self.name.split("/")[:-1] + [self.reference[3:]]
)
elif self.reference.startswith("./"):
self.reference = "/".join(
self.name.split("/")[:-1] + [self.reference[2:]]
)
return
if name in new_objects_classes:
if "class" in attrs:
module_name = str(attrs["class"])
class_name = module_name.split(".")[-1]
new_object = instanciate_class(module_name, class_name, object_name)
if new_object is None:
self.class_error = True
return
else:
new_object.set_path(self.path)
self.objects.append(new_object)
else:
new_object_class = new_objects_classes[name]
new_object = new_object_class(object_name)
new_object.set_path(self.path)
self.objects.append(new_object)
elif name == "command":
if "name" in attrs and "type" in attrs:
# short command notation
self.command.update(attrs)
else:
# long command notation (allow arguments)
self.objects.append(BaseHardwareObjects.HardwareObjectNode(object_name))
elif name == "channel":
if "name" in attrs and "type" in attrs:
self.channel.update(attrs)
else:
if len(self.objects) == 0:
if "class" in attrs:
module_name = str(attrs["class"])
class_name = module_name.split(".")[-1]
new_object = instanciate_class(module_name, class_name, object_name)
if new_object is None:
self.class_error = True
return
else:
new_object = BaseHardwareObjects.HardwareObject(object_name)
new_object.set_path(self.path)
self.objects.append(new_object)
"""
# maybe we can create a HardwareObject ? be strict for the moment...
logging.getLogger("HWR").error("%s: unknown Hardware Object type (should be one of %s)", object_name, str(new_objects_classes.keys()))
self.class_error = True
return
"""
else:
new_object = BaseHardwareObjects.HardwareObjectNode(object_name)
new_object.set_path(self.path)
self.objects.append(new_object)
self.property = name # element is supposed to be a Property
[docs] def characters(self, content):
"""[summary]
Args:
content ([type]): [description]
"""
if self.class_error:
return
self.buffer += str(content)
[docs] def endElement(self, name):
"""[summary]
Args:
name ([type]): [description]
"""
if self.class_error:
return
name = str(name)
if self.element_is_a_reference:
if len(self.objects) > 0:
self.objects[-1].add_reference(
name, self.reference, role=self.element_role
)
self.objects[0].add_reference(
name, self.reference, role=self.element_role
)
else:
try:
if name == "command":
if len(self.command) > 0:
if len(self.objects) > 0:
self.objects[-1].add_command(
self.command, self.buffer, add_now=False
)
else:
if len(self.objects) > 1:
self.objects[-2].add_command(
self.objects.pop(), add_now=False
)
elif name == "channel":
if len(self.channel) > 0:
if len(self.objects) > 0:
self.objects[-1].add_channel(
self.channel, self.buffer, add_now=False
)
elif name == self.property:
del self.objects[-1] # remove empty object
self.objects[-1]._set_property(name, self.buffer) # noqa: SLF001
else:
if len(self.objects) == 1:
return
if len(self.objects) > 1:
self.objects[-2]._add_object( # noqa: SLF001
name, self.objects[-1], role=self.element_role
)
if len(self.objects) > 0:
del self.objects[-1]
except Exception:
logging.getLogger("HWR").exception(
"%s: error while creating Hardware Object from XML file", self.name
)
self.element_is_a_reference = False
self.element_role = None
self.buffer = ""
self.previous_path = self.path
self.path = self.path[
: self.path.rfind("/")
] # remove last added name and suffix
class XMLStructure:
def __init__(self):
self.xmlpaths = set()
self.attributes = {}
def add(self, xml_path, attributes_set):
"""[summary]
Args:
xml_path ([type]): [description]
attributes_set ([type]): [description]
"""
self.xmlpaths.add(xml_path)
if len(attributes_set) > 0:
self.attributes[xml_path] = attributes_set
def __eq__(self, s):
"""[summary]
Args:
s ([type]): [description]
Returns:
[type]: [description]
"""
if self.xmlpaths.issubset(s.xmlpaths):
for xml_path, attribute_set in self.attributes.items():
try:
attribute_set_2 = s.attributes[xml_path]
except KeyError:
return False
else:
if not attribute_set.issubset(attribute_set_2):
return False
return True
else:
return False
[docs]class XmlStructureRetriever(ContentHandler):
def __init__(self):
ContentHandler.__init__(self)
self.path = ""
self.previous_path = ""
self.current_attributes = set()
self.structure = XMLStructure()
[docs] def get_structure(self):
"""[summary]
Returns:
[type]: [description]
"""
return self.structure
[docs] def startElement(self, name, attrs):
"""[summary]
Args:
name ([type]): [description]
attrs ([type]): [description]
"""
self.path += "/" + str(name) + "[%d]"
i = self.previous_path.rfind("[")
if i >= 0 and self.path[:-4] == self.previous_path[:i]:
index = int(self.previous_path[i + 1 : -1]) + 1
else:
index = 1 # XPath indexes begin at 1
self.path %= index
for attr, value in list(attrs.items()):
if str(attr) == "hwrid":
attr = "href"
self.current_attributes.add("%s=%s" % (str(attr), str(value)))
[docs] def endElement(self, name):
"""[summary]
Args:
name ([type]): [description]
"""
self.structure.add(self.path, self.current_attributes)
self.previous_path = self.path
self.path = self.path[: self.path.rfind("/")]