# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""Frontend config related classes
"""
import os.path
import urllib.error
import urllib.parse
import urllib.request
from glideinwms.creation.lib.matchPolicy import MatchPolicy
from glideinwms.lib import hashCrypto, util
############################################################
#
# Configuration
#
############################################################
class FrontendConfig:
    """Module-wide configuration knobs.

    Holds the default file names and settings used by the descript/config
    classes in this module.  Callers that need non-default values should
    modify the attributes of the shared ``frontendConfig`` instance.
    """

    def __init__(self):
        # Default values; modify the instance attributes if needed.
        self.frontend_descript_file = "frontend.descript"
        self.group_descript_file = "group.descript"
        self.params_descript_file = "params.cfg"
        self.attrs_descript_file = "attrs.cfg"
        self.signature_descript_file = "signatures.sha1"
        self.signature_type = "sha1"
        self.history_file = "history.pk"
        self.cache_dir = "schedd_ads_cache"
# Global configuration of the module: a shared instance whose attributes
# callers may modify (per the class note) before loading any config files.
frontendConfig = FrontendConfig()
############################################################
#
# Helper function
#
############################################################
def get_group_dir(base_dir, group_name):
    """Return the path of the group subdirectory under base_dir."""
    return os.path.join(base_dir, f"group_{group_name}")
############################################################
#
# Generic Class
# You most probably don't want to use these
#
############################################################
# loads a file or URL composed of
# NAME VAL
# and creates
# self.data[NAME]=VAL
# It also defines:
# self.config_file="name of file"
# If validate is defined, also defines
# self.hash_value
class ConfigFile:
    """Load a file or URL composed of NAME VAL lines
    and create the data dictionary
        self.data[NAME]=VAL
    Also define:
        self.config_file="name of file"
    If validate is defined, also define a variable with the file hash:
        self.hash_value
    """

    def __init__(self, config_dir, config_file, convert_function=repr, validate=None):
        """Define, load and derive a config file

        Args:
            config_dir (str|bytes): directory of the config file
            config_file (str|bytes): config file name/URI
            convert_function: function converting each line value
            validate (None|tuple): hash algorithm, value tuple (hash_algo,value)
        """
        self.config_dir = config_dir
        self.config_file = config_file
        self.data = {}
        self.load(os.path.join(config_dir, config_file), convert_function, validate)
        self.derive()

    def open(self, fname):
        """Open the config file/URI. Used in self.load()

        Args:
            fname (str|bytes): URL or file path

        Returns:
            the open file object (or HTTP response) to read the content from
        """
        if (fname[:5] == "http:") or (fname[:6] == "https:") or (fname[:4] == "ftp:"):
            # one of the supported URLs
            return urllib.request.urlopen(fname)
        else:
            # local file
            return open(fname)

    def validate_func(self, data, validate, fname):
        """Validate the data, storing the computed hash in self.hash_value

        Args:
            data (str): data to validate
            validate (None|tuple): hash algorithm, value tuple; the expected
                value may be None to only compute (not check) the hash
            fname (str): file/URI, used only in the error message

        Raises:
            OSError: if the hash calculated is different from the provided one
        """
        if validate is not None:
            vhash = hashCrypto.get_hash(validate[0], data)
            self.hash_value = vhash
            if (validate[1] is not None) and (vhash != validate[1]):
                raise OSError(
                    "Failed validation of '%s'. Hash %s computed to '%s', expected '%s'"
                    % (fname, validate[0], vhash, validate[1])
                )

    def load(self, fname, convert_function, validate=None):
        """Load the config file/URI.

        The file/URI is a series of NAME VALUE lines or comment lines (starting with #)
        The hash algorithm and value are used to validate the file content.
        The convert_function is used to convert the value of each line

        Args:
            fname (str|bytes): URL or file path
            convert_function: function converting the line value
            validate (None|tuple): if defined, must be (hash_algo,value)
        """
        self.data = {}
        with self.open(fname) as fd:
            data = fd.read()
            self.validate_func(data, validate, fname)
            lines = data.splitlines()
            del data
            for line in lines:
                # BUGFIX: test for empty lines first; indexing line[0] on an
                # empty string (blank line in the file) raised IndexError
                if len(line.strip()) == 0:
                    continue  # empty line
                if line[0] == "#":
                    continue  # comment
                self.split_func(line, convert_function)

    def split_func(self, line, convert_function):
        """Loads the file line in the data dictionary

        The first word is the key, the rest of the line the value,
        converted by the convert_function

        Args:
            line (str): line to load
            convert_function: function converting the line value
        """
        larr = line.split(None, 1)
        lname = larr[0]
        if len(larr) == 1:
            lval = ""
        else:
            lval = larr[1]
        # Evaluate the converted value and assign it directly, instead of
        # exec-ing a generated assignment statement: same result, but the key
        # can no longer break out of the generated code (e.g. quotes in lname)
        self.data[lname] = eval(convert_function(lval))

    def derive(self):
        """Hook for subclasses to derive additional values; default is a no-op."""
        return  # by default, do nothing

    def __str__(self):
        output = "\n"
        for key in list(self.data.keys()):
            output += f"{key} = {str(self.data[key])}, ({type(self.data[key])})\n"
        return output
# load from the group subdir
class GroupConfigFile(ConfigFile):
    """Config file loaded from the group subdirectory (group_GROUPNAME)."""

    def __init__(self, base_dir, group_name, config_file, convert_function=repr, validate=None):
        """Define, load and derive a config file from the group subdirectory

        Args:
            base_dir (str|bytes): parent directory of the group subdirectory
            group_name (str): group name
            config_file (str|bytes): config file name/URI
            convert_function: function converting each line value
            validate (None|tuple): hash algorithm, value tuple (hash_algo,value)
        """
        group_dir = get_group_dir(base_dir, group_name)
        super().__init__(group_dir, config_file, convert_function, validate)
        self.group_name = group_name
# load both the main and group subdir config file
# and join the results
# Also defines:
# self.group_hash_value, if group_validate defined
class JoinConfigFile(ConfigFile):
    """Joint main and group configuration"""

    def __init__(
        self, base_dir, group_name, config_file, convert_function=repr, main_validate=None, group_validate=None
    ):
        """Load both the main and the group subdir config file and join the results.

        Also define:
            self.group_hash_value, if group_validate defined

        Args:
            base_dir (str|bytes): directory of the config file
            group_name (str): group name
            config_file (str|bytes): config file name/URI
            convert_function: function converting each line value
            main_validate (None|tuple): hash algorithm, value tuple (hash_algo,value)
            group_validate (None|tuple): hash algorithm, value tuple (hash_algo,value)
        """
        super().__init__(base_dir, config_file, convert_function, main_validate)
        self.group_name = group_name
        group_obj = GroupConfigFile(base_dir, group_name, config_file, convert_function, group_validate)
        if group_validate is not None:
            self.group_hash_value = group_obj.hash_value
        # Merge: values found in the group subdir override the main ones
        self.data.update(group_obj.data)
############################################################
#
# Configuration
#
############################################################
class FrontendDescript(ConfigFile):
    """Description of the Frontend

    Only one
    Content comes from the global configuration
    File name: frontend.descript
    cWDictFile.StrDictFile defined in cvWDictFile.get_main_dicts()
    """

    def __init__(self, config_dir):
        # repr keeps every value as its string form
        super().__init__(config_dir, frontendConfig.frontend_descript_file, repr)
class ElementDescript(GroupConfigFile):
    """Description of a Frontend group

    One per group/element
    Content comes from the group configuration
    File name: group.descript (in the group subdirectory - group_GROUPNAME)
    cWDictFile.StrDictFile defined in cvWDictFile.get_group_dicts()
    """

    def __init__(self, base_dir, group_name):
        # repr keeps every value as its string form
        super().__init__(base_dir, group_name, frontendConfig.group_descript_file, repr)
class ParamsDescript(JoinConfigFile):
    """Global and group parameters in a Frontend

    One per group/element
    Content has `parameter="True"` in the <attrs> sections in the global and group configuration
    Files: params.cfg in the main directory and group subdirectory
    cvWDictFile.ParamsDictFile defined in cvWDictFile.get_common_dicts()
    """

    def __init__(self, base_dir, group_name):
        # Each line value is "TYPE VALUE"; turn it into the textual form of a
        # 2-tuple so that ConfigFile stores ('TYPE', parsed_value) in self.data
        def _to_tuple_str(s):
            return "('%s',%s)" % tuple(s.split(None, 1))

        super().__init__(base_dir, group_name, frontendConfig.params_descript_file, _to_tuple_str)
        self.const_data = {}  # CONST parameters
        self.expr_data = {}  # EXPR parameters, original string
        self.expr_objs = {}  # EXPR parameters, compiled objects
        for name, (type_str, val) in self.data.items():
            if type_str == "EXPR":
                try:
                    self.expr_objs[name] = compile(val, "<string>", "eval")
                except SyntaxError:
                    self.expr_objs[name] = '""'
                    raise RuntimeError("Syntax error in parameter %s" % name)
                self.expr_data[name] = val
            elif type_str == "CONST":
                self.const_data[name] = val
            else:
                raise RuntimeError(f"Unknown parameter type '{type_str}' for '{name}'!")
class AttrsDescript(JoinConfigFile):
    """Global and group attributes in a Frontend

    One per group/element
    Content comes from the <attrs> sections in the global and group configuration
    Files: attrs.cfg in the main directory and group subdirectory
    cWDictFile.ReprDictFile defined in cvWDictFile.get_common_dicts()
    """

    def __init__(self, base_dir, group_name):
        # values are already in python (repr) form, so keep them verbatim
        super().__init__(base_dir, group_name, frontendConfig.attrs_descript_file, str)
# this one is the special frontend work dir signature file
class SignatureDescript(ConfigFile):
    """Special frontend work dir signature file (signatures.sha1).

    Each line has the format (whitespace separated):
        <signature> <descript_fname> <id>
    and is stored as
        self.data[id] = (signature, descript_fname)
    """

    def __init__(self, config_dir):
        global frontendConfig
        # convert_function is None because it is not used: split_func is redefined below
        ConfigFile.__init__(
            self, config_dir, frontendConfig.signature_descript_file, None
        )  # Not used, redefining split_func
        self.signature_type = frontendConfig.signature_type

    def split_func(self, line, convert_function):
        """Load one signature line into the data dictionary.

        Args:
            line (str): "<signature> <fname> <id>" line
            convert_function: unused, kept for interface compatibility

        Raises:
            RuntimeError: if the line does not have exactly 3 elements
        """
        larr = line.split(None)
        if len(larr) != 3:
            raise RuntimeError("Invalid line (expected 3 elements, found %i)" % len(larr))
        self.data[larr[2]] = (larr[0], larr[1])
# this one is the generic hash descript file
class BaseSignatureDescript(ConfigFile):
    """Generic hash descript file.

    Each line has the format (whitespace separated):
        <signature> <fname>
    and is stored as
        self.data[fname] = signature
    """

    def __init__(self, config_dir, signature_fname, signature_type, validate=None):
        # convert_function is None because it is not used: split_func is redefined below
        ConfigFile.__init__(self, config_dir, signature_fname, None, validate)  # Not used, redefining split_func
        self.signature_type = signature_type

    def split_func(self, line, convert_function):
        """Load one signature line into the data dictionary.

        Args:
            line (str): "<signature> <fname>" line
            convert_function: unused, kept for interface compatibility

        Raises:
            RuntimeError: if the line does not have exactly 2 elements
        """
        larr = line.split(None, 1)
        if len(larr) != 2:
            raise RuntimeError("Invalid line (expected 2 elements, found %i)" % len(larr))
        lval = larr[1]
        self.data[lval] = larr[0]
############################################################
#
# Processed configuration
#
############################################################
class ElementMergedDescript:
    """Selective merge of global and group configuration

    not everything is merged
    the old element in the global configuration can still be accessed
    (through self.frontend_data)
    """

    def __init__(self, base_dir, group_name):
        """Load the frontend and group descript files and merge them.

        Args:
            base_dir (str|bytes): frontend work directory
            group_name (str): group name; must be listed in the global 'Groups'

        Raises:
            RuntimeError: if group_name is not one of the configured groups
        """
        self.frontend_data = FrontendDescript(base_dir).data
        if group_name not in self.frontend_data["Groups"].split(","):
            raise RuntimeError("Group '{}' not supported: {}".format(group_name, self.frontend_data["Groups"]))
        self.element_data = ElementDescript(base_dir, group_name).data
        self.group_name = group_name
        self._merge()

    #################
    # Private
    def _merge(self):
        """Merge self.frontend_data and self.element_data into self.merged_data."""
        self.merged_data = {}
        # comma-separated lists are split and concatenated
        for t in ("JobSchedds",):
            self.merged_data[t] = self._split_list(self.frontend_data[t]) + self._split_list(self.element_data[t])
            if len(self.merged_data[t]) == 0:
                raise RuntimeError("Found empty %s!" % t)
        # python-repr lists are evaluated and concatenated
        # NOTE(review): eval() here runs on the frontend's own descript files
        # (trusted input); it would be unsafe on foreign data
        for t in ("FactoryCollectors",):
            self.merged_data[t] = eval(self.frontend_data[t]) + eval(self.element_data[t])
            if len(self.merged_data[t]) == 0:
                raise RuntimeError("Found empty %s!" % t)
        # HTCondor boolean expressions are AND-ed together
        for t in ("FactoryQueryExpr", "JobQueryExpr"):
            self.merged_data[t] = f"({self.frontend_data[t]}) && ({self.element_data[t]})"
            for data in (self.frontend_data, self.element_data):
                if "MatchPolicyModule%s" % t in data:
                    self.merged_data[t] = "({}) && ({})".format(self.merged_data[t], data["MatchPolicyModule%s" % t])
        # PM: TODO: Not sure why FactoryMatchAttrs was not in the list below
        # To get complete list of FactoryMatchAttrs you need to merge it
        for t in ("JobMatchAttrs", "FactoryMatchAttrs"):
            # merge the (name, type) pairs, keeping the first occurrence of each name
            attributes = []
            names = []
            match_attrs_list = eval(self.frontend_data[t]) + eval(self.element_data[t])
            for data in (self.frontend_data, self.element_data):
                if "MatchPolicyModule%s" % t in data:
                    match_attrs_list += eval(data["MatchPolicyModule%s" % t])
            for el in match_attrs_list:
                el_name = el[0]
                if el_name not in names:
                    attributes.append(el)
                    names.append(el_name)
            self.merged_data[t] = attributes
        # python match expressions are and-ed together and pre-compiled
        for t in ("MatchExpr",):
            self.merged_data[t] = f"({self.frontend_data[t]}) and ({self.element_data[t]})"
            self.merged_data[t + "CompiledObj"] = compile(self.merged_data[t], "<string>", "eval")
        self.merged_data["MatchPolicyModules"] = []
        if "MatchPolicyFile" in self.frontend_data:
            self.merged_data["MatchPolicyModules"].append(MatchPolicy(self.frontend_data["MatchPolicyFile"]))
        if "MatchPolicyFile" in self.element_data:
            self.merged_data["MatchPolicyModules"].append(MatchPolicy(self.element_data["MatchPolicyFile"]))
        # We use default ProxySelectionPlugin
        self.merged_data["ProxySelectionPlugin"] = "ProxyAll"
        # group values (element_data, iterated last) override global ones
        for t in ("ProxySelectionPlugin", "SecurityName", "IDTokenLifetime", "IDTokenKeyname"):
            for data in (self.frontend_data, self.element_data):
                if t in data:
                    self.merged_data[t] = data[t]
        proxies = []
        # switching the order, so that the group credential will
        # be chosen before the global credential when ProxyFirst is used.
        for data in (self.element_data, self.frontend_data):
            if "Proxies" in data:
                proxies += eval(data["Proxies"])
        self.merged_data["Proxies"] = proxies
        proxy_descript_attrs = [
            "ProxySecurityClasses",
            "ProxyTrustDomains",
            "ProxyTypes",
            "CredentialGenerators",
            "ProxyKeyFiles",
            "ProxyPilotFiles",
            "ProxyVMIds",
            "ProxyVMTypes",
            "ProxyCreationScripts",
            "ProxyUpdateFrequency",
            "ProxyVMIdFname",
            "ProxyVMTypeFname",
            "ProxyRemoteUsernames",
            "ProxyProjectIds",
        ]
        # merge the per-proxy dictionaries; group entries override global ones
        for attr in proxy_descript_attrs:
            proxy_descript_data = {}
            for data in (self.frontend_data, self.element_data):
                if attr in data:  # was data.has_key(attr):
                    dprs = eval(data[attr])
                    for k in list(dprs.keys()):
                        proxy_descript_data[k] = dprs[k]
            self.merged_data[attr] = proxy_descript_data
        return

    @staticmethod
    def _split_list(val):
        """Split a comma separated string into a list.

        Both "None" and the empty string yield the empty list.
        """
        if val == "None":
            return []
        elif val == "":
            return []
        else:
            return val.split(",")
class GroupSignatureDescript:
    """Convenience access to the main and group signatures of one group."""

    def __init__(self, base_dir, group_name):
        """Extract the main and group entries from signatures.sha1.

        Args:
            base_dir (str|bytes): frontend work directory
            group_name (str): group name
        """
        self.group_name = group_name

        sd = SignatureDescript(base_dir)
        self.signature_data = sd.data
        self.signature_type = sd.signature_type

        # each data value is a (signature, descript_fname) tuple
        main_signature, main_fname = sd.data["main"]
        self.frontend_descript_fname = main_fname
        self.frontend_descript_signature = main_signature

        group_signature, group_fname = sd.data["group_%s" % group_name]
        self.group_descript_fname = group_fname
        self.group_descript_signature = group_signature
class StageFiles:
    """Access to the files in a stage area, validated through the signature file."""

    def __init__(self, base_URL, descript_fname, validate_algo, signature_hash):
        """Load the descript and signature files and cross-validate them.

        Args:
            base_URL (str): base URL/directory of the stage area
            descript_fname (str): name of the descript file
            validate_algo (str): hash algorithm used for validation
            signature_hash (str): expected hash of the signature file

        Raises:
            OSError: if the descript file hash does not match the signature file entry
        """
        self.base_URL = base_URL
        self.validate_algo = validate_algo
        self.stage_descript = ConfigFile(
            base_URL, descript_fname, repr, (validate_algo, None)
        )  # just get the hash value... will validate later
        self.signature_descript = BaseSignatureDescript(
            base_URL, self.stage_descript.data["signature"], validate_algo, (validate_algo, signature_hash)
        )
        if self.stage_descript.hash_value != self.signature_descript.data[descript_fname]:
            raise OSError(
                "Descript file %s signature invalid, expected'%s' got '%s'"
                % (descript_fname, self.signature_descript.data[descript_fname], self.stage_descript.hash_value)
            )

    def get_stage_file(self, fname, repr):
        """Load a hash-validated file from the stage area.

        Args:
            fname (str): file name within the stage area
            repr: line-value conversion function
                (NOTE(review): the parameter name shadows the builtin repr;
                kept unchanged for interface compatibility)

        Returns:
            ConfigFile: the loaded, hash-validated file
        """
        return ConfigFile(self.base_URL, fname, repr, (self.validate_algo, self.signature_descript.data[fname]))

    def get_file_list(self, list_type):  # example list_type == 'preentry_file_list'
        """Load one of the file lists described in the descript file.

        Args:
            list_type (str): name of the list (e.g. 'preentry_file_list')

        Raises:
            KeyError: if list_type is not a known list type
        """
        if list_type not in self.stage_descript.data:
            # BUGFIX: corrected 'typtes' typo in the user-facing error message
            raise KeyError(f"Unknown list type '{list_type}'; valid types are {list(self.stage_descript.data.keys())}")
        list_fname = self.stage_descript.data[list_type]
        return self.get_stage_file(list_fname, lambda x: x.split(None, 4))
# this class knows how to interpret some of the files in the Stage area
class ExtStageFiles(StageFiles):
    """StageFiles that knows how to interpret some of the files in the Stage area."""

    def __init__(self, base_URL, descript_fname, validate_algo, signature_hash):
        StageFiles.__init__(self, base_URL, descript_fname, validate_algo, signature_hash)
        self.preentry_file_list = None  # lazily loaded by load_preentry_file_list()

    def get_constants(self):
        """Return the validated constants.cfg stage file (a ConfigFile)."""
        self.load_preentry_file_list()
        return self.get_stage_file(self.preentry_file_list.data["constants.cfg"][0], repr)

    def get_condor_vars(self):
        """Return the validated condor_vars.lst stage file, each line split in up to 7 columns."""
        self.load_preentry_file_list()
        return self.get_stage_file(self.preentry_file_list.data["condor_vars.lst"][0], lambda x: x.split(None, 6))

    # internal
    def load_preentry_file_list(self):
        """Load the preentry file list, if not already loaded."""
        if self.preentry_file_list is None:
            self.preentry_file_list = self.get_file_list("preentry_file_list")
        # else, nothing to do
# this class knows how to interpret some of the files in the Stage area
# Will appropriately merge the main and the group ones
class MergeStageFiles:
    """Interpret files in the Stage area, merging the main and the group ones."""

    def __init__(
        self,
        base_URL,
        validate_algo,
        main_descript_fname,
        main_signature_hash,
        group_name,
        group_descript_fname,
        group_signature_hash,
    ):
        """Set up the main and group stage areas.

        Args:
            base_URL (str): base URL/directory of the main stage area
            validate_algo (str): hash algorithm used for validation
            main_descript_fname (str): descript file name of the main stage area
            main_signature_hash (str): expected hash of the main signature file
            group_name (str): group name
            group_descript_fname (str): descript file name of the group stage area
            group_signature_hash (str): expected hash of the group signature file
        """
        self.group_name = group_name
        self.main_stage = ExtStageFiles(base_URL, main_descript_fname, validate_algo, main_signature_hash)
        self.group_stage = ExtStageFiles(
            get_group_dir(base_URL, group_name), group_descript_fname, validate_algo, group_signature_hash
        )

    def get_constants(self):
        """Return the merged constants; group values override the main ones."""
        main_consts = self.main_stage.get_constants()
        group_consts = self.group_stage.get_constants()
        # group constants override the main ones
        for k in list(group_consts.data.keys()):
            main_consts.data[k] = group_consts.data[k]
        main_consts.group_name = self.group_name
        main_consts.group_hash_value = group_consts.hash_value
        return main_consts

    def get_condor_vars(self):
        """Return the merged condor_vars; group values override the main ones."""
        main_cv = self.main_stage.get_condor_vars()
        group_cv = self.group_stage.get_condor_vars()
        # group condor_vars override the main ones
        for k in list(group_cv.data.keys()):
            main_cv.data[k] = group_cv.data[k]
        main_cv.group_name = self.group_name
        main_cv.group_hash_value = group_cv.hash_value
        return main_cv
############################################################
#
# The FrontendGroups may want to preserve some state between
# iterations/invocations. The HistoryFile class provides
# the needed support for this.
#
# There is no fixed schema in the class itself;
# the FrontendGroup is free to store any arbitrary dictionary
# in it.
#
############################################################
class HistoryFile:
    """Persistent (pickled) dictionary used by the FrontendGroups to
    preserve state between iterations/invocations.

    There is no fixed schema; any pickleable dictionary can be stored.
    """

    def __init__(self, base_dir, group_name, load_on_init=True, default_factory=None):
        """
        The default_factory semantics is the same as the one in
        collections.defaultdict
        """
        self.base_dir = base_dir
        self.group_name = group_name
        self.fname = os.path.join(get_group_dir(base_dir, group_name), frontendConfig.history_file)
        self.default_factory = default_factory
        # plain dict + default_factory emulation in __getitem__
        # (historical: this code predates reliance on collections.defaultdict)
        self.data = {}
        if load_on_init:
            self.load()

    def load(self, raise_on_error=False):
        """Load the pickled history from self.fname into self.data.

        On any error (missing/corrupt file, non-dict content) the history
        defaults to an empty dictionary, unless raise_on_error is True.
        """
        try:
            # using it only for convenience (expiration, ... not used)
            data = util.file_pickle_load(self.fname)
        except Exception:
            if raise_on_error:
                raise
            else:
                # default to empty history on error
                data = {}
        if not isinstance(data, dict):
            if raise_on_error:
                raise TypeError("History object not a dictionary: %s" % str(type(data)))
            else:
                # default to empty history on error
                data = {}
        self.data = data

    def save(self, raise_on_error=False):
        """Pickle self.data to self.fname; errors ignored unless raise_on_error."""
        # There is no concurrency, so does not need to be done atomically
        # Anyway we want to avoid to write an empty file on top of a
        # saved state because of an exception
        try:
            util.file_pickle_dump(self.fname, self.data)
        except Exception:
            if raise_on_error:
                raise
            # else, just ignore

    def has_key(self, keyid):
        # legacy dict-like interface, kept for backward compatibility
        return keyid in self.data

    def __contains__(self, keyid):
        return keyid in self.data

    def __getitem__(self, keyid):
        try:
            return self.data[keyid]
        except KeyError:
            if self.default_factory is None:
                raise  # no default initialization, just fail
            # i have the initialization function, use it
            self.data[keyid] = self.default_factory()
            return self.data[keyid]

    def __setitem__(self, keyid, val):
        self.data[keyid] = val

    def __delitem__(self, keyid):
        del self.data[keyid]

    def empty(self):
        """Reset the history to an empty dictionary (in memory only)."""
        self.data = {}

    def get(self, keyid, defaultval=None):
        """Return self.data[keyid] if present, else defaultval (no default_factory involved)."""
        return self.data.get(keyid, defaultval)