Source code for glideinwms.factory.glideFactory

#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

"""This is the main of the glideinFactory

Arguments:
   $1 = glidein submit_dir
"""

import copy
import fcntl
import glob
import json
import math
import os
import resource
import signal
import subprocess
import sys
import tarfile
import time
import urllib.error
import urllib.parse
import urllib.request

import jwt

from M2Crypto.RSA import RSAError

from glideinwms.factory import (
    glideFactoryConfig,
    glideFactoryCredentials,
    glideFactoryDowntimeLib,
    glideFactoryEntryGroup,
    glideFactoryInterface,
    glideFactoryLib,
    glideFactoryMonitorAggregator,
    glideFactoryMonitoring,
    glideFactoryPidLib,
)
from glideinwms.lib import cleanupSupport, condorMonitor, glideinWMSVersion, logSupport, util
from glideinwms.lib.condorMonitor import CondorQEdit, QueryError
from glideinwms.lib.pubCrypto import RSAKey

FACTORY_DIR = os.path.dirname(glideFactoryLib.__file__)


############################################################
def aggregate_stats(in_downtime):
    """Aggregate all the monitoring stats

    @type in_downtime: boolean
    @param in_downtime: Entry downtime information

    :return: stats dictionary
    """
    stats = {}
    try:
        _ = glideFactoryMonitorAggregator.aggregateStatus(in_downtime)
    except Exception:
        # protect and report
        logSupport.log.exception("aggregateStatus failed: ")
    try:
        stats["LogSummary"] = glideFactoryMonitorAggregator.aggregateLogSummary()
    except Exception:
        # protect and report
        logSupport.log.exception("aggregateLogStatus failed: ")
    try:
        glideFactoryMonitorAggregator.aggregateRRDStats(log=logSupport.log)
    except Exception:
        # protect and report
        logSupport.log.exception("aggregateRRDStats failed: ")
    return stats


def update_classads():
    """Load the aggregate job summary pickle files, then qedit the finished jobs,
    adding a new classad called MONITOR_INFO with the monitoring information.

    :return: None
    """
    jobinfo = glideFactoryMonitorAggregator.aggregateJobsSummary()
    for cnames, joblist in jobinfo.items():
        schedd_name = cnames[0]
        pool_name = cnames[1]
        try:
            qe = CondorQEdit(pool_name=pool_name, schedd_name=schedd_name)
            qe.executeAll(
                joblist=list(joblist.keys()),
                attributes=["MONITOR_INFO"] * len(joblist),
                values=list(map(json.dumps, list(joblist.values()))),
            )
        except QueryError as qerr:
            logSupport.log.error("Failed to add monitoring info to the glidein job classads: %s" % qerr)


def save_stats(stats, fname):
    """Serialize and save aggregated statistics so that each component (Factory and Entries)
    can retrieve and use it to log and advertise

    stats is a dictionary pickled in binary format
    stats['LogSummary'] - log summary aggregated info

    :param stats: aggregated Factory statistics
    :param fname: name of the file with the serialized data
    :return: None
    """
    util.file_pickle_dump(
        fname, stats, mask_exceptions=(logSupport.log.exception, "Saving of aggregated statistics failed: ")
    )


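# Illustrative sketch (not part of the module), assuming util.file_pickle_dump writes a
# standard binary pickle: a consumer (e.g. an Entry process) could read the aggregated
# statistics back with something like
#   import pickle
#   with open(fname, "rb") as f:
#       stats = pickle.load(f)
#   log_summary = stats.get("LogSummary")

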
# Added by C.W. Murphy to make descript.xml
def write_descript(glideinDescript, frontendDescript, monitor_dir):
    """Write the descript.xml to the monitoring directory

    @type glideinDescript: glideFactoryConfig.GlideinDescript
    @param glideinDescript: Factory config's glidein description object
    @type frontendDescript: glideFactoryConfig.FrontendDescript
    @param frontendDescript: Factory config's frontend description object
    @type monitor_dir: String
    @param monitor_dir: Path to monitoring directory
    """
    glidein_data = copy.deepcopy(glideinDescript.data)
    frontend_data = copy.deepcopy(frontendDescript.data)
    entry_data = {}
    for entry in glidein_data["Entries"].split(","):
        entry_data[entry] = {}
        entryDescript = glideFactoryConfig.JobDescript(entry)
        entry_data[entry]["descript"] = entryDescript.data
        entryAttributes = glideFactoryConfig.JobAttributes(entry)
        entry_data[entry]["attributes"] = entryAttributes.data
        entryParams = glideFactoryConfig.JobParams(entry)
        entry_data[entry]["params"] = entryParams.data

    descript2XML = glideFactoryMonitoring.Descript2XML()
    xml_str = (
        descript2XML.glideinDescript(glidein_data)
        + descript2XML.frontendDescript(frontend_data)
        + descript2XML.entryDescript(entry_data)
    )
    try:
        descript2XML.writeFile(monitor_dir, xml_str)
    except OSError:
        logSupport.log.exception("Unable to write the descript.xml file: ")


############################################################
def generate_log_tokens(startup_dir, glideinDescript):
    """Generate the JSON Web Tokens used to authenticate with the remote HTTP log server.

    Note: tokens are generated for disabled entries too

    Args:
        startup_dir (str|Path): Path to the glideinsubmit directory
        glideinDescript: Factory config's glidein description object

    Returns:
        None

    Raises:
        IOError: If can't open/read/write a file (key/token)
    """
    logSupport.log.info("Generating JSON Web Tokens for authentication with log server")

    # Get a list of all entries, enabled and disabled
    # TODO: there are more reliable ways to do so, i.e. reading the xml config
    entries = [ed[len("entry_") :] for ed in glob.glob("entry_*") if os.path.isdir(ed)]

    # Retrieve the factory secret key (manually delivered) for token generation
    credentials_dir = os.path.realpath(os.path.join(startup_dir, "..", "server-credentials"))
    jwt_key = os.path.join(credentials_dir, "jwt_secret.key")
    if not os.path.exists(jwt_key):
        # create one and log if it doesn't exist, otherwise a manual
        # undocumented step is needed to start the factory
        logSupport.log.info(
            "creating %s - manually install this key for " % (jwt_key) + "authenticating to external web sites"
        )
        rsa = RSAKey()
        rsa.new(2048)
        rsa.save(jwt_key)
    try:
        with open(os.path.join(credentials_dir, "jwt_secret.key")) as keyfile:
            secret = keyfile.readline().strip()
    except OSError:
        logSupport.log.exception("Cannot find the key for JWT generation (must be manually deposited).")
        raise

    factory_name = glideinDescript.data["FactoryName"]

    # Issue a token for each entry-recipient pair
    for entry in entries:
        # Get the list of recipients
        if "LOG_RECIPIENTS_FACTORY" in glideFactoryConfig.JobParams(entry).data:
            log_recipients = glideFactoryConfig.JobParams(entry).data["LOG_RECIPIENTS_FACTORY"].split()
        else:
            log_recipients = []

        curtime = int(time.time())

        # Directory where to put tokens.tgz and url_dirs.desc
        entry_dir = os.path.join(credentials_dir, "entry_" + entry)
        # Directory where tokens are initially generated, before flushing them to tokens.tgz
        tokens_dir = os.path.join(entry_dir, "tokens")

        # Create the entry + tokens directories if they do not already exist
        if not os.path.exists(tokens_dir):
            try:
                os.makedirs(tokens_dir)
            except OSError as oe:
                logSupport.log.exception(
                    f"Unable to create JWT entry dir ({os.path.join(tokens_dir, entry)}): {oe.strerror}"
                )
                raise

        # Create the url_dirs.desc file
        open(os.path.join(entry_dir, "url_dirs.desc"), "w").close()

        for recipient_url in log_recipients:
            # Obtain a legal filename from the url, escaping "/" and other tricky symbols
            recipient_safe_url = urllib.parse.quote(recipient_url, "")

            # Generate the monitoring token
            # TODO: in the future must include Frontend tokens as well
            factory_token = "default.jwt"
            token_name = factory_token

            if not os.path.exists(os.path.join(tokens_dir, recipient_safe_url)):
                try:
                    os.makedirs(os.path.join(tokens_dir, recipient_safe_url))
                except OSError as oe:
                    logSupport.log.exception(
                        "Unable to create JWT recipient dir (%s): %s"
                        % (os.path.join(tokens_dir, recipient_safe_url), oe.strerror)
                    )
                    raise

            token_filepath = os.path.join(tokens_dir, recipient_safe_url, token_name)

            # Payload fields:
            # iss->issuer, sub->subject, aud->audience
            # iat->issued_at, exp->expiration, nbf->not_before
            token_payload = {
                "iss": factory_name,
                "sub": entry,
                "aud": recipient_safe_url,
                "iat": curtime,
                "exp": curtime + 604800,
                "nbf": curtime - 300,
            }
            token = jwt.encode(token_payload, secret, algorithm="HS256")
            # TODO: PyJWT bug workaround. Remove this conversion once affected PyJWT is no more around
            # PyJWT in EL7 (PyJWT <2.0.0) has a bug, jwt.encode() is declaring str as return type, but it is returning bytes
            # https://github.com/jpadilla/pyjwt/issues/391
            if isinstance(token, bytes):
                token = token.decode("UTF-8")

            try:
                # Write the factory token
                with open(token_filepath, "w") as tkfile:
                    tkfile.write(token)
                # Write to url_dirs.desc
                with open(os.path.join(entry_dir, "url_dirs.desc"), "a") as url_dirs_desc:
                    url_dirs_desc.write(f"{recipient_url} {recipient_safe_url}\n")
            except OSError:
                logSupport.log.exception("Unable to create JWT file: ")
                raise

        # Create and write tokens.tgz
        try:
            tokens_tgz = tarfile.open(os.path.join(entry_dir, "tokens.tgz"), "w:gz", dereference=True)
            tokens_tgz.add(tokens_dir, arcname=os.path.basename(tokens_dir))
        except tarfile.TarError as te:
            logSupport.log.exception("TarError: %s" % str(te))
            raise
        tokens_tgz.close()


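# Illustrative verification sketch (not part of the module), assuming PyJWT >= 2:
# a log-server recipient holding the same shared secret could validate a token
# generated above with
#   import jwt
#   claims = jwt.decode(token, secret, algorithms=["HS256"], audience=recipient_safe_url)
# which checks the HS256 signature and the exp/nbf window set in token_payload.

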
###########################################################
def entry_grouper(size, entries):
    """Group the entries into n smaller groups

    KNOWN ISSUE: Needs improvement to do better grouping in certain cases
    TODO: Migrate to itertools when only supporting python 2.6 and higher

    @type size: long
    @param size: Size of each subgroup
    @type entries: list
    @param entries: List of entries

    @rtype: list
    @return: List of grouped entries. Each group is a list
    """
    list = []

    if size == 0:
        return list

    if len(entries) <= size:
        list.insert(0, entries)
    else:
        for group in range(len(entries) // size):
            list.insert(group, entries[group * size : (group + 1) * size])
        if size * len(list) < len(entries):
            list.insert(group + 1, entries[(group + 1) * size :])

    return list


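# Illustrative usage sketch (not part of the module): entry_grouper splits a sorted
# entry list into chunks of at most `size` entries, the last chunk possibly smaller:
#   entry_grouper(2, ["e1", "e2", "e3", "e4", "e5"])
#   -> [["e1", "e2"], ["e3", "e4"], ["e5"]]
#   entry_grouper(0, ["e1"]) -> []

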
############################################################
def is_crashing_often(startup_time, restart_interval, restart_attempts):
    """Check if the entry is crashing/dying often

    @type startup_time: list
    @param startup_time: Startup times (seconds since Epoch) of the entry process
    @type restart_interval: long
    @param restart_interval: Allowed restart interval in seconds
    @type restart_attempts: long
    @param restart_attempts: Number of allowed restart attempts in the interval

    @rtype: bool
    @return: True if entry process is crashing/dying often
    """
    crashing_often = True

    if len(startup_time) < restart_attempts:
        # We haven't exhausted restart attempts
        crashing_often = False
    else:
        # Check if the service has been restarted often
        if restart_attempts == 1:
            crashing_often = True
        elif (time.time() - startup_time[0]) >= restart_interval:
            crashing_often = False
        else:
            crashing_often = True

    return crashing_often


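# Illustrative sketch (not part of the module): with restart_attempts=3 and
# restart_interval=3600, a group whose last 3 recorded startup times all fall
# within the past hour is flagged as crashing often, e.g.
#   now = time.time()
#   is_crashing_often([now - 1200, now - 600, now - 60], 3600, 3)  # -> True
#   is_crashing_often([now - 60], 3600, 3)  # -> False (restart attempts not exhausted)

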
def is_file_old(filename, allowed_time):
    """Check if the file is older than the given time

    @type filename: String
    @param filename: Full path to the file
    @type allowed_time: long
    @param allowed_time: Time in seconds

    @rtype: bool
    @return: True if file is older than the given time, else False
    """
    if time.time() > (os.path.getmtime(filename) + allowed_time):
        return True
    return False


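# Illustrative sketch (not part of the module): main() below uses this to decide
# whether the factory RSA key should be regenerated, roughly
#   is_file_old(glideinDescript.default_rsakey_fname, int(glideinDescript.data["OldPubKeyGraceTime"]))
# which is True only when the key file's mtime is older than the grace time.

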
############################################################
def clean_exit(childs):
    """Terminate the child EntryGroup processes with SIGTERM, waiting with an
    exponential back-off (up to 5 sec) until all of them have exited."""
    count = 100000000  # set it high, so it is triggered at the first iteration
    sleep_time = 0.1  # start with very little sleep
    while len(list(childs.keys())) > 0:
        count += 1
        if count > 4:
            # Send a term signal to the childs
            # May need to do it several times, in case they are in the
            # middle of something
            count = 0
            logSupport.log.info("Killing EntryGroups %s" % list(childs.keys()))
            for group in list(childs):  # copy of the keys, childs may be modified in the loop
                try:
                    os.kill(childs[group].pid, signal.SIGTERM)
                except OSError:
                    logSupport.log.warning("EntryGroup %s already dead" % group)
                    del childs[group]  # already dead

        logSupport.log.info("Sleep")
        time.sleep(sleep_time)
        # exponentially increase, up to 5 secs
        sleep_time = sleep_time * 2
        if sleep_time > 5:
            sleep_time = 5

        logSupport.log.info("Checking dying EntryGroups %s" % list(childs.keys()))
        dead_entries = []
        for group in list(childs):  # copy of the keys, childs may be modified in the loop
            child = childs[group]

            # empty stdout and stderr
            try:
                tempOut = child.stdout.read()
                if len(tempOut) != 0:
                    logSupport.log.warning(f"EntryGroup {group} STDOUT: {tempOut}")
            except OSError:
                pass  # ignore
            try:
                tempErr = child.stderr.read()
                if len(tempErr) != 0:
                    logSupport.log.warning(f"EntryGroup {group} STDERR: {tempErr}")
            except OSError:
                pass  # ignore

            # look for exited child
            if child.poll():
                # the child exited
                dead_entries.append(group)
                del childs[group]
                tempOut = child.stdout.readlines()
                tempErr = child.stderr.readlines()
        if len(dead_entries) > 0:
            logSupport.log.info("These EntryGroups died: %s" % dead_entries)

    logSupport.log.info("All EntryGroups dead")


############################################################
def spawn(
    sleep_time,
    advertize_rate,
    startup_dir,
    glideinDescript,
    frontendDescript,
    entries,
    restart_attempts,
    restart_interval,
):
    """Spawn and keep track of the entry processes. Restart them if required.
    Advertise glidefactoryglobal classad every iteration

    @type sleep_time: long
    @param sleep_time: Delay between every iteration
    @type advertize_rate: long
    @param advertize_rate: Rate at which entries advertise their classads
    @type startup_dir: String
    @param startup_dir: Path to glideinsubmit directory
    @type glideinDescript: glideFactoryConfig.GlideinDescript
    @param glideinDescript: Factory config's glidein description object
    @type frontendDescript: glideFactoryConfig.FrontendDescript
    @param frontendDescript: Factory config's frontend description object
    @type entries: list
    @param entries: Sorted list of entry names
    @type restart_interval: long
    @param restart_interval: Allowed restart interval in seconds
    @type restart_attempts: long
    @param restart_attempts: Number of allowed restart attempts in the interval
    """
    childs = {}

    # Number of glideFactoryEntry processes to spawn; directly relates to the
    # number of concurrent condor_status processes
    #
    # NOTE: If the number of entries gets too big, we may exceed the shell args
    #       limit. If that becomes an issue, move the logic to identify the
    #       entries to serve to the group itself.
    #
    # Each process will handle multiple entries split as follows
    #   - Sort the entries alphabetically. Already done
    #   - Divide the list into chunks as equal as possible
    #   - Last chunk may get fewer entries
    entry_process_count = 1

    starttime = time.time()
    oldkey_gracetime = int(glideinDescript.data["OldPubKeyGraceTime"])
    oldkey_eoltime = starttime + oldkey_gracetime

    childs_uptime = {}

    factory_downtimes = glideFactoryDowntimeLib.DowntimeFile(glideinDescript.data["DowntimesFile"])

    logSupport.log.info("Available Entries: %s" % entries)

    group_size = int(math.ceil(float(len(entries)) / entry_process_count))
    entry_groups = entry_grouper(group_size, entries)

    def _set_rlimit(soft_l=None, hard_l=None):
        # set new hard and soft open file limits
        # if setting limits fails or no input parameters use inherited limits
        # from parent process
        # nb 1. it is possible to raise limits up to [hard_l,hard_l]
        #       but once lowered they cannot be raised
        # nb 2. it may be better just to omit calling this function at
        #       all from subprocess - in which case it inherits limits from
        #       parent process
        lim = resource.getrlimit(resource.RLIMIT_NOFILE)
        if soft_l is not None or hard_l is not None:
            if not hard_l:
                hard_l = soft_l
            if not soft_l:
                soft_l = hard_l
            try:
                new_lim = [soft_l, hard_l]
                resource.setrlimit(resource.RLIMIT_NOFILE, new_lim)
            except Exception:
                resource.setrlimit(resource.RLIMIT_NOFILE, lim)

    try:
        for group in range(len(entry_groups)):
            entry_names = ":".join(entry_groups[group])
            logSupport.log.info(f"Starting EntryGroup {group}: {entry_groups[group]}")

            # Converted to using the subprocess module
            command_list = [
                sys.executable,
                glideFactoryEntryGroup.__file__,
                str(os.getpid()),
                str(sleep_time),
                str(advertize_rate),
                startup_dir,
                entry_names,
                str(group),
            ]
            childs[group] = subprocess.Popen(
                command_list,
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                close_fds=True,
                preexec_fn=_set_rlimit,
            )

            # Get the startup time. Used to check if the entry is crashing
            # periodically and needs to be restarted.
            childs_uptime[group] = list()
            childs_uptime[group].insert(0, time.time())

        logSupport.log.info("EntryGroup startup times: %s" % childs_uptime)

        generate_log_tokens(startup_dir, glideinDescript)

        for group in childs:
            # set it in non blocking mode
            # since we will run for a long time, we do not want to block
            for fd in (childs[group].stdout.fileno(), childs[group].stderr.fileno()):
                fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        # If RemoveOldCredFreq < 0, do not do credential cleanup.
        curr_time = 0  # To ensure curr_time is always initialized
        if int(glideinDescript.data["RemoveOldCredFreq"]) > 0:
            # Convert credential removal frequency from hours to seconds
            remove_old_cred_freq = int(glideinDescript.data["RemoveOldCredFreq"]) * 60 * 60
            curr_time = time.time()
            update_time = curr_time + remove_old_cred_freq

            # Convert credential removal age from days to seconds
            remove_old_cred_age = int(glideinDescript.data["RemoveOldCredAge"]) * 60 * 60 * 24

            # Create cleaners for old credential files
            logSupport.log.info("Adding cleaners for old credentials")
            cred_base_dir = glideinDescript.data["ClientProxiesBaseDir"]
            for username in frontendDescript.get_all_usernames():
                cred_base_user = os.path.join(cred_base_dir, "user_%s" % username)
                cred_user_instance_dirname = os.path.join(
                    cred_base_user, "glidein_%s" % glideinDescript.data["GlideinName"]
                )
                cred_cleaner = cleanupSupport.DirCleanupCredentials(
                    cred_user_instance_dirname, "(credential_*)", remove_old_cred_age
                )
                cleanupSupport.cred_cleaners.add_cleaner(cred_cleaner)

        iteration_basetime = time.time()
        while True:
            # retrieves WebMonitoringURL from glideclient classAd
            iteration_timecheck = time.time()
            iteration_timediff = iteration_timecheck - iteration_basetime

            if iteration_timediff >= 3600:  # every hour
                iteration_basetime = time.time()  # reset the start time
                fronmonpath = os.path.join(startup_dir, "monitor", "frontendmonitorlink.txt")
                fronmonconstraint = '(MyType=="glideclient")'
                fronmonformat_list = [("WebMonitoringURL", "s"), ("FrontendName", "s")]
                fronmonstatus = condorMonitor.CondorStatus(subsystem_name="any")
                fronmondata = fronmonstatus.fetch(constraint=fronmonconstraint, format_list=fronmonformat_list)
                fronmon_list_names = list(fronmondata.keys())
                if fronmon_list_names is not None:
                    urlset = set()
                    if os.path.exists(fronmonpath):
                        os.remove(fronmonpath)
                    for frontend_entry in fronmon_list_names:
                        fronmonelement = fronmondata[frontend_entry]
                        fronmonurl = fronmonelement["WebMonitoringURL"].encode("utf-8")
                        fronmonfrt = fronmonelement["FrontendName"].encode("utf-8")
                        if (fronmonfrt, fronmonurl) not in urlset:
                            urlset.add((fronmonfrt, fronmonurl))
                            with open(fronmonpath, "w") as fronmonf:
                                fronmonf.write(f"{fronmonfrt}, {fronmonurl}")

            # Record the iteration start time
            iteration_stime = time.time()

            # THIS IS FOR SECURITY
            # Make sure you delete the old key when its grace is up.
            # If a compromised key is left around and if attacker can somehow
            # trigger FactoryEntry process crash, we do not want the entry
            # to pick up the old key again when factory auto restarts it.
            if time.time() > oldkey_eoltime and glideinDescript.data["OldPubKeyObj"] is not None:
                glideinDescript.data["OldPubKeyObj"] = None
                glideinDescript.data["OldPubKeyType"] = None
                try:
                    glideinDescript.remove_old_key()
                    logSupport.log.info(
                        "Removed the old public key after its grace time of %s seconds" % oldkey_gracetime
                    )
                except Exception:
                    # Do not crash if delete fails. Just log it.
                    logSupport.log.warning("Failed to remove the old public key after its grace time")

            # Only removing credentials in the v3+ protocol
            # Affects Corral Frontend which only supports the v3+ protocol.
            # If freq < zero, do not do cleanup.
            if int(glideinDescript.data["RemoveOldCredFreq"]) > 0 and curr_time >= update_time:
                logSupport.log.info("Checking credentials for cleanup")

                # Query queue for glideins. Don't remove proxies in use.
                try:
                    in_use_creds = glideFactoryLib.getCondorQCredentialList()
                    cleanupSupport.cred_cleaners.cleanup(in_use_creds)
                except Exception:
                    logSupport.log.exception("Unable to cleanup old credentials")

                update_time = curr_time + remove_old_cred_freq

            curr_time = time.time()

            logSupport.log.info("Checking for credentials %s" % entries)

            # Read in the frontend globals classad
            # Do this first so that the credentials are immediately
            # available when the Entries startup
            classads = {}
            try:
                classads = glideFactoryCredentials.get_globals_classads()
            except Exception:
                logSupport.log.error("Error occurred retrieving globals classad -- is Condor running?")

            for classad_key in classads:
                classad = classads[classad_key]
                try:
                    glideFactoryCredentials.process_global(classad, glideinDescript, frontendDescript)
                except Exception:
                    logSupport.log.exception("Error occurred processing the globals classads: ")

            logSupport.log.info("Checking EntryGroups %s" % list(childs.keys()))
            for group in list(childs):  # making a copy of the keys because the dict is being modified in the loop
                entry_names = ":".join(entry_groups[group])
                child = childs[group]

                # empty stdout and stderr
                try:
                    tempOut = child.stdout.read()
                    if tempOut and len(tempOut) != 0:
                        logSupport.log.warning(f"EntryGroup {group} STDOUT: {tempOut}")
                except OSError:
                    pass  # ignore
                try:
                    tempErr = child.stderr.read()
                    if tempErr and len(tempErr) != 0:
                        logSupport.log.warning(f"EntryGroup {group} STDERR: {tempErr}")
                except OSError:
                    pass  # ignore

                # look for exited child
                if child.poll():
                    # the child exited
                    logSupport.log.warning("EntryGroup %s exited. Checking if it should be restarted." % (group))
                    tempOut = child.stdout.readlines()
                    tempErr = child.stderr.readlines()

                    if is_crashing_often(childs_uptime[group], restart_interval, restart_attempts):
                        del childs[group]
                        raise RuntimeError(
                            "EntryGroup '%s' has been crashing too often, quit the whole factory:\n%s\n%s"
                            % (group, tempOut, tempErr)
                        )
                    else:
                        # Restart the entry setting its restart time
                        logSupport.log.warning("Restarting EntryGroup %s." % (group))
                        del childs[group]

                        command_list = [
                            sys.executable,
                            glideFactoryEntryGroup.__file__,
                            str(os.getpid()),
                            str(sleep_time),
                            str(advertize_rate),
                            startup_dir,
                            entry_names,
                            str(group),
                        ]
                        childs[group] = subprocess.Popen(
                            command_list,
                            shell=False,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            close_fds=True,
                            preexec_fn=_set_rlimit,
                        )

                        if len(childs_uptime[group]) == restart_attempts:
                            childs_uptime[group].pop(0)
                        childs_uptime[group].append(time.time())
                        for fd in (childs[group].stdout.fileno(), childs[group].stderr.fileno()):
                            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
                        logSupport.log.warning(f"EntryGroup startup/restart times: {childs_uptime}")

            # Aggregate Monitoring data periodically
            logSupport.log.info("Aggregate monitoring data")
            stats = aggregate_stats(factory_downtimes.checkDowntime())
            save_stats(stats, os.path.join(startup_dir, glideFactoryConfig.factoryConfig.aggregated_stats_file))

            # Aggregate job data periodically
            if glideinDescript.data.get("AdvertisePilotAccounting", False) in [
                "True",
                "1",
            ]:  # data attributes are strings
                logSupport.log.info("Starting updating job classads")
                update_classads()
                logSupport.log.info("Finishing updating job classads")

            # Advertise the global classad with the factory keys and Factory statistics
            try:
                # KEL TODO need to add factory downtime?
                glideFactoryInterface.advertizeGlobal(
                    glideinDescript.data["FactoryName"],
                    glideinDescript.data["GlideinName"],
                    glideFactoryLib.factoryConfig.supported_signtypes,
                    glideinDescript.data["PubKeyObj"],
                )
            except Exception as e:
                logSupport.log.exception("Error advertising global classads: %s" % e)

            cleanupSupport.cleaners.cleanup()

            iteration_etime = time.time()
            iteration_sleep_time = sleep_time - (iteration_etime - iteration_stime)
            if iteration_sleep_time < 0:
                iteration_sleep_time = 0
            logSupport.log.info("Sleep %s secs" % iteration_sleep_time)
            time.sleep(iteration_sleep_time)
        # end while True:

    finally:
        # cleanup at exit
        logSupport.log.info("Received signal...exit")
        try:
            try:
                clean_exit(childs)
            except Exception:
                # if anything goes wrong, hardkill the rest
                for group in childs:
                    logSupport.log.info("Hard killing EntryGroup %s" % group)
                    try:
                        os.kill(childs[group].pid, signal.SIGKILL)
                    except OSError:
                        pass  # ignore dead clients
        finally:
            logSupport.log.info("Deadvertize myself")
            try:
                glideFactoryInterface.deadvertizeFactory(
                    glideinDescript.data["FactoryName"], glideinDescript.data["GlideinName"]
                )
            except Exception:
                logSupport.log.exception("Factory deadvertize failed!")
            try:
                glideFactoryInterface.deadvertizeFactoryClientMonitoring(
                    glideinDescript.data["FactoryName"], glideinDescript.data["GlideinName"]
                )
            except Exception:
                logSupport.log.exception("Factory Monitoring deadvertize failed!")
        logSupport.log.info("All EntryGroups should be terminated")


def increase_process_limit(new_limit=10000):
    """Raise RLIMIT_NPROC to new_limit"""
    (soft, hard) = resource.getrlimit(resource.RLIMIT_NPROC)
    if soft < new_limit:
        try:
            resource.setrlimit(resource.RLIMIT_NPROC, (new_limit, hard))
            logSupport.log.info("Raised RLIMIT_NPROC from %d to %d" % (soft, new_limit))
        except ValueError:
            logSupport.log.info("Warning: could not raise RLIMIT_NPROC from %d to %d" % (soft, new_limit))
    else:
        logSupport.log.info("RLIMIT_NPROC already %d, not changing to %d" % (soft, new_limit))


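# Illustrative sketch (not part of the module): the effect can be checked with
#   soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
# after calling increase_process_limit(); note that asking for a soft limit above
# the current hard limit makes setrlimit raise ValueError, which is only logged.

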
############################################################
def main(startup_dir):
    """Read in the configuration file and start up the factory

    @type startup_dir: String
    @param startup_dir: Path to glideinsubmit directory
    """
    # Force integrity checks on all condor operations
    glideFactoryLib.set_condor_integrity_checks()

    glideFactoryInterface.factoryConfig.lock_dir = os.path.join(startup_dir, "lock")
    glideFactoryConfig.factoryConfig.glidein_descript_file = os.path.join(
        startup_dir, glideFactoryConfig.factoryConfig.glidein_descript_file
    )
    glideinDescript = glideFactoryConfig.GlideinDescript()
    frontendDescript = glideFactoryConfig.FrontendDescript()

    # set factory_collector at a global level, since we do not expect it to change
    glideFactoryInterface.factoryConfig.factory_collector = glideinDescript.data["FactoryCollector"]

    # Setup the glideFactoryLib.factoryConfig so that we can process the
    # globals classads
    glideFactoryLib.factoryConfig.config_whoamI(
        glideinDescript.data["FactoryName"], glideinDescript.data["GlideinName"]
    )
    glideFactoryLib.factoryConfig.config_dirs(
        startup_dir,
        glideinDescript.data["LogDir"],
        glideinDescript.data["ClientLogBaseDir"],
        glideinDescript.data["ClientProxiesBaseDir"],
    )

    # Set the Log directory
    logSupport.log_dir = os.path.join(glideinDescript.data["LogDir"], "factory")

    # Configure factory process logging
    logSupport.log = logSupport.get_logger_with_handlers("factory", logSupport.log_dir, glideinDescript.data)
    logSupport.log.info("Logging initialized")

    if glideinDescript.data["Entries"].strip() in ("", ","):
        # No entries are enabled. There is nothing to do. Just exit here.
        log_msg = "No Entries are enabled. Exiting."
        logSupport.log.error(log_msg)
        sys.exit(1)

    write_descript(glideinDescript, frontendDescript, os.path.join(startup_dir, "monitor/"))

    try:
        os.chdir(startup_dir)
    except Exception:
        logSupport.log.exception("Failed starting Factory. Unable to change to startup_dir: ")
        raise

    try:
        if is_file_old(glideinDescript.default_rsakey_fname, int(glideinDescript.data["OldPubKeyGraceTime"])):
            # First backup and load any existing key
            logSupport.log.info("Backing up and loading old key")
            glideinDescript.backup_and_load_old_key()
            # Create a new key for this run
            logSupport.log.info("Recreating and loading new key")
            glideinDescript.load_pub_key(recreate=True)
        else:
            # Key is recent enough. Just reuse it.
            logSupport.log.info("Key is recent enough, reusing for this run")
            glideinDescript.load_pub_key(recreate=False)
            logSupport.log.info("Loading old key")
            glideinDescript.load_old_rsa_key()
    except RSAError as e:
        logSupport.log.exception("Failed starting Factory. Exception occurred loading factory keys: ")
        key_fname = getattr(e, "key_fname", None)
        cwd = getattr(e, "cwd", None)
        if key_fname and cwd:
            logSupport.log.error("Failed to load RSA key %s with current working directory %s", key_fname, cwd)
            logSupport.log.error(
                "If you think the rsa key might be corrupted, try to remove it, and then reconfigure the factory to recreate it"
            )
        raise
    except OSError as ioe:
        logSupport.log.exception("Failed starting Factory. Exception occurred loading factory keys: ")
        if ioe.filename == "rsa.key" and ioe.errno == 2:
            logSupport.log.error("Missing rsa.key file. Please, reconfigure the factory to recreate it")
        raise
    except Exception:
        logSupport.log.exception("Failed starting Factory. Exception occurred loading factory keys: ")
        raise

    glideFactoryMonitorAggregator.glideFactoryMonitoring.monitoringConfig.my_name = "{}@{}".format(
        glideinDescript.data["GlideinName"],
        glideinDescript.data["FactoryName"],
    )

    glideFactoryInterface.factoryConfig.advertise_use_tcp = glideinDescript.data["AdvertiseWithTCP"] in ("True", "1")
    glideFactoryInterface.factoryConfig.advertise_use_multi = glideinDescript.data["AdvertiseWithMultiple"] in (
        "True",
        "1",
    )

    sleep_time = int(glideinDescript.data["LoopDelay"])
    advertize_rate = int(glideinDescript.data["AdvertiseDelay"])
    restart_attempts = int(glideinDescript.data["RestartAttempts"])
    restart_interval = int(glideinDescript.data["RestartInterval"])

    try:
        glideFactoryInterface.factoryConfig.glideinwms_version = glideinWMSVersion.GlideinWMSDistro(
            "checksum.factory"
        ).version()
    except Exception:
        logSupport.log.exception(
            "Non critical Factory error. Exception occurred while trying to retrieve the glideinwms version: "
        )

    entries = sorted(glideinDescript.data["Entries"].split(","))

    glideFactoryMonitorAggregator.monitorAggregatorConfig.config_factory(
        os.path.join(startup_dir, "monitor"), entries, log=logSupport.log
    )

    # create lock file
    pid_obj = glideFactoryPidLib.FactoryPidSupport(startup_dir)

    increase_process_limit()

    # start
    try:
        pid_obj.register()
    except glideFactoryPidLib.pidSupport.AlreadyRunning as err:
        pid_obj.load_registered()
        logSupport.log.exception(
            "Failed starting Factory. Instance with pid %s is already running. Exception during pid registration: %s"
            % (pid_obj.mypid, err)
        )
        raise

    # TODO: use a single try.. except.. finally when moving to Python 3.8 or above (dropping 3.6)
    try:
        try:
            # Spawn the EntryGroup processes handling the work
            spawn(
                sleep_time,
                advertize_rate,
                startup_dir,
                glideinDescript,
                frontendDescript,
                entries,
                restart_attempts,
                restart_interval,
            )
            # No need for special handling of KeyboardInterrupt
            # It is not in Exception so it will remain un-handled
            # except KeyboardInterrupt as e:
            #     raise e  # raise e is re-raising a different exception from here? Use raise instead?
        except HUPException:
            # inside spawn(), outermost try will catch HUPException,
            # then the code within the finally clause of spawn() will run
            # which will terminate glideFactoryEntryGroup children processes
            # and then the following 3 lines will be executed.
            logSupport.log.info("Received SIGHUP, reload config uid = %d" % os.getuid())
            # must empty the lock file so that when the thread returns from reconfig_glidein and
            # begins from the beginning, it will not error out which will happen
            # if the lock file is not empty
            pid_obj.relinquish()
            os.execv(
                os.path.join(FACTORY_DIR, "../creation/reconfig_glidein"),
                [
                    "reconfig_glidein",
                    "-update_scripts",
                    "no",
                    "-sighupreload",
                    "-xml",
                    "/etc/gwms-factory/glideinWMS.xml",
                ],
            )
            # TODO: verify. This is invoking reconfig but how is the Factory/EntryGroups re-started?
            #       Should there be an infinite loop around spawn?
        except Exception as e:
            # Exception excludes SystemExit, KeyboardInterrupt, GeneratorExit
            # Log the exception and exit
            logSupport.log.exception("Exception occurred spawning the factory: %s" % e)
    finally:
        pid_obj.relinquish()


############################################################
#
# S T A R T U P
#
############################################################
class HUPException(Exception):
    """Used to catch SIGHUP and trigger a reconfig"""

    pass


def termsignal(signr, frame):
    """Signal handler. Raise KeyboardInterrupt when receiving SIGTERM or SIGQUIT"""
    raise KeyboardInterrupt("Received signal %s" % signr)


def hupsignal(signr, frame):
    """Signal handler. Raise HUPException when receiving SIGHUP.
    Used to trigger a reconfig and restart.
    """
    signal.signal(signal.SIGHUP, signal.SIG_IGN)
    raise HUPException("Received signal %s" % signr)


if __name__ == "__main__": if os.getsid(os.getpid()) != os.getpgrp(): os.setpgid(0, 0) signal.signal(signal.SIGTERM, termsignal) signal.signal(signal.SIGQUIT, termsignal) signal.signal(signal.SIGHUP, hupsignal) try: main(sys.argv[1]) except KeyboardInterrupt as e: logSupport.log.info("Terminating: %s" % e)