Agent: Use mock encryptor in test_ransomware_payload.py

This commit is contained in:
Mike Salvatore 2021-07-14 09:18:59 -04:00
parent d9cc66de54
commit 0be919b805
2 changed files with 32 additions and 142 deletions

View File

@ -1,21 +1,14 @@
SUBDIR = "subdir" SUBDIR = "subdir"
ALL_ZEROS_PDF = "all_zeros.pdf" ALL_ZEROS_PDF = "all_zeros.pdf"
ALREADY_ENCRYPTED_TXT_M0NK3Y = "already_encrypted.txt.m0nk3y"
HELLO_TXT = "hello.txt" HELLO_TXT = "hello.txt"
SHORTCUT_LNK = "shortcut.lnk" SHORTCUT_LNK = "shortcut.lnk"
TEST_KEYBOARD_TXT = "test_keyboard.txt" TEST_KEYBOARD_TXT = "test_keyboard.txt"
TEST_LIB_DLL = "test_lib.dll" TEST_LIB_DLL = "test_lib.dll"
ALL_ZEROS_PDF_CLEARTEXT_SHA256 = "ab3df617aaa3140f04dc53f65b5446f34a6b2bdbb1f7b78db8db4d067ba14db9" ALL_ZEROS_PDF_CLEARTEXT_SHA256 = "ab3df617aaa3140f04dc53f65b5446f34a6b2bdbb1f7b78db8db4d067ba14db9"
ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256 = (
"ff5e58498962ab8bd619d3a9cd24b9298e7efc25b4967b1ce3f03b0e6de2aa7a"
)
HELLO_TXT_CLEARTEXT_SHA256 = "0ba904eae8773b70c75333db4de2f3ac45a8ad4ddba1b242f0b3cfc199391dd8"
SHORTCUT_LNK_CLEARTEXT_SHA256 = "5069c8b7c3c70fad55bf0f0790de787080b1b4397c4749affcd3e570ff53aad9"
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256 = ( TEST_KEYBOARD_TXT_CLEARTEXT_SHA256 = (
"9d1a38784b7eefef6384bfc4b89048017db840adace11504a947016072750b2b" "9d1a38784b7eefef6384bfc4b89048017db840adace11504a947016072750b2b"
) )
TEST_LIB_DLL_CLEARTEXT_SHA256 = "0922d3132f2378edf313b8c2b6609a2548879911686994ca45fc5c895a7e91b1"
ALL_ZEROS_PDF_ENCRYPTED_SHA256 = "779c176e820dbdaf643419232cb4d2760360c8633d6fe209cf706707db799b4d" ALL_ZEROS_PDF_ENCRYPTED_SHA256 = "779c176e820dbdaf643419232cb4d2760360c8633d6fe209cf706707db799b4d"
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256 = ( TEST_KEYBOARD_TXT_ENCRYPTED_SHA256 = (

View File

@ -1,39 +1,19 @@
import os
from pathlib import PurePosixPath from pathlib import PurePosixPath
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import ( from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
ALL_ZEROS_PDF, ALL_ZEROS_PDF,
ALL_ZEROS_PDF_CLEARTEXT_SHA256,
ALL_ZEROS_PDF_ENCRYPTED_SHA256,
ALREADY_ENCRYPTED_TXT_M0NK3Y,
ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256,
HELLO_TXT,
HELLO_TXT_CLEARTEXT_SHA256,
SHORTCUT_LNK,
SHORTCUT_LNK_CLEARTEXT_SHA256,
SUBDIR,
TEST_KEYBOARD_TXT, TEST_KEYBOARD_TXT,
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256,
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256,
TEST_LIB_DLL,
TEST_LIB_DLL_CLEARTEXT_SHA256,
) )
from tests.utils import hash_file, is_user_admin
from infection_monkey.ransomware.ransomware_payload import ( from infection_monkey.ransomware.ransomware_payload import (
EXTENSION,
README_DEST, README_DEST,
README_SRC, README_SRC,
RansomwarePayload, RansomwarePayload,
) )
def with_extension(filename):
return f"{filename}{EXTENSION}"
@pytest.fixture @pytest.fixture
def ransomware_payload_config(ransomware_target): def ransomware_payload_config(ransomware_target):
return { return {
@ -71,22 +51,16 @@ def build_ransomware_payload(
@pytest.fixture @pytest.fixture
def mock_file_encryptor(ransomware_target): def mock_file_encryptor(ransomware_target):
from infection_monkey.ransomware.in_place_file_encryptor import InPlaceFileEncryptor return MagicMock()
from infection_monkey.utils.bit_manipulators import flip_bits
return InPlaceFileEncryptor(encrypt_bytes=flip_bits, new_file_extension=".m0nk3y")
@pytest.fixture @pytest.fixture
def mock_file_selector(ransomware_target): def mock_file_selector(ransomware_target):
mock_file_selector.return_value = [ selected_files = [
ransomware_target / ALL_ZEROS_PDF, ransomware_target / ALL_ZEROS_PDF,
ransomware_target / TEST_KEYBOARD_TXT, ransomware_target / TEST_KEYBOARD_TXT,
] ]
return MagicMock(return_value=mock_file_selector.return_value) return MagicMock(return_value=selected_files)
mock_file_selector.return_value = None
@pytest.fixture @pytest.fixture
@ -113,105 +87,27 @@ def test_env_variables_in_target_dir_resolved_linux(
mock_file_selector.assert_called_with(ransomware_target) mock_file_selector.assert_called_with(ransomware_target)
def test_file_with_excluded_extension_not_encrypted(ransomware_target, ransomware_payload): def test_all_selected_files_encrypted(ransomware_target, ransomware_payload, mock_file_encryptor):
ransomware_payload.run_payload() ransomware_payload.run_payload()
assert hash_file(ransomware_target / TEST_LIB_DLL) == TEST_LIB_DLL_CLEARTEXT_SHA256 assert mock_file_encryptor.call_count == 2
mock_file_encryptor.assert_any_call(ransomware_target / ALL_ZEROS_PDF)
mock_file_encryptor.assert_any_call(ransomware_target / TEST_KEYBOARD_TXT)
def test_shortcut_not_encrypted(ransomware_target, ransomware_payload):
ransomware_payload.run_payload()
assert hash_file(ransomware_target / SHORTCUT_LNK) == SHORTCUT_LNK_CLEARTEXT_SHA256
@pytest.mark.skipif(
os.name == "nt" and not is_user_admin(), reason="Test requires admin rights on Windows"
)
def test_symlink_not_encrypted(ransomware_target, ransomware_payload):
SYMLINK = "symlink.pdf"
link_path = ransomware_target / SYMLINK
link_path.symlink_to(ransomware_target / TEST_LIB_DLL)
ransomware_payload.run_payload()
assert hash_file(ransomware_target / SYMLINK) == TEST_LIB_DLL_CLEARTEXT_SHA256
def test_encryption_not_recursive(ransomware_target, ransomware_payload):
ransomware_payload.run_payload()
assert hash_file(ransomware_target / SUBDIR / HELLO_TXT) == HELLO_TXT_CLEARTEXT_SHA256
def test_all_files_with_included_extension_encrypted(ransomware_target, ransomware_payload):
assert hash_file(ransomware_target / ALL_ZEROS_PDF) == ALL_ZEROS_PDF_CLEARTEXT_SHA256
assert hash_file(ransomware_target / TEST_KEYBOARD_TXT) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
ransomware_payload.run_payload()
assert (
hash_file(ransomware_target / with_extension(ALL_ZEROS_PDF))
== ALL_ZEROS_PDF_ENCRYPTED_SHA256
)
assert (
hash_file(ransomware_target / with_extension(TEST_KEYBOARD_TXT))
== TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
)
def test_file_encrypted_in_place(ransomware_target, ransomware_payload):
expected_test_keyboard_inode = os.stat(ransomware_target / TEST_KEYBOARD_TXT).st_ino
ransomware_payload.run_payload()
actual_test_keyboard_inode = os.stat(
ransomware_target / with_extension(TEST_KEYBOARD_TXT)
).st_ino
assert expected_test_keyboard_inode == actual_test_keyboard_inode
def test_encryption_reversible(ransomware_target, ransomware_payload):
orig_path = ransomware_target / TEST_KEYBOARD_TXT
new_path = ransomware_target / with_extension(TEST_KEYBOARD_TXT)
assert hash_file(orig_path) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
ransomware_payload.run_payload()
assert hash_file(new_path) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
new_path.rename(orig_path)
ransomware_payload.run_payload()
assert (
hash_file(ransomware_target / with_extension(TEST_KEYBOARD_TXT))
== TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
)
def test_skip_already_encrypted_file(ransomware_target, ransomware_payload):
ransomware_payload.run_payload()
assert not (ransomware_target / with_extension(ALREADY_ENCRYPTED_TXT_M0NK3Y)).exists()
assert (
hash_file(ransomware_target / ALREADY_ENCRYPTED_TXT_M0NK3Y)
== ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256
)
def test_encryption_skipped_if_configured_false( def test_encryption_skipped_if_configured_false(
build_ransomware_payload, ransomware_payload_config, ransomware_target build_ransomware_payload, ransomware_payload_config, ransomware_target, mock_file_encryptor
): ):
ransomware_payload_config["encryption"]["enabled"] = False ransomware_payload_config["encryption"]["enabled"] = False
ransomware_payload = build_ransomware_payload(ransomware_payload_config) ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload() ransomware_payload.run_payload()
assert hash_file(ransomware_target / ALL_ZEROS_PDF) == ALL_ZEROS_PDF_CLEARTEXT_SHA256 assert mock_file_encryptor.call_count == 0
assert hash_file(ransomware_target / TEST_KEYBOARD_TXT) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
def test_encryption_skipped_if_no_directory( def test_encryption_skipped_if_no_directory(
build_ransomware_payload, ransomware_payload_config, telemetry_messenger_spy build_ransomware_payload, ransomware_payload_config, mock_file_encryptor
): ):
ransomware_payload_config["encryption"]["enabled"] = True ransomware_payload_config["encryption"]["enabled"] = True
ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = "" ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = ""
@ -219,7 +115,8 @@ def test_encryption_skipped_if_no_directory(
ransomware_payload = build_ransomware_payload(ransomware_payload_config) ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload() ransomware_payload.run_payload()
assert len(telemetry_messenger_spy.telemetries) == 0
assert mock_file_encryptor.call_count == 0
def test_telemetry_success(ransomware_payload, telemetry_messenger_spy): def test_telemetry_success(ransomware_payload, telemetry_messenger_spy):
@ -238,16 +135,27 @@ def test_telemetry_success(ransomware_payload, telemetry_messenger_spy):
def test_telemetry_failure( def test_telemetry_failure(
monkeypatch, mock_file_selector, ransomware_payload, telemetry_messenger_spy monkeypatch, ransomware_payload_config, mock_leave_readme, telemetry_messenger_spy
): ):
mock_file_selector.return_value = [PurePosixPath("/file/not/exist")] file_not_exists = "/file/not/exist"
ransomware_payload = RansomwarePayload(
ransomware_payload_config,
MagicMock(
side_effect=FileNotFoundError(
f"[Errno 2] No such file or directory: '{file_not_exists}'"
)
),
MagicMock(return_value=[PurePosixPath(file_not_exists)]),
mock_leave_readme,
telemetry_messenger_spy,
)
ransomware_payload.run_payload() ransomware_payload.run_payload()
telem_1 = telemetry_messenger_spy.telemetries[0] telem = telemetry_messenger_spy.telemetries[0]
assert "/file/not/exist" in telem_1.get_data()["files"][0]["path"] assert file_not_exists in telem.get_data()["files"][0]["path"]
assert not telem_1.get_data()["files"][0]["success"] assert not telem.get_data()["files"][0]["success"]
assert "No such file or directory" in telem_1.get_data()["files"][0]["error"] assert "No such file or directory" in telem.get_data()["files"][0]["error"]
def test_readme_false( def test_readme_false(
@ -271,24 +179,13 @@ def test_readme_true(
def test_no_readme_if_no_directory( def test_no_readme_if_no_directory(
monkeypatch, build_ransomware_payload, ransomware_payload_config, mock_leave_readme
ransomware_payload_config,
mock_file_encryptor,
mock_file_selector,
mock_leave_readme,
telemetry_messenger_spy,
ransomware_target,
): ):
ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = "" ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = ""
ransomware_payload_config["encryption"]["directories"]["windows_target_dir"] = "" ransomware_payload_config["encryption"]["directories"]["windows_target_dir"] = ""
ransomware_payload_config["other_behaviors"]["readme"] = True ransomware_payload_config["other_behaviors"]["readme"] = True
RansomwarePayload( ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload_config,
mock_file_encryptor,
mock_file_selector,
mock_leave_readme,
telemetry_messenger_spy,
).run_payload()
ransomware_payload.run_payload()
mock_leave_readme.assert_not_called() mock_leave_readme.assert_not_called()