# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""This module implements the functions needed to monitor the glidein factory
"""
import copy
import json
import math
import os
import pickle
import re
import time
from glideinwms.lib import cleanupSupport, logSupport, rrdSupport, timeConversion, util, xmlFormat
# list of rrd files that each site has
RRD_LIST = (
"Status_Attributes.rrd",
"Log_Completed.rrd",
"Log_Completed_Stats.rrd",
"Log_Completed_WasteTime.rrd",
"Log_Counts.rrd",
)
############################################################
#
# Configuration
#
############################################################
class MonitoringConfig:
def __init__(self, log=logSupport.log):
# set default values
# user should modify if needed
self.rrd_step = 300 # default to 5 minutes
self.rrd_heartbeat = 1800 # default to 30 minutes, should be at least twice the loop time
self.rrd_ds_name = "val"
self.rrd_archives = [
("AVERAGE", 0.8, 1, 740), # max precision, keep 2.5 days
("AVERAGE", 0.92, 12, 740), # 1 h precision, keep for a month (30 days)
("AVERAGE", 0.98, 144, 740), # 12 hour precision, keep for a year
]
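# Illustrative note (not from the original code): each archive keeps roughly
# rows * steps_per_row * rrd_step seconds of data, e.g. 740 * 1 * 300 s ~= 2.5 days,
# 740 * 12 * 300 s ~= 30 days and 740 * 144 * 300 s ~= 1 year for the three archives above.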
self.monitor_dir = "monitor/"
self.log_dir = "log/"
self.logCleanupObj = None
self.rrd_obj = rrdSupport.rrdSupport()
"""@ivar: The name of the attribute that identifies the glidein """
self.my_name = "Unknown"
self.log = log
def config_log(self, log_dir, max_days, min_days, max_mbs):
self.log_dir = log_dir
cleaner = cleanupSupport.DirCleanupWSpace(
log_dir,
r"(completed_jobs_.*\.log)",
int(max_days * 24 * 3600),
int(min_days * 24 * 3600),
int(max_mbs * (1024.0 * 1024.0)),
)
cleanupSupport.cleaners.add_cleaner(cleaner)
def logCompleted(self, client_name, entered_dict):
"""
This function takes all newly completed glideins and
logs them in logs/entry_Name/completed_jobs_date.log in an
XML-like format.
It counts the jobs completed on a glidein but does not keep track of the cores received or used by the jobs
@type client_name: String
@param client_name: the name of the frontend client
@type entered_dict: Dictionary of dictionaries
@param entered_dict: This is the dictionary of all jobs that have "Entered" the "Completed" state. It is indexed by job_id. Each value is an info dictionary containing the keys: username, jobs_duration (subkeys: total, goodput, terminated), wastemill (subkeys: validation, idle, nosuccess, badput), duration, condor_started, condor_duration, jobsnr
"""
now = time.time()
job_ids = list(entered_dict.keys())
if len(job_ids) == 0:
return # nothing to do
job_ids.sort()
relative_fname = "completed_jobs_%s.log" % time.strftime("%Y%m%d", time.localtime(now))
fname = os.path.join(self.log_dir, relative_fname)
with open(fname, "a") as fd:
for job_id in job_ids:
el = entered_dict[job_id]
username = el["username"]
username = username.split(":")[0]
jobs_duration = el["jobs_duration"]
waste_mill = el["wastemill"]
fd.write(
(
"<job %37s %34s %22s %17s %17s %22s %24s>"
% (
('terminated="%s"' % timeConversion.getISO8601_Local(now)),
('client="%s"' % client_name),
('username="%s"' % username),
('id="%s"' % job_id),
('duration="%i"' % el["duration"]),
# Keep comparison, may be None
('condor_started="%s"' % (el["condor_started"] == True)), # noqa: E712
('condor_duration="%i"' % el["condor_duration"]),
)
)
+ (
"<user %14s %17s %16s %19s/>"
% (
('jobsnr="%i"' % el["jobsnr"]),
('duration="%i"' % jobs_duration["total"]),
('goodput="%i"' % jobs_duration["goodput"]),
('terminated="%i"' % jobs_duration["terminated"]),
)
)
+ (
"<wastemill %17s %11s %16s %13s/></job>\n"
% (
('validation="%i"' % waste_mill["validation"]),
('idle="%i"' % waste_mill["idle"]),
('nosuccess="%i"' % waste_mill["nosuccess"]),
('badput="%i"' % waste_mill["badput"]),
)
)
)
def write_file(self, relative_fname, output_str):
"""Write out a string or bytes to a file
Args:
relative_fname (AnyStr): The relative path name to write out
output_str (AnyStr): the string (unicode str or bytes) to write to the file
"""
# TODO: Fix str/bytes confusion in the pathname
fname = os.path.join(self.monitor_dir, relative_fname)
if isinstance(fname, bytes):
fname = fname.decode("utf-8")
# print "Writing "+fname
# Check to see if output_str is represented as bytes
if isinstance(output_str, (bytes, bytearray)):
with open(fname + ".tmp", "wb") as fd:
fd.write(output_str)
else:
with open(fname + ".tmp", "w") as fd:
fd.write(output_str + "\n")
util.file_tmp2final(fname, mask_exceptions=(self.log.error, "Failed rename/write into %s" % fname))
return
def write_completed_json(self, relative_fname, time, val_dict):
"""
Write val_dict to a JSON file, creating it if needed
relative_fname: location of json relative to self.monitor_dir
time: typically self.updated
val_dict: dictionary object to be dumped to file
"""
fname = os.path.join(self.monitor_dir, relative_fname + ".json")
data = {}
data["time"] = time
data["stats"] = val_dict
try:
f = None
self.log.info(f"Writing {relative_fname} to {str(fname)}")
f = open(fname, "w")
json.dump(data, f)
# f.write(json.dumps(data, indent=4))
except OSError as e:
self.log.error(f"unable to open and write to file {str(fname)} in write_completed_json: {str(e)}")
finally:
if f:
f.close()
return
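# Illustrative sketch of the JSON layout written above (keys and values are made-up examples):
#   {"time": 1700000000.0, "stats": {"Glideins": 3, "FailedNr": 0, "JobsNr": 5}}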
def establish_dir(self, relative_dname):
dname = os.path.join(self.monitor_dir, relative_dname)
# make a directory and its parents, and raise no exception if already there
os.makedirs(dname, exist_ok=True)
return
def write_rrd_multi(self, relative_fname, ds_type, time, val_dict, min_val=None, max_val=None):
"""
Create the RRD file if missing and update it with the new values, using rrdtool.
"""
if self.rrd_obj.isDummy():
return # nothing to do, no rrd bin no rrd creation
# MM don't understand the need for this loop, there is only one element in the tuple, why not:
# rrd_ext = ".rrd"
# rrd_archives = self.rrd_archives
for tp in ((".rrd", self.rrd_archives),):
rrd_ext, rrd_archives = tp
fname = os.path.join(self.monitor_dir, relative_fname + rrd_ext)
# print "Writing RRD "+fname
if not os.path.isfile(fname):
# print "Create RRD "+fname
if min_val is None:
min_val = "U"
if max_val is None:
max_val = "U"
ds_names = sorted(val_dict.keys())
ds_arr = []
for ds_name in ds_names:
ds_arr.append((ds_name, ds_type, self.rrd_heartbeat, min_val, max_val))
self.rrd_obj.create_rrd_multi(fname, self.rrd_step, rrd_archives, ds_arr)
# print "Updating RRD "+fname
try:
self.rrd_obj.update_rrd_multi(fname, time, val_dict)
except Exception:
self.log.exception("Failed to update %s: " % fname)
return
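# Illustrative usage (a sketch; the relative path and dataset names are examples only):
#   monitoringConfig.write_rrd_multi("total/Status_Attributes", "GAUGE", time.time(),
#                                    {"StatusIdle": 5, "StatusRunning": 12})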
def write_rrd_multi_hetero(self, relative_fname, ds_desc_dict, time, val_dict):
"""Create a RRD file, using rrdtool.
Like write_rrd_multi, but each ds can have its own specified type.
Each element of ds_desc_dict is a dictionary with any of ds_type, min, max;
if ds_desc_dict[name] is not present, the defaults are {'ds_type':'GAUGE', 'min':'U', 'max':'U'}
"""
if self.rrd_obj.isDummy():
return # nothing to do, no rrd bin no rrd creation
# MM don't understand the need for this loop, there is only one element in the tuple, why not:
# rrd_ext = ".rrd"
# rrd_archives = self.rrd_archives
for tp in ((".rrd", self.rrd_archives),):
rrd_ext, rrd_archives = tp
fname = os.path.join(self.monitor_dir, relative_fname + rrd_ext)
# print "Writing RRD "+fname
if not os.path.isfile(fname):
# print "Create RRD "+fname
ds_names = sorted(val_dict.keys())
ds_arr = []
for ds_name in ds_names:
ds_desc = {"ds_type": "GAUGE", "min": "U", "max": "U"}
if ds_name in ds_desc_dict:
for k in list(ds_desc_dict[ds_name].keys()):
ds_desc[k] = ds_desc_dict[ds_name][k]
ds_arr.append((ds_name, ds_desc["ds_type"], self.rrd_heartbeat, ds_desc["min"], ds_desc["max"]))
self.rrd_obj.create_rrd_multi(fname, self.rrd_step, rrd_archives, ds_arr)
# print "Updating RRD "+fname
try:
self.rrd_obj.update_rrd_multi(fname, time, val_dict)
except Exception:
self.log.exception("Failed to update %s: " % fname)
return
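# Illustrative usage (a sketch; names are examples only); datasets not listed in the
# description dictionary fall back to GAUGE with unbounded min/max:
#   monitoringConfig.write_rrd_multi_hetero("total/Log_Counts",
#       {"EnteredIdle": {"ds_type": "ABSOLUTE"}}, time.time(),
#       {"EnteredIdle": 3, "StatusIdle": 7})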
#######################################################################################################################
#
# condorQStats
#
# This class handles the data obtained from condor_q
#
#######################################################################################################################
# TODO: ['Downtime'] is added to the self.data[client_name] dictionary only if logRequest is called before logSchedd, logClientMonitor
# This is inconsistent and should be changed, Redmine [#17244]
class condorQStats:
def __init__(self, log=logSupport.log, cores=1):
self.data = {}
self.updated = time.time()
self.log = log
self.files_updated = None
self.attributes = {
"Status": (
"Idle",
"Running",
"Held",
"Wait",
"Pending",
"StageIn",
"IdleOther",
"StageOut",
"RunningCores",
),
"Requested": ("Idle", "MaxGlideins", "IdleCores", "MaxCores"),
"ClientMonitor": (
"InfoAge",
"JobsIdle",
"JobsRunning",
"JobsRunHere",
"GlideIdle",
"GlideRunning",
"GlideTotal",
"CoresIdle",
"CoresRunning",
"CoresTotal",
),
}
# create a global downtime field since we want to propagate it in various places
self.downtime = "True"
self.expected_cores = (
cores # This comes from GLIDEIN_CPUS and GLIDEIN_ESTIMATED_CPUS, actual cores received may differ
)
def logSchedd(self, client_name, qc_status, qc_status_sf):
"""Create or update a dictionary with aggregated HTCondor stats
client_name is the client requesting the glideins
qc_status is a dictionary of condor_status:nr_jobs
qc_status_sf is a dictionary of submit_file:qc_status
OUTPUT: self.data[client_name]['Status'] is the status for all Glideins
self.data[client_name]['StatusEntries'] is the Glidein status by Entry
"""
if client_name in self.data:
t_el = self.data[client_name]
else:
t_el = {}
self.data[client_name] = t_el
if "Status" in t_el:
el = t_el["Status"]
else:
el = {}
t_el["Status"] = el
self.aggregateStates(qc_status, el)
# And now aggregate states by submit file
if "StatusEntries" in t_el:
dsf = t_el["StatusEntries"]
else:
dsf = {}
t_el["StatusEntries"] = dsf
for sf in qc_status_sf:
ksf = self.getEntryFromSubmitFile(sf)
if ksf:
elsf = dsf.setdefault(ksf, {})
self.aggregateStates(qc_status_sf[sf], elsf)
self.updated = time.time()
@staticmethod
def getEntryFromSubmitFile(submitFile):
"""Extract the entry name from submit files that look like:
'entry_T2_CH_CERN/job.CMSHTPC_T2_CH_CERN_ce301.condor'
"""
# Matches: anything that is not a dot, a dot, anything that is not a dot (captured),
# another dot, and finally anything that is not a dot
m = re.match(r"^[^\.]+\.([^\.]+)\.[^\.]+$", submitFile)
return m.group(1) if m else ""
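# Example: for 'entry_T2_CH_CERN/job.CMSHTPC_T2_CH_CERN_ce301.condor' the capture group
# yields 'CMSHTPC_T2_CH_CERN_ce301'; a file name that does not match returns "".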
def get_zero_data_element(self):
"""
Return a dictionary with the keys defined in self.attributes, and all values to 0
:return: data element w/ all 0 values
"""
empty_data = {}
for k in self.attributes:
empty_data[k] = {}
for kk in self.attributes[k]:
empty_data[k][kk] = 0
return empty_data
def aggregateStates(self, qc_status, el):
"""For each status in the condor_q count status dictionary (qc_status)
add the count to the el dictionary, whose keys are status names like 'Idle'
instead of status numbers like 1
"""
# Listing pairs with jobs counting as 1. Avoid duplicates with the list below
# These numbers must be consistent w/ the one used to build qc_status
status_pairs = (
(1, "Idle"),
(2, "Running"),
(5, "Held"),
(1001, "Wait"),
(1002, "Pending"),
(1010, "StageIn"),
(1100, "IdleOther"),
(4010, "StageOut"),
)
for p in status_pairs:
nr, status = p
# TODO: rewrite w/ if in ... else (after m31)
if status not in el:
el[status] = 0
if nr in qc_status:
el[status] += qc_status[nr]
# Listing pairs counting the cores (expected_cores). Avoid duplicates with the list above
# These numbers must be consistent w/ the one used to build qc_status
status_pairs = ((2, "RunningCores"),)
for p in status_pairs:
nr, status = p
# TODO: rewrite w/ if in ... else (after m31)
if status not in el:
el[status] = 0
if nr in qc_status:
el[status] += qc_status[nr] * self.expected_cores
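# Worked example (illustrative): with qc_status = {1: 4, 2: 2} and expected_cores = 8,
# the loops above add 4 to el["Idle"], 2 to el["Running"] and 2 * 8 = 16 to el["RunningCores"].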
def logRequest(self, client_name, requests):
"""
requests is a dictionary of requests (params is no longer passed to this method)
At the moment, it looks only for
'IdleGlideins'
'MaxGlideins'
The request contains only glidein counts (no real core info);
core counts are evaluated using GLIDEIN_CPUS (self.expected_cores)
"""
if client_name in self.data:
t_el = self.data[client_name]
else:
t_el = {}
t_el["Downtime"] = {"status": self.downtime}
self.data[client_name] = t_el
if "Requested" in t_el:
el = t_el["Requested"]
else:
el = {}
t_el["Requested"] = el
for reqpair in (("IdleGlideins", "Idle"), ("MaxGlideins", "MaxGlideins")):
org, new = reqpair
if new not in el:
el[new] = 0
if org in requests:
el[new] += requests[org]
for reqpair in (("IdleGlideins", "IdleCores"), ("MaxGlideins", "MaxCores")):
org, new = reqpair
if new not in el:
el[new] = 0
if org in requests:
el[new] += requests[org] * self.expected_cores
# Had to get rid of this
# Does not make sense when one aggregates
# el['Parameters']=copy.deepcopy(params)
# Replacing with an empty list
el["Parameters"] = {}
self.updated = time.time()
def logClientMonitor(self, client_name, client_monitor, client_internals, fraction=1.0):
"""
client_monitor is a dictionary of monitoring info (GlideinMonitor... from glideclient ClassAd)
client_internals is a dictionary of internals (from glideclient ClassAd)
If fraction is specified it will be used to extract partial info
At the moment, it looks only for
'Idle'
'Running'
'RunningHere'
'GlideinsIdle', 'GlideinsIdleCores'
'GlideinsRunning', 'GlideinsRunningCores'
'GlideinsTotal', 'GlideinsTotalCores'
'LastHeardFrom'
updates go in self.data (self.data[client_name]['ClientMonitor'])
"""
if client_name in self.data:
t_el = self.data[client_name]
else:
t_el = {}
self.data[client_name] = t_el
if "ClientMonitor" in t_el:
el = t_el["ClientMonitor"]
else:
el = {}
t_el["ClientMonitor"] = el
for karr in (
("Idle", "JobsIdle"),
("Running", "JobsRunning"),
("RunningHere", "JobsRunHere"),
("GlideinsIdle", "GlideIdle"),
("GlideinsRunning", "GlideRunning"),
("GlideinsTotal", "GlideTotal"),
("GlideinsIdleCores", "CoresIdle"),
("GlideinsRunningCores", "CoresRunning"),
("GlideinsTotalCores", "CoresTotal"),
):
ck, ek = karr
if ek not in el:
el[ek] = 0
if ck in client_monitor:
el[ek] += int(client_monitor[ck]) * fraction
elif ck == "RunningHere":
# for compatibility, if RunningHere not defined, use min between Running and GlideinsRunning
if "Running" in client_monitor and "GlideinsRunning" in client_monitor:
el[ek] += min(int(client_monitor["Running"]), int(client_monitor["GlideinsRunning"])) * fraction
if "InfoAge" not in el:
el["InfoAge"] = 0
el["InfoAgeAvgCounter"] = 0 # used for totals since we need an average in totals, not an absolute number
if "LastHeardFrom" in client_internals:
el["InfoAge"] += int(time.time() - int(client_internals["LastHeardFrom"])) * fraction
el["InfoAgeAvgCounter"] += fraction
self.updated = time.time()
# call this after the last logClientMonitor
def finalizeClientMonitor(self):
# convert all ClientMonitor numbers to integers
# needed due to fraction calculations
for client_name in list(self.data.keys()):
if "ClientMonitor" in self.data[client_name]:
el = self.data[client_name]["ClientMonitor"]
for k in list(el.keys()):
el[k] = int(round(el[k]))
return
def get_data(self):
data1 = copy.deepcopy(self.data)
for f in list(data1.keys()):
fe = data1[f]
for w in list(fe.keys()):
el = fe[w]
for a in list(el.keys()):
if a[-10:] == "AvgCounter": # do not publish avgcounter fields... they are internals
del el[a]
return data1
@staticmethod
def get_xml_data(data, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
"""
Return a string with the XML formatted statistic data
@param data: self.get_data()
@param indent_tab: indentation space
@param leading_tab: leading space
@return: XML string
"""
return xmlFormat.dict2string(
data,
dict_name="frontends",
el_name="frontend",
subtypes_params={
"class": {"subclass_params": {"Requested": {"dicts_params": {"Parameters": {"el_name": "Parameter"}}}}}
},
indent_tab=indent_tab,
leading_tab=leading_tab,
)
def get_total(self, history={"set_to_zero": False}):
total = {"Status": None, "Requested": None, "ClientMonitor": None}
set_to_zero = False
for f in list(self.data.keys()):
fe = self.data[f]
for w in list(fe.keys()):
if w in total: # ignore any unsupported classes
el = fe[w]
tel = total[w]
if tel is None:
# first one, just copy over
total[w] = {}
tel = total[w]
for a in list(el.keys()):
if isinstance(el[a], int): # copy only numbers
tel[a] = el[a]
else:
# successive, sum
for a in list(el.keys()):
if isinstance(el[a], int): # consider only numbers
if a in tel:
tel[a] += el[a]
# if other frontends didn't have this attribute, ignore
# if any attribute from prev. frontends are not in the current one, remove from total
for a in list(tel.keys()):
if a not in el:
del tel[a]
elif not isinstance(el[a], int):
del tel[a]
for w in list(total.keys()):
if total[w] is None:
if w == "Status":
total[w] = self.get_zero_data_element()[w]
set_to_zero = True
else:
del total[w] # remove the entry if not defined; 'Status' is instead zeroed above
else:
tel = total[w]
for a in list(tel.keys()):
if a[-10:] == "AvgCounter":
# this is an average counter, calc the average of the referred element
# like InfoAge=InfoAge/InfoAgeAvgCounter
aorg = a[:-10]
tel[aorg] = tel[aorg] // tel[a]
# the avgcount totals are just for internal purposes
del tel[a]
if set_to_zero != history["set_to_zero"]:
if set_to_zero:
self.updated = time.time()
history["set_to_zero"] = set_to_zero
return total
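# Note: the mutable default argument 'history' is used intentionally as persistent state
# across calls, so self.updated is refreshed only on the transition into the all-zero Status total.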
@staticmethod
def get_xml_total(total, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
"""
Return formatted XML for the total statistics
@param total: self.get_total()
@param indent_tab: indentation space
@param leading_tab: leading space
@return: XML string
"""
return xmlFormat.class2string(total, inst_name="total", indent_tab=indent_tab, leading_tab=leading_tab)
def get_xml_updated(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
return xmlFormat.time2xml(self.updated, "updated", indent_tab, leading_tab)
def set_downtime(self, in_downtime):
self.downtime = str(in_downtime)
return
def get_xml_downtime(self, leading_tab=xmlFormat.DEFAULT_TAB):
xml_downtime = xmlFormat.dict2string(
{}, dict_name="downtime", el_name="", params={"status": self.downtime}, leading_tab=leading_tab
)
return xml_downtime
def write_file(self, monitoringConfig=None, alt_stats=None):
"""
Calculate a summary for the entry and write statistics to files
@param monitoringConfig: used to pass information from the Entry
@param alt_stats: an alternative condorQStats object to use if self has no data
@return:
"""
if monitoringConfig is None:
monitoringConfig = globals()["monitoringConfig"]
if (self.files_updated is not None) and ((self.updated - self.files_updated) < 5):
# files updated recently, no need to redo it
return
# Retrieve and calculate data
data = self.get_data()
if not data and alt_stats is not None:
total_el = alt_stats.get_total()
else:
total_el = self.get_total()
# write snapshot file
xml_str = (
'<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
+ "<glideFactoryEntryQStats>\n"
+ self.get_xml_updated(indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ self.get_xml_downtime(leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ self.get_xml_data(data, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ self.get_xml_total(total_el, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ "</glideFactoryEntryQStats>\n"
)
monitoringConfig.write_file("schedd_status.xml", xml_str)
# update RRDs
type_strings = {"Status": "Status", "Requested": "Req", "ClientMonitor": "Client"}
for fe in [None] + list(data.keys()):
if fe is None: # special key == Total
fe_dir = "total"
fe_el = total_el
else:
fe_dir = "frontend_" + fe
fe_el = data[fe]
val_dict = {}
# Initialize, so that all get created properly
for tp in list(self.attributes.keys()):
tp_str = type_strings[tp]
attributes_tp = self.attributes[tp]
for a in attributes_tp:
val_dict[f"{tp_str}{a}"] = None
monitoringConfig.establish_dir(fe_dir)
for tp in list(fe_el.keys()):
# values (RRD type) - Status, Requested or ClientMonitor
if tp not in list(self.attributes.keys()):
continue
tp_str = type_strings[tp]
attributes_tp = self.attributes[tp]
fe_el_tp = fe_el[tp]
for a in list(fe_el_tp.keys()):
if a in attributes_tp:
a_el = fe_el_tp[a]
if not isinstance(a_el, dict): # ignore subdictionaries
val_dict[f"{tp_str}{a}"] = a_el
monitoringConfig.write_rrd_multi(os.path.join(fe_dir, "Status_Attributes"), "GAUGE", self.updated, val_dict)
self.files_updated = self.updated
return
######################################################################################################################
#
# condorLogSummary
#
# This class handles the data obtained from parsing the glidein log files
#
######################################################################################################################
class condorLogSummary:
"""
This class handles the data obtained from parsing the glidein log files
"""
def __init__(self, log=logSupport.log):
self.data = {} # not used
self.updated = time.time()
self.updated_year = time.localtime(self.updated)[0]
self.current_stats_data = {} # will contain dictionary client->username->dirSummarySimple
self.old_stats_data = {}
self.stats_diff = {} # will contain the differences
self.job_statuses = ("Running", "Idle", "Wait", "Held", "Completed", "Removed") # const
self.job_statuses_short = ("Running", "Idle", "Wait", "Held") # const
self.files_updated = None
self.log = log
def reset(self):
"""
Replaces old_stats_data with current_stats_data
Sets current_stats_data to empty.
This is called every iteration in order to later
compare the diff of the previous iteration and current one
to find any newly changed jobs (i.e. newly completed jobs)
"""
# keep only the clients that have been around this time
new_stats_data = {}
for c in list(self.stats_diff.keys()):
# but carry over all the users... should not change that often
new_stats_data[c] = self.current_stats_data[c]
self.old_stats_data = new_stats_data
self.current_stats_data = {}
# and flush out the differences
self.stats_diff = {}
def diffTimes(self, end_time, start_time):
year = self.updated_year
try:
start_list = [
year,
int(start_time[0:2]),
int(start_time[3:5]),
int(start_time[6:8]),
int(start_time[9:11]),
int(start_time[12:14]),
0,
0,
-1,
]
end_list = [
year,
int(end_time[0:2]),
int(end_time[3:5]),
int(end_time[6:8]),
int(end_time[9:11]),
int(end_time[12:14]),
0,
0,
-1,
]
except ValueError:
return -1 # invalid
try:
start_ctime = time.mktime(start_list)
end_ctime = time.mktime(end_list)
except TypeError:
return -1 # invalid
if start_ctime <= end_ctime:
return end_ctime - start_ctime
# else must have gone over the year boundary
start_list[0] -= 1 # decrease start year
try:
start_ctime = time.mktime(start_list)
except TypeError:
return -1 # invalid
return end_ctime - start_ctime
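# Worked example (illustrative, assuming "MM/DD HH:MM:SS" formatted timestamps as sliced above):
#   diffTimes("01/01 00:10:00", "12/31 23:50:00") crosses the year boundary, so the start
#   year is decremented and the result is 1200 seconds.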
def logSummary(self, client_name, stats):
"""
log_stats taken during an iteration of perform_work are
added/merged into the condorLogSummary class here.
@type stats: dictionary of glideFactoryLogParser.dirSummaryTimingsOut
@param stats: Dictionary keyed by "username:client_int_name"
client_int_name is needed for frontends with multiple groups
"""
if client_name not in self.current_stats_data:
self.current_stats_data[client_name] = {}
for username in list(stats.keys()):
if username not in self.current_stats_data[client_name]:
self.current_stats_data[client_name][username] = stats[username].get_simple()
else:
self.current_stats_data[client_name][username].merge(stats[username])
self.updated = time.time()
self.updated_year = time.localtime(self.updated)[0]
def computeDiff(self):
"""
This function takes the current_stats_data from the current iteration
and the old_stats_data from the last iteration (see reset() function)
to create a diff of the data in the stats_diff dictionary.
This stats_diff will be a dictionary with two entries for each
status: "Entered" and "Exited" denoting which job ids have recently
changed status, i.e.
stats_diff[frontend][username:client_int_name]["Completed"]["Entered"]
"""
for client_name in list(self.current_stats_data.keys()):
self.stats_diff[client_name] = {}
if client_name in self.old_stats_data:
stats = self.current_stats_data[client_name]
for username in list(stats.keys()):
if username in self.old_stats_data[client_name]:
self.stats_diff[client_name][username] = stats[username].diff(
self.old_stats_data[client_name][username]
)
def get_stats_data_summary(self):
"""
Summarizes current_stats_data:
Adds up current_stats_data[frontend][user:client][status]
across all username keys.
@return: returns dictionary stats_data[frontend][status]=count
"""
stats_data = {}
for client_name in list(self.current_stats_data.keys()):
out_el = {}
for s in self.job_statuses:
if s not in ("Completed", "Removed"): # I don't have their numbers from inactive logs
count = 0
for username in list(self.current_stats_data[client_name].keys()):
client_el = self.current_stats_data[client_name][username].data
if (client_el is not None) and (s in list(client_el.keys())):
count += len(client_el[s])
out_el[s] = count
stats_data[client_name] = out_el
return stats_data
def get_xml_stats_data(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
data = self.get_stats_data_summary()
return xmlFormat.dict2string(
data,
dict_name="frontends",
el_name="frontend",
subtypes_params={"class": {}},
indent_tab=indent_tab,
leading_tab=leading_tab,
)
# in: entered_list=self.stats_diff[*]['Entered']
# out: entered_list[job_id]{'duration','condor_started','condor_duration','jobsnr',wastemill':{'validation','idle','nosuccess','badput'}}
def get_completed_stats(self, entered_list):
out_list = {}
for enle in entered_list:
enle_job_id = enle[0]
enle_running_time = enle[2]
enle_last_time = enle[3]
enle_difftime = self.diffTimes(enle_last_time, enle_running_time)
# get stats
enle_stats = enle[4]
username = "unknown"
enle_condor_started = 0
enle_condor_duration = 0 # default is 0, in case it never started
enle_glidein_duration = enle_difftime # best guess
if enle_stats is not None:
enle_condor_started = enle_stats["condor_started"]
if "glidein_duration" in enle_stats:
enle_glidein_duration = enle_stats["glidein_duration"]
if "username" in enle_stats:
username = enle_stats["username"]
if not enle_condor_started:
# 100% waste_mill
enle_nr_jobs = 0
enle_jobs_duration = 0
enle_goodput = 0
enle_terminated_duration = 0
enle_waste_mill = {
"validation": 1000,
"idle": 0,
"nosuccess": 0, # no jobs run, no failures
"badput": 1000,
}
else:
# get waste_mill
enle_condor_duration = enle_stats["condor_duration"]
if enle_condor_duration is None:
enle_condor_duration = 0 # assume failed
if enle_condor_duration > enle_glidein_duration: # can happen... Condor-G has its delays
enle_glidein_duration = enle_condor_duration
# get waste numbers, in permill
if enle_condor_duration < 5: # very short means 100% loss
enle_nr_jobs = 0
enle_jobs_duration = 0
enle_goodput = 0
enle_terminated_duration = 0
enle_waste_mill = {
"validation": 1000,
"idle": 0,
"nosuccess": 0, # no jobs run, no failures
"badput": 1000,
}
else:
if "validation_duration" in enle_stats:
enle_validation_duration = enle_stats["validation_duration"]
else:
enle_validation_duration = enle_difftime - enle_condor_duration
enle_condor_stats = enle_stats["stats"]
enle_jobs_duration = enle_condor_stats["Total"]["secs"]
enle_nr_jobs = enle_condor_stats["Total"]["jobsnr"]
enle_waste_mill = {
"validation": 1000.0 * enle_validation_duration / enle_glidein_duration,
"idle": 1000.0 * (enle_condor_duration - enle_jobs_duration) / enle_condor_duration,
}
enle_goodput = enle_condor_stats["goodZ"]["secs"]
if enle_goodput > enle_jobs_duration:
enle_goodput = enle_jobs_duration # cannot be more
if enle_jobs_duration > 0:
enle_waste_mill["nosuccess"] = 1000.0 * (enle_jobs_duration - enle_goodput) / enle_jobs_duration
else:
enle_waste_mill["nosuccess"] = 0 # no jobs run, no failures
enle_terminated_duration = enle_goodput + enle_condor_stats["goodNZ"]["secs"]
if enle_terminated_duration > enle_jobs_duration:
enle_terminated_duration = enle_jobs_duration # cannot be more
enle_waste_mill["badput"] = (
1000.0 * (enle_glidein_duration - enle_terminated_duration) / enle_glidein_duration
)
out_list[enle_job_id] = {
"username": username,
"duration": enle_glidein_duration,
"condor_started": enle_condor_started,
"condor_duration": enle_condor_duration,
"jobsnr": enle_nr_jobs,
"jobs_duration": {
"total": enle_jobs_duration,
"goodput": enle_goodput,
"terminated": enle_terminated_duration,
},
"wastemill": enle_waste_mill,
}
return out_list
# in: entered_list=get_completed_data()
# out: {'Lasted':{'2hours':...,...},'Sum':{...:12,...},'JobsNr':...,
# 'Waste':{'validation':{'0m':...,...},...},'WasteTime':{...:{...},...}}
def summarize_completed_stats(self, entered_list):
# summarize completed data
count_entered_times = {}
for enle_timerange in getAllTimeRanges():
count_entered_times[enle_timerange] = 0 # make sure all are initialized
count_jobnrs = {}
for enle_jobrange in getAllJobRanges():
count_jobnrs[enle_jobrange] = 0 # make sure all are initialized
count_jobs_duration = {}
for enle_jobs_duration_range in getAllTimeRanges():
count_jobs_duration[enle_jobs_duration_range] = 0 # make sure all are initialized
count_total = getLogCompletedDefaults()
count_waste_mill = {
"validation": {},
"idle": {},
"nosuccess": {}, # i.e. everything but jobs terminating with 0
"badput": {},
} # i.e. everything but jobs terminating
for w in list(count_waste_mill.keys()):
count_waste_mill_w = count_waste_mill[w]
for enle_waste_mill_w_range in getAllMillRanges():
count_waste_mill_w[enle_waste_mill_w_range] = 0 # make sure all are initialized
time_waste_mill = {
"validation": {},
"idle": {},
"nosuccess": {}, # i.e. everything but jobs terminating with 0
"badput": {},
} # i.e. everything but jobs terminating
for w in list(time_waste_mill.keys()):
time_waste_mill_w = time_waste_mill[w]
for enle_waste_mill_w_range in getAllMillRanges():
time_waste_mill_w[enle_waste_mill_w_range] = 0 # make sure all are initialized
for enle_job in list(entered_list.keys()):
enle = entered_list[enle_job]
enle_waste_mill = enle["wastemill"]
enle_glidein_duration = enle["duration"]
enle_condor_duration = enle["condor_duration"]
enle_jobs_nr = enle["jobsnr"]
enle_jobs_duration = enle["jobs_duration"]
enle_condor_started = enle["condor_started"]
count_total["Glideins"] += 1
if not enle_condor_started:
count_total["FailedNr"] += 1
# find and save time range
count_total["Lasted"] += enle_glidein_duration
enle_timerange = getTimeRange(enle_glidein_duration)
count_entered_times[enle_timerange] += 1
count_total["CondorLasted"] += enle_condor_duration
# find and save job range
count_total["JobsNr"] += enle_jobs_nr
enle_jobrange = getJobRange(enle_jobs_nr)
count_jobnrs[enle_jobrange] += 1
if enle_jobs_nr > 0:
enle_jobs_duration_range = getTimeRange(enle_jobs_duration["total"] // enle_jobs_nr)
else:
enle_jobs_duration_range = getTimeRange(-1)
count_jobs_duration[enle_jobs_duration_range] += 1
count_total["JobsLasted"] += enle_jobs_duration["total"]
count_total["JobsTerminated"] += enle_jobs_duration["terminated"]
count_total["JobsGoodput"] += enle_jobs_duration["goodput"]
# find and save waste range
for w in list(enle_waste_mill.keys()):
if w == "duration":
continue # not a waste
# find and save time range
enle_waste_mill_w_range = getMillRange(enle_waste_mill[w])
count_waste_mill_w = count_waste_mill[w]
count_waste_mill_w[enle_waste_mill_w_range] += 1
time_waste_mill_w = time_waste_mill[w]
time_waste_mill_w[enle_waste_mill_w_range] += enle_glidein_duration
return {
"Lasted": count_entered_times,
"JobsNr": count_jobnrs,
"Sum": count_total,
"JobsDuration": count_jobs_duration,
"Waste": count_waste_mill,
"WasteTime": time_waste_mill,
}
def get_data_summary(self):
"""
Summarizes stats_diff data (computeDiff should have
already been called)
Sums over username in the dictionary
stats_diff[frontend][username][entered/exited][status]
to make stats_data[client_name][entered/exited][status]=count
@return: dictionary[client_name][entered/exited][status]=count
"""
stats_data = {}
for client_name in list(self.stats_diff.keys()):
out_el = {"Current": {}, "Entered": {}, "Exited": {}}
for s in self.job_statuses:
entered = 0
entered_list = []
exited = 0
for username in list(self.stats_diff[client_name].keys()):
diff_el = self.stats_diff[client_name][username]
if (diff_el is not None) and (s in list(diff_el.keys())):
entered_list += diff_el[s]["Entered"]
entered += len(diff_el[s]["Entered"])
exited -= len(diff_el[s]["Exited"])
out_el["Entered"][s] = entered
if s not in ("Completed", "Removed"): # I don't have their numbers from inactive logs
count = 0
for username in list(self.current_stats_data[client_name].keys()):
stats_el = self.current_stats_data[client_name][username].data
if (stats_el is not None) and (s in list(stats_el.keys())):
count += len(stats_el[s])
out_el["Current"][s] = count
# and we can never get out of the terminal state
out_el["Exited"][s] = exited
elif s == "Completed":
completed_stats = self.get_completed_stats(entered_list)
completed_counts = self.summarize_completed_stats(completed_stats)
out_el["CompletedCounts"] = completed_counts
stats_data[client_name] = out_el
return stats_data
def get_xml_data(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
data = self.get_data_summary()
return xmlFormat.dict2string(
data,
dict_name="frontends",
el_name="frontend",
subtypes_params={"class": {"subclass_params": {"CompletedCounts": get_completed_stats_xml_desc()}}},
indent_tab=indent_tab,
leading_tab=leading_tab,
)
def get_stats_total(self):
"""
@return: Dictionary with keys (Wait, Idle, Running, Held), each mapping to a list of job data
"""
total = {"Wait": None, "Idle": None, "Running": None, "Held": None}
for k in list(total.keys()):
tdata = []
for client_name in list(self.current_stats_data.keys()):
for username in self.current_stats_data[client_name]:
sdata = self.current_stats_data[client_name][username].data
if (sdata is not None) and (k in list(sdata.keys())):
tdata = tdata + sdata[k]
total[k] = tdata
return total
def get_stats_total_summary(self):
in_total = self.get_stats_total()
out_total = {}
for k in list(in_total.keys()):
out_total[k] = len(in_total[k])
return out_total
def get_xml_stats_total(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
total = self.get_stats_total_summary()
return xmlFormat.class2string(total, inst_name="total", indent_tab=indent_tab, leading_tab=leading_tab)
def get_diff_summary(self):
"""
Flattens stats_diff differential data.
@return: Dictionary of client_name with sub_keys Wait,Idle,Running,Held,Completed,Removed
"""
out_data = {}
for client_name in list(self.stats_diff.keys()):
client_el = {"Wait": None, "Idle": None, "Running": None, "Held": None, "Completed": None, "Removed": None}
for k in list(client_el.keys()):
client_el[k] = {"Entered": [], "Exited": []}
tdata = client_el[k]
# flatten all usernames into one
for username in list(self.stats_diff[client_name].keys()):
sdiff = self.stats_diff[client_name][username]
if (sdiff is not None) and (k in list(sdiff.keys())):
if k == "Completed":
# for completed jobs, add the username
# not for the others since there is no adequate place in the object
for sdel in sdiff[k]["Entered"]:
sdel[4]["username"] = username
for e in list(tdata.keys()):
for sdel in sdiff[k][e]:
tdata[e].append(sdel) # pylint: disable=unsubscriptable-object
out_data[client_name] = client_el
return out_data
def get_diff_total(self):
total = {"Wait": None, "Idle": None, "Running": None, "Held": None, "Completed": None, "Removed": None}
for k in list(total.keys()):
total[k] = {"Entered": [], "Exited": []}
tdata = total[k]
for client_name in list(self.stats_diff.keys()):
for username in list(self.stats_diff[client_name].keys()):
sdiff = self.stats_diff[client_name][username]
if (sdiff is not None) and (k in list(sdiff.keys())):
for e in list(tdata.keys()):
tdata[e] = ( # pylint: disable=unsupported-assignment-operation
tdata[e] + sdiff[k][e] # pylint: disable=unsubscriptable-object
)
return total
def get_total_summary(self):
stats_total = self.get_stats_total()
diff_total = self.get_diff_total()
out_total = {"Current": {}, "Entered": {}, "Exited": {}}
for k in list(diff_total.keys()):
out_total["Entered"][k] = len(diff_total[k]["Entered"]) # pylint: disable=unsubscriptable-object
if k in stats_total:
out_total["Current"][k] = len(stats_total[k])
# if there is no Current, Exited makes no sense either (terminal state)
out_total["Exited"][k] = len(diff_total[k]["Exited"]) # pylint: disable=unsubscriptable-object
elif k == "Completed":
completed_stats = self.get_completed_stats(
diff_total[k]["Entered"] # pylint: disable=unsubscriptable-object
)
completed_counts = self.summarize_completed_stats(completed_stats)
out_total["CompletedCounts"] = completed_counts
return out_total
def get_xml_total(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
total = self.get_total_summary()
return xmlFormat.class2string(
total,
inst_name="total",
subclass_params={"CompletedCounts": get_completed_stats_xml_desc()},
indent_tab=indent_tab,
leading_tab=leading_tab,
)
def get_xml_updated(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
return xmlFormat.time2xml(self.updated, "updated", indent_tab, leading_tab)
def write_file(self, monitoringConfig=None):
if monitoringConfig is None:
monitoringConfig = globals()["monitoringConfig"]
if (self.files_updated is not None) and ((self.updated - self.files_updated) < 5):
# files updated recently, no need to redo it
return
# write snapshot file
xml_str = (
'<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
+ "<glideFactoryEntryLogSummary>\n"
+ self.get_xml_updated(indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ self.get_xml_data(indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ self.get_xml_total(indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=xmlFormat.DEFAULT_TAB)
+ "\n"
+ "</glideFactoryEntryLogSummary>\n"
)
monitoringConfig.write_file("log_summary.xml", xml_str)
# update rrds
stats_data_summary = self.get_stats_data_summary()
diff_summary = self.get_diff_summary()
stats_total_summary = self.get_stats_total_summary()
for client_name in [None] + list(diff_summary.keys()):
if client_name is None:
fe_dir = "total"
sdata = stats_total_summary
sdiff = self.get_diff_total()
else:
fe_dir = "frontend_" + client_name
sdata = stats_data_summary[client_name]
sdiff = diff_summary[client_name]
monitoringConfig.establish_dir(fe_dir)
val_dict_counts = {}
val_dict_counts_desc = {}
val_dict_completed = {}
val_dict_stats = {}
val_dict_waste = {}
val_dict_wastetime = {}
for s in self.job_statuses:
if s not in ("Completed", "Removed"): # I don't have their numbers from inactive logs
count = sdata[s]
val_dict_counts["Status%s" % s] = count
val_dict_counts_desc["Status%s" % s] = {"ds_type": "GAUGE"}
if (sdiff is not None) and (s in list(sdiff.keys())):
entered_list = sdiff[s]["Entered"]
entered = len(entered_list)
exited = -len(sdiff[s]["Exited"])
else:
entered_list = []
entered = 0
exited = 0
val_dict_counts["Entered%s" % s] = entered
val_dict_counts_desc["Entered%s" % s] = {"ds_type": "ABSOLUTE"}
if s not in ("Completed", "Removed"): # Always 0 for them
val_dict_counts["Exited%s" % s] = exited
val_dict_counts_desc["Exited%s" % s] = {"ds_type": "ABSOLUTE"}
elif s == "Completed":
completed_stats = self.get_completed_stats(entered_list)
if client_name is not None: # do not repeat for total
monitoringConfig.logCompleted(client_name, completed_stats)
completed_counts = self.summarize_completed_stats(completed_stats)
# save simple vals
for tkey in list(completed_counts["Sum"].keys()):
val_dict_completed[tkey] = completed_counts["Sum"][tkey]
count_entered_times = completed_counts["Lasted"]
count_jobnrs = completed_counts["JobsNr"]
count_jobs_duration = completed_counts["JobsDuration"]
count_waste_mill = completed_counts["Waste"]
time_waste_mill = completed_counts["WasteTime"]
# save run times
for timerange in list(count_entered_times.keys()):
val_dict_stats["Lasted_%s" % timerange] = count_entered_times[timerange]
# they all use the same indexes
val_dict_stats["JobsLasted_%s" % timerange] = count_jobs_duration[timerange]
# save jobsnr
for jobrange in list(count_jobnrs.keys()):
val_dict_stats["JobsNr_%s" % jobrange] = count_jobnrs[jobrange]
# save waste_mill
for w in list(count_waste_mill.keys()):
count_waste_mill_w = count_waste_mill[w]
for p in list(count_waste_mill_w.keys()):
val_dict_waste[f"{w}_{p}"] = count_waste_mill_w[p]
for w in list(time_waste_mill.keys()):
time_waste_mill_w = time_waste_mill[w]
for p in list(time_waste_mill_w.keys()):
val_dict_wastetime[f"{w}_{p}"] = time_waste_mill_w[p]
# end for s in self.job_statuses
# write the data to disk
monitoringConfig.write_rrd_multi_hetero(
os.path.join(fe_dir, "Log_Counts"), val_dict_counts_desc, self.updated, val_dict_counts
)
monitoringConfig.write_rrd_multi(
os.path.join(fe_dir, "Log_Completed"), "ABSOLUTE", self.updated, val_dict_completed
)
monitoringConfig.write_completed_json(
os.path.join(fe_dir, "Log_Completed"), self.updated, val_dict_completed
)
monitoringConfig.write_rrd_multi(
os.path.join(fe_dir, "Log_Completed_Stats"), "ABSOLUTE", self.updated, val_dict_stats
)
monitoringConfig.write_completed_json(
os.path.join(fe_dir, "Log_Completed_Stats"), self.updated, val_dict_stats
)
# Disable Waste RRDs... WasteTime much more useful
# monitoringConfig.write_rrd_multi(os.path.join(fe_dir, "Log_Completed_Waste"),
# "ABSOLUTE",self.updated,val_dict_waste)
monitoringConfig.write_rrd_multi(
os.path.join(fe_dir, "Log_Completed_WasteTime"), "ABSOLUTE", self.updated, val_dict_wastetime
)
monitoringConfig.write_completed_json(
os.path.join(fe_dir, "Log_Completed_WasteTime"), self.updated, val_dict_wastetime
)
self.aggregate_frontend_data(self.updated, diff_summary)
self.files_updated = self.updated
return
def aggregate_frontend_data(self, updated, diff_summary):
"""
This goes into each frontend in the current entry and aggregates
the completed/stats/wastetime data into completed_data.json
at the entry level
"""
entry_data = {"frontends": {}}
for frontend in list(diff_summary.keys()):
fe_dir = "frontend_" + frontend
completed_filename = os.path.join(monitoringConfig.monitor_dir, fe_dir, "Log_Completed.json")
completed_stats_filename = os.path.join(monitoringConfig.monitor_dir, fe_dir, "Log_Completed_Stats.json")
completed_wastetime_filename = os.path.join(
monitoringConfig.monitor_dir, fe_dir, "Log_Completed_WasteTime.json"
)
try:
with open(completed_filename) as completed_fp:
completed_data = json.load(completed_fp)
with open(completed_stats_filename) as completed_stats_fp:
completed_stats_data = json.load(completed_stats_fp)
with open(completed_wastetime_filename) as completed_wastetime_fp:
completed_wastetime_data = json.load(completed_wastetime_fp)
entry_data["frontends"][frontend] = {
"completed": completed_data,
"completed_stats": completed_stats_data,
"completed_wastetime": completed_wastetime_data,
}
except OSError as e:
self.log.info("Could not find files to aggregate in frontend %s" % fe_dir)
self.log.info(str(e))
continue
monitoringConfig.write_completed_json("completed_data", updated, entry_data)
def write_job_info(self, scheddName, collectorName):
"""The method iterates over the stats_diff dictionary looking for
completed jobs and then fills out a dictionary that contains the
monitoring information needed for each job. The info looks like:
{
'schedd_name': 'name',
'collector_name': 'name',
'joblist' : {
'2994.000': {'condor_duration': 1328, 'glidein_duration': 1334, 'condor_started': 1, 'numjobs': 0},
'2997.000': {'condor_duration': 1328, 'glidein_duration': 1334, 'condor_started': 1, 'numjobs': 0},
...
}
}
:param scheddName: The schedd name to update the job
:param collectorName: The collector name to update the job
"""
jobinfo = {
"schedd_name": scheddName,
"collector_name": collectorName,
"joblist": {},
}
for _sec_name, sndata in self.stats_diff.items():
for _frname, frdata in sndata.items():
for state, jobs in frdata.items():
if state == "Completed":
for job in jobs["Entered"]:
jobid = job[0]
jobstats = job[4]
# This is the dictionary that is going to be written out as a monitoring classad
jobinfo["joblist"][jobid] = {
# activation_claims is a new key in 3.2.19. Using "get" for backward compatibility,
# but it can be removed in future versions
"activation_claims": jobstats.get("activations_claims", "unknown"),
"glidein_duration": jobstats["glidein_duration"],
# condor_duration could be missing if the glidein had problems and condor was not started
# set it to 0
# and set condor_started to None if missing
"condor_duration": jobstats.get("condor_duration", 0),
"condor_started": jobstats.get("condor_started", None),
"numjobs": jobstats.get("stats", {}).get("Total", {}).get("jobsnr", "unknown"),
}
# cannot use monitorAggregatorConfig.jobsummary_relname, looks like a circular import
monitoringConfig.write_file("job_summary.pkl", pickle.dumps(jobinfo))
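# Illustrative read-back of the summary written above (a sketch, not part of the module):
#   with open(os.path.join(monitoringConfig.monitor_dir, "job_summary.pkl"), "rb") as fd:
#       jobinfo = pickle.load(fd)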
###############################################################################
#
# FactoryStatusData
# added by C.W. Murphy starting on 08/09/10
# this class handles the data obtained from the RRD files
#
###############################################################################
class FactoryStatusData:
"""This class handles the data obtained from the RRD files"""
def __init__(self, log=logSupport.log, base_dir=None):
self.data = {}
for rrd in RRD_LIST:
self.data[rrd] = {}
# KEL why are we setting time here and not just getting the current time (like in Descript2XML)
self.updated = time.time()
self.tab = xmlFormat.DEFAULT_TAB
self.resolution = (7200, 86400, 604800) # 2hr, 1 day, 1 week
self.total = "total/"
self.frontends = []
self.base_dir = monitoringConfig.monitor_dir if base_dir is None else base_dir # honor an explicitly passed base_dir
self.log = log
def getUpdated(self):
"""returns the time of last update"""
return xmlFormat.time2xml(self.updated, "updated", indent_tab=self.tab, leading_tab=self.tab)
def fetchData(self, rrd_file, pathway, res, start, end):
"""Uses rrdtool to fetch data from the clients. Returns a dictionary of lists of data. There is a list for each element.
rrdtool fetch returns a 3-tuple: a[0], a[1], & a[2].
[0] lists the resolution, start and end time, which can be specified as arguments of fetchData.
[1] returns the names of the datasets. These names are listed in the key.
[2] is a list of tuples. Each tuple contains data from every dataset. There is a tuple for each time data was collected.
"""
# use rrdtool to fetch data
baseRRDSupport = rrdSupport.rrdSupport()
try:
fetched = baseRRDSupport.fetch_rrd(pathway + rrd_file, "AVERAGE", resolution=res, start=start, end=end)
except Exception:
# probably not created yet
self.log.debug("Failed to load %s" % (pathway + rrd_file))
return {}
# converts fetched from tuples to lists
fetched_names = list(fetched[1])
fetched_data_raw = fetched[2][
:-1
] # drop the last entry... rrdtool will return one more than needed, and often that one is unreliable (in the python version)
fetched_data = []
for data in fetched_data_raw:
fetched_data.append(list(data))
# creates a dictionary to be filled with lists of data
data_sets = {}
for name in fetched_names:
data_sets[name] = []
# check to make sure the data exists
all_empty = True
for data_set in data_sets:
index = fetched_names.index(data_set)
for data in fetched_data:
if isinstance(data[index], (int, float)):
data_sets[data_set].append(data[index])
all_empty = False
if all_empty:
# probably not updated recently
return {}
else:
return data_sets
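# Illustrative shape of the returned value (dataset names depend on the RRD contents):
#   {"StatusIdle": [2.0, 3.0, ...], "StatusRunning": [10.0, 11.0, ...]}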
def average(self, input_list):
try:
if len(input_list) > 0:
avg_list = sum(input_list) / len(input_list)
else:
avg_list = 0
return avg_list
except TypeError:
self.log.exception("glideFactoryMonitoring average: ")
return
def getData(self, input_val, monitoringConfig=None):
"""Return the data fetched by rrdtool as a dictionary
This also modifies the rrd data dictionary for the client (input_val) in all RRD files
and appends the client to the list of frontends
Where this side effect is used:
- totals are updated in Entry.writeStats (writing the XML)
- frontend data in check_and_perform_work
"""
if monitoringConfig is None:
monitoringConfig = globals()["monitoringConfig"]
folder = str(input_val)
if folder == self.total:
client = folder
else:
folder_name = folder.split("@")[-1]
client = folder_name.join(["frontend_", "/"])
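# i.e. client is now the per-frontend directory name "frontend_<name>/"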
if client not in self.frontends:
self.frontends.append(client)
for rrd in RRD_LIST:
self.data[rrd][client] = {}
for res_raw in self.resolution:
# calculate the best resolution
res_idx = 0
rrd_res = monitoringConfig.rrd_archives[res_idx][2] * monitoringConfig.rrd_step
period_mul = int(res_raw / rrd_res)
while period_mul >= monitoringConfig.rrd_archives[res_idx][3]:
# not all elements in the higher bucket, get next lower resolution
res_idx += 1
rrd_res = monitoringConfig.rrd_archives[res_idx][2] * monitoringConfig.rrd_step
period_mul = int(res_raw / rrd_res)
period = period_mul * rrd_res
self.data[rrd][client][period] = {}
end = (
int(time.time() / rrd_res) - 1
) * rrd_res # round due to RRDTool requirements, -1 to avoid the last (partial) one
start = end - period
try:
fetched_data = self.fetchData(
rrd_file=rrd, pathway=self.base_dir + "/" + client, start=start, end=end, res=rrd_res
)
for data_set in fetched_data:
self.data[rrd][client][period][data_set] = self.average(fetched_data[data_set])
except TypeError:
self.log.exception("FactoryStatusData:fetchData: ")
return self.data
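# Worked example (illustrative, with the default archives and an rrd_step of 300 s):
# for res_raw = 7200 s the first archive (1 step/row) gives rrd_res = 300 s and
# period_mul = 24, which fits within 740 rows, so period = 7200 s and the fetched
# samples for that window are averaged into self.data[rrd][client][7200].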
def getXMLData(self, rrd):
"""Return an XML formatted string for the specified RRD file, with the data fetched from a given site (all clients + total).
This also has side effects in the getData(self.total) invocation:
- modifies the rrd data dictionary (all RRDs) for the total for this entry
- and appends the total (self.total aka 'total/') to the list of clients (frontends)
@param rrd:
@return: XML formatted string with stats data
"""
# create a string containing the total data
total_xml_str = self.tab + "<total>\n"
# this is invoked to trigger the side effect but the data is retrieved directly from self.data dict below
get_data_total = self.getData(self.total) # noqa: F841 # keep, side effect needed
try:
total_data = self.data[rrd][self.total]
total_xml_str += (
xmlFormat.dict2string(
total_data,
dict_name="periods",
el_name="period",
subtypes_params={"class": {}},
indent_tab=self.tab,
leading_tab=2 * self.tab,
)
+ "\n"
)
except (NameError, UnboundLocalError):
self.log.exception("FactoryStatusData:total_data: ")
total_xml_str += self.tab + "</total>\n"
# create a string containing the frontend data
frontend_xml_str = self.tab + "<frontends>\n"
for frontend in self.frontends:
fe_name = frontend.split("/")[0]
frontend_xml_str += 2 * self.tab + '<frontend name="' + fe_name + '">\n'
try:
frontend_data = self.data[rrd][frontend]
frontend_xml_str += (
xmlFormat.dict2string(
frontend_data,
dict_name="periods",
el_name="period",
subtypes_params={"class": {}},
indent_tab=self.tab,
leading_tab=3 * self.tab,
)
+ "\n"
)
except (NameError, UnboundLocalError):
self.log.exception("FactoryStatusData:frontend_data: ")
frontend_xml_str += 2 * self.tab + "</frontend>"
frontend_xml_str += self.tab + "</frontends>\n"
data_str = total_xml_str + frontend_xml_str
return data_str
def writeFiles(self, monitoringConfig=None):
"""Write an XML file (rrd_<name>.xml) for each RRD, with the data fetched from a given site.
NOTE: writeFiles triggers the side effect of updating the rrd for totals (via getXMLData/getData)
@param monitoringConfig:
@return: None
"""
if monitoringConfig is None:
monitoringConfig = globals()["monitoringConfig"]
for rrd in RRD_LIST:
file_name = "rrd_" + rrd.split(".")[0] + ".xml"
xml_str = (
'<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
+ "<glideFactoryEntryRRDStats>\n"
+ self.getUpdated()
+ "\n"
+ self.getXMLData(rrd)
+ "</glideFactoryEntryRRDStats>"
)
try:
monitoringConfig.write_file(file_name, xml_str)
except OSError:
self.log.exception("FactoryStatusData:write_file: ")
return
##############################################################################
#
# create an XML file out of glidein.descript, frontend.descript,
# entry.descript, attributes.cfg, and params.cfg
#
#############################################################################
class Descript2XML:
"""
create an XML file out of glidein.descript, frontend.descript,
entry.descript, attributes.cfg, and params.cfg
TODO: The XML is used by ... "the monitoring page"?
The file created is descript.xml, w/ glideFactoryDescript and glideFactoryEntryDescript elements
"""
def __init__(self, log=logSupport.log):
self.tab = xmlFormat.DEFAULT_TAB
self.entry_descript_blacklist = ("DowntimesFile", "EntryName", "Schedd")
self.frontend_blacklist = ("usermap",)
self.glidein_whitelist = (
"AdvertiseDelay",
"FactoryName",
"GlideinName",
"LoopDelay",
"PubKeyType",
"WebURL",
"MonitorDisplayText",
"MonitorLink",
)
self.log = log
def frontendDescript(self, fe_dict):
for key in self.frontend_blacklist:
try:
for frontend in fe_dict:
try:
del fe_dict[frontend][key]
except KeyError:
continue
except RuntimeError:
self.log.exception("blacklist error frontendDescript: ")
try:
xml_str = xmlFormat.dict2string(
fe_dict, dict_name="frontends", el_name="frontend", subtypes_params={"class": {}}, leading_tab=self.tab
)
return xml_str + "\n"
except RuntimeError:
self.log.exception("xmlFormat error in frontendDescript: ")
return
def entryDescript(self, e_dict):
for key in self.entry_descript_blacklist:
try:
for entry in e_dict:
try:
del e_dict[entry]["descript"][key]
except KeyError:
continue
except RuntimeError:
self.log.exception("blacklist error in entryDescript: ")
try:
xml_str = xmlFormat.dict2string(
e_dict,
dict_name="entries",
el_name="entry",
subtypes_params={"class": {"subclass_params": {}}},
leading_tab=self.tab,
)
return xml_str + "\n"
except RuntimeError:
self.log.exception("xmlFormat Error in entryDescript: ")
return
def glideinDescript(self, g_dict):
w_dict = {}
for key in self.glidein_whitelist:
try:
w_dict[key] = g_dict[key]
except KeyError:
continue
try:
a = xmlFormat.dict2string(
{"": w_dict}, dict_name="glideins", el_name="factory", subtypes_params={"class": {}}
)
b = a.split("\n")[1]
c = b.split('name="" ')
xml_str = "".join(c)
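# the split/join above strips the empty 'name="" ' attribute generated for the dummy "" key,
# leaving only the factory attributes in the element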
return xml_str + "\n"
except (SyntaxError, RuntimeError):
logSupport.log.exception("xmlFormat error in glideinDescript: ")
return
def getUpdated(self):
"""returns the time of last update"""
return xmlFormat.time2xml(time.time(), "updated", indent_tab=self.tab, leading_tab=self.tab)
def writeFile(self, path, xml_str, singleEntry=False):
if singleEntry:
root_el = "glideFactoryEntryDescript"
else:
root_el = "glideFactoryDescript"
output = (
'<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
+ "<"
+ root_el
+ ">\n"
+ self.getUpdated()
+ "\n"
+ xml_str
+ "</"
+ root_el
+ ">"
)
fname = path + "descript.xml"
with open(fname + ".tmp", "wb") as f:
f.write(output.encode("utf-8"))
util.file_tmp2final(fname)
return
############### P R I V A T E ################
##################################################
def getAllJobTypes():
return ("validation", "idle", "badput", "nosuccess")
def getLogCompletedDefaults():
return {
"Glideins": 0,
"Lasted": 0,
"FailedNr": 0,
"JobsNr": 0,
"JobsLasted": 0,
"JobsTerminated": 0,
"JobsGoodput": 0,
"CondorLasted": 0,
}
def getTimeRange(absval):
if absval < 1:
return "Unknown"
if absval < (25 * 60):
return "Minutes"
if absval > (64 * 3600): # limit detail to 64 hours
return "Days"
# start with 7.5 min, and then increase by powers of 4
logval = int(math.log(absval / 450.0, 4) + 0.49)
level = math.pow(4, logval) * 450.0
if level < 3600:
return "%imins" % (int(level / 60 + 0.49))
else:
return "%ihours" % (int(level / 3600 + 0.49))
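# Worked example (illustrative): getTimeRange(3000) -> log4(3000/450) rounds to 1,
# level = 4 * 450 = 1800 s, so the bucket returned is "30mins".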
def getAllTimeRanges():
return ("Unknown", "Minutes", "30mins", "2hours", "8hours", "32hours", "Days")
def getJobRange(absval):
if absval < 1:
return "None"
if absval == 1:
return "1job"
if absval == 2:
return "2jobs"
if absval < 9:
return "4jobs"
if absval < 30: # limit detail to 30 jobs
return "16jobs"
else:
return "Many"
def getAllJobRanges():
return ("None", "1job", "2jobs", "4jobs", "16jobs", "Many")
def getMillRange(absval):
if absval < 2:
return "None"
if absval < 15:
return "5m"
if absval < 60:
return "25m"
if absval < 180:
return "100m"
if absval < 400:
return "250m"
if absval < 700:
return "500m"
if absval > 998:
return "All"
else:
return "Most"
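# Worked examples (illustrative): getMillRange(120) -> "100m", getMillRange(999) -> "All",
# and anything in [700, 998] -> "Most".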
def getAllMillRanges():
return ("None", "5m", "25m", "100m", "250m", "500m", "Most", "All")
##################################################
def get_completed_stats_xml_desc():
return {
"dicts_params": {
"Lasted": {"el_name": "TimeRange"},
"JobsDuration": {"el_name": "TimeRange"},
"JobsNr": {"el_name": "Range"},
},
"subclass_params": {
"Waste": {
"dicts_params": {
"idle": {"el_name": "Fraction"},
"validation": {"el_name": "Fraction"},
"badput": {"el_name": "Fraction"},
"nosuccess": {"el_name": "Fraction"},
}
},
"WasteTime": {
"dicts_params": {
"idle": {"el_name": "Fraction"},
"validation": {"el_name": "Fraction"},
"badput": {"el_name": "Fraction"},
"nosuccess": {"el_name": "Fraction"},
}
},
},
}
##################################################
# global configuration of the module
monitoringConfig = MonitoringConfig()