Agent: Inject copy_file callable into RansomwarePayload

In order to test certain conditions, our options are to either
monkeypatch shutil.copyfile(), or inject a callable into the
RansomwarePayload.  Monkeypatching shutil.copyfile() could lead to
issues down the road. For example, if the implementation of
`_leave_readme()` is changed to no longer use copyfile(), a test that
asserts that copyfile() has not been called will pass, even though a
file may have been copied.
This commit is contained in:
Mike Salvatore 2021-07-08 11:23:15 -04:00
parent e1b08079f1
commit f0e9109f64
3 changed files with 32 additions and 17 deletions

View File

@ -1,6 +1,7 @@
import argparse
import logging
import os
import shutil
import subprocess
import sys
import time
@ -477,7 +478,7 @@ class InfectionMonkey(object):
try:
RansomwarePayload(
WormConfiguration.ransomware, batching_telemetry_messenger
WormConfiguration.ransomware, batching_telemetry_messenger, shutil.copyfile
).run_payload()
except Exception as ex:
LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}")

View File

@ -1,8 +1,7 @@
import logging
import shutil
from pathlib import Path
from pprint import pformat
from typing import List, Optional, Tuple
from typing import Callable, List, Optional, Tuple
from common.utils.file_utils import InvalidPath, expand_path
from infection_monkey.ransomware.bitflip_encryptor import BitflipEncryptor
@ -22,7 +21,12 @@ README_DEST = "README.txt"
class RansomwarePayload:
def __init__(self, config: dict, telemetry_messenger: ITelemetryMessenger):
def __init__(
self,
config: dict,
telemetry_messenger: ITelemetryMessenger,
copy_file: Callable[[str, str], None],
):
LOG.debug(f"Ransomware payload configuration:\n{pformat(config)}")
self._encryption_enabled = config["encryption"]["enabled"]
@ -34,6 +38,7 @@ class RansomwarePayload:
self._valid_file_extensions_for_encryption.discard(self._new_file_extension)
self._encryptor = BitflipEncryptor(chunk_size=CHUNK_SIZE)
self._copy_file = copy_file
self._telemetry_messenger = telemetry_messenger
@staticmethod
@ -97,6 +102,6 @@ class RansomwarePayload:
LOG.info(f"Leaving a ransomware README file at {readme_dest_path}")
try:
shutil.copyfile(README_SRC, readme_dest_path)
self._copy_file(README_SRC, readme_dest_path)
except Exception as ex:
LOG.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")

View File

@ -1,4 +1,5 @@
import os
import shutil
from pathlib import Path, PurePosixPath
import pytest
@ -44,12 +45,20 @@ def ransomware_payload_config(ransomware_target):
@pytest.fixture
def ransomware_payload(ransomware_payload_config, telemetry_messenger_spy):
return RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
def ransomware_payload(build_ransomware_payload, ransomware_payload_config):
return build_ransomware_payload(ransomware_payload_config)
@pytest.fixture
def build_ransomware_payload(telemetry_messenger_spy):
def inner(config):
return RansomwarePayload(config, telemetry_messenger_spy, shutil.copyfile)
return inner
def test_env_variables_in_target_dir_resolved_linux(
ransomware_payload_config, ransomware_target, telemetry_messenger_spy, patched_home_env
ransomware_payload_config, build_ransomware_payload, ransomware_target, patched_home_env
):
path_with_env_variable = "$HOME/ransomware_target"
@ -58,7 +67,7 @@ def test_env_variables_in_target_dir_resolved_linux(
] = ransomware_payload_config["encryption"]["directories"][
"windows_target_dir"
] = path_with_env_variable
RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy).run_payload()
build_ransomware_payload(ransomware_payload_config).run_payload()
assert (
hash_file(ransomware_target / with_extension(ALL_ZEROS_PDF))
@ -152,11 +161,11 @@ def test_skip_already_encrypted_file(ransomware_target, ransomware_payload):
def test_encryption_skipped_if_configured_false(
ransomware_payload_config, ransomware_target, telemetry_messenger_spy
build_ransomware_payload, ransomware_payload_config, ransomware_target
):
ransomware_payload_config["encryption"]["enabled"] = False
ransomware_payload = RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload()
assert hash_file(ransomware_target / ALL_ZEROS_PDF) == ALL_ZEROS_PDF_CLEARTEXT_SHA256
@ -164,13 +173,13 @@ def test_encryption_skipped_if_configured_false(
def test_encryption_skipped_if_no_directory(
ransomware_payload_config, telemetry_messenger_spy, monkeypatch
build_ransomware_payload, ransomware_payload_config, telemetry_messenger_spy
):
ransomware_payload_config["encryption"]["enabled"] = True
ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = ""
ransomware_payload_config["encryption"]["directories"]["windows_target_dir"] = ""
ransomware_payload = RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload()
assert len(telemetry_messenger_spy.telemetries) == 0
@ -205,17 +214,17 @@ def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_
assert "No such file or directory" in telem_1.get_data()["files"][0]["error"]
def test_readme_false(ransomware_payload_config, ransomware_target, telemetry_messenger_spy):
def test_readme_false(build_ransomware_payload, ransomware_payload_config, ransomware_target):
ransomware_payload_config["other_behaviors"]["readme"] = False
ransomware_payload = RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload()
assert not Path(ransomware_target / README_DEST).exists()
def test_readme_true(ransomware_payload_config, ransomware_target, telemetry_messenger_spy):
def test_readme_true(build_ransomware_payload, ransomware_payload_config, ransomware_target):
ransomware_payload_config["other_behaviors"]["readme"] = True
ransomware_payload = RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload()
assert Path(ransomware_target / README_DEST).exists()