Refactor password based encryptor into PasswordBasedStringEncryptor and PasswordBasedByteEncryptor

This change allows to encrypt strings and bytes without any additional conversion done on the caller
This commit is contained in:
VakarisZ 2021-09-30 10:10:20 +03:00
parent f387595104
commit 191fbea665
7 changed files with 64 additions and 37 deletions

View File

@ -4,7 +4,7 @@ import flask_restful
from flask import request from flask import request
from monkey_island.cc.resources.auth.auth import jwt_required from monkey_island.cc.resources.auth.auth import jwt_required
from monkey_island.cc.server_utils.encryption import PasswordBasedEncryptor from monkey_island.cc.server_utils.encryption import PasswordBasedStringEncryptor
from monkey_island.cc.services.config import ConfigService from monkey_island.cc.services.config import ConfigService
@ -21,7 +21,7 @@ class ConfigurationExport(flask_restful.Resource):
password = data["password"] password = data["password"]
plaintext_config = json.dumps(plaintext_config) plaintext_config = json.dumps(plaintext_config)
pb_encryptor = PasswordBasedEncryptor(password) pb_encryptor = PasswordBasedStringEncryptor(password)
config_export = pb_encryptor.encrypt(plaintext_config) config_export = pb_encryptor.encrypt(plaintext_config)
return {"config_export": config_export, "encrypted": should_encrypt} return {"config_export": config_export, "encrypted": should_encrypt}

View File

@ -11,7 +11,7 @@ from monkey_island.cc.resources.auth.auth import jwt_required
from monkey_island.cc.server_utils.encryption import ( from monkey_island.cc.server_utils.encryption import (
InvalidCiphertextError, InvalidCiphertextError,
InvalidCredentialsError, InvalidCredentialsError,
PasswordBasedEncryptor, PasswordBasedStringEncryptor,
is_encrypted, is_encrypted,
) )
from monkey_island.cc.services.config import ConfigService from monkey_island.cc.services.config import ConfigService
@ -72,7 +72,7 @@ class ConfigurationImport(flask_restful.Resource):
try: try:
config = request_contents["config"] config = request_contents["config"]
if ConfigurationImport.is_config_encrypted(request_contents["config"]): if ConfigurationImport.is_config_encrypted(request_contents["config"]):
pb_encryptor = PasswordBasedEncryptor(request_contents["password"]) pb_encryptor = PasswordBasedStringEncryptor(request_contents["password"])
config = pb_encryptor.decrypt(config) config = pb_encryptor.decrypt(config)
return json.loads(config) return json.loads(config)
except (JSONDecodeError, InvalidCiphertextError): except (JSONDecodeError, InvalidCiphertextError):

View File

