# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""This module implements the functions needed to aggregate the monitoring fo the frontend
"""
import os
import os.path
import time
from glideinwms.frontend import glideinFrontendMonitoring
from glideinwms.lib import logSupport, rrdSupport, xmlFormat, xmlParse
############################################################
#
# Configuration
#
############################################################
# [docs] -- stray Sphinx HTML-export artifact, commented out (a bare [docs] raises NameError at import)
class MonitorAggregatorConfig:
    """Configuration shared by the frontend monitoring aggregation functions.

    A single module-level instance (``monitorAggregatorConfig``) is used by
    ``verifyRRD`` and ``aggregateStatus``.
    """

    def __init__(self):
        # Base directory where the monitoring files are read/written
        self.monitor_dir = "monitor/"
        # list of entries
        self.entries = []
        # name of the status files
        self.status_relname = "frontend_status.xml"
        # names of the frontend groups to aggregate; filled in by config_frontend()
        # (initialized here so readers never hit an AttributeError before configuration)
        self.groups = []

    def config_frontend(self, monitor_dir, groups):
        """Configure the aggregator for a frontend.

        Args:
            monitor_dir (str): base monitoring directory
            groups (list): names of the frontend groups to aggregate
        """
        self.monitor_dir = monitor_dir
        self.groups = groups
        # keep the per-group monitoring module pointed at the same directory
        glideinFrontendMonitoring.monitoringConfig.monitor_dir = monitor_dir
# global configuration of the module, shared by all the aggregation functions below
monitorAggregatorConfig = MonitorAggregatorConfig()
###########################################################
#
# Functions
#
###########################################################
# PM: Nov 26, 2014
# There is a limit on rrd field names. Max allowed is 20 chars long.
# RRD enforces this limit while creating fields, but will not enforce the limits
# when trying to read from a field with name longer than 20 chars.
# Truncate the names for following to be in limits to avoid above issue.
# Attributes tracked for each monitoring "type"; each RRD field is named
# "<type string><attribute>" (see the 20-char field-name note above)
frontend_status_attributes = {
    "Jobs": ("Idle", "OldIdle", "Running", "Total", "Idle_3600"),
    "Glideins": ("Idle", "Running", "Total"),
    "MatchedJobs": ("Idle", "EffIdle", "OldIdle", "Running", "RunningHere"),
    "MatchedGlideins": ("Total", "Idle", "Running", "Failed"),
    # 'MatchedGlideins':("Total","Idle","Running","Failed","TCores","ICores","RCores"),
    "MatchedCores": ("Total", "Idle", "Running"),
    "Requested": ("Idle", "MaxRun"),
}

# RRD field-name prefixes used for the frontend-wide "total" RRDs
frontend_total_type_strings = {
    "Jobs": "Jobs",
    "Glideins": "Glidein",
    "MatchedJobs": "MatchJob",
    "MatchedGlideins": "MatchGlidein",
    "MatchedCores": "MatchCore",
    "Requested": "Req",
}

# RRD field-name prefixes used for the per-factory / per-state RRDs
# (subset of the total ones: Jobs/Glideins totals are not broken down)
frontend_job_type_strings = {
    "MatchedJobs": "MatchJob",
    "MatchedGlideins": "MatchGlidein",
    "MatchedCores": "MatchCore",
    "Requested": "Req",
}
################################################
# Function used by Frontend reconfig/upgrade
# No logging available, output is to stdout/err
# [docs] -- stray Sphinx HTML-export artifact, commented out (a bare [docs] raises NameError at import)
def verifyRRD(fix_rrd=False, backup=False):
    """
    Go through all known monitoring rrds and verify that they
    match existing schema (could be different if an upgrade happened)
    If fix_rrd is true, then also attempt to add any missing attributes.

    Args:
        fix_rrd (bool): if True, will attempt to add missing attrs
        backup (bool): if True, backup the old RRD before fixing

    Returns:
        bool: True if all OK, False if there is a problem w/ RRD files
    """
    rrd_problems_found = False
    mon_dir = monitorAggregatorConfig.monitor_dir

    # Build the expected schemas: one "<type string><attribute>" field per
    # attribute. The "total" RRDs use the total type strings, the
    # per-factory/per-state RRDs the job ones.
    status_dict = {}
    status_total_dict = {}
    for tp, attributes_tp in frontend_status_attributes.items():
        if tp in frontend_total_type_strings:
            tp_str = frontend_total_type_strings[tp]
            for a in attributes_tp:
                status_total_dict[f"{tp_str}{a}"] = None
        if tp in frontend_job_type_strings:
            tp_str = frontend_job_type_strings[tp]
            for a in attributes_tp:
                status_dict[f"{tp_str}{a}"] = None

    # check all the existing files
    if not os.path.isdir(mon_dir):
        print("WARNING: monitor/ directory does not exist, skipping rrd verification.")
        return True
    for dir_name, _sdir_name, f_list in os.walk(mon_dir):
        for file_name in f_list:
            if file_name == "Status_Attributes.rrd":
                # the "total" subdirectory holds the aggregate RRD; all other
                # directories hold per-factory/per-state RRDs
                if os.path.basename(dir_name) == "total":
                    expected_dict = status_total_dict
                else:
                    expected_dict = status_dict
                if rrdSupport.verifyHelper(os.path.join(dir_name, file_name), expected_dict, fix_rrd, backup):
                    rrd_problems_found = True
    return not rrd_problems_found
####################################
# PRIVATE - Used by aggregateStatus
# Write one RRD
# [docs] -- stray Sphinx HTML-export artifact, commented out (a bare [docs] raises NameError at import)
def write_one_rrd(name, updated, data, fact=0):
    """Create/update one Status_Attributes RRD from a monitoring data dictionary.

    Args:
        name (str): RRD path relative to the monitoring dir (w/o extension)
        updated (float): timestamp of the values (seconds from Epoch)
        data (dict): monitoring values, keyed by type (e.g. "Jobs") and then by
            attribute (e.g. "Idle"); leaf values are numbers or numeric strings
        fact (int): 0 to use the frontend-total schema
            (frontend_total_type_strings), anything else for the
            per-factory/per-state schema (frontend_job_type_strings)
    """
    if fact == 0:
        type_strings = frontend_total_type_strings
    else:
        type_strings = frontend_job_type_strings

    # initialize the full RRD dictionary (all schema fields, value None),
    # so the RRD gets created properly even for attributes missing from data
    val_dict = {}
    for tp, attributes_tp in frontend_status_attributes.items():
        if tp in type_strings:
            tp_str = type_strings[tp]
            for a in attributes_tp:
                val_dict[f"{tp_str}{a}"] = None

    for tp in data:
        # values (RRD type) - Status or Requested
        if tp not in frontend_status_attributes:
            continue
        if tp not in type_strings:
            continue
        tp_str = type_strings[tp]
        attributes_tp = frontend_status_attributes[tp]
        tp_el = data[tp]
        for a, a_el in tp_el.items():
            # ignore subdictionaries; convert only scalar values.
            # BUGFIX: the previous code ran int(tp_el[a]) *before* the
            # isinstance(..., dict) check, so a subdictionary raised TypeError
            # instead of being skipped; test the raw value first, convert after.
            if a in attributes_tp and not isinstance(a_el, dict):
                val_dict[f"{tp_str}{a}"] = int(a_el)

    glideinFrontendMonitoring.monitoringConfig.establish_dir("%s" % name)
    glideinFrontendMonitoring.monitoringConfig.write_rrd_multi("%s" % name, "GAUGE", updated, val_dict)
##############################################################################
# create an aggregate of status files, write it in an aggregate status file
# end return the values
# [docs] -- stray Sphinx HTML-export artifact, commented out (a bare [docs] raises NameError at import)
def aggregateStatus():
    """Aggregate the monitoring status of all configured frontend groups.

    Read each group's status XML file, sum the per-type totals and the
    per-factory/per-state values across groups, then write the aggregate
    XML status file and the aggregate RRD files under the monitoring dir.

    Returns:
        dict: the aggregate status, {'groups': {...}, 'total': {...}}
    """
    global monitorAggregatorConfig

    # RRD field-name prefix for each monitoring type (same as the module-level
    # frontend_total_type_strings)
    type_strings = {
        "Jobs": "Jobs",
        "Glideins": "Glidein",
        "MatchedJobs": "MatchJob",
        "MatchedGlideins": "MatchGlidein",
        "MatchedCores": "MatchCore",
        "Requested": "Req",
    }

    # frontend-wide totals; None means "no group reported this type yet"
    global_total = {
        "Jobs": None,
        "Glideins": None,
        "MatchedJobs": None,
        "Requested": None,
        "MatchedGlideins": None,
        "MatchedCores": None,
    }
    status = {"groups": {}, "total": global_total}
    # per-factory and per-state totals summed across all groups
    global_fact_totals = {}
    for fos in ("factories", "states"):
        global_fact_totals[fos] = {}

    nr_groups = 0  # count of groups that reported a "total" (informational only)
    for group in monitorAggregatorConfig.groups:
        # load group status file
        status_fname = os.path.join(
            monitorAggregatorConfig.monitor_dir, f"group_{group}", monitorAggregatorConfig.status_relname
        )
        try:
            group_data = xmlParse.xmlfile2dict(status_fname)
        except xmlParse.CorruptXML:
            # a corrupt file would break aggregation every cycle; delete it so
            # the group process recreates it on the next iteration
            logSupport.log.error("Corrupt XML in %s; deleting (it will be recreated)." % (status_fname))
            os.unlink(status_fname)
            continue
        except OSError:
            continue  # file not found, ignore

        # update group
        status["groups"][group] = {}
        for fos in ("factories", "states"):
            try:
                status["groups"][group][fos] = group_data[fos]
            except KeyError:
                # first time after upgrade factories may not be defined
                status["groups"][group][fos] = {}

        this_group = status["groups"][group]
        for fos in ("factories", "states"):
            for fact in list(this_group[fos].keys()):
                this_fact = this_group[fos][fact]
                if fact not in list(global_fact_totals[fos].keys()):
                    # first iteration through, set fact totals equal to the first group's fact totals
                    global_fact_totals[fos][fact] = {}
                    for attribute in list(type_strings.keys()):
                        global_fact_totals[fos][fact][attribute] = {}
                        if attribute in list(this_fact.keys()):
                            for type_attribute in list(this_fact[attribute].keys()):
                                this_type_attribute = this_fact[attribute][type_attribute]
                                try:
                                    global_fact_totals[fos][fact][attribute][type_attribute] = int(this_type_attribute)
                                except Exception:
                                    # non-numeric leaves (e.g. nested parameters) are skipped
                                    pass
                else:
                    # next iterations, factory already present in global fact totals, add the new factory values to the previous ones
                    for attribute in list(type_strings.keys()):
                        if attribute in list(this_fact.keys()):
                            for type_attribute in list(this_fact[attribute].keys()):
                                this_type_attribute = this_fact[attribute][type_attribute]
                                if isinstance(this_type_attribute, type(global_fact_totals[fos])):
                                    # dict, do nothing
                                    pass
                                else:
                                    if attribute in list(
                                        global_fact_totals[fos][fact].keys()
                                    ) and type_attribute in list(global_fact_totals[fos][fact][attribute].keys()):
                                        global_fact_totals[fos][fact][attribute][type_attribute] += int(
                                            this_type_attribute
                                        )
                                    else:
                                        global_fact_totals[fos][fact][attribute][type_attribute] = int(
                                            this_type_attribute
                                        )
        # nr_groups+=1
        # status['groups'][group]={}

        if "total" in group_data:
            nr_groups += 1
            status["groups"][group]["total"] = group_data["total"]

            for w in list(global_total.keys()):
                tel = global_total[w]
                if w not in group_data["total"]:
                    continue
                # status['groups'][group][w]=group_data[w]
                el = group_data["total"][w]
                if tel is None:
                    # new one, just copy over
                    global_total[w] = {}
                    tel = global_total[w]
                    for a in list(el.keys()):  # coming from XML, everything is a string
                        tel[a] = int(el[a])  # pylint: disable=unsupported-assignment-operation
                else:
                    # successive, sum
                    for a in list(el.keys()):
                        if a in tel:  # pylint: disable=unsupported-membership-test
                            tel[a] += int(el[a])  # pylint: disable=unsupported-assignment-operation
                    # if any attribute from prev. factories are not in the current one, remove from total
                    for a in list(tel.keys()):
                        if a not in el:
                            del tel[a]  # pylint: disable=unsupported-delete-operation

    # drop types no group ever reported, so they don't show up as "None" in the XML
    for w in list(global_total.keys()):
        if global_total[w] is None:
            del global_total[w]  # remove group if not defined

    # Write xml files
    updated = time.time()
    xml_str = (
        '<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
        + "<VOFrontendStats>\n"
        + xmlFormat.time2xml(updated, "updated", indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
        + "\n"
        + xmlFormat.dict2string(
            status["groups"],
            dict_name="groups",
            el_name="group",
            subtypes_params={
                "class": {
                    "dicts_params": {
                        "factories": {
                            "el_name": "factory",
                            "subtypes_params": {
                                "class": {
                                    "subclass_params": {
                                        "Requested": {
                                            "dicts_params": {
                                                "Parameters": {"el_name": "Parameter", "subtypes_params": {"class": {}}}
                                            }
                                        }
                                    }
                                }
                            },
                        },
                        "states": {
                            "el_name": "state",
                            "subtypes_params": {
                                "class": {
                                    "subclass_params": {
                                        "Requested": {
                                            "dicts_params": {
                                                "Parameters": {"el_name": "Parameter", "subtypes_params": {"class": {}}}
                                            }
                                        }
                                    }
                                }
                            },
                        },
                    }
                }
            },
            leading_tab=xmlFormat.DEFAULT_TAB,
        )
        + "\n"
        + xmlFormat.class2string(status["total"], inst_name="total", leading_tab=xmlFormat.DEFAULT_TAB)
        + "\n"
        + xmlFormat.dict2string(
            global_fact_totals["factories"],
            dict_name="factories",
            el_name="factory",
            subtypes_params={
                "class": {
                    "subclass_params": {
                        "Requested": {
                            "dicts_params": {"Parameters": {"el_name": "Parameter", "subtypes_params": {"class": {}}}}
                        }
                    }
                }
            },
            leading_tab=xmlFormat.DEFAULT_TAB,
        )
        + "\n"
        + xmlFormat.dict2string(
            global_fact_totals["states"],
            dict_name="states",
            el_name="state",
            subtypes_params={
                "class": {
                    "subclass_params": {
                        "Requested": {
                            "dicts_params": {"Parameters": {"el_name": "Parameter", "subtypes_params": {"class": {}}}}
                        }
                    }
                }
            },
            leading_tab=xmlFormat.DEFAULT_TAB,
        )
        + "\n"
        + "</VOFrontendStats>\n"
    )
    glideinFrontendMonitoring.monitoringConfig.write_file(monitorAggregatorConfig.status_relname, xml_str)

    # Write rrds
    glideinFrontendMonitoring.monitoringConfig.establish_dir("total")
    write_one_rrd(os.path.join("total", "Status_Attributes"), updated, global_total, 0)

    # one RRD per aggregated factory and per aggregated state
    for fact in list(global_fact_totals["factories"].keys()):
        fe_dir = os.path.join("total", f"factory_{glideinFrontendMonitoring.sanitize(fact)}")
        glideinFrontendMonitoring.monitoringConfig.establish_dir(fe_dir)
        write_one_rrd(os.path.join(fe_dir, "Status_Attributes"), updated, global_fact_totals["factories"][fact], 1)
    for fact in list(global_fact_totals["states"].keys()):
        fe_dir = os.path.join("total", f"state_{glideinFrontendMonitoring.sanitize(fact)}")
        glideinFrontendMonitoring.monitoringConfig.establish_dir(fe_dir)
        write_one_rrd(os.path.join(fe_dir, "Status_Attributes"), updated, global_fact_totals["states"][fact], 1)

    return status