# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""This module implements the functions needed to advertize
and get commands from the Collector
"""
import fcntl
import os
import time
from glideinwms.lib import classadSupport, condorExe, condorManager, condorMonitor, logSupport
############################################################
#
# Global Variables
#
############################################################
# Time at which this module was loaded; published as "DaemonStartTime" in the factory classads
start_time = time.time()
# Update counters for the glidefactory classads, keyed by classad name;
# the current value is published as "UpdateSequenceNumber"
advertizeGFCounter = {}
# Update counters for the glidefactoryclient classads, keyed by client name;
# the current value is written as "UpdateSequenceNumber"
advertizeGFCCounter = {}
# Update counter for the glidefactoryglobal classad, incremented for each classad built
advertizeGlobalCounter = 0
############################################################
#
# Configuration
#
############################################################
# class FakeLog:
# def write(self,str):
# pass
[docs]
class FactoryConfig:
def __init__(self):
# set default values
# user should modify if needed
# The name of the attribute that identifies the glidein
self.factory_id = "glidefactory"
self.client_id = "glideclient"
self.factoryclient_id = "glidefactoryclient"
self.factory_global = "glidefactoryglobal"
# Default the glideinWMS version string
self.glideinwms_version = "glideinWMS UNKNOWN"
# String to prefix for the attributes
self.glidein_attr_prefix = ""
# String to prefix for the submit attributes
self.glidein_submit_prefix = "GlideinSubmit"
# String to prefix for the parameters
self.glidein_param_prefix = "GlideinParam"
self.encrypted_param_prefix = "GlideinEncParam"
# String to prefix for the monitoring
self.glidein_monitor_prefix = "GlideinMonitor"
# String to prefix for the configured limits
self.glidein_config_prefix = "GlideinConfig"
# String to prefix for the requests
self.client_req_prefix = "Req"
# String to prefix for the web passing
self.client_web_prefix = "Web"
self.glidein_web_prefix = "Web"
# The name of the signtype
self.factory_signtype_id = "SupportedSignTypes"
self.client_web_signtype_suffix = "SignType"
# Should we use TCP for condor_advertise?
self.advertise_use_tcp = False
# Should we use the new -multiple for condor_advertise?
self.advertise_use_multi = False
# warning log files
# default is FakeLog, any other value must implement the write(str) method
# self.warning_log = FakeLog()
# Location of lock directory
self.lock_dir = "."
# Location of the factory Collector
# Please notice that None means "use the system collector"
# while any string value will force the use of that specific collector
# i.e. the -pool argument coption to HTCondor cmdline tools
self.factory_collector = None
# Global configuration of the module
factoryConfig = FactoryConfig()
#
# Sentinel value: when an argument is set to this,
# the value set in factoryConfig is used instead.
# This is useful when None has well-defined semantics
# and cannot be used to signal the use of the default,
# e.g.
# the functions taking a factory_collector argument
# will replace this value with
# factoryConfig.factory_collector
# (None means "use system collector" in that context)
#
DEFAULT_VAL = "default"
#####################################################
# Exception thrown when multiple executions are used
# Helps handle partial failures
[docs]
class MultiExeError(condorExe.ExeError):
    """ExeError that bundles the failures of several executions.

    The individual exceptions stay available in ``self.arr``; the message of
    this exception is the concatenation of their string representations.
    """

    def __init__(self, arr):
        """
        arr is a list of ExeError exceptions
        """
        self.arr = arr
        # First approximation of implementation, can be improved.
        # The separator is the two characters backslash + "n", not a newline.
        error_str = "\\n".join(["%s" % err for err in arr])
        condorExe.ExeError.__init__(self, error_str)
############################################################
#
# User functions
#
############################################################
[docs]
def findGroupWork(
    factory_name,
    glidein_name,
    entry_names,
    supported_signtypes,
    pub_key_obj=None,
    additional_constraints=None,
    factory_collector=DEFAULT_VAL,
):
    """Query the collector for the requests addressed to a group of entries.

    The glideclient classads whose ReqGlidein is one of
    ``<entry>@<glidein_name>@<factory_name>`` are loaded, split by attribute
    prefix, validated (when they are encrypted) and finally grouped by entry
    with workGroupByEntries().

    Args:
        factory_name (str): name of the factory
        glidein_name (str): name of the glidein instance
        entry_names (list): names of the factory entries
        supported_signtypes (list or None): sign types to accept; no sign type
            constraint is added when None
        pub_key_obj: factory key object (must provide get_pub_key_id() and
            extract_sym_key()); when None nothing is decrypted. Defaults to None.
        additional_constraints (str or None): extra constraint and-ed to the
            query. Defaults to None.
        factory_collector (str or None): collector to query; DEFAULT_VAL means
            "use factoryConfig.factory_collector"

    Returns:
        dict: work[entry_name][classad_key] = element, where element has the
        keys 'requests', 'web', 'params', 'params_decrypted', 'monitor' and
        'internals'
    """
    global factoryConfig

    if factory_collector == DEFAULT_VAL:
        factory_collector = factoryConfig.factory_collector

    # Comma separated list of the fully qualified entry names
    # (latest entry first), without leading or trailing commas
    req_glideins = ",".join(
        f"{entry}@{glidein_name}@{factory_name}" for entry in reversed(list(entry_names))
    ).strip(",")

    status_constraint = (
        f'(GlideinMyType=?="{factoryConfig.client_id}") && (stringListMember(ReqGlidein,"{req_glideins}")=?=True)'
    )

    if supported_signtypes is not None:
        signtype_attr = f"{factoryConfig.client_web_prefix}{factoryConfig.client_web_signtype_suffix}"
        signtype_list = ",".join(supported_signtypes)
        status_constraint += f' && stringListMember({signtype_attr},"{signtype_list}")'

    if pub_key_obj is not None:
        # Only the classads using my key, or no key at all, are of interest:
        # requests encrypted with any other key cannot be read
        pub_key_id = pub_key_obj.get_pub_key_id()
        status_constraint += (
            ' && (((ReqPubKeyID=?="%s") && (ReqEncKeyCode=!=Undefined) && (ReqEncIdentity=!=Undefined)) || (ReqPubKeyID=?=Undefined))'
            % pub_key_id
        )

    if additional_constraints is not None:
        status_constraint = f"({status_constraint})&&({additional_constraints})"

    status = condorMonitor.CondorStatus(subsystem_name="any", pool_name=factory_collector)
    # Important, this dictates what gets submitted
    status.require_integrity(True)
    status.glidein_name = glidein_name

    # All the processes go through the same lock file, so the queries
    # to the (single) Collector are serialized
    lock_fname = os.path.join(factoryConfig.lock_dir, "gfi_status.lock")
    if not os.path.exists(lock_fname):
        # Best effort creation of the lock file
        try:
            with open(lock_fname, "w"):
                pass
        except Exception:
            # Another process may have created it in the meantime
            pass

    with open(lock_fname, "r+") as lock_fd:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        try:
            status.load(status_constraint)
        finally:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)

    data = status.fetchStored()

    # Attributes never copied into the prefix based sections
    reserved_names = (
        "ReqName",
        "ReqGlidein",
        "ClientName",
        "FrontendName",
        "GroupName",
        "ReqPubKeyID",
        "ReqEncKeyCode",
        "ReqEncIdentity",
        "AuthenticatedIdentity",
    )
    # Attributes copied verbatim into the 'internals' section
    internal_names = (
        "ClientName",
        "FrontendName",
        "GroupName",
        "ReqName",
        "LastHeardFrom",
        "ReqPubKeyID",
        "AuthenticatedIdentity",
    )
    # Section name -> prefix of the attributes belonging to it
    clear_sections = (
        ("requests", factoryConfig.client_req_prefix),
        ("web", factoryConfig.client_web_prefix),
        ("params", factoryConfig.glidein_param_prefix),
        ("monitor", factoryConfig.glidein_monitor_prefix),
    )
    enc_prefix = factoryConfig.encrypted_param_prefix
    enc_plen = len(enc_prefix)

    # Collected elements, keyed like the collector data
    out = {}

    for classad_key in data:
        classad = data[classad_key]
        el = {"requests": {}, "web": {}, "params": {}, "params_decrypted": {}, "monitor": {}, "internals": {}}

        # Clear text sections: strip the prefix and copy the value
        for section, prefix in clear_sections:
            plen = len(prefix)
            el[section] = {
                attr[plen:]: classad[attr]
                for attr in classad
                if attr not in reserved_names and attr.startswith(prefix)
            }

        # The symmetric key stays None (nothing gets decrypted) when
        # there is no factory key or the classad does not use one
        sym_key_obj = None
        if pub_key_obj is not None and "ReqPubKeyID" in classad:
            try:
                sym_key_obj = pub_key_obj.extract_sym_key(classad["ReqEncKeyCode"])
            except Exception:
                # Unusable key, ignore this classad
                continue

        if sym_key_obj is not None:
            # The identity claimed by the client (encrypted) must match
            # the identity Condor authenticated
            try:
                enc_identity = sym_key_obj.decrypt_hex(classad["ReqEncIdentity"]).decode("utf-8")
            except Exception:
                logSupport.log.warning(
                    "Client %s provided invalid ReqEncIdentity, could not decode. Skipping for security reasons."
                    % classad_key
                )
                # Corrupted classad
                continue

            if enc_identity != classad["AuthenticatedIdentity"]:
                logSupport.log.warning(
                    "Client %s provided invalid ReqEncIdentity(%s!=%s). Skipping for security reasons."
                    % (classad_key, enc_identity, classad["AuthenticatedIdentity"])
                )
                # Either the client is misconfigured or someone is cheating
                continue

        # Encrypted parameters: always define the name, decrypt when possible
        invalid_classad = False
        decrypted = el["params_decrypted"]
        for attr in classad:
            if attr in reserved_names or not attr.startswith(enc_prefix):
                continue
            param_name = attr[enc_plen:]
            decrypted[param_name] = None
            if sym_key_obj is not None:
                try:
                    decrypted[param_name] = sym_key_obj.decrypt_hex(classad[attr])
                except Exception:
                    # Cannot be understood -> the whole classad is invalid
                    invalid_classad = True
                    break

        if invalid_classad:
            logSupport.log.warning(
                "At least one of the encrypted parameters for client %s cannot be decoded. Skipping for security reasons."
                % classad_key
            )
            continue

        el["internals"] = {attr: classad[attr] for attr in classad if attr in internal_names}

        out[classad_key] = el

    return workGroupByEntries(out)
[docs]
def workGroupByEntries(work):
    """Regroup the work items by the entry they are addressed to.

    The entry is the part of the item's internal 'ReqName' that precedes the
    first '@'. Items whose ReqName cannot be split are logged and left out.

    Args:
        work (dict): work items; each one must have ['internals']['ReqName']

    Returns:
        dict: grouped_work[entry][key] = work[key]
    """
    grouped_work = {}

    for work_key, work_item in work.items():
        req_name = work_item["internals"]["ReqName"]
        try:
            entry = req_name.partition("@")[0]
            grouped_work.setdefault(entry, {})[work_key] = work_item
        except Exception:
            logSupport.log.warning(
                "Unable to group work for '%s' based on ReqName '%s'. This work item will not be processed."
                % (work_key, req_name)
            )

    return grouped_work
# TODO: PM: findWork is still needed by tools/wmsXMLView. Modify wmsXMLView
# its still being used before removing the function below
[docs]
def findWork(
    factory_name,
    glidein_name,
    entry_name,
    supported_signtypes,
    pub_key_obj=None,
    additional_constraints=None,
    factory_collector=DEFAULT_VAL,
):
    """Query the collector for the requests addressed to a single entry.

    The glideclient classads whose ReqGlidein is
    ``<entry_name>@<glidein_name>@<factory_name>`` are loaded, split by
    attribute prefix and, when encrypted, validated and decrypted.

    Args:
        factory_name (str): name of the factory
        glidein_name (str): name of the glidein instance
        entry_name (str): name of the factory entry
        supported_signtypes (list or None): sign types to accept; no sign type
            constraint is added when None
        pub_key_obj: factory key object (must provide extract_sym_key());
            when None nothing is decrypted. Defaults to None.
        additional_constraints (str or None): extra constraint and-ed to the
            query. Defaults to None.
        factory_collector (str or None): collector to query; DEFAULT_VAL means
            "use factoryConfig.factory_collector"

    Returns:
        dict: one element per accepted classad, keyed like the collector data.
        Each element has the keys 'requests', 'web', 'params',
        'params_decrypted', 'monitor' and 'internals'.
    """
    global factoryConfig

    logSupport.log.debug("Querying collector for requests")

    if factory_collector == DEFAULT_VAL:
        factory_collector = factoryConfig.factory_collector

    status_constraint = '(GlideinMyType=?="{}") && (ReqGlidein=?="{}@{}@{}")'.format(
        factoryConfig.client_id,
        entry_name,
        glidein_name,
        factory_name,
    )

    if supported_signtypes is not None:
        status_constraint += ' && stringListMember({}{},"{}")'.format(
            factoryConfig.client_web_prefix,
            factoryConfig.client_web_signtype_suffix,
            ",".join(supported_signtypes),
        )

    if additional_constraints is not None:
        status_constraint = f"(({status_constraint})&&({additional_constraints}))"

    status = condorMonitor.CondorStatus(subsystem_name="any", pool_name=factory_collector)
    # Important, this dictates what gets submitted
    status.require_integrity(True)
    status.glidein_name = glidein_name
    status.entry_name = entry_name

    # Serialize the access to the Collector across all the processes;
    # there is a single Collector anyhow
    lock_fname = os.path.join(factoryConfig.lock_dir, "gfi_status.lock")
    if not os.path.exists(lock_fname):
        # Best effort creation of the lock file
        try:
            with open(lock_fname, "w"):
                pass
        except Exception:
            # Could be a race condition with another process
            pass

    with open(lock_fname, "r+") as fd:
        fcntl.flock(fd, fcntl.LOCK_EX)
        try:
            status.load(status_constraint)
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)

    data = status.fetchStored()

    # Attributes never copied into the prefix based sections
    reserved_names = (
        "ReqName",
        "ReqGlidein",
        "ClientName",
        "FrontendName",
        "GroupName",
        "ReqPubKeyID",
        "ReqEncKeyCode",
        "ReqEncIdentity",
        "AuthenticatedIdentity",
    )
    # Attributes copied verbatim into the 'internals' section
    internal_names = (
        "ClientName",
        "FrontendName",
        "GroupName",
        "ReqName",
        "LastHeardFrom",
        "ReqPubKeyID",
        "AuthenticatedIdentity",
    )

    out = {}

    # Copy over requests and parameters
    for k in data:
        kel = data[k]
        el = {"requests": {}, "web": {}, "params": {}, "params_decrypted": {}, "monitor": {}, "internals": {}}

        for key, prefix in (
            ("requests", factoryConfig.client_req_prefix),
            ("web", factoryConfig.client_web_prefix),
            ("params", factoryConfig.glidein_param_prefix),
            ("monitor", factoryConfig.glidein_monitor_prefix),
        ):
            plen = len(prefix)
            for attr in kel:
                if attr in reserved_names:
                    # Skip reserved names
                    continue
                if attr[:plen] == prefix:
                    el[key][attr[plen:]] = kel[attr]

        # sym_key_obj stays None (nothing gets decrypted) when there is
        # no factory key or the classad does not use one
        sym_key_obj = None
        if (pub_key_obj is not None) and ("ReqPubKeyID" in kel):
            try:
                sym_key_obj = pub_key_obj.extract_sym_key(kel["ReqEncKeyCode"])
            except Exception:
                # Bad key, ignore entry
                continue

        if sym_key_obj is not None:
            # Verify that the identity the client claims to be is the
            # identity that Condor thinks it is
            try:
                enc_identity = sym_key_obj.decrypt_hex(kel["ReqEncIdentity"])
                # The decrypted value must be text before it is compared with
                # the classad attribute: a bytes object never equals a str,
                # which would reject every encrypted request
                if isinstance(enc_identity, bytes):
                    enc_identity = enc_identity.decode("utf-8")
            except Exception:
                logSupport.log.warning(
                    "Client %s provided invalid ReqEncIdentity, could not decode. Skipping for security reasons." % k
                )
                # Corrupted classad
                continue

            if enc_identity != kel["AuthenticatedIdentity"]:
                logSupport.log.warning(
                    "Client %s provided invalid ReqEncIdentity(%s!=%s). Skipping for security reasons."
                    % (k, enc_identity, kel["AuthenticatedIdentity"])
                )
                # Either the client is misconfigured, or someone is trying to cheat
                continue

        # Encrypted parameters: always define the name, decrypt when possible
        invalid_classad = False
        enc_prefix = factoryConfig.encrypted_param_prefix
        enc_plen = len(enc_prefix)
        for attr in kel:
            if attr in reserved_names:
                # Skip reserved names
                continue
            if attr[:enc_plen] == enc_prefix:
                # Define it even if the content cannot be understood
                el["params_decrypted"][attr[enc_plen:]] = None
                if sym_key_obj is not None:
                    try:
                        el["params_decrypted"][attr[enc_plen:]] = sym_key_obj.decrypt_hex(kel[attr])
                    except Exception:
                        # Cannot be understood -> the whole classad is invalid
                        invalid_classad = True
                        break

        if invalid_classad:
            logSupport.log.warning(
                "At least one of the encrypted parameters for client %s cannot be decoded. Skipping for security reasons."
                % k
            )
            continue

        for attr in kel:
            if attr in internal_names:
                el["internals"][attr] = kel[attr]

        out[k] = el

    return out
###############################################################################
# Code to advertise glidefactory classads to the WMS Pool
###############################################################################
[docs]
class EntryClassad(classadSupport.Classad):
    """Classad describing one factory entry (the glidefactory classad).

    The classad is built with UPDATE_AD_GENERIC as advertise command and
    INVALIDATE_ADS_GENERIC as invalidate command.
    """

    def __init__(
        self,
        factory_name,
        glidein_name,
        entry_name,
        trust_domain,
        auth_method,
        supported_signtypes,
        pub_key_obj=None,
        glidein_submit={},
        glidein_attrs={},
        glidein_params={},
        glidein_monitors={},
        glidein_stats={},
        glidein_web_attrs={},
        glidein_config_limits={},
    ):
        """Fill the classad with the description of the entry.

        All the dictionaries map attribute names to values, e.g.
        {"Arch":"INTEL","MinDisk":200000}; each one is published with its own
        prefix taken from factoryConfig.

        Args:
            factory_name (str): Name of the factory
            glidein_name (str): Name of the glidein instance
            entry_name (str): Name of the entry
            trust_domain (str): trust domain for this Entry (not referenced in this constructor)
            auth_method (str): authentication methods supported by the entry; only the
                presence of "grid_proxy" is checked here
            supported_signtypes (list): supported sign types, published as a comma separated list
            pub_key_obj (optional): key object for the frontend to use in encryption; its ID,
                type and value are published when given. Defaults to None.
            glidein_submit (dict, optional): Submit attributes. Defaults to {}.
            glidein_attrs (dict, optional): glidein attributes. Defaults to {}.
            glidein_params (dict, optional): parameters. Defaults to {}.
            glidein_monitors (dict, optional): monitoring attributes. Defaults to {}.
            glidein_stats (dict, optional): statistics, with an "entry" and a "total" dictionary;
                published with the monitoring prefix. Defaults to {}.
            glidein_web_attrs (dict, optional): Web attributes. Defaults to {}.
            glidein_config_limits (dict, optional): configuration limits. Defaults to {}.
        """
        # TODO: rename glidein_ to entry_ (entry_monitors)?
        global factoryConfig, advertizeGFCounter

        classadSupport.Classad.__init__(self, factoryConfig.factory_id, "UPDATE_AD_GENERIC", "INVALIDATE_ADS_GENERIC")

        # Identification of the classad
        classad_name = f"{entry_name}@{glidein_name}@{factory_name}"
        self.adParams["Name"] = classad_name
        self.adParams["FactoryName"] = str(factory_name)
        self.adParams["GlideinName"] = str(glidein_name)
        self.adParams["EntryName"] = str(entry_name)
        self.adParams[factoryConfig.factory_signtype_id] = ",".join(supported_signtypes)
        self.adParams["DaemonStartTime"] = int(start_time)

        # One sequence per classad name, starting from 0
        sequence_number = advertizeGFCounter.get(classad_name, -1) + 1
        advertizeGFCounter[classad_name] = sequence_number
        self.adParams["UpdateSequenceNumber"] = sequence_number
        self.adParams["GlideinWMSVersion"] = factoryConfig.glideinwms_version

        if pub_key_obj is not None:
            self.adParams["PubKeyID"] = str(pub_key_obj.get_pub_key_id())
            self.adParams["PubKeyType"] = str(pub_key_obj.get_pub_key_type())
            # Newlines in the key are published as the two characters \n
            self.adParams["PubKeyValue"] = pub_key_obj.get_pub_key_value().decode("ascii").replace("\n", "\\n")

        # x509 proxy is allowed and required only with grid_proxy,
        # the glidein proxy is required in all the other cases
        uses_grid_proxy = "grid_proxy" in auth_method
        self.adParams["GlideinAllowx509_Proxy"] = str(uses_grid_proxy)
        self.adParams["GlideinRequirex509_Proxy"] = str(uses_grid_proxy)
        self.adParams["GlideinRequireGlideinProxy"] = str(not uses_grid_proxy)

        # Publish all the dictionaries, each one with its prefix
        prefixed_data = (
            (factoryConfig.glidein_submit_prefix, glidein_submit),
            (factoryConfig.glidein_attr_prefix, glidein_attrs),
            (factoryConfig.glidein_param_prefix, glidein_params),
            (factoryConfig.glidein_monitor_prefix, glidein_monitors),
            (factoryConfig.glidein_web_prefix, glidein_web_attrs),
            (factoryConfig.glidein_config_prefix, glidein_config_limits),
        )
        for prefix, data in prefixed_data:
            for attr, el in data.items():
                # TODO: ClassAd attribute names must be alphanumeric and _
                #       ad-hoc filter to be replaced with a more exhaustive one;
                #       nothing more than an initial + is expected, so OK for now
                if attr[0] == "+":
                    attr = f"_PLUS_{attr[1:]}"
                if isinstance(el, int):
                    # ints are stored as they are
                    self.adParams[f"{prefix}{attr}"] = el
                else:
                    # everything else becomes a string, with escaped newlines
                    self.adParams[f"{prefix}{attr}"] = str(el).replace("\n", "\\n")

        # Job completion statistics, published as monitoring attributes
        if glidein_stats:
            monitor_prefix = factoryConfig.glidein_monitor_prefix
            for scope in ("entry", "total"):
                for stat_name, stat_value in glidein_stats[scope].items():
                    self.adParams[f"{monitor_prefix}{stat_name}"] = stat_value
###############################################################################
# Code to advertise glidefactoryglobal classads to the WMS Pool
###############################################################################
[docs]
class FactoryGlobalClassad(classadSupport.Classad):
    """Classad carrying the factory wide information (the glidefactoryglobal classad).

    The classad is built with UPDATE_AD_GENERIC as advertise command and
    INVALIDATE_ADS_GENERIC as invalidate command.
    glidefactory and glidefactoryglobal classads must be of the same type
    because they may be invalidated together (with a single command).
    """

    def __init__(self, factory_name, glidein_name, supported_signtypes, pub_key_obj):
        """Fill the classad with the factory identification and public key.

        :param factory_name: Name of the factory
        :param glidein_name: Name of the glidein instance
        :param supported_signtypes: supported sign types, published as a comma separated list
        :param pub_key_obj: key object for the frontend to use in encryption (required here)
        """
        global factoryConfig, advertizeGlobalCounter

        classadSupport.Classad.__init__(
            self, factoryConfig.factory_global, "UPDATE_AD_GENERIC", "INVALIDATE_ADS_GENERIC"
        )

        # Identification of the classad
        self.adParams["Name"] = f"{glidein_name}@{factory_name}"
        self.adParams["FactoryName"] = str(factory_name)
        self.adParams["GlideinName"] = str(glidein_name)
        self.adParams[factoryConfig.factory_signtype_id] = ",".join(supported_signtypes)
        self.adParams["DaemonStartTime"] = int(start_time)

        # Publish the current value of the module wide counter, then advance it
        self.adParams["UpdateSequenceNumber"] = advertizeGlobalCounter
        advertizeGlobalCounter += 1

        self.adParams["GlideinWMSVersion"] = factoryConfig.glideinwms_version

        # Public key; newlines in the value are published as the two characters \n
        self.adParams["PubKeyID"] = str(pub_key_obj.get_pub_key_id())
        self.adParams["PubKeyType"] = str(pub_key_obj.get_pub_key_type())
        self.adParams["PubKeyValue"] = pub_key_obj.get_pub_key_value().decode("ascii").replace("\n", "\\n")
[docs]
def advertizeGlobal(
    factory_name, glidein_name, supported_signtypes, pub_key_obj, stats_dict={}, factory_collector=DEFAULT_VAL
):
    """Build the glidefactoryglobal classad and advertise it.

    The classad is written to a temporary file, which is always removed
    before returning.

    Args:
        factory_name (str): the name of the factory
        glidein_name (str): name of the glidein
        supported_signtypes (list): supported sign types, i.e. sha1
        pub_key_obj: key object for the frontend to use in encryption
        stats_dict (dict): completed jobs statistics (not referenced by this function)
        factory_collector (str or None): the collector to advertise to; DEFAULT_VAL means
            "use factoryConfig.factory_collector"

    @todo add factory downtime?
    """
    tmpnam = classadSupport.generate_classad_filename(prefix="gfi_ad_gfg")

    classad = FactoryGlobalClassad(factory_name, glidein_name, supported_signtypes, pub_key_obj)

    try:
        classad.writeToFile(tmpnam, append=False)
        exe_condor_advertise(tmpnam, classad.adAdvertiseCmd, factory_collector=factory_collector)
    finally:
        # The temporary file is not needed anymore, whatever the outcome
        _remove_if_there(tmpnam)
[docs]
def deadvertizeGlidein(factory_name, glidein_name, entry_name, factory_collector=DEFAULT_VAL):
    """Invalidate the glidefactory classad of one entry in the WMS Collector.

    A query classad selecting ``<entry_name>@<glidein_name>@<factory_name>``
    is written to a temporary file and sent with INVALIDATE_ADS_GENERIC; the
    file is always removed before returning.
    """
    tmpnam = classadSupport.generate_classad_filename(prefix="gfi_de_gf")
    # TODO: use tempfile
    try:
        query_lines = (
            'MyType = "Query"\n',
            'TargetType = "%s"\n' % factoryConfig.factory_id,
            'Requirements = (Name == "%s@%s@%s")&&(GlideinMyType == "%s")\n'
            % (entry_name, glidein_name, factory_name, factoryConfig.factory_id),
        )
        with open(tmpnam, "w") as query_fd:
            query_fd.writelines(query_lines)

        exe_condor_advertise(tmpnam, "INVALIDATE_ADS_GENERIC", factory_collector=factory_collector)
    finally:
        _remove_if_there(tmpnam)
[docs]
def deadvertizeGlobal(factory_name, glidein_name, factory_collector=DEFAULT_VAL):
    """Invalidate the glidefactoryglobal classad in the WMS Collector.

    A query classad selecting ``<glidein_name>@<factory_name>`` is written to
    a temporary file and sent with INVALIDATE_ADS_GENERIC; the file is always
    removed before returning.
    """
    tmpnam = classadSupport.generate_classad_filename(prefix="gfi_de_gfg")
    # TODO: use tempfile
    try:
        # NOTE(review): TargetType is factoryConfig.factory_global while the
        # GlideinMyType filter uses factoryConfig.factory_id - confirm this is intended
        query_lines = (
            'MyType = "Query"\n',
            'TargetType = "%s"\n' % factoryConfig.factory_global,
            'Requirements = (Name == "%s@%s")&&(GlideinMyType == "%s")\n'
            % (glidein_name, factory_name, factoryConfig.factory_id),
        )
        with open(tmpnam, "w") as query_fd:
            query_fd.writelines(query_lines)

        exe_condor_advertise(tmpnam, "INVALIDATE_ADS_GENERIC", factory_collector=factory_collector)
    finally:
        _remove_if_there(tmpnam)
[docs]
def deadvertizeFactory(factory_name, glidein_name, factory_collector=DEFAULT_VAL):
    """Invalidate the classads of this factory matching the glidefactory target type.

    A query classad selecting on FactoryName and GlideinName is written to a
    temporary file and sent with INVALIDATE_ADS_GENERIC; the file is always
    removed before returning.
    """
    tmpnam = classadSupport.generate_classad_filename(prefix="gfi_de_fact")
    # TODO: use tempfile
    try:
        query_lines = (
            'MyType = "Query"\n',
            'TargetType = "%s"\n' % factoryConfig.factory_id,
            f'Requirements = (FactoryName =?= "{factory_name}")&&(GlideinName =?= "{glidein_name}")\n',
        )
        with open(tmpnam, "w") as query_fd:
            query_fd.writelines(query_lines)

        exe_condor_advertise(tmpnam, "INVALIDATE_ADS_GENERIC", factory_collector=factory_collector)
    finally:
        _remove_if_there(tmpnam)
############################################################
# glidein_attrs is a dictionary of values to publish
# like {"Arch":"INTEL","MinDisk":200000}
# similar for glidein_params and glidein_monitor_monitors
[docs]
def advertizeGlideinClientMonitoring(
    factory_name,
    glidein_name,
    entry_name,
    client_name,
    client_int_name,
    client_int_req,
    glidein_attrs={},
    client_params={},
    client_monitors={},
    factory_collector=DEFAULT_VAL,
):
    """Write the glidefactoryclient classad of one client and advertise it.

    The classad is written to a temporary file with
    createGlideinClientMonitoringFile(); the file is then advertised, and
    removed, by advertizeGlideinClientMonitoringFromFile().

    glidein_attrs, client_params and client_monitors are dictionaries of
    values to publish, like {"Arch":"INTEL","MinDisk":200000}.
    """
    tmpnam = classadSupport.generate_classad_filename(prefix="gfi_adm_gfc")

    createGlideinClientMonitoringFile(
        fname=tmpnam,
        factory_name=factory_name,
        glidein_name=glidein_name,
        entry_name=entry_name,
        client_name=client_name,
        client_int_name=client_int_name,
        client_int_req=client_int_req,
        glidein_attrs=glidein_attrs,
        client_params=client_params,
        client_monitors=client_monitors,
    )

    advertizeGlideinClientMonitoringFromFile(tmpnam, remove_file=True, factory_collector=factory_collector)
[docs]
class MultiAdvertizeGlideinClientMonitoring:
    """Collect the monitoring data of several clients of one entry and advertise it.

    Clients are queued with add(); the queued data is then either advertised
    with do_advertize() or written to a file with writeToMultiClassadFile().
    """

    def __init__(self, factory_name, glidein_name, entry_name, glidein_attrs, factory_collector=DEFAULT_VAL):
        """Store the entry identification and the attributes shared by all clients.

        glidein_attrs is a dictionary of values to publish,
        like {"Arch":"INTEL","MinDisk":200000}
        """
        self.factory_name = factory_name
        self.glidein_name = glidein_name
        self.entry_name = entry_name
        self.glidein_attrs = glidein_attrs
        # One dictionary per queued client, see add()
        self.client_data = []
        self.factory_collector = factory_collector

    def add(
        self, client_name, client_int_name, client_int_req, client_params={}, client_monitors={}, limits_triggered={}
    ):
        """Queue the data of one client; nothing is written or advertised yet."""
        el = {
            "client_name": client_name,
            "client_int_name": client_int_name,
            "client_int_req": client_int_req,
            "client_params": client_params,
            "client_monitors": client_monitors,
            "limits_triggered": limits_triggered,
        }
        self.client_data.append(el)

    def do_advertize(self):
        """Advertise all the queued clients, then empty the queue.

        Uses a single multi-classad file when factoryConfig.advertise_use_multi
        is set, one file per client otherwise.
        Can throw MultiExeError; the queue is not emptied in that case.
        """
        if factoryConfig.advertise_use_multi:
            self.do_advertize_multi()
        else:
            self.do_advertize_iterate()
        self.client_data = []

    # INTERNAL
    def _write_client_classad(self, fname, el, do_append):
        """Write the classad of one queued client (an element of client_data) to fname."""
        createGlideinClientMonitoringFile(
            fname,
            self.factory_name,
            self.glidein_name,
            self.entry_name,
            el["client_name"],
            el["client_int_name"],
            el["client_int_req"],
            self.glidein_attrs,
            el["client_params"],
            el["client_monitors"],
            # the limits queued by add() are part of the classad in all the code paths
            el["limits_triggered"],
            do_append=do_append,
        )

    def do_advertize_iterate(self):
        """Advertise the queued clients one by one, each from its own file content.

        ExeError failures are collected and raised together as a MultiExeError
        once all the clients have been processed.
        """
        error_arr = []

        tmpnam = classadSupport.generate_classad_filename(prefix="gfi_ad_gfc")

        for el in self.client_data:
            # The same file name is reused: it is rewritten for each client
            # and removed after each advertise
            self._write_client_classad(tmpnam, el, do_append=False)
            try:
                advertizeGlideinClientMonitoringFromFile(
                    tmpnam, remove_file=True, factory_collector=self.factory_collector
                )
            except condorExe.ExeError as e:
                error_arr.append(e)

        if len(error_arr) > 0:
            raise MultiExeError(error_arr)

    def do_advertize_multi(self):
        """Write all the queued clients into one file and advertise it in one go.

        Nothing is advertised when the queue is empty.
        An ExeError failure is raised as a MultiExeError.
        """
        tmpnam = classadSupport.generate_classad_filename(prefix="gfi_adm_gfc")

        ap = False
        for el in self.client_data:
            self._write_client_classad(tmpnam, el, do_append=ap)
            ap = True  # Append from here on

        if ap:
            error_arr = []
            try:
                advertizeGlideinClientMonitoringFromFile(
                    tmpnam, remove_file=True, is_multi=True, factory_collector=self.factory_collector
                )
            except condorExe.ExeError as e:
                error_arr.append(e)

            if len(error_arr) > 0:
                raise MultiExeError(error_arr)

    def writeToMultiClassadFile(self, filename=None, append=True):
        """Write the classads of all the queued clients to a file, without advertising.

        Args:
            filename (str or None): name of the file to write the classads to;
                a new name is generated when None
            append (bool): whether the classads are appended to the file;
                ignored (the file is created) when the name is generated here

        Returns:
            str: name of the file used
        """
        if filename is None:
            filename = classadSupport.generate_classad_filename(prefix="gfi_adm_gfc")
            append = False

        for el in self.client_data:
            self._write_client_classad(filename, el, do_append=append)
            # Append from here on anyways
            append = True

        return filename
##############################
# Start INTERNAL
# glidein_attrs is a dictionary of values to publish
# like {"Arch":"INTEL","MinDisk":200000}
# similar for glidein_params and glidein_monitor_monitors
[docs]
def createGlideinClientMonitoringFile(
    fname,
    factory_name,
    glidein_name,
    entry_name,
    client_name,
    client_int_name,
    client_int_req,
    glidein_attrs={},
    client_params={},
    client_monitors={},
    limits_triggered={},
    do_append=False,
):
    """Write one glidefactoryclient classad to a file.

    The per-client update counter (advertizeGFCCounter) is advanced and
    written as UpdateSequenceNumber. The classad ends with an empty line, so
    further classads can be appended to the same file.

    Args:
        fname (str): name of the file to write
        factory_name (str): name of the factory
        glidein_name (str): name of the glidein instance
        entry_name (str): name of the entry
        client_name (str): value of the Name attribute; also the key of the update counter
        client_int_name (str): value of ReqClientName
        client_int_req (str): value of ReqClientReqName
        glidein_attrs (dict): values to publish, like {"Arch":"INTEL","MinDisk":200000}
        client_params (dict): values to publish with the parameter prefix
        client_monitors (dict): values to publish with the monitoring prefix
        limits_triggered (dict): triggered limits, keyed by limit name; the optional
            "all_frontends" element lists the names used to build the per-class limit names
        do_append (bool): append to the file instead of overwriting it

    Raises:
        Exception: any error raised while writing; the file is removed before re-raising
    """
    global factoryConfig
    global advertizeGFCCounter

    open_type = "a" if do_append else "w"

    try:
        with open(fname, open_type) as fd:
            limits = ("IdleGlideinsPerEntry", "HeldGlideinsPerEntry", "TotalGlideinsPerEntry")
            for limit in limits:
                if limit in limits_triggered:
                    fd.write(
                        '%sStatus_GlideFactoryLimit%s = "%s"\n'
                        % (factoryConfig.glidein_monitor_prefix, limit, limits_triggered[limit])
                    )

            # "all_frontends" is not always provided (e.g. when limits_triggered
            # is left to its default): a missing value means no per-class limits
            all_frontends = limits_triggered.get("all_frontends") or ()
            for fe_sec_class in all_frontends:
                sec_class_limits = ("IdlePerClass_%s" % fe_sec_class, "TotalPerClass_%s" % fe_sec_class)
                for limit in sec_class_limits:
                    if limit in limits_triggered:
                        fd.write(
                            '%sStatus_GlideFactoryLimit%s = "%s"\n'
                            % (factoryConfig.glidein_monitor_prefix, limit, limits_triggered[limit])
                        )

            fd.write('MyType = "%s"\n' % factoryConfig.factoryclient_id)
            fd.write('GlideinMyType = "%s"\n' % factoryConfig.factoryclient_id)
            fd.write('GlideinWMSVersion = "%s"\n' % factoryConfig.glideinwms_version)
            fd.write('Name = "%s"\n' % client_name)
            fd.write(f'ReqGlidein = "{entry_name}@{glidein_name}@{factory_name}"\n')
            fd.write('ReqFactoryName = "%s"\n' % factory_name)
            fd.write('ReqGlideinName = "%s"\n' % glidein_name)
            fd.write('ReqEntryName = "%s"\n' % entry_name)
            fd.write('ReqClientName = "%s"\n' % client_int_name)
            fd.write('ReqClientReqName = "%s"\n' % client_int_req)
            # fd.write('DaemonStartTime = %li\n'%start_time)

            # One sequence per client name, starting from 0
            advertizeGFCCounter[client_name] = advertizeGFCCounter.get(client_name, -1) + 1
            fd.write("UpdateSequenceNumber = %i\n" % advertizeGFCCounter[client_name])

            # Write out the attributes, the params and the monitors, each with its prefix
            for prefix, data in (
                (factoryConfig.glidein_attr_prefix, glidein_attrs),
                (factoryConfig.glidein_param_prefix, client_params),
                (factoryConfig.glidein_monitor_prefix, client_monitors),
            ):
                for attr, el in data.items():
                    if isinstance(el, int):
                        # don't quote ints
                        fd.write(f"{prefix}{attr} = {el}\n")
                    else:
                        # quote everything else, escaping the embedded double quotes
                        escaped_el = str(el).replace('"', '\\"')
                        fd.write(f'{prefix}{attr} = "{escaped_el}"\n')

            # add a final empty line... useful when appending
            fd.write("\n")
    except Exception:
        # remove file in case of problems
        if os.path.exists(fname):
            os.remove(fname)
        raise
# Given a file, advertize
# Can throw a CondorExe/ExeError exception
[docs]
def advertizeGlideinClientMonitoringFromFile(fname, remove_file=True, is_multi=False, factory_collector=DEFAULT_VAL):
    """Advertise the glidefactoryclient classads stored in a file.

    A missing file is only reported with a warning. Errors raised while
    advertising are logged and not propagated; the file is removed afterwards
    (when remove_file is set) also in that case.

    Args:
        fname (str): name of the file with the classads
        remove_file (bool): remove the file after the advertise attempt
        is_multi (bool): passed to exe_condor_advertise
        factory_collector (str or None): passed to exe_condor_advertise
    """
    if not os.path.exists(fname):
        logSupport.log.warning(
            "glidefactoryclient classad file %s does not exist. Check if frontends are allowed to submit to entry"
            % fname
        )
        return

    try:
        logSupport.log.info("Advertising glidefactoryclient classads")
        exe_condor_advertise(fname, "UPDATE_LICENSE_AD", is_multi=is_multi, factory_collector=factory_collector)
    except Exception:
        logSupport.log.warning("Advertising glidefactoryclient classads failed")
        logSupport.log.exception("Advertising glidefactoryclient classads failed: ")

    if remove_file:
        os.remove(fname)
[docs]
def advertizeGlideinFromFile(fname, remove_file=True, is_multi=False, factory_collector=DEFAULT_VAL):
    """Advertise the glidefactory classads stored in a file.

    A missing file is only reported with a warning. Errors raised while
    advertising are logged and not propagated; the file is removed afterwards
    (when remove_file is set) also in that case.

    Args:
        fname (str): name of the file with the classads
        remove_file (bool): remove the file after the advertise attempt
        is_multi (bool): passed to exe_condor_advertise
        factory_collector (str or None): passed to exe_condor_advertise
    """
    if not os.path.exists(fname):
        logSupport.log.warning(
            "glidefactory classad file %s does not exist. Check if you have atleast one entry enabled" % fname
        )
        return

    try:
        logSupport.log.info("Advertising glidefactory classads")
        exe_condor_advertise(fname, "UPDATE_AD_GENERIC", is_multi=is_multi, factory_collector=factory_collector)
    except Exception:
        logSupport.log.warning("Advertising glidefactory classads failed")
        logSupport.log.exception("Advertising glidefactory classads failed: ")

    if remove_file:
        os.remove(fname)
# End INTERNAL
###########################################
# remove classads from Collector
[docs]
def deadvertizeAllGlideinClientMonitoring(factory_name, glidein_name, entry_name, factory_collector=DEFAULT_VAL):
    """Invalidate the glidefactoryclient (monitoring) classads of one entry.

    A query classad selecting on ReqGlidein
    (``<entry_name>@<glidein_name>@<factory_name>``) is written to a temporary
    file and sent with INVALIDATE_LICENSE_ADS; the file is always removed
    before returning.
    """
    tmpnam = classadSupport.generate_classad_filename(prefix="gfi_de_gfc")
    # TODO: use tempfile
    try:
        query_lines = (
            'MyType = "Query"\n',
            'TargetType = "%s"\n' % factoryConfig.factoryclient_id,
            'Requirements = (ReqGlidein == "%s@%s@%s")&&(GlideinMyType == "%s")\n'
            % (entry_name, glidein_name, factory_name, factoryConfig.factoryclient_id),
        )
        with open(tmpnam, "w") as query_fd:
            query_fd.writelines(query_lines)

        exe_condor_advertise(tmpnam, "INVALIDATE_LICENSE_ADS", factory_collector=factory_collector)
    finally:
        _remove_if_there(tmpnam)
[docs]
def deadvertizeFactoryClientMonitoring(factory_name, glidein_name, factory_collector=DEFAULT_VAL):
    """Invalidate all the glidefactoryclient (monitoring) classads of this factory.

    A query classad selecting on ReqFactoryName and ReqGlideinName is written
    to a temporary file and sent with INVALIDATE_LICENSE_ADS; the file is
    always removed before returning.
    """
    tmpnam = classadSupport.generate_classad_filename(prefix="gfi_de_gfc")
    # TODO: use tempfile
    try:
        # The Requirements line is written without a trailing newline
        query_lines = (
            'MyType = "Query"\n',
            'TargetType = "%s"\n' % factoryConfig.factoryclient_id,
            'Requirements = (ReqFactoryName=?="%s")&&(ReqGlideinName=?="%s")&&(GlideinMyType == "%s")'
            % (factory_name, glidein_name, factoryConfig.factoryclient_id),
        )
        with open(tmpnam, "w") as query_fd:
            query_fd.writelines(query_lines)

        exe_condor_advertise(tmpnam, "INVALIDATE_LICENSE_ADS", factory_collector=factory_collector)
    finally:
        _remove_if_there(tmpnam)
############################################################
#
# I N T E R N A L - Do not use
#
############################################################
[docs]
def _remove_if_there(fname):
    """Delete fname, silently ignoring any OSError (e.g. the file does not exist)"""
    try:
        os.unlink(fname)
    except OSError:
        # Best effort only: nothing to do if the file cannot be removed
        return
# serialize access to the Collector accross all the processes
# these is a single Collector anyhow
[docs]
def exe_condor_advertise(fname, command, is_multi=False, factory_collector=None):
    """Run condorManager.condorAdvertise while holding the module advertise lock.

    An exclusive lock on "gfi_advertize.lock" (in factoryConfig.lock_dir) is
    held for the duration of the call, so the advertise commands of the
    processes sharing that lock directory are serialized.

    Args:
        fname (str): name of the file with the classad(s) to send
        command (str): advertise command, e.g. "UPDATE_AD_GENERIC"
        is_multi (bool): passed to condorAdvertise
        factory_collector (str or None): collector to use; DEFAULT_VAL means
            "use factoryConfig.factory_collector"

    Returns:
        whatever condorManager.condorAdvertise returns
    """
    global factoryConfig

    if factory_collector == DEFAULT_VAL:
        factory_collector = factoryConfig.factory_collector

    lock_fname = os.path.join(factoryConfig.lock_dir, "gfi_advertize.lock")
    if not os.path.exists(lock_fname):
        # Best effort creation of the lock file
        try:
            with open(lock_fname, "w"):
                pass
        except Exception:
            # Another process may have created it in the meantime
            pass

    with open(lock_fname, "r+") as lock_fd:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        try:
            return condorManager.condorAdvertise(
                fname, command, factoryConfig.advertise_use_tcp, is_multi, factory_collector
            )
        finally:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)