Merge pull request #1307 from guardicore/ransomware-inject-copy-dependency

Ransomware README improvements
This commit is contained in:
Mike Salvatore 2021-07-08 12:50:32 -04:00 committed by GitHub
commit 24fdb9e299
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 23 deletions

View File

@ -1,6 +1,7 @@
import argparse
import logging
import os
import shutil
import subprocess
import sys
import time
@ -477,7 +478,7 @@ class InfectionMonkey(object):
try:
RansomwarePayload(
WormConfiguration.ransomware, batching_telemetry_messenger
WormConfiguration.ransomware, batching_telemetry_messenger, shutil.copyfile
).run_payload()
except Exception as ex:
LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}")

View File

@ -1,8 +1,7 @@
import logging
import shutil
from pathlib import Path
from pprint import pformat
from typing import List, Optional, Tuple
from typing import Callable, List, Optional, Tuple
from common.utils.file_utils import InvalidPath, expand_path
from infection_monkey.ransomware.bitflip_encryptor import BitflipEncryptor
@ -22,7 +21,12 @@ README_DEST = "README.txt"
class RansomwarePayload:
def __init__(self, config: dict, telemetry_messenger: ITelemetryMessenger):
def __init__(
self,
config: dict,
telemetry_messenger: ITelemetryMessenger,
copy_file: Callable[[Path, Path], None],
):
LOG.debug(f"Ransomware payload configuration:\n{pformat(config)}")
self._encryption_enabled = config["encryption"]["enabled"]
@ -34,6 +38,7 @@ class RansomwarePayload:
self._valid_file_extensions_for_encryption.discard(self._new_file_extension)
self._encryptor = BitflipEncryptor(chunk_size=CHUNK_SIZE)
self._copy_file = copy_file
self._telemetry_messenger = telemetry_messenger
@staticmethod
@ -92,11 +97,21 @@ class RansomwarePayload:
self._telemetry_messenger.send_telemetry(encryption_attempt)
def _leave_readme(self):
if self._readme_enabled:
readme_dest_path = self._target_dir / README_DEST
LOG.info(f"Leaving a ransomware README file at {readme_dest_path}")
if not self._readme_enabled:
return
try:
shutil.copyfile(README_SRC, readme_dest_path)
except Exception as ex:
LOG.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")
readme_dest_path = self._target_dir / README_DEST
if readme_dest_path.exists():
LOG.warning(f"{readme_dest_path} already exists, not leaving a new README.txt")
return
self._copy_readme_file(readme_dest_path)
def _copy_readme_file(self, dest: Path):
LOG.info(f"Leaving a ransomware README file at {dest}")
try:
self._copy_file(README_SRC, dest)
except Exception as ex:
LOG.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")

View File

@ -1,5 +1,7 @@
import os
import shutil
from pathlib import Path, PurePosixPath
from unittest.mock import MagicMock
import pytest
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
@ -44,12 +46,20 @@ def ransomware_payload_config(ransomware_target):
@pytest.fixture
def ransomware_payload(ransomware_payload_config, telemetry_messenger_spy):
return RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
def ransomware_payload(build_ransomware_payload, ransomware_payload_config):
return build_ransomware_payload(ransomware_payload_config)
@pytest.fixture
def build_ransomware_payload(telemetry_messenger_spy):
def inner(config):
return RansomwarePayload(config, telemetry_messenger_spy, shutil.copyfile)
return inner
def test_env_variables_in_target_dir_resolved_linux(
ransomware_payload_config, ransomware_target, telemetry_messenger_spy, patched_home_env
ransomware_payload_config, build_ransomware_payload, ransomware_target, patched_home_env
):
path_with_env_variable = "$HOME/ransomware_target"
@ -58,7 +68,7 @@ def test_env_variables_in_target_dir_resolved_linux(
] = ransomware_payload_config["encryption"]["directories"][
"windows_target_dir"
] = path_with_env_variable
RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy).run_payload()
build_ransomware_payload(ransomware_payload_config).run_payload()
assert (
hash_file(ransomware_target / with_extension(ALL_ZEROS_PDF))
@ -152,11 +162,11 @@ def test_skip_already_encrypted_file(ransomware_target, ransomware_payload):
def test_encryption_skipped_if_configured_false(
ransomware_payload_config, ransomware_target, telemetry_messenger_spy
build_ransomware_payload, ransomware_payload_config, ransomware_target
):
ransomware_payload_config["encryption"]["enabled"] = False
ransomware_payload = RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload()
assert hash_file(ransomware_target / ALL_ZEROS_PDF) == ALL_ZEROS_PDF_CLEARTEXT_SHA256
@ -164,13 +174,13 @@ def test_encryption_skipped_if_configured_false(
def test_encryption_skipped_if_no_directory(
ransomware_payload_config, telemetry_messenger_spy, monkeypatch
build_ransomware_payload, ransomware_payload_config, telemetry_messenger_spy
):
ransomware_payload_config["encryption"]["enabled"] = True
ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = ""
ransomware_payload_config["encryption"]["directories"]["windows_target_dir"] = ""
ransomware_payload = RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload()
assert len(telemetry_messenger_spy.telemetries) == 0
@ -205,17 +215,32 @@ def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_
assert "No such file or directory" in telem_1.get_data()["files"][0]["error"]
def test_readme_false(ransomware_payload_config, ransomware_target, telemetry_messenger_spy):
def test_readme_false(build_ransomware_payload, ransomware_payload_config, ransomware_target):
ransomware_payload_config["other_behaviors"]["readme"] = False
ransomware_payload = RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload()
assert not Path(ransomware_target / README_DEST).exists()
def test_readme_true(ransomware_payload_config, ransomware_target, telemetry_messenger_spy):
def test_readme_true(build_ransomware_payload, ransomware_payload_config, ransomware_target):
ransomware_payload_config["other_behaviors"]["readme"] = True
ransomware_payload = RansomwarePayload(ransomware_payload_config, telemetry_messenger_spy)
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
ransomware_payload.run_payload()
assert Path(ransomware_target / README_DEST).exists()
def test_readme_already_exists(
monkeypatch, ransomware_payload_config, telemetry_messenger_spy, ransomware_target
):
monkeypatch.setattr(ransomware_payload_module, "TARGETED_FILE_EXTENSIONS", set()),
mock_copy_file = MagicMock()
ransomware_payload_config["other_behaviors"]["readme"] = True
Path(ransomware_target / README_DEST).touch()
RansomwarePayload(
ransomware_payload_config, telemetry_messenger_spy, mock_copy_file
).run_payload()
mock_copy_file.assert_not_called()