# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0
"""pubCrypto - This module defines classes to perform public key cryptography
It uses M2Crypto: https://github.com/mcepl/M2Crypto
a wrapper around OpenSSL: https://www.openssl.org/docs/man1.1.1/man3/
NOTE For convenience and consistency w/ previous versions of this module, Encryption/Signing functions
(b64, hex and .encrypt() ) accept bytes-like objects (bytes, bytearray) and also Unicode strings
utf-8 encoded (defaults.BINARY_ENCODING_CRYPTO).
B64 and hex Decryption functions, consistent w/ Python's binascii.a2b_* functions, accept bytes and
Unicode strings containing only ASCII characters, .decrypt() only accepts bytes-like objects (such as bytes,
bytearray and other objects that support the buffer protocol).
All these functions return bytes.
Keys can be loaded from AnyStr (str, bytes, bytearray). Keys are returned as bytes string. Key files are binary.
"""
import binascii
import os
import M2Crypto.BIO
import M2Crypto.Err
import M2Crypto.RSA
from . import defaults
[docs]
def passphrase_callback(v: bool, prompt1: str = "Enter passphrase:", prompt2: str = "Verify passphrase:"):
# Example callback (uncomment for manual testing)
# str3 = prompt1 + prompt2
# return str3 # Optional return
pass
[docs]
def _default_callback(*args):
"""Return a dummy passphrase
Good for service key processing where human not present.
Used as a callback in the :mod:M2Crypto module:
A Python callable object that is invoked to acquire a passphrase with which to unlock the key.
The default is :func:M2Crypto.util.passphrase_callback ::
def passphrase_callback(v: bool, prompt1: str = 'Enter passphrase:', prompt2: str = 'Verify passphrase:'
): -> Optional[str]
Args:
*args:
Returns:
Optional[str]: str or None
"""
# TODO: according to the M2Crypto spec this function is expected to return a str (unicode)
# but doing so fails the unit test (test_factory_glideFactoryConfig.py), leaving the bytes now
# maybe the fixture w/ the key should be foxed (fixtures/factory/work-dir/rsa.key/rsa.key.bak)
# return "default"
return b"default"
[docs]
class PubCryptoError(Exception):
"""Exception masking M2Crypto exceptions,
to ease error handling in modules importing pubCrypto
"""
def __init__(self, msg):
Exception.__init__(self, msg)
######################
#
# Available paddings:
# M2Crypto.RSA.no_padding
# M2Crypto.RSA.pkcs1_padding
# M2Crypto.RSA.sslv23_padding
# M2Crypto.RSA.pkhas1_oaep_padding
#
# Available sign algos:
# 'sha1'
# 'sha224'
# 'sha256',
# 'ripemd160'
# 'md5'
#
# Available ciphers:
# too many to list them all
# try 'man enc'
# a few of them are
# 'aes_128_cbc'
# 'aes_128_ofb
# 'aes_256_cbc'
# 'aes_256_cfb'
# 'bf_cbc'
# 'des3'
#
######################
##########################################################################
# Public part of the RSA key
[docs]
class PubRSAKey:
"""Public part of the RSA key"""
def __init__(
self, key_str=None, key_fname=None, encryption_padding=M2Crypto.RSA.pkcs1_oaep_padding, sign_algo="sha256"
):
"""Constructor for RSA public key
One and only one of the two key_str or key_fname must be defined (not None)
Available paddings:
M2Crypto.RSA.no_padding
M2Crypto.RSA.pkcs1_padding
M2Crypto.RSA.sslv23_padding
M2Crypto.RSA.pkhas1_oaep_padding
Available sign algos:
'sha1', 'sha224', 'sha256', 'ripemd160', 'md5'
Available ciphers, too many to list them all, try `man enc` a few of them are:
'aes_128_cbc'
'aes_128_ofb
'aes_256_cbc'
'aes_256_cfb'
'bf_cbc'
'des3'
Args:
key_str (str/bytes): string w/ base64 encoded key
Must be bytes-like object or ASCII string, like base64 inputs
key_fname (str): key file path
encryption_padding:
sign_algo (str): valid signing algorithm (default: 'sha256')
"""
self.rsa_key = None
self.has_private = False
self.encryption_padding = encryption_padding
self.sign_algo = sign_algo
try:
self.load(key_str, key_fname)
except M2Crypto.RSA.RSAError as e:
# Put some additional information in the exception object to be printed later on
# This helps operator understand which file might be corrupted so that they can try to delete it
e.key_fname = key_fname
e.cwd = os.getcwd()
raise e from e # Need to raise a new exception to have the modified values (only raise keeps the original)
return
###########################################
# Load key functions
[docs]
def load(self, key_str=None, key_fname=None):
"""Load key from a string or a file
Only one of the two can be defined (not None)
Load the key into self.rsa_key
Args:
key_str (str/bytes): string w/ base64 encoded key
Must be bytes-like object or ASCII string, like base64 inputs
key_fname (str): file name
Raises:
ValueError: if both key_str and key_fname are defined
"""
if key_str is not None:
if key_fname is not None:
raise ValueError("Illegal to define both key_str and key_fname")
key_str = defaults.force_bytes(key_str)
try:
bio = M2Crypto.BIO.MemoryBuffer(key_str)
self._load_from_bio(bio)
except M2Crypto.RSA.RSAError as e:
raise PubCryptoError("M2Crypto.RSA.RSAError: %s" % e) from e
elif key_fname is not None:
bio = M2Crypto.BIO.openfile(key_fname)
if bio is None:
# File not found or wrong permissions
raise M2Crypto.BIO.BIOError(M2Crypto.Err.get_error())
self._load_from_bio(bio)
else:
self.rsa_key = None
return
# meant to be internal
[docs]
def _load_from_bio(self, bio):
"""Load the key into the object
Protected, overridden by child classes. Used by load
Args:
bio (M2Crypto.BIO.BIO): BIO to retrieve the key from (file or memory buffer)
"""
self.rsa_key = M2Crypto.RSA.load_pub_key_bio(bio)
self.has_private = False
return
###########################################
# Save key functions
[docs]
def save(self, key_fname):
"""Save the key to a file
The file is binary and is written using M2Crypto.BIO
Args:
key_fname (str): file name
Returns:
"""
bio = M2Crypto.BIO.openfile(key_fname, "wb")
try:
return self._save_to_bio(bio)
except Exception:
# need to remove the file in case of error
bio.close()
del bio
os.unlink(key_fname)
raise
# like save, but return a string
[docs]
def get(self):
"""Retrieve the key
Returns:
bytes: key
"""
bio = M2Crypto.BIO.MemoryBuffer()
self._save_to_bio(bio)
return bio.read()
# meant to be internal
[docs]
def _save_to_bio(self, bio):
"""Save the key from the object
Protected, overridden by child classes. Used by save and get
Args:
bio (M2Crypto.BIO.BIO): BIO object to save the key to (file or memory buffer)
Returns:
int: status returned by M2Crypto.m2.rsa_write_pub_key
Raises:
KeyError: if the key is not defined
"""
if self.rsa_key is None:
raise KeyError("No RSA key")
return self.rsa_key.save_pub_key_bio(bio)
###########################################
# encrypt/verify data inline
[docs]
def encrypt(self, data):
"""Encrypt the data
Args:
data (AnyStr): string to encrypt. bytes-like or str. If unicode,
it is encoded using utf-8 before being encrypted.
len(data) must be less than len(key)
Returns:
bytes: encrypted data
"""
if self.rsa_key is None:
raise KeyError("No RSA key")
bdata = defaults.force_bytes(data)
return self.rsa_key.public_encrypt(bdata, self.encryption_padding)
[docs]
def encrypt_base64(self, data):
"""like encrypt, but base64 encoded"""
return binascii.b2a_base64(self.encrypt(data))
[docs]
def encrypt_hex(self, data):
"""like encrypt, but hex encoded"""
return binascii.b2a_hex(self.encrypt(data))
[docs]
def verify(self, data, signature):
"""Verify that the signature gets you the data
Args:
data (AnyStr): string to verify. bytes-like or str. If unicode,
it is encoded using utf-8 before being encrypted. :
signature (bytes): signature to use in the verification
Returns:
bool: True if the signature gets you the data
Raises:
KeyError: if the key is not defined
"""
if self.rsa_key is None:
raise KeyError("No RSA key")
bdata = defaults.force_bytes(data)
return self.rsa_key.verify(bdata, signature, self.sign_algo)
[docs]
def verify_base64(self, data, signature):
"""like verify, but the signature is base64 encoded"""
return self.verify(data, binascii.a2b_base64(signature))
[docs]
def verify_hex(self, data, signature):
"""like verify, but the signature is hex encoded"""
return self.verify(data, binascii.a2b_hex(signature))
##########################################################################
# Public and private part of the RSA key
[docs]
class RSAKey(PubRSAKey):
"""Public and private part of the RSA key"""
def __init__(
self,
key_str=None,
key_fname=None,
private_cipher="aes_256_cbc",
private_callback=_default_callback,
encryption_padding=M2Crypto.RSA.pkcs1_oaep_padding,
sign_algo="sha256",
):
self.private_cipher = private_cipher
self.private_callback = private_callback
PubRSAKey.__init__(self, key_str, key_fname, encryption_padding, sign_algo)
return
###########################################
# Downgrade to PubRSAKey
[docs]
def PubRSAKey(self):
"""Return the public part only. Downgrade to PubRSAKey
Returns:
PubRSAKey: an object w/ only the public part of the key
"""
if self.rsa_key is None:
raise KeyError("No RSA key")
bio = M2Crypto.BIO.MemoryBuffer()
self.rsa_key.save_pub_key_bio(bio)
public_key = bio.read()
return PubRSAKey(key_str=public_key, encryption_padding=self.encryption_padding, sign_algo=self.sign_algo)
###########################################
# Load key functions
[docs]
def _load_from_bio(self, bio):
"""Load the key into the object
Internal, overrides the parent _load_from_bio. Used by load
Args:
bio (M2Crypto.BIO.BIO):
"""
self.rsa_key = M2Crypto.RSA.load_key_bio(bio, self.private_callback)
self.has_private = True
return
###########################################
# Save key functions
[docs]
def _save_to_bio(self, bio):
"""Save the key from the object
Protected, overridden by child classes. Used by save and get
Args:
bio (M2Crypto.BIO.BIO): BIO to save the key into (file or memory buffer)
Returns:
Raises:
KeyError: if the key is not defined
"""
if self.rsa_key is None:
raise KeyError("No RSA key")
return self.rsa_key.save_key_bio(bio, self.private_cipher, self.private_callback)
###########################################
# generate key function
[docs]
def new(self, key_length=None, exponent=65537):
"""Refresh/Generate a new key and store it in the object
Args:
key_length (int/None): if no key_length provided, use the length of the existing one
exponent (int): exponent
"""
if key_length is None:
if self.rsa_key is None:
raise KeyError("No RSA key and no key length provided")
key_length = len(self.rsa_key)
self.rsa_key = M2Crypto.RSA.gen_key(key_length, exponent)
return
###########################################
[docs]
def decrypt(self, data):
"""Decrypt data inline
Args:
data (bytes): data to decrypt
Returns:
bytes: decrypted string
Raises:
KeyError: if the key is not defined
"""
if self.rsa_key is None:
raise KeyError("No RSA key")
return self.rsa_key.private_decrypt(data, self.encryption_padding)
[docs]
def decrypt_base64(self, data):
"""like decrypt, but base64 encoded"""
return self.decrypt(binascii.a2b_base64(data))
[docs]
def decrypt_hex(self, data):
"""like decrypt, but hex encoded"""
return self.decrypt(binascii.a2b_hex(data))
[docs]
def sign(self, data):
"""Sign data inline. Same as private_encrypt
Args:
data (AnyStr): string to encrypt. If unicode, it is encoded using utf-8 before being encrypted.
len(data) must be less than len(key)
Returns:
bytes: encrypted data
"""
if self.rsa_key is None:
raise KeyError("No RSA key")
bdata = defaults.force_bytes(data)
return self.rsa_key.sign(bdata, self.sign_algo)
[docs]
def sign_base64(self, data):
"""like sign, but base64 encoded"""
return binascii.b2a_base64(self.sign(data))
[docs]
def sign_hex(self, data):
"""like sign, but hex encoded"""
return binascii.b2a_hex(self.sign(data))
# def generate():
# privkey_file = "priv.pem"
# pubkey_file = "pub.pem"
# key_length = 1024
# cr=RSAKey()
# cr.new(key_length)
# cr_pub=cr.PubRSAKey()
#
# cr.save(privkey_file)
# cr_pub.save(pubkey_file)
#
# def debug_print(description, text):
# print "<%s>\n%s\n</%s>\n" % (description,text,description)
#
# def test():
# privkey_file = "priv.pem"
# pubkey_file = "pub.pem"
# key_length = 1024
# cr=RSAKey(key_fname=privkey_file)
# cr_pub=cr.PubRSAKey()
#
# plaintext = "5105105105105100"
# encrypted = cr_pub.encrypt_base64(plaintext)
# decrypted = cr.decrypt_base64(encrypted)
# signed = cr.sign_base64(plaintext)
#
# assert cr_pub.verify_base64(plaintext,signed)
#
# assert plaintext == decrypted
#
# debug_print("plain text", plaintext)
# debug_print("cipher text", encrypted)
# debug_print("signed text", signed)
# debug_print("decrypted text", decrypted)