# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
#
# Project:
# glideinWMS
#
# File Version:
#
# Description:
# VO Frontend creation module
# Classes and functions needed to handle dictionary files
#
import copy
import os
import os.path
from . import cvWConsts, cWConsts, cWDictFile
[docs]
class ParamsDictFile(cWDictFile.DictFile):
[docs]
def get_val_type(self, key):
return self.vals[key][0]
[docs]
def get_true_val(self, key):
return self.vals[key][1]
[docs]
def add(self, key, val, allow_overwrite=0):
if type(val) not in (type(()), type([])):
raise RuntimeError("Values '%s' not a list or tuple" % val)
if len(val) != 2:
raise RuntimeError("Values '%s' not (Type,Val)" % str(val))
if val[0] not in ("EXPR", "CONST"):
raise RuntimeError(f"Invalid var type '{val[0]}', should be either EXPR or CONST val: {str(val)}")
return cWDictFile.DictFile.add(self, key, val, allow_overwrite)
[docs]
def add_extended(self, key, is_expression, val, allow_overwrite=0):
if is_expression:
type_str = "EXPR"
else:
type_str = "CONST"
self.add(key, (type_str, val), allow_overwrite)
[docs]
def parse_val(self, line):
if len(line) == 0:
return # ignore empty lines
if line[0] == "#":
return # ignore comments
arr = line.split(None, 2)
if len(arr) == 0:
return # empty line
if len(arr) != 3:
raise RuntimeError("Not a valid var line (expected 3, found %i elements): '%s'" % (len(arr), line))
key = arr[0]
return self.add(key, (arr[1], eval(arr[2])))
################################################
#
# Functions that create default dictionaries
#
################################################
# internal, do not use from outside the module
[docs]
def get_common_dicts(work_dir, stage_dir, simple_work_dir): # if True, do not create params
common_dicts = {
"description": cWDictFile.DescriptionDictFile(
stage_dir, cWConsts.insert_timestr(cWConsts.DESCRIPTION_FILE), fname_idx=cWConsts.DESCRIPTION_FILE
),
"consts": cWDictFile.StrWWorkTypeDictFile(
stage_dir, cWConsts.insert_timestr(cWConsts.CONSTS_FILE), fname_idx=cWConsts.CONSTS_FILE
),
"vars": cWDictFile.VarsDictFile(
stage_dir, cWConsts.insert_timestr(cWConsts.VARS_FILE), fname_idx=cWConsts.VARS_FILE
),
"untar_cfg": cWDictFile.StrDictFile(
stage_dir, cWConsts.insert_timestr(cWConsts.UNTAR_CFG_FILE), fname_idx=cWConsts.UNTAR_CFG_FILE
),
"file_list": cWDictFile.FileDictFile(
stage_dir, cWConsts.insert_timestr(cWConsts.FILE_LISTFILE), fname_idx=cWConsts.FILE_LISTFILE
),
"preentry_file_list": cWDictFile.FileDictFile(
stage_dir,
cWConsts.insert_timestr(cvWConsts.PREENTRY_FILE_LISTFILE),
fname_idx=cvWConsts.PREENTRY_FILE_LISTFILE,
),
"signature": cWDictFile.SHA1DictFile(
stage_dir, cWConsts.insert_timestr(cWConsts.SIGNATURE_FILE), fname_idx=cWConsts.SIGNATURE_FILE
),
}
if not simple_work_dir:
common_dicts["params"] = ParamsDictFile(work_dir, cvWConsts.PARAMS_FILE)
common_dicts["attrs"] = cWDictFile.ReprDictFile(work_dir, cvWConsts.ATTRS_FILE)
refresh_description(common_dicts)
return common_dicts
[docs]
def get_main_dicts(work_dir, stage_dir, simple_work_dir, assume_groups):
main_dicts = get_common_dicts(work_dir, stage_dir, simple_work_dir)
main_dicts["summary_signature"] = cWDictFile.SummarySHA1DictFile(work_dir, cWConsts.SUMMARY_SIGNATURE_FILE)
main_dicts["frontend_descript"] = cWDictFile.StrDictFile(work_dir, cvWConsts.FRONTEND_DESCRIPT_FILE)
main_dicts["gridmap"] = cWDictFile.GridMapDict(stage_dir, cWConsts.insert_timestr(cWConsts.GRIDMAP_FILE))
if assume_groups:
main_dicts["aftergroup_file_list"] = cWDictFile.FileDictFile(
stage_dir,
cWConsts.insert_timestr(cvWConsts.AFTERGROUP_FILE_LISTFILE),
fname_idx=cvWConsts.AFTERGROUP_FILE_LISTFILE,
)
main_dicts["aftergroup_preentry_file_list"] = cWDictFile.FileDictFile(
stage_dir,
cWConsts.insert_timestr(cvWConsts.AFTERGROUP_PREENTRY_FILE_LISTFILE),
fname_idx=cvWConsts.AFTERGROUP_PREENTRY_FILE_LISTFILE,
)
return main_dicts
[docs]
def get_group_dicts(group_work_dir, group_stage_dir, group_name, simple_work_dir):
group_dicts = get_common_dicts(group_work_dir, group_stage_dir, simple_work_dir)
group_dicts["group_descript"] = cWDictFile.StrDictFile(group_work_dir, cvWConsts.GROUP_DESCRIPT_FILE)
return group_dicts
################################################
#
# Functions that load dictionaries
#
################################################
# internal, do not use from outside the module
[docs]
def load_common_dicts(dicts, description_el): # update in place
# first work dir ones (mutable)
if "params" in dicts:
dicts["params"].load()
if "attrs" in dicts:
try:
dicts["attrs"].load()
except RuntimeError:
# to allow for a smooth upgrade path from 2.5.5-, make this file optional
# in the future, we should remove this try...except block
dicts["attrs"].erase()
# now the ones keyed in the description
dicts["signature"].load(fname=description_el.vals2["signature"])
dicts["file_list"].load(fname=description_el.vals2["file_list"])
dicts["preentry_file_list"].load(fname=description_el.vals2["preentry_file_list"])
file_el = dicts["preentry_file_list"]
# all others are keyed in the file_list
dicts["consts"].load(fname=file_el[cWConsts.CONSTS_FILE][0])
dicts["vars"].load(fname=file_el[cWConsts.VARS_FILE][0])
dicts["untar_cfg"].load(fname=file_el[cWConsts.UNTAR_CFG_FILE][0])
if "gridmap" in dicts:
dicts["gridmap"].load(fname=file_el[cWConsts.GRIDMAP_FILE][0])
[docs]
def load_main_dicts(main_dicts): # update in place
main_dicts["frontend_descript"].load()
# summary_signature has keys for description
main_dicts["summary_signature"].load()
# load the description
main_dicts["description"].load(fname=main_dicts["summary_signature"]["main"][1])
# all others are keyed in the description
if "aftergroup_file_list" in main_dicts:
main_dicts["aftergroup_file_list"].load(fname=main_dicts["description"].vals2["aftergroup_file_list"])
# no need for another test, always paired
main_dicts["aftergroup_preentry_file_list"].load(
fname=main_dicts["description"].vals2["aftergroup_preentry_file_list"]
)
load_common_dicts(main_dicts, main_dicts["description"])
[docs]
def load_group_dicts(group_dicts, group_name, summary_signature): # update in place
group_dicts["group_descript"].load()
# load the description (name from summary_signature)
group_dicts["description"].load(fname=summary_signature[cvWConsts.get_group_stage_dir("", group_name)][1])
# all others are keyed in the description
load_common_dicts(group_dicts, group_dicts["description"])
############################################################
#
# Functions that create data out of the existing dictionary
#
############################################################
[docs]
def refresh_description(dicts): # update in place
description_dict = dicts["description"]
description_dict.add(dicts["signature"].get_fname(), "signature", allow_overwrite=True)
for k in ("preentry_file_list", "file_list", "aftergroup_preentry_file_list", "aftergroup_file_list"):
if k in dicts:
description_dict.add(dicts[k].get_fname(), k, allow_overwrite=True)
[docs]
def refresh_file_list(dicts, is_main, files_set_readonly=True, files_reset_changed=True): # update in place
file_dict = dicts["preentry_file_list"]
file_dict.add_from_bytes(
cWConsts.CONSTS_FILE,
cWDictFile.FileDictFile.make_val_tuple(dicts["consts"].get_fname(), "regular", config_out="CONSTS_FILE"),
dicts["consts"].save_into_bytes(set_readonly=files_set_readonly, reset_changed=files_reset_changed),
allow_overwrite=True,
)
file_dict.add_from_bytes(
cWConsts.VARS_FILE,
cWDictFile.FileDictFile.make_val_tuple(dicts["vars"].get_fname(), "regular", config_out="CONDOR_VARS_FILE"),
dicts["vars"].save_into_bytes(set_readonly=files_set_readonly, reset_changed=files_reset_changed),
allow_overwrite=True,
)
file_dict.add_from_bytes(
cWConsts.UNTAR_CFG_FILE,
cWDictFile.FileDictFile.make_val_tuple(dicts["untar_cfg"].get_fname(), "regular", config_out="UNTAR_CFG_FILE"),
dicts["untar_cfg"].save_into_bytes(set_readonly=files_set_readonly, reset_changed=files_reset_changed),
allow_overwrite=True,
)
if is_main:
file_dict.add_from_bytes(
cWConsts.GRIDMAP_FILE,
cWDictFile.FileDictFile.make_val_tuple(dicts["gridmap"].get_fname(), "regular", config_out="GRIDMAP"),
dicts["gridmap"].save_into_bytes(set_readonly=files_set_readonly, reset_changed=files_reset_changed),
allow_overwrite=True,
)
# dictionaries must have been written to disk before using this
[docs]
def refresh_signature(dicts): # update in place
signature_dict = dicts["signature"]
for k in (
"consts",
"vars",
"untar_cfg",
"gridmap",
"preentry_file_list",
"file_list",
"aftergroup_preentry_file_list",
"aftergroup_file_list",
"description",
):
if k in dicts:
signature_dict.add_from_file(dicts[k].get_filepath(), allow_overwrite=True)
# add signatures of all the files linked in the lists
for k in ("preentry_file_list", "file_list", "aftergroup_preentry_file_list", "aftergroup_file_list"):
if k in dicts:
filedict = dicts[k]
for fname in filedict.get_immutable_files():
signature_dict.add_from_file(os.path.join(filedict.dir, fname), allow_overwrite=True)
################################################
#
# Functions that save dictionaries
#
################################################
# internal, do not use from outside the module
[docs]
def save_common_dicts(dicts, is_main, set_readonly=True): # will update in place, too
# make sure decription is up to date
refresh_description(dicts)
# save the immutable ones
for k in ("description",):
dicts[k].save(set_readonly=set_readonly)
# Load files into the file list
# 'consts','untar_cfg','vars','gridmap' will be loaded
refresh_file_list(dicts, is_main)
# save files in the file lists
for k in ("preentry_file_list", "file_list", "aftergroup_preentry_file_list", "aftergroup_file_list"):
if k in dicts:
dicts[k].save_files(allow_overwrite=True)
# then save the lists
for k in ("preentry_file_list", "file_list", "aftergroup_preentry_file_list", "aftergroup_file_list"):
if k in dicts:
dicts[k].save(set_readonly=set_readonly)
# calc and save the signatues
refresh_signature(dicts)
dicts["signature"].save(set_readonly=set_readonly)
# finally save the mutable one(s)
if "params" in dicts:
dicts["params"].save(set_readonly=set_readonly)
if "attrs" in dicts:
dicts["attrs"].save(set_readonly=set_readonly)
# must be invoked after all the groups have been saved
[docs]
def save_main_dicts(main_dicts, set_readonly=True): # will update in place, too
main_dicts["frontend_descript"].save(set_readonly=set_readonly)
save_common_dicts(main_dicts, True, set_readonly=set_readonly)
summary_signature = main_dicts["summary_signature"]
summary_signature.add_from_file(
key="main",
filepath=main_dicts["signature"].get_filepath(),
fname2=main_dicts["description"].get_fname(),
allow_overwrite=True,
)
summary_signature.save(set_readonly=set_readonly)
[docs]
def save_group_dicts(
group_dicts, group_name, summary_signature, set_readonly=True # will update in place, too # update in place
):
group_dicts["group_descript"].save(set_readonly=set_readonly)
save_common_dicts(group_dicts, False, set_readonly=set_readonly)
summary_signature.add_from_file(
key=cvWConsts.get_group_stage_dir("", group_name),
filepath=group_dicts["signature"].get_filepath(),
fname2=group_dicts["description"].get_fname(),
allow_overwrite=True,
)
################################################
#
# Functions that reuse dictionaries
#
################################################
[docs]
def reuse_simple_dict(dicts, other_dicts, key, compare_keys=None):
if dicts[key].is_equal(other_dicts[key], compare_dir=True, compare_fname=False, compare_keys=compare_keys):
# if equal, just use the old one, and mark it as unchanged and readonly
dicts[key] = copy.deepcopy(other_dicts[key])
dicts[key].changed = False
dicts[key].set_readonly(True)
return True
else:
return False
[docs]
def reuse_file_dict(dicts, other_dicts, key):
dicts[key].reuse(other_dicts[key])
return reuse_simple_dict(dicts, other_dicts, key)
[docs]
def reuse_common_dicts(dicts, other_dicts, is_main, all_reused):
# save the immutable ones
# check simple dictionaries
for k in ("consts", "untar_cfg", "vars", "gridmap"):
if k in dicts:
all_reused = reuse_simple_dict(dicts, other_dicts, k) and all_reused
# since the file names may have changed, refresh the file_list
refresh_file_list(dicts, is_main)
# check file-based dictionaries
for k in ("preentry_file_list", "file_list", "aftergroup_preentry_file_list", "aftergroup_file_list"):
if k in dicts:
all_reused = reuse_file_dict(dicts, other_dicts, k) and all_reused
if all_reused:
# description and signature track other files
# so they change iff the others change
for k in ("description", "signature"):
dicts[k] = copy.deepcopy(other_dicts[k])
dicts[k].changed = False
dicts[k].set_readonly(True)
# check the mutable ones
for k in ("params", "attrs"):
if k in dicts:
reuse_simple_dict(dicts, other_dicts, k)
return all_reused
[docs]
def reuse_main_dicts(main_dicts, other_main_dicts):
reuse_simple_dict(main_dicts, other_main_dicts, "frontend_descript")
all_reused = reuse_common_dicts(main_dicts, other_main_dicts, True, True)
# will not try to reuse the summary_signature... being in work_dir
# can be rewritten and it is not worth the pain to try to prevent it
return all_reused
[docs]
def reuse_group_dicts(group_dicts, other_group_dicts, group_name):
reuse_simple_dict(group_dicts, other_group_dicts, "group_descript")
all_reused = reuse_common_dicts(group_dicts, other_group_dicts, False, True)
return all_reused
################################################
#
# Handle dicts as Classes
#
################################################
################################################
#
# This Class contains the main dicts
#
################################################
[docs]
class frontendMainDicts(cWDictFile.FileMainDicts):
def __init__(
self,
work_dir,
stage_dir,
workdir_name,
simple_work_dir=False, # if True, do not create the lib and lock work_dir subdirs, nor the params dict
assume_groups=True,
log_dir=None,
): # used only if simple_work_dir=False
self.assume_groups = assume_groups
cWDictFile.FileMainDicts.__init__(self, work_dir, stage_dir, workdir_name, simple_work_dir, log_dir)
######################################
# Redefine methods needed by parent
[docs]
def load(self):
load_main_dicts(self.dicts)
[docs]
def save(self, set_readonly=True):
save_main_dicts(self.dicts, set_readonly=set_readonly)
# reuse as much of the other as possible
[docs]
def reuse(self, other): # other must be of the same class
cWDictFile.FileMainDicts.reuse(self, other)
reuse_main_dicts(self.dicts, other.dicts)
####################
# Internal
####################
[docs]
def get_daemon_log_dir(self, base_dir):
return os.path.join(base_dir, "frontend")
# Overwritting the empty one
[docs]
def get_main_dicts(self):
return get_main_dicts(self.work_dir, self.stage_dir, self.simple_work_dir, self.assume_groups)
################################################
#
# This Class contains the group dicts
#
################################################
[docs]
class frontendGroupDicts(cWDictFile.FileSubDicts):
######################################
# Redefine methods needed by parent
[docs]
def load(self):
load_group_dicts(self.dicts, self.sub_name, self.summary_signature)
[docs]
def save(self, set_readonly=True):
save_group_dicts(self.dicts, self.sub_name, self.summary_signature, set_readonly=set_readonly)
[docs]
def save_final(self, set_readonly=True):
pass # nothing to do
# reuse as much of the other as possible
[docs]
def reuse(self, other): # other must be of the same class
cWDictFile.FileSubDicts.reuse(self, other)
reuse_group_dicts(self.dicts, other.dicts, self.sub_name)
####################
# Internal
####################
[docs]
def get_sub_work_dir(self, base_dir):
return cvWConsts.get_group_work_dir(base_dir, self.sub_name)
[docs]
def get_sub_log_dir(self, base_dir):
return cvWConsts.get_group_log_dir(base_dir, self.sub_name)
[docs]
def get_sub_stage_dir(self, base_dir):
return cvWConsts.get_group_stage_dir(base_dir, self.sub_name)
[docs]
def get_sub_dicts(self):
return get_group_dicts(self.work_dir, self.stage_dir, self.sub_name, self.simple_work_dir)
[docs]
def reuse_nocheck(self, other):
reuse_group_dicts(self.dicts, other.dicts, self.sub_name)
################################################
#
# This Class contains both the main and
# the group dicts
#
################################################
[docs]
class frontendDicts(cWDictFile.FileDicts):
def __init__(
self,
work_dir,
stage_dir,
group_list=[],
workdir_name="submit",
simple_work_dir=False, # if True, do not create the lib and lock work_dir subdirs, nor the params dict
log_dir=None,
): # used only if simple_work_dir=False
cWDictFile.FileDicts.__init__(self, work_dir, stage_dir, group_list, workdir_name, simple_work_dir, log_dir)
###########
# PRIVATE
###########
######################################
# Redefine methods needed by parent
[docs]
def new_MainDicts(self):
return frontendMainDicts(
self.work_dir,
self.stage_dir,
self.workdir_name,
self.simple_work_dir,
assume_groups=True,
log_dir=self.log_dir,
)
[docs]
def new_SubDicts(self, sub_name):
return frontendGroupDicts(
self.work_dir,
self.stage_dir,
sub_name,
self.main_dicts.get_summary_signature(),
self.workdir_name,
self.simple_work_dir,
self.log_dir,
)
[docs]
def get_sub_name_from_sub_stage_dir(self, sign_key):
return cvWConsts.get_group_name_from_group_stage_dir(sign_key)