Source code for glideinwms.lib.symCrypto

# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

"""symCrypto - This module defines classes to perform symmetric key cryptography (shared or hidden key)

It uses M2Crypto: https://github.com/mcepl/M2Crypto
a wrapper around OpenSSL: https://www.openssl.org/docs/man1.1.1/man3/

NOTE For convenience and consistency w/ previous versions of this module, Encryption/Signing functions
    (b64, hex and .encrypt() ) accept bytes-like objects (bytes, bytearray) and also Unicode strings
    utf-8 encoded (defaults.BINARY_ENCODING_CRYPTO).
    B64 and hex Decryption functions, consistent w/ Python's binascii.a2b_* functions, accept bytes and
    Unicode strings containing only ASCII characters, .decrypt() only accepts bytes-like objects (such as bytes,
    bytearray and other objects that support the buffer protocol).
    All these functions return bytes.

    Key definitions accept AnyStr (str, bytes, bytearray), key_str are iv_str bytes, key_iv_code is a str,
    so is the key

"""

import binascii

import M2Crypto.BIO
import M2Crypto.Rand

from . import defaults

######################
#
# Available ciphers:
#  too many to list them all
#     try 'man enc'
#  a few of them are
#   'aes_128_cbc'
#   'aes_128_ofb
#   'aes_256_cbc'
#   'aes_256_cfb'
#   'bf_cbc'
#   'des3'
#
######################


