Source code for glideinwms.creation.lib.xmlConfig

# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

"""
XML configuration handler
Currently used only in the Factory configuration, factoryXmlConfig,
parsing glideinXML.xml and imported elements
"""

import copy
import os
import xml.sax

from collections.abc import MutableMapping

# Indentation width constant; it is not referenced by the code in this module
# NOTE(review): presumably consumed by importing modules when writing XML - confirm
INDENT_WIDTH = 3

# Maps each list-type XML tag to a function returning the sort key of one child element.
# Handler.startElement builds a ListElement for these tags, and ListElement.merge /
# ListElement.check_sort_key use the function to order and match children
LIST_TAGS = {"attrs": lambda d: d["name"], "files": lambda d: d["absfname"]}

# Maps an XML tag to the element class that Handler.startElement instantiates for it
TAG_CLASS_MAPPING = {}
# Tag name of the document root element; None until register_root() is called
DOCUMENT_ROOT = None


class Handler(xml.sax.ContentHandler):
    """SAX content handler that builds a tree of element objects.

    Attributes:
        root: element created for the tag equal to DOCUMENT_ROOT (None until seen).
        ancestry: stack of the elements that are currently open, outermost first.
        file: name of the file being parsed, or None (see __init__).
    """

    # leave file=None when parsing default xml to ignore xml file and line numbers
    def __init__(self, file=None):
        """Initialize an empty tree.

        Args:
            file: file name recorded on every element together with its line
                number; None records neither.
        """
        super().__init__()
        self.root = None
        self.ancestry = []
        self.file = file

    def startElement(self, name, attrs):
        """Create the element for an opening tag and attach it to the tree.

        Tags in LIST_TAGS become a ListElement, tags in TAG_CLASS_MAPPING use the
        registered class, and everything else becomes a DictElement. The XML
        attributes are copied onto every element except list elements.

        Args:
            name: tag name of the element being opened.
            attrs: SAX attributes object of the element.

        Raises:
            IndexError: if a tag other than DOCUMENT_ROOT is opened while no
                element is open.
        """
        # Passed as a slice so that an empty ancestry yields an empty list
        # (interpreted as "no parent") instead of raising IndexError.
        # The parent is passed for every kind of element, including registered
        # tag classes parsed without a file, so that Element.get_config_node()
        # can walk upward from any node.
        parent = self.ancestry[-1:]

        if self.file is None:
            location = ()
        else:
            # _locator is an undocumented feature of SAX...
            location = (self.file, self._locator.getLineNumber())

        if name in LIST_TAGS:
            el = ListElement(name, *location, parent=parent)
        else:
            element_class = TAG_CLASS_MAPPING.get(name, DictElement)
            el = element_class(name, *location, parent=parent)
            for key in attrs.keys():
                el.attrs[key] = attrs[key]

        if name == DOCUMENT_ROOT:
            self.root = el
        else:
            self.ancestry[-1].add_child(el)
        self.ancestry.append(el)

    def endElement(self, name):
        """Close the innermost open element."""
        self.ancestry.pop()
class Element:
    """Base node of the parsed XML configuration tree.

    Attributes:
        tag: XML tag name of the node.
        file: name of the file the node came from ("default" when not given).
        line_no: line number of the node in that file, or None.
        parent: parent node, or None when there is none.
    """

    def __init__(self, tag, file="default", line_no=None, parent=None):
        # Handler.startElement hands over the parent as a slice
        # (=> self.ancestry[-1:]) rather than the last item (=> self.ancestry[-1]).
        # With an empty ancestry the slice is just an empty list (instead of an
        # IndexError being thrown), and that is stored as None here.
        self.parent = None if not parent else parent[0]
        self.tag = tag
        self.file = file
        self.line_no = line_no

    # children should override these (signature should be the same)

    def add_child(self, child):
        """Attach a child node; no-op in the base class."""

    def clear_lists(self):
        """Empty any list nodes; no-op in the base class."""

    def merge_defaults(self, default):
        """Fill in values from a default node; no-op in the base class."""

    def validate(self):
        """Check the node for consistency; no-op in the base class."""

    def merge(self, other):
        """Merge another node into this one; no-op in the base class."""

    def get_config_node(self):
        """Get the node containing the whole configuration

        Walks from this node up through the parents and returns the first node
        that is a Config instance, or None if there is none.
        """
        # Need to import it here to avoid import loops
        from .factoryXmlConfig import Config

        node = self
        while node is not None and hasattr(node, "parent"):
            if isinstance(node, Config):
                return node
            node = node.parent
        return None
