forked from p15670423/monkey
Fix unit tests for data store encryptor
This commit is contained in:
parent
ea6fe37b44
commit
a2b09a9e7a
|
@ -15,14 +15,14 @@ _BLOCK_SIZE = 32
|
||||||
_encryptor: Union[None, IEncryptor] = None
|
_encryptor: Union[None, IEncryptor] = None
|
||||||
|
|
||||||
|
|
||||||
def _load_existing_key(key_file_path: str, secret: str):
|
def _load_existing_key(key_file_path: str, secret: str) -> KeyBasedEncryptor:
|
||||||
with open(key_file_path, "rb") as f:
|
with open(key_file_path, "rb") as f:
|
||||||
encrypted_key = f.read()
|
encrypted_key = f.read()
|
||||||
cipher_key = PasswordBasedBytesEncryptor(secret).decrypt(encrypted_key)
|
cipher_key = PasswordBasedBytesEncryptor(secret).decrypt(encrypted_key)
|
||||||
return KeyBasedEncryptor(cipher_key)
|
return KeyBasedEncryptor(cipher_key)
|
||||||
|
|
||||||
|
|
||||||
def _create_new_key(key_file_path: str, secret: str):
|
def _create_new_key(key_file_path: str, secret: str) -> KeyBasedEncryptor:
|
||||||
cipher_key = _get_random_bytes()
|
cipher_key = _get_random_bytes()
|
||||||
encrypted_key = PasswordBasedBytesEncryptor(secret).encrypt(cipher_key)
|
encrypted_key = PasswordBasedBytesEncryptor(secret).encrypt(cipher_key)
|
||||||
with open_new_securely_permissioned_file(key_file_path, "wb") as f:
|
with open_new_securely_permissioned_file(key_file_path, "wb") as f:
|
||||||
|
@ -50,9 +50,9 @@ def initialize_datastore_encryptor(key_file_dir: str, secret: str):
|
||||||
_encryptor = _create_new_key(key_file_path, secret)
|
_encryptor = _create_new_key(key_file_path, secret)
|
||||||
|
|
||||||
|
|
||||||
def _get_key_file_path(key_file_dir: str):
|
def _get_key_file_path(key_file_dir: str) -> str:
|
||||||
return os.path.join(key_file_dir, _KEY_FILENAME)
|
return os.path.join(key_file_dir, _KEY_FILENAME)
|
||||||
|
|
||||||
|
|
||||||
def get_datastore_encryptor():
|
def get_datastore_encryptor() -> IEncryptor:
|
||||||
return _encryptor
|
return _encryptor
|
||||||
|
|
|
@ -10,10 +10,8 @@ from tests.unit_tests.monkey_island.cc.server_utils.encryption.test_password_bas
|
||||||
STANDARD_PLAINTEXT_MONKEY_CONFIG_FILENAME,
|
STANDARD_PLAINTEXT_MONKEY_CONFIG_FILENAME,
|
||||||
)
|
)
|
||||||
|
|
||||||
from monkey_island.cc.server_utils.encryption import (
|
from monkey_island.cc.server_utils.encryption import initialize_datastore_encryptor
|
||||||
initialize_datastore_encryptor,
|
from monkey_island.cc.services.authentication import AuthenticationService
|
||||||
initialize_encryptor_factory,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -36,5 +34,5 @@ MOCK_PASSWORD = "3cr3t_p455w0rd"
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def uses_encryptor(data_for_tests_dir):
|
def uses_encryptor(data_for_tests_dir):
|
||||||
initialize_encryptor_factory(data_for_tests_dir)
|
secret = AuthenticationService._get_secret_from_credentials(MOCK_USERNAME, MOCK_PASSWORD)
|
||||||
initialize_datastore_encryptor(MOCK_USERNAME, MOCK_PASSWORD)
|
initialize_datastore_encryptor(data_for_tests_dir, secret)
|
||||||
|
|
|
@ -1,68 +1,61 @@
|
||||||
import pytest
|
import pytest
|
||||||
from tests.unit_tests.monkey_island.cc.conftest import MOCK_PASSWORD, MOCK_USERNAME
|
|
||||||
|
|
||||||
from monkey_island.cc.server_utils.encryption import (
|
from monkey_island.cc.server_utils.encryption import (
|
||||||
FactoryNotInitializedError,
|
|
||||||
data_store_encryptor,
|
data_store_encryptor,
|
||||||
encryptor_factory,
|
|
||||||
get_datastore_encryptor,
|
get_datastore_encryptor,
|
||||||
initialize_datastore_encryptor,
|
initialize_datastore_encryptor,
|
||||||
initialize_encryptor_factory,
|
|
||||||
remove_old_datastore_key,
|
remove_old_datastore_key,
|
||||||
)
|
)
|
||||||
from monkey_island.cc.server_utils.encryption.data_store_encryptor import DataStoreEncryptor
|
|
||||||
from monkey_island.cc.server_utils.encryption.encryptor_factory import EncryptorFactory
|
|
||||||
|
|
||||||
PLAINTEXT = "Hello, Monkey!"
|
PLAINTEXT = "Hello, Monkey!"
|
||||||
|
MOCK_SECRET = "53CR31"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("uses_encryptor")
|
@pytest.mark.usefixtures("uses_encryptor")
|
||||||
def test_encryption(data_for_tests_dir):
|
def test_encryption(data_for_tests_dir):
|
||||||
encrypted_data = get_datastore_encryptor().enc(PLAINTEXT)
|
encrypted_data = get_datastore_encryptor().encrypt(PLAINTEXT)
|
||||||
assert encrypted_data != PLAINTEXT
|
assert encrypted_data != PLAINTEXT
|
||||||
|
|
||||||
decrypted_data = get_datastore_encryptor().dec(encrypted_data)
|
decrypted_data = get_datastore_encryptor().decrypt(encrypted_data)
|
||||||
assert decrypted_data == PLAINTEXT
|
assert decrypted_data == PLAINTEXT
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def initialized_key_dir(tmpdir):
|
def cleanup_encryptor():
|
||||||
initialize_encryptor_factory(tmpdir)
|
yield
|
||||||
initialize_datastore_encryptor(MOCK_USERNAME, MOCK_PASSWORD)
|
|
||||||
yield tmpdir
|
|
||||||
data_store_encryptor._encryptor = None
|
data_store_encryptor._encryptor = None
|
||||||
encryptor_factory._factory = None
|
|
||||||
|
|
||||||
|
|
||||||
def test_key_creation(initialized_key_dir):
|
@pytest.mark.usefixtures("cleanup_encryptor")
|
||||||
assert (initialized_key_dir / EncryptorFactory._KEY_FILENAME).isfile()
|
@pytest.fixture
|
||||||
|
def initialized_encryptor_dir(tmpdir):
|
||||||
|
initialize_datastore_encryptor(tmpdir, MOCK_SECRET)
|
||||||
|
return tmpdir
|
||||||
|
|
||||||
|
|
||||||
def test_key_removal(initialized_key_dir):
|
def test_key_creation(initialized_encryptor_dir):
|
||||||
remove_old_datastore_key()
|
assert (initialized_encryptor_dir / data_store_encryptor._KEY_FILENAME).isfile()
|
||||||
assert not (initialized_key_dir / EncryptorFactory._KEY_FILENAME).isfile()
|
|
||||||
|
|
||||||
|
def test_key_removal(initialized_encryptor_dir):
|
||||||
|
remove_old_datastore_key(initialized_encryptor_dir)
|
||||||
|
assert not (initialized_encryptor_dir / data_store_encryptor._KEY_FILENAME).isfile()
|
||||||
|
|
||||||
|
|
||||||
def test_key_removal__no_key(tmpdir):
|
def test_key_removal__no_key(tmpdir):
|
||||||
initialize_encryptor_factory(tmpdir)
|
assert not (tmpdir / data_store_encryptor._KEY_FILENAME).isfile()
|
||||||
assert not (tmpdir / EncryptorFactory._KEY_FILENAME).isfile()
|
|
||||||
# Make sure no error thrown when we try to remove an non-existing key
|
# Make sure no error thrown when we try to remove an non-existing key
|
||||||
remove_old_datastore_key()
|
remove_old_datastore_key(tmpdir)
|
||||||
encryptor_factory._factory = None
|
data_store_encryptor._factory = None
|
||||||
|
|
||||||
|
|
||||||
def test_encryptor_not_initialized():
|
|
||||||
with pytest.raises(FactoryNotInitializedError):
|
|
||||||
remove_old_datastore_key()
|
|
||||||
initialize_datastore_encryptor(MOCK_USERNAME, MOCK_PASSWORD)
|
|
||||||
|
|
||||||
|
|
||||||
def test_initialize_encryptor(tmpdir):
|
|
||||||
initialize_encryptor_factory(tmpdir)
|
|
||||||
assert not (tmpdir / EncryptorFactory._KEY_FILENAME).isfile()
|
|
||||||
initialize_datastore_encryptor(MOCK_USERNAME, MOCK_PASSWORD)
|
|
||||||
assert (tmpdir / EncryptorFactory._KEY_FILENAME).isfile()
|
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("cleanup_encryptor")
|
||||||
def test_key_file_encryption(tmpdir, monkeypatch):
|
def test_key_file_encryption(tmpdir, monkeypatch):
|
||||||
monkeypatch(DataStoreEncryptor._)
|
monkeypatch.setattr(data_store_encryptor, "_get_random_bytes", lambda: PLAINTEXT.encode())
|
||||||
|
initialize_datastore_encryptor(tmpdir, MOCK_SECRET)
|
||||||
|
key_file_path = data_store_encryptor._get_key_file_path(tmpdir)
|
||||||
|
key_file_contents = open(key_file_path, "rb").read()
|
||||||
|
assert not key_file_contents == PLAINTEXT.encode()
|
||||||
|
|
||||||
|
key_based_encryptor = data_store_encryptor._load_existing_key(key_file_path, MOCK_SECRET)
|
||||||
|
assert key_based_encryptor._key == PLAINTEXT.encode()
|
||||||
|
|
|
@ -156,7 +156,7 @@ def test_get_stolen_creds_exploit(fake_mongo):
|
||||||
assert expected_stolen_creds_exploit == stolen_creds_exploit
|
assert expected_stolen_creds_exploit == stolen_creds_exploit
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("uses_database")
|
@pytest.mark.usefixtures("uses_database", "uses_encryptor")
|
||||||
def test_get_stolen_creds_system_info(fake_mongo):
|
def test_get_stolen_creds_system_info(fake_mongo):
|
||||||
fake_mongo.db.monkey.insert_one(MONKEY_TELEM)
|
fake_mongo.db.monkey.insert_one(MONKEY_TELEM)
|
||||||
save_telemetry(SYSTEM_INFO_TELEMETRY_TELEM)
|
save_telemetry(SYSTEM_INFO_TELEMETRY_TELEM)
|
||||||
|
|
|
@ -18,12 +18,14 @@ def mock_port_in_env_singleton(monkeypatch, PORT):
|
||||||
monkeypatch.setattr("monkey_island.cc.services.config.env_singleton", mock_singleton)
|
monkeypatch.setattr("monkey_island.cc.services.config.env_singleton", mock_singleton)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("uses_encryptor")
|
||||||
def test_set_server_ips_in_config_command_servers(config, IPS, PORT):
|
def test_set_server_ips_in_config_command_servers(config, IPS, PORT):
|
||||||
ConfigService.set_server_ips_in_config(config)
|
ConfigService.set_server_ips_in_config(config)
|
||||||
expected_config_command_servers = [f"{ip}:{PORT}" for ip in IPS]
|
expected_config_command_servers = [f"{ip}:{PORT}" for ip in IPS]
|
||||||
assert config["internal"]["island_server"]["command_servers"] == expected_config_command_servers
|
assert config["internal"]["island_server"]["command_servers"] == expected_config_command_servers
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("uses_encryptor")
|
||||||
def test_set_server_ips_in_config_current_server(config, IPS, PORT):
|
def test_set_server_ips_in_config_current_server(config, IPS, PORT):
|
||||||
ConfigService.set_server_ips_in_config(config)
|
ConfigService.set_server_ips_in_config(config)
|
||||||
expected_config_current_server = f"{IPS[0]}:{PORT}"
|
expected_config_current_server = f"{IPS[0]}:{PORT}"
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
from monkey_island.cc.services.config_manipulator import update_config_on_mode_set
|
from monkey_island.cc.services.config_manipulator import update_config_on_mode_set
|
||||||
from monkey_island.cc.services.mode.mode_enum import IslandModeEnum
|
from monkey_island.cc.services.mode.mode_enum import IslandModeEnum
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("uses_encryptor")
|
||||||
def test_update_config_on_mode_set_advanced(config, monkeypatch):
|
def test_update_config_on_mode_set_advanced(config, monkeypatch):
|
||||||
monkeypatch.setattr("monkey_island.cc.services.config.ConfigService.get_config", lambda: config)
|
monkeypatch.setattr("monkey_island.cc.services.config.ConfigService.get_config", lambda: config)
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
|
@ -14,6 +17,7 @@ def test_update_config_on_mode_set_advanced(config, monkeypatch):
|
||||||
assert manipulated_config == config
|
assert manipulated_config == config
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("uses_encryptor")
|
||||||
def test_update_config_on_mode_set_ransomware(config, monkeypatch):
|
def test_update_config_on_mode_set_ransomware(config, monkeypatch):
|
||||||
monkeypatch.setattr("monkey_island.cc.services.config.ConfigService.get_config", lambda: config)
|
monkeypatch.setattr("monkey_island.cc.services.config.ConfigService.get_config", lambda: config)
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
|
|
Loading…
Reference in New Issue