# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""This module implements classes to track
changes in glidein status logs
"""
import copy
import mmap
import os
import os.path
import re
import stat
import time
from glideinwms.lib import condorLogParser, logSupport
rawJobId2Nr = condorLogParser.rawJobId2Nr
rawTime2cTime = condorLogParser.rawTime2cTime
class logSummaryTimingsOutWrapper:
def __init__(self):
self.obj = None
def getObj(self, logname=None, cache_dir=None, username="all"):
if (logname is not None) and (cache_dir is not None):
self.obj = logSummaryTimingsOut(logname, cache_dir, username)
return self.obj
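# Illustrative usage sketch (comments only, not executed; paths and names are
# hypothetical): the wrapper builds the underlying logSummaryTimingsOut only
# when getObj() is given a logname and cache_dir, and a plain getObj() call
# returns the last object built.
#
#   wrapper = logSummaryTimingsOutWrapper()
#   wrapper.getObj()                      # None, nothing built yet
#   obj = wrapper.getObj(logname="/path/to/condor_activity_x.log",
#                        cache_dir="/path/to/cache", username="frontend")
#   assert wrapper.getObj() is obj        # no-arg calls reuse the cached object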
class logSummaryTimingsOut(condorLogParser.logSummaryTimings):
"""
Class logSummaryTimingsOut logs timing and status of a job.
It declares a job complete only after the output file has been received.
The format is slightly different from that of logSummaryTimings:
we add the dirname to the job id.
When an output file is found, a 4th parameter is added to the completed jobs.
See extractLogData below for more details.
"""
def __init__(self, logname, cache_dir, username):
"""
This class uses the condorLogParser clInit function to initialize
"""
self.clInit(logname, cache_dir, ".%s.ftstpk" % username)
self.dirname = os.path.dirname(logname)
self.cache_dir = cache_dir
self.now = time.time()
self.year = time.localtime(self.now)[0]
def loadFromLog(self):
"""
This class inherits from cachedLogClass, so load() will
first check the cached files and call this function only if the log has changed.
It uses the condorLogParser to load the log, then does
some post-processing checking the job.NUMBER.out files
to see if the job has finished and to extract some data.
"""
condorLogParser.logSummaryTimings.loadFromLog(self)
if "Completed" not in self.data:
return  # nothing else to do
org_completed = self.data["Completed"]
new_completed = []
new_waitout = []
now = time.time()
year = time.localtime(now)[0]
for el in org_completed:
job_id = rawJobId2Nr(el[0])
job_fname = "job.%i.%i.out" % job_id
job_fullname = os.path.join(self.dirname, job_fname)
end_time = rawTime2cTime(el[3], year)
if end_time > now:
end_time = rawTime2cTime(el[3], year - 1)
try:
statinfo = os.stat(job_fullname)
ftime = statinfo[stat.ST_MTIME]
fsize = statinfo[stat.ST_SIZE]
file_ok = (
    (fsize > 0)  # log files are ==0 only before Condor_G transfers them back
    and (ftime > (end_time - 300))  # same here
    and (ftime < (now - 5))  # make sure it is not being written into
)
except OSError:
# no file, report invalid
file_ok = 0
if file_ok:
# try:
# fdata=extractLogData(job_fullname)
# except Exception:
# fdata=None # just protect
new_completed.append(el)
else:
if (now - end_time) < 3600:  # give it 1 hour to return the log files
new_waitout.append(el)
else:
new_completed.append(el)
self.data["CompletedNoOut"] = new_waitout
self.data["Completed"] = new_completed
# append the job file path (dirname + job id) to each job tuple
for k in list(self.data.keys()):
new_karr = []
for el in self.data[k]:
job_id = rawJobId2Nr(el[0])
job_fname = "job.%i.%i" % (job_id[0], job_id[1])
job_fullname = os.path.join(self.dirname, job_fname)
new_el = el + (job_fullname,)
new_karr.append(new_el)
self.data[k] = new_karr
return
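# A sketch of the resulting self.data layout after loadFromLog() (the exact
# tuple fields come from condorLogParser.logSummaryTimings; the values shown
# are hypothetical). Completed jobs whose .out file has not arrived yet are
# parked under "CompletedNoOut", and every entry gets the job file path
# appended as its last element:
#
#   self.data = {
#       "Completed":      [("123.000", ..., "/path/to/dir/job.123.0"), ...],
#       "CompletedNoOut": [("124.000", ..., "/path/to/dir/job.124.0"), ...],
#       "Running":        [...],
#       ...
#   }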
def diff_raw(self, other):
"""
Diff self.data with other info.
Used to compare the current data with the previous iteration.
Uses the symmetric difference of sets to compare the two dictionaries.
@type other: dictionary of statuses -> jobs
@return: data[status]['Entered'|'Exited'] - list of jobs
"""
if other is None:
outdata = {}
if self.data is not None:
for k in list(self.data.keys()):
outdata[k] = {"Exited": [], "Entered": self.data[k]}
return outdata
elif self.data is None:
outdata = {}
for k in list(other.keys()):
outdata[k] = {"Entered": [], "Exited": other[k]}
return outdata
else:
outdata = {}
keys = {} # keys will contain the merge of the two lists
for s in list(self.data.keys()) + list(other.keys()):
keys[s] = None
for s in list(keys.keys()):
sel = []
if s in self.data:
for sel_e in self.data[s]:
sel.append(sel_e[0])
oel = []
if s in other:
for oel_e in other[s]:
oel.append(oel_e[0])
#################
outdata_s = {"Entered": [], "Exited": []}
outdata[s] = outdata_s
sset = set(sel)
oset = set(oel)
entered_set = sset.difference(oset)
entered = []
if s in self.data:
for sel_e in self.data[s]:
if sel_e[0] in entered_set:
entered.append(sel_e)
exited_set = oset.difference(sset)
exited = []
if s in other:
for oel_e in other[s]:
if oel_e[0] in exited_set:
exited.append(oel_e)
outdata_s["Entered"] = entered
outdata_s["Exited"] = exited
return outdata
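# A sketch of the diff_raw() output under hypothetical inputs: a job id that
# appears in self.data["Running"] but not in other["Running"] shows up under
# ["Running"]["Entered"], and vice versa for "Exited":
#
#   outdata = {
#       "Running":   {"Entered": [<jobs new in this status>],
#                     "Exited":  [<jobs no longer in this status>]},
#       "Completed": {"Entered": [...], "Exited": [...]},
#       ...
#   }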
# diff self data with other info
# add glidein log data to Entered/Completed
# return data[status]['Entered'|'Exited'] - list of jobs
# completed jobs are augmented with data from the log
def diff(self, other):
"""
Diff self.data with other, comparing the current iteration's
data with the previous iteration's.
Uses diff_raw to perform the symmetric difference of self.data
and other and puts the result into data[status]['Entered'|'Exited'].
Completed jobs are augmented with data extracted from the job .out log.
@return: data[status]['Entered'|'Exited'] - list of jobs
"""
outdata = self.diff_raw(other)
if "Completed" in outdata:
outdata_s = outdata["Completed"]
entered = outdata_s["Entered"]
for i in range(len(entered)):
sel_e = entered[i]
job_fullname = sel_e[-1] + ".out"
try:
fdata = extractLogData(job_fullname)
except Exception:
fdata = copy.deepcopy(EMPTY_LOG_DATA) # just protect
entered[i] = sel_e[:-1] + (fdata, sel_e[-1])
return outdata
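# A sketch of how diff() augments newly Completed jobs (field layout is
# illustrative): the job path appended by loadFromLog stays as the last
# element, and the data extracted from job.CLUSTER.PROC.out is inserted
# just before it; on any parsing error EMPTY_LOG_DATA is used instead.
#
#   ("123.000", ..., {"condor_started": 1, "glidein_duration": 3600, ...},
#    "/path/to/dir/job.123.0")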
class dirSummarySimple:
"""
dirSummarySimple
For now it is just a constructor wrapper.
Later on it will need to implement glidein exit code checks.
"""
def __init__(self, obj):
self.data = copy.deepcopy(obj.data)
self.logClass = obj.logClass
self.wrapperClass = obj.wrapperClass
if obj.wrapperClass is not None:
self.logClass = obj.wrapperClass.getObj()
else:
logSupport.log.debug("== MISHANDLED LogParser Object! ==")
def mkTempLogObj(self):
if self.wrapperClass is not None:
dummyobj = self.wrapperClass.getObj(logname=os.path.join("/tmp", "dummy.txt"), cache_dir="/tmp")
else:
dummyobj = self.logClass(os.path.join("/tmp", "dummy.txt"), "/tmp")
# dummyobj=self.logClass(os.path.join('/tmp','dummy.txt'),'/tmp')
dummyobj.data = self.data # a little rough but works
return dummyobj
# diff self data with other info
def diff(self, other):
dummyobj = self.mkTempLogObj()
return dummyobj.diff(other.data)
# merge other into myself
def merge(self, other):
dummyobj = self.mkTempLogObj()
dummyobj.merge(copy.deepcopy(other.data))
self.data = dummyobj.data
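# Illustrative sketch (comments only, not executed) of how dirSummarySimple is
# meant to be used across iterations: keep the previous iteration's summary,
# then diff the freshly loaded one against it; variable names are hypothetical.
#
#   current = dir_summary.get_simple()   # from dirSummaryTimingsOut(Full) below
#   changes = current.diff(previous)     # data[status]['Entered'|'Exited']
#   previous = current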
class dirSummaryTimingsOut(condorLogParser.cacheDirClass):
"""
This class initializes an instance of cacheDirClass
through a logSummaryTimingsOutWrapper.
It selects all condor_activity files in a directory
that correspond to a particular client.
"""
def __init__(self, dirname, cache_dir, client_name, user_name, inactive_files=None, inactive_timeout=24 * 3600):
self.cdInit(
None,
dirname,
"condor_activity_",
"_%s.log" % client_name,
".%s.cifpk" % user_name,
inactive_files,
inactive_timeout,
cache_dir,
wrapperClass=logSummaryTimingsOutWrapper(),
username=user_name,
)
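# With the prefix/suffix above, this instance considers files named like
# condor_activity_*_<client_name>.log in dirname (pattern shown only for
# illustration), and uses ".<user_name>.cifpk" as the cache file extension.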
def get_simple(self):
try:
obj = dirSummarySimple(self)
except Exception:
logSupport.log.exception("dirSummarySimple failed")
raise
return obj
class dirSummaryTimingsOutFull(condorLogParser.cacheDirClass):
"""
This class uses a lambda function to initialize an instance
of cacheDirClass.
The function chooses all condor_activity files in a directory
regardless of client name.
"""
def __init__(self, dirname, cache_dir, inactive_files=None, inactive_timeout=24 * 3600):
self.cdInit(
lambda ln, cd: logSummaryTimingsOut(ln, cd, "all"),
dirname,
"condor_activity_",
".log",
".all.cifpk",
inactive_files,
inactive_timeout,
cache_dir,
)
def get_simple(self):
return dirSummarySimple(self)
#########################################################
# P R I V A T E
#########################################################
ELD_RC_VALIDATE_END = re.compile(b"=== Last script starting .* after validating for (?P<secs>[0-9]+) ===")
ELD_RC_CONDOR_START = re.compile(b"=== Condor starting.*===")
ELD_RC_CONDOR_END = re.compile(b"=== Condor ended.*after (?P<secs>[0-9]+) ===")
ELD_RC_CONDOR_SLOT = re.compile(
rb"=== Stats of (?P<slot>\S+) ===(?P<content>.*)=== End Stats of (?P<slot2>\S+) ===", re.M | re.DOTALL
)
ELD_RC_CONDOR_SLOT_CONTENT_COUNT = re.compile(
b"Total(?P<name>.*)jobs (?P<jobsnr>[0-9]+) .*utilization (?P<secs>[0-9]+)"
)
ELD_RC_CONDOR_SLOT_ACTIVATIONS_COUNT = re.compile(b"Total number of activations/claims: (?P<nr>[0-9]+)")
ELD_RC_GLIDEIN_END = re.compile(b"=== Glidein ending .* with code (?P<code>[0-9]+) after (?P<secs>[0-9]+) ===")
KNOWN_SLOT_STATS = ["Total", "goodZ", "goodNZ", "badSignal", "badOther"]
EMPTY_LOG_DATA = {"condor_started": 0, "glidein_duration": 0}
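# Hypothetical glidein stdout lines of the kind the regexes above are meant to
# match (format shown for illustration only, not taken from a real log):
#
#   === Last script starting Thu Jan  1 10:00:00 2009 after validating for 120 ===
#   === Condor starting Thu Jan  1 10:00:05 2009 ===
#   === Stats of slot1 ===
#   Total number of activations/claims: 3
#   Total goodZ jobs 2 ... utilization 3000
#   === End Stats of slot1 ===
#   === Condor ended Thu Jan  1 11:00:00 2009 after 3595 ===
#   === Glidein ending Thu Jan  1 11:00:05 2009 with code 0 after 3605 ===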