Agent: Pass a RansomwareConfig to RansomwarePayload

Rather than RansomwarePayload being responsible fro translating the
config dictionary into something usable, it now just accepts a
RansomwareConfig object which contains pre-processed configuration
options.
This commit is contained in:
Mike Salvatore 2021-07-15 11:26:02 -04:00
parent 6f5a7faaa1
commit 9044c587a6
3 changed files with 62 additions and 84 deletions

View File

@ -1,12 +1,10 @@
import logging import logging
from pathlib import Path from pathlib import Path
from pprint import pformat
from typing import Callable, List from typing import Callable, List
from common.utils.file_utils import InvalidPath, expand_path from infection_monkey.ransomware.ransomware_config import RansomwareConfig
from infection_monkey.telemetry.file_encryption_telem import FileEncryptionTelem from infection_monkey.telemetry.file_encryption_telem import FileEncryptionTelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
from infection_monkey.utils.environment import is_windows_os
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -17,57 +15,38 @@ README_DEST = "README.txt"
class RansomwarePayload: class RansomwarePayload:
def __init__( def __init__(
self, self,
config: dict, config: RansomwareConfig,
encrypt_file: Callable[[Path], None], encrypt_file: Callable[[Path], None],
select_files: Callable[[Path], List[Path]], select_files: Callable[[Path], List[Path]],
leave_readme: Callable[[Path, Path], None], leave_readme: Callable[[Path, Path], None],
telemetry_messenger: ITelemetryMessenger, telemetry_messenger: ITelemetryMessenger,
): ):
LOG.debug(f"Ransomware payload configuration:\n{pformat(config)}") self._config = config
self._encryption_enabled = config["encryption"]["enabled"]
self._readme_enabled = config["other_behaviors"]["readme"]
self._target_dir = RansomwarePayload.get_target_dir(config)
self._encrypt_file = encrypt_file self._encrypt_file = encrypt_file
self._select_files = select_files self._select_files = select_files
self._leave_readme = leave_readme self._leave_readme = leave_readme
self._telemetry_messenger = telemetry_messenger self._telemetry_messenger = telemetry_messenger
@staticmethod
def get_target_dir(config: dict):
target_directories = config["encryption"]["directories"]
if is_windows_os():
target_dir_field = target_directories["windows_target_dir"]
else:
target_dir_field = target_directories["linux_target_dir"]
try:
return expand_path(target_dir_field)
except InvalidPath as e:
LOG.debug(f"Target ransomware dir set to None: {e}")
return None
def run_payload(self): def run_payload(self):
if not self._target_dir: if not self._config.target_directory:
return return
LOG.info("Running ransomware payload") LOG.info("Running ransomware payload")
if self._encryption_enabled: if self._config.encryption_enabled:
file_list = self._find_files() file_list = self._find_files()
self._encrypt_files(file_list) self._encrypt_files(file_list)
if self._readme_enabled: if self._config.readme_enabled:
self._leave_readme(README_SRC, self._target_dir / README_DEST) self._leave_readme(README_SRC, self._config.target_directory / README_DEST)
def _find_files(self) -> List[Path]: def _find_files(self) -> List[Path]:
LOG.info(f"Collecting files in {self._target_dir}") LOG.info(f"Collecting files in {self._config.target_directory}")
return sorted(self._select_files(self._target_dir)) return sorted(self._select_files(self._config.target_directory))
def _encrypt_files(self, file_list: List[Path]): def _encrypt_files(self, file_list: List[Path]):
LOG.info(f"Encrypting files in {self._target_dir}") LOG.info(f"Encrypting files in {self._config.target_directory}")
for filepath in file_list: for filepath in file_list:
try: try:

View File

