Source code for glideinwms.creation.lib.matchPolicy
# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""This module contains the Match Policy related class
"""
import os
import os.path
# import copy
import re
from glideinwms.lib.util import import_module
# import string
# import socket
# from collections import OrderedDict
from glideinwms.lib.xmlParse import OrderedDict
# from . import cWParams
# import pprint
[docs]
class MatchPolicyLoadError(Exception):
def __init__(self, file="", search_path=[]):
self.file = file
self.searchPath = search_path
def __str__(self):
err_str = ""
if self.file == "":
err_str = "No match policy file provided"
else:
err_str = f"Failed to load policy from the file {self.file} in the search path {self.searchPath}"
return err_str
[docs]
class MatchPolicyContentError(Exception):
def __init__(self, file, attr, expected_type, actual_type):
self.file = file
self.attr = attr
self.attrExpectedType = expected_type
self.attrType = actual_type
def __str__(self):
return "{} in policy file {} should be of type {} and not {}".format(
self.attr,
self.file,
self.attrExpectedType,
self.attrType,
)
[docs]
class MatchPolicy:
def __init__(self, file, search_path=[]):
"""
Load match policy from the policy file
@param file: Path to the python file
@type file: string
@param search_path: Search path to the python module to load
@type search_path: list
@rtype: MatchPolicy Object
"""
if (file is not None) and (file != ""):
self.file = file
self.name = self.policyFileToPyModuleName()
search_path.append(os.path.dirname(os.path.realpath(file)))
self.searchPath = search_path
try:
self.pyObject = import_module(self.name, self.searchPath)
except Exception:
raise MatchPolicyLoadError(file=file, search_path=self.searchPath)
else:
raise MatchPolicyLoadError()
match_attrs = self.loadMatchAttrs()
self.factoryMatchAttrs = match_attrs.get("factory_match_attrs")
self.jobMatchAttrs = match_attrs.get("job_match_attrs")
# Assume TRUE as default for all expressions
self.factoryQueryExpr = "TRUE"
if "factory_query_expr" in dir(self.pyObject):
self.factoryQueryExpr = self.pyObject.factory_query_expr
self.jobQueryExpr = "TRUE"
if "job_query_expr" in dir(self.pyObject):
self.jobQueryExpr = self.pyObject.job_query_expr
self.startExpr = "TRUE"
if "start_expr" in dir(self.pyObject):
self.startExpr = self.pyObject.start_expr
[docs]
def policyFileToPyModuleName(self):
policy_fname = os.path.basename(self.file)
policy_module_name = re.sub(".py$", "", policy_fname)
return policy_module_name
[docs]
def loadMatchAttrs(self):
"""
If given match_attr i.e. factory_match_attr or job_match_attr exits
load it from the pyObject
"""
# match_attrs = {}
match_attrs = {"factory_match_attrs": {}, "job_match_attrs": {}}
for ma_name in ("factory_match_attrs", "job_match_attrs"):
if ma_name in dir(self.pyObject):
ma_attr = getattr(self.pyObject, ma_name)
# Check if the match_attr is of dict type
# TODO: Also need to check that match_attr is of string/int/bool
if isinstance(ma_attr, dict):
data = OrderedDict()
for k, v in ma_attr.items():
data[k] = OrderedDict(v)
match_attrs[ma_name] = data
else:
# Raise error if match_attr is not of type dict
raise MatchPolicyContentError(self.file, ma_name, type(ma_attr).__name__, "dict")
return match_attrs
def __repr__(self):
return self.__str__()
def __str__(self):
contents = {
"file": self.file,
"name": self.name,
"searchPath": "%s" % self.searchPath,
"pyObject": "%s" % self.pyObject,
"factoryMatchAttrs": "%s" % self.factoryMatchAttrs,
"jobMatchAttrs": "%s" % self.jobMatchAttrs,
"factoryQueryExpr": "%s" % self.factoryQueryExpr,
"jobQueryExpr": "%s" % self.jobQueryExpr,
"startExpr": "%s" % self.startExpr,
}
return "%s" % contents