# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
# Desscription:
# This module contains the create_glidein params class
import copy
import os
# import sys
import os.path
# import string
import socket
from glideinwms.lib import condorExe
from glideinwms.lib.util import safe_boolcomp
from glideinwms.lib.xmlParse import OrderedDict
from . import cWParams
######################################################
[docs]
class GlideinParams(cWParams.CommonParams):
"""
Contains all the factory configuration values as params. Used in create_glideins and recreate_glideins.
"""
[docs]
def init_defaults(self):
"""
Populates the defaults for all the factory configuration values.
"""
self.init_support_defaults()
# Defaults for allowing frontends in a whitelist
# in the factory config (per entry point)
self.allow_defaults = cWParams.CommentedOrderedDict()
self.allow_defaults["name"] = (None, "string", "frontend name", None)
self.allow_defaults["security_class"] = ("All", "string", "security class", None)
# publishing specific to factory
self.attr_defaults["publish"] = ("True", "Bool", "Should it be published by the factory?", None)
self.attr_defaults["const"] = (
"True",
"Bool",
"Should it be constant? (Else it can be overriden by the frontend. Used only if parameter is True.)",
None,
)
self.infosys_defaults = cWParams.CommentedOrderedDict()
self.infosys_defaults["type"] = (None, "RESS|BDII", "Type of information system", None)
self.infosys_defaults["server"] = (None, "host", "Location of the infosys server", None)
self.infosys_defaults["ref"] = (None, "id", "Referenced for the entry point in the infosys", None)
self.mongroup_defaults = cWParams.CommentedOrderedDict()
self.mongroup_defaults["group_name"] = (None, "groupname", "Name of the monitoring group", None)
entry_config_defaults = cWParams.CommentedOrderedDict()
entry_config_max_jobs_defaults = cWParams.CommentedOrderedDict()
max_jobs_per_entry_defaults = cWParams.CommentedOrderedDict()
max_jobs_per_entry_defaults["glideins"] = (
"10000",
"nr",
"Maximum number of concurrent glideins (per entry) that can be submitted.",
None,
)
max_jobs_per_entry_defaults["idle"] = (
"2000",
"nr",
"Maximum number of idle glideins (per entry) allowed.",
None,
)
max_jobs_per_entry_defaults["held"] = (
"1000",
"nr",
"Maximum number of held glideins (per entry) before forcing the cleanup.",
None,
)
entry_config_max_jobs_defaults["per_entry"] = max_jobs_per_entry_defaults
max_jobs_default_per_frontend_defaults = cWParams.CommentedOrderedDict()
max_jobs_default_per_frontend_defaults["glideins"] = (
"5000",
"nr",
"Maximum number of concurrent glideins (default per frontend) that can be submitted.",
None,
)
max_jobs_default_per_frontend_defaults["idle"] = (
"100",
"nr",
"Maximum number of idle glideins (default per frontend) allowed.",
None,
)
max_jobs_default_per_frontend_defaults["held"] = (
"50",
"nr",
"Maximum number of held glideins (default per frontend) before forcing the cleanup.",
None,
)
entry_config_max_jobs_defaults["default_per_frontend"] = max_jobs_default_per_frontend_defaults
max_jobs_per_frontend_defaults = cWParams.CommentedOrderedDict()
max_jobs_per_frontend_defaults["name"] = (None, "string", "frontend name", None)
max_jobs_per_frontend_defaults["held"] = (
"50",
"nr",
"Maximum number of held glideins (for this frontend) before forcing the cleanup.",
None,
)
max_jobs_per_frontend_defaults["idle"] = (
"100",
"nr",
"Maximum number of idle glideins (for this frontend) allowed.",
None,
)
max_jobs_per_frontend_defaults["glideins"] = (
"5000",
"nr",
"Maximum number of concurrent glideins (per frontend) that can be submitted",
None,
)
entry_config_max_jobs_defaults["per_frontends"] = (
OrderedDict(),
"Dictionary of frontends",
"Each frontend entry contains",
max_jobs_per_frontend_defaults,
)
entry_config_defaults["max_jobs"] = entry_config_max_jobs_defaults
entry_config_restrictions_defaults = cWParams.CommentedOrderedDict()
entry_config_restrictions_defaults["require_voms_proxy"] = (
"False",
"Bool",
"Whether this entry point requires a voms proxy",
None,
)
entry_config_defaults["restrictions"] = entry_config_restrictions_defaults
entry_config_queue_defaults = cWParams.CommentedOrderedDict()
entry_config_queue_defaults["max_per_cycle"] = ["100", "nr", "Maximum number of jobs affected per cycle.", None]
entry_config_queue_defaults["sleep"] = ["0.2", "seconds", "Sleep between interactions with the schedd.", None]
entry_config_defaults["submit"] = copy.deepcopy(entry_config_queue_defaults)
entry_config_defaults["submit"]["cluster_size"] = [
"10",
"nr",
"Max number of jobs submitted in a single transaction.",
None,
]
entry_config_defaults["submit"]["slots_layout"] = [
"partitionable",
"string",
"The way multiple slots should be setup.",
None,
]
self.submit_attrs = cWParams.CommentedOrderedDict()
self.submit_attrs["value"] = ("All", "string", "HTCondor classad value", None)
entry_config_defaults["submit"]["submit_attrs"] = (
OrderedDict(),
"Dictionary of submit attributes",
"Each attribute contains",
self.submit_attrs,
)
entry_config_defaults["remove"] = copy.deepcopy(entry_config_queue_defaults)
entry_config_defaults["remove"]["max_per_cycle"][0] = "5"
entry_config_defaults["release"] = copy.deepcopy(entry_config_queue_defaults)
entry_config_defaults["release"]["max_per_cycle"][0] = "20"
# not exported and order does not matter, can stay a regular dictionary
sub_defaults = {
"attrs": (OrderedDict(), "Dictionary of attributes", "Each attribute entry contains", self.attr_defaults),
"files": ([], "List of files", "Each file entry contains", self.file_defaults),
"infosys_refs": (
[],
"List of information system references",
"Each reference points to this entry",
self.infosys_defaults,
),
"monitorgroups": ([], "List of monitoring groups", "Each group entry belongs to", self.mongroup_defaults),
}
self.entry_defaults = cWParams.CommentedOrderedDict()
self.entry_defaults["gatekeeper"] = (None, "gatekeeper", "Grid gatekeeper/resource", None)
self.entry_defaults["gridtype"] = ("condor", "grid_type", "Condor Grid type", None)
self.entry_defaults["trust_domain"] = ("OSG", "trust_domain", "Entry trust domain", None)
self.entry_defaults["auth_method"] = (
"grid_proxy",
"auth_method",
"Type of auth method this entry supports",
None,
)
self.entry_defaults["vm_id"] = (None, "vm_id", "VM id this entry supports", None)
self.entry_defaults["vm_type"] = (None, "vm_type", "VM type this entry supports", None)
self.entry_defaults["rsl"] = (None, "RSL", "Globus gt2 RSL option", None)
self.entry_defaults["bosco_dir"] = (
None,
"bosco_dir",
"BOSCO directory on the (remote) resource submit host",
None,
)
self.entry_defaults["schedd_name"] = (
None,
"ScheddName",
"Which schedd to use (Overrides the global one if specified)",
None,
)
self.entry_defaults["work_dir"] = (".", ".|Condor|OSG|TMPDIR", "Where to start glidein", None)
self.entry_defaults["proxy_url"] = (None, "proxy_url", "Squid cache to use", None)
self.entry_defaults["verbosity"] = ("std", "std|nodebug|fast", "Verbosity level and timeout setting", None)
self.entry_defaults["enabled"] = ("True", "Bool", "Is this entry enabled?", None)
self.entry_defaults["config"] = entry_config_defaults
self.entry_defaults["attrs"] = sub_defaults["attrs"]
self.entry_defaults["files"] = sub_defaults["files"]
self.entry_defaults["infosys_refs"] = sub_defaults["infosys_refs"]
self.entry_defaults["monitorgroups"] = copy.deepcopy(sub_defaults["monitorgroups"])
self.entry_defaults["allow_frontends"] = (
OrderedDict(),
"Dictionary of frontends",
"Each frontend entry contains",
self.allow_defaults,
)
###############################
# Start defining the defaults
self.defaults["factory_name"] = (socket.gethostname(), "ID", "Factory name", None)
self.defaults["glidein_name"] = (None, "ID", "Glidein name", None)
self.defaults["schedd_name"] = (
"schedd_glideins@%s" % socket.gethostname(),
"ScheddName",
"Which schedd to use, can be a comma separated list",
None,
)
self.defaults["factory_collector"] = (
None,
"CollectorName",
"Which collector should we use for factory ClassAds",
None,
)
self.defaults["factory_versioning"] = ("True", "Bool", "Should we create versioned subdirectories?", None)
submit_defaults = cWParams.CommentedOrderedDict()
submit_defaults["base_dir"] = ("%s/glideinsubmit" % os.environ["HOME"], "base_dir", "Submit base dir", None)
submit_defaults["base_log_dir"] = ("%s/glideinlog" % os.environ["HOME"], "log_dir", "Submit base log dir", None)
submit_defaults["base_client_log_dir"] = (
"%s/glideclientlog" % os.environ["HOME"],
"client_dir",
"Base dir for client logs, needs a user_<uid> subdir per frontend user",
None,
)
submit_defaults["base_client_proxies_dir"] = (
"%s/glideclientproxies" % os.environ["HOME"],
"client_dir",
"Base dir for client proxies, needs a user_<uid> subdir per frontend user",
None,
)
self.defaults["submit"] = submit_defaults
one_log_retention_defaults = cWParams.CommentedOrderedDict()
one_log_retention_defaults["min_days"] = [
"3.0",
"days",
"Min number of days the logs must be preserved (even if they use too much space)",
None,
]
one_log_retention_defaults["max_days"] = [
"7.0",
"days",
"Max number of days the logs should be preserved",
None,
]
one_log_retention_defaults["max_mbytes"] = ["100.0", "Mbytes", "Max number of Mbytes the logs can use", None]
monitor_footer_defaults = cWParams.CommentedOrderedDict()
monitor_footer_defaults["display_txt"] = [
"",
"string",
"what will be displayed at the bottom of the monitoring page",
None,
]
monitor_footer_defaults["href_link"] = ["", "string", "where to link to", None]
self.defaults["monitor_footer"] = monitor_footer_defaults
process_log_defaults = copy.deepcopy(one_log_retention_defaults)
process_log_defaults["structured"] = ["False", "Bool", "True to use structured logs", None]
process_log_defaults["extension"] = ["all", "string", "name of the log extention", None]
process_log_defaults["msg_types"] = ["INFO, WARN, ERR", "string", "types of log messages", None]
process_log_defaults["backup_count"] = ["5", "string", "Number of backup logs to keep", None]
process_log_defaults["compression"] = ["", "string", "Compression for backup log files", None]
log_retention_defaults = cWParams.CommentedOrderedDict()
log_retention_defaults["process_logs"] = (
[],
"Dictionary of log types",
"Each log corresponds to a log file",
copy.deepcopy(process_log_defaults),
)
log_retention_defaults["job_logs"] = copy.deepcopy(one_log_retention_defaults)
log_retention_defaults["job_logs"]["min_days"][0] = "2.0"
self.defaults["advertise_with_tcp"] = ("True", "Bool", "Should condor_advertise use TCP connections?", None)
self.defaults["advertise_with_multiple"] = ("True", "Bool", "Should condor_advertise use -multiple?", None)
log_retention_defaults["summary_logs"] = copy.deepcopy(one_log_retention_defaults)
log_retention_defaults["summary_logs"]["max_days"][0] = "31.0"
log_retention_defaults["condor_logs"] = copy.deepcopy(one_log_retention_defaults)
log_retention_defaults["condor_logs"]["max_days"][0] = "14.0"
self.defaults["log_retention"] = log_retention_defaults
self.defaults["loop_delay"] = ("60", "seconds", "Number of seconds between iterations", None)
self.defaults["advertise_delay"] = ("5", "NR", "Advertize evert NR loops", None)
self.defaults["restart_attempts"] = (
"3",
"NR",
"Max allowed NR restarts every restart_interval before shutting down",
None,
)
self.defaults["restart_interval"] = (
"1800",
"NR",
"Time interval NR sec which allow max restart attempts",
None,
)
self.defaults["entry_parallel_workers"] = (
"0",
"NR",
"Number of entries that will perform the work in parallel",
None,
)
stage_defaults = cWParams.CommentedOrderedDict()
stage_defaults["base_dir"] = ("/var/www/html/glidefactory/stage", "base_dir", "Stage base dir", None)
stage_defaults["web_base_url"] = (
"http://%s/glidefactory/stage" % socket.gethostname(),
"base_url",
"Base Web server URL",
None,
)
stage_defaults["use_symlink"] = ("True", "Bool", "Can I symlink stage dir from submit dir?", None)
self.defaults["stage"] = stage_defaults
self.monitor_defaults["base_dir"] = (
"/var/www/html/glidefactory/monitor",
"base_dir",
"Monitoring base dir",
None,
)
# Default for rrd update threads
self.monitor_defaults["update_thread_count"] = (
os.sysconf("SC_NPROCESSORS_ONLN"),
"update_thread_count",
"Number of rrd update threads. Defaults to cpu count.",
None,
)
self.defaults["monitor"] = self.monitor_defaults
self.frontend_sec_class_defaults = cWParams.CommentedOrderedDict()
self.frontend_sec_class_defaults["username"] = (
None,
"username",
"UNIX ID to be used for this security class",
None,
)
self.frontend_defaults = cWParams.CommentedOrderedDict()
self.frontend_defaults["identity"] = (None, "identity", "Authenticated Identity", None)
self.frontend_defaults["security_classes"] = (
OrderedDict(),
"Dictionary of security class maps",
"Each mapping contains",
self.frontend_sec_class_defaults,
)
monitoring_collector_defaults = cWParams.CommentedOrderedDict()
monitoring_collector_defaults["node"] = (
None,
"nodename",
"Factory monitoring collector node name (for example, col1.my.org:9999)",
None,
)
monitoring_collector_defaults["DN"] = (
None,
"dn",
"Factory collector distinguised name (subject) (for example, /DC=org/DC=myca/OU=Services/CN=col1.my.org)",
None,
)
monitoring_collector_defaults["secondary"] = (
"False",
"Bool",
"Secondary nodes will be used by glideins, if present",
None,
)
monitoring_collector_defaults["group"] = (
"default",
"string",
"Collector group name useful to group HA setup",
None,
)
self.defaults["monitoring_collectors"] = (
[],
"List of factory monitoring collectors",
"Each collector contains",
monitoring_collector_defaults,
)
security_default = cWParams.CommentedOrderedDict()
security_default["pub_key"] = (
"RSA",
"None|RSA",
"Type of public key system used for secure message passing",
None,
)
security_default["reuse_oldkey_onstartup_gracetime"] = (
"900",
"seconds",
"Time in sec old key can be used to decrypt requests from frontend",
None,
)
security_default["remove_old_cred_freq"] = (
"24",
"hours",
"Frequency in hrs for cleaning unused credentials",
None,
)
security_default["remove_old_cred_age"] = ("30", "days", "Credentials older than this should be removed", None)
security_default["key_length"] = ("2048", "bits", "Key length in bits", None)
security_default["frontends"] = (
OrderedDict(),
"Dictionary of frontend",
"Each frontend contains",
self.frontend_defaults,
)
self.defaults["security"] = security_default
condor_defaults = cWParams.CommentedOrderedDict()
condor_defaults["os"] = ("default", "osname", "Operating System (like linux-rhel3)", None)
condor_defaults["arch"] = ("default", "arch", "Architecture (like x86)", None)
condor_defaults["version"] = ("default", "arch", "Architecture (like x86)", None)
condor_defaults["tar_file"] = (
None,
"fname",
"Tarball containing condor binaries (overrides base_dir if defined)",
None,
)
condor_defaults["base_dir"] = (
None,
"base_dir",
"Condor distribution base dir (used only if tar_file undefined)",
None,
)
cvmfsexec_distro_defaults = cWParams.CommentedOrderedDict()
cvmfsexec_distro_defaults["sources"] = ("", "sources", "CVMFS Sources (like osg)", None)
cvmfsexec_distro_defaults["platforms"] = ("", "platforms", "Supported Machine Types (like rhel7-x86_64", None)
self.defaults["cvmfsexec_distro"] = cvmfsexec_distro_defaults
self.defaults["condor_tarballs"] = ([], "List of condor tarballs", "Each entry contains", condor_defaults)
self.defaults["attrs"] = sub_defaults["attrs"]
self.defaults["files"] = copy.deepcopy(sub_defaults["files"])
# ordering is specific to global section of factory
self.defaults["files"][3]["after_entry"] = (
"False",
"Bool",
"Should this file be loaded after the entry ones?",
None,
)
self.defaults["entries"] = (OrderedDict(), "Dictionary of entries", "Each entry contains", self.entry_defaults)
return
# return name of top element
[docs]
def get_top_element(self):
return "glidein"
[docs]
def buildDir(self, factoryVersioning, basedir):
# return either basedir or basedir/frontend_fename
glidein_subdir = "glidein_%s" % self.glidein_name
if factoryVersioning:
return os.path.join(basedir, glidein_subdir)
else:
return basedir
# validate data and add additional attributes if needed
[docs]
def derive(self):
# glidein name does not have a reasonable default
if self.glidein_name is None:
raise RuntimeError("Missing glidein name")
if not cWParams.is_valid_name(self.glidein_name):
raise RuntimeError("Invalid glidein name '%s'" % self.glidein_name)
if self.factory_collector == "default":
raise RuntimeError('"default" is a reserved keyword, cannot be used as factory_collector')
factoryVersioning = False
if "factory_versioning" in self.data and safe_boolcomp(self.data["factory_versioning"], True):
factoryVersioning = True
self.stage_dir = self.buildDir(factoryVersioning, self.stage.base_dir)
self.monitor_dir = self.buildDir(factoryVersioning, self.monitor.base_dir)
self.submit_dir = self.buildDir(factoryVersioning, self.submit.base_dir)
self.log_dir = self.buildDir(factoryVersioning, self.submit.base_log_dir)
self.web_url = self.buildDir(factoryVersioning, self.stage.web_base_url)
self.client_log_dirs = {}
self.client_proxies_dirs = {}
for fename in list(self.security.frontends.keys()):
if not cWParams.is_valid_name(fename):
raise RuntimeError("Invalid frontend name '%s'" % fename)
if " " in self.security.frontends[fename].identity:
raise RuntimeError("Invalid frontend identity '%s'" % self.security.frontends[fename].identity)
for scname in list(self.security.frontends[fename].security_classes.keys()):
username = self.security.frontends[fename].security_classes[scname].username
self.client_log_dirs[username] = self.buildDir(
True, os.path.join(self.submit.base_client_log_dir, "user_%s" % username)
)
self.client_proxies_dirs[username] = self.buildDir(
True, os.path.join(self.submit.base_client_proxies_dir, "user_%s" % username)
)
if not cWParams.is_valid_name(self.factory_name):
raise RuntimeError("Invalid factory name '%s'" % self.factory_name)
entry_names = list(self.entries.keys())
for entry_name in entry_names:
if not cWParams.is_valid_name(entry_name):
raise RuntimeError("Invalid entry name '%s'" % entry_name)
attr_names = list(self.attrs.keys())
for attr_name in attr_names:
if not cWParams.is_valid_name(attr_name):
raise RuntimeError("Invalid global attribute name '%s'." % attr_name)
for entry_name in entry_names:
attr_names = list(self.entries[entry_name].attrs.keys())
for attr_name in attr_names:
if not cWParams.is_valid_name(attr_name):
raise RuntimeError(f"Invalid entry '{entry_name}' attribute name '{attr_name}'.")
# return xml formatting
############################################################
#
# P R I V A T E - Do not use
#
############################################################
#####################################
# try to find out the base condor dir
[docs]
def find_condor_base_dir():
if condorExe.condor_bin_path is None:
return None
else:
return os.path.dirname(condorExe.condor_bin_path)