[docs] class SymKey: """Symmetric keys cryptography You probably don't want to use this, use the child classes instead self.key_str and self.iv_str are bytes (strings) with HEX encoded data Available ciphers, too many to list them all, try `man enc`, a few of them are: 'aes_128_cbc' 'aes_128_ofb 'aes_256_cbc' 'aes_256_cfb' 'bf_cbc' 'des3' """ def __init__(self, cypher_name, key_len, iv_len, key_str=None, iv_str=None, key_iv_code=None): """Constructor Args: cypher_name: key_len: iv_len: key_str: iv_str: key_iv_code: """ self.cypher_name = cypher_name self.key_len = key_len self.iv_len = iv_len self.key_str = None self.iv_str = None self.ket_str = None self.load(key_str, iv_str, key_iv_code) return ########################################### # load a new key
[docs] def load(self, key_str=None, iv_str=None, key_iv_code=None): """Load a new key from text (str/bytes) Args: key_str (str/bytes): string w/ base64 encoded key Must be bytes-like object or ASCII string, like base64 inputs iv_str (str/bytes): initialization vector key_iv_code (str/bytes): comma separated text with cypher, key, iv Returns: """ if key_str is not None: if key_iv_code is not None: raise ValueError("Illegal to define both key_str and key_iv_code") # just in case it was unicode" key_str = defaults.force_bytes(key_str) if len(key_str) != (self.key_len * 2): raise ValueError("Key must be exactly %i long, got %i" % (self.key_len * 2, len(key_str))) if iv_str is None: # if key_str defined, one needs the iv_str, too # set to default of 0 iv_str = b"0" * (self.iv_len * 2) else: if len(iv_str) != (self.iv_len * 2): raise ValueError( "Initialization vector must be exactly %i long, got %i" % (self.iv_len * 2, len(iv_str)) ) # just in case it was unicode" iv_str = defaults.force_bytes(iv_str) elif key_iv_code is not None: # just in case it was unicode" key_iv_code = defaults.force_bytes(key_iv_code) ki_arr = key_iv_code.split(b",") if len(ki_arr) != 3: raise ValueError("Invalid format, commas not found") if ki_arr[0] != (b"cypher:%b" % self.cypher_name.encode(defaults.BINARY_ENCODING_CRYPTO)): raise ValueError("Invalid format, not my cypher(%s)" % self.cypher_name) if ki_arr[1][:4] != b"key:": raise ValueError("Invalid format, key not found") if ki_arr[2][:3] != b"iv:": raise ValueError("Invalid format, iv not found") # call itself, but with key and iv decoded, to run the checks on key and iv return self.load(key_str=ki_arr[1][4:], iv_str=ki_arr[2][3:]) # else keep None self.key_str = key_str self.iv_str = iv_str
[docs] def is_valid(self): """Return true if the key is valid Returns: bool: True if the key string is not None """ return self.key_str is not None
[docs] def get(self): """Get the key and initialization vector Returns: tuple: (key, iv) tuple wehere both key and iv are bytes """ return (self.key_str, self.iv_str)
[docs] def get_code(self): """Return the key code: cypher, key, iv, as a comma separated string Returns: str: key description in the string """ return "cypher:{},key:{},iv:{}".format( self.cypher_name, self.key_str.decode(defaults.BINARY_ENCODING_CRYPTO), self.iv_str.decode(defaults.BINARY_ENCODING_CRYPTO), )
[docs] def new(self, random_iv=True): """Generate a new key Set self.key_str and self.iv_str Args: random_iv (bool): if False, set iv to 0 """ self.key_str = binascii.b2a_hex(M2Crypto.Rand.rand_bytes(self.key_len)) if random_iv: self.iv_str = binascii.b2a_hex(M2Crypto.Rand.rand_bytes(self.iv_len)) else: self.iv_str = b"0" * (self.iv_len * 2) return
[docs] def encrypt(self, data): """Encrypt data inline Args: data (AnyStr): data to encrypt Returns: bytes: encrypted data Raises: KeyError: if there is no valid crypto key """ if not self.is_valid(): raise KeyError("No key") bdata = defaults.force_bytes(data) b = M2Crypto.BIO.MemoryBuffer() c = M2Crypto.BIO.CipherStream(b) c.set_cipher(self.cypher_name, binascii.a2b_hex(self.key_str), binascii.a2b_hex(self.iv_str), 1) c.write(bdata) c.flush() c.close() e = b.read() return e
[docs] def encrypt_base64(self, data): """like encrypt, but the result is base64 encoded""" return binascii.b2a_base64(self.encrypt(data))
[docs] def encrypt_hex(self, data): """like encrypt, but the result is hex encoded""" return binascii.b2a_hex(self.encrypt(data))
[docs] def decrypt(self, data): """Decrypt data inline Args: data (bytes): data to decrypt Returns: bytes: decrypted data Raises: KeyError: if there is no valid crypto key """ if not self.is_valid(): raise KeyError("No key") b = M2Crypto.BIO.MemoryBuffer() c = M2Crypto.BIO.CipherStream(b) c.set_cipher(self.cypher_name, binascii.a2b_hex(self.key_str), binascii.a2b_hex(self.iv_str), 0) c.write(data) c.flush() c.close() d = b.read() return d
[docs] def decrypt_base64(self, data): """like decrypt, but the input is base64 encoded Args: data (AnyStrASCII): Base64 input data. bytes or ASCII encoded Unicode str Returns: bytes: decrypted data """ return self.decrypt(binascii.a2b_base64(data))
[docs] def decrypt_hex(self, data): """like decrypt, but the input is hex encoded Args: data (AnyStrASCII): HEX input data. bytes or ASCII encoded Unicode str Returns: bytes: decrypted data """ return self.decrypt(binascii.a2b_hex(data))
[docs] class MutableSymKey(SymKey): """SymKey class, allows to change the crypto after instantiation""" def __init__(self, cypher_name=None, key_len=None, iv_len=None, key_str=None, iv_str=None, key_iv_code=None): self.redefine(cypher_name, key_len, iv_len, key_str, iv_str, key_iv_code)
[docs] def redefine(self, cypher_name=None, key_len=None, iv_len=None, key_str=None, iv_str=None, key_iv_code=None): """Load a new crypto type and a new key Args: cypher_name: key_len: iv_len: key_str: iv_str: key_iv_code: Returns: """ self.cypher_name = cypher_name self.key_len = key_len self.iv_len = iv_len self.load(key_str, iv_str, key_iv_code) return
[docs] def is_valid(self): """Return true if the key is valid. Redefine, as null crypto name could be used in this class Returns: bool: True if both the key string and cypher name are not None """ return (self.key_str is not None) and (self.cypher_name is not None)
[docs] def get_wcrypto(self): """Get the stored key and the crypto name Returns: str: cypher name bytes: key string bytes: iv string """ return (self.cypher_name, self.key_str, self.iv_str)
########################################################################## # Parametrized sym algo classes # dict of crypt_name -> (key_len, iv_len) cypher_dict = {"aes_128_cbc": (16, 16), "aes_256_cbc": (32, 16), "bf_cbc": (16, 8), "des3": (24, 8), "des_cbc": (8, 8)}
[docs] class ParametrizedSymKey(SymKey): """Helper class to build different types of Symmetric Keys from a parameter dictionary (cypher_dict).""" def __init__(self, cypher_name, key_str=None, iv_str=None, key_iv_code=None): if cypher_name not in list(cypher_dict.keys()): raise KeyError("Unsupported cypher %s" % cypher_name) cypher_params = cypher_dict[cypher_name] SymKey.__init__(self, cypher_name, cypher_params[0], cypher_params[1], key_str, iv_str, key_iv_code)
[docs] class AutoSymKey(MutableSymKey): """Symmetric Keys from code strings. Get cypher name from key_iv_code""" def __init__(self, key_iv_code=None): """Constructor Args: key_iv_code (AnyStr): cypher byte string. str is encoded using BINARY_ENCODING_CRYPTO """ self.auto_load(key_iv_code)
[docs] def auto_load(self, key_iv_code=None): """Load a new key_iv_key and extract the cypher Args: key_iv_code (AnyStr): cypher byte string. str is encoded using BINARY_ENCODING_CRYPTO Raises: ValueError: if the format of the code is incorrect """ if key_iv_code is None: self.cypher_name = None self.key_str = None else: key_iv_code = defaults.force_bytes(key_iv_code) # just in case it was unicode" ki_arr = key_iv_code.split(b",") if len(ki_arr) != 3: raise ValueError("Invalid format, commas not found") if ki_arr[0][:7] != b"cypher:": raise ValueError("Invalid format, cypher not found") cypher_name = ki_arr[0][7:].decode(defaults.BINARY_ENCODING_CRYPTO) if ki_arr[1][:4] != b"key:": raise ValueError("Invalid format, key not found") key_str = ki_arr[1][4:] if ki_arr[2][:3] != b"iv:": raise ValueError("Invalid format, iv not found") iv_str = ki_arr[2][3:] cypher_params = cypher_dict[cypher_name] self.redefine(cypher_name, cypher_params[0], cypher_params[1], key_str, iv_str)
########################################################################## # Explicit sym algo classes
[docs] class SymAES128Key(ParametrizedSymKey): def __init__(self, key_str=None, iv_str=None, key_iv_code=None): ParametrizedSymKey.__init__(self, "aes_128_cbc", key_str, iv_str, key_iv_code)
[docs] class SymAES256Key(ParametrizedSymKey): def __init__(self, key_str=None, iv_str=None, key_iv_code=None): ParametrizedSymKey.__init__(self, "aes_256_cbc", key_str, iv_str, key_iv_code)
[docs] class Sym3DESKey(ParametrizedSymKey): def __init__(self, key_str=None, iv_str=None, key_iv_code=None): ParametrizedSymKey.__init__(self, "des3", key_str, iv_str, key_iv_code)
# Removed SymBlowfishKey, bf_cbc and SymDESKey, des_cbc, because not supported in openssl3 (EL9) # def debug_print(description, text): # print "<%s>\n%s\n</%s>\n" % (description,text,description) # # def test(): # plaintext = "5105105105105100" # # sk=SymAES256Key() # sk.new() # # key_iv_code=sk.get_code() # # encrypted = sk.encrypt_hex(plaintext) # # sk2=AutoSymKey(key_iv_code=key_iv_code) # decrypted = sk2.decrypt_hex(encrypted) # # assert plaintext == decrypted # # debug_print("key_id", key_iv_code) # debug_print("plain text", plaintext) # debug_print("cipher text", encrypted) # debug_print("decrypted text", decrypted)