# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""Functions needed to create files used by the glidein entry points
"""
import io
import os
import shutil
import stat
import subprocess
import tarfile
from glideinwms.lib.util import chmod
from . import cgWDictFile
# Create condor tarball and store it into a StringIO
[docs]
def create_condor_tar_fd(condor_base_dir):
    """Extract only the components from a full HTCondor distribution needed to run a
    glidein on a CE, and pack them into an in-memory gzipped tarball.

    This code is only run when a factory reconfig or upgrade is triggered.

    Args:
        condor_base_dir (str): path to an untarred tarball of an HTCondor distribution

    Returns:
        io.BytesIO: in-memory gzipped tarfile, rewound to the beginning

    Raises:
        RuntimeError: if condor_base_dir is not a directory, a required binary is
            missing, or the tarball could not be created
    """
    try:
        # List of required files
        condor_bins = ["sbin/condor_master", "sbin/condor_startd", "sbin/condor_starter"]
        # List of optional files, included if found in condor distro
        condor_opt_bins = [
            "sbin/condor_procd",
            "sbin/condor_fetchlog",
            "sbin/condor_advertise",
            "bin/condor_nsenter",
            "bin/condor_docker_enter",
        ]
        condor_opt_libs = [
            "lib/condor_ssh_to_job_sshd_config_template",
            "lib/CondorJavaInfo.class",
            "lib/CondorJavaWrapper.class",
            "lib/scimark2lib.jar",
            "lib/condor",
            "lib/libgetpwnam.so",
        ]
        condor_opt_libexecs = [
            "libexec/condor_shared_port",
            "libexec/condor_ssh_to_job_sshd_setup",
            "libexec/condor_ssh_to_job_shell_setup",
            "libexec/condor_kflops",
            "libexec/condor_mips",
            "libexec/curl_plugin",
            "libexec/data_plugin",
            "libexec/condor_chirp",
            "libexec/condor_gpu_discovery",
            "libexec/condor_gpu_utilization",
        ]
        # for RPM installations, add libexec/condor as libexec into the
        # tarball instead
        condor_bins_map = {}
        condor_opt_libexecs_rpm = []
        for libexec in condor_opt_libexecs:
            libexec_rpm = libexec.replace("libexec", "libexec/condor")
            condor_opt_libexecs_rpm.append(libexec_rpm)
            condor_bins_map[libexec_rpm] = libexec
        condor_opt_libexecs += condor_opt_libexecs_rpm
        # check that dir and files exist
        if not os.path.isdir(condor_base_dir):
            raise RuntimeError("%s is not a directory" % condor_base_dir)
        for f in condor_bins:
            if not os.path.isfile(os.path.join(condor_base_dir, f)):
                raise RuntimeError("Cannot find %s" % os.path.join(condor_base_dir, f))
        # Get the list of dlls required
        dlls = get_condor_dlls(condor_base_dir, condor_bins + condor_opt_bins + condor_opt_libexecs)
        # Get list of all the files & directories that exist
        for f in condor_opt_bins + condor_opt_libs + condor_opt_libexecs + dlls:
            if os.path.exists(os.path.join(condor_base_dir, f)):
                condor_bins.append(f)
        # tar - build fully in memory; the context manager guarantees the gzip
        # stream is flushed/closed even if tf.add() raises
        fd = io.BytesIO()
        with tarfile.open("dummy.tgz", "w:gz", fd) as tf:
            for f in condor_bins:
                # condor_bins_map renames RPM-style libexec/condor/* entries back to libexec/*
                tf.add(os.path.join(condor_base_dir, f), condor_bins_map.get(f, f))
        # rewind the file to the beginning
        fd.seek(0)
    except RuntimeError as e:
        # chain so the original failure point stays visible in the traceback
        raise RuntimeError("Error creating condor tgz: %s" % e) from e
    return fd
##########################################
[docs]
def get_factory_log_recipients(entry):
    """Read the log recipients specified by the Factory in its configuration.

    Args:
        entry: dict-like object representing the entry configuration

    Returns:
        list: list containing the URLs of the log servers, empty if none present
    """
    # The first LOG_RECIPIENTS_FACTORY attribute wins; its value is a
    # whitespace-separated list of URLs
    recipients_attr = next(
        (attr for attr in entry.get_child_list("attrs") if attr["name"] == "LOG_RECIPIENTS_FACTORY"),
        None,
    )
    if recipients_attr is None:
        return []
    return recipients_attr["value"].split()
##########################################
# Condor submit file dictionary
# NOTE: the 'environment' attribute should be in the new syntax format, to allow characters like ';' in the values
# value all double quoted, var=var_val space separated, var_val can be single quoted
[docs]
class GlideinSubmitDictFile(cgWDictFile.CondorJDLDictFile):
    """Condor submit (JDL) file for a glidein entry, built as a dictionary file.

    NOTE: the 'environment' attribute should be in the new syntax format, to allow
    characters like ';' in the values (value all double quoted, var=var_val space
    separated, var_val can be single quoted).
    """

    # MM5345 passing job_descript instead of entry (was sub_params in common root), used only in first section to eval variables
    def populate(self, exe_fname, entry_name, conf, entry):
        """Populate the submit file dictionary from the Factory/entry configuration.

        Since there are only two parameters that ever were passed that didn't already exist in the params dict or the
        sub_params dict, the function signature has been greatly simplified into just those two parameters and the
        two dicts.
        This has the added benefit of being "future-proof" for as long as we maintain this particular configuration
        method. Any new attribute that may be in params or sub_params can be accessed here without having to add yet
        another parameter to the function.

        Args:
            exe_fname (str, Path): startup file name (glidein_startup.sh)
            entry_name (str): Name of the Entry
            conf (factoryXmlConfig.Config): XML configuration
            entry (factoryXmlConfig.EntryElement): Entry element XML configuration
        """
        glidein_name = conf["glidein_name"]
        gridtype = entry["gridtype"]
        gatekeeper = entry["gatekeeper"]
        entry_enabled = entry["enabled"]
        # rsl and proxy_url are optional in the entry configuration
        if "rsl" in entry:
            rsl = entry["rsl"]
        else:
            rsl = None
        auth_method = entry["auth_method"]
        if "proxy_url" in entry:
            proxy_url = entry["proxy_url"]
        else:
            proxy_url = None
        client_log_base_dir = conf.get_child("submit")["base_client_log_dir"]
        submit_attrs = entry.get_child("config").get_child("submit").get_child_list("submit_attrs")
        # Files that will be both transferred and encrypted (transfer_Input_files /
        # encrypt_Input_files in the JDL)
        enc_input_files = []
        enc_input_files.append("$ENV(IDTOKENS_FILE:)")
        self.add_environment("IDTOKENS_FILE=$ENV(IDTOKENS_FILE:)")
        # SciTokens are not used for cloud grid types or ARC+grid_proxy
        if gridtype not in ["ec2", "gce"] and not (gridtype == "arc" and auth_method == "grid_proxy"):
            self.add("+SciTokensFile", '"$ENV(SCITOKENS_FILE:)"')
        # Folders and files of tokens for glidein logging authentication
        # leos token stuff, left it in for now
        token_list = []
        # NOTE(review): token_basedir is resolved relative to the current working
        # directory at populate() time - confirm callers always run from the
        # expected factory directory
        token_basedir = os.path.realpath(os.path.join(os.getcwd(), "..", "server-credentials"))
        token_entrydir = os.path.join(token_basedir, "entry_" + entry_name)
        url_dirs_desc_file = os.path.join(token_entrydir, "url_dirs.desc")
        # Only non-empty credential files are shipped with the glidein
        if os.path.exists(url_dirs_desc_file):
            file_size = os.stat(url_dirs_desc_file)
            if file_size.st_size > 0:
                enc_input_files.append(url_dirs_desc_file)
        token_tgz_file = os.path.join(token_entrydir, "tokens.tgz")
        if os.path.exists(token_tgz_file):
            file_size = os.stat(token_tgz_file)
            if file_size.st_size > 0:
                enc_input_files.append(token_tgz_file)
                token_list.append(token_tgz_file)
        if token_list:
            self.add_environment("JOB_TOKENS='" + ",".join(token_list) + "'")
        # Get the list of log recipients specified from the Factory for this entry
        factory_recipients = get_factory_log_recipients(entry)
        frontend_recipients = []  # TODO: change when adding support for LOG_RECIPIENTS_CLIENT
        # set() removes duplicates; resulting order is unspecified
        log_recipients = list(set(factory_recipients + frontend_recipients))
        if len(log_recipients) > 0:
            self.append("environment", '"LOG_RECIPIENTS=' + "'" + " ".join(log_recipients) + "'" + '"')
        # print("Token dir: " + token_basedir) # DEBUG
        # Add in some common elements before setting up grid type specific attributes
        self.add("Universe", "grid")
        if gridtype.startswith("batch "):
            # For BOSCO ie gridtype 'batch *', allow means to pass VO specific
            # bosco/ssh keys
            # was: self.add("Grid_Resource", "%s $ENV(GRID_RESOURCE_OPTIONS) %s" % (gridtype, gatekeeper))
            # gatekeeper is [name@]host[:port]. Keep only the host part and replace name with username from env
            # This returns always the host:port part: gatekeeper.split('@')[-1]
            if "bosco_dir" in entry:
                bosco_dir = "--rgahp-glite ~/%s/glite" % entry["bosco_dir"]
            else:
                bosco_dir = ""
            self.add(
                "Grid_Resource",
                "%s $ENV(GRID_RESOURCE_OPTIONS) %s $ENV(GLIDEIN_REMOTE_USERNAME)@%s"
                % (gridtype, bosco_dir, gatekeeper.split("@")[-1]),
            )
            enc_input_files.append("$ENV(X509_USER_PROXY:/dev/null)")
            self.add_environment("X509_USER_PROXY=$ENV(X509_USER_PROXY_BASENAME)")
        elif gridtype == "gce":
            self.add("Grid_Resource", f"{gridtype} {gatekeeper} $ENV(GRID_RESOURCE_OPTIONS)")
        else:
            self.add("Grid_Resource", f"{gridtype} {gatekeeper}")
        self.add("Executable", exe_fname)
        # set up the grid specific attributes
        if gridtype == "ec2":
            self.populate_ec2_grid()
            # NOTE(review): ec2 deliberately(?) passes enc_input_files=None while
            # every other branch forwards the accumulated list - confirm this is
            # intended and not a dropped argument
            self.populate_standard_grid(rsl, auth_method, gridtype, entry_enabled, entry_name, enc_input_files=None)
        elif gridtype == "gce":
            self.populate_gce_grid()
        elif gridtype == "condor":
            # Condor-C is the same as normal grid with a few additions
            # so we first do the normal population
            self.populate_standard_grid(rsl, auth_method, gridtype, entry_enabled, entry_name, enc_input_files)
            # self.populate_standard_grid(rsl, auth_method, gridtype, token_files)
            # next we add the Condor-C additions
            self.populate_condorc_grid()
        elif gridtype == "arc":
            # not adding a function for one line for the moment
            self.populate_standard_grid(rsl, auth_method, gridtype, entry_enabled, entry_name, enc_input_files)
            self.add("x509UserProxy", "$ENV(X509_USER_PROXY:)")
        else:
            self.populate_standard_grid(rsl, auth_method, gridtype, entry_enabled, entry_name, enc_input_files)
        self.populate_submit_attrs(submit_attrs, gridtype)
        self.populate_glidein_classad(proxy_url)
        # Leave jobs in the condor queue for 12 hours if they are completed.
        if conf["advertise_pilot_accounting"] == "True":
            self.add("LeaveJobInQueue", "((time() - EnteredCurrentStatus) < 12*60*60)")
        # GLIDEIN_IDLE_LIFETIME comes from idle_glideins_lifetime in the config section of the Frontend configuration (glidein limits)
        # GlideinSkipIdleRemoval is set instead in the submit_attr section of the Factory
        remove_expr = "(isUndefined(GlideinSkipIdleRemoval)==True || GlideinSkipIdleRemoval==False) && (JobStatus==1 && isInteger($ENV(GLIDEIN_IDLE_LIFETIME)) && $ENV(GLIDEIN_IDLE_LIFETIME)>0 && (time() - QDate)>$ENV(GLIDEIN_IDLE_LIFETIME))"
        max_walltime = next(
            iter([x for x in entry.get_child_list("attrs") if x["name"] == "GLIDEIN_Max_Walltime"]), None
        )  # Get the GLIDEIN_Max_Walltime attribute
        if max_walltime:
            # Also remove running jobs that exceed max walltime plus a 12h grace period
            remove_expr += " || (JobStatus==2 && ((time() - EnteredCurrentStatus) > (GlideinMaxWalltime + 12*60*60)))"
        self.add("periodic_remove", remove_expr)
        # Notification and Owner are the same no matter what grid type
        self.add("Notification", "Never")
        self.add("+Owner", "undefined")
        # The logging of the jobs will be the same across grid types
        self.add(
            "Log",
            "%s/user_$ENV(GLIDEIN_USER)/glidein_%s/entry_%s/condor_activity_$ENV(GLIDEIN_LOGNR)_$ENV(GLIDEIN_CLIENT).log"
            % (client_log_base_dir, glidein_name, entry_name),
        )
        self.add(
            "Output",
            "%s/user_$ENV(GLIDEIN_USER)/glidein_%s/entry_%s/job.$(Cluster).$(Process).out"
            % (client_log_base_dir, glidein_name, entry_name),
        )
        self.add(
            "Error",
            "%s/user_$ENV(GLIDEIN_USER)/glidein_%s/entry_%s/job.$(Cluster).$(Process).err"
            % (client_log_base_dir, glidein_name, entry_name),
        )
        self.jobs_in_cluster = "$ENV(GLIDEIN_COUNT)"

    def populate_standard_grid(self, rsl, auth_method, gridtype, entry_enabled, entry_name, enc_input_files=None):
        """Create a standard condor jdl file to submit to OSG grid.

        Args:
            rsl (str): RSL string for grid types that use it (may be None)
            auth_method (str): grid_proxy, voms_proxy, key_pair, cert_pair, username_password,
                optionally with +vm_type, +vm_id, and/or +project_id
            gridtype (str): 'condor', 'cream', 'nordugrid' currently supported
            entry_enabled (expr): if evaluates to False, does not write jdl
            entry_name (str): factory entry name
            enc_input_files (list): files to be appended to transfer_Input_files
                and encrypt_Input_files in the jdl

        Returns:
            None

        Raises:
            RuntimeError: when the jdl file should not be created (deprecated grid type
                on an enabled entry)
        """
        # NOTE(review): eval() on a config-supplied expression - acceptable only
        # because the Factory config is trusted input; do not extend to untrusted data
        if (gridtype == "gt2" or gridtype == "gt5") and eval(entry_enabled):
            raise RuntimeError(
                " The grid type '%s' is no longer supported. Review the attributes of the entry %s "
                % (gridtype, entry_name)
            )
        elif gridtype == "cream" and ((rsl is not None) and rsl != ""):
            self.add("cream_attributes", "$ENV(GLIDEIN_RSL)")
        elif gridtype == "nordugrid" and rsl:
            self.add("nordugrid_rsl", "$ENV(GLIDEIN_RSL)")
        elif (gridtype == "condor") and ("project_id" in auth_method):
            self.add("+ProjectName", '"$ENV(GLIDEIN_PROJECT_ID)"')
        # Force the copy to spool to prevent caching at the CE side
        self.add("copy_to_spool", "True")
        self.add("Arguments", "$ENV(GLIDEIN_ARGUMENTS)")
        if enc_input_files:
            indata = ",".join(enc_input_files)
            self.add("transfer_Input_files", indata)
            self.add("encrypt_Input_files", indata)
        # Logging is optional, no exception if empty
        self.add("Transfer_Executable", "True")
        self.add("transfer_Output_files", "")
        # NOTE: the trailing spaces in the "WhenToTransferOutput " and
        # "stream_error " keys below are in the original and are kept as-is
        self.add("WhenToTransferOutput ", "ON_EXIT")
        self.add("stream_output", "False")
        self.add("stream_error ", "False")

    def populate_submit_attrs(self, submit_attrs, gridtype, attr_prefix=""):
        """Add the configured submit attributes to the jdl.

        Attributes are added for batch/condor/gce/ec2/arc grid types, or for any
        grid type when the attribute sets all_grid_types="True".

        Args:
            submit_attrs: iterable of submit_attr configuration elements
            gridtype (str): entry grid type
            attr_prefix (str): optional prefix prepended to each attribute name
        """
        for submit_attr in submit_attrs:
            if (
                submit_attr.get("all_grid_types", "False") == "True"
                or gridtype.startswith("batch ")
                or gridtype in ("condor", "gce", "ec2", "arc")
            ):
                # TODO: If switching to f-string, must use different quotes in the following line
                # to avoid error w/ f-string (fixed in Py 3.12) - check that formatter is not changing things
                # self.add(f'{attr_prefix}{submit_attr["name"]}', submit_attr["value"])
                self.add("{}{}".format(attr_prefix, submit_attr["name"]), submit_attr["value"])

    def populate_condorc_grid(self):
        """Add the Condor-C specific additions to the jdl."""
        self.add("+TransferOutput", '""')
        self.add("x509userproxy", "$ENV(X509_USER_PROXY:)")

    def populate_gce_grid(self):
        """Add the GCE specific attributes to the jdl."""
        self.add("gce_image", "$ENV(IMAGE_ID)")
        self.add("gce_machine_type", "$ENV(INSTANCE_TYPE)")
        # self.add("+gce_project_name", "$ENV(GCE_PROJECT_NAME)")
        # self.add("+gce_availability_zone", "$ENV(AVAILABILITY_ZONE)")
        self.add("gce_auth_file", "$ENV(GCE_AUTH_FILE)")
        self.add(
            "gce_metadata", "glideinwms_metadata=$ENV(USER_DATA)#### -cluster $(Cluster) -subcluster $(Process)####"
        )
        self.add("gce_metadata_file", "$ENV(GLIDEIN_PROXY_FNAME)")

    def populate_ec2_grid(self):
        """Add the EC2 specific attributes to the jdl."""
        self.add("ec2_ami_id", "$ENV(IMAGE_ID)")
        self.add("ec2_instance_type", "$ENV(INSTANCE_TYPE)")
        self.add("ec2_access_key_id", "$ENV(ACCESS_KEY_FILE)")
        self.add("ec2_secret_access_key", "$ENV(SECRET_KEY_FILE)")
        self.add("ec2_keypair_file", "$ENV(CREDENTIAL_DIR)/ssh_key_pair.$(Cluster).$(Process).pem")
        # We do not add the entire argument list to the userdata directly
        # since we want to be able to change the argument list without
        # having to modify every piece of code under the sun
        # This way only the submit_glideins function has to change
        # (and of course glidein_startup.sh)
        self.add(
            "ec2_user_data", "glideinwms_metadata=$ENV(USER_DATA)#### -cluster $(Cluster) -subcluster $(Process)####"
        )
        self.add("ec2_user_data_file", "$ENV(GLIDEIN_PROXY_FNAME)")

    def populate_glidein_classad(self, proxy_url):
        """Add the Glidein ClassAd attributes advertised to the WMS collector.

        Args:
            proxy_url (str): proxy URL to advertise as GlideinProxyURL, skipped if falsy
        """
        # add in the classad attributes for the WMS collector
        self.add("+GlideinFactory", '"$ENV(FACTORY_NAME)"')
        self.add("+GlideinName", '"$ENV(GLIDEIN_NAME)"')
        self.add("+GlideinEntryName", '"$ENV(GLIDEIN_ENTRY_NAME)"')
        self.add("+GlideinEntrySubmitFile", '"$ENV(GLIDEIN_ENTRY_SUBMIT_FILE)"')
        self.add("+GlideinClient", '"$ENV(GLIDEIN_CLIENT)"')
        self.add("+GlideinFrontendName", '"$ENV(GLIDEIN_FRONTEND_NAME)"')
        self.add("+GlideinCredentialIdentifier", '"$ENV(GLIDEIN_CREDENTIAL_ID)"')
        self.add("+GlideinSecurityClass", '"$ENV(GLIDEIN_SEC_CLASS)"')
        self.add("+GlideinWebBase", '"$ENV(GLIDEIN_WEB_URL)"')
        self.add("+GlideinLogNr", '"$ENV(GLIDEIN_LOGNR)"')
        self.add("+GlideinWorkDir", '"$ENV(GLIDEIN_STARTUP_DIR)"')
        self.add("+GlideinSlotsLayout", '"$ENV(GLIDEIN_SLOTS_LAYOUT)"')
        self.add("+GlideinMaxWalltime", "$ENV(GLIDEIN_MAX_WALLTIME)")
        # fename does not start with Glidein for convenience (it's short) and consistency with options of the factory tools
        # that already use the fename naming convention. Added after the condor_privsep deprecation.
        self.add("+fename", '"$ENV(GLIDEIN_USER)"')
        if proxy_url:
            self.add("+GlideinProxyURL", '"%s"' % proxy_url)

    def finalize(self, sign, entry_sign, descript, entry_descript):
        """Intentional no-op.

        Since the arguments will be built by the submit script now, just pass. Keeping the function here
        because the code is so obtuse, if I remove the function, I may create unintended effects.
        """
        pass
#########################################
# Create init.d compatible startup file
[docs]
def create_initd_startup(startup_fname, factory_dir, glideinWMS_dir, cfg_name, rpm_install=""):
    """Create the factory init.d-compatible startup script from its template.

    The template is filled via %-substitution, written to startup_fname, and the
    result is made executable.
    """
    substitutions = {
        "factory_dir": factory_dir,
        "glideinWMS_dir": glideinWMS_dir,
        "default_cfg_fpath": cfg_name,
        "rpm_install": rpm_install,
    }
    script_text = get_template("factory_initd_startup_template", glideinWMS_dir) % substitutions
    with open(startup_fname, "w") as fd:
        fd.write(script_text)
    # owner rwx; group and others read+execute
    chmod(startup_fname, stat.S_IRWXU | stat.S_IROTH | stat.S_IRGRP | stat.S_IXOTH | stat.S_IXGRP)
#####################
# INTERNAL
# Simply copy a file
[docs]
def copy_file(infile, outfile):
    """Copy a file from infile to outfile preserving permission mode and all file metadata.

    It follows symlinks and overwrites the destination if existing.

    Args:
        infile: source file path
        outfile: destination file path or directory

    Returns:
        None

    Raises:
        RuntimeError: if the copy failed raising OSError (chained as the cause)
    """
    try:
        shutil.copy2(infile, outfile)
    except OSError as e:
        # chain the original OSError so the root cause stays in the traceback
        raise RuntimeError(f"Error copying {infile} in {outfile}: {e}") from e
#####################################
# Copy an executable between two dirs
[docs]
def copy_exe(filename, work_dir, org_dir, overwrite=False):
    """Copy a file from one dir to another and change the permissions to 0o755. Can overwrite an existing file.

    Args:
        filename: base name of the file
        work_dir: destination directory
        org_dir: source directory
        overwrite: if True, delete the destination file before making a copy of the source. If False shutil.copy2
            (the command used underneath) will overwrite the destination

    Returns:
        None

    Raises:
        RuntimeError: if the copy failed raising IOError
    """
    if overwrite and os.path.exists(os.path.join(work_dir, filename)):
        # Remove file if already exists
        os.remove(os.path.join(work_dir, filename))
    copy_file(os.path.join(org_dir, filename), work_dir)
    # 0o755: rwx for owner, r-x for group and others
    chmod(os.path.join(work_dir, filename), 0o755)
[docs]
def get_template(template_name, glideinWMS_dir):
    """Return the content of the named template from the glideinWMS templates directory."""
    template_path = f"{glideinWMS_dir}/creation/templates/{template_name}"
    with open(template_path) as template_fd:
        return template_fd.read()
[docs]
def get_link_chain(link):
    """Follow a chain of symbolic links and collect every path in the chain.

    The chain is resolved one hop at a time (each readlink target is joined to
    the directory of the current link); cycles are detected and broken. The
    final non-link target is included in the result.

    @type link: string
    @param link: Full path to the file/link
    @return: List containing links in the chain (order not guaranteed)
    @rtype: list
    """
    chain = set()
    current = link
    # stop at the first non-link, or when we revisit a path (cycle)
    while os.path.islink(current) and current not in chain:
        chain.add(current)
        current = os.path.join(os.path.dirname(current), os.readlink(current))
    chain.add(current)
    return list(chain)
[docs]
def ldd(file):
    """Given a file return all the libraries referenced by the file.

    Runs the system `ldd` command on the file and parses its output. Returns an
    empty list if the file does not exist or `ldd` is not available.

    @type file: string
    @param file: Full path to the file
    @return: List containing linked libraries required by the file
    @rtype: list
    """
    rlist = []
    if os.path.exists(file) and shutil.which("ldd") is not None:
        # subprocess.run reaps the child and closes the pipes; the bare Popen it
        # replaces never waited on the process (zombie + fd leak)
        proc = subprocess.run(["ldd", file], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
        for line in proc.stdout.splitlines():
            tokens = line.split(b"=>")
            if len(tokens) == 2:
                # right side is "<path> (<address>)"; keep the path only
                lib_loc = ((tokens[1].strip()).split(b" "))[0].strip()
                if os.path.exists(lib_loc):
                    rlist.append(os.path.abspath(lib_loc).decode("utf-8"))
    return rlist
[docs]
def get_condor_dlls(condor_dir, files=None, libdirs=("lib", "lib/condor")):
    """Given list of condor files return all the libraries referenced by the files.

    @type condor_dir: string
    @param condor_dir: Location containing condor binaries
    @type files: list
    @param files: List of files relative to condor_dir (default: no files)
    @type libdirs: list or tuple
    @param libdirs: Dirs relative to condor_dir that contain libs
    @return: List containing linked libraries required by all the files.
        Paths are relative to the condor_dir
    @rtype: list
    """
    # Mutable default arguments are a Python pitfall; use None/tuple sentinels
    # (backward compatible: callers passing lists are unaffected)
    if files is None:
        files = []
    fileset = set()
    libstodo = set()
    libsdone = set()
    rlist = []
    for fname in files:
        libstodo.update(ldd(os.path.join(condor_dir, fname)))
    while len(libstodo) > 0:
        lib = libstodo.pop()
        libname = os.path.basename(lib)
        if lib in libsdone:
            # This lib has been processed already
            continue
        if not lib.startswith(condor_dir):
            # Check if the library is provided by condor
            # If so, add the condor provided lib to process
            for libdir in libdirs:
                if os.path.exists(os.path.join(condor_dir, libdir, libname)):
                    new_lib = os.path.join(condor_dir, libdir, libname)
                    if new_lib not in libsdone:
                        libstodo.add(new_lib)
                        libsdone.add(lib)
        else:
            new_libstodo = set(ldd(lib))
            libstodo.update(new_libstodo - libsdone)
            # This could be a link chain
            links = get_link_chain(lib)
            # Consider the system links for further processing
            # Add the links in the condor_dir as processed
            for link in links:
                if link.startswith(condor_dir):
                    fileset.add(link)
                    libsdone.add(link)
                else:
                    libstodo.add(link)
    # Return the list of files relative to condor_dir
    for lib in fileset:
        tokens = lib.split("%s/" % os.path.normpath(condor_dir))
        rlist.append(tokens[1])
    return rlist