# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""This module implements classes to query the condor daemons and manipulate the results
Please notice that it also converts \" into "
"""
import copy
import os
import socket
import xml.parsers.expat
from itertools import groupby
from . import condorExe, condorSecurity
USE_HTCONDOR_PYTHON_BINDINGS = False
try:
# NOTE:
# import htcondor tries to look for CONDOR_CONFIG in the usual search path
# If it cannot be found, it will print annoying error message which
# neither goes to stdout nor stderr. It was not clear how to mask this
# message so we may have to live with it till then
# In case of non default locations of CONDOR_CONFIG, frontend will always
# set the CONDOR_CONFIG appropriately before every command. Since import
# happens before frontend can do anything, htcondor module is initialized
# without the knowledge of CONDOR_CONFIG. A reload is needed.
# Furthermore _CONDOR_ variables are ignored by htcondor and need to be added
# manually to htcondor.param.
# This mandates that we do a htcondor_full_reload() every time to use the bindings.
import classad # pylint: disable=import-error
import htcondor # pylint: disable=import-error
USE_HTCONDOR_PYTHON_BINDINGS = True
except ImportError:
# TODO Maybe we should print a message here? Even though I don't know if
# logSupport has been initialized. But I'd try to put it log.debug
pass
[docs]
def htcondor_full_reload():
    """Reload the HTCondor configuration from the environment.

    No-op when the htcondor python bindings are not available.
    ``htcondor.reload_config()`` re-reads CONDOR_CONFIG, but it ignores
    ``_CONDOR_*`` environment variables, so those are copied into
    ``htcondor.param`` by hand.
    """
    prefix = "_CONDOR_"
    if not USE_HTCONDOR_PYTHON_BINDINGS:
        return
    # Re-read CONDOR_CONFIG (its location may have been set after import)
    htcondor.reload_config()
    # Mirror every _CONDOR_<KEY> environment variable into htcondor.param[<KEY>]
    for var_name, var_value in os.environ.items():
        if var_name.startswith(prefix):
            htcondor.param[var_name[len(prefix):]] = var_value
#
# Configuration
#
# Set path to condor binaries
[docs]
def set_path(new_condor_bin_path):
    """Set the module-level path to the HTCondor binaries.

    Args:
        new_condor_bin_path (str): directory containing the condor executables
    """
    global condor_bin_path
    condor_bin_path = new_condor_bin_path
#
# Exceptions
#
[docs]
class QueryError(RuntimeError):
    """Raised when an HTCondor query fails, whether issued through the
    python bindings or through an external command."""

    def __init__(self, err_str):
        super().__init__(err_str)
[docs]
class PBError(RuntimeError):
    """Raised when an operation performed via the htcondor python bindings fails."""

    def __init__(self, err_str):
        super().__init__(err_str)
#
# Caching classes
#
[docs]
class NoneScheddCache:
    """Dummy schedd cache that performs no caching.

    Also serves as the base class for the real caching implementations.
    """

    def getScheddId(self, schedd_name, pool_name):
        """Return the (schedd command string, env) pair for a schedd.

        Args:
            schedd_name (str): name of the schedd, None for the local one
            pool_name (str): name of the pool (unused in this implementation)

        Returns:
            tuple: ('-name ...' command-line string, empty environment dict)
        """
        return self.iGetCmdScheddStr(schedd_name), {}

    def iGetCmdScheddStr(self, schedd_name):
        """Return the '-name <schedd> ' option string, or '' when schedd_name is None."""
        if schedd_name is None:
            return ""
        return "-name %s " % schedd_name
[docs]
class LocalScheddCache(NoneScheddCache):
    """
    Schedd caching class
    The schedd can be found either through -name attr or through the
    local disk lookup. Remember which one to use.
    """

    def __init__(self):
        # Caching is on by default; see enable()/disable()
        self.enabled = True
        # dict of (schedd_name,pool_name)=>(cms arg schedd string,env)
        self.cache = {}
        # All IP addresses of this host, used to decide whether a schedd is local
        self.my_ips = socket.gethostbyname_ex(socket.gethostname())[2]
        try:
            self.my_ips += socket.gethostbyname_ex("localhost")[2]
        except socket.gaierror:
            pass  # localhost not defined, ignore

    def enable(self):
        """Turn caching on."""
        self.enabled = True

    def disable(self):
        """Turn caching off; lookups fall back to the '-name' option string."""
        self.enabled = False

    def getScheddId(self, schedd_name, pool_name):
        """
        Given the schedd name and pool name, get a tuple containing
        (ScheddId, dict)
        @param schedd_name: Name of the schedd
        @type schedd_name: string
        @param pool_name: Name of the pool
        @type pool_name: string
        @return: tuple (schedd string, env)
        """
        if schedd_name is None:  # special case, do not cache
            return ("", {})
        if self.enabled:
            k = (schedd_name, pool_name)
            if k not in self.cache:  # not in cache, discover it
                env = self.iGetEnv(schedd_name, pool_name)
                if env is None:
                    # schedd not reachable via local disk: use '-name', no env
                    self.cache[k] = (self.iGetCmdScheddStr(schedd_name), {})
                else:
                    # local schedd: talk via the spool dir (env), no '-name' needed
                    self.cache[k] = ("", env)
            return self.cache[k]
        else:
            # Not enabled, just return the str
            return (self.iGetCmdScheddStr(schedd_name), {})

    #
    # PRIVATE
    #

    # return None if not found
    # Can raise exceptions
    def iGetEnv(self, schedd_name, pool_name):
        # Discover whether the schedd is local; returns {"_CONDOR_SPOOL": dir}
        # when it is and its spool directory exists, None otherwise.
        global disk_cache
        data = disk_cache.get(schedd_name + ".igetenv")  # pylint: disable=assignment-from-none
        if data is None:
            # cache miss: query the collector for this schedd's classad
            cs = CondorStatus("schedd", pool_name)
            data = cs.fetch(
                constraint='Name=?="%s"' % schedd_name,
                format_list=[("ScheddIpAddr", "s"), ("SPOOL_DIR_STRING", "s"), ("LOCAL_DIR_STRING", "s")],
            )
            disk_cache.save(schedd_name + ".igetenv", data)
        if schedd_name not in data:
            raise RuntimeError("Schedd '%s' not found" % schedd_name)
        el = data[schedd_name]
        if "SPOOL_DIR_STRING" not in el and "LOCAL_DIR_STRING" not in el:
            # not advertising, cannot use disk optimization
            return None
        if "ScheddIpAddr" not in el:
            # This should never happen
            raise RuntimeError("Schedd '%s' is not advertising ScheddIpAddr" % schedd_name)
        # ScheddIpAddr presumably looks like "<ip:port...>": drop the leading
        # character and the port to get the bare IP -- TODO confirm format
        schedd_ip = el["ScheddIpAddr"][1:].split(":")[0]
        if schedd_ip in self.my_ips:  # seems local, go for the dir
            l_dir = el.get("SPOOL_DIR_STRING", el.get("LOCAL_DIR_STRING"))
            if os.path.isdir(l_dir):  # making sure the directory exists
                if "SPOOL_DIR_STRING" in el:
                    return {"_CONDOR_SPOOL": "%s" % l_dir}
                else:  # LOCAL_DIR_STRING, assuming spool is LOCAL_DIR_STRING/spool
                    if os.path.isdir("%s/spool" % l_dir):
                        return {"_CONDOR_SPOOL": "%s/spool" % l_dir}
            else:
                # dir does not exist, not relevant, revert to standard behaviour
                return None
        else:
            # not local
            return None
# The class does not belong here, it should be in the disk_cache module.
# However, condorMonitor is not importing anything from glideinwms.lib, it is a standalone module
# We might revisit this in the future
[docs]
class NoneDiskCache:
    """No-op stand-in used when no real DiskCache object is configured."""

    def get(self, objid):
        """Always report a cache miss."""
        return None

    def save(self, objid, obj):
        """Discard the object; nothing is persisted."""
        return None
# default global objects shared across the module
local_schedd_cache = LocalScheddCache()  # schedd lookup cache, default for CondorQ
disk_cache = NoneDiskCache()  # no-op by default; assign a real DiskCache to persist lookups
[docs]
def condorq_attrs(q_constraint, attribute_list):
    """Retrieve the given attributes from all the jobs in all the factory queues.

    Runs ``condor_q -g -l`` with one ``-attr`` option per requested attribute and
    parses each per-schedd XML document found in the output.

    Args:
        q_constraint (str): constraint to apply to the condor_q query
        attribute_list (list): attribute names to retrieve

    Returns:
        list: one dictionary per matching classad
    """
    attr_str = ""
    for attr in attribute_list:
        attr_str += " -attr %s" % attr

    xml_data = condorExe.exe_cmd("condor_q", f"-g -l {attr_str} -xml -constraint '{q_constraint}'")

    # Split the output into separate XML documents, one per queue:
    # each document starts with its own "<?xml" header line
    classads_xml = []
    tmp_list = []
    for line in xml_data:
        # look for the xml header
        if line[:5] == "<?xml":
            if len(tmp_list) > 0:
                classads_xml.append(tmp_list)
            tmp_list = []
        tmp_list.append(line)
    # BUGFIX: flush the last accumulated document as well;
    # it used to be silently dropped after the loop
    if len(tmp_list) > 0:
        classads_xml.append(tmp_list)

    q_proxy_list = []
    for ad_xml in classads_xml:
        cred_list = xml2list(ad_xml)
        q_proxy_list.extend(cred_list)

    return q_proxy_list
#
# Condor monitoring classes
#
[docs]
class AbstractQuery:
    """Pure virtual class defining the minimal query interface."""

    def fetch(self, constraint=None, format_list=None):
        """Fetch the classad attributes in format_list matching the constraint.

        Returns:
            dict: classad attributes and values
        """
        raise NotImplementedError("Fetch not implemented")

    def load(self, constraint=None, format_list=None):
        """Fetch the data and store it in self.stored_data."""
        raise NotImplementedError("Load not implemented")

    def fetchStored(self, constraint_func=None):
        """Same as fetch(), but applied to stored data, limited to elements
        for which constraint_func(el) is True (all elements when None).

        Returns:
            dict: filtered stored data
        """
        raise NotImplementedError("fetchStored not implemented")
[docs]
class StoredQuery(AbstractQuery):
    """Virtual class implementing fetchStored on top of self.stored_data."""

    # class-level default; load() implementations rebind it per instance
    stored_data = {}

    def fetchStored(self, constraint_func=None):
        """Return the stored data limited to elements satisfying constraint_func.

        Args:
            constraint_func: boolean function of one data element, or None for all

        Returns:
            dict: same format as fetch(), filtered by constraint_func
        """
        return applyConstraint(self.stored_data, constraint_func)
[docs]
class CondorQEdit:
    """
    Fully implemented class that executes condorq_edit commands. Only provides a method to do bulk
    updates of jobs using transaction-based API and the condor python bindings. Cannot be used without
    python bindings
    """

    def __init__(self, pool_name=None, schedd_name=None):
        """Raises QueryError in case python bindings are not available"""
        self.pool_name = pool_name
        self.schedd_name = schedd_name
        if not USE_HTCONDOR_PYTHON_BINDINGS:
            raise QueryError("QEdit class only implemented with python bindings")

    def executeAll(self, joblist=None, attributes=None, values=None):
        """Given equal sized lists of job ids, attributes and values,
        executes in one large transaction a single qedit for each job.

        Args:
            joblist (list): job ids
            attributes (list): attribute names, one per job id
            values (list): attribute values, one per job id

        Raises:
            QueryError: when the list lengths differ or the edit fails
        """
        global disk_cache
        joblist = joblist or []
        attributes = attributes or []
        values = values or []
        if not (len(joblist) == len(attributes) == len(values)):
            raise QueryError("Arguments to QEdit.executeAll should have the same length")
        try:
            # the bindings may hold a stale configuration; reload it first
            htcondor_full_reload()
            if self.pool_name:
                collector = htcondor.Collector(str(self.pool_name))
            else:
                collector = htcondor.Collector()
            if self.schedd_name:
                # locate the schedd, using the disk cache to avoid a collector query
                schedd_ad = disk_cache.get(self.schedd_name + ".locate")  # pylint: disable=assignment-from-none
                if schedd_ad is None:
                    schedd_ad = collector.locate(htcondor.DaemonTypes.Schedd, self.schedd_name)
                    disk_cache.save(self.schedd_name + ".locate", schedd_ad)
                schedd = htcondor.Schedd(schedd_ad)
            else:
                schedd = htcondor.Schedd()
            with schedd.transaction() as _:
                for jobid, attr, val in zip(joblist, attributes, values):
                    schedd.edit([jobid], attr, classad.quote(val))
        except Exception as ex:
            s = "default"
            if self.schedd_name is not None:
                s = self.schedd_name
            p = "default"
            if self.pool_name is not None:
                p = self.pool_name
            try:
                # the loop variables deliberately leak out of the for loop above;
                # use them to report which job/attr/val was being edited on failure
                j1 = jobid
                j2 = attr
                j3 = val
            except Exception:
                # loop never started, no job information available
                j1 = j2 = j3 = "unknown"
            err_str = (
                "Error querying schedd %s in pool %s using python bindings (qedit of job/attr/val %s/%s/%s): %s"
                % (s, p, j1, j2, j3, ex)
            )
            raise QueryError(err_str) from ex
#
# format_list is a list of
# (attr_name, attr_type)
# where attr_type is one of
# "s" - string
# "i" - integer
# "r" - real (float)
# "b" - bool
#
#
# security_obj, if defined, should be a child of condorSecurity.ProtoRequest
[docs]
class CondorQuery(StoredQuery):
    """
    Fully implemented class that executes condor commands
    """

    def __init__(self, exe_name, resource_str, group_attribute, pool_name=None, security_obj=None, env=None):
        """
        Args:
            exe_name (str): condor command to execute (e.g. "condor_q")
            resource_str (str): resource selection options (e.g. "-name X" or "-schedd")
            group_attribute: attribute name (or list of names) used as the result key
            pool_name (str): collector to query; None for the local one
            security_obj: child of condorSecurity.ProtoRequest with no saved state
            env (dict): extra environment for the command; None means empty

        Raises:
            RuntimeError: when security_obj has saved state
        """
        self.exe_name = exe_name
        # BUGFIX: 'env' used to default to a shared mutable dict ({});
        # default to None and create a fresh dict per instance instead
        self.env = {} if env is None else env
        self.resource_str = resource_str
        self.group_attribute = group_attribute
        self.pool_name = pool_name
        if pool_name is None:
            self.pool_str = ""
        else:
            self.pool_str = "-pool %s" % pool_name
        if security_obj is not None:
            if security_obj.has_saved_state():
                raise RuntimeError("Cannot use a security object which has saved state.")
            # deep copy so this instance's save/restore cannot interfere with the caller's object
            self.security_obj = copy.deepcopy(security_obj)
        else:
            self.security_obj = condorSecurity.ProtoRequest()

    def require_integrity(self, requested_integrity):
        """Set client integrity settings to use for condor commands

        Args:
            requested_integrity: None to unset, truthy for REQUIRED, falsy for OPTIONAL
        """
        if requested_integrity is None:
            condor_val = None
        elif requested_integrity:
            condor_val = "REQUIRED"
        else:
            # Not required, set OPTIONAL if the other side requires it
            condor_val = "OPTIONAL"
        self.security_obj.set("CLIENT", "INTEGRITY", condor_val)

    def get_requested_integrity(self):
        """Get the current integrity settings

        Returns: None->None; REQUIRED->True; OPTIONAL->False
        """
        condor_val = self.security_obj.get("CLIENT", "INTEGRITY")
        if condor_val is None:
            return None
        return condor_val == "REQUIRED"

    def require_encryption(self, requested_encryption):
        """Set client encryption settings to use for condor commands

        Args:
            requested_encryption: None to unset, truthy for REQUIRED, falsy for OPTIONAL
        """
        if requested_encryption is None:
            condor_val = None
        elif requested_encryption:
            condor_val = "REQUIRED"
        else:
            # Not required, set OPTIONAL if the other side requires it
            condor_val = "OPTIONAL"
        self.security_obj.set("CLIENT", "ENCRYPTION", condor_val)

    def get_requested_encryption(self):
        """Get the current encryption settings

        Returns: None->None; REQUIRED->True; OPTIONAL->False
        """
        condor_val = self.security_obj.get("CLIENT", "ENCRYPTION")
        if condor_val is None:
            return None
        return condor_val == "REQUIRED"

    def fetch(self, constraint=None, format_list=None):
        """Return the results obtained using HTCondor commands or python bindings

        Args:
            constraint (str): query constraint
            format_list (list): Classad attr & type. [(attr1, 'i'), ('attr2', 's')]

        Returns:
            dict: Dict containing the query results

        Raises:
            QueryError: on any failure of the underlying command or bindings
        """
        try:
            if USE_HTCONDOR_PYTHON_BINDINGS:
                return self.fetch_using_bindings(constraint=constraint, format_list=format_list)
            else:
                return self.fetch_using_exe(constraint=constraint, format_list=format_list)
        except Exception as ex:
            err_str = (
                "Error executing htcondor query to pool %s with constraint %s and format_list %s: %s. Env is %s"
                % (self.pool_name, constraint, format_list, ex, os.environ)
            )
            raise QueryError(err_str) from ex

    def fetch_using_exe(self, constraint=None, format_list=None):
        """Return the results obtained from executing the HTCondor query command

        Args:
            constraint (str): Constraints to be applied to the query
            format_list (list): Classad attr & type. [(attr1, 'i'), ('attr2', 's')]

        Returns:
            dict: Dict containing the results
        """
        if constraint is None:
            constraint_str = ""
        else:
            constraint_str = "-constraint '%s'" % constraint
        # without a format_list the full classad is requested in XML
        full_xml = format_list is None
        if format_list is not None:
            format_arr = []
            for format_el in format_list:
                attr_name, attr_type = format_el
                # map the declared type to a condor -format specifier
                attr_format = {"s": "%s", "i": "%i", "r": "%f", "b": "%i"}[attr_type]
                format_arr.append(f'-format "{attr_format}" "{attr_name}"')
            format_str = " ".join(format_arr)
        # set environment for security settings
        self.security_obj.save_state()
        try:
            self.security_obj.enforce_requests()
            if full_xml:
                xml_data = condorExe.exe_cmd(
                    self.exe_name, f"{self.resource_str} -xml {self.pool_str} {constraint_str}", env=self.env
                )
            else:
                # format_str is defined because full_xml False means (format_list is not None)
                xml_data = condorExe.exe_cmd(
                    self.exe_name,
                    f"{self.resource_str} {format_str} -xml {self.pool_str} {constraint_str}",  # pylint: disable=E0606
                    env=self.env,
                )
        finally:
            # restore old security context
            self.security_obj.restore_state()
        list_data = xml2list(xml_data)
        del xml_data
        dict_data = list2dict(list_data, self.group_attribute)
        return dict_data

    def fetch_using_bindings(self, constraint=None, format_list=None):
        """Fetch the results using htcondor-python bindings

        Args:
            constraint (str): Constraints to be applied to the query
            format_list (list): Classad attr & type. [(attr1, 'i'), ('attr2', 's')]

        Returns:
            dict: Dict containing the results

        Raises:
            NotImplementedError: the operation is not implemented using bindings
        """
        raise NotImplementedError("fetch_using_bindings() not implemented")

    def load(self, constraint=None, format_list=None):
        """Fetch the results and cache them in self.stored_data."""
        self.stored_data = self.fetch(constraint, format_list)

    def __repr__(self):
        output = "%s:\n" % self.__class__.__name__
        output += "exe_name = %s\n" % str(self.exe_name)
        output += "env = %s\n" % str(self.env)
        output += "resource_str = %s\n" % str(self.resource_str)
        output += "group_attribute = %s\n" % str(self.group_attribute)
        output += "pool_name = %s\n" % str(self.pool_name)
        output += "pool_str = %s\n" % str(self.pool_str)
        output += "security_obj = %s\n" % str(self.security_obj)
        output += "used_python_bindings = %s\n" % USE_HTCONDOR_PYTHON_BINDINGS
        output += "stored_data = %s" % str(self.stored_data)
        return output
[docs]
class CondorQ(CondorQuery):
    """Class to implement condor_q. Uses htcondor-python bindings if possible."""

    def __init__(self, schedd_name=None, pool_name=None, security_obj=None, schedd_lookup_cache=local_schedd_cache):
        # NOTE: the default schedd_lookup_cache is the shared module-level
        # LocalScheddCache, so lookups are cached across CondorQ instances
        self.schedd_name = schedd_name
        if schedd_lookup_cache is None:
            schedd_lookup_cache = NoneScheddCache()
        # the cache decides whether to reach the schedd via '-name' or via env
        schedd_str, env = schedd_lookup_cache.getScheddId(schedd_name, pool_name)
        # (ClusterId, ProcId) uniquely identifies a job; used as the result key
        CondorQuery.__init__(self, "condor_q", schedd_str, ["ClusterId", "ProcId"], pool_name, security_obj, env)

    def fetch(self, constraint=None, format_list=None):
        # If format_list, make sure ClusterId and ProcId are present
        # (they are needed as the grouping key of the returned dictionary)
        if format_list is not None:
            format_list = complete_format_list(format_list, [("ClusterId", "i"), ("ProcId", "i")])
        return CondorQuery.fetch(self, constraint=constraint, format_list=format_list)

    def fetch_using_bindings(self, constraint=None, format_list=None):
        """Fetch the condor_q results using htcondor-python bindings

        Args:
            constraint (str): Constraints to be applied to the query
            format_list (list): Classad attr & type. [(attr1, 'i'), ('attr2', 's')]

        Returns:
            dict: Dict containing the results

        Raises:
            PBError: on any failure while talking to the schedd
        """
        global disk_cache
        results_dict = {}  # defined here in case of exception
        constraint = bindings_friendly_constraint(constraint)
        attrs = bindings_friendly_attrs(format_list)
        # save/restore the security context around the whole query
        self.security_obj.save_state()
        try:
            self.security_obj.enforce_requests()
            # the bindings may hold a stale configuration; reload it first
            htcondor_full_reload()
            if self.pool_name:
                collector = htcondor.Collector(str(self.pool_name))
            else:
                collector = htcondor.Collector()
            if self.schedd_name is None:
                schedd = htcondor.Schedd()
            else:
                # locate the schedd, using the disk cache to avoid a collector query
                schedd_ad = disk_cache.get(self.schedd_name + ".locate")  # pylint: disable=assignment-from-none
                if schedd_ad is None:
                    schedd_ad = collector.locate(htcondor.DaemonTypes.Schedd, self.schedd_name)
                    disk_cache.save(self.schedd_name + ".locate", schedd_ad)
                schedd = htcondor.Schedd(schedd_ad)
            results = schedd.query(constraint, attrs)
            results_dict = list2dict(results, self.group_attribute)
        except Exception as ex:
            s = "default"
            if self.schedd_name is not None:
                s = self.schedd_name
            p = "default"
            if self.pool_name is not None:
                p = self.pool_name
            err_str = f"Error querying schedd {s} in pool {p} using python bindings: {ex}"
            raise PBError(err_str) from ex
        finally:
            self.security_obj.restore_state()
        return results_dict
[docs]
class CondorStatus(CondorQuery):
    """
    Class to implement condor_status. Uses htcondor-python bindings if possible.
    """

    def __init__(self, subsystem_name=None, pool_name=None, security_obj=None):
        # subsystem_name selects the daemon type, e.g. "schedd" -> "-schedd"
        if subsystem_name is None:
            subsystem_str = ""
        else:
            subsystem_str = "-%s" % subsystem_name
        # 'Name' is unique in condor_status output; used as the result key
        CondorQuery.__init__(self, "condor_status", subsystem_str, "Name", pool_name, security_obj, {})

    def fetch(self, constraint=None, format_list=None):
        # If format_list, make sure Name is present
        # (it is needed as the grouping key of the returned dictionary)
        if format_list is not None:
            format_list = complete_format_list(format_list, [("Name", "s")])
        return CondorQuery.fetch(self, constraint=constraint, format_list=format_list)

    def fetch_using_bindings(self, constraint=None, format_list=None):
        """
        Fetch the results using htcondor-python bindings

        Args:
            constraint (str): Constraints to be applied to the query
            format_list (list): Classad attr & type. [(attr1, 'i'), ('attr2', 's')]

        Returns:
            dict: Dict containing the results

        Raises:
            PBError: on any failure while querying the collector
        """
        results_dict = {}  # defined here in case of exception
        constraint = bindings_friendly_constraint(constraint)
        attrs = bindings_friendly_attrs(format_list)
        # map the command-line resource string (e.g. "-schedd") to the bindings AdType
        adtype = resource_str_to_py_adtype(self.resource_str)
        # save/restore the security context around the whole query
        self.security_obj.save_state()
        try:
            self.security_obj.enforce_requests()
            # the bindings may hold a stale configuration; reload it first
            htcondor_full_reload()
            if self.pool_name:
                collector = htcondor.Collector(str(self.pool_name))
            else:
                collector = htcondor.Collector()
            results = collector.query(adtype, constraint, attrs)
            results_dict = list2dict(results, self.group_attribute)
        except Exception as ex:
            p = "default"
            if self.pool_name is not None:
                p = self.pool_name
            err_str = f"Error querying pool {p} using python bindings: {ex}"
            raise PBError(err_str) from ex
        finally:
            self.security_obj.restore_state()
        return results_dict
#
# Subquery classes
#
[docs]
class BaseSubQuery(StoredQuery):
    """
    Virtual class that implements fetch for StoredQuery()

    subquery_func takes the fetched data dictionary as its only argument
    and returns the derived dictionary.
    """

    def __init__(self, query, subquery_func):
        # query: the underlying query object (e.g. CondorQ, CondorStatus)
        # subquery_func: one-argument function applied to the fetched data
        self.query = query
        self.subquery_func = subquery_func

    def fetch(self, constraint=None):
        """Fetch from the underlying query and return the derived sub-result."""
        indata = self.query.fetch(constraint)
        # BUGFIX: subquery_func takes a single argument (the data dictionary),
        # as in load() below and in all the one-argument functions installed by
        # the subclasses (SubQuery, Group, NestedGroup); 'self' was erroneously
        # passed as an extra first argument
        return self.subquery_func(indata)

    #
    # NOTE: You need to call load on the SubQuery object to use fetchStored
    # and have query.load issued before
    #
    def load(self, constraint=None):
        """Derive the sub-result from the data already stored in the underlying query."""
        indata = self.query.fetchStored(constraint)
        self.stored_data = self.subquery_func(indata)
[docs]
class SubQuery(BaseSubQuery):
    """
    Fully usable subquery class, typically used on CondorQ() or CondorStatus()
    """

    def __init__(self, query, constraint_func=None):
        """query: underlying query object; constraint_func: boolean filter
        applied to each data element (None keeps everything)."""
        BaseSubQuery.__init__(self, query, lambda d: applyConstraint(d, constraint_func))

    def __repr__(self):
        return "{}:\nstored_data = {}".format(self.__class__.__name__, self.stored_data)
[docs]
class Group(BaseSubQuery):
    """
    Sub Query class with grouping functionality
    Each element has a value that is the summary of the values in a group
    """

    def __init__(self, query, group_key_func, group_data_func):
        """
        Args:
            query: underlying query object
            group_key_func: key extraction function;
                gets one classad dictionary, returns the value of the group key
            group_data_func: group summary function;
                gets a list of classad dictionaries, returns one summary classad dictionary
        """
        BaseSubQuery.__init__(self, query, lambda d: doGroup(d, group_key_func, group_data_func))
[docs]
class NestedGroup(BaseSubQuery):
    """
    Sub Query class with grouping functionality to create nested results
    Each element is a dictionary with elements reduced from the original elements in the group
    """

    def __init__(self, query, group_key_func, group_element_func=None):
        """
        Args:
            query: underlying query object
            group_key_func: key extraction function;
                gets one classad dictionary, returns the value of the group key
            group_element_func: group extraction function;
                gets a list of (key, classad dictionary) tuples,
                returns a dictionary of classad dictionaries;
                when None, 'dict' is used
        """
        BaseSubQuery.__init__(self, query, lambda d: doNestedGroup(d, group_key_func, group_element_func))
[docs]
class Summarize:
    """Summarize (count or list) the results of a query.

    The hash function maps one classad dictionary to a key; elements hashing
    to None are not counted, and a list result produces nested output.
    """

    def __init__(self, query, hash_func=lambda x: 1):
        # query: object exposing fetch()/fetchStored()
        # hash_func: default hashing function used when none is given per call
        self.query = query
        self.hash_func = hash_func

    def count(self, constraint=None, hash_func=None, flat_hash=False):
        """Fetch and count the elements, grouped by hash value.

        Args:
            constraint: string passed to query.fetch()
            hash_func: overrides the default hash function when not None
            flat_hash: when True produce a flat dictionary instead of nested ones

        Returns:
            dict: hash value -> count (or nested dicts when the hash returns lists)
        """
        data = self.query.fetch(constraint)
        if flat_hash:
            return fetch2count_flat(data, self.getHash(hash_func))
        return fetch2count(data, self.getHash(hash_func))

    def countStored(self, constraint_func=None, hash_func=None, flat_hash=False):
        """Same as count(), but using the data already stored in the query."""
        data = self.query.fetchStored(constraint_func)
        if flat_hash:
            return fetch2count_flat(data, self.getHash(hash_func))
        return fetch2count(data, self.getHash(hash_func))

    def list(self, constraint=None, hash_func=None):
        """Fetch and group the element keys by hash value.

        Returns:
            dict: hash value -> list of keys (or nested dicts when the hash returns lists)
        """
        data = self.query.fetch(constraint)
        return fetch2list(data, self.getHash(hash_func))

    def listStored(self, constraint_func=None, hash_func=None):
        """Same as list(), but using the data already stored in the query."""
        data = self.query.fetchStored(constraint_func)
        return fetch2list(data, self.getHash(hash_func))

    ### Internal

    def getHash(self, hash_func):
        """Return hash_func when given, otherwise the default from the constructor."""
        return self.hash_func if hash_func is None else hash_func
############################################################
#
# P R I V A T E, do not use
#
############################################################
# check that req_format_els are present in in_format_list, and if not add them
# return a new format_list
#
# Convert Condor XML to list
#
# For Example:
#
# <?xml version="1.0"?>
# <!DOCTYPE classads SYSTEM "classads.dtd">
# <classads>
# <c>
# <a n="MyType"><s>Job</s></a>
# <a n="TargetType"><s>Machine</s></a>
# <a n="AutoClusterId"><i>0</i></a>
# <a n="ExitBySignal"><b v="f"/></a>
# <a n="TransferOutputRemaps"><un/></a>
# <a n="WhenToTransferOutput"><s>ON_EXIT</s></a>
# </c>
# <c>
# <a n="MyType"><s>Job</s></a>
# <a n="TargetType"><s>Machine</s></a>
# <a n="AutoClusterId"><i>0</i></a>
# <a n="OnExitRemove"><b v="t"/></a>
# <a n="x509userproxysubject"><s>/DC=gov/DC=fnal/O=Fermilab/OU=People/CN=Igor Sfiligoi/UID=sfiligoi</s></a>
# </c>
# </classads>
#
# 3 xml2list XML handler functions
[docs]
def xml2list_start_element(name, attrs):
    """Expat start-element handler: open a classad ('c') or attribute ('a')
    and record the type of the value element being parsed."""
    global xml2list_data, xml2list_inclassad, xml2list_inattr, xml2list_intype
    if name == "c":
        # new classad starts
        xml2list_inclassad = {}
    elif name == "a":
        # new attribute starts; default type is string, text accumulated in 'val'
        xml2list_inattr = {"name": attrs["n"], "val": ""}
        xml2list_intype = "s"
    elif name in ("i", "r"):
        # integer/real: element name doubles as the type code
        xml2list_intype = name
    elif name == "b":
        xml2list_intype = "b"
        if "v" in attrs:
            xml2list_inattr["val"] = attrs["v"] in ("T", "t", "1")  # pylint: disable=unsupported-assignment-operation
        else:
            # extended syntax... value in text area
            xml2list_inattr["val"] = None  # pylint: disable=unsupported-assignment-operation
    elif name == "un":
        # undefined value
        xml2list_intype = "un"
        xml2list_inattr["val"] = None  # pylint: disable=unsupported-assignment-operation
    elif name in ("s", "e", "classads"):
        pass  # string/expression value elements and the top element: nothing to do
    else:
        raise TypeError("Unsupported type: %s" % name)
[docs]
def xml2list_end_element(name):
    """Expat end-element handler: close the current classad or attribute and
    reset the value-type marker back to string."""
    global xml2list_data, xml2list_inclassad, xml2list_inattr, xml2list_intype
    # NOTE: the state globals are deliberately NOT reinitialized here;
    # they persist between handler invocations of a single parse
    if name == "c":
        # classad finished: archive it in the result list
        xml2list_data.append(xml2list_inclassad)
        xml2list_inclassad = None
    elif name == "a":
        # attribute finished: store its value into the open classad
        xml2list_inclassad[xml2list_inattr["name"]] = xml2list_inattr["val"]  # pylint: disable=unsubscriptable-object
        xml2list_inattr = None
    elif name in ("i", "b", "un", "r"):
        # value element closed: revert to the default string type
        xml2list_intype = "s"
    elif name in ("s", "e", "classads"):
        pass  # nothing to do for string/expression values and the top element
    else:
        raise TypeError("Unexpected type: %s" % name)
[docs]
def xml2list_char_data(data):
    """Expat character-data handler: fold the text into the value of the
    attribute currently being parsed, converting it per the recorded type."""
    global xml2list_data, xml2list_inclassad, xml2list_inattr, xml2list_intype
    if xml2list_inattr is None:
        # only process text while inside an attribute element
        return
    if xml2list_intype == "i":
        xml2list_inattr["val"] = int(data)
    elif xml2list_intype == "r":
        xml2list_inattr["val"] = float(data)
    elif xml2list_intype == "b":
        if xml2list_inattr["val"] is None:
            # extended syntax: boolean value carried in the text area
            xml2list_inattr["val"] = data[0] in ("T", "t", "1")
        # else: value was already taken from the 'v' attribute, nothing to do
    elif xml2list_intype == "un":
        pass  # undefined: value already set to None
    else:
        # string: un-escape quotes and append (expat may deliver text in chunks)
        xml2list_inattr["val"] += data.replace('\\"', '"')
[docs]
def xml2list(xml_data):
    """Parse a list of XML output lines from a condor command into classad dicts.

    Lines before the "<?xml" header (e.g. banner text) are skipped;
    when no header is found an empty list is returned.

    Args:
        xml_data (list): lines of XML text

    Returns:
        list: one dictionary per classad

    Raises:
        RuntimeError: when the XML cannot be parsed
    """
    global xml2list_data, xml2list_inclassad, xml2list_inattr, xml2list_intype
    # reset the module-level parser state shared with the 3 handler functions
    xml2list_data = []
    xml2list_inclassad = None
    xml2list_inattr = None
    xml2list_intype = None

    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = xml2list_start_element
    p.EndElementHandler = xml2list_end_element
    p.CharacterDataHandler = xml2list_char_data

    # look for the xml header; anything before it is not XML
    found_xml = -1
    for line_num, line in enumerate(xml_data):
        if line.startswith("<?xml"):
            found_xml = line_num
            break

    if found_xml >= 0:
        try:
            p.Parse(" ".join(xml_data[found_xml:]), 1)
        except TypeError as e:
            raise RuntimeError("Failed to parse XML data, TypeError: %s" % e) from e
        except Exception as e:
            raise RuntimeError("Failed to parse XML data, generic error") from e
    # else no xml, so return an empty list
    return xml2list_data
[docs]
def list2dict(list_data, attr_name):
    """Convert a list of classads into a dictionary keyed by the attr_name attribute(s).

    The key is the value of attr_name (or a tuple of values when attr_name is a
    list/tuple); attribute-name matching falls back to a case-insensitive search.
    Each value is a dictionary with the remaining attributes. Known quirks kept
    for compatibility:
    - keys (tuples) may have different cardinality when an attribute is missing
    - later elements with the same key silently overwrite earlier ones
    - 'Undefined' attributes are dropped, so value dicts may have different keys
    - attributes whose evaluation raises are silently skipped (best effort)

    Args:
        list_data: list of dictionaries (classads) to convert
        attr_name: one attribute name (str) or several (list/tuple) to use as key

    Returns:
        dict: dictionary of dictionaries
    """
    # BUGFIX: use isinstance() instead of comparing type() against list/tuple
    multi_key = isinstance(attr_name, (list, tuple))
    if multi_key:
        attr_list = attr_name
    else:
        attr_list = [attr_name]

    dict_data = {}
    for list_el in list_data:
        if multi_key:
            # build a tuple key out of the requested attributes
            dict_name = []
            for an in attr_name:
                if an in list_el:
                    dict_name.append(list_el[an])
                else:
                    # try a case-insensitive match
                    for k in list_el:
                        if an.lower() == k.lower():
                            dict_name.append(list_el[k])
                            break
            dict_name = tuple(dict_name)
        else:
            dict_name = list_el[attr_name]
        # dict_el will have all the elements but those in attr_list
        dict_el = {}
        for a in list_el:
            if a not in attr_list:
                try:
                    if USE_HTCONDOR_PYTHON_BINDINGS:
                        if list_el[a].__class__.__name__ == "ExprTree":
                            # Try to evaluate the condor expr and use its value
                            # If cannot be evaluated, keep the expr as is
                            a_value = list_el[a].eval()
                            if "%s" % a_value != "Undefined":
                                # Cannot use classad.Value.Undefined for
                                # comparison as it gets cast to int
                                dict_el[a] = a_value
                        elif str(list_el[a]) != "Undefined":
                            # No need for Undefined check to see if
                            # attribute exists in the fetched classad
                            dict_el[a] = list_el[a]
                    else:
                        dict_el[a] = list_el[a]
                except Exception:
                    # Do not fail; skip attributes that cannot be evaluated
                    pass
        dict_data[dict_name] = dict_el
    return dict_data
[docs]
def applyConstraint(data, constraint_func):
    """Return the subset of data whose values satisfy constraint_func.

    Args:
        data (dict): key -> classad dictionary
        constraint_func: boolean function of one classad dictionary, or None

    Returns:
        dict: data itself when constraint_func is None, otherwise a new dict
        with only the items for which constraint_func(value) is true
    """
    if constraint_func is None:
        return data
    return {key: val for key, val in data.items() if constraint_func(val)}
[docs]
def doGroup(indata, group_key_func, group_data_func):
    """Group the indata values by group_key_func and summarize each group.

    Each summary returned by group_data_func must be a dictionary,
    possibly similar to the original values of the indata elements.

    Args:
        indata (dict): data to group (values are classad dictionaries)
        group_key_func: one-argument function mapping a value to its group key
        group_data_func: one-argument function mapping a list of values to a summary dict

    Returns:
        dict: group key -> summary produced by group_data_func
    """
    grouped = {}
    for inel in indata.values():
        grouped.setdefault(group_key_func(inel), []).append(inel)
    # each output value is a summary of the group's values
    return {gkey: group_data_func(members) for gkey, members in grouped.items()}
[docs]
def doNestedGroup(indata, group_key_func, group_element_func=None):
    """Group indata by the key computed (on each value) with group_key_func.

    Returns a dict of dictionaries created by group_element_func. Each value
    of the dictionaries returned by group_element_func must itself be a
    dictionary, possibly similar to the original values of indata.
    When group_element_func is None each group is simply a dictionary with
    the original key/value pairs of that group.

    Args:
        indata (dict): data to group
        group_key_func: group-by function, applied to each value
        group_element_func: how to transform the (key, value) pairs of each
            group; by default the pairs are kept as a plain dictionary

    Returns:
        dict: dictionary of dictionaries with grouped indata
    """
    grouped = {}
    for key, element in indata.items():
        grouped.setdefault(group_key_func(element), []).append((key, element))
    if group_element_func is None:
        # Just regroup the original elements without changing them
        return {gkey: dict(pairs) for gkey, pairs in grouped.items()}
    # Dictionary produced from the original (key, value) pairs of each group
    return {gkey: group_element_func(pairs) for gkey, pairs in grouped.items()}
[docs]
def fetch2count(data, hash_func):
    """Nested count of the hash values returned from all the elements in data.

    Args:
        data (dict): data from a fetch()
        hash_func: hashing function taking one classad dictionary and
            returning a hash value; None means "do not count this element",
            a list builds one nesting level per element (last item is the leaf)

    Returns:
        dict: dictionary of hash values; leaves are counts, intermediate
            levels (for list-valued hashes) are nested dictionaries
    """
    count = {}
    for element in data.values():
        hid = hash_func(element)
        if hid is None:
            # The hashing function asked us to skip this element
            continue
        node = count  # node walks down to the dict holding the leaf counter
        if isinstance(hid, list):
            # Build the nested structure for all but the last list item
            for part in hid[:-1]:
                node = node.setdefault(part, {})
            hid = hid[-1]
        node[hid] = node.get(hid, 0) + 1
    return count
[docs]
def fetch2count_flat(data, hash_func):
    """Count the hash values returned from all the elements in data.

    Args:
        data (dict): data from a fetch()
        hash_func: hashing function taking one classad dictionary and
            returning a flat (orderable) hash value; None means the element
            is not counted. For hashing functions returning lists use
            fetch2count instead.

    Returns:
        dict: hash value -> number of elements with that hash
    """
    # Filter out None BEFORE sorting: in Python 3 sorting a mix of None and
    # other values raises TypeError (None is unorderable), so the original
    # "sort first, filter later" order crashed whenever hash_func skipped
    # an element.
    data_list = sorted(h for h in (hash_func(v) for v in data.values()) if h is not None)
    # groupby requires its input pre-sorted by the same key (done above)
    return {key: len(list(group)) for key, group in groupby(data_list)}
#
# Inputs
# data - data from a fetch()
# hash_func - Hashing function
# One argument: classad dictionary
# Returns: hash value
# if None, will not be counted
# if a list, all elements will be used
#
# Returns a dictionary of hash values
# Elements are lists of keys (or more dictionaries if hash returns lists)
#
[docs]
def fetch2list(data, hash_func):
    """Nested lists of the keys in data, grouped by hash value.

    Args:
        data (dict): data from a fetch()
        hash_func: hashing function taking one classad dictionary and
            returning a hash value; None means "do not list this element",
            a list builds one nesting level per element (last item is the leaf)

    Returns:
        dict: dictionary of hash values; leaves are lists of data keys,
            intermediate levels (for list-valued hashes) are nested dicts
    """
    return_list = {}
    for k, el in data.items():
        hid = hash_func(el)
        if hid is None:
            # The hashing function asked us to skip this element
            continue
        lel = return_list  # lel walks down to the dict holding the leaf list
        if isinstance(hid, list):
            # Build the nested structure for all but the last list item
            for h in hid[:-1]:
                lel = lel.setdefault(h, {})
            hid = hid[-1]
        # BUGFIX: the original did "list_el = lel[hid].append[k]" which
        # subscripts the bound method (TypeError); and even as a call it
        # would have stored None back into lel[hid]. Append in place instead.
        lel.setdefault(hid, []).append(k)
    return return_list
[docs]
def addDict(base_dict, new_dict):
    """Recursively add new_dict into base_dict, in place.

    Keys missing from base_dict are copied over; nested dictionaries are
    merged recursively; other values are summed with +=.

    Args:
        base_dict (dict): first dictionary, modified in place
        new_dict (dict): dictionary to be added
    """
    for key, new_val in new_dict.items():
        if key not in base_dict:
            # Nothing there yet: just take the new value
            base_dict[key] = new_val
        elif isinstance(new_val, dict):
            # Another dictionary: recurse into it
            addDict(base_dict[key], new_val)
        else:
            base_dict[key] += new_val
################################################################################
# Functions to convert existing data type to their bindings friendly equivalent
################################################################################
[docs]
def resource_str_to_py_adtype(resource_str):
    """Given the resource string return the equivalent classad type.

    When the HTCondor python bindings are not in use the string is returned
    unchanged. Unknown (or empty) resource strings default to Startd ads.
    """
    if not USE_HTCONDOR_PYTHON_BINDINGS:
        return resource_str
    flag_to_adtype = {
        "-any": htcondor.AdTypes.Any,
        "-collector": htcondor.AdTypes.Collector,
        "-generic": htcondor.AdTypes.Generic,
        "-grid": htcondor.AdTypes.Grid,
        "-had": htcondor.AdTypes.HAD,
        "-license": htcondor.AdTypes.License,
        "-master": htcondor.AdTypes.Master,
        "-negotiator": htcondor.AdTypes.Negotiator,
        "-schedd": htcondor.AdTypes.Schedd,
        "-startd": htcondor.AdTypes.Startd,
        "-submitter": htcondor.AdTypes.Submitter,
    }
    # Default to startd ads, even if resource_str is empty
    return flag_to_adtype.get(resource_str, htcondor.AdTypes.Startd)
[docs]
def bindings_friendly_constraint(constraint):
    """Convert the constraint to a format usable with the python bindings.

    The bindings expect True (match everything) instead of None.
    """
    return True if constraint is None else constraint
[docs]
def bindings_friendly_attrs(format_list):
    """Convert a format_list into attrs usable with the python bindings.

    Only the attribute names are kept; the python bindings take care of
    the typing. None yields an empty list.
    """
    if format_list is None:
        return []
    return [entry[0] for entry in format_list]
################################################################################
# TODO: Following code is never used. Maybe we should yank it off in future
# during code cleanup.
################################################################################
[docs]
class SummarizeMulti:
    """Summarize the results of multiple queries into one set of counts.

    NOTE(review): flagged above as never-used code slated for cleanup.
    The logic looks broken: __init__ stores the RESULT of self.count(query,
    hash_func) (a dict) into self.counts, while count()/countStored() then
    call .count()/.countStored() on those stored items as if they were query
    objects. Presumably self.counts was meant to hold per-query Summarize-like
    objects — verify before reviving this class.
    """

    def __init__(self, queries, hash_func=lambda x: 1):
        # Accumulate one entry per query; see the class-level review note
        # about the suspicious use of self.count() here.
        self.counts = []
        for query in queries:
            self.counts.append(self.count(query, hash_func))
        self.hash_func = hash_func

    # see Count for description
    def count(self, constraint=None, hash_func=None):
        # Merge (via addDict) the counts obtained from each stored entry
        out = {}
        for c in self.counts:
            data = c.count(constraint, hash_func)
            addDict(out, data)
        return out

    # see Count for description
    def countStored(self, constraint_func=None, hash_func=None):
        # Same as count(), but using the stored/cached data of each entry
        out = {}
        for c in self.counts:
            data = c.countStored(constraint_func, hash_func)
            addDict(out, data)
        return out
# condor_q, where we have only one ProcId x ClusterId
[docs]
class CondorQLite(CondorQuery):
    """condor_q query keyed on ClusterId only, for schedds where there is
    a single ProcId per ClusterId."""

    def __init__(self, schedd_name=None, pool_name=None, security_obj=None, schedd_lookup_cache=local_schedd_cache):
        # NOTE: the default schedd_lookup_cache (local_schedd_cache) is a
        # module-level object evaluated once at import time; passing None
        # explicitly disables caching via NoneScheddCache below.
        self.schedd_name = schedd_name
        if schedd_lookup_cache is None:
            schedd_lookup_cache = NoneScheddCache()
        schedd_str, env = schedd_lookup_cache.getScheddId(schedd_name, pool_name)
        # Group key is "ClusterId" alone (not ClusterId.ProcId), hence "Lite"
        CondorQuery.__init__(self, "condor_q", schedd_str, "ClusterId", pool_name, security_obj, env)

    def fetch(self, constraint=None, format_list=None):
        # Ensure ClusterId is in the projection: it is the dictionary key
        # used to index the fetched ads.
        if format_list is not None:
            # check that ClusterId is present, and if not add it
            format_list = complete_format_list(format_list, [("ClusterId", "i")])
        return CondorQuery.fetch(self, constraint=constraint, format_list=format_list)