@ -1,6 +1,10 @@
from infection_monkey.ransomware import readme_utils import logging
from pprint import pformat
from infection_monkey.ransomware import ransomware_payload, readme_utils
from infection_monkey.ransomware.file_selectors import ProductionSafeTargetFileSelector from infection_monkey.ransomware.file_selectors import ProductionSafeTargetFileSelector
from infection_monkey.ransomware.in_place_file_encryptor import InPlaceFileEncryptor from infection_monkey.ransomware.in_place_file_encryptor import InPlaceFileEncryptor
from infection_monkey.ransomware.ransomware_config import RansomwareConfig
from infection_monkey.ransomware.ransomware_payload import RansomwarePayload from infection_monkey.ransomware.ransomware_payload import RansomwarePayload
from infection_monkey.ransomware.targeted_file_extensions import TARGETED_FILE_EXTENSIONS from infection_monkey.ransomware.targeted_file_extensions import TARGETED_FILE_EXTENSIONS
from infection_monkey.telemetry.messengers.batching_telemetry_messenger import ( from infection_monkey.telemetry.messengers.batching_telemetry_messenger import (
@ -14,14 +18,23 @@ from infection_monkey.utils.bit_manipulators import flip_bits
EXTENSION = ".m0nk3y" EXTENSION = ".m0nk3y"
CHUNK_SIZE = 4096 * 24 CHUNK_SIZE = 4096 * 24
LOG = logging.getLogger(__name__)
def build_ransomware_payload(config: dict): def build_ransomware_payload(config: dict):
LOG.debug(f"Ransomware payload configuration:\n{pformat(config)}")
ransomware_config = RansomwareConfig(config)
file_encryptor = _build_file_encryptor() file_encryptor = _build_file_encryptor()
file_selector = _build_file_selector() file_selector = _build_file_selector()
telemetry_messenger = _build_telemetry_messenger() telemetry_messenger = _build_telemetry_messenger()
return RansomwarePayload( return RansomwarePayload(
config, file_encryptor, file_selector, readme_utils.leave_readme, telemetry_messenger ransomware_config,
file_encryptor,
file_selector,
readme_utils.leave_readme,
telemetry_messenger,
) )
@ -33,7 +46,7 @@ def _build_file_encryptor():
def _build_file_selector(): def _build_file_selector():
targeted_file_extensions = TARGETED_FILE_EXTENSIONS.copy() targeted_file_extensions = TARGETED_FILE_EXTENSIONS.copy()
targeted_file_extensions.discard(EXTENSION) targeted_file_extensions.discard(ransomware_payload.EXTENSION)
return ProductionSafeTargetFileSelector(targeted_file_extensions) return ProductionSafeTargetFileSelector(targeted_file_extensions)

View File

@ -7,6 +7,7 @@ from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import
TEST_KEYBOARD_TXT, TEST_KEYBOARD_TXT,
) )
from infection_monkey.ransomware.ransomware_config import RansomwareConfig
from infection_monkey.ransomware.ransomware_payload import ( from infection_monkey.ransomware.ransomware_payload import (
README_DEST, README_DEST,
README_SRC, README_SRC,
@ -14,20 +15,6 @@ from infection_monkey.ransomware.ransomware_payload import (
) )
@pytest.fixture
def ransomware_payload_config(ransomware_target):
return {
"encryption": {
"enabled": True,
"directories": {
"linux_target_dir": str(ransomware_target),
"windows_target_dir": str(ransomware_target),
},
},
"other_behaviors": {"readme": False},
}
@pytest.fixture @pytest.fixture
def ransomware_payload(build_ransomware_payload, ransomware_payload_config): def ransomware_payload(build_ransomware_payload, ransomware_payload_config):
return build_ransomware_payload(ransomware_payload_config) return build_ransomware_payload(ransomware_payload_config)
@ -50,15 +37,26 @@ def build_ransomware_payload(
@pytest.fixture @pytest.fixture
def mock_file_encryptor(ransomware_target): def ransomware_payload_config(ransomware_test_data):
class RansomwareConfigStub(RansomwareConfig):
def __init__(self, encryption_enabled, readme_enabled, target_directory):
self.encryption_enabled = encryption_enabled
self.readme_enabled = readme_enabled
self.target_directory = target_directory
return RansomwareConfigStub(True, False, ransomware_test_data)
@pytest.fixture
def mock_file_encryptor():
return MagicMock() return MagicMock()
@pytest.fixture @pytest.fixture
def mock_file_selector(ransomware_target): def mock_file_selector(ransomware_test_data):
selected_files = [ selected_files = [
ransomware_target / ALL_ZEROS_PDF, ransomware_test_data / ALL_ZEROS_PDF,
ransomware_target / TEST_KEYBOARD_TXT, ransomware_test_data / TEST_KEYBOARD_TXT,
] ]
return MagicMock(return_value=selected_files) return MagicMock(return_value=selected_files)
@ -68,37 +66,29 @@ def mock_leave_readme():
return MagicMock() return MagicMock()
def test_env_variables_in_target_dir_resolved_linux( def test_files_selected_from_target_dir(
ransomware_payload,
ransomware_payload_config, ransomware_payload_config,
build_ransomware_payload,
ransomware_target,
patched_home_env,
mock_file_selector, mock_file_selector,
): ):
path_with_env_variable = "$HOME/ransomware_target" ransomware_payload.run_payload()
mock_file_selector.assert_called_with(ransomware_payload_config.target_directory)
ransomware_payload_config["encryption"]["directories"][
"linux_target_dir"
] = ransomware_payload_config["encryption"]["directories"][
"windows_target_dir"
] = path_with_env_variable
build_ransomware_payload(ransomware_payload_config).run_payload()
mock_file_selector.assert_called_with(ransomware_target)
def test_all_selected_files_encrypted(ransomware_target, ransomware_payload, mock_file_encryptor): def test_all_selected_files_encrypted(
ransomware_test_data, ransomware_payload, mock_file_encryptor
):
ransomware_payload.run_payload() ransomware_payload.run_payload()
assert mock_file_encryptor.call_count == 2 assert mock_file_encryptor.call_count == 2
mock_file_encryptor.assert_any_call(ransomware_target / ALL_ZEROS_PDF) mock_file_encryptor.assert_any_call(ransomware_test_data / ALL_ZEROS_PDF)
mock_file_encryptor.assert_any_call(ransomware_target / TEST_KEYBOARD_TXT) mock_file_encryptor.assert_any_call(ransomware_test_data / TEST_KEYBOARD_TXT)
def test_encryption_skipped_if_configured_false( def test_encryption_skipped_if_configured_false(
build_ransomware_payload, ransomware_payload_config, ransomware_target, mock_file_encryptor build_ransomware_payload, ransomware_payload_config, mock_file_encryptor
): ):
ransomware_payload_config["encryption"]["enabled"] = False ransomware_payload_config.encryption_enabled = False
ransomware_payload = build_ransomware_payload(ransomware_payload_config) ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload() ransomware_payload.run_payload()
@ -109,9 +99,8 @@ def test_encryption_skipped_if_configured_false(
def test_encryption_skipped_if_no_directory( def test_encryption_skipped_if_no_directory(
build_ransomware_payload, ransomware_payload_config, mock_file_encryptor build_ransomware_payload, ransomware_payload_config, mock_file_encryptor
): ):
ransomware_payload_config["encryption"]["enabled"] = True ransomware_payload_config.encryption_enabled = True
ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = "" ransomware_payload_config.target_directory = None
ransomware_payload_config["encryption"]["directories"]["windows_target_dir"] = ""
ransomware_payload = build_ransomware_payload(ransomware_payload_config) ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload() ransomware_payload.run_payload()
@ -158,10 +147,8 @@ def test_telemetry_failure(
assert "No such file or directory" in telem.get_data()["files"][0]["error"] assert "No such file or directory" in telem.get_data()["files"][0]["error"]
def test_readme_false( def test_readme_false(build_ransomware_payload, ransomware_payload_config, mock_leave_readme):
build_ransomware_payload, ransomware_payload_config, mock_leave_readme, ransomware_target ransomware_payload_config.readme_enabled = False
):
ransomware_payload_config["other_behaviors"]["readme"] = False
ransomware_payload = build_ransomware_payload(ransomware_payload_config) ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload() ransomware_payload.run_payload()
@ -169,21 +156,20 @@ def test_readme_false(
def test_readme_true( def test_readme_true(
build_ransomware_payload, ransomware_payload_config, mock_leave_readme, ransomware_target build_ransomware_payload, ransomware_payload_config, mock_leave_readme, ransomware_test_data
): ):
ransomware_payload_config["other_behaviors"]["readme"] = True ransomware_payload_config.readme_enabled = True
ransomware_payload = build_ransomware_payload(ransomware_payload_config) ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload() ransomware_payload.run_payload()
mock_leave_readme.assert_called_with(README_SRC, ransomware_target / README_DEST) mock_leave_readme.assert_called_with(README_SRC, ransomware_test_data / README_DEST)
def test_no_readme_if_no_directory( def test_no_readme_if_no_directory(
build_ransomware_payload, ransomware_payload_config, mock_leave_readme build_ransomware_payload, ransomware_payload_config, mock_leave_readme
): ):
ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = "" ransomware_payload_config.target_directory = None
ransomware_payload_config["encryption"]["directories"]["windows_target_dir"] = "" ransomware_payload_config.readme_enabled = True
ransomware_payload_config["other_behaviors"]["readme"] = True
ransomware_payload = build_ransomware_payload(ransomware_payload_config) ransomware_payload = build_ransomware_payload(ransomware_payload_config)