diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index bf3ae80a6..622c17d7d 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -25,8 +25,11 @@ from infection_monkey.system_singleton import SystemSingleton from infection_monkey.telemetry.attack.t1106_telem import T1106Telem from infection_monkey.telemetry.attack.t1107_telem import T1107Telem from infection_monkey.telemetry.attack.victim_host_telem import VictimHostTelem -from infection_monkey.telemetry.messengers.telemetry_messenger_wrapper import ( - TelemetryMessengerWrapper, +from infection_monkey.telemetry.messengers.batching_telemetry_messenger import ( + BatchingTelemetryMessenger, +) +from infection_monkey.telemetry.messengers.legacy_telemetry_messenger_adapter import ( + LegacyTelemetryMessengerAdapter, ) from infection_monkey.telemetry.scan_telem import ScanTelem from infection_monkey.telemetry.state_telem import StateTelem @@ -469,8 +472,12 @@ class InfectionMonkey(object): @staticmethod def run_ransomware(): + telemetry_messenger = LegacyTelemetryMessengerAdapter() + batching_telemetry_messenger = BatchingTelemetryMessenger(telemetry_messenger) + try: - telemetry_messenger = TelemetryMessengerWrapper() - RansomewarePayload(WormConfiguration.ransomware, telemetry_messenger).run_payload() + RansomewarePayload( + WormConfiguration.ransomware, batching_telemetry_messenger + ).run_payload() except Exception as ex: LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}") diff --git a/monkey/infection_monkey/telemetry/batchable_telem_mixin.py b/monkey/infection_monkey/telemetry/batchable_telem_mixin.py new file mode 100644 index 000000000..905e9e91b --- /dev/null +++ b/monkey/infection_monkey/telemetry/batchable_telem_mixin.py @@ -0,0 +1,23 @@ +from typing import Iterable + +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem + + +class BatchableTelemMixin: + """ + Implements the get_telemetry_batch() and add_telemetry_to_batch() methods from the + IBatchableTelem interface using a list. + """ + + @property + def _telemetry_entries(self): + if not hasattr(self, "_list"): + self._list = [] + + return self._list + + def get_telemetry_batch(self) -> Iterable: + return self._telemetry_entries + + def add_telemetry_to_batch(self, telemetry: IBatchableTelem): + self._telemetry_entries.extend(telemetry.get_telemetry_batch()) diff --git a/monkey/infection_monkey/telemetry/i_batchable_telem.py b/monkey/infection_monkey/telemetry/i_batchable_telem.py new file mode 100644 index 000000000..3503316fa --- /dev/null +++ b/monkey/infection_monkey/telemetry/i_batchable_telem.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import abc +from typing import Iterable + +from infection_monkey.telemetry.i_telem import ITelem + + +class IBatchableTelem(ITelem, metaclass=abc.ABCMeta): + """ + Extends the ITelem interface and enables telemetries to be aggregated into + batches. + """ + + @abc.abstractmethod + def get_telemetry_batch(self) -> Iterable: + pass + + @abc.abstractmethod + def add_telemetry_to_batch(self, telemetry: IBatchableTelem): + pass diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py new file mode 100644 index 000000000..123903fb0 --- /dev/null +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -0,0 +1,108 @@ +import queue +import threading +import time +from typing import Dict + +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem +from infection_monkey.telemetry.i_telem import ITelem +from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger + +DEFAULT_PERIOD = 5 +WAKES_PER_PERIOD = 4 + + +class BatchingTelemetryMessenger(ITelemetryMessenger): + """ + An ITelemetryMessenger decorator that aggregates IBatchableTelem telemetries + and periodically sends them to Monkey Island. + """ + + def __init__(self, telemetry_messenger: ITelemetryMessenger, period=DEFAULT_PERIOD): + self._queue: queue.Queue[ITelem] = queue.Queue() + self._thread = self._BatchingTelemetryMessengerThread( + self._queue, telemetry_messenger, period + ) + + self._thread.start() + + def __del__(self): + self._thread.stop() + + def send_telemetry(self, telemetry: ITelem): + self._queue.put(telemetry) + + class _BatchingTelemetryMessengerThread: + def __init__( + self, telem_queue: queue.Queue, telemetry_messenger: ITelemetryMessenger, period: int + ): + self._queue: queue.Queue[ITelem] = telem_queue + self._telemetry_messenger = telemetry_messenger + self._period = period + + self._should_run_batch_thread = True + # TODO: Create a "timer" or "countdown" class and inject an object instead of + # using time.time() + self._last_sent_time = time.time() + self._telemetry_batches: Dict[str, IBatchableTelem] = {} + + self._manage_telemetry_batches_thread = None + + def start(self): + self._should_run_batch_thread = True + self._manage_telemetry_batches_thread = threading.Thread( + target=self._manage_telemetry_batches + ) + self._manage_telemetry_batches_thread.start() + + def stop(self): + self._should_run_batch_thread = False + self._manage_telemetry_batches_thread.join() + self._manage_telemetry_batches_thread = None + + def _manage_telemetry_batches(self): + self._reset() + + while self._should_run_batch_thread: + self._process_next_telemetry() + + if self._period_elapsed(): + self._send_telemetry_batches() + self._reset() + + self._send_remaining_telemetry_batches() + + def _reset(self): + self._last_sent_time = time.time() + self._telemetry_batches = {} + + def _process_next_telemetry(self): + try: + telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) + + if isinstance(telemetry, IBatchableTelem): + self._add_telemetry_to_batch(telemetry) + else: + self._telemetry_messenger.send_telemetry(telemetry) + except queue.Empty: + pass + + def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem): + telem_category = new_telemetry.telem_category + + if telem_category in self._telemetry_batches: + self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry) + else: + self._telemetry_batches[telem_category] = new_telemetry + + def _period_elapsed(self): + return (time.time() - self._last_sent_time) > self._period + + def _send_remaining_telemetry_batches(self): + while not self._queue.empty(): + self._process_next_telemetry() + + self._send_telemetry_batches() + + def _send_telemetry_batches(self): + for batchable_telemetry in self._telemetry_batches.values(): + self._telemetry_messenger.send_telemetry(batchable_telemetry) diff --git a/monkey/infection_monkey/telemetry/messengers/telemetry_messenger_wrapper.py b/monkey/infection_monkey/telemetry/messengers/legacy_telemetry_messenger_adapter.py similarity index 52% rename from monkey/infection_monkey/telemetry/messengers/telemetry_messenger_wrapper.py rename to monkey/infection_monkey/telemetry/messengers/legacy_telemetry_messenger_adapter.py index e436cdd46..cbbe03595 100644 --- a/monkey/infection_monkey/telemetry/messengers/telemetry_messenger_wrapper.py +++ b/monkey/infection_monkey/telemetry/messengers/legacy_telemetry_messenger_adapter.py @@ -2,6 +2,11 @@ from infection_monkey.telemetry.i_telem import ITelem from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger -class TelemetryMessengerWrapper(ITelemetryMessenger): +class LegacyTelemetryMessengerAdapter(ITelemetryMessenger): + """ + Provides an adapter between modules that require an ITelemetryMessenger and the + legacy method for sending telemetry. + """ + def send_telemetry(self, telemetry: ITelem): telemetry.send() diff --git a/monkey/infection_monkey/telemetry/ransomware_telem.py b/monkey/infection_monkey/telemetry/ransomware_telem.py index c56e8337c..64cce13c2 100644 --- a/monkey/infection_monkey/telemetry/ransomware_telem.py +++ b/monkey/infection_monkey/telemetry/ransomware_telem.py @@ -1,11 +1,13 @@ -from typing import List, Tuple +from typing import Tuple from common.common_consts.telem_categories import TelemCategoryEnum from infection_monkey.telemetry.base_telem import BaseTelem +from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem -class RansomwareTelem(BaseTelem): - def __init__(self, attempts: List[Tuple[str, str]]): +class RansomwareTelem(BatchableTelemMixin, IBatchableTelem, BaseTelem): + def __init__(self, entry: Tuple[str, str]): """ Ransomware telemetry constructor :param attempts: List of tuples with each tuple containing the path @@ -14,9 +16,10 @@ class RansomwareTelem(BaseTelem): containing the directory path and error string. """ super().__init__() - self.attempts = attempts + + self._telemetry_entries.append(entry) telem_category = TelemCategoryEnum.RANSOMWARE def get_data(self): - return {"ransomware_attempts": self.attempts} + return {"ransomware_attempts": self._telemetry_entries} diff --git a/monkey/tests/unit_tests/infection_monkey/conftest.py b/monkey/tests/unit_tests/infection_monkey/conftest.py new file mode 100644 index 000000000..533572f98 --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/conftest.py @@ -0,0 +1,17 @@ +import pytest + +from infection_monkey.telemetry.i_telem import ITelem +from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger + + +class TelemetryMessengerSpy(ITelemetryMessenger): + def __init__(self): + self.telemetries = [] + + def send_telemetry(self, telemetry: ITelem): + self.telemetries.append(telemetry) + + +@pytest.fixture +def telemetry_messenger_spy(): + return TelemetryMessengerSpy() diff --git a/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py b/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py index 35aef048c..f26463ed1 100644 --- a/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py +++ b/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py @@ -23,16 +23,6 @@ from tests.utils import hash_file, is_user_admin from infection_monkey.ransomware import ransomware_payload as ransomware_payload_module from infection_monkey.ransomware.ransomware_payload import EXTENSION, RansomewarePayload -from infection_monkey.telemetry.i_telem import ITelem -from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger - - -class TelemetryMessengerSpy(ITelemetryMessenger): - def __init__(self): - self.telemetries = [] - - def send_telemetry(self, telemetry: ITelem): - self.telemetries.append(telemetry) def with_extension(filename): @@ -46,11 +36,6 @@ def ransomware_payload_config(ransomware_target): } -@pytest.fixture -def telemetry_messenger_spy(): - return TelemetryMessengerSpy() - - @pytest.fixture def ransomware_payload(ransomware_payload_config, telemetry_messenger_spy): return RansomewarePayload(ransomware_payload_config, telemetry_messenger_spy) @@ -148,10 +133,10 @@ def test_telemetry_success(ransomware_payload, telemetry_messenger_spy): telem_1 = telemetry_messenger_spy.telemetries[0] telem_2 = telemetry_messenger_spy.telemetries[1] - assert ALL_ZEROS_PDF in telem_1.get_data()["ransomware_attempts"][0] - assert telem_1.get_data()["ransomware_attempts"][1] == "" - assert TEST_KEYBOARD_TXT in telem_2.get_data()["ransomware_attempts"][0] - assert telem_2.get_data()["ransomware_attempts"][1] == "" + assert ALL_ZEROS_PDF in telem_1.get_data()["ransomware_attempts"][0][0] + assert telem_1.get_data()["ransomware_attempts"][0][1] == "" + assert TEST_KEYBOARD_TXT in telem_2.get_data()["ransomware_attempts"][0][0] + assert telem_2.get_data()["ransomware_attempts"][0][1] == "" def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_spy): @@ -164,5 +149,5 @@ def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_ ransomware_payload.run_payload() telem_1 = telemetry_messenger_spy.telemetries[0] - assert "/file/not/exist" in telem_1.get_data()["ransomware_attempts"][0] - assert "No such file or directory" in telem_1.get_data()["ransomware_attempts"][1] + assert "/file/not/exist" in telem_1.get_data()["ransomware_attempts"][0][0] + assert "No such file or directory" in telem_1.get_data()["ransomware_attempts"][0][1] diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py new file mode 100644 index 000000000..e3d2f89e0 --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py @@ -0,0 +1,156 @@ +import time + +from infection_monkey.telemetry.base_telem import BaseTelem +from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem +from infection_monkey.telemetry.messengers.batching_telemetry_messenger import ( + BatchingTelemetryMessenger, +) + +PERIOD = 0.001 + + +def release_GIL(): + time.sleep(PERIOD) + + +def advance_clock_to_next_period(monkeypatch): + patch_time(monkeypatch, time.time() + (PERIOD * 1.01)) + + +def patch_time(monkeypatch, new_time: float): + monkeypatch.setattr(time, "time", lambda: new_time) + + +class NonBatchableTelemStub(BaseTelem): + telem_category = "NonBatchableTelemStub" + + def send(self, log_data=True): + raise NotImplementedError + + def get_data(self) -> dict: + return {"1": {"i": "a", "ii": "b"}} + + def __eq__(self, other): + return self.get_data() == other.get_data() and self.telem_category == other.telem_category + + +class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem): + def __init__(self, value, telem_category="cat1"): + self._telemetry_entries.append(value) + self._telem_category = telem_category + + @property + def telem_category(self): + return self._telem_category + + def send(self, log_data=True): + raise NotImplementedError + + def get_data(self) -> dict: + return {"entries": self._telemetry_entries} + + +# Note that this function is not a fixture. This is because BatchingTelemetyMessenger +# stops its thread when it is destructed. If this were a fixture, it may live +# past the end of the test, which would allow the in the BatchingTelemetryMessenger +# instance to keep running instead of stopping +def build_batching_telemetry_messenger(monkeypatch, telemetry_messenger_spy): + patch_time(monkeypatch, 0) + return BatchingTelemetryMessenger(telemetry_messenger_spy, period=PERIOD) + + +def test_send_immediately(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + + telem = NonBatchableTelemStub() + batching_telemetry_messenger.send_telemetry(telem) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0] == telem + + +def test_send_telem_batch(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + + expected_data = {"entries": [1, 2]} + telem1 = BatchableTelemStub(1) + telem2 = BatchableTelemStub(2) + + batching_telemetry_messenger.send_telemetry(telem1) + batching_telemetry_messenger.send_telemetry(telem2) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data + + +def test_send_different_telem_types(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + + telem1 = BatchableTelemStub(1, "cat1") + telem2 = BatchableTelemStub(2, "cat2") + + batching_telemetry_messenger.send_telemetry(telem1) + batching_telemetry_messenger.send_telemetry(telem2) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 2 + assert telemetry_messenger_spy.telemetries[0] == telem1 + assert telemetry_messenger_spy.telemetries[1] == telem2 + + +def test_send_two_batches(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + + telem1 = BatchableTelemStub(1, "cat1") + telem2 = BatchableTelemStub(2, "cat1") + + batching_telemetry_messenger.send_telemetry(telem1) + advance_clock_to_next_period(monkeypatch) + release_GIL() + + batching_telemetry_messenger.send_telemetry(telem2) + release_GIL() + assert len(telemetry_messenger_spy.telemetries) == 1 + + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 2 + assert telemetry_messenger_spy.telemetries[1] == telem2 + + +def test_send_remaining_telem_after_stop(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + + expected_data = {"entries": [1]} + telem = BatchableTelemStub(1) + + batching_telemetry_messenger.send_telemetry(telem) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + del batching_telemetry_messenger + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/test_ransomware_telem.py b/monkey/tests/unit_tests/infection_monkey/telemetry/test_ransomware_telem.py index 4994c9287..e2e674ecd 100644 --- a/monkey/tests/unit_tests/infection_monkey/telemetry/test_ransomware_telem.py +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/test_ransomware_telem.py @@ -1,20 +1,19 @@ import json -import pytest - from infection_monkey.telemetry.ransomware_telem import RansomwareTelem -ATTEMPTS = [("", "")] +ENCRYPTION_ATTEMPTS = [("", ""), ("", "")] -@pytest.fixture -def ransomware_telem_test_instance(): - return RansomwareTelem(ATTEMPTS) +def test_ransomware_telem_send(spy_send_telemetry): + ransomware_telem_1 = RansomwareTelem(ENCRYPTION_ATTEMPTS[0]) + ransomware_telem_2 = RansomwareTelem(ENCRYPTION_ATTEMPTS[1]) + ransomware_telem_1.add_telemetry_to_batch(ransomware_telem_2) + + ransomware_telem_1.send() + expected_data = {"ransomware_attempts": ENCRYPTION_ATTEMPTS} + expected_data = json.dumps(expected_data, cls=ransomware_telem_1.json_encoder) -def test_ransomware_telem_send(ransomware_telem_test_instance, spy_send_telemetry): - ransomware_telem_test_instance.send() - expected_data = {"ransomware_attempts": ATTEMPTS} - expected_data = json.dumps(expected_data, cls=ransomware_telem_test_instance.json_encoder) assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.telem_category == "ransomware"