| import hashlib |
|
|
| from Crypto.Cipher import AES |
| from Crypto.PublicKey import RSA |
| from Crypto.Random import get_random_bytes |
|
|
| from extensions.ext_redis import redis_client |
| from extensions.ext_storage import storage |
| from libs import gmpy2_pkcs10aep_cipher |
|
|
|
|
def generate_key_pair(tenant_id):
    """Create a 2048-bit RSA key pair for a tenant and store the private half.

    The private key is exported with ``export_key()`` and written through
    ``storage.save`` to ``privkeys/<tenant_id>/private.pem``. Only the public
    key is handed back to the caller.

    Args:
        tenant_id: Identifier interpolated into the private key's storage path.

    Returns:
        The exported public key, decoded to ``str``.
    """
    key_pair = RSA.generate(2048)

    # Persist the private half before returning the public half.
    storage.save(f"privkeys/{tenant_id}/private.pem", key_pair.export_key())

    return key_pair.publickey().export_key().decode()
|
|
|
|
# Marker that encrypt() prepends to its output; decrypt_token_with_decoding()
# checks for it to choose between the hybrid (RSA-wrapped AES key) path and
# the plain RSA path.
prefix_hybrid = b"HYBRID:"
|
|
|
|
def encrypt(text, public_key):
    """Encrypt ``text`` with a fresh AES key that is itself RSA-encrypted.

    Layout of the returned bytes::

        prefix_hybrid | RSA-encrypted AES key | EAX nonce | EAX tag | ciphertext

    Args:
        text: String to encrypt; it is encoded with ``str.encode()`` first.
        public_key: Public key as ``str`` or ``bytes``, in a form accepted by
            ``RSA.import_key``. A ``str`` is encoded to bytes before import.

    Returns:
        ``bytes`` starting with ``prefix_hybrid`` followed by the fields above.
    """
    key_material = public_key.encode() if isinstance(public_key, str) else public_key

    # One-off 16-byte session key; EAX yields both ciphertext and an auth tag.
    session_key = get_random_bytes(16)
    aes = AES.new(session_key, AES.MODE_EAX)
    body, mac = aes.encrypt_and_digest(text.encode())

    # Only the short session key goes through the RSA cipher.
    rsa_cipher = gmpy2_pkcs10aep_cipher.new(RSA.import_key(key_material))
    wrapped_key = rsa_cipher.encrypt(session_key)

    return b"".join((prefix_hybrid, wrapped_key, aes.nonce, mac, body))
|
|
|
|
def get_decrypt_decoding(tenant_id):
    """Load a tenant's RSA private key and build the matching RSA cipher.

    The key is looked up in Redis first, under a cache key derived from the
    SHA3-256 hash of the storage path. On a miss it is read from
    ``privkeys/<tenant_id>/private.pem`` via ``storage.load`` and cached in
    Redis for 120 seconds.

    Args:
        tenant_id: Identifier interpolated into the private key's storage path.

    Returns:
        A ``(rsa_key, cipher_rsa)`` tuple: the imported RSA key and the cipher
        object built from it by ``gmpy2_pkcs10aep_cipher.new``.

    Raises:
        PrivkeyNotFoundError: If ``storage.load`` raises ``FileNotFoundError``
            for the tenant's private key path.
    """
    filepath = f"privkeys/{tenant_id}/private.pem"
    cache_key = f"tenant_privkey:{hashlib.sha3_256(filepath.encode()).hexdigest()}"

    private_key = redis_client.get(cache_key)
    if not private_key:
        try:
            private_key = storage.load(filepath)
        except FileNotFoundError as err:
            # Chain explicitly so the traceback reports the storage miss as
            # the cause rather than as an unrelated error during handling.
            raise PrivkeyNotFoundError(f"Private key not found, tenant_id: {tenant_id}") from err

        # Only a freshly loaded key is written back to the cache.
        redis_client.setex(cache_key, 120, private_key)

    rsa_key = RSA.import_key(private_key)
    cipher_rsa = gmpy2_pkcs10aep_cipher.new(rsa_key)

    return rsa_key, cipher_rsa
|
|
|
|
def decrypt_token_with_decoding(encrypted_text, rsa_key, cipher_rsa):
    """Decrypt a token with an already-loaded RSA key and cipher.

    Two payload formats are handled:

    * Payloads starting with ``prefix_hybrid``: after the prefix come the
      RSA-encrypted AES key (``rsa_key.size_in_bytes()`` bytes), a 16-byte
      nonce, a 16-byte tag, and the AES-EAX ciphertext.
    * Anything else: the whole payload is passed to ``cipher_rsa.decrypt``.

    Args:
        encrypted_text: Encrypted payload (must support ``startswith`` with
            the bytes ``prefix_hybrid`` and slicing).
        rsa_key: RSA key object; only ``size_in_bytes()`` is used here.
        cipher_rsa: Cipher object whose ``decrypt`` recovers the AES key (or
            the plaintext in the non-hybrid case).

    Returns:
        The decrypted bytes decoded to ``str``. Errors raised by the cipher
        objects propagate unchanged.
    """
    if not encrypted_text.startswith(prefix_hybrid):
        # No hybrid marker: the payload is decrypted directly with RSA.
        return cipher_rsa.decrypt(encrypted_text).decode()

    payload = encrypted_text[len(prefix_hybrid) :]

    # Field boundaries inside the payload: key | nonce (16) | tag (16) | data.
    key_end = rsa_key.size_in_bytes()
    nonce_end = key_end + 16
    tag_end = nonce_end + 16

    wrapped_key = payload[:key_end]
    nonce = payload[key_end:nonce_end]
    mac = payload[nonce_end:tag_end]
    body = payload[tag_end:]

    session_key = cipher_rsa.decrypt(wrapped_key)
    aes = AES.new(session_key, AES.MODE_EAX, nonce=nonce)

    return aes.decrypt_and_verify(body, mac).decode()
|
|
|
|
def decrypt(encrypted_text, tenant_id):
    """Decrypt ``encrypted_text`` using the private key of ``tenant_id``.

    Obtains the tenant's RSA key and cipher from ``get_decrypt_decoding`` and
    passes them, with the payload, to ``decrypt_token_with_decoding``.

    Args:
        encrypted_text: Encrypted payload to decrypt.
        tenant_id: Tenant whose private key is used.

    Returns:
        Whatever ``decrypt_token_with_decoding`` returns for the payload.
    """
    return decrypt_token_with_decoding(encrypted_text, *get_decrypt_decoding(tenant_id))
|
|
|
|
class PrivkeyNotFoundError(Exception):
    """Raised when a tenant's private key file cannot be found in storage."""
|
|