Refactored json parsing out of encryption/decryption functionality.

This commit is contained in:
VakarisZ 2021-06-11 16:05:32 +03:00
parent 3450b80a82
commit 5c7bab7a0d
7 changed files with 96 additions and 95 deletions

View File

@ -5,7 +5,7 @@ from flask import request
from monkey_island.cc.resources.auth.auth import jwt_required from monkey_island.cc.resources.auth.auth import jwt_required
from monkey_island.cc.services.config import ConfigService from monkey_island.cc.services.config import ConfigService
from monkey_island.cc.services.utils.config_encryption import encrypt_config from monkey_island.cc.services.utils.encryption import encrypt_string
class ConfigurationExport(flask_restful.Resource): class ConfigurationExport(flask_restful.Resource):
@ -19,6 +19,7 @@ class ConfigurationExport(flask_restful.Resource):
config_export = plaintext_config config_export = plaintext_config
if should_encrypt: if should_encrypt:
password = data["password"] password = data["password"]
config_export = encrypt_config(plaintext_config, password) plaintext_config = json.dumps(plaintext_config)
config_export = encrypt_string(plaintext_config, password)
return {"config_export": config_export, "encrypted": should_encrypt} return {"config_export": config_export, "encrypted": should_encrypt}

View File

@ -9,9 +9,10 @@ from flask import request
from common.utils.exceptions import InvalidConfigurationError from common.utils.exceptions import InvalidConfigurationError
from monkey_island.cc.resources.auth.auth import jwt_required from monkey_island.cc.resources.auth.auth import jwt_required
from monkey_island.cc.services.config import ConfigService from monkey_island.cc.services.config import ConfigService
from monkey_island.cc.services.utils.config_encryption import ( from monkey_island.cc.services.utils.encryption import (
InvalidCiphertextError,
InvalidCredentialsError, InvalidCredentialsError,
decrypt_config, decrypt_ciphertext,
is_encrypted, is_encrypted,
) )
@ -68,13 +69,13 @@ class ConfigurationImport(flask_restful.Resource):
@staticmethod @staticmethod
def _get_plaintext_config_from_request(request_contents: dict) -> dict: def _get_plaintext_config_from_request(request_contents: dict) -> dict:
if ConfigurationImport.is_config_encrypted(request_contents["config"]): try:
return decrypt_config(request_contents["config"], request_contents["password"]) config = request_contents["config"]
else: if ConfigurationImport.is_config_encrypted(request_contents["config"]):
try: config = decrypt_ciphertext(config, request_contents["password"])
return json.loads(request_contents["config"]) return json.loads(config)
except JSONDecodeError: except (JSONDecodeError, InvalidCiphertextError):
raise InvalidConfigurationError raise InvalidConfigurationError
@staticmethod @staticmethod
def import_config(config_json): def import_config(config_json):

View File

@ -1,62 +0,0 @@
import base64
import io
import json
import logging
from typing import Dict
import pyAesCrypt
from common.utils.exceptions import InvalidConfigurationError
BUFFER_SIZE = pyAesCrypt.crypto.bufferSizeDef
logger = logging.getLogger(__name__)
def encrypt_config(config: Dict, password: str) -> str:
plaintext_config_stream = io.BytesIO(json.dumps(config).encode())
ciphertext_config_stream = io.BytesIO()
pyAesCrypt.encryptStream(
plaintext_config_stream, ciphertext_config_stream, password, BUFFER_SIZE
)
ciphertext_b64 = base64.b64encode(ciphertext_config_stream.getvalue())
logger.info("Configuration encrypted.")
return ciphertext_b64.decode()
def decrypt_config(ciphertext: str, password: str) -> Dict:
ciphertext = base64.b64decode(ciphertext)
ciphertext_config_stream = io.BytesIO(ciphertext)
dec_plaintext_config_stream = io.BytesIO()
len_ciphertext_config_stream = len(ciphertext_config_stream.getvalue())
try:
pyAesCrypt.decryptStream(
ciphertext_config_stream,
dec_plaintext_config_stream,
password,
BUFFER_SIZE,
len_ciphertext_config_stream,
)
except ValueError as ex:
if str(ex).startswith("Wrong password"):
logger.info("Wrong password for configuration provided.")
raise InvalidCredentialsError
else:
logger.info("The provided configuration file is corrupt.")
raise InvalidConfigurationError
plaintext_config = json.loads(dec_plaintext_config_stream.getvalue().decode("utf-8"))
return plaintext_config
def is_encrypted(ciphertext: str) -> bool:
ciphertext = base64.b64decode(ciphertext)
return ciphertext.startswith(b"AES")
class InvalidCredentialsError(Exception):
""" Raise when credentials supplied are invalid """

View File

@ -0,0 +1,59 @@
import base64
import io
import logging
import pyAesCrypt
BUFFER_SIZE = pyAesCrypt.crypto.bufferSizeDef
logger = logging.getLogger(__name__)
def encrypt_string(plaintext: str, password: str) -> str:
plaintext_stream = io.BytesIO(plaintext.encode())
ciphertext_stream = io.BytesIO()
pyAesCrypt.encryptStream(plaintext_stream, ciphertext_stream, password, BUFFER_SIZE)
ciphertext_b64 = base64.b64encode(ciphertext_stream.getvalue())
logger.info("String encrypted.")
return ciphertext_b64.decode()
def decrypt_ciphertext(ciphertext: str, password: str) -> str:
ciphertext = base64.b64decode(ciphertext)
ciphertext_stream = io.BytesIO(ciphertext)
plaintext_stream = io.BytesIO()
ciphertext_stream_len = len(ciphertext_stream.getvalue())
try:
pyAesCrypt.decryptStream(
ciphertext_stream,
plaintext_stream,
password,
BUFFER_SIZE,
ciphertext_stream_len,
)
except ValueError as ex:
if str(ex).startswith("Wrong password"):
logger.info("Wrong password provided for decryption.")
raise InvalidCredentialsError
else:
logger.info("The corrupt ciphertext provided.")
raise InvalidCiphertextError
return plaintext_stream.getvalue().decode("utf-8")
def is_encrypted(ciphertext: str) -> bool:
ciphertext = base64.b64decode(ciphertext)
return ciphertext.startswith(b"AES")
class InvalidCredentialsError(Exception):
""" Raised when password for decryption is invalid """
class InvalidCiphertextError(Exception):
""" Raised when ciphertext is corrupted """

