Source code for glideinwms.creation.lib.cWParamDict

# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

# Description:
#   Frontend creation module
#   Classes and functions needed to handle dictionary files
#   created out of the parameter object
#
#   Common functions for cvWParamDict and cgWParamDict
#

import os.path

from glideinwms.lib.util import is_true

from . import cWConsts, cWDictFile


[docs] def has_file_wrapper(dicts): for file_dict in ["preentry_file_list", "file_list", "aftergroup_preentry_file_list", "aftergroup_file_list"]: if file_dict in dicts: # dicts[file_dict] contains information about status, ..., vals are the tuples w/ files info and content # tuples are (fname, type, ...) for file_info in list(dicts[file_dict].vals.values()): if file_info[1] == "wrapper": return True return False
[docs] def has_file_wrapper_params(file_params): # file_params is the list in a files section (global o group): each one is a file specification # If there is one wrapper return true for user_file in file_params: if is_true(user_file.wrapper): return True return False
[docs] def add_file_unparsed(user_file, dicts, is_factory): """Add a user file residing in the stage area file as described by Params.file_defaults :param user_file: file from the config files "files" sections :param dicts: parameters dictionaries :param is_factory: True if invoked for the factory (cgWParamDict.py), false for the frontend (cvWParamDict.py) :return: None (dictionaries are modified) """ absfname = user_file.absfname if absfname is None: raise RuntimeError("Found a file element without an absname: %s" % user_file) relfname = user_file.relfname if relfname is None: relfname = os.path.basename(absfname) # default is the final part of absfname if len(relfname) < 1: raise RuntimeError("Found a file element with an empty relfname: %s" % user_file) is_const = is_true(user_file.const) is_executable = is_true(user_file.executable) is_wrapper = is_true(user_file.wrapper) do_untar = is_true(user_file.untar) try: period_value = int(user_file.period) except (AttributeError, KeyError, ValueError, TypeError): period_value = 0 if is_factory: # Factory (file_list, after_file_list) file_list_idx = "file_list" if "after_entry" in user_file: if is_true(user_file.after_entry): # eval(user_file.after_entry,{},{}): file_list_idx = "after_file_list" else: # Frontend (preentry_file_list, file_list, aftergroup_preentry_file_list, aftergroup_file_list) file_list_idx = "preentry_file_list" if "after_entry" in user_file: if is_true(user_file.after_entry): file_list_idx = "file_list" if "after_group" in user_file: if is_true(user_file.after_group): file_list_idx = "aftergroup_%s" % file_list_idx # period has 0 as default (in dictionary definition). Should I still protect against it not being defined? if period_value > 0: if not is_executable: raise RuntimeError("A file cannot have an execution period if it is not executable: %s" % user_file) if is_executable: # a script if not is_const: raise RuntimeError("A file cannot be executable if it is not constant: %s" % user_file) if do_untar: raise RuntimeError("A tar file cannot be executable: %s" % user_file) if is_wrapper: raise RuntimeError("A wrapper file cannot be an executable: %s" % user_file) file_type = "exec" if user_file.type: if user_file.type == "run:s" or user_file.type == "run:singularity": if file_list_idx.endswith("preentry_file_list"): raise RuntimeError("An executable cannot use singularity before the entry setup: %s" % user_file) file_type = "exec:s" else: if not user_file.type.startswith("run"): raise RuntimeError("An executable file type must start with 'run': $s" % user_file) dicts[file_list_idx].add_from_file( relfname, cWDictFile.FileDictFile.make_val_tuple( cWConsts.insert_timestr(relfname), file_type, user_file.period, user_file.prefix ), absfname, ) elif is_wrapper: # a source-able script for the wrapper if not is_const: raise RuntimeError("A file cannot be a wrapper if it is not constant: %s" % user_file) if do_untar: raise RuntimeError("A tar file cannot be a wrapper: %s" % user_file) dicts[file_list_idx].add_from_file( relfname, cWDictFile.FileDictFile.make_val_tuple(cWConsts.insert_timestr(relfname), "wrapper"), absfname ) elif do_untar: # a tarball if not is_const: raise RuntimeError("A file cannot be untarred if it is not constant: %s" % user_file) wnsubdir = user_file.untar_options.dir if wnsubdir is None: wnsubdir = relfname.split(".", 1)[0] # default is relfname up to the first . config_out = user_file.untar_options.absdir_outattr if config_out is None: config_out = "FALSE" cond_attr = user_file.untar_options.cond_attr dicts[file_list_idx].add_from_file( relfname, cWDictFile.FileDictFile.make_val_tuple( cWConsts.insert_timestr(relfname), "untar", cond_download=cond_attr, config_out=config_out ), absfname, ) dicts["untar_cfg"].add(relfname, wnsubdir) else: # not executable nor tarball => simple file if is_const: val = "regular" dicts[file_list_idx].add_from_file( relfname, cWDictFile.FileDictFile.make_val_tuple(cWConsts.insert_timestr(relfname), val), absfname ) else: val = "nocache" dicts[file_list_idx].add_from_file( relfname, cWDictFile.FileDictFile.make_val_tuple(relfname, val), absfname ) # no timestamp in the name if it can be modified