class DictElement(Element, MutableMapping):
    """Tree node with XML attributes and child nodes stored by tag.

    The mapping interface (item access, iteration, len, ``in``) operates on the
    XML attributes held in ``attrs``; child nodes live in ``children``, a dict
    keyed by child tag.
    """

    def __init__(self, tag, *args, **kwargs):
        super().__init__(tag, *args, **kwargs)
        self.attrs = {}
        self.children = {}

    # Mapping protocol, delegated to the attribute dict.
    # __contains__ is not defined here; the MutableMapping mixin provides it.

    def __getitem__(self, key):
        return self.attrs[key]

    def __setitem__(self, key, value):
        self.attrs[key] = value

    def __delitem__(self, key):
        del self.attrs[key]

    def __iter__(self):
        return iter(self.attrs)

    def __len__(self):
        return len(self.attrs)

    def has_child(self, tag):
        """Return True if a child with the given tag exists."""
        return tag in self.children

    def get_child(self, tag):
        """Return the child with the given tag (KeyError if absent)."""
        return self.children[tag]

    def get_child_list(self, tag):
        """Return the children of the list child with the given tag."""
        list_node = self.get_child(tag)
        return list_node.get_children()

    def add_child(self, child):
        """Store child under its tag, replacing any child with the same tag."""
        self.children[child.tag] = child

    def clear_lists(self):
        """Ask every child to clear its lists."""
        for child in self.children.values():
            child.clear_lists()

    def merge_default_attrs(self, default):
        """Copy from default every attribute that is not set on this node."""
        for key in default:
            self.setdefault(key, default[key])

    def merge_defaults(self, default):
        """Fill in missing attributes and children from default, then validate.

        Args:
            default: node of the default configuration that corresponds to
                this node.
        """
        self.merge_default_attrs(default)
        for tag, default_child in default.children.items():
            if tag in self.children:
                # present on both sides: continue down the tree
                self.children[tag].merge_defaults(default_child)
            elif isinstance(default_child, ListElement):
                # a missing xml list is replaced by a new empty one
                self.children[tag] = type(default_child)(tag)
            else:
                # any other missing xml blob is cloned from the default,
                # with its xml lists zeroed out
                clone = copy.deepcopy(default_child)
                self.children[tag] = clone
                clone.clear_lists()
        # after filling in defaults, validate the element
        self.validate()

    # this creates references into other rather than deep copies for efficiency
    def merge(self, other):
        """Merge other into this node; attributes of other take precedence."""
        self.attrs.update(other.attrs)
        for tag, other_child in other.children.items():
            if tag in self.children:
                # merge into what we already have
                self.children[tag].merge(other_child)
            else:
                # completely missing here, so just adopt it
                self.children[tag] = other_child

    def err_str(self, str):
        """Return the message prefixed with file, line number and tag."""
        return f"{self.file}:{self.line_no}: {self.tag}: {str}"

    def check_boolean(self, flag):
        """Raise RuntimeError unless attribute flag is "True" or "False"."""
        if self[flag] not in ("True", "False"):
            raise RuntimeError(self.err_str('%s must be "True" or "False"' % flag))

    def check_missing(self, attr):
        """Raise RuntimeError if attribute attr is not set."""
        if attr in self:
            return
        raise RuntimeError(self.err_str('missing "%s" attribute' % attr))
# TODO: Should this inherit from MutableSequence?
class ListElement(Element):
    """Tree node holding an ordered list of child nodes.

    The sort key of a child is obtained from the function registered for this
    node's tag in LIST_TAGS.
    """

    def __init__(self, tag, *args, **kwargs):
        super().__init__(tag, *args, **kwargs)
        self.children = []

    def get_children(self):
        """Return the list of children."""
        return self.children

    def add_child(self, child):
        """Append child to the list."""
        self.children.append(child)

    def clear_lists(self):
        """Drop all children."""
        self.children = []

    def merge_defaults(self, default):
        """Merge the first child of default into every child of this list.

        Raises:
            IndexError: if this list has children but default has none.
        """
        for child in self.children:
            child.merge_defaults(default.children[0])

    def check_sort_key(self):
        """Verify that the sort key can be computed for every child.

        Raises:
            RuntimeError: if computing the key of a child raises KeyError.
        """
        for child in self.children:
            try:
                LIST_TAGS[self.tag](child)
            except KeyError as e:
                # str(KeyError) wraps the key in quotes; use the key itself so the
                # message has the same form as DictElement.check_missing
                missing = e.args[0] if e.args else e
                raise RuntimeError(child.err_str('missing "%s" attribute' % missing)) from None

    # this creates references into other rather than deep copies for efficiency
    def merge(self, other):
        """Merge the children of other into this list, ordered by sort key.

        Both child lists are sorted in place by sort key. When a child of this
        list and a child of other have the same key, the child of other is kept
        and the child of this list is dropped.

        Args:
            other: list element with the same kind of children.

        Raises:
            RuntimeError: if the sort key of a child cannot be computed.
        """
        self.check_sort_key()
        other.check_sort_key()

        key_of = LIST_TAGS[self.tag]
        self.children.sort(key=key_of)
        other.children.sort(key=key_of)

        mine = self.children
        theirs = other.children
        merged = []
        my_count = 0
        other_count = 0
        while my_count < len(mine) and other_count < len(theirs):
            my_key = key_of(mine[my_count])
            other_key = key_of(theirs[other_count])
            if my_key < other_key:
                merged.append(mine[my_count])
                my_count += 1
            else:
                merged.append(theirs[other_count])
                other_count += 1
                # same key on both sides: the child of other wins
                if my_key == other_key:
                    my_count += 1

        # at most one of the two lists still has unprocessed children
        merged.extend(mine[my_count:])
        merged.extend(theirs[other_count:])
        self.children = merged