View File

@ -18,3 +18,8 @@ def monkey_config(data_for_tests_dir):
) )
plaintext_config = json.loads(open(plaintext_monkey_config_standard_path, "r").read()) plaintext_config = json.loads(open(plaintext_monkey_config_standard_path, "r").read())
return plaintext_config return plaintext_config
@pytest.fixture
def monkey_config_json(monkey_config):
return json.dumps(monkey_config)

View File

@ -1,5 +1,3 @@
import json
import pytest import pytest
from tests.unit_tests.monkey_island.cc.services.utils.ciphertexts_for_encryption_test import ( from tests.unit_tests.monkey_island.cc.services.utils.ciphertexts_for_encryption_test import (
MALFORMED_CIPHER_TEXT_CORRUPTED, MALFORMED_CIPHER_TEXT_CORRUPTED,
@ -8,16 +6,15 @@ from tests.unit_tests.monkey_island.cc.services.utils.test_config_encryption imp
from common.utils.exceptions import InvalidConfigurationError from common.utils.exceptions import InvalidConfigurationError
from monkey_island.cc.resources.configuration_import import ConfigurationImport from monkey_island.cc.resources.configuration_import import ConfigurationImport
from monkey_island.cc.services.utils.config_encryption import encrypt_config from monkey_island.cc.services.utils.encryption import encrypt_string
def test_is_config_encrypted__json(monkey_config): def test_is_config_encrypted__json(monkey_config_json):
monkey_config = json.dumps(monkey_config) assert not ConfigurationImport.is_config_encrypted(monkey_config_json)
assert not ConfigurationImport.is_config_encrypted(monkey_config)
def test_is_config_encrypted__ciphertext(monkey_config): def test_is_config_encrypted__ciphertext(monkey_config_json):
encrypted_config = encrypt_config(monkey_config, PASSWORD) encrypted_config = encrypt_string(monkey_config_json, PASSWORD)
assert ConfigurationImport.is_config_encrypted(encrypted_config) assert ConfigurationImport.is_config_encrypted(encrypted_config)

View File

@ -3,10 +3,10 @@ from tests.unit_tests.monkey_island.cc.services.utils.ciphertexts_for_encryption
MALFORMED_CIPHER_TEXT_CORRUPTED, MALFORMED_CIPHER_TEXT_CORRUPTED,
) )
from monkey_island.cc.services.utils.config_encryption import ( from monkey_island.cc.services.utils.encryption import (
InvalidCredentialsError, InvalidCredentialsError,
decrypt_config, decrypt_ciphertext,
encrypt_config, encrypt_string,
) )
MONKEY_CONFIGS_DIR_PATH = "monkey_configs" MONKEY_CONFIGS_DIR_PATH = "monkey_configs"
@ -15,23 +15,23 @@ PASSWORD = "hello123"
INCORRECT_PASSWORD = "goodbye321" INCORRECT_PASSWORD = "goodbye321"
def test_encrypt_decrypt_config(monkey_config): def test_encrypt_decrypt_string(monkey_config_json):
encrypted_config = encrypt_config(monkey_config, PASSWORD) encrypted_config = encrypt_string(monkey_config_json, PASSWORD)
assert decrypt_config(encrypted_config, PASSWORD) == monkey_config assert decrypt_ciphertext(encrypted_config, PASSWORD) == monkey_config_json
def test_encrypt_decrypt_config__wrong_password(monkey_config): def test_encrypt_decrypt_string__wrong_password(monkey_config_json):
encrypted_config = encrypt_config(monkey_config, PASSWORD) encrypted_config = encrypt_string(monkey_config_json, PASSWORD)
with pytest.raises(InvalidCredentialsError): with pytest.raises(InvalidCredentialsError):
decrypt_config(encrypted_config, INCORRECT_PASSWORD) decrypt_ciphertext(encrypted_config, INCORRECT_PASSWORD)
def test_encrypt_decrypt_config__malformed_corrupted(): def test_encrypt_decrypt_string__malformed_corrupted():
with pytest.raises(ValueError): with pytest.raises(ValueError):
decrypt_config(MALFORMED_CIPHER_TEXT_CORRUPTED, PASSWORD) decrypt_ciphertext(MALFORMED_CIPHER_TEXT_CORRUPTED, PASSWORD)
def test_encrypt_decrypt_config__decrypt_no_password(monkey_config): def test_encrypt_decrypt_string__decrypt_no_password(monkey_config_json):
encrypted_config = encrypt_config(monkey_config, PASSWORD) encrypted_config = encrypt_string(monkey_config_json, PASSWORD)
with pytest.raises(InvalidCredentialsError): with pytest.raises(InvalidCredentialsError):
decrypt_config(encrypted_config, "") decrypt_ciphertext(encrypted_config, "")