Source code for glideinwms.frontend.glideinFrontendMonitoring

# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

# Description:
#   This module implements the functions needed
#   to monitor the VO frontend

import copy
import os
import os.path
import string
import time

from glideinwms.lib import logSupport, rrdSupport, util, xmlFormat
from glideinwms.lib.defaults import BINARY_ENCODING

############################################################
#
# Configuration
#
############################################################


class MonitoringConfig:
    """Shared settings and low-level writers for the monitoring output.

    Holds the RRD layout (step, heartbeat, archives), the directory where
    monitoring files are written, and the rrdSupport object used to
    create/update the RRD files.
    """

    def __init__(self):
        # Default values; the user should modify them if needed.
        self.rrd_step = 300  # seconds: default to 5 minutes
        # default to 30 minutes, should be at least twice the loop time
        self.rrd_heartbeat = 1800
        # NOTE(review): tuples look like rrdtool RRA definitions
        # (consolidation function, xff, steps, rows) - confirm with rrdSupport.
        self.rrd_archives = [
            ("AVERAGE", 0.8, 1, 740),  # max precision, keep 2.5 days
            ("AVERAGE", 0.92, 12, 740),  # 1 h precision, keep for a month (30 days)
            ("AVERAGE", 0.98, 144, 740),  # 12 hour precision, keep for a year
        ]

        # Base directory every relative file/dir name below is joined to
        self.monitor_dir = "monitor/"

        self.rrd_obj = rrdSupport.rrdSupport()

        self.my_name = "Unknown"

    def write_file(self, relative_fname, output_str):
        """Write ``output_str`` (plus a trailing newline) under the monitor dir.

        The text is first written to ``<file>.tmp`` and then handed to
        ``util.file_tmp2final``; parent directories are created as needed.

        Args:
            relative_fname: file name relative to ``self.monitor_dir``.
            output_str: text to store in the file.
        """
        target = os.path.join(self.monitor_dir, relative_fname)
        parent_dir = os.path.dirname(target)
        os.makedirs(parent_dir, exist_ok=True)

        with open(target + ".tmp", "w") as tmp_fd:
            tmp_fd.write(output_str + "\n")

        # Errors in the final rename are reported through the error logger
        on_failure = (logSupport.log.error, f"Failed rename/write into {target}")
        util.file_tmp2final(target, mask_exceptions=on_failure)
        return

    def establish_dir(self, relative_dname):
        """Make sure ``relative_dname`` exists below the monitor directory.

        Args:
            relative_dname: directory name relative to ``self.monitor_dir``.
        """
        os.makedirs(os.path.join(self.monitor_dir, relative_dname), exist_ok=True)
        return

    def write_rrd_multi(self, relative_fname, ds_type, time, val_dict, min_val=None, max_val=None):
        """Update (creating it first if missing) a multi-datasource RRD file.

        Does nothing when the rrdSupport object reports itself as a dummy
        (no rrd binary, so no RRD creation).

        Args:
            relative_fname: RRD name relative to ``self.monitor_dir``,
                without the ``.rrd`` extension.
            ds_type: data source type used for every data source on creation.
            time: timestamp passed to the RRD update (shadows the ``time``
                module inside this method).
            val_dict: data source name -> value.
            min_val: lower bound used on creation; ``None`` becomes ``"U"``.
            max_val: upper bound used on creation; ``None`` becomes ``"U"``.
        """
        if self.rrd_obj.isDummy():
            return  # nothing to do, no rrd bin no rrd creation

        rrd_path = os.path.join(self.monitor_dir, relative_fname + ".rrd")

        if not os.path.isfile(rrd_path):
            # First use: define one data source per (sorted) key
            lower = "U" if min_val is None else min_val
            upper = "U" if max_val is None else max_val
            data_sources = [
                (ds_name, ds_type, self.rrd_heartbeat, lower, upper) for ds_name in sorted(val_dict.keys())
            ]
            self.rrd_obj.create_rrd_multi(rrd_path, self.rrd_step, self.rrd_archives, data_sources)

        try:
            self.rrd_obj.update_rrd_multi(rrd_path, time, val_dict)
        except Exception:
            # Best effort: a failed update is logged, never propagated
            logSupport.log.error("Failed to update %s" % rrd_path)
        return
