Island: Add RepositoryEncryptor
This commit is contained in:
parent
bd2d79fd43
commit
c7257cf000
|
@ -7,7 +7,8 @@ from .password_based_bytes_encryptor import (
|
|||
InvalidCredentialsError,
|
||||
InvalidCiphertextError,
|
||||
)
|
||||
from .i_lockable_encryptor import ILockableEncryptor
|
||||
from .i_lockable_encryptor import ILockableEncryptor, LockedKeyError
|
||||
from .repository_encryptor import RepositoryEncryptor
|
||||
from .data_store_encryptor import (
|
||||
get_datastore_encryptor,
|
||||
unlock_datastore_encryptor,
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
import os
|
||||
import secrets
|
||||
from pathlib import Path
|
||||
|
||||
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
|
||||
|
||||
from . import ILockableEncryptor, LockedKeyError
|
||||
from .key_based_encryptor import KeyBasedEncryptor
|
||||
from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
|
||||
|
||||
|
||||
class RepositoryEncryptor(ILockableEncryptor):
|
||||
_KEY_LENGTH_BYTES = 32
|
||||
|
||||
def __init__(self, key_file: Path):
|
||||
self._key_file = key_file
|
||||
self._password_based_encryptor = None
|
||||
self._key_based_encryptor = None
|
||||
|
||||
def unlock(self, secret: bytes):
|
||||
self._password_based_encryptor = PasswordBasedBytesEncryptor(secret.decode())
|
||||
self._key_based_encryptor = self._initialize_key_based_encryptor()
|
||||
|
||||
def _initialize_key_based_encryptor(self):
|
||||
if os.path.exists(self._key_file):
|
||||
return self._load_key()
|
||||
|
||||
return self._create_key()
|
||||
|
||||
def _load_key(self) -> KeyBasedEncryptor:
|
||||
with open(self._key_file, "rb") as f:
|
||||
encrypted_key = f.read()
|
||||
|
||||
plaintext_key = self._password_based_encryptor.decrypt(encrypted_key)
|
||||
return KeyBasedEncryptor(plaintext_key)
|
||||
|
||||
def _create_key(self) -> KeyBasedEncryptor:
|
||||
plaintext_key = secrets.token_bytes(RepositoryEncryptor._KEY_LENGTH_BYTES)
|
||||
|
||||
encrypted_key = self._password_based_encryptor.encrypt(plaintext_key)
|
||||
with open_new_securely_permissioned_file(str(self._key_file), "wb") as f:
|
||||
f.write(encrypted_key)
|
||||
|
||||
return KeyBasedEncryptor(plaintext_key)
|
||||
|
||||
def lock(self):
|
||||
self._key_based_encryptor = None
|
||||
|
||||
def encrypt(self, plaintext: bytes) -> bytes:
|
||||
if self._key_based_encryptor is None:
|
||||
raise LockedKeyError("Cannot encrypt while the encryptor is locked)")
|
||||
|
||||
return self._key_based_encryptor.encrypt(plaintext)
|
||||
|
||||
def decrypt(self, ciphertext: bytes) -> bytes:
|
||||
if self._key_based_encryptor is None:
|
||||
raise LockedKeyError("Cannot decrypt while the encryptor is locked)")
|
||||
|
||||
return self._key_based_encryptor.decrypt(ciphertext)
|
|
@ -0,0 +1,70 @@
|
|||
import random
|
||||
import string
|
||||
|
||||
import pytest
|
||||
|
||||
from common.utils.file_utils import get_file_sha256_hash
|
||||
from monkey_island.cc.server_utils.encryption import LockedKeyError, RepositoryEncryptor
|
||||
|
||||
PLAINTEXT = b"Hello, Monkey!"
|
||||
SECRET = b"53CR31"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def key_file(tmp_path):
|
||||
return tmp_path / "test_key.bin"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def encryptor(key_file):
|
||||
return RepositoryEncryptor(key_file)
|
||||
|
||||
|
||||
def test_encryption(encryptor):
|
||||
plaintext = "".join(
|
||||
random.choice(string.ascii_uppercase + string.digits) for _ in range(128) # noqa: DUO102
|
||||
).encode()
|
||||
encryptor.unlock(SECRET)
|
||||
|
||||
encrypted_data = encryptor.encrypt(plaintext)
|
||||
assert encrypted_data != plaintext
|
||||
|
||||
decrypted_data = encryptor.decrypt(encrypted_data)
|
||||
assert decrypted_data == plaintext
|
||||
|
||||
|
||||
def test_key_creation(encryptor, key_file):
|
||||
assert not key_file.is_file()
|
||||
encryptor.unlock(SECRET)
|
||||
assert key_file.is_file()
|
||||
|
||||
|
||||
def test_existing_key_reused(encryptor, key_file):
|
||||
assert not key_file.is_file()
|
||||
|
||||
encryptor.unlock(SECRET)
|
||||
key_file_hash_1 = get_file_sha256_hash(key_file)
|
||||
|
||||
encryptor.unlock(SECRET)
|
||||
key_file_hash_2 = get_file_sha256_hash(key_file)
|
||||
|
||||
assert key_file_hash_1 == key_file_hash_2
|
||||
|
||||
|
||||
def test_use_locked_encryptor__encrypt(encryptor):
|
||||
with pytest.raises(LockedKeyError):
|
||||
encryptor.encrypt(PLAINTEXT)
|
||||
|
||||
|
||||
def test_use_locked_encryptor__decrypt(encryptor):
|
||||
with pytest.raises(LockedKeyError):
|
||||
encryptor.decrypt(PLAINTEXT)
|
||||
|
||||
|
||||
def test_lock(encryptor):
|
||||
encryptor.unlock(SECRET)
|
||||
encrypted_data = encryptor.encrypt(PLAINTEXT)
|
||||
encryptor.lock()
|
||||
|
||||
with pytest.raises(LockedKeyError):
|
||||
encryptor.decrypt(encrypted_data)
|
Loading…
Reference in New Issue