class AttrElement(DictElement):
    """Element for the "attr" tag: a typed name/value pair."""

    def get_val(self):
        """Return the value as int, or as str for types "string" and "expr"."""
        if self["type"] not in ("string", "expr"):
            return int(self["value"])
        return str(self["value"])

    def validate(self):
        """Check required attributes, the type and the boolean flags.

        Raises:
            RuntimeError: if "name" or "value" is missing, the type is not one
                of "int", "string" or "expr", or a flag is not "True"/"False".
        """
        for required in ("name", "value"):
            self.check_missing(required)

        if self["type"] not in ("string", "int", "expr"):
            raise RuntimeError(self.err_str('type must be "int", "string", or "expr"'))

        for flag in ("glidein_publish", "job_publish", "parameter"):
            self.check_boolean(flag)


TAG_CLASS_MAPPING["attr"] = AttrElement
class FileElement(DictElement):
    """Element for the "file" tag: a file described by path and type flags."""

    def validate(self):
        """Check the file attributes for consistency.

        Raises:
            RuntimeError: if "absfname" is missing or has an empty base name,
                "relfname" is present but empty, a flag is not "True"/"False",
                "period" is not an int, more than one of "executable",
                "wrapper" and "untar" is set, one of them is set without
                const="True", or a positive period is given for a file that is
                not "executable".
        """
        self.check_missing("absfname")
        if not os.path.basename(self["absfname"]):
            raise RuntimeError(self.err_str("absfname is an invalid file path"))
        if "relfname" in self and len(self["relfname"]) < 1:
            raise RuntimeError(self.err_str("relfname cannot be empty"))

        for flag in ("const", "executable", "wrapper", "untar"):
            self.check_boolean(flag)

        # check_boolean guarantees each value is exactly "True" or "False", so a
        # string comparison gives the same result as eval() without executing
        # text taken from the configuration
        is_const = self["const"] == "True"
        is_exec = self["executable"] == "True"
        is_wrapper = self["wrapper"] == "True"
        is_tar = self["untar"] == "True"

        try:
            period = int(self["period"])
        except ValueError:
            raise RuntimeError(self.err_str("period must be an int")) from None

        # none of the three flags set is accepted; only combining them is an error
        if is_exec + is_wrapper + is_tar > 1:
            raise RuntimeError(self.err_str('must be exactly one of type "executable", "wrapper", or "untar"'))
        if (is_exec or is_wrapper or is_tar) and not is_const:
            raise RuntimeError(self.err_str('type "executable", "wrapper", or "untar" requires const="True"'))
        if not is_exec and period > 0:
            raise RuntimeError(self.err_str('cannot have execution period if type is not "executable"'))


TAG_CLASS_MAPPING.update({"file": FileElement})


#######################
#
# Module functions
#
######################

# any modules that choose to subclass from xmlConfig should register new xml tags
# and either flag them as being a list element, or associate with respective class type
# as needed
def register_root(tag):
    """Set the module-level DOCUMENT_ROOT to the given tag name."""
    global DOCUMENT_ROOT
    DOCUMENT_ROOT = tag
def register_list_elements(tag_list):
    """Add the entries of tag_list to LIST_TAGS.

    Args:
        tag_list: mapping of tag name to the function that returns the sort
            key of a child element; existing entries with the same tag are
            replaced.
    """
    LIST_TAGS.update(tag_list)
def register_tag_classes(map_dict):
    """Add the entries of map_dict to TAG_CLASS_MAPPING.

    Args:
        map_dict: mapping of tag name to the element class to instantiate for
            that tag; existing entries with the same tag are replaced.
    """
    TAG_CLASS_MAPPING.update(map_dict)