Island: Make IEncryptor interface consistent

An IEncryptor could take Any type and encrypt it, returning Any other
type. This is a poorly defined interface and, IIRC, it was defined this
way so some encryptors could take strings and others could take bytes.

This commit modifies the interface so that it accepts and returns bytes
in all cases. If a string encryptor is needed for convenience, we can
add IStringEncryptor and write a decorator that wraps IEncryptors.
This commit is contained in:
Mike Salvatore 2022-07-11 09:39:05 -04:00
parent f7e632025f
commit 54c1eef309
7 changed files with 21 additions and 22 deletions

View File

@ -46,10 +46,10 @@ class DataStoreEncryptor(IEncryptor):
return KeyBasedEncryptor(plaintext_key) return KeyBasedEncryptor(plaintext_key)
def encrypt(self, plaintext: str) -> str: def encrypt(self, plaintext: bytes) -> bytes:
return self._key_based_encryptor.encrypt(plaintext) return self._key_based_encryptor.encrypt(plaintext)
def decrypt(self, ciphertext: str): def decrypt(self, ciphertext: bytes) -> bytes:
return self._key_based_encryptor.decrypt(ciphertext) return self._key_based_encryptor.decrypt(ciphertext)

View File

@ -5,8 +5,8 @@ from . import IFieldEncryptor
class StringEncryptor(IFieldEncryptor): class StringEncryptor(IFieldEncryptor):
@staticmethod @staticmethod
def encrypt(value: str): def encrypt(value: str):
return get_datastore_encryptor().encrypt(value) return get_datastore_encryptor().encrypt(value.encode())
@staticmethod @staticmethod
def decrypt(value: str): def decrypt(value: str):
return get_datastore_encryptor().decrypt(value) return get_datastore_encryptor().decrypt(value).decode()

View File

@ -7,8 +7,8 @@ from . import IFieldEncryptor
class StringListEncryptor(IFieldEncryptor): class StringListEncryptor(IFieldEncryptor):
@staticmethod @staticmethod
def encrypt(value: List[str]): def encrypt(value: List[str]):
return [get_datastore_encryptor().encrypt(string) for string in value] return [get_datastore_encryptor().encrypt(string.encode()) for string in value]
@staticmethod @staticmethod
def decrypt(value: List[str]): def decrypt(value: List[bytes]):
return [get_datastore_encryptor().decrypt(string) for string in value] return [get_datastore_encryptor().decrypt(bytes_).decode() for bytes_ in value]

View File

@ -1,10 +1,9 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any
class IEncryptor(ABC): class IEncryptor(ABC):
@abstractmethod @abstractmethod
def encrypt(self, plaintext: Any) -> Any: def encrypt(self, plaintext: bytes) -> bytes:
""" """
Encrypts data and returns the ciphertext. Encrypts data and returns the ciphertext.
@ -13,7 +12,7 @@ class IEncryptor(ABC):
""" """
@abstractmethod @abstractmethod
def decrypt(self, ciphertext: Any): def decrypt(self, ciphertext: bytes) -> bytes:
""" """
Decrypts data and returns the plaintext. Decrypts data and returns the plaintext.

View File

@ -31,15 +31,15 @@ class KeyBasedEncryptor(IEncryptor):
# something up. The main drawback to fernet is that it uses AES-128, which is not # something up. The main drawback to fernet is that it uses AES-128, which is not
# quantum-safe. At the present time, human error is probably a greater risk than quantum # quantum-safe. At the present time, human error is probably a greater risk than quantum
# computers. # computers.
def encrypt(self, plaintext: str) -> str: def encrypt(self, plaintext: bytes) -> bytes:
cipher_iv = Random.new().read(AES.block_size) cipher_iv = Random.new().read(AES.block_size)
cipher = AES.new(self._key, AES.MODE_CBC, cipher_iv) cipher = AES.new(self._key, AES.MODE_CBC, cipher_iv)
padded_plaintext = Padding.pad(plaintext.encode(), self._BLOCK_SIZE) padded_plaintext = Padding.pad(plaintext, self._BLOCK_SIZE)
return base64.b64encode(cipher_iv + cipher.encrypt(padded_plaintext)).decode() return base64.b64encode(cipher_iv + cipher.encrypt(padded_plaintext))
def decrypt(self, ciphertext: str): def decrypt(self, ciphertext: bytes) -> bytes:
enc_message = base64.b64decode(ciphertext) enc_message = base64.b64decode(ciphertext)
cipher_iv = enc_message[0 : AES.block_size] cipher_iv = enc_message[0 : AES.block_size]
cipher = AES.new(self._key, AES.MODE_CBC, cipher_iv) cipher = AES.new(self._key, AES.MODE_CBC, cipher_iv)
padded_plaintext = cipher.decrypt(enc_message[AES.block_size :]) padded_plaintext = cipher.decrypt(enc_message[AES.block_size :])
return Padding.unpad(padded_plaintext, self._BLOCK_SIZE).decode() return Padding.unpad(padded_plaintext, self._BLOCK_SIZE)

View File

@ -11,7 +11,7 @@ from monkey_island.cc.server_utils.encryption import (
# Mark all tests in this module as slow # Mark all tests in this module as slow
pytestmark = pytest.mark.slow pytestmark = pytest.mark.slow
PLAINTEXT = "Hello, Monkey!" PLAINTEXT = b"Hello, Monkey!"
MOCK_SECRET = "53CR31" MOCK_SECRET = "53CR31"

View File

@ -14,19 +14,19 @@ kb_encryptor = KeyBasedEncryptor(KEY)
def test_encrypt_decrypt_string_with_key(): def test_encrypt_decrypt_string_with_key():
encrypted = kb_encryptor.encrypt(PLAINTEXT) encrypted = kb_encryptor.encrypt(PLAINTEXT.encode())
decrypted = kb_encryptor.decrypt(encrypted) decrypted = kb_encryptor.decrypt(encrypted).decode()
assert decrypted == PLAINTEXT assert decrypted == PLAINTEXT
@pytest.mark.parametrize("plaintext", [PLAINTEXT_UTF8_1, PLAINTEXT_UTF8_2, PLAINTEXT_UTF8_3]) @pytest.mark.parametrize("plaintext", [PLAINTEXT_UTF8_1, PLAINTEXT_UTF8_2, PLAINTEXT_UTF8_3])
def test_encrypt_decrypt_string_utf8_with_key(plaintext): def test_encrypt_decrypt_string_utf8_with_key(plaintext):
encrypted = kb_encryptor.encrypt(plaintext) encrypted = kb_encryptor.encrypt(plaintext.encode())
decrypted = kb_encryptor.decrypt(encrypted) decrypted = kb_encryptor.decrypt(encrypted).decode()
assert decrypted == plaintext assert decrypted == plaintext
def test_encrypt_decrypt_string_multiple_block_size_with_key(): def test_encrypt_decrypt_string_multiple_block_size_with_key():
encrypted = kb_encryptor.encrypt(PLAINTEXT_MULTIPLE_BLOCK_SIZE) encrypted = kb_encryptor.encrypt(PLAINTEXT_MULTIPLE_BLOCK_SIZE.encode())
decrypted = kb_encryptor.decrypt(encrypted) decrypted = kb_encryptor.decrypt(encrypted).decode()
assert decrypted == PLAINTEXT_MULTIPLE_BLOCK_SIZE assert decrypted == PLAINTEXT_MULTIPLE_BLOCK_SIZE