@ -1,11 +1,10 @@
from monkey_island.cc.server_utils.encryption.i_encryptor import IEncryptor from monkey_island.cc.server_utils.encryption.i_encryptor import IEncryptor
from monkey_island.cc.server_utils.encryption.key_based_encryptor import KeyBasedEncryptor from monkey_island.cc.server_utils.encryption.key_based_encryptor import KeyBasedEncryptor
from monkey_island.cc.server_utils.encryption.password_based_encryption import ( from monkey_island.cc.server_utils.encryption.password_based_string_encryption import (
InvalidCiphertextError, PasswordBasedStringEncryptor,
InvalidCredentialsError,
PasswordBasedEncryptor,
is_encrypted, is_encrypted,
) )
from .password_based_byte_encryption import InvalidCredentialsError, InvalidCiphertextError
from monkey_island.cc.server_utils.encryption.data_store_encryptor import ( from monkey_island.cc.server_utils.encryption.data_store_encryptor import (
DataStoreEncryptor, DataStoreEncryptor,
get_datastore_encryptor, get_datastore_encryptor,

View File

@ -1,6 +1,6 @@
import base64
import io import io
import logging import logging
from io import BytesIO
import pyAesCrypt import pyAesCrypt
@ -17,36 +17,28 @@ logger = logging.getLogger(__name__)
# Note: password != key # Note: password != key
class PasswordBasedEncryptor(IEncryptor): class PasswordBasedByteEncryptor(IEncryptor):
_BUFFER_SIZE = pyAesCrypt.crypto.bufferSizeDef _BUFFER_SIZE = pyAesCrypt.crypto.bufferSizeDef
def __init__(self, password: str): def __init__(self, password: str):
self.password = password self.password = password
def encrypt(self, plaintext: str) -> str: def encrypt(self, plaintext: BytesIO) -> BytesIO:
plaintext_stream = io.BytesIO(plaintext.encode())
ciphertext_stream = io.BytesIO() ciphertext_stream = io.BytesIO()
pyAesCrypt.encryptStream( pyAesCrypt.encryptStream(plaintext, ciphertext_stream, self.password, self._BUFFER_SIZE)
plaintext_stream, ciphertext_stream, self.password, self._BUFFER_SIZE
)
ciphertext_b64 = base64.b64encode(ciphertext_stream.getvalue()) return ciphertext_stream
logger.info("String encrypted.")
return ciphertext_b64.decode() def decrypt(self, ciphertext: BytesIO) -> BytesIO:
def decrypt(self, ciphertext: str):
ciphertext = base64.b64decode(ciphertext)
ciphertext_stream = io.BytesIO(ciphertext)
plaintext_stream = io.BytesIO() plaintext_stream = io.BytesIO()
ciphertext_stream_len = len(ciphertext_stream.getvalue()) ciphertext_stream_len = len(ciphertext.getvalue())
try: try:
pyAesCrypt.decryptStream( pyAesCrypt.decryptStream(
ciphertext_stream, ciphertext,
plaintext_stream, plaintext_stream,
self.password, self.password,
self._BUFFER_SIZE, self._BUFFER_SIZE,
@ -59,7 +51,7 @@ class PasswordBasedEncryptor(IEncryptor):
else: else:
logger.info("The corrupt ciphertext provided.") logger.info("The corrupt ciphertext provided.")
raise InvalidCiphertextError raise InvalidCiphertextError
return plaintext_stream.getvalue().decode("utf-8") return plaintext_stream
class InvalidCredentialsError(Exception): class InvalidCredentialsError(Exception):
@ -68,8 +60,3 @@ class InvalidCredentialsError(Exception):
class InvalidCiphertextError(Exception): class InvalidCiphertextError(Exception):
""" Raised when ciphertext is corrupted """ """ Raised when ciphertext is corrupted """
def is_encrypted(ciphertext: str) -> bool:
ciphertext = base64.b64decode(ciphertext)
return ciphertext.startswith(b"AES")

View File

@ -0,0 +1,38 @@
import base64
import io
import logging
import pyAesCrypt
from monkey_island.cc.server_utils.encryption import IEncryptor
from monkey_island.cc.server_utils.encryption.password_based_byte_encryption import (
PasswordBasedByteEncryptor,
)
logger = logging.getLogger(__name__)
class PasswordBasedStringEncryptor(IEncryptor):
_BUFFER_SIZE = pyAesCrypt.crypto.bufferSizeDef
def __init__(self, password: str):
self.password = password
def encrypt(self, plaintext: str) -> str:
plaintext_stream = io.BytesIO(plaintext.encode())
ciphertext = PasswordBasedByteEncryptor(self.password).encrypt(plaintext_stream)
return base64.b64encode(ciphertext.getvalue()).decode()
def decrypt(self, ciphertext: str) -> str:
ciphertext = base64.b64decode(ciphertext)
ciphertext_stream = io.BytesIO(ciphertext)
plaintext_stream = PasswordBasedByteEncryptor(self.password).decrypt(ciphertext_stream)
return plaintext_stream.getvalue().decode("utf-8")
def is_encrypted(ciphertext: str) -> bool:
ciphertext = base64.b64decode(ciphertext)
return ciphertext.startswith(b"AES")

View File

@ -8,7 +8,7 @@ from tests.unit_tests.monkey_island.cc.services.utils.ciphertexts_for_encryption
from common.utils.exceptions import InvalidConfigurationError from common.utils.exceptions import InvalidConfigurationError
from monkey_island.cc.resources.configuration_import import ConfigurationImport from monkey_island.cc.resources.configuration_import import ConfigurationImport
from monkey_island.cc.server_utils.encryption import PasswordBasedEncryptor from monkey_island.cc.server_utils.encryption import PasswordBasedStringEncryptor
def test_is_config_encrypted__json(monkey_config_json): def test_is_config_encrypted__json(monkey_config_json):
@ -17,7 +17,7 @@ def test_is_config_encrypted__json(monkey_config_json):
@pytest.mark.slow @pytest.mark.slow
def test_is_config_encrypted__ciphertext(monkey_config_json): def test_is_config_encrypted__ciphertext(monkey_config_json):
pb_encryptor = PasswordBasedEncryptor(PASSWORD) pb_encryptor = PasswordBasedStringEncryptor(PASSWORD)
encrypted_config = pb_encryptor.encrypt(monkey_config_json) encrypted_config = pb_encryptor.encrypt(monkey_config_json)
assert ConfigurationImport.is_config_encrypted(encrypted_config) assert ConfigurationImport.is_config_encrypted(encrypted_config)

View File

@ -4,7 +4,10 @@ from tests.unit_tests.monkey_island.cc.services.utils.ciphertexts_for_encryption
VALID_CIPHER_TEXT, VALID_CIPHER_TEXT,
) )
from monkey_island.cc.server_utils.encryption import InvalidCredentialsError, PasswordBasedEncryptor from monkey_island.cc.server_utils.encryption import (
InvalidCredentialsError,
PasswordBasedStringEncryptor,
)
MONKEY_CONFIGS_DIR_PATH = "monkey_configs" MONKEY_CONFIGS_DIR_PATH = "monkey_configs"
STANDARD_PLAINTEXT_MONKEY_CONFIG_FILENAME = "monkey_config_standard.json" STANDARD_PLAINTEXT_MONKEY_CONFIG_FILENAME = "monkey_config_standard.json"
@ -14,27 +17,27 @@ INCORRECT_PASSWORD = "goodbye321"
@pytest.mark.slow @pytest.mark.slow
def test_encrypt_decrypt_string(monkey_config_json): def test_encrypt_decrypt_string(monkey_config_json):
pb_encryptor = PasswordBasedEncryptor(PASSWORD) pb_encryptor = PasswordBasedStringEncryptor(PASSWORD)
encrypted_config = pb_encryptor.encrypt(monkey_config_json) encrypted_config = pb_encryptor.encrypt(monkey_config_json)
assert pb_encryptor.decrypt(encrypted_config) == monkey_config_json assert pb_encryptor.decrypt(encrypted_config) == monkey_config_json
@pytest.mark.slow @pytest.mark.slow
def test_decrypt_string__wrong_password(monkey_config_json): def test_decrypt_string__wrong_password(monkey_config_json):
pb_encryptor = PasswordBasedEncryptor(INCORRECT_PASSWORD) pb_encryptor = PasswordBasedStringEncryptor(INCORRECT_PASSWORD)
with pytest.raises(InvalidCredentialsError): with pytest.raises(InvalidCredentialsError):
pb_encryptor.decrypt(VALID_CIPHER_TEXT) pb_encryptor.decrypt(VALID_CIPHER_TEXT)
@pytest.mark.slow @pytest.mark.slow
def test_decrypt_string__malformed_corrupted(): def test_decrypt_string__malformed_corrupted():
pb_encryptor = PasswordBasedEncryptor(PASSWORD) pb_encryptor = PasswordBasedStringEncryptor(PASSWORD)
with pytest.raises(ValueError): with pytest.raises(ValueError):
pb_encryptor.decrypt(MALFORMED_CIPHER_TEXT_CORRUPTED) pb_encryptor.decrypt(MALFORMED_CIPHER_TEXT_CORRUPTED)
@pytest.mark.slow @pytest.mark.slow
def test_decrypt_string__no_password(monkey_config_json): def test_decrypt_string__no_password(monkey_config_json):
pb_encryptor = PasswordBasedEncryptor("") pb_encryptor = PasswordBasedStringEncryptor("")
with pytest.raises(InvalidCredentialsError): with pytest.raises(InvalidCredentialsError):
pb_encryptor.decrypt(VALID_CIPHER_TEXT) pb_encryptor.decrypt(VALID_CIPHER_TEXT)