# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""This module implements the functions needed
to aggregate the monitoring of the glidein factory
"""
import json
import os.path
import pickle
import time
from glideinwms.factory import glideFactoryMonitoring
from glideinwms.lib import logSupport, rrdSupport, xmlFormat, xmlParse
############################################################
#
# Configuration
#
############################################################
class MonitorAggregatorConfig:
def __init__(self):
        # base directory of the factory monitoring files
self.monitor_dir = "monitor/"
# list of entries
self.entries = []
# name of the status files
self.status_relname = "schedd_status.xml"
self.logsummary_relname = "log_summary.xml"
self.jobsummary_relname = "job_summary.pkl"
self.completed_data_relname = "completed_data.json"
def config_factory(self, monitor_dir, entries, log):
self.monitor_dir = monitor_dir
self.entries = entries
glideFactoryMonitoring.monitoringConfig.monitor_dir = monitor_dir
glideFactoryMonitoring.monitoringConfig.log = log
self.log = log
# global configuration of the module
monitorAggregatorConfig = MonitorAggregatorConfig()
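# Example configuration (illustrative paths and entry names; the factory code is expected
# to call config_factory() before using the aggregation functions below):
#   monitorAggregatorConfig.config_factory("/var/lib/gwms-factory/monitor", ["entry_example"], logSupport.log)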
def rrd_site(name):
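    """Return the name of the aggregated XML file for the given RRD file name.

    For example, "Status_Attributes.rrd" becomes "rrd_Status_Attributes.xml".
    """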
sname = name.split(".")[0]
return "rrd_%s.xml" % sname
###########################################################
#
# Functions
#
###########################################################
status_attributes = {
"Status": ("Idle", "Running", "Held", "Wait", "Pending", "StageIn", "IdleOther", "StageOut", "RunningCores"),
"Requested": ("Idle", "MaxGlideins", "IdleCores", "MaxCores"),
"ClientMonitor": (
"InfoAge",
"JobsIdle",
"JobsRunning",
"JobsRunHere",
"GlideIdle",
"GlideRunning",
"GlideTotal",
"CoresIdle",
"CoresRunning",
"CoresTotal",
),
}
type_strings = {"Status": "Status", "Requested": "Req", "ClientMonitor": "Client"}
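# RRD data source names are built by prefixing each attribute with the short type
# string above, e.g. "StatusRunning", "ReqIdle", "ClientInfoAge"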
##############################################################################
# Function used by Factory reconfig/upgrade
# No logging available, output is to stdout/err
def verifyRRD(fix_rrd=False, backup=True):
"""
    Go through all known monitoring RRDs and verify that they match the
    existing schema (it could differ if an upgrade happened).
    If fix_rrd is True, also attempt to add any missing attributes.

    Args:
        fix_rrd (bool): if True, attempt to add any missing attributes
        backup (bool): if True, back up the old RRD before fixing it

    Returns:
        bool: True if all OK, False if there is a problem with the RRD files
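
    Example (illustrative; assumes config_factory() has already been invoked):
        if not verifyRRD(fix_rrd=True):
            print("WARNING: problems found in the monitoring RRD files")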
"""
rrd_problems_found = False
mon_dir = monitorAggregatorConfig.monitor_dir
# Factory monitoring dictionaries
status_dict = {}
completed_stats_dict = {}
completed_waste_dict = {}
counts_dict = {}
# initialize the RRD dictionaries to match the current schema for verification
for tp in list(status_attributes.keys()):
if tp in list(type_strings.keys()):
tp_str = type_strings[tp]
attributes_tp = status_attributes[tp]
for a in attributes_tp:
status_dict[f"{tp_str}{a}"] = None
for jobrange in glideFactoryMonitoring.getAllJobRanges():
completed_stats_dict[f"JobsNr_{jobrange}"] = None
for timerange in glideFactoryMonitoring.getAllTimeRanges():
completed_stats_dict[f"Lasted_{timerange}"] = None
completed_stats_dict[f"JobsLasted_{timerange}"] = None
for jobtype in glideFactoryMonitoring.getAllJobTypes():
for timerange in glideFactoryMonitoring.getAllMillRanges():
completed_waste_dict[f"{jobtype}_{timerange}"] = None
for jobtype in ("Entered", "Exited", "Status"):
for jobstatus in ("Wait", "Idle", "Running", "Held"):
counts_dict[f"{jobtype}{jobstatus}"] = None
for jobstatus in ("Completed", "Removed"):
counts_dict[f"Entered{jobstatus}"] = None
completed_dict = glideFactoryMonitoring.getLogCompletedDefaults()
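    # Map each per-entry RRD file to the dictionary of data sources it is expected to contain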
rrdict = {
"Status_Attributes.rrd": status_dict,
"Log_Completed.rrd": completed_dict,
"Log_Completed_Stats.rrd": completed_stats_dict,
"Log_Completed_WasteTime.rrd": completed_waste_dict,
"Log_Counts.rrd": counts_dict,
}
# check all the existing files
if not os.path.isdir(mon_dir):
print(f"WARNING: monitor directory '{mon_dir}' does not exist, skipping rrd verification.")
return True
for dir_name, sdir_name, f_list in os.walk(mon_dir):
for file_name in f_list:
if file_name in list(rrdict.keys()):
if rrdSupport.verifyHelper(os.path.join(dir_name, file_name), rrdict[file_name], fix_rrd, backup):
rrd_problems_found = True
return not rrd_problems_found
##############################################################################
def aggregateStatus(in_downtime):
"""
Create an aggregate of status files, write it in an aggregate status file
and in the end return the values
@type in_downtime: boolean
@param in_downtime: Entry downtime information
@rtype: dict
@return: Dictionary of status information
"""
global monitorAggregatorConfig
avgEntries = ("InfoAge",)
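    # attributes that are averaged over the number of entries instead of summed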
global_total = {"Status": None, "Requested": None, "ClientMonitor": None}
status = {"entries": {}, "total": global_total}
status_fe = {"frontends": {}} # analogous to above but for frontend totals
completed_data_tot = {"entries": {}}
# initialize the RRD dictionary, so it gets created properly
val_dict = {}
for tp in global_total:
# values (RRD type) - Status or Requested
if tp not in list(status_attributes.keys()):
continue
tp_str = type_strings[tp]
attributes_tp = status_attributes[tp]
for a in attributes_tp:
val_dict[f"{tp_str}{a}"] = None
nr_entries = 0
nr_feentries = {} # dictionary for nr entries per fe
for entry in monitorAggregatorConfig.entries:
# load entry status file
status_fname = os.path.join(
monitorAggregatorConfig.monitor_dir, f"entry_{entry}", monitorAggregatorConfig.status_relname
)
# load entry completed data file
completed_data_fname = os.path.join(
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.completed_data_relname,
)
completed_data_fp = None
try:
            # entry_data is a regular dictionary of nested dictionaries/lists returned from parsing the XML
entry_data = xmlParse.xmlfile2dict(status_fname)
completed_data_fp = open(completed_data_fname)
completed_data = json.load(completed_data_fp)
except OSError:
continue # file not found, ignore
finally:
if completed_data_fp:
completed_data_fp.close()
# update entry
status["entries"][entry] = {"downtime": entry_data["downtime"], "frontends": entry_data["frontends"]}
# update completed data
completed_data_tot["entries"][entry] = completed_data["stats"]
# to log when total dictionary is modified (in update total/frontend)
tmp_list_removed = []
# update total
if "total" in entry_data:
nr_entries += 1
status["entries"][entry]["total"] = entry_data["total"]
for w in list(global_total): # making a copy of the keys because the dict is being modified (keys are not!)
tel = global_total[w]
if w not in entry_data["total"]:
continue
el = entry_data["total"][w]
if tel is None:
# new one, just copy over
tel = {}
for a in el:
# coming from XML, everything is a string
tel[a] = int(el[a])
global_total[w] = tel
else:
# successive, sum
for a in el:
if a in tel: # pylint: disable=unsupported-membership-test
tel[a] += int(el[a]) # pylint: disable=unsupported-assignment-operation
# if any attribute from prev. frontends is not in the current one, remove from total
for a in list(tel): # making a copy of the keys because the dict is being modified
if a not in el:
del tel[a] # pylint: disable=unsupported-delete-operation
tmp_list_removed.append(a)
if tmp_list_removed:
logSupport.log.debug(
"Elements removed from total status (%s: %s) because of %s: %s"
% (w, len(tel), entry, tmp_list_removed)
)
tmp_list_removed = []
# update frontends
if "frontends" in entry_data:
# loop on fe's in this entry
for fe in entry_data["frontends"]:
# compare each to the list of fe's accumulated so far
if fe not in status_fe["frontends"]:
status_fe["frontends"][fe] = {}
fe_first = True
else:
fe_first = False
# number of entries with this frontend
if fe not in nr_feentries:
nr_feentries[fe] = 1 # first occurrence of frontend
else:
nr_feentries[fe] += 1 # already found one
for w in entry_data["frontends"][fe]:
# w is the entry name of the entry using the frontend
if w not in status_fe["frontends"][fe]:
status_fe["frontends"][fe][w] = {}
tela = status_fe["frontends"][fe][w]
ela = entry_data["frontends"][fe][w]
for a in ela:
# for the 'Downtime' field (only bool), do logical AND of all site downtimes
# 'w' is frontend attribute name, ie 'ClientMonitor' or 'Downtime'
# 'a' is sub-field, such as 'GlideIdle' or 'status'
if w == "Downtime" and a == "status":
ela_val = ela[a] != "False" # Check if 'True' or 'False' but default to True if neither
try:
tela[a] = tela[a] and ela_val
except KeyError:
tela[a] = ela_val
except Exception:
pass # just protect
else:
# All other fields could be numbers or something else
try:
# if is there already, sum
if a in tela:
tela[a] += int(ela[a])
else:
if fe_first: # to avoid adding back attributes that were not in other frontends
tela[a] = int(ela[a])
except Exception:
pass # not an int, not Downtime, so do nothing
# if any attribute from prev. frontends is not in the current one, remove from total
if not fe_first and w != "Downtime":
for a in list(tela): # making a copy of the keys because the dict is being modified
if a not in ela:
del tela[a]
tmp_list_removed.append(a)
if tmp_list_removed:
logSupport.log.debug(
"Elements removed from Frontend %s total status (%s: %s) because of %s: %s"
% (fe, w, len(tela), entry, tmp_list_removed)
)
tmp_list_removed = []
for w in list(global_total): # making a copy of the keys because the dict is being modified
if global_total[w] is None:
del global_total[w] # remove entry if not defined
else:
tel = global_total[w]
for a in tel: # pylint: disable=not-an-iterable
if a in avgEntries:
# since all entries must have this attr to be here, just divide by nr of entries
tel[a] = (
tel[a] // nr_entries
) # pylint: disable=unsupported-assignment-operation,unsubscriptable-object
# do average for per-fe stat--'InfoAge' only
for fe in list(status_fe["frontends"].keys()):
for w in list(status_fe["frontends"][fe].keys()):
tel = status_fe["frontends"][fe][w]
for a in list(tel.keys()):
if a in avgEntries and fe in nr_feentries:
tel[a] = tel[a] // nr_feentries[fe] # divide per fe
xml_downtime = xmlFormat.dict2string(
{}, dict_name="downtime", el_name="", params={"status": str(in_downtime)}, leading_tab=xmlFormat.DEFAULT_TAB
)
# Write xml files
updated = time.time()
xml_str = (
'<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
+ "<glideFactoryQStats>\n"
+ xmlFormat.time2xml(updated, "updated", indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ xml_downtime
+ "\n"
+ xmlFormat.dict2string(
status["entries"],
dict_name="entries",
el_name="entry",
subtypes_params={
"class": {
"dicts_params": {
"frontends": {
"el_name": "frontend",
"subtypes_params": {
"class": {
"subclass_params": {
"Requested": {
"dicts_params": {
"Parameters": {"el_name": "Parameter", "subtypes_params": {"class": {}}}
}
}
}
}
},
}
}
}
},
leading_tab=xmlFormat.DEFAULT_TAB,
)
+ "\n"
+ xmlFormat.class2string(status["total"], inst_name="total", leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ xmlFormat.dict2string(
status_fe["frontends"],
dict_name="frontends",
el_name="frontend",
subtypes_params={
"class": {
"subclass_params": {
"Requested": {
"dicts_params": {"Parameters": {"el_name": "Parameter", "subtypes_params": {"class": {}}}}
}
}
}
},
leading_tab=xmlFormat.DEFAULT_TAB,
)
+ "\n"
+ "</glideFactoryQStats>\n"
)
glideFactoryMonitoring.monitoringConfig.write_file(monitorAggregatorConfig.status_relname, xml_str)
# write json
glideFactoryMonitoring.monitoringConfig.write_completed_json(
monitorAggregatorConfig.completed_data_relname.split(".")[0], updated, completed_data_tot
)
# Write rrds
glideFactoryMonitoring.monitoringConfig.establish_dir("total")
# Total rrd across all frontends and factories
for tp in global_total:
# values (RRD type) - Status or Requested
if tp not in list(status_attributes.keys()):
continue
tp_str = type_strings[tp]
attributes_tp = status_attributes[tp]
tp_el = global_total[tp]
for a in list(tp_el.keys()):
if a in attributes_tp:
a_el = int(tp_el[a])
val_dict[f"{tp_str}{a}"] = a_el
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
os.path.join("total", "Status_Attributes"), "GAUGE", updated, val_dict
)
# Frontend total rrds across all factories
for fe in list(status_fe["frontends"].keys()):
glideFactoryMonitoring.monitoringConfig.establish_dir(os.path.join("total", f"frontend_{fe}"))
for tp in list(status_fe["frontends"][fe].keys()):
# values (RRD type) - Status or Requested
if tp not in list(type_strings.keys()):
continue
tp_str = type_strings[tp]
attributes_tp = status_attributes[tp]
tp_el = status_fe["frontends"][fe][tp]
for a in list(tp_el.keys()):
if a in attributes_tp:
a_el = int(tp_el[a])
val_dict[f"{tp_str}{a}"] = a_el
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
os.path.join("total", f"frontend_{fe}", "Status_Attributes"), "GAUGE", updated, val_dict
)
return status
######################################################################################
def aggregateJobsSummary():
"""Loads the job summary pickle files for each entry, aggregates them per schedd/collector pair, and return them.
:return: A dictionary containing the needed information that looks like:
{
('schedd_name','collector_name') : {
'2994.000': {'condor_duration': 1328, 'glidein_duration': 1334, 'condor_started': 1, 'numjobs': 0},
'2997.000': {'condor_duration': 1328, 'glidein_duration': 1334, 'condor_started': 1, 'numjobs': 0},
...
},
('schedd_name','collector_name') : {
'2003.000': {'condor_duration': 1328, 'glidein_duration': 1334, 'condor_started': 1, 'numjobs': 0},
'206.000': {'condor_duration': 1328, 'glidein_duration': 1334, 'condor_started': 1, 'numjobs': 0},
...
}
}
"""
jobinfo = {}
for entry in monitorAggregatorConfig.entries:
# load entry log summary file
status_fname = os.path.join(
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.jobsummary_relname,
)
try:
with open(status_fname, "rb") as fd:
entry_joblist = pickle.load(fd)
except OSError:
# Errors with the file, e.g. FileNotFoundError, IsADirectoryError, PermissionError
logSupport.log.debug(f"Missing file {status_fname}: ignoring and continuing")
continue
except (EOFError, pickle.UnpicklingError):
# Errors with the file content
logSupport.log.debug(f"Empty or corrupted pickle file {status_fname}: ignoring and continuing")
continue
schedd_name = entry_joblist.get("schedd_name", None)
pool_name = entry_joblist.get("collector_name", None)
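        # aggregate per (schedd, collector) pair: joblists of entries sharing the same pair are merged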
jobinfo.setdefault((schedd_name, pool_name), {}).update(entry_joblist["joblist"])
return jobinfo
######################################################################################
def aggregateLogSummary():
"""
Create an aggregate of log summary files, write it in an aggregate log
summary file and in the end return the values
"""
global monitorAggregatorConfig
# initialize global counters
global_total = {
"Current": {},
"Entered": {},
"Exited": {},
"CompletedCounts": {"Sum": {}, "Waste": {}, "WasteTime": {}, "Lasted": {}, "JobsNr": {}, "JobsDuration": {}},
}
for s in ("Wait", "Idle", "Running", "Held"):
for k in ["Current", "Entered", "Exited"]:
global_total[k][s] = 0
for s in ("Completed", "Removed"):
for k in ["Entered"]:
global_total[k][s] = 0
for k in glideFactoryMonitoring.getAllJobTypes():
for w in ("Waste", "WasteTime"):
el = {}
for t in glideFactoryMonitoring.getAllMillRanges():
el[t] = 0
global_total["CompletedCounts"][w][k] = el
el = {}
for t in glideFactoryMonitoring.getAllTimeRanges():
el[t] = 0
global_total["CompletedCounts"]["Lasted"] = el
el = {}
for t in glideFactoryMonitoring.getAllJobRanges():
el[t] = 0
global_total["CompletedCounts"]["JobsNr"] = el
el = {}
# KEL - why is the same el used twice (see above)
for t in glideFactoryMonitoring.getAllTimeRanges():
el[t] = 0
global_total["CompletedCounts"]["JobsDuration"] = el
global_total["CompletedCounts"]["Sum"] = {
"Glideins": 0,
"Lasted": 0,
"FailedNr": 0,
"JobsNr": 0,
"JobsLasted": 0,
"JobsGoodput": 0,
"JobsTerminated": 0,
"CondorLasted": 0,
}
status = {"entries": {}, "total": global_total}
status_fe = {"frontends": {}} # analogous to above but for frontend totals
nr_entries = 0
nr_feentries = {} # dictionary for nr entries per fe
for entry in monitorAggregatorConfig.entries:
# load entry log summary file
status_fname = os.path.join(
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.logsummary_relname,
)
try:
entry_data = xmlParse.xmlfile2dict(status_fname, always_singular_list=["Fraction", "TimeRange", "Range"])
except OSError:
logSupport.log.debug(f"Missing file {status_fname}: ignoring and continuing")
continue # file not found, ignore
# update entry
out_data = {}
for frontend in list(entry_data["frontends"].keys()):
fe_el = entry_data["frontends"][frontend]
out_fe_el = {}
for k in ["Current", "Entered", "Exited"]:
out_fe_el[k] = {}
for s in list(fe_el[k].keys()):
out_fe_el[k][s] = int(fe_el[k][s])
out_fe_el["CompletedCounts"] = {
"Waste": {},
"WasteTime": {},
"Lasted": {},
"JobsNr": {},
"JobsDuration": {},
"Sum": {},
}
for tkey in list(fe_el["CompletedCounts"]["Sum"].keys()):
out_fe_el["CompletedCounts"]["Sum"][tkey] = int(fe_el["CompletedCounts"]["Sum"][tkey])
for k in glideFactoryMonitoring.getAllJobTypes():
for w in ("Waste", "WasteTime"):
out_fe_el["CompletedCounts"][w][k] = {}
for t in glideFactoryMonitoring.getAllMillRanges():
out_fe_el["CompletedCounts"][w][k][t] = int(fe_el["CompletedCounts"][w][k][t]["val"])
for t in glideFactoryMonitoring.getAllTimeRanges():
out_fe_el["CompletedCounts"]["Lasted"][t] = int(fe_el["CompletedCounts"]["Lasted"][t]["val"])
out_fe_el["CompletedCounts"]["JobsDuration"] = {}
for t in glideFactoryMonitoring.getAllTimeRanges():
out_fe_el["CompletedCounts"]["JobsDuration"][t] = int(
fe_el["CompletedCounts"]["JobsDuration"][t]["val"]
)
for t in glideFactoryMonitoring.getAllJobRanges():
out_fe_el["CompletedCounts"]["JobsNr"][t] = int(fe_el["CompletedCounts"]["JobsNr"][t]["val"])
out_data[frontend] = out_fe_el
status["entries"][entry] = {"frontends": out_data}
# update total
if "total" in entry_data:
nr_entries += 1
local_total = {}
for k in ["Current", "Entered", "Exited"]:
local_total[k] = {}
for s in list(global_total[k].keys()):
local_total[k][s] = int(entry_data["total"][k][s])
global_total[k][s] += int(entry_data["total"][k][s])
local_total["CompletedCounts"] = {
"Sum": {},
"Waste": {},
"WasteTime": {},
"Lasted": {},
"JobsNr": {},
"JobsDuration": {},
}
for tkey in list(entry_data["total"]["CompletedCounts"]["Sum"].keys()):
local_total["CompletedCounts"]["Sum"][tkey] = int(entry_data["total"]["CompletedCounts"]["Sum"][tkey])
global_total["CompletedCounts"]["Sum"][tkey] += int(entry_data["total"]["CompletedCounts"]["Sum"][tkey])
for k in glideFactoryMonitoring.getAllJobTypes():
for w in ("Waste", "WasteTime"):
local_total["CompletedCounts"][w][k] = {}
for t in glideFactoryMonitoring.getAllMillRanges():
local_total["CompletedCounts"][w][k][t] = int(
entry_data["total"]["CompletedCounts"][w][k][t]["val"]
)
global_total["CompletedCounts"][w][k][t] += int(
entry_data["total"]["CompletedCounts"][w][k][t]["val"]
)
for t in glideFactoryMonitoring.getAllTimeRanges():
local_total["CompletedCounts"]["Lasted"][t] = int(
entry_data["total"]["CompletedCounts"]["Lasted"][t]["val"]
)
global_total["CompletedCounts"]["Lasted"][t] += int(
entry_data["total"]["CompletedCounts"]["Lasted"][t]["val"]
)
local_total["CompletedCounts"]["JobsDuration"] = {}
for t in glideFactoryMonitoring.getAllTimeRanges():
local_total["CompletedCounts"]["JobsDuration"][t] = int(
entry_data["total"]["CompletedCounts"]["JobsDuration"][t]["val"]
)
global_total["CompletedCounts"]["JobsDuration"][t] += int(
entry_data["total"]["CompletedCounts"]["JobsDuration"][t]["val"]
)
for t in glideFactoryMonitoring.getAllJobRanges():
local_total["CompletedCounts"]["JobsNr"][t] = int(
entry_data["total"]["CompletedCounts"]["JobsNr"][t]["val"]
)
global_total["CompletedCounts"]["JobsNr"][t] += int(
entry_data["total"]["CompletedCounts"]["JobsNr"][t]["val"]
)
status["entries"][entry]["total"] = local_total
# update frontends
for fe in out_data:
# compare each to the list of fe's accumulated so far
if fe not in status_fe["frontends"]:
status_fe["frontends"][fe] = {}
if fe not in nr_feentries:
                nr_feentries[fe] = 1  # first occurrence of this frontend
else:
nr_feentries[fe] += 1
# sum them up
sumDictInt(out_data[fe], status_fe["frontends"][fe])
# Write xml files
# To do - Igor: Consider adding status_fe to the XML file
updated = time.time()
xml_str = (
'<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
+ "<glideFactoryLogSummary>\n"
+ xmlFormat.time2xml(updated, "updated", indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ xmlFormat.dict2string(
status["entries"],
dict_name="entries",
el_name="entry",
subtypes_params={
"class": {
"dicts_params": {
"frontends": {
"el_name": "frontend",
"subtypes_params": {
"class": {
"subclass_params": {
"CompletedCounts": glideFactoryMonitoring.get_completed_stats_xml_desc()
}
}
},
}
},
"subclass_params": {
"total": {
"subclass_params": {
"CompletedCounts": glideFactoryMonitoring.get_completed_stats_xml_desc()
}
}
},
}
},
leading_tab=xmlFormat.DEFAULT_TAB,
)
+ "\n"
+ xmlFormat.class2string(
status["total"],
inst_name="total",
subclass_params={"CompletedCounts": glideFactoryMonitoring.get_completed_stats_xml_desc()},
leading_tab=xmlFormat.DEFAULT_TAB,
)
+ "\n"
+ "</glideFactoryLogSummary>\n"
)
glideFactoryMonitoring.monitoringConfig.write_file(monitorAggregatorConfig.logsummary_relname, xml_str)
# Write rrds
writeLogSummaryRRDs("total", status["total"])
# Frontend total rrds across all factories
for fe in status_fe["frontends"]:
writeLogSummaryRRDs("total/%s" % ("frontend_" + fe), status_fe["frontends"][fe])
return status
def sumDictInt(indict, outdict):
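    """Recursively sum the integer leaves of indict into outdict (outdict is modified in place).

    Args:
        indict (dict): input dictionary of ints or nested dictionaries of ints
        outdict (dict): output dictionary accumulating the sums; missing keys are created as needed
    """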
for orgi in indict:
i = str(orgi) # RRDs don't like unicode, so make sure we use strings
if isinstance(indict[i], int):
if i not in outdict:
outdict[i] = 0
outdict[i] += indict[i]
else:
# assume it is a dictionary
if i not in outdict:
outdict[i] = {}
sumDictInt(indict[i], outdict[i])
def writeLogSummaryRRDs(fe_dir, status_el):
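    """Write the log summary RRD files (Log_Counts, Log_Completed, Log_Completed_Stats,
    Log_Completed_WasteTime) for a single monitoring directory.

    Args:
        fe_dir (str): directory, relative to the monitoring dir, e.g. "total" or "total/frontend_<name>"
        status_el (dict): aggregated log summary status to write
    """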
updated = time.time()
sdata = status_el["Current"]
glideFactoryMonitoring.monitoringConfig.establish_dir(fe_dir)
val_dict_counts = {}
val_dict_counts_desc = {}
val_dict_completed = {}
val_dict_stats = {}
val_dict_waste = {}
val_dict_wastetime = {}
for s in ("Wait", "Idle", "Running", "Held", "Completed", "Removed"):
if s not in ("Completed", "Removed"): # I don't have their numbers from inactive logs
count = sdata[s]
val_dict_counts["Status%s" % s] = count
val_dict_counts_desc["Status%s" % s] = {"ds_type": "GAUGE"}
exited = -status_el["Exited"][s]
val_dict_counts["Exited%s" % s] = exited
val_dict_counts_desc["Exited%s" % s] = {"ds_type": "ABSOLUTE"}
entered = status_el["Entered"][s]
val_dict_counts["Entered%s" % s] = entered
val_dict_counts_desc["Entered%s" % s] = {"ds_type": "ABSOLUTE"}
if s == "Completed":
completed_counts = status_el["CompletedCounts"]
count_entered_times = completed_counts["Lasted"]
count_jobnrs = completed_counts["JobsNr"]
count_jobs_duration = completed_counts["JobsDuration"]
count_waste_mill = completed_counts["Waste"]
time_waste_mill = completed_counts["WasteTime"]
# save run times
for timerange in list(count_entered_times.keys()):
val_dict_stats["Lasted_%s" % timerange] = count_entered_times[timerange]
# they all use the same indexes
val_dict_stats["JobsLasted_%s" % timerange] = count_jobs_duration[timerange]
# save jobsnr
for jobrange in list(count_jobnrs.keys()):
val_dict_stats["JobsNr_%s" % jobrange] = count_jobnrs[jobrange]
# save simple vals
for tkey in list(completed_counts["Sum"].keys()):
val_dict_completed[tkey] = completed_counts["Sum"][tkey]
# save waste_mill
for w in list(count_waste_mill.keys()):
count_waste_mill_w = count_waste_mill[w]
for p in list(count_waste_mill_w.keys()):
val_dict_waste[f"{w}_{p}"] = count_waste_mill_w[p]
for w in list(time_waste_mill.keys()):
time_waste_mill_w = time_waste_mill[w]
for p in list(time_waste_mill_w.keys()):
val_dict_wastetime[f"{w}_{p}"] = time_waste_mill_w[p]
# write the data to disk
glideFactoryMonitoring.monitoringConfig.write_rrd_multi_hetero(
"%s/Log_Counts" % fe_dir, val_dict_counts_desc, updated, val_dict_counts
)
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
"%s/Log_Completed" % fe_dir, "ABSOLUTE", updated, val_dict_completed
)
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
"%s/Log_Completed_Stats" % fe_dir, "ABSOLUTE", updated, val_dict_stats
)
# Disable Waste RRDs... WasteTime much more useful
# glideFactoryMonitoring.monitoringConfig.write_rrd_multi("%s/Log_Completed_Waste"%fe_dir,
# "ABSOLUTE",updated,val_dict_waste)
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
"%s/Log_Completed_WasteTime" % fe_dir, "ABSOLUTE", updated, val_dict_wastetime
)
def aggregateRRDStats(log=logSupport.log):
"""Create an aggregate of RRD stats, write it files
Args:
log (logging.Logger): logger to use
"""
global monitorAggregatorConfig
    # Unused and without side effects; left here in case more monitoring is added later:
    # factoryStatusData = glideFactoryMonitoring.FactoryStatusData()
rrdstats_relname = glideFactoryMonitoring.RRD_LIST
tab = xmlFormat.DEFAULT_TAB
for rrd in rrdstats_relname:
# assigns the data from every site to 'stats'
stats = {}
for entry in monitorAggregatorConfig.entries:
rrd_fname = os.path.join(monitorAggregatorConfig.monitor_dir, f"entry_{entry}", rrd_site(rrd))
try:
stats[entry] = xmlParse.xmlfile2dict(rrd_fname, always_singular_list={"timezone": {}})
except FileNotFoundError:
log.debug(
f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError, File not found (OK if first time)"
)
except OSError:
log.debug(f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError")
if not os.path.exists(rrd_fname):
log.debug(
f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError, File not found (OK if first time) - should have been FileNotFoundError"
)
stats_entries = list(stats.keys())
if len(stats_entries) == 0:
continue # skip this RRD... nothing to aggregate
stats_entries.sort()
# Get all the resolutions, data_sets and frontends... for totals
resolution = set()
frontends = set()
data_sets = set()
for entry in stats_entries:
entry_resolution = list(stats[entry]["total"]["periods"].keys())
if len(entry_resolution) == 0:
continue # not an interesting entry
resolution = resolution.union(entry_resolution)
entry_data_sets = stats[entry]["total"]["periods"][entry_resolution[0]]
data_sets = data_sets.union(entry_data_sets)
entry_frontends = list(stats[entry]["frontends"].keys())
frontends = frontends.union(entry_frontends)
entry_data_sets = stats[entry]["total"]["periods"][entry_resolution[0]]
resolution = list(resolution)
frontends = list(frontends)
data_sets = list(data_sets)
# create a dictionary that will hold the aggregate data
clients = frontends + ["total"]
aggregate_output = {}
for client in clients:
aggregate_output[client] = {}
for res in resolution:
aggregate_output[client][res] = {}
for data_set in data_sets:
aggregate_output[client][res][data_set] = 0
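        # aggregate_output[client][res][data_set] starts at 0 and accumulates the per-entry values below;
        # "total" is treated as an additional client alongside the individual frontends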
# assign the aggregate data to 'aggregate_output'
missing_total_data = False
missing_client_data = False
for client in aggregate_output:
for res in aggregate_output[client]:
for data_set in aggregate_output[client][res]:
for entry in stats_entries:
if client == "total":
try:
aggregate_output[client][res][data_set] += float(
stats[entry][client]["periods"][res][data_set]
)
except KeyError:
missing_total_data = True
# well, some may be just missing.. can happen
# log.debug("aggregate_data, KeyError stats[%s][%s][%s][%s][%s]"%(entry,client,'periods',res,data_set))
else:
if client in stats[entry]["frontends"]:
# not all the entries have all the frontends
try:
aggregate_output[client][res][data_set] += float(
stats[entry]["frontends"][client]["periods"][res][data_set]
)
except KeyError:
missing_client_data = True
# well, some may be just missing.. can happen
# log.debug("aggregate_data, KeyError stats[%s][%s][%s][%s][%s][%s]" %(entry,'frontends',client,'periods',res,data_set))
        # We still need to determine what is causing this missing data, in case it is a real issue,
        # but using these flags will at least reduce the number of messages in the logs (see the commented-out messages above)
if missing_total_data:
log.debug("aggregate_data, missing total data from file %s" % rrd_site(rrd))
if missing_client_data:
log.debug("aggregate_data, missing client data from file %s" % rrd_site(rrd))
# write an aggregate XML file
        # data from individual entries
entry_str = tab + "<entries>\n"
for entry in stats_entries:
entry_name = entry.split("/")[-1]
entry_str += 2 * tab + '<entry name = "' + entry_name + '">\n'
entry_str += 3 * tab + "<total>\n"
try:
entry_str += (
xmlFormat.dict2string(
stats[entry]["total"]["periods"],
dict_name="periods",
el_name="period",
subtypes_params={"class": {}},
indent_tab=tab,
leading_tab=4 * tab,
)
+ "\n"
)
except (NameError, UnboundLocalError):
log.debug("total_data, NameError or TypeError")
entry_str += 3 * tab + "</total>\n"
entry_str += 3 * tab + "<frontends>\n"
try:
entry_frontends = sorted(stats[entry]["frontends"].keys())
for frontend in entry_frontends:
entry_str += 4 * tab + '<frontend name="' + frontend + '">\n'
try:
entry_str += (
xmlFormat.dict2string(
stats[entry]["frontends"][frontend]["periods"],
dict_name="periods",
el_name="period",
subtypes_params={"class": {}},
indent_tab=tab,
leading_tab=5 * tab,
)
+ "\n"
)
except KeyError:
log.debug("frontend_data, KeyError")
entry_str += 4 * tab + "</frontend>\n"
except TypeError:
log.debug("frontend_data, TypeError")
entry_str += 3 * tab + "</frontends>\n"
entry_str += 2 * tab + "</entry>\n"
entry_str += tab + "</entries>\n"
# aggregated data
total_xml_str = 2 * tab + "<total>\n"
total_data = aggregate_output["total"]
try:
total_xml_str += (
xmlFormat.dict2string(
total_data,
dict_name="periods",
el_name="period",
subtypes_params={"class": {}},
indent_tab=tab,
leading_tab=4 * tab,
)
+ "\n"
)
except (NameError, UnboundLocalError):
log.debug("total_data, NameError or TypeError")
total_xml_str += 2 * tab + "</total>\n"
frontend_xml_str = 2 * tab + "<frontends>\n"
try:
for frontend in frontends:
frontend_xml_str += 3 * tab + '<frontend name="' + frontend + '">\n'
frontend_data = aggregate_output[frontend]
frontend_xml_str += (
xmlFormat.dict2string(
frontend_data,
dict_name="periods",
el_name="period",
subtypes_params={"class": {}},
indent_tab=tab,
leading_tab=4 * tab,
)
+ "\n"
)
frontend_xml_str += 3 * tab + "</frontend>\n"
except TypeError:
log.debug("frontend_data, TypeError")
frontend_xml_str += 2 * tab + "</frontends>\n"
data_str = tab + "<total>\n" + total_xml_str + frontend_xml_str + tab + "</total>\n"
# putting it all together
updated = time.time()
xml_str = (
'<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
+ "<glideFactoryRRDStats>\n"
+ xmlFormat.time2xml(
updated, "updated", indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB
)
+ "\n"
+ entry_str
+ data_str
+ "</glideFactoryRRDStats>"
)
try:
glideFactoryMonitoring.monitoringConfig.write_file(rrd_site(rrd), xml_str)
except OSError:
log.debug("write_file %s, IOError" % rrd_site(rrd))
return