Merge pull request #2078 from guardicore/1965-repository-encryptor

1965 repository encryptor
This commit is contained in:
Mike Salvatore 2022-07-12 06:31:18 -04:00 committed by GitHub
commit 69d513b136
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 318 additions and 29 deletions

View File

@ -7,6 +7,8 @@ from .password_based_bytes_encryptor import (
InvalidCredentialsError, InvalidCredentialsError,
InvalidCiphertextError, InvalidCiphertextError,
) )
from .i_lockable_encryptor import ILockableEncryptor, LockedKeyError, UnlockError, ResetKeyError
from .repository_encryptor import RepositoryEncryptor
from .data_store_encryptor import ( from .data_store_encryptor import (
get_datastore_encryptor, get_datastore_encryptor,
unlock_datastore_encryptor, unlock_datastore_encryptor,

View File

@ -15,6 +15,7 @@ _KEY_FILE_NAME = "mongo_key.bin"
_encryptor: Union[None, IEncryptor] = None _encryptor: Union[None, IEncryptor] = None
# NOTE: This class is being replaced by RepositoryEncryptor
class DataStoreEncryptor(IEncryptor): class DataStoreEncryptor(IEncryptor):
_KEY_LENGTH_BYTES = 32 _KEY_LENGTH_BYTES = 32
@ -46,10 +47,10 @@ class DataStoreEncryptor(IEncryptor):
return KeyBasedEncryptor(plaintext_key) return KeyBasedEncryptor(plaintext_key)
def encrypt(self, plaintext: str) -> str: def encrypt(self, plaintext: bytes) -> bytes:
return self._key_based_encryptor.encrypt(plaintext) return self._key_based_encryptor.encrypt(plaintext)
def decrypt(self, ciphertext: str): def decrypt(self, ciphertext: bytes) -> bytes:
return self._key_based_encryptor.decrypt(ciphertext) return self._key_based_encryptor.decrypt(ciphertext)

View File

@ -5,8 +5,8 @@ from . import IFieldEncryptor
class StringEncryptor(IFieldEncryptor): class StringEncryptor(IFieldEncryptor):
@staticmethod @staticmethod
def encrypt(value: str): def encrypt(value: str):
return get_datastore_encryptor().encrypt(value) return get_datastore_encryptor().encrypt(value.encode())
@staticmethod @staticmethod
def decrypt(value: str): def decrypt(value: str):
return get_datastore_encryptor().decrypt(value) return get_datastore_encryptor().decrypt(value).decode()

View File

@ -7,8 +7,8 @@ from . import IFieldEncryptor
class StringListEncryptor(IFieldEncryptor): class StringListEncryptor(IFieldEncryptor):
@staticmethod @staticmethod
def encrypt(value: List[str]): def encrypt(value: List[str]):
return [get_datastore_encryptor().encrypt(string) for string in value] return [get_datastore_encryptor().encrypt(string.encode()) for string in value]
@staticmethod @staticmethod
def decrypt(value: List[str]): def decrypt(value: List[bytes]):
return [get_datastore_encryptor().decrypt(string) for string in value] return [get_datastore_encryptor().decrypt(bytes_).decode() for bytes_ in value]

View File

@ -1,20 +1,21 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any
class IEncryptor(ABC): class IEncryptor(ABC):
@abstractmethod @abstractmethod
def encrypt(self, plaintext: Any) -> Any: def encrypt(self, plaintext: bytes) -> bytes:
"""Encrypts data and returns the ciphertext. """
Encrypts data and returns the ciphertext.
:param plaintext: Data that will be encrypted :param plaintext: Data that will be encrypted
:return: Ciphertext generated by encrypting value :return: Ciphertext generated by encrypting the plaintext
:rtype: Any
""" """
@abstractmethod @abstractmethod
def decrypt(self, ciphertext: Any): def decrypt(self, ciphertext: bytes) -> bytes:
"""Decrypts data and returns the plaintext. """
:param ciphertext: Ciphertext that will be decrypted Decrypts data and returns the plaintext.
:return: Plaintext generated by decrypting value
:rtype: Any :param ciphertext: Ciphertext that will be decrypted
:return: Plaintext generated by decrypting the ciphertext
""" """

View File

@ -0,0 +1,85 @@
from abc import ABC, abstractmethod
# NOTE: The ILockableEncryptor introduces temporal coupling, that is, you must first unlock the
# encryptor before you can use it. This is because the key material used to encrypt repository
# contents is encrypted using the user's username and password. This adds extra security by
# allowing us to fully encrypt data at rest. Without this, we'd need to store the repository
# key in plaintext. Alternative solutions are as follows:
# 1. Store the repository key in plaintext
# 2. Add an initialization phase to the island's boot sequence. At the moment, this is the
# only element of the design that would benefit from adding an additional phase. If
# other temporal coupling begins to creep in, we can add the initialization phase at
# that time and remove this interface.
class LockedKeyError(Exception):
"""
Raised when an ILockableEncryptor attemps to encrypt or decrypt data before the
ILockableEncryptor has been unlocked.
"""
class UnlockError(Exception):
"""
Raised if an error occurs while attempting to unlock an ILockableEncryptor
"""
class ResetKeyError(Exception):
"""
Raised if an error occurs while attempting to reset an ILockableEncryptor's key
"""
class ILockableEncryptor(ABC):
"""
An encryptor that can be locked or unlocked.
ILockableEncryptor's require a secret in order to access their key material. These encryptors
must be unlocked before use and can be re-locked at the user's request.
"""
@abstractmethod
def unlock(self, secret: bytes):
"""
Unlock the encryptor
:param secret: A secret that must be used to access the ILockableEncryptor's key material.
:raises UnlockError: If the ILockableEncryptor could not be unlocked
"""
@abstractmethod
def lock(self):
"""
Lock the encryptor, making it unusable
"""
@abstractmethod
def reset_key(self):
"""
Reset the encryptor's key
Remove the existing key material so that it can never be used again.
:raises ResetKeyError: If an error occurred while attemping to reset the key
"""
@abstractmethod
def encrypt(self, plaintext: bytes) -> bytes:
"""
Encrypts data and returns the ciphertext.
:param plaintext: Data that will be encrypted
:return: Ciphertext generated by encrypting the plaintext
:raises LockedKeyError: If encrypt() is called while the ILockableEncryptor is locked
"""
@abstractmethod
def decrypt(self, ciphertext: bytes) -> bytes:
"""
Decrypts data and returns the plaintext.
:param ciphertext: Ciphertext that will be decrypted
:return: Plaintext generated by decrypting the ciphertext
:raises LockedKeyError: If decrypt() is called while the ILockableEncryptor is locked
"""

View File

@ -31,15 +31,15 @@ class KeyBasedEncryptor(IEncryptor):
# something up. The main drawback to fernet is that it uses AES-128, which is not # something up. The main drawback to fernet is that it uses AES-128, which is not
# quantum-safe. At the present time, human error is probably a greater risk than quantum # quantum-safe. At the present time, human error is probably a greater risk than quantum
# computers. # computers.
def encrypt(self, plaintext: str) -> str: def encrypt(self, plaintext: bytes) -> bytes:
cipher_iv = Random.new().read(AES.block_size) cipher_iv = Random.new().read(AES.block_size)
cipher = AES.new(self._key, AES.MODE_CBC, cipher_iv) cipher = AES.new(self._key, AES.MODE_CBC, cipher_iv)
padded_plaintext = Padding.pad(plaintext.encode(), self._BLOCK_SIZE) padded_plaintext = Padding.pad(plaintext, self._BLOCK_SIZE)
return base64.b64encode(cipher_iv + cipher.encrypt(padded_plaintext)).decode() return base64.b64encode(cipher_iv + cipher.encrypt(padded_plaintext))
def decrypt(self, ciphertext: str): def decrypt(self, ciphertext: bytes) -> bytes:
enc_message = base64.b64decode(ciphertext) enc_message = base64.b64decode(ciphertext)
cipher_iv = enc_message[0 : AES.block_size] cipher_iv = enc_message[0 : AES.block_size]
cipher = AES.new(self._key, AES.MODE_CBC, cipher_iv) cipher = AES.new(self._key, AES.MODE_CBC, cipher_iv)
padded_plaintext = cipher.decrypt(enc_message[AES.block_size :]) padded_plaintext = cipher.decrypt(enc_message[AES.block_size :])
return Padding.unpad(padded_plaintext, self._BLOCK_SIZE).decode() return Padding.unpad(padded_plaintext, self._BLOCK_SIZE)

View File

@ -0,0 +1,71 @@
import secrets
from pathlib import Path
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
from . import ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError
from .key_based_encryptor import KeyBasedEncryptor
from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
class RepositoryEncryptor(ILockableEncryptor):
_KEY_LENGTH_BYTES = 32
def __init__(self, key_file: Path):
self._key_file = key_file
self._password_based_encryptor = None
self._key_based_encryptor = None
def unlock(self, secret: bytes):
try:
self._password_based_encryptor = PasswordBasedBytesEncryptor(secret.decode())
self._key_based_encryptor = self._initialize_key_based_encryptor()
except Exception as err:
raise UnlockError(err)
def _initialize_key_based_encryptor(self):
if self._key_file.is_file():
return self._load_key()
return self._create_key()
def _load_key(self) -> KeyBasedEncryptor:
with open(self._key_file, "rb") as f:
encrypted_key = f.read()
plaintext_key = self._password_based_encryptor.decrypt(encrypted_key)
return KeyBasedEncryptor(plaintext_key)
def _create_key(self) -> KeyBasedEncryptor:
plaintext_key = secrets.token_bytes(RepositoryEncryptor._KEY_LENGTH_BYTES)
encrypted_key = self._password_based_encryptor.encrypt(plaintext_key)
with open_new_securely_permissioned_file(str(self._key_file), "wb") as f:
f.write(encrypted_key)
return KeyBasedEncryptor(plaintext_key)
def lock(self):
self._key_based_encryptor = None
def reset_key(self):
try:
if self._key_file.is_file():
self._key_file.unlink()
except Exception as err:
raise ResetKeyError(err)
self._password_based_encryptor = None
self._key_based_encryptor = None
def encrypt(self, plaintext: bytes) -> bytes:
if self._key_based_encryptor is None:
raise LockedKeyError("Cannot encrypt while the encryptor is locked)")
return self._key_based_encryptor.encrypt(plaintext)
def decrypt(self, ciphertext: bytes) -> bytes:
if self._key_based_encryptor is None:
raise LockedKeyError("Cannot decrypt while the encryptor is locked)")
return self._key_based_encryptor.decrypt(ciphertext)

View File

@ -11,7 +11,7 @@ from monkey_island.cc.server_utils.encryption import (
# Mark all tests in this module as slow # Mark all tests in this module as slow
pytestmark = pytest.mark.slow pytestmark = pytest.mark.slow
PLAINTEXT = "Hello, Monkey!" PLAINTEXT = b"Hello, Monkey!"
MOCK_SECRET = "53CR31" MOCK_SECRET = "53CR31"

View File

@ -14,19 +14,19 @@ kb_encryptor = KeyBasedEncryptor(KEY)
def test_encrypt_decrypt_string_with_key(): def test_encrypt_decrypt_string_with_key():
encrypted = kb_encryptor.encrypt(PLAINTEXT) encrypted = kb_encryptor.encrypt(PLAINTEXT.encode())
decrypted = kb_encryptor.decrypt(encrypted) decrypted = kb_encryptor.decrypt(encrypted).decode()
assert decrypted == PLAINTEXT assert decrypted == PLAINTEXT
@pytest.mark.parametrize("plaintext", [PLAINTEXT_UTF8_1, PLAINTEXT_UTF8_2, PLAINTEXT_UTF8_3]) @pytest.mark.parametrize("plaintext", [PLAINTEXT_UTF8_1, PLAINTEXT_UTF8_2, PLAINTEXT_UTF8_3])
def test_encrypt_decrypt_string_utf8_with_key(plaintext): def test_encrypt_decrypt_string_utf8_with_key(plaintext):
encrypted = kb_encryptor.encrypt(plaintext) encrypted = kb_encryptor.encrypt(plaintext.encode())
decrypted = kb_encryptor.decrypt(encrypted) decrypted = kb_encryptor.decrypt(encrypted).decode()
assert decrypted == plaintext assert decrypted == plaintext
def test_encrypt_decrypt_string_multiple_block_size_with_key(): def test_encrypt_decrypt_string_multiple_block_size_with_key():
encrypted = kb_encryptor.encrypt(PLAINTEXT_MULTIPLE_BLOCK_SIZE) encrypted = kb_encryptor.encrypt(PLAINTEXT_MULTIPLE_BLOCK_SIZE.encode())
decrypted = kb_encryptor.decrypt(encrypted) decrypted = kb_encryptor.decrypt(encrypted).decode()
assert decrypted == PLAINTEXT_MULTIPLE_BLOCK_SIZE assert decrypted == PLAINTEXT_MULTIPLE_BLOCK_SIZE

View File

@ -0,0 +1,129 @@
import random
import string
import pytest
from common.utils.file_utils import get_file_sha256_hash
from monkey_island.cc.server_utils.encryption import (
LockedKeyError,
RepositoryEncryptor,
ResetKeyError,
UnlockError,
)
# Mark all tests in this module as slow
pytestmark = pytest.mark.slow
PLAINTEXT = b"Hello, Monkey!"
SECRET = b"53CR31"
@pytest.fixture
def key_file(tmp_path):
return tmp_path / "test_key.bin"
@pytest.fixture
def encryptor(key_file):
return RepositoryEncryptor(key_file)
def test_encryption(encryptor):
plaintext = "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(128) # noqa: DUO102
).encode()
encryptor.unlock(SECRET)
encrypted_data = encryptor.encrypt(plaintext)
assert encrypted_data != plaintext
decrypted_data = encryptor.decrypt(encrypted_data)
assert decrypted_data == plaintext
def test_key_creation(encryptor, key_file):
assert not key_file.is_file()
encryptor.unlock(SECRET)
assert key_file.is_file()
def test_existing_key_reused(encryptor, key_file):
assert not key_file.is_file()
encryptor.unlock(SECRET)
key_file_hash_1 = get_file_sha256_hash(key_file)
encryptor.unlock(SECRET)
key_file_hash_2 = get_file_sha256_hash(key_file)
assert key_file_hash_1 == key_file_hash_2
def test_unlock_os_error(encryptor, key_file):
key_file.mkdir()
with pytest.raises(UnlockError):
encryptor.unlock(SECRET)
def test_unlock_wrong_password(encryptor):
encryptor.unlock(SECRET)
with pytest.raises(UnlockError):
encryptor.unlock(b"WRONG_PASSWORD")
def test_use_locked_encryptor__encrypt(encryptor):
with pytest.raises(LockedKeyError):
encryptor.encrypt(PLAINTEXT)
def test_use_locked_encryptor__decrypt(encryptor):
with pytest.raises(LockedKeyError):
encryptor.decrypt(PLAINTEXT)
def test_lock(encryptor):
encryptor.unlock(SECRET)
encrypted_data = encryptor.encrypt(PLAINTEXT)
encryptor.lock()
with pytest.raises(LockedKeyError):
encryptor.decrypt(encrypted_data)
def test_reset(encryptor, key_file):
encryptor.unlock(SECRET)
key_file_hash_1 = get_file_sha256_hash(key_file)
encryptor.reset_key()
encryptor.unlock(SECRET)
key_file_hash_2 = get_file_sha256_hash(key_file)
assert key_file_hash_1 != key_file_hash_2
def test_encrypt_after_reset(encryptor, key_file):
encryptor.unlock(SECRET)
encryptor.reset_key()
with pytest.raises(LockedKeyError):
encryptor.encrypt(PLAINTEXT)
def test_reset_before_unlock(encryptor):
# Test will fail if an exception is raised
encryptor.reset_key()
def test_reset_key_error(key_file):
class UnlinkErrorWrapper(key_file.__class__):
def unlink(self):
raise OSError("Can't delete file")
encryptor = RepositoryEncryptor(UnlinkErrorWrapper(key_file))
encryptor.unlock(SECRET)
encryptor.lock()
with pytest.raises(ResetKeyError):
encryptor.reset_key()