#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""This is the glideinFactoryEntryGroup. Common Tasks like querying collector
and advertizing the work done by group are done here
Arguments:
$1 = parent_pid (int): The pid for the Factory daemon
$2 = sleep_time (int): The number of seconds to sleep between iterations
$3 = advertize_rate (int): The rate at which advertising should occur (every $3 loops)
$4 = startup_dir (str|Path): The "home" directory for the entry.
$5 = entry_names (str): Colon separated list with the names of the entries this process should work on
$6 = group_id (str): Group id, normally a number (with the "group_" prefix it forms the group name),
It can change between Factory reconfigurations
"""
import os
import os.path
import pickle
import sys
import time
from glideinwms.factory import glideFactoryConfig as gfc
from glideinwms.factory import glideFactoryDowntimeLib, glideFactoryEntry
from glideinwms.factory import glideFactoryInterface as gfi
from glideinwms.factory import glideFactoryLib as gfl
from glideinwms.factory import glideFactoryPidLib
from glideinwms.lib import classadSupport, cleanupSupport, logSupport
from glideinwms.lib.fork import fetch_fork_result_list, ForkManager, print_child_processes
from glideinwms.lib.pidSupport import register_sighandler, unregister_sighandler
############################################################
# Memory footprint of an entry process when forked for check_and_perform_work.
# Set a conservative limit of 500 MB (based on UCSD 2.6 factory PSS of 115 MB)
# times a safety factor of 2
ENTRY_MEM_REQ_BYTES = 500000000 * 2
############################################################
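
# A minimal sketch (an assumption, not code used by this module) of how a
# per-fork memory requirement like ENTRY_MEM_REQ_BYTES could be checked
# against available physical memory before forking workers:
#
#     def can_fork(num_forks, mem_req_bytes=ENTRY_MEM_REQ_BYTES):
#         # SC_AVPHYS_PAGES * SC_PAGE_SIZE = available physical memory (POSIX)
#         avail = os.sysconf("SC_AVPHYS_PAGES") * os.sysconf("SC_PAGE_SIZE")
#         return avail >= num_forks * mem_req_bytes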
class EntryGroup:
    """Placeholder class for an Entry group (currently empty)."""

    def __init__(self):
        pass
############################################################
def check_parent(parent_pid, glideinDescript, my_entries):
    """Check that we are not an orphaned process. If the Factory daemon
    has died, then clean up after ourselves and kill ourselves off.

    Args:
        parent_pid (int): pid of the Factory daemon process
        glideinDescript (glideFactoryConfig.GlideinDescript): object that encapsulates
            glidein.descript in the Factory root directory
        my_entries (dict): dictionary of entry objects keyed on entry name

    Raises:
        KeyboardInterrupt: raised when the Factory daemon cannot be found
    """
if os.path.exists("/proc/%s" % parent_pid):
return # parent still exists, we are fine
logSupport.log.info("Parent died, exit.")
# there is nobody to clean up after ourselves... do it here
logSupport.log.info("Deadvertize myself")
for entry in list(my_entries.values()):
# Deadvertise glidefactory classad
try:
gfi.deadvertizeGlidein(glideinDescript.data["FactoryName"], glideinDescript.data["GlideinName"], entry.name)
except Exception:
logSupport.log.warning("Failed to deadvertize entry '%s'" % entry.name)
# Deadvertise glidefactoryclient classad
try:
gfi.deadvertizeAllGlideinClientMonitoring(
glideinDescript.data["FactoryName"], glideinDescript.data["GlideinName"], entry.name
)
except Exception:
logSupport.log.warning("Failed to deadvertize monitoring for entry '%s'" % entry.name)
    raise KeyboardInterrupt("Parent died. Quitting.")
############################################################
def find_work(factory_in_downtime, glideinDescript, frontendDescript, group_name, my_entries):
    """Find work for all the entries in the group.

    Args:
        factory_in_downtime (bool): True if the Factory is in downtime
        glideinDescript (dict): Factory glidein config values
        frontendDescript (dict): Security mappings for frontend identities, security classes, and usernames
        group_name (str): Name of the group
        my_entries (dict): Dictionary of entry objects keyed on entry name

    Returns:
        dict: Dictionary of work to do keyed on entry name
    """
pub_key_obj = glideinDescript.data["PubKeyObj"]
old_pub_key_obj = glideinDescript.data["OldPubKeyObj"]
logSupport.log.info("Finding work")
work = gfi.findGroupWork(
gfl.factoryConfig.factory_name,
gfl.factoryConfig.glidein_name,
list(my_entries.keys()),
gfl.factoryConfig.supported_signtypes,
pub_key_obj,
)
log_work_info(work, key="existing")
# If old key is valid, find the work using old key as well and append it
# to existing work dictionary
if old_pub_key_obj is not None:
work_oldkey = {}
# still using the old key in this cycle
logSupport.log.info("Old factory key is still valid. Trying to find work using old factory key.")
work_oldkey = gfi.findGroupWork(
gfl.factoryConfig.factory_name,
gfl.factoryConfig.glidein_name,
list(my_entries.keys()),
gfl.factoryConfig.supported_signtypes,
old_pub_key_obj,
)
        log_work_info(work_oldkey, key="old")
# Merge the work_oldkey with work
for w in work_oldkey:
if w in work:
                # This should not happen, but warn as a safeguard
logSupport.log.warning(
"Work task for %s exists using existing key and old key. Ignoring the work from old key." % w
)
continue
work[w] = work_oldkey[w]
# Append empty work item for entries that do not have work
# This is required to trigger glidein sanitization further in the code
for ent in my_entries:
if ent not in work:
work[ent] = {}
return work
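
# Shape of the dictionary returned by find_work (a sketch; the structure of each
# per-entry value comes from gfi.findGroupWork, and the inner keys shown here
# are illustrative assumptions):
#
#     {
#         "entry_A": {"frontend_client_1": {...}, "frontend_client_2": {...}},
#         "entry_B": {},  # no work, but present to trigger glidein sanitization
#     }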
def log_work_info(work, key=""):
    """Log a per-entry summary of the work tasks found.

    Args:
        work (dict): Dictionary of work to do keyed on entry name
        key (str): Label for the factory key used to find the work (e.g. "existing" or "old")
    """
    if key.strip() != "":
        logSupport.log.info(f"Work tasks grouped by entries using {key} factory key")
    else:
        logSupport.log.info("Work tasks grouped by entries")
    for entry in work:
        # Only log if there is work to do
        if len(work[entry]) > 0:
            logSupport.log.info(f"Entry: {entry} (Tasks: {len(work[entry])})")
def get_work_count(work):
    """Get the total work to do, i.e. the sum of the work to do for every entry.

    Args:
        work (dict): Dictionary of work to do keyed on entry name

    Returns:
        int: Total work to do
    """
    return sum(len(work[entry]) for entry in work)
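
# Example (illustrative values): two tasks for entry_A, none for entry_B:
#
#     get_work_count({"entry_A": {"c1": {}, "c2": {}}, "entry_B": {}})  # -> 2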
def forked_update_entries_stats(factory_in_downtime, entries_list):
    """Update statistics for entries that have no work to do.

    Args:
        factory_in_downtime (bool): True if the Factory is in downtime
        entries_list (list): Entry objects to update

    Returns:
        dict: Pickle-friendly state of the updated entries, keyed on "entries"
    """
    entries_updated = glideFactoryEntry.update_entries_stats(factory_in_downtime, entries_list)
    # The entry objects now have updated info in the child process.
    # This info is required for monitoring and advertising.
    # Compile the return info from the updated entry objects.
    # The entry objects themselves cannot be pickled, so extract
    # the required info to make the entries pickle-friendly.
    return_dict = {"entries": [(e.name, e.getState()) for e in entries_updated]}
    # should set also e['work_done'] = 0 ?
    return return_dict
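
# A minimal sketch (an assumption about how the caller consumes this) of the
# pickle round-trip between a forked child and the parent process:
#
#     payload = pickle.dumps(forked_update_entries_stats(False, entries))  # child
#     state = pickle.loads(payload)                                        # parent
#     for name, entry_state in state["entries"]:
#         my_entries[name].setState(entry_state)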
##############################################
# Functions managing the Entries life-cycle
def iterate_one(do_advertize, factory_in_downtime, glideinDescript, frontendDescript, group_name, my_entries):
"""One iteration of the entry group
Args:
do_advertize (bool): True if glidefactory classads should be advertised
factory_in_downtime (bool): True if factory is in downtime
glideinDescript (dict): Factory glidein config values
frontendDescript (dict): Security mappings for frontend identities, security classes, and usernames
group_name (str): Name of the group
my_entries (dict): Dictionary of entry objects (glideFactoryEntry.Entry) keyed on entry name
Returns:
        int: Units of work performed (0 if no Glidein was submitted)
"""
groupwork_done = {}
done_something = 0
for entry in list(my_entries.values()):
entry.initIteration(factory_in_downtime)
try:
groupwork_done = find_and_perform_work(
do_advertize, factory_in_downtime, glideinDescript, frontendDescript, group_name, my_entries
)
except Exception:
logSupport.log.warning("Error occurred while trying to find and do work.")
logSupport.log.exception("Exception: ")
logSupport.log.debug("Group Work done: %s" % groupwork_done)
# Classad files to use
gf_filename = classadSupport.generate_classad_filename(prefix="gfi_adm_gf")
gfc_filename = classadSupport.generate_classad_filename(prefix="gfi_adm_gfc")
logSupport.log.info(
f"Generating glidefactory ({gf_filename}) and glidefactoryclient ({gfc_filename}) classads as needed"
)
entries_to_advertise = []
for entry in list(my_entries.values()):
# Write classads to file if work was done or if advertise flag is set
# Actual advertise is done using multi classad advertisement
entrywork_done = 0
if (entry.name in groupwork_done) and ("work_done" in groupwork_done[entry.name]):
entrywork_done = groupwork_done[entry.name]["work_done"]
done_something += entrywork_done
if (do_advertize) or (entrywork_done > 0):
entries_to_advertise.append(entry.name)
entry.writeClassadsToFile(factory_in_downtime, gf_filename, gfc_filename)
entry.unsetInDowntime()
if (do_advertize) or (done_something > 0):
logSupport.log.debug(
"Generated glidefactory and glidefactoryclient classads for entries: %s" % ", ".join(entries_to_advertise)
)
# ADVERTISE: glidefactory classads
gfi.advertizeGlideinFromFile(gf_filename, remove_file=True, is_multi=True)
# ADVERTISE: glidefactoryclient classads
gfi.advertizeGlideinClientMonitoringFromFile(gfc_filename, remove_file=True, is_multi=True)
else:
logSupport.log.info("Not advertising glidefactory and glidefactoryclient classads this round")
return done_something
############################################################
def iterate(parent_pid, sleep_time, advertize_rate, glideinDescript, frontendDescript, group_name, my_entries):
"""Iterate over set of tasks until it is time to quit or die.
The main "worker" function for the Factory Entry Group.
Args:
parent_pid (int): The pid for the Factory daemon
sleep_time (int): The number of seconds to sleep between iterations
advertize_rate (int): The rate at which advertising should occur
glideinDescript (glideFactoryConfig.GlideinDescript): glidein.descript object in the Factory root dir
frontendDescript (glideFactoryConfig.FrontendDescript): frontend.descript object in the Factory root dir
group_name (str): Name of the group
my_entries (dict): Dictionary of entry objects keyed on entry name
"""
is_first = True # In first iteration
count = 0
# Record the starttime so we know when to disable the use of old pub key
starttime = time.time()
    # Use the grace period from the Factory config to determine
    # the end of life for the old key object
oldkey_gracetime = int(glideinDescript.data["OldPubKeyGraceTime"])
oldkey_eoltime = starttime + oldkey_gracetime
factory_downtimes = glideFactoryDowntimeLib.DowntimeFile(glideinDescript.data["DowntimesFile"])
while True:
# Check if parent is still active. If not cleanup and die.
check_parent(parent_pid, glideinDescript, my_entries)
cleanupSupport.cleaners.start_background_cleanup()
        # Check if it's time to invalidate the factory's old key
if (time.time() > oldkey_eoltime) and (glideinDescript.data["OldPubKeyObj"] is not None):
# Invalidate the use of factory's old key
logSupport.log.info("Retiring use of old key.")
logSupport.log.info(
f"Old key was valid from {starttime} to {oldkey_eoltime} ie grace of ~{oldkey_gracetime} sec"
)
glideinDescript.data["OldPubKeyType"] = None
glideinDescript.data["OldPubKeyObj"] = None
        # Check if the factory is in downtime. The group is in downtime only if
        # the factory is. Entry-specific downtime is handled in the entry
factory_in_downtime = factory_downtimes.checkDowntime(entry="factory")
# Record the iteration start time
iteration_stime = time.time()
iteration_stime_str = time.ctime()
if factory_in_downtime:
logSupport.log.info("Iteration at (in downtime) %s" % iteration_stime_str)
else:
logSupport.log.info("Iteration at %s" % iteration_stime_str)
# PM: Shouldn't this be inside the else statement above?
# Why do we want to execute this if we are in downtime?
# Or do we want to execute only few steps here but code prevents us?
try:
done_something = iterate_one( # noqa: F841
count == 0, factory_in_downtime, glideinDescript, frontendDescript, group_name, my_entries
)
logSupport.log.info("Writing stats for all entries")
try:
pids = []
# generate a list of entries for each CPU
cpuCount = int(glideinDescript.data["MonitorUpdateThreadCount"])
logSupport.log.info("Number of parallel writes for stats: %i" % cpuCount)
entrylists = [list(my_entries.values())[cpu::cpuCount] for cpu in range(cpuCount)]
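                # The stride slice deals the entries out round-robin; e.g. with
                # 8 entries and cpuCount=3 (illustrative values):
                #   entrylists[0] -> entries 0, 3, 6
                #   entrylists[1] -> entries 1, 4, 7
                #   entrylists[2] -> entries 2, 5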
                # Forks are keyed by cpu number. The actual key is irrelevant
pipe_ids = {}
post_writestats_info = {}
for cpu in range(cpuCount):
r, w = os.pipe()
unregister_sighandler()
pid = os.fork()
if pid:
# I am the parent
register_sighandler()
pids.append(pid)
os.close(w)
pipe_ids[cpu] = {"r": r, "pid": pid}
else:
# I am the child
os.close(r)
logSupport.disable_rotate = True
                        # Return the pickled entry state in the form of a dict:
                        #   return_dict[entry.name] = entry.getState()
return_dict = {}
for entry in entrylists[cpu]:
try:
entry.writeStats()
return_dict[entry.name] = entry.getState()
except Exception:
entry.log.warning(f"Error writing stats for entry '{entry.name}'")
entry.log.exception(f"Error writing stats for entry '{entry.name}': ")
try:
os.write(w, pickle.dumps(return_dict))
except Exception:
# Catch and log exceptions if any to avoid
# runaway processes.
logSupport.log.exception(f"Error writing pickled state for entries '{entrylists[cpu]}': ")
os.close(w)
# Exit without triggering SystemExit exception
                        # Note that this also skips all cleanup (file closing, finally clauses)
os._exit(0)
try:
logSupport.log.info("Processing response from children after write stats")
post_writestats_info = fetch_fork_result_list(pipe_ids)
except Exception:
logSupport.log.exception("Error processing response from one or more children after write stats")
logSupport.roll_all_logs()
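                # post_writestats_info maps each fork key (the cpu number) to the
                # {entry_name: state} dict pickled by that child, e.g. (a sketch
                # with illustrative names):
                #   {0: {"entry_A": {...}, "entry_C": {...}}, 1: {"entry_B": {...}}}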
for i in post_writestats_info:
for ent in post_writestats_info[i]:
(my_entries[ent]).setState(post_writestats_info[i][ent])
except KeyboardInterrupt:
raise # this is an exit signal, pass through
except Exception:
# never fail for stats reasons!
logSupport.log.exception("Error writing stats: ")
except KeyboardInterrupt:
raise # this is an exit signal, pass through
except Exception:
if is_first:
raise
else:
# If not the first pass, just warn
logSupport.log.exception("Exception occurred in the main loop of Factory Group %s: " % group_name)
cleanupSupport.cleaners.wait_for_cleanup()
iteration_etime = time.time()
iteration_sleep_time = sleep_time - (iteration_etime - iteration_stime)
if iteration_sleep_time < 0:
iteration_sleep_time = 0
logSupport.log.info("Sleep %is" % iteration_sleep_time)
time.sleep(iteration_sleep_time)
count = (count + 1) % advertize_rate
is_first = False # Entering following iterations
############################################################
def main(parent_pid, sleep_time, advertize_rate, startup_dir, entry_names, group_id):
"""GlideinFactoryEntryGroup main function
Setup logging, monitoring, and configuration information. Starts the Entry
group main loop and handles cleanup at shutdown.
Args:
parent_pid (int): The pid for the Factory daemon
sleep_time (int): The number of seconds to sleep between iterations
advertize_rate (int): The rate at which advertising should occur
startup_dir (str|Path): The "home" directory for the entry.
entry_names (str): Colon separated list with the names of the entries this process should work on
        group_id (str): Group id, normally a number (with the "group_" prefix it forms the group name).
            It can change between Factory reconfigurations
"""
    # Assume the name to be group_[0,1,2] etc. It is only required to create the
    # log_dir where tasks common to the group will be stored. There is no other
    # significance to the group_name, and the number of entries supported by a
    # group can change between Factory reconfigurations
group_name = "group_%s" % group_id
os.chdir(startup_dir)
# Set up the lock_dir
gfi.factoryConfig.lock_dir = os.path.join(startup_dir, "lock")
# Read information about the glidein and frontends
glideinDescript = gfc.GlideinDescript()
frontendDescript = gfc.FrontendDescript()
# set factory_collector at a global level, since we do not expect it to change
gfi.factoryConfig.factory_collector = glideinDescript.data["FactoryCollector"]
# Load factory keys
glideinDescript.load_pub_key()
glideinDescript.load_old_rsa_key()
# Dictionary of Entry objects this group will process
my_entries = {}
glidein_entries = glideinDescript.data["Entries"]
# Initialize log files for entry groups
logSupport.log_dir = os.path.join(glideinDescript.data["LogDir"], "factory")
logSupport.log = logSupport.get_logger_with_handlers(group_name, logSupport.log_dir, glideinDescript.data)
logSupport.log.info(f"Logging initialized for {group_name}")
logSupport.log.info("Starting up")
logSupport.log.info(f"Entries processed by {group_name}: {entry_names} ")
# Check if all the entries in this group are valid
for entry in entry_names.split(":"):
if entry not in glidein_entries.split(","):
msg = f"Entry '{entry}' not configured: {glidein_entries}"
logSupport.log.warning(msg)
raise RuntimeError(msg)
# Create entry objects
my_entries[entry] = glideFactoryEntry.Entry(entry, startup_dir, glideinDescript, frontendDescript)
# Create lock file for this group and register its parent
pid_obj = glideFactoryPidLib.EntryGroupPidSupport(startup_dir, group_name)
pid_obj.register(parent_pid)
try:
try:
try:
iterate(
parent_pid, sleep_time, advertize_rate, glideinDescript, frontendDescript, group_name, my_entries
)
except KeyboardInterrupt:
logSupport.log.info("Received signal...exit")
except Exception:
logSupport.log.exception("Exception occurred in iterate: ")
raise
finally:
# No need to cleanup. The parent should be doing it
logSupport.log.info("Dying")
finally:
pid_obj.relinquish()
################################################################################
# Pickle Friendly data
################################################################################
def compile_pickle_data(entry, work_done):
"""Extract the state of the entry after doing work
Args:
entry (Entry): Entry object
work_done (int): Work done info
Returns:
dict: pickle-friendly version of the Entry (state of the Entry)
"""
return_dict = entry.getState()
return_dict["work_done"] = work_done
return return_dict
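
# A minimal usage sketch (names are illustrative): a forked worker ships the
# per-entry state back to the parent after a work cycle:
#
#     state = compile_pickle_data(entry, work_done=2)
#     os.write(w, pickle.dumps(state))  # child side of the pipe
#     # parent side: state = pickle.loads(...); state["work_done"] -> 2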
############################################################
#
# S T A R T U P
#
############################################################
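
# Example invocation (argument values are illustrative):
#   glideFactoryEntryGroup.py 1234 60 5 /var/lib/gwms-factory/work-dir entry_A:entry_B 0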
if __name__ == "__main__":
register_sighandler()
# Force integrity checks on all condor operations
gfl.set_condor_integrity_checks()
main(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]), sys.argv[4], sys.argv[5], sys.argv[6])