############################################################
#
# groupStats (this header previously read "condorQStats")
#
# This class handles the data obtained from condor_q
#
############################################################
class groupStats:
    """Accumulate the monitoring data of a frontend group and publish it.

    ``self.data`` has three sections:

    * ``"factories"``: per-factory dictionaries,
    * ``"states"``: same layout, for the names in ``self.states_names``,
    * ``"totals"``: group-wide ``Jobs`` and ``Glideins`` counters.

    Every ``log*`` method refreshes ``self.updated``; ``write_file`` uses that
    timestamp both to throttle the writes and as the RRD update time.
    """

    def __init__(self):
        self.data = {"factories": {}, "states": {}, "totals": {}}
        self.updated = time.time()

        # value of self.updated at the last successful write_file(), if any
        self.files_updated = None
        # section name -> names of the attributes archived in the RRDs
        self.attributes = {
            "Jobs": ("Idle", "OldIdle", "Running", "Total", "Idle_3600"),
            "Glideins": ("Idle", "Running", "Total"),
            "MatchedJobs": ("Idle", "EffIdle", "OldIdle", "Running", "RunningHere"),
            "MatchedGlideins": ("Total", "Idle", "Running", "Failed"),
            "MatchedCores": ("Total", "Idle", "Running"),
            "Requested": ("Idle", "MaxRun"),
        }
        # only these will be states, all other names are assumed to be factories
        self.states_names = ("Unmatched", "MatchedUp", "MatchedDown")

    def logJobs(self, jobs_data):
        """Record the group-wide job counters found in ``jobs_data``.

        Only the keys listed in ``self.attributes["Jobs"]`` are kept, as ints.
        """
        self.data["totals"]["Jobs"] = {k: int(jobs_data[k]) for k in self.attributes["Jobs"] if k in jobs_data}
        self.updated = time.time()

    def logGlideins(self, slots_data):
        """Record the group-wide glidein counters found in ``slots_data``.

        Only the keys listed in ``self.attributes["Glideins"]`` are kept, as ints.
        """
        self.data["totals"]["Glideins"] = {
            k: int(slots_data[k]) for k in self.attributes["Glideins"] if k in slots_data
        }
        self.updated = time.time()

    def logMatchedJobs(self, factory, idle, effIdle, oldIdle, running, realRunning):
        """Record the matched-job counters of a factory (or state).

        The values are stored, as ints, under the names of
        ``self.attributes["MatchedJobs"]`` in this order:
        idle, effIdle, oldIdle, running, realRunning.
        """
        values = (idle, effIdle, oldIdle, running, realRunning)
        factory_or_state_d = self.get_factory_dict(factory)
        factory_or_state_d["MatchedJobs"] = {
            name: int(value) for name, value in zip(self.attributes["MatchedJobs"], values)
        }
        # was "self.update": the timestamp used by write_file was never refreshed
        self.updated = time.time()

    def logFactDown(self, factory, isDown):
        """Record whether the factory (or state) is ``"Down"`` or ``"Up"``."""
        factory_or_state_d = self.get_factory_dict(factory)
        factory_or_state_d["Down"] = "Down" if isDown else "Up"
        self.updated = time.time()

    def logMatchedGlideins(self, factory, total, idle, running, failed, totalcores, idlecores, runningcores):
        """Record the matched glidein and core counters of a factory (or state).

        Glidein counters go to ``"MatchedGlideins"`` (total, idle, running,
        failed) and core counters to ``"MatchedCores"`` (total, idle, running),
        all stored as ints.
        """
        factory_or_state_d = self.get_factory_dict(factory)
        factory_or_state_d["MatchedGlideins"] = {
            name: int(value)
            for name, value in zip(self.attributes["MatchedGlideins"], (total, idle, running, failed))
        }
        factory_or_state_d["MatchedCores"] = {
            name: int(value)
            for name, value in zip(self.attributes["MatchedCores"], (totalcores, idlecores, runningcores))
        }
        # was "self.update": the timestamp used by write_file was never refreshed
        self.updated = time.time()

    def logFactAttrs(self, factory, attrs, blacklist):
        """Record the factory attributes, leaving out the names in ``blacklist``."""
        factory_or_state_d = self.get_factory_dict(factory)
        factory_or_state_d["Attributes"] = {attr: attrs[attr] for attr in attrs if attr not in blacklist}
        # was "self.update": the timestamp used by write_file was never refreshed
        self.updated = time.time()

    def logFactReq(self, factory, reqIdle, reqMaxRun, params):
        """Record what was requested from the factory.

        ``reqIdle`` and ``reqMaxRun`` are stored as ints under the names of
        ``self.attributes["Requested"]``; ``params`` is deep-copied into
        ``"Parameters"``.
        """
        idle_name, max_run_name = self.attributes["Requested"]
        factory_or_state_d = self.get_factory_dict(factory)
        factory_or_state_d["Requested"] = {
            idle_name: int(reqIdle),
            max_run_name: int(reqMaxRun),
            "Parameters": copy.deepcopy(params),
        }
        self.updated = time.time()

    def get_factories_data(self):
        """Return a deep copy of the per-factory data."""
        return copy.deepcopy(self.data["factories"])

    def get_xml_factories_data(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
        """Return the per-factory data formatted by ``xmlFormat.dict2string``."""
        return xmlFormat.dict2string(
            self.get_factories_data(),
            dict_name="factories",
            el_name="factory",
            subtypes_params={
                "class": {"subclass_params": {"Requested": {"dicts_params": {"Parameters": {"el_name": "Parameter"}}}}}
            },
            indent_tab=indent_tab,
            leading_tab=leading_tab,
        )

    def get_states_data(self):
        """Return a deep copy of the per-state data."""
        return copy.deepcopy(self.data["states"])

    def get_xml_states_data(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
        """Return the per-state data formatted by ``xmlFormat.dict2string``."""
        return xmlFormat.dict2string(
            self.get_states_data(),
            dict_name="states",
            el_name="state",
            subtypes_params={
                "class": {"subclass_params": {"Requested": {"dicts_params": {"Parameters": {"el_name": "Parameter"}}}}}
            },
            indent_tab=indent_tab,
            leading_tab=leading_tab,
        )

    def get_xml_updated(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
        """Return the last-update time formatted by ``xmlFormat.time2xml``."""
        # The arguments used to be ignored (hard-coded DEFAULT_TAB / ""),
        # although write_file() asks for a leading tab like for the other elements.
        return xmlFormat.time2xml(self.updated, "updated", indent_tab=indent_tab, leading_tab=leading_tab)

    def get_total(self):
        """Return the group totals.

        For ``MatchedJobs``, ``Requested``, ``MatchedGlideins`` and
        ``MatchedCores`` the numeric attributes are summed over all the
        factories; an attribute is kept only if every factory providing that
        section has it as a number. Sections no factory provides are left out.
        A deep copy of ``self.data["totals"]`` (``Jobs``, ``Glideins``) is
        merged on top.
        """
        total = {
            "MatchedJobs": None,
            "Requested": None,
            "MatchedGlideins": None,
            "MatchedCores": None,
        }
        # Exact type match on purpose: skips non-numeric entries such as
        # Requested["Parameters"] (and would also skip bool values).
        numtypes = (int, float)

        for fa in self.data["factories"].values():
            for w, el in fa.items():
                if w not in total:
                    continue  # ignore eventual not supported classes
                tel = total[w]
                if tel is None:
                    # first one, just copy over the numbers
                    total[w] = {a: v for a, v in el.items() if type(v) in numtypes}
                    continue
                # successive one: sum what both have, drop from the total
                # whatever this factory lacks or has as a non-number
                for a in list(tel):
                    if a in el and type(el[a]) in numtypes:
                        tel[a] += el[a]
                    else:
                        del tel[a]

        # remove the entries that were never defined
        total = {w: tel for w, tel in total.items() if tel is not None}

        total.update(copy.deepcopy(self.data["totals"]))
        return total

    def get_xml_total(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
        """Return the totals formatted by ``xmlFormat.class2string``."""
        return xmlFormat.class2string(
            self.get_total(), inst_name="total", indent_tab=indent_tab, leading_tab=leading_tab
        )

    def write_file(self):
        """Write ``frontend_status.xml`` and update the RRDs.

        Nothing is done when the data changed less than 5 seconds after the
        state that was last written. Everything goes through the module-level
        ``monitoringConfig``.
        """
        if (self.files_updated is not None) and ((self.updated - self.files_updated) < 5):
            # files updated recently, no need to redo it
            return

        # write snapshot file
        tab = xmlFormat.DEFAULT_TAB
        xml_str = (
            '<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
            + "<VOFrontendGroupStats>\n"
            + self.get_xml_updated(indent_tab=tab, leading_tab=tab)
            + "\n"
            + self.get_xml_factories_data(indent_tab=tab, leading_tab=tab)
            + "\n"
            + self.get_xml_states_data(indent_tab=tab, leading_tab=tab)
            + "\n"
            + self.get_xml_total(indent_tab=tab, leading_tab=tab)
            + "\n"
            + "</VOFrontendGroupStats>\n"
        )
        monitoringConfig.write_file("frontend_status.xml", xml_str)

        # update RRDs
        self.write_one_rrd("total", self.get_total())

        for fact, fact_data in self.get_factories_data().items():
            self.write_one_rrd("factory_%s" % sanitize(fact), fact_data, 1)

        for state, state_data in self.get_states_data().items():
            self.write_one_rrd("state_%s" % sanitize(state), state_data, 1)

        self.files_updated = self.updated
        return

    ################################################
    # PRIVATE - Used to select the right dictionary
    def get_factory_dict(self, factory):
        """Return (creating it if needed) the dictionary of ``factory``.

        Names listed in ``self.states_names`` live in ``data["states"]``,
        everything else in ``data["factories"]``.
        """
        section = "states" if factory in self.states_names else "factories"
        return self.data[section].setdefault(factory, {})

    ###############################
    # PRIVATE - Used by write_file
    # Write one RRD
    def write_one_rrd(self, name, data, fact=0):
        """Write one ``<name>/Status_Attributes`` RRD from ``data``.

        Args:
            name: directory (relative to the monitor dir) of the RRD.
            data: section name -> {attribute: value}, e.g. the result of
                ``get_total()`` or one element of the factories/states data.
            fact: 0 for the totals (``Jobs`` and ``Glideins`` are archived
                too), anything else for a single factory/state.
        """
        # section name -> prefix of the RRD data source names
        type_strings = {
            "MatchedJobs": "MatchJob",
            "MatchedGlideins": "MatchGlidein",
            "MatchedCores": "MatchCore",
            "Requested": "Req",
        }
        if fact == 0:
            type_strings["Jobs"] = "Jobs"
            type_strings["Glideins"] = "Glidein"

        # init, so that all get created properly
        val_dict = {}
        for tp, attributes_tp in self.attributes.items():
            if tp in type_strings:
                tp_str = type_strings[tp]
                for a in attributes_tp:
                    val_dict[f"{tp_str}{a}"] = None

        for tp, fe_el_tp in data.items():
            # values (RRD type) - Jobs, Slots
            if tp not in self.attributes or tp not in type_strings:
                continue

            tp_str = type_strings[tp]
            attributes_tp = self.attributes[tp]
            for a, a_el in fe_el_tp.items():
                # ignore unknown attributes and subdictionaries
                if a in attributes_tp and not isinstance(a_el, dict):
                    val_dict[f"{tp_str}{a}"] = a_el

        monitoringConfig.establish_dir("%s" % name)
        monitoringConfig.write_rrd_multi(os.path.join(name, "Status_Attributes"), "GAUGE", self.updated, val_dict)
########################################################################
class factoryStats:
    """Accumulate per-client monitoring data and publish it.

    ``self.data`` maps a client name to a dictionary that may contain the
    ``"Status"``, ``"Requested"`` and ``"ClientMonitor"`` sections, filled by
    ``logJobs``, ``logRequest`` and ``logClientMonitor`` respectively.
    """

    def __init__(self):
        self.data = {}
        self.updated = time.time()

        # value of self.updated at the last successful write_file(), if any
        self.files_updated = None
        # section name -> names of the attributes archived in the RRDs
        # NOTE(review): only "Requested" matches a section the log* methods
        # fill ("Status", "Requested", "ClientMonitor") - confirm this is intended.
        self.attributes = {
            "Jobs": ("Idle", "OldIdle", "Running", "Total"),
            "Matched": ("Idle", "OldIdle", "Running", "Total"),
            "Requested": ("Idle", "MaxRun"),
            "Slots": ("Idle", "Running", "Total"),
        }

    def logJobs(self, client_name, qc_status):
        """Record the job status counters of a client.

        ``qc_status`` maps a numeric status code to a count; every known
        status gets an int value, 0 when the code is missing.
        """
        status_pairs = (
            (1, "Idle"),
            (2, "Running"),
            (5, "Held"),
            (1001, "Wait"),
            (1002, "Pending"),
            (1010, "StageIn"),
            (1100, "IdleOther"),
            (4010, "StageOut"),
        )
        t_el = self.data.setdefault(client_name, {})
        t_el["Status"] = {status: int(qc_status[nr]) if nr in qc_status else 0 for nr, status in status_pairs}
        self.updated = time.time()

    def logRequest(self, client_name, requests, params):
        """Record what a client requested.

        requests is a dictionary of requests
        params is a dictionary of parameters

        At the moment, it looks only for
          'IdleGlideins'
          'MaxRunningGlideins'
        """
        el = {}
        self.data.setdefault(client_name, {})["Requested"] = el

        if "IdleGlideins" in requests:
            el["Idle"] = int(requests["IdleGlideins"])
        if "MaxRunningGlideins" in requests:
            el["MaxRun"] = int(requests["MaxRunningGlideins"])

        el["Parameters"] = copy.deepcopy(params)

        self.updated = time.time()

    def logClientMonitor(self, client_name, client_monitor, client_internals):
        """Record the monitoring info published by a client.

        client_monitor is a dictionary of monitoring info
        client_internals is a dictionary of internals

        At the moment, it looks only for
          'Idle'
          'Running'
          'GlideinsIdle'
          'GlideinsRunning'
          'GlideinsTotal'
          'LastHeardFrom'
        """
        el = {}
        self.data.setdefault(client_name, {})["ClientMonitor"] = el

        # (key in client_monitor, key used in the stored element)
        key_map = (
            ("Idle", "JobsIdle"),
            ("Running", "JobsRunning"),
            ("GlideinsIdle", "GlideIdle"),
            ("GlideinsRunning", "GlideRunning"),
            ("GlideinsTotal", "GlideTotal"),
        )
        for ck, ek in key_map:
            if ck in client_monitor:
                el[ek] = int(client_monitor[ck])

        if "LastHeardFrom" in client_internals:
            el["InfoAge"] = int(time.time() - int(client_internals["LastHeardFrom"]))
            el["InfoAgeAvgCounter"] = 1  # used for totals since we need an avg in totals, not absnum

        self.updated = time.time()

    def get_data(self):
        """Return a deep copy of the data without the ``*AvgCounter`` fields."""
        data1 = copy.deepcopy(self.data)
        for fe in data1.values():
            for el in fe.values():
                for a in list(el):
                    if a.endswith("AvgCounter"):
                        # do not publish avgcounter fields... they are internals
                        del el[a]
        return data1

    def get_xml_data(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
        """Return the per-client data formatted by ``xmlFormat.dict2string``."""
        return xmlFormat.dict2string(
            self.get_data(),
            dict_name="frontends",
            el_name="frontend",
            subtypes_params={
                "class": {"subclass_params": {"Requested": {"dicts_params": {"Parameters": {"el_name": "Parameter"}}}}}
            },
            indent_tab=indent_tab,
            leading_tab=leading_tab,
        )

    def get_total(self):
        """Return the totals over all the clients.

        For ``Status``, ``Requested`` and ``ClientMonitor`` the numeric
        attributes are summed; an attribute is kept only if every client
        providing that section has it as a number. Attributes that come with
        a ``<name>AvgCounter`` companion are turned into an (integer) average
        and the counter is removed. Sections no client provides are left out.
        """
        counter_suffix = "AvgCounter"
        total = {"Status": None, "Requested": None, "ClientMonitor": None}
        # Exact type match on purpose: skips non-numeric entries such as
        # Requested["Parameters"] (and would also skip bool values).
        numtypes = (int, float)

        for fe in self.data.values():
            for w, el in fe.items():
                if w not in total:
                    continue  # ignore eventual not supported classes
                tel = total[w]
                if tel is None:
                    # first one, just copy over the numbers
                    total[w] = {a: v for a, v in el.items() if type(v) in numtypes}
                    continue
                # successive one: sum what both have, drop from the total
                # whatever this client lacks or has as a non-number
                for a in list(tel):
                    if a in el and type(el[a]) in numtypes:
                        tel[a] += el[a]
                    else:
                        del tel[a]

        for w in list(total):
            tel = total[w]
            if tel is None:
                del total[w]  # remove entry if not defined
                continue
            for a in list(tel):
                if not a.endswith(counter_suffix):
                    continue
                # this is an average counter, calc the average of the referred element
                # like InfoAge=InfoAge/InfoAgeAvgCounter
                aorg = a[: -len(counter_suffix)]
                if aorg in tel and tel[a]:
                    tel[aorg] = tel[aorg] // tel[a]
                # the avgcount totals are just for internal purposes
                del tel[a]

        return total

    def get_xml_total(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
        """Return the totals formatted by ``xmlFormat.class2string``."""
        return xmlFormat.class2string(
            self.get_total(), inst_name="total", indent_tab=indent_tab, leading_tab=leading_tab
        )

    def get_xml_updated(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
        """Return the last-update time formatted by ``xmlFormat.time2xml``."""
        # The arguments used to be ignored (hard-coded DEFAULT_TAB / ""),
        # although write_file() asks for a leading tab like for the other elements.
        return xmlFormat.time2xml(self.updated, "updated", indent_tab=indent_tab, leading_tab=leading_tab)

    def write_file(self):
        """Write ``schedd_status.xml`` and update the RRDs.

        Nothing is done when the data changed less than 5 seconds after the
        state that was last written. One ``Status_Attributes`` RRD is written
        in ``total`` and one in each ``frontend_<client name>`` directory.
        Everything goes through the module-level ``monitoringConfig``.
        """
        if (self.files_updated is not None) and ((self.updated - self.files_updated) < 5):
            # files updated recently, no need to redo it
            return

        # write snapshot file
        tab = xmlFormat.DEFAULT_TAB
        xml_str = (
            '<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
            + "<glideFactoryEntryQStats>\n"
            + self.get_xml_updated(indent_tab=tab, leading_tab=tab)
            + "\n"
            + self.get_xml_data(indent_tab=tab, leading_tab=tab)
            + "\n"
            + self.get_xml_total(indent_tab=tab, leading_tab=tab)
            + "\n"
            + "</glideFactoryEntryQStats>\n"
        )
        monitoringConfig.write_file("schedd_status.xml", xml_str)

        data = self.get_data()
        total_el = self.get_total()

        # update RRDs
        # section name -> prefix of the RRD data source names
        type_strings = {"Status": "Status", "Requested": "Req", "ClientMonitor": "Client"}

        # the totals first, then one entry per client
        targets = [("total", total_el)]
        targets.extend(("frontend_" + fe, data[fe]) for fe in data)

        for fe_dir, fe_el in targets:
            val_dict = {}
            # init, so that all get created properly
            for tp, attributes_tp in self.attributes.items():
                if tp not in type_strings:
                    # self.attributes lists sections with no RRD prefix;
                    # indexing type_strings with them raised KeyError
                    continue
                tp_str = type_strings[tp]
                for a in attributes_tp:
                    val_dict[f"{tp_str}{a}"] = None

            monitoringConfig.establish_dir(fe_dir)
            for tp, fe_el_tp in fe_el.items():
                # values (RRD type) - Status, Requested or ClientMonitor
                if tp not in self.attributes or tp not in type_strings:
                    continue

                tp_str = type_strings[tp]
                attributes_tp = self.attributes[tp]
                for a, a_el in fe_el_tp.items():
                    # ignore unknown attributes and subdictionaries
                    if a in attributes_tp and not isinstance(a_el, dict):
                        val_dict[f"{tp_str}{a}"] = a_el

            monitoringConfig.write_rrd_multi(
                os.path.join(fe_dir, "Status_Attributes"), "GAUGE", self.updated, val_dict
            )

        self.files_updated = self.updated
        return
############### P R I V A T E ################ ##################################################
def sanitize(name):
    """Return ``name`` with every unsupported character replaced by ``_``.

    Only ASCII letters, digits, ``.`` and ``-`` are kept as they are.
    """
    allowed = string.ascii_letters + string.digits + ".-"
    return "".join(ch if ch in allowed else "_" for ch in name)
################################################## # global configuration of the module monitoringConfig = MonitoringConfig()
def write_frontend_descript_xml(frontendDescript, monitor_dir):
    """Write the frontend ``descript.xml`` file in the monitor web area.

    The file carries the update time, the frontend name and the monitoring
    footer (display text and link). It is written to ``descript.xml.tmp`` and
    then handed to ``util.file_tmp2final``. An ``OSError`` while writing is
    logged and not propagated.

    Args:
        frontendDescript: object whose ``data`` dictionary provides the
            ``FrontendName``, ``MonitorDisplayText`` and ``MonitorLink`` keys
            (FrontendDescript: the content of the frontend.descript file in
            the frontend instance dir).
        monitor_dir (str): path of the monitor dir in the frontend instance dir.

    Raises:
        KeyError: if one of the three keys is missing from ``frontendDescript.data``.
    """
    # local import: keeps this function self-contained
    from xml.sax.saxutils import escape

    def xml_attr(value):
        # The values end up inside double-quoted XML attributes: a raw &, <, >
        # or " (e.g. a link with a query string) would make the file malformed.
        return escape(str(value), {'"': "&quot;"})

    frontend_data = frontendDescript.data

    frontend_str = '<frontend FrontendName="%s"' % xml_attr(frontend_data["FrontendName"]) + "/>"

    dis_link_txt = 'display_txt="{}" href_link="{}"'.format(
        xml_attr(frontend_data["MonitorDisplayText"]),
        xml_attr(frontend_data["MonitorLink"]),
    )
    footer_str = "<monitor_footer " + dis_link_txt + "/>"

    tab = xmlFormat.DEFAULT_TAB
    output = (
        '<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'
        + "<glideinFrontendDescript>\n"
        + xmlFormat.time2xml(time.time(), "updated", indent_tab=tab, leading_tab=tab)
        + "\n"
        + tab
        + frontend_str
        + "\n"
        + tab
        + footer_str
        + "\n"
        + "</glideinFrontendDescript>"
    )

    fname = os.path.join(monitor_dir, "descript.xml")

    try:
        with open(fname + ".tmp", "wb") as f:
            f.write(output.encode(BINARY_ENCODING))

        util.file_tmp2final(
            fname,
            mask_exceptions=(logSupport.log.error, f"Failed rename/write of the frontend descript.xml: {fname}"),
        )

    except OSError:
        logSupport.log.exception("Error writing out the frontend descript.xml: ")