Source code for glideinwms.creation.lib.factoryXmlConfig

# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

import os
import xml.sax

import glideinwms.factory.glideFactorySelectionAlgorithms

from . import xmlConfig

ENTRY_INDENT = 6
CONFIG_DIR = "config.d"
FACTORY_DEFAULTS_XML = "factory_defaults.xml"

xmlConfig.register_list_elements(
    {
        "allow_frontends": lambda d: d["name"],
        "condor_tarballs": lambda d: "{},{},{}".format(d["arch"], d["os"], d["version"]),
        "entries": lambda d: d["name"],
        "entry_sets": lambda d: d["alias"],
        "frontends": lambda d: d["name"],
        "infosys_refs": lambda d: d["ref"],
        "monitorgroups": lambda d: d["group_name"],
        "monitoring_collectors": lambda d: d["DN"],
        "per_frontends": lambda d: d["name"],
        "process_logs": lambda d: d["extension"],
        "security_classes": lambda d: d["username"],
        "submit_attrs": lambda d: d["name"],
    }
)


# TODO: add a way to check for multi-attr constraints:
# TODO: e.g. GLIDEIN_ESTIMATED_CPUS should not be set if GLIDEIN_CPUS is not set or GLIDEIN_CPUS > 0
[docs] class FactAttrElement(xmlConfig.AttrElement):
[docs] def validate(self): super().validate() self.check_boolean("const") self.check_boolean("publish") is_publish = eval(self["publish"]) is_const = eval(self["const"]) is_param = eval(self["parameter"]) if is_publish and (not is_const and not is_param): raise RuntimeError(self.err_str('published attribute must either be "parameter" or "const"')) if not is_publish and (not is_const or not is_param): raise RuntimeError(self.err_str('unpublished attribute must be "const" "parameter"')) self.check_overwrite_soundness()
[docs] def check_overwrite_soundness(self): """If the attribute is defined in the global attributes section then check that the "const" key is True/False for both """ config = self.get_config_node() try: attrs = config.get_child("attrs") except KeyError: # No need to check if the configuration has no global attribute section return for att in attrs.get_children(): if att["name"] == self["name"] and att["const"] != self["const"]: entry = "Global section" if isinstance(self.parent.parent, Config) else self.parent.parent.getName() raise RuntimeError( '%s: attribute %s is already defined in the global section, but it is const="%s". ' "Please make sure the 'const' value is the same." % (entry, self["name"], att["const"]) )
xmlConfig.register_tag_classes({"attr": FactAttrElement})
[docs] class FactFileElement(xmlConfig.FileElement):
[docs] def validate(self): super().validate() # only defined in factory global entries if "after_entry" in self: self.check_boolean("after_entry")
xmlConfig.register_tag_classes({"file": FactFileElement})
[docs] class CondTarElement(xmlConfig.DictElement):
[docs] def validate(self): if "tar_file" not in self and "base_dir" not in self: raise RuntimeError(self.err_str('must either define "tar_file" or "base_dir"'))
xmlConfig.register_tag_classes({"condor_tarball": CondTarElement})
[docs] class FrontendElement(xmlConfig.DictElement):
[docs] def validate(self): self.check_missing("name") self.check_missing("identity") for sc in self.get_child_list("security_classes"): sc.check_missing("username")
xmlConfig.register_tag_classes({"frontend": FrontendElement})
[docs] class EntryElement(xmlConfig.DictElement):
[docs] def validate(self): self.check_missing("name") self.check_missing("gatekeeper") self.check_boolean("enabled") # Using parent.parent because EntryElements are inside the <entries> tag # Hence the EntrySetElement is parent.parent if isinstance(self.parent.parent, EntrySetElement): # no need to check more if the parent is an entryset return self.validate_sub_elements()
[docs] def validate_sub_elements(self): for per_fe in self.get_child("config").get_child("max_jobs").get_child_list("per_frontends"): per_fe.check_missing("name") for submit_attr in self.get_child("config").get_child("submit").get_child_list("submit_attrs"): submit_attr.check_missing("name") for allowed_fe in self.get_child_list("allow_frontends"): allowed_fe.check_missing("name") for infosys in self.get_child_list("infosys_refs"): infosys.check_missing("ref") infosys.check_missing("server") infosys.check_missing("type") for group in self.get_child_list("monitorgroups"): group.check_missing("group_name")
[docs] def getName(self): return self["name"]
xmlConfig.register_tag_classes({"entry": EntryElement})
[docs] class EntrySetElement(EntryElement):
[docs] def validate(self): self.check_missing("alias") self.check_boolean("enabled") self.validate_sub_elements() algo_name = self.get_child("config").children.get("entry_selection", {}).get("algorithm_name") if algo_name: if not (getattr(glideinwms.factory.glideFactorySelectionAlgorithms, "selectionAlgo" + algo_name, None)): raise RuntimeError( "Function name selectionAlgo%s not found in the glideFactorySelectionAlgorithms module" % algo_name )
[docs] def select(self, entry): self.selected_entry = entry
def __contains__(self, item): try: _ = self[item] return True except KeyError: return False def __getitem__(self, attrname): if getattr(self, "selected_entry", False) and attrname in self.selected_entry: val = self.selected_entry[attrname] else: val = self.attrs.get(attrname) if val is None and attrname in self.get_child_list("entries")[0]: val = self.get_child_list("entries")[0][attrname] # val = [ x[attrname] for x in self.get_child_list(u'entries') ] return val
[docs] def get_subentries(self): return self.get_child_list("entries")
[docs] def getName(self): """The name for entry sets is actaully called alias""" return self["alias"]
xmlConfig.register_tag_classes({"entry_set": EntrySetElement})
[docs] class CvmfsexecDistroElement(xmlConfig.DictElement):
[docs] def validate(self): # Using 'sources' attribute in the <cvmfsexec_distro> tag to control the enable/disable of building/rebuilding cvmfsexec distributions if not self.getSources(): if self.getPlatforms(): raise RuntimeError( self.err_str( "'platforms' attribute cannot have a value when 'sources' attribute is empty. \nTo add a cvmfsexec distribution, 'sources' must be a single value or a comma-separated list of values from the following options: {osg, egi, default}." ) ) else: if not self.getPlatforms(): self.setPlatforms()
[docs] def getSources(self): return self["sources"]
[docs] def getPlatforms(self): return self["platforms"]
[docs] def setPlatforms(self): # TODO: periodically add rhel, suse and other derivatives as supported by cvmfsexec # NOTE: ignoring rhel9-x86_64 (although supported) since el7 tools cannot work with el9 files at the moment (as suggested by Dave Dykstra) self["platforms"] = "rhel7-x86_64,rhel8-x86_64,suse15-x86_64,rhel8-aarch64,rhel8-ppc64le"
xmlConfig.register_tag_classes({"cvmfsexec_distro": CvmfsexecDistroElement})
[docs] class Config(xmlConfig.DictElement): def __init__(self, tag, *args, **kwargs): super().__init__(tag, *args, **kwargs) # dir paths should be set after validation in parse function self.submit_dir = None self.stage_dir = None self.monitor_dir = None self.log_dir = None self.client_log_dirs = None self.client_proxy_dirs = None self.web_url = None
[docs] def validate(self): self.check_missing("factory_name") self.check_boolean("factory_versioning") for mon_coll in self.get_child_list("monitoring_collectors"): mon_coll.check_missing("DN") mon_coll.check_missing("node") self.check_recoverable_exitcodes()
[docs] def check_recoverable_exitcodes(self): try: for codestr in self.get("recoverable_exitcodes", "").split(","): if not codestr: continue splitstr = codestr.split(" ") for subcode in splitstr: int(subcode) except ValueError: raise RuntimeError("recoverable_exitcodes should be list of integers. See configuration.html for more info")
####################### # # FactoryXmlConfig setter functions # ######################
[docs] def set_submit_dir(self): if eval(self["factory_versioning"]): self.submit_dir = os.path.join(self.get_child("submit")["base_dir"], "glidein_%s" % self["glidein_name"]) else: self.submit_dir = self.get_child("submit")["base_dir"]
[docs] def set_stage_dir(self): if eval(self["factory_versioning"]): self.stage_dir = os.path.join(self.get_child("stage")["base_dir"], "glidein_%s" % self["glidein_name"]) else: self.stage_dir = self.get_child("stage")["base_dir"]
[docs] def set_monitor_dir(self): if eval(self["factory_versioning"]): self.monitor_dir = os.path.join(self.get_child("monitor")["base_dir"], "glidein_%s" % self["glidein_name"]) else: self.monitor_dir = self.get_child("monitor")["base_dir"]
[docs] def set_log_dir(self): if eval(self["factory_versioning"]): self.log_dir = os.path.join(self.get_child("submit")["base_log_dir"], "glidein_%s" % self["glidein_name"]) else: self.log_dir = self.get_child("submit")["base_log_dir"]
[docs] def set_client_log_dirs(self): self.client_log_dirs = {} client_dir = self.get_child("submit")["base_client_log_dir"] glidein_name = self["glidein_name"] for fe in self.get_child("security").get_child_list("frontends"): for sc in fe.get_child_list("security_classes"): self.client_log_dirs[sc["username"]] = os.path.join( client_dir, "user_%s" % sc["username"], "glidein_%s" % glidein_name )
[docs] def set_client_proxy_dirs(self): self.client_proxy_dirs = {} client_dir = self.get_child("submit")["base_client_proxies_dir"] glidein_name = self["glidein_name"] for fe in self.get_child("security").get_child_list("frontends"): for sc in fe.get_child_list("security_classes"): self.client_proxy_dirs[sc["username"]] = os.path.join( client_dir, "user_%s" % sc["username"], "glidein_%s" % glidein_name )
[docs] def set_web_url(self): if eval(self["factory_versioning"]): self.web_url = os.path.join(self.get_child("stage")["web_base_url"], "glidein_%s" % self["glidein_name"]) else: self.web_url = self.get_child("stage")["web_base_url"]
[docs] def set_num_factories(self): if eval(self["factory_versioning"]): self.num_factories = os.path.join( self.get_child("submit")["num_factories"], "glidein_%s" % self["glidein_name"] ) else: self.num_factories = self.get_child("submit")["num_factories"] self.num_factories = int(self.num_factories)
####################### # # FactoryXmlConfig getter functions # ######################
[docs] def get_submit_dir(self): return self.submit_dir
[docs] def get_stage_dir(self): return self.stage_dir
[docs] def get_monitor_dir(self): return self.monitor_dir
[docs] def get_log_dir(self): return self.log_dir
[docs] def get_client_log_dirs(self): return self.client_log_dirs
[docs] def get_client_proxy_dirs(self): return self.client_proxy_dirs
[docs] def get_web_url(self): return self.web_url
[docs] def get_entries(self): try: entries = self.get_child_list("entries") except KeyError: entries = [] try: entry_sets = self.get_child_list("entry_sets") except KeyError: entry_sets = [] return entries + entry_sets
xmlConfig.register_tag_classes({"glidein": Config}) xmlConfig.register_root("glidein") ####################### # # Module functions # ######################
[docs] def parse(file): """parse a config file The root element (glidein) is registered as type Config :param file: file path :return: the document root :rtype: Config """ # pylint: disable=maybe-no-member conf = _parse(file) conf_dir_path = os.path.join(os.path.dirname(file), CONFIG_DIR) if os.path.exists(conf_dir_path): files = sorted(os.listdir(conf_dir_path)) for f in files: if f.endswith(".xml"): conf.merge(_parse(os.path.join(conf_dir_path, f))) # assume FACTORY_DEFAULTS_XML is in factoryXmlConfig module directory conf_def = _parse(os.path.join(os.path.dirname(__file__), FACTORY_DEFAULTS_XML), True) conf.merge_defaults(conf_def) # now that we are validated, set the directories conf.set_submit_dir() conf.set_stage_dir() conf.set_monitor_dir() conf.set_log_dir() conf.set_client_log_dirs() conf.set_client_proxy_dirs() conf.set_web_url() conf.set_num_factories() return conf
# this parse function is for internal usage # it does not merge defaults or validate
[docs] def _parse(file, default=False): """ The root element (glidein) is registered as type Config :param file: file path :param default: if True return a default configuration (no file) :return: the document root :rtype: Config """ parser = xml.sax.make_parser() if default: handler = xmlConfig.Handler() else: handler = xmlConfig.Handler(file) parser.setContentHandler(handler) parser.parse(file) return handler.root