Source code for glideinwms.frontend.glideinFrontend

#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

"""This is the main of the glideinFrontend

Arguments:
   $1 = work_dir
"""


import fcntl
import os
import shutil
import signal
import subprocess
import sys
import time

from glideinwms.frontend import (
    glideinFrontendConfig,
    glideinFrontendElement,
    glideinFrontendInterface,
    glideinFrontendLib,
    glideinFrontendMonitorAggregator,
    glideinFrontendMonitoring,
    glideinFrontendPidLib,
)
from glideinwms.lib import cleanupSupport, condorExe, logSupport, servicePerformance


############################################################
# KEL remove this method and just call the monitor aggregator method directly below?  we don't use the results
def aggregate_stats():
    return glideinFrontendMonitorAggregator.aggregateStatus()


############################################################
class FailureCounter:
    def __init__(self, my_name, max_lifetime):
        self.my_name = my_name
        self.max_lifetime = max_lifetime

        self.failure_times = []

    def add_failure(self, when=None):
        if when is None:
            when = time.time()

        self.clean_old()
        self.failure_times.append(when)

    def get_failures(self):
        self.clean_old()
        return self.failure_times

    def count_failures(self):
        return len(self.get_failures())

    # INTERNAL

    # clean out any old records
    def clean_old(self):
        min_time = time.time() - self.max_lifetime
        while self.failure_times and (self.failure_times[0] < min_time):
            # Assuming they are ordered
            self.failure_times.pop(0)

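# Illustrative use of FailureCounter (a sketch only; in this module spawn() creates
# one counter per group with max_lifetime=restart_interval and spawn_iteration()
# records a failure whenever a group process exits with a non-zero code):
#
#     fc = FailureCounter("mygroup", max_lifetime=3600)  # remember failures for one hour
#     fc.add_failure(when=time.time() - 7200)            # a failure two hours ago
#     fc.add_failure()                                    # a failure right now
#     fc.count_failures()                                 # -> 1; the stale entry is dropped by clean_old()
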
############################################################
def spawn_group(work_dir, group_name, action):
    command_list = [sys.executable, glideinFrontendElement.__file__, str(os.getpid()), work_dir, group_name, action]
    child = subprocess.Popen(command_list, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # set it in non blocking mode
    for fd in (child.stdout.fileno(), child.stderr.fileno()):
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

    return child

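# For illustration, the command list built above launches a child process roughly like
# the following (pid, installation path and group name are example values only):
#
#     /usr/bin/python3 .../glideinwms/frontend/glideinFrontendElement.py \
#         12345 /var/lib/gwms-frontend/vofrontend main run
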
############################################################
def poll_group_process(group_name, child):
    # empty stdout and stderr
    try:
        tempOut = child.stdout.read()
        if tempOut:
            logSupport.log.info(f"[{group_name}]: {tempOut}")
    except OSError:
        pass  # ignore
    try:
        tempErr = child.stderr.read()
        if tempErr:
            logSupport.log.warning(f"[{group_name}]: {tempErr}")
    except OSError:
        pass  # ignore

    return child.poll()

############################################################
# return the list of (group,walltime) pairs
def spawn_iteration(work_dir, frontendDescript, groups, max_active, failure_dict, max_failures, action):
    childs = {}

    for group_name in groups:
        childs[group_name] = {"state": "queued"}

    active_groups = 0
    groups_tofinish = len(groups)

    max_num_failures = 0
    logSupport.log.info("Starting iteration")
    try:
        while groups_tofinish > 0:
            done_something = False
            # check if any group finished by now
            for group_name in groups:
                if childs[group_name]["state"] == "spawned":
                    group_rc = poll_group_process(group_name, childs[group_name]["data"])
                    if group_rc is not None:  # None means "still alive"
                        if group_rc == 0:
                            childs[group_name]["state"] = "finished"
                        else:
                            childs[group_name]["state"] = "failed"
                            failure_dict[group_name].add_failure()
                            num_failures = failure_dict[group_name].count_failures()
                            max_num_failures = max(max_num_failures, num_failures)
                            logSupport.log.warning(
                                "Group %s terminated with exit code %i (%i recent failure)"
                                % (group_name, group_rc, num_failures)
                            )
                        childs[group_name]["end_time"] = time.time()
                        servicePerformance.endPerfMetricEvent("frontend", "group_%s_iteration" % group_name)
                        active_groups -= 1
                        groups_tofinish -= 1
                        done_something = True

            # see if I can spawn more
            for group_name in groups:
                if active_groups < max_active:  # can spawn more
                    if childs[group_name]["state"] == "queued":
                        childs[group_name]["data"] = spawn_group(work_dir, group_name, action)
                        childs[group_name]["state"] = "spawned"
                        childs[group_name]["start_time"] = time.time()
                        servicePerformance.startPerfMetricEvent("frontend", "group_%s_iteration" % group_name)
                        active_groups += 1
                        done_something = True
                else:
                    break

            if done_something:
                logSupport.log.info("Active groups = %i, Groups to finish = %i" % (active_groups, groups_tofinish))
            if groups_tofinish > 0:
                time.sleep(0.01)

        logSupport.log.info("All groups finished")

        logSupport.log.info("Aggregate monitoring data")
        # KEL - can we just call the monitor aggregator method directly? see above
        servicePerformance.startPerfMetricEvent("frontend", "aggregate_stats")
        stats = aggregate_stats()
        servicePerformance.endPerfMetricEvent("frontend", "aggregate_stats")
        # logSupport.log.debug(stats)

        # Create the glidefrontendmonitor classad
        fm_advertiser = glideinFrontendInterface.FrontendMonitorClassadAdvertiser(
            multi_support=glideinFrontendInterface.frontendConfig.advertise_use_multi
        )
        fm_classad = glideinFrontendInterface.FrontendMonitorClassad(frontendDescript.data["FrontendName"])
        fm_classad.setFrontendDetails(
            frontendDescript.data["FrontendName"], ",".join(groups), glideinFrontendLib.getHAMode(frontendDescript.data)
        )
        try:
            # pylint: disable=E1136
            # (unsubscriptable-object, false positive)
            idle_jobs = {
                "Total": stats["total"]["Jobs"]["Idle"],
                "600": stats["total"]["Jobs"]["OldIdle"],
                "3600": stats["total"]["Jobs"]["Idle_3600"],
            }
            # pylint: enable=E1136
        except KeyError as err:
            idle_jobs = {"Total": 0, "600": 0, "3600": 0}
            logSupport.log.error(
                "Error in RRD Database. Setting idle_jobs[%s] Failed. Reconfig the frontend with -fix_rrd to fix this error"
                % (err,)
            )

        fm_classad.setIdleJobCount(idle_jobs)
        fm_classad.setPerfMetrics(servicePerformance.getPerfMetric("frontend"))
        # Gather performance stats from history file of each group
        for group_name in groups:
            gname = "group_%s" % group_name
            try:
                history_obj = glideinFrontendConfig.HistoryFile(work_dir, group_name, True, dict)
                pfm = servicePerformance.getPerfMetric(gname)
                pfm.metric = history_obj["perf_metrics"].metric
                fm_classad.setPerfMetrics(servicePerformance.getPerfMetric(gname))
            except Exception:
                pass  # Do not fail for non-critical actions

        fm_advertiser.addClassad(fm_classad.adParams["Name"], fm_classad)

        # Advertise glidefrontendmonitor classad to user pool
        logSupport.log.info(
            "Advertising %i %s classad(s) to the user pool" % (len(fm_advertiser.classads), fm_advertiser.adType)
        )
        try:
            set_frontend_htcondor_env(work_dir, frontendDescript)
            fm_advertiser.advertiseAllClassads()
            logSupport.log.info("Done advertising %s classad(s) to the user pool" % fm_advertiser.adType)
        except condorExe.ExeError:
            logSupport.log.error(
                "Exception occurred trying to advertise %s classad(s) to the user pool" % fm_advertiser.adType
            )
        except Exception:
            # Rethrow any other exception including stop signal
            raise
        finally:
            # Cleanup the env
            clean_htcondor_env()

        logSupport.log.info("Cleaning logs")
        cleanupSupport.cleaners.cleanup()

        if max_num_failures > max_failures:
            logSupport.log.info("Too many group failures, aborting")
            logSupport.log.debug("Failed %i times (limit %i), aborting" % (max_num_failures, max_failures))
            raise RuntimeError("Too many group failures, aborting")
    finally:
        # cleanup at exit
        # if anything goes wrong, hardkill the rest
        for group_name in childs:
            if childs[group_name]["state"] == "spawned":
                logSupport.log.info("Hard killing group %s" % group_name)
                servicePerformance.endPerfMetricEvent("frontend", "group_%s_iteration" % group_name)
                try:
                    os.kill(childs[group_name]["data"].pid, signal.SIGKILL)
                except OSError:
                    pass  # ignore failed kills of non-existent processes

    # at this point, all groups should have been run
    timings = []
    for group_name in groups:
        timings.append((group_name, childs[group_name]["end_time"] - childs[group_name]["start_time"]))

    return timings

############################################################
def spawn_cleanup(work_dir, frontendDescript, groups, frontend_name, ha_mode):
    # Invalidate glidefrontendmonitor classad
    try:
        set_frontend_htcondor_env(work_dir, frontendDescript)
        fm_advertiser = glideinFrontendInterface.FrontendMonitorClassadAdvertiser()
        constraint = f'(GlideFrontendName=="{frontend_name}")&&(GlideFrontendHAMode=?="{ha_mode}")'
        fm_advertiser.invalidateConstrainedClassads(constraint)
    except Exception:
        # Do not fail in case of errors.
        logSupport.log.warning("Failed to deadvertise glidefrontendmonitor classad")

    for group_name in groups:
        try:
            command_list = [
                sys.executable,
                glideinFrontendElement.__file__,
                str(os.getpid()),
                work_dir,
                group_name,
                "deadvertise",
            ]
            # logSupport.log.debug("Command list: %s" % command_list)
            child = subprocess.Popen(command_list, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

            # set it in non blocking mode
            for fd in (child.stdout.fileno(), child.stderr.fileno()):
                fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

            while poll_group_process(group_name, child) is None:
                # None means "still alive"
                time.sleep(0.01)
        except Exception:
            # never fail on cleanup
            pass

############################################################
def spawn(
    sleep_time,
    advertize_rate,
    work_dir,
    frontendDescript,
    groups,
    max_parallel_workers,
    restart_interval,
    restart_attempts,
):
    num_groups = len(groups)

    # TODO: Get the ha_check_interval from the config
    ha = glideinFrontendLib.getHASettings(frontendDescript.data)
    ha_check_interval = glideinFrontendLib.getHACheckInterval(frontendDescript.data)
    mode = glideinFrontendLib.getHAMode(frontendDescript.data)
    master_frontend_name = ""
    if mode == "slave":
        master_frontend_name = ha.get("ha_frontends")[0].get("frontend_name")

    active = mode == "master"
    hibernate = shouldHibernate(frontendDescript, work_dir, ha, mode, groups)

    logSupport.log.info("Frontend started with mode = %s" % mode)
    try:
        # Service will exit on signal only.
        # This infinite loop is for the slave to go back into hibernation
        # once the master becomes alive.
        # Master never loops infinitely here, but instead it does in
        # the inner loop while(mode=='master') ...
        while True:
            while hibernate:
                # If I am slave enter hibernation cycle while Master is alive
                logSupport.log.info("Master Frontend %s is online. Hibernating." % master_frontend_name)
                time.sleep(ha_check_interval)
                hibernate = shouldHibernate(frontendDescript, work_dir, ha, mode, groups)

            # We broke out of hibernation cycle
            # Either Master has disappeared or I am the Master
            if mode == "slave":
                logSupport.log.info("Master frontend %s is offline. Activating slave frontend." % master_frontend_name)
                active = True

            failure_dict = {}
            for group in groups:
                failure_dict[group] = FailureCounter(group, restart_interval)

            while (mode == "master") or ((mode == "slave") and active):
                servicePerformance.startPerfMetricEvent("frontend", "iteration")
                # start_time = time.time()
                timings = spawn_iteration(
                    work_dir, frontendDescript, groups, max_parallel_workers, failure_dict, restart_attempts, "run"
                )
                servicePerformance.endPerfMetricEvent("frontend", "iteration")
                # end_time = time.time()
                elapsed_time = servicePerformance.getPerfMetricEventLifetime("frontend", "iteration")
                if elapsed_time < sleep_time:
                    real_sleep_time = sleep_time - elapsed_time
                    logSupport.log.info("Sleep %.1f sec" % real_sleep_time)
                    time.sleep(real_sleep_time)
                else:
                    logSupport.log.info(f"No sleeping this loop, took {elapsed_time:.1f} sec > {sleep_time:.1f} sec")

                # order the groups by walltime
                # longest walltime first
                timings.sort(key=lambda x: x[1])
                # recreate the groups list, with new ordering
                groups = [el[0] for el in timings]
                assert num_groups == len(groups), "Something went wrong, number of groups changed"

                if mode == "slave":
                    # If we are slave, check if master is back and if so
                    # deadvertise my classads and hibernate
                    hibernate = shouldHibernate(frontendDescript, work_dir, ha, mode, groups)

                    if hibernate:
                        active = False
                        logSupport.log.info("Master frontend %s is back online" % master_frontend_name)
                        logSupport.log.info("Deadvertize my ads and enter hibernation cycle")
                        spawn_cleanup(work_dir, frontendDescript, groups, frontendDescript.data["FrontendName"], mode)
                    else:
                        logSupport.log.info("Master frontend %s is still offline" % master_frontend_name)
    finally:
        # We have been asked to terminate
        logSupport.log.info("Deadvertize my ads")
        spawn_cleanup(work_dir, frontendDescript, groups, frontendDescript.data["FrontendName"], mode)

############################################################
def shouldHibernate(frontendDescript, work_dir, ha, mode, groups):
    """
    Check if the frontend is running in HA mode.
    If run in master mode never hibernate.
    If run in slave mode, hibernate if master is active.

    @rtype: bool
    @return: True if we should hibernate else False
    """
    servicePerformance.startPerfMetricEvent("frontend", "ha_check")
    if mode == "slave":
        master_frontend_name = str(ha.get("ha_frontends")[0].get("frontend_name"))

        for group in groups:
            element = glideinFrontendElement.glideinFrontendElement(os.getpid(), work_dir, group, "run")
            # Set environment required to query factory collector
            set_frontend_htcondor_env(work_dir, frontendDescript, element)

            for factory_pool in element.factory_pools:
                try:
                    factory_pool_node = factory_pool[0]
                    master_classads = glideinFrontendInterface.findMasterFrontendClassads(
                        factory_pool_node, master_frontend_name
                    )

                    if master_classads:
                        # Found some classads in one of the collectors
                        # Cleanup the env and return True
                        clean_htcondor_env()
                        servicePerformance.endPerfMetricEvent("frontend", "ha_check")
                        return True
                except RuntimeError:
                    # Failed to talk
                    if not factory_pool_node:
                        factory_pool_node = ""
                    msg = "Failed to talk to the factory_pool {} to get the status of Master frontend {}".format(
                        factory_pool_node,
                        master_frontend_name,
                    )
                    logSupport.log.warning(msg)
                    msg = "Exception talking to the factory_pool {} to get the status of Master frontend {}: ".format(
                        factory_pool_node,
                        master_frontend_name,
                    )
                    logSupport.log.exception(msg)

        # Cleanup the env
        clean_htcondor_env()

        # NOTE:
        # If we got this far with no errors then we could not find
        # active master frontend. We should not hibernate as slave
        # However, if there were errors checking with factory pool
        # then the master frontend could be down so its safe to wake
        # up and start advertising.

    servicePerformance.endPerfMetricEvent("frontend", "ha_check")
    return False

def clear_diskcache_dir(work_dir):
    """Clear the cache by removing the directory used for the cachedir, and recreate it."""
    cache_dir = os.path.join(work_dir, glideinFrontendConfig.frontendConfig.cache_dir)
    try:
        shutil.rmtree(cache_dir)
    except OSError as ose:
        if ose.errno != 2:  # errno 2 is ok, dir is missing. Maybe it's the first execution?
            logSupport.log.exception("Error removing cache directory %s" % cache_dir)
            raise
    os.mkdir(cache_dir)

def set_frontend_htcondor_env(work_dir, frontendDescript, element=None):
    # Collector DN is only in the group's mapfile. Just get first one.
    groups = frontendDescript.data["Groups"].split(",")
    if groups:
        if element is None:
            element = glideinFrontendElement.glideinFrontendElement(os.getpid(), work_dir, groups[0], "run")
        htc_env = {
            "CONDOR_CONFIG": frontendDescript.data["CondorConfig"],
            "X509_USER_PROXY": frontendDescript.data["ClassAdProxy"],
            "_CONDOR_CERTIFICATE_MAPFILE": element.elementDescript.element_data["MapFile"],
        }
        set_env(htc_env)

def set_env(env):
    for var in env:
        os.environ[var] = env[var]

def clean_htcondor_env():
    for v in ("CONDOR_CONFIG", "_CONDOR_CERTIFICATE_MAPFILE", "X509_USER_PROXY"):
        if os.environ.get(v):
            del os.environ[v]

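# A minimal sketch of how these environment helpers are meant to be used (this is the
# pattern followed by spawn_iteration() and spawn_cleanup() above; the advertise call
# stands in for any operation that needs the frontend's HTCondor configuration):
#
#     try:
#         set_frontend_htcondor_env(work_dir, frontendDescript)
#         fm_advertiser.advertiseAllClassads()
#     finally:
#         clean_htcondor_env()  # always restore the process environment
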
############################################################
def spawn_removal(work_dir, frontendDescript, groups, max_parallel_workers, removal_action):
    failure_dict = {}
    for group in groups:
        failure_dict[group] = FailureCounter(group, 3600)

    spawn_iteration(work_dir, frontendDescript, groups, max_parallel_workers, failure_dict, 1, removal_action)

############################################################
def cleanup_environ():
    for val in list(os.environ.keys()):
        val_low = val.lower()
        if val_low[:8] == "_condor_":
            # remove any CONDOR environment variables
            # don't want any surprises
            del os.environ[val]
        elif val_low[:5] == "x509_":
            # remove any X509 environment variables
            # don't want any surprises
            del os.environ[val]

############################################################
def main(work_dir, action):
    startup_time = time.time()

    glideinFrontendConfig.frontendConfig.frontend_descript_file = os.path.join(
        work_dir, glideinFrontendConfig.frontendConfig.frontend_descript_file
    )
    frontendDescript = glideinFrontendConfig.FrontendDescript(work_dir)

    # the log dir is shared between the frontend main and the groups, so use a subdir
    logSupport.log_dir = os.path.join(frontendDescript.data["LogDir"], "frontend")

    # Configure frontend process logging
    logSupport.log = logSupport.get_logger_with_handlers("frontend", logSupport.log_dir, frontendDescript.data)

    logSupport.log.info("Logging initialized")
    logSupport.log.debug("Frontend startup time: %s" % str(startup_time))

    clear_diskcache_dir(work_dir)

    try:
        cleanup_environ()
        # we use a dedicated config... ignore the system-wide
        os.environ["CONDOR_CONFIG"] = frontendDescript.data["CondorConfig"]

        sleep_time = int(frontendDescript.data["LoopDelay"])
        advertize_rate = int(frontendDescript.data["AdvertiseDelay"])
        max_parallel_workers = int(frontendDescript.data["GroupParallelWorkers"])
        restart_attempts = int(frontendDescript.data["RestartAttempts"])
        restart_interval = int(frontendDescript.data["RestartInterval"])

        groups = sorted(frontendDescript.data["Groups"].split(","))

        glideinFrontendMonitorAggregator.monitorAggregatorConfig.config_frontend(
            os.path.join(work_dir, "monitor"), groups
        )
    except Exception:
        logSupport.log.exception("Exception occurred configuring monitoring: ")
        raise

    glideinFrontendMonitoring.write_frontend_descript_xml(frontendDescript, os.path.join(work_dir, "monitor/"))

    logSupport.log.info("Enabled groups: %s" % groups)

    # create lock file
    pid_obj = glideinFrontendPidLib.FrontendPidSupport(work_dir)

    # start
    try:
        pid_obj.register(action)
    except glideinFrontendPidLib.pidSupport.AlreadyRunning as err:
        pid_obj.load_registered()
        logSupport.log.exception(
            "Failed starting Frontend with action %s. Instance with pid %s is already running for action %s. Exception during pid registration: %s"
            % (action, pid_obj.mypid, str(pid_obj.action_type), err)
        )
        raise
    try:
        try:
            if action == "run":
                spawn(
                    sleep_time,
                    advertize_rate,
                    work_dir,
                    frontendDescript,
                    groups,
                    max_parallel_workers,
                    restart_interval,
                    restart_attempts,
                )
            elif action in (
                "removeWait",
                "removeIdle",
                "removeAll",
                "removeWaitExcess",
                "removeIdleExcess",
                "removeAllExcess",
            ):
                spawn_removal(work_dir, frontendDescript, groups, max_parallel_workers, action)
            else:
                raise ValueError("Unknown action: %s" % action)
        except KeyboardInterrupt:
            logSupport.log.info("Received signal...exit")
        except HUPException:
            logSupport.log.info("Received SIGHUP, reload config")
            pid_obj.relinquish()
            os.execv(
                os.path.join(glideinFrontendLib.__file__, "../creation/reconfig_frontend"),
                ["reconfig_frontend", "-sighupreload", "-xml", "/etc/gwms-frontend/frontend.xml"],
            )
        except Exception:
            logSupport.log.exception("Exception occurred trying to spawn: ")
    finally:
        pid_obj.relinquish()

############################################################
#
# S T A R T U P
#
############################################################
class HUPException(Exception):
    pass

def termsignal(signr, frame):
    raise KeyboardInterrupt("Received signal %s" % signr)

def hupsignal(signr, frame):
    signal.signal(signal.SIGHUP, signal.SIG_IGN)
    raise HUPException("Received signal %s" % signr)

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, termsignal)
    signal.signal(signal.SIGQUIT, termsignal)
    signal.signal(signal.SIGHUP, hupsignal)

    if len(sys.argv) == 2:
        action = "run"
    else:
        action = sys.argv[2]

    main(sys.argv[1], action)