From f2a940a4e0a8a9ff7368db1d33f117c76530cfd9 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 24 Jun 2021 12:08:23 -0400 Subject: [PATCH 01/22] agent: Add IBatchableTelem IBatchableTelem adds two methods to the ITelem interface. These methods allow a telemetry object to mange batches of telemetry entries, rather than just one. --- .../telemetry/i_batchable_telem.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 monkey/infection_monkey/telemetry/i_batchable_telem.py diff --git a/monkey/infection_monkey/telemetry/i_batchable_telem.py b/monkey/infection_monkey/telemetry/i_batchable_telem.py new file mode 100644 index 000000000..3cb82fd44 --- /dev/null +++ b/monkey/infection_monkey/telemetry/i_batchable_telem.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +import abc +from typing import Iterable + +from infection_monkey.telemetry.i_telem import ITelem + + +class IBatchableTelem(ITelem, metaclass=abc.ABCMeta): + @abc.abstractmethod + def get_telemetry_entries(self) -> Iterable: + pass + + @abc.abstractmethod + def add_telemetry_to_batch(self, telemetry: IBatchableTelem): + pass From 8e40e44263dd9765e6f3a785fc71d38b1f878589 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 24 Jun 2021 12:09:30 -0400 Subject: [PATCH 02/22] agent: Add BatchableTelemMixin Adds an implementation as a mixin of the two methods specified by IBatchableTelem. --- .../telemetry/batchable_telem_mixin.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 monkey/infection_monkey/telemetry/batchable_telem_mixin.py diff --git a/monkey/infection_monkey/telemetry/batchable_telem_mixin.py b/monkey/infection_monkey/telemetry/batchable_telem_mixin.py new file mode 100644 index 000000000..4189f0cf0 --- /dev/null +++ b/monkey/infection_monkey/telemetry/batchable_telem_mixin.py @@ -0,0 +1,22 @@ +from typing import Iterable + +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem + + +class BatchableTelemMixin: + """ + Implements the IBatchableTelem interface methods using a list. + """ + + @property + def _telemetry_entries(self): + if not hasattr(self, "_list"): + self._list = [] + + return self._list + + def get_telemetry_entries(self) -> Iterable: + return self._telemetry_entries + + def add_telemetry_to_batch(self, telemetry: IBatchableTelem): + self._telemetry_entries.extend(telemetry.get_telemetry_entries()) From a0b43a17a2d37d4b5e72d9e86d738632c3329667 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 24 Jun 2021 12:19:43 -0400 Subject: [PATCH 03/22] agent: Implement IBatchableTelem in RansomwareTelem This allows encryption attempt telmetries to be batched into one telemetry object so they can be sent to the island in batches. --- .../telemetry/ransomware_telem.py | 13 ++++++++----- .../ransomware/test_ransomware_payload.py | 12 ++++++------ .../telemetry/test_ransomware_telem.py | 19 +++++++++---------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/monkey/infection_monkey/telemetry/ransomware_telem.py b/monkey/infection_monkey/telemetry/ransomware_telem.py index c56e8337c..64cce13c2 100644 --- a/monkey/infection_monkey/telemetry/ransomware_telem.py +++ b/monkey/infection_monkey/telemetry/ransomware_telem.py @@ -1,11 +1,13 @@ -from typing import List, Tuple +from typing import Tuple from common.common_consts.telem_categories import TelemCategoryEnum from infection_monkey.telemetry.base_telem import BaseTelem +from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem -class RansomwareTelem(BaseTelem): - def __init__(self, attempts: List[Tuple[str, str]]): +class RansomwareTelem(BatchableTelemMixin, IBatchableTelem, BaseTelem): + def __init__(self, entry: Tuple[str, str]): """ Ransomware telemetry constructor :param attempts: List of tuples with each tuple containing the path @@ -14,9 +16,10 @@ class RansomwareTelem(BaseTelem): containing the directory path and error string. """ super().__init__() - self.attempts = attempts + + self._telemetry_entries.append(entry) telem_category = TelemCategoryEnum.RANSOMWARE def get_data(self): - return {"ransomware_attempts": self.attempts} + return {"ransomware_attempts": self._telemetry_entries} diff --git a/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py b/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py index 35aef048c..86fb5c336 100644 --- a/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py +++ b/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py @@ -148,10 +148,10 @@ def test_telemetry_success(ransomware_payload, telemetry_messenger_spy): telem_1 = telemetry_messenger_spy.telemetries[0] telem_2 = telemetry_messenger_spy.telemetries[1] - assert ALL_ZEROS_PDF in telem_1.get_data()["ransomware_attempts"][0] - assert telem_1.get_data()["ransomware_attempts"][1] == "" - assert TEST_KEYBOARD_TXT in telem_2.get_data()["ransomware_attempts"][0] - assert telem_2.get_data()["ransomware_attempts"][1] == "" + assert ALL_ZEROS_PDF in telem_1.get_data()["ransomware_attempts"][0][0] + assert telem_1.get_data()["ransomware_attempts"][0][1] == "" + assert TEST_KEYBOARD_TXT in telem_2.get_data()["ransomware_attempts"][0][0] + assert telem_2.get_data()["ransomware_attempts"][0][1] == "" def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_spy): @@ -164,5 +164,5 @@ def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_ ransomware_payload.run_payload() telem_1 = telemetry_messenger_spy.telemetries[0] - assert "/file/not/exist" in telem_1.get_data()["ransomware_attempts"][0] - assert "No such file or directory" in telem_1.get_data()["ransomware_attempts"][1] + assert "/file/not/exist" in telem_1.get_data()["ransomware_attempts"][0][0] + assert "No such file or directory" in telem_1.get_data()["ransomware_attempts"][0][1] diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/test_ransomware_telem.py b/monkey/tests/unit_tests/infection_monkey/telemetry/test_ransomware_telem.py index 4994c9287..e2e674ecd 100644 --- a/monkey/tests/unit_tests/infection_monkey/telemetry/test_ransomware_telem.py +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/test_ransomware_telem.py @@ -1,20 +1,19 @@ import json -import pytest - from infection_monkey.telemetry.ransomware_telem import RansomwareTelem -ATTEMPTS = [("", "")] +ENCRYPTION_ATTEMPTS = [("", ""), ("", "")] -@pytest.fixture -def ransomware_telem_test_instance(): - return RansomwareTelem(ATTEMPTS) +def test_ransomware_telem_send(spy_send_telemetry): + ransomware_telem_1 = RansomwareTelem(ENCRYPTION_ATTEMPTS[0]) + ransomware_telem_2 = RansomwareTelem(ENCRYPTION_ATTEMPTS[1]) + ransomware_telem_1.add_telemetry_to_batch(ransomware_telem_2) + + ransomware_telem_1.send() + expected_data = {"ransomware_attempts": ENCRYPTION_ATTEMPTS} + expected_data = json.dumps(expected_data, cls=ransomware_telem_1.json_encoder) -def test_ransomware_telem_send(ransomware_telem_test_instance, spy_send_telemetry): - ransomware_telem_test_instance.send() - expected_data = {"ransomware_attempts": ATTEMPTS} - expected_data = json.dumps(expected_data, cls=ransomware_telem_test_instance.json_encoder) assert spy_send_telemetry.data == expected_data assert spy_send_telemetry.telem_category == "ransomware" From e549a4f8f40b8bad3daececbdde46e0aa4f1b018 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Sun, 27 Jun 2021 18:11:42 -0400 Subject: [PATCH 04/22] agent: Rename TelemetryMessengerWrapper The term "wrapper" is sometimes used as synonym for the decorator pattern, whereas this class is a textbook adapter. Use the term "adapter" instead of "wrapper" and rename "TelemetryMessengerWrapper" to "LegacyTelemetryMessengerAdapter", as this class servers as an adapter between the new ITelemetryMessenger interface and the (soon to be) legacy way of sending telemetry. --- monkey/infection_monkey/monkey.py | 6 +++--- ...er_wrapper.py => legacy_telemetry_messenger_adapter.py} | 7 ++++++- 2 files changed, 9 insertions(+), 4 deletions(-) rename monkey/infection_monkey/telemetry/messengers/{telemetry_messenger_wrapper.py => legacy_telemetry_messenger_adapter.py} (52%) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index bf3ae80a6..845f754f7 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -25,8 +25,8 @@ from infection_monkey.system_singleton import SystemSingleton from infection_monkey.telemetry.attack.t1106_telem import T1106Telem from infection_monkey.telemetry.attack.t1107_telem import T1107Telem from infection_monkey.telemetry.attack.victim_host_telem import VictimHostTelem -from infection_monkey.telemetry.messengers.telemetry_messenger_wrapper import ( - TelemetryMessengerWrapper, +from infection_monkey.telemetry.messengers.legacy_telemetry_messenger_adapter import ( + LegacyTelemetryMessengerAdapter, ) from infection_monkey.telemetry.scan_telem import ScanTelem from infection_monkey.telemetry.state_telem import StateTelem @@ -470,7 +470,7 @@ class InfectionMonkey(object): @staticmethod def run_ransomware(): try: - telemetry_messenger = TelemetryMessengerWrapper() + telemetry_messenger = LegacyTelemetryMessengerAdapter() RansomewarePayload(WormConfiguration.ransomware, telemetry_messenger).run_payload() except Exception as ex: LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}") diff --git a/monkey/infection_monkey/telemetry/messengers/telemetry_messenger_wrapper.py b/monkey/infection_monkey/telemetry/messengers/legacy_telemetry_messenger_adapter.py similarity index 52% rename from monkey/infection_monkey/telemetry/messengers/telemetry_messenger_wrapper.py rename to monkey/infection_monkey/telemetry/messengers/legacy_telemetry_messenger_adapter.py index e436cdd46..cbbe03595 100644 --- a/monkey/infection_monkey/telemetry/messengers/telemetry_messenger_wrapper.py +++ b/monkey/infection_monkey/telemetry/messengers/legacy_telemetry_messenger_adapter.py @@ -2,6 +2,11 @@ from infection_monkey.telemetry.i_telem import ITelem from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger -class TelemetryMessengerWrapper(ITelemetryMessenger): +class LegacyTelemetryMessengerAdapter(ITelemetryMessenger): + """ + Provides an adapter between modules that require an ITelemetryMessenger and the + legacy method for sending telemetry. + """ + def send_telemetry(self, telemetry: ITelem): telemetry.send() From 691e01e9c147010b907caf05ede94463110a2666 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Sun, 27 Jun 2021 19:35:27 -0400 Subject: [PATCH 05/22] tests: Move telemetry_messenger_spy to infection_monkey/conftest.py --- .../unit_tests/infection_monkey/conftest.py | 17 +++++++++++++++++ .../ransomware/test_ransomware_payload.py | 15 --------------- 2 files changed, 17 insertions(+), 15 deletions(-) create mode 100644 monkey/tests/unit_tests/infection_monkey/conftest.py diff --git a/monkey/tests/unit_tests/infection_monkey/conftest.py b/monkey/tests/unit_tests/infection_monkey/conftest.py new file mode 100644 index 000000000..533572f98 --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/conftest.py @@ -0,0 +1,17 @@ +import pytest + +from infection_monkey.telemetry.i_telem import ITelem +from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger + + +class TelemetryMessengerSpy(ITelemetryMessenger): + def __init__(self): + self.telemetries = [] + + def send_telemetry(self, telemetry: ITelem): + self.telemetries.append(telemetry) + + +@pytest.fixture +def telemetry_messenger_spy(): + return TelemetryMessengerSpy() diff --git a/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py b/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py index 86fb5c336..f26463ed1 100644 --- a/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py +++ b/monkey/tests/unit_tests/infection_monkey/ransomware/test_ransomware_payload.py @@ -23,16 +23,6 @@ from tests.utils import hash_file, is_user_admin from infection_monkey.ransomware import ransomware_payload as ransomware_payload_module from infection_monkey.ransomware.ransomware_payload import EXTENSION, RansomewarePayload -from infection_monkey.telemetry.i_telem import ITelem -from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger - - -class TelemetryMessengerSpy(ITelemetryMessenger): - def __init__(self): - self.telemetries = [] - - def send_telemetry(self, telemetry: ITelem): - self.telemetries.append(telemetry) def with_extension(filename): @@ -46,11 +36,6 @@ def ransomware_payload_config(ransomware_target): } -@pytest.fixture -def telemetry_messenger_spy(): - return TelemetryMessengerSpy() - - @pytest.fixture def ransomware_payload(ransomware_payload_config, telemetry_messenger_spy): return RansomewarePayload(ransomware_payload_config, telemetry_messenger_spy) From fadd978050f3c63bba8b936221c2d2fd57c5785e Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Sun, 27 Jun 2021 21:02:35 -0400 Subject: [PATCH 06/22] agent: Add BatchedTelemetryMessenger This telemetry messenger is a decorator that aggregates batchable telemetries and sends them to the island periodically. --- .../messengers/batched_telemetry_messenger.py | 82 +++++++++++ .../test_batched_telemetry_messenger.py | 127 ++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100644 monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py create mode 100644 monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py diff --git a/monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py new file mode 100644 index 000000000..16551ef11 --- /dev/null +++ b/monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py @@ -0,0 +1,82 @@ +import queue +import threading +import time +from typing import Dict + +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem +from infection_monkey.telemetry.i_telem import ITelem +from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger + +WAKES_PER_PERIOD = 4 + + +class BatchedTelemetryMessenger(ITelemetryMessenger): + """ + An ITelemetryMessenger decorator that aggregates IBatchableTelem telemetries + and periodically sends them to Monkey Island. + """ + + def __init__(self, telemetry_messenger: ITelemetryMessenger, period=5): + self._telemetry_messenger = telemetry_messenger + self._period = period + + self._run_batch_thread = True + self._queue: queue.Queue[ITelem] = queue.Queue() + # TODO: Create a "timer" or "countdown" class and inject an object instead of + # using time.time() + self._last_sent_time = time.time() + self._telemetry_batches: Dict[str, IBatchableTelem] = {} + + self._manage_telemetry_batches_thread = threading.Thread( + target=self._manage_telemetry_batches + ) + self._manage_telemetry_batches_thread.start() + + def __del__(self): + self.stop() + + def stop(self): + self._run_batch_thread = False + self._manage_telemetry_batches_thread.join() + + def send_telemetry(self, telemetry: ITelem): + self._queue.put(telemetry) + + def _manage_telemetry_batches(self): + self._reset() + + while self._run_batch_thread: + try: + telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) + + if isinstance(telemetry, IBatchableTelem): + self._add_telemetry_to_batch(telemetry) + else: + self._telemetry_messenger.send_telemetry(telemetry) + except queue.Empty: + pass + + if self._period_elapsed(): + self._send_telemetry_batches() + self._reset() + + self._send_telemetry_batches() + + def _reset(self): + self._last_sent_time = time.time() + self._telemetry_batches = {} + + def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem): + telem_category = new_telemetry.telem_category + + if telem_category in self._telemetry_batches: + self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry) + else: + self._telemetry_batches[telem_category] = new_telemetry + + def _period_elapsed(self): + return (time.time() - self._last_sent_time) > self._period + + def _send_telemetry_batches(self): + for batchable_telemetry in self._telemetry_batches.values(): + self._telemetry_messenger.send_telemetry(batchable_telemetry) diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py new file mode 100644 index 000000000..84377ba7c --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py @@ -0,0 +1,127 @@ +import time + +import pytest + +from infection_monkey.telemetry.base_telem import BaseTelem +from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem +from infection_monkey.telemetry.messengers.batched_telemetry_messenger import ( + BatchedTelemetryMessenger, +) + +PERIOD = 0.001 + + +def release_GIL(): + time.sleep(PERIOD) + + +def advance_clock_to_next_period(monkeypatch): + patch_time(monkeypatch, time.time() + (PERIOD * 1.01)) + + +def patch_time(monkeypatch, new_time: float): + monkeypatch.setattr(time, "time", lambda: new_time) + + +class NonBatchableTelemStub(BaseTelem): + telem_category = "NonBatchableTelemStub" + + def send(self, log_data=True): + raise NotImplementedError + + def get_data(self) -> dict: + return {"1": {"i": "a", "ii": "b"}} + + def __eq__(self, other): + return self.get_data() == other.get_data() and self.telem_category == other.telem_category + + +class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem): + def __init__(self, value, telem_category="cat1"): + self._telemetry_entries.append(value) + self._telem_category = telem_category + + @property + def telem_category(self): + return self._telem_category + + def send(self, log_data=True): + raise NotImplementedError + + def get_data(self) -> dict: + return {"entries": self._telemetry_entries} + + +@pytest.fixture +def batched_telemetry_messenger(monkeypatch, telemetry_messenger_spy): + patch_time(monkeypatch, 0) + btm = BatchedTelemetryMessenger(telemetry_messenger_spy, period=0.001) + yield btm + + btm.stop() + + +def test_send_immediately(batched_telemetry_messenger, telemetry_messenger_spy): + telem = NonBatchableTelemStub() + + batched_telemetry_messenger.send_telemetry(telem) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0] == telem + + +def test_send_telem_batch(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy): + expected_data = {"entries": [1, 2]} + telem1 = BatchableTelemStub(1) + telem2 = BatchableTelemStub(2) + + batched_telemetry_messenger.send_telemetry(telem1) + batched_telemetry_messenger.send_telemetry(telem2) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data + + +def test_send_different_telem_types( + monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy +): + telem1 = BatchableTelemStub(1, "cat1") + telem2 = BatchableTelemStub(2, "cat2") + + batched_telemetry_messenger.send_telemetry(telem1) + batched_telemetry_messenger.send_telemetry(telem2) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 2 + assert telemetry_messenger_spy.telemetries[0] == telem1 + assert telemetry_messenger_spy.telemetries[1] == telem2 + + +def test_send_two_batches(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy): + telem1 = BatchableTelemStub(1, "cat1") + telem2 = BatchableTelemStub(2, "cat1") + + batched_telemetry_messenger.send_telemetry(telem1) + advance_clock_to_next_period(monkeypatch) + release_GIL() + + batched_telemetry_messenger.send_telemetry(telem2) + release_GIL() + assert len(telemetry_messenger_spy.telemetries) == 1 + + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 2 + assert telemetry_messenger_spy.telemetries[1] == telem2 From 85c91f55bb7782253ae203c5cdd4ae96c0c5ec83 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 28 Jun 2021 06:52:33 -0400 Subject: [PATCH 07/22] agent: Use BatchingTelemetryMessenger in RansomewarePayload We don't want the ransomware payload to encrypt all files and then send telemetry to the island. This could lead to a long period of time where the user has no insight into what the monkey is doing on a node. We also don't want to flood the island with telemetries. By using the BatchingTelemetryMessenger, ransomware encryption telemetries are batched together and periodically sent to the island. --- monkey/infection_monkey/monkey.py | 10 ++++++- ...ger.py => batching_telemetry_messenger.py} | 2 +- ...y => test_batching_telemetry_messenger.py} | 30 +++++++++---------- 3 files changed, 25 insertions(+), 17 deletions(-) rename monkey/infection_monkey/telemetry/messengers/{batched_telemetry_messenger.py => batching_telemetry_messenger.py} (98%) rename monkey/tests/unit_tests/infection_monkey/telemetry/messengers/{test_batched_telemetry_messenger.py => test_batching_telemetry_messenger.py} (74%) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index 845f754f7..506bc5db2 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -25,6 +25,9 @@ from infection_monkey.system_singleton import SystemSingleton from infection_monkey.telemetry.attack.t1106_telem import T1106Telem from infection_monkey.telemetry.attack.t1107_telem import T1107Telem from infection_monkey.telemetry.attack.victim_host_telem import VictimHostTelem +from infection_monkey.telemetry.messengers.batching_telemetry_messenger import ( + BatchingTelemetryMessenger, +) from infection_monkey.telemetry.messengers.legacy_telemetry_messenger_adapter import ( LegacyTelemetryMessengerAdapter, ) @@ -471,6 +474,11 @@ class InfectionMonkey(object): def run_ransomware(): try: telemetry_messenger = LegacyTelemetryMessengerAdapter() - RansomewarePayload(WormConfiguration.ransomware, telemetry_messenger).run_payload() + batching_telemetry_messenger = BatchingTelemetryMessenger(telemetry_messenger) + RansomewarePayload( + WormConfiguration.ransomware, batching_telemetry_messenger + ).run_payload() except Exception as ex: LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}") + finally: + batching_telemetry_messenger.stop() diff --git a/monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py similarity index 98% rename from monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py rename to monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index 16551ef11..43fbf1306 100644 --- a/monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -10,7 +10,7 @@ from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemet WAKES_PER_PERIOD = 4 -class BatchedTelemetryMessenger(ITelemetryMessenger): +class BatchingTelemetryMessenger(ITelemetryMessenger): """ An ITelemetryMessenger decorator that aggregates IBatchableTelem telemetries and periodically sends them to Monkey Island. diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py similarity index 74% rename from monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py rename to monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py index 84377ba7c..58ee96d89 100644 --- a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py @@ -5,8 +5,8 @@ import pytest from infection_monkey.telemetry.base_telem import BaseTelem from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem -from infection_monkey.telemetry.messengers.batched_telemetry_messenger import ( - BatchedTelemetryMessenger, +from infection_monkey.telemetry.messengers.batching_telemetry_messenger import ( + BatchingTelemetryMessenger, ) PERIOD = 0.001 @@ -54,31 +54,31 @@ class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem): @pytest.fixture -def batched_telemetry_messenger(monkeypatch, telemetry_messenger_spy): +def batching_telemetry_messenger(monkeypatch, telemetry_messenger_spy): patch_time(monkeypatch, 0) - btm = BatchedTelemetryMessenger(telemetry_messenger_spy, period=0.001) + btm = BatchingTelemetryMessenger(telemetry_messenger_spy, period=0.001) yield btm btm.stop() -def test_send_immediately(batched_telemetry_messenger, telemetry_messenger_spy): +def test_send_immediately(batching_telemetry_messenger, telemetry_messenger_spy): telem = NonBatchableTelemStub() - batched_telemetry_messenger.send_telemetry(telem) + batching_telemetry_messenger.send_telemetry(telem) release_GIL() assert len(telemetry_messenger_spy.telemetries) == 1 assert telemetry_messenger_spy.telemetries[0] == telem -def test_send_telem_batch(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy): +def test_send_telem_batch(monkeypatch, batching_telemetry_messenger, telemetry_messenger_spy): expected_data = {"entries": [1, 2]} telem1 = BatchableTelemStub(1) telem2 = BatchableTelemStub(2) - batched_telemetry_messenger.send_telemetry(telem1) - batched_telemetry_messenger.send_telemetry(telem2) + batching_telemetry_messenger.send_telemetry(telem1) + batching_telemetry_messenger.send_telemetry(telem2) release_GIL() assert len(telemetry_messenger_spy.telemetries) == 0 @@ -90,13 +90,13 @@ def test_send_telem_batch(monkeypatch, batched_telemetry_messenger, telemetry_me def test_send_different_telem_types( - monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy + monkeypatch, batching_telemetry_messenger, telemetry_messenger_spy ): telem1 = BatchableTelemStub(1, "cat1") telem2 = BatchableTelemStub(2, "cat2") - batched_telemetry_messenger.send_telemetry(telem1) - batched_telemetry_messenger.send_telemetry(telem2) + batching_telemetry_messenger.send_telemetry(telem1) + batching_telemetry_messenger.send_telemetry(telem2) release_GIL() assert len(telemetry_messenger_spy.telemetries) == 0 @@ -108,15 +108,15 @@ def test_send_different_telem_types( assert telemetry_messenger_spy.telemetries[1] == telem2 -def test_send_two_batches(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy): +def test_send_two_batches(monkeypatch, batching_telemetry_messenger, telemetry_messenger_spy): telem1 = BatchableTelemStub(1, "cat1") telem2 = BatchableTelemStub(2, "cat1") - batched_telemetry_messenger.send_telemetry(telem1) + batching_telemetry_messenger.send_telemetry(telem1) advance_clock_to_next_period(monkeypatch) release_GIL() - batched_telemetry_messenger.send_telemetry(telem2) + batching_telemetry_messenger.send_telemetry(telem2) release_GIL() assert len(telemetry_messenger_spy.telemetries) == 1 From 13c9e41a4c71d6da0b1d6337b12fb53b7b034b0e Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 28 Jun 2021 11:15:46 -0400 Subject: [PATCH 08/22] agent: Extract default period to constant --- .../telemetry/messengers/batching_telemetry_messenger.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index 43fbf1306..c170a0ef0 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -7,6 +7,7 @@ from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem from infection_monkey.telemetry.i_telem import ITelem from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger +DEFAULT_PERIOD = 5 WAKES_PER_PERIOD = 4 @@ -16,7 +17,7 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): and periodically sends them to Monkey Island. """ - def __init__(self, telemetry_messenger: ITelemetryMessenger, period=5): + def __init__(self, telemetry_messenger: ITelemetryMessenger, period=DEFAULT_PERIOD): self._telemetry_messenger = telemetry_messenger self._period = period From be6e76757d8ebc28acc0685fee560c64540cf371 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 28 Jun 2021 11:25:13 -0400 Subject: [PATCH 09/22] agent: Move telemetry messenger construction out of "try" --- monkey/infection_monkey/monkey.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index 506bc5db2..28e59c616 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -472,9 +472,9 @@ class InfectionMonkey(object): @staticmethod def run_ransomware(): + telemetry_messenger = LegacyTelemetryMessengerAdapter() + batching_telemetry_messenger = BatchingTelemetryMessenger(telemetry_messenger) try: - telemetry_messenger = LegacyTelemetryMessengerAdapter() - batching_telemetry_messenger = BatchingTelemetryMessenger(telemetry_messenger) RansomewarePayload( WormConfiguration.ransomware, batching_telemetry_messenger ).run_payload() From 0a9c98f06184b73b08378ded1ca9954bd20968a2 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 28 Jun 2021 11:33:06 -0400 Subject: [PATCH 10/22] agent: Rename _run_batch_thread -> _should_run_batch_thread --- .../telemetry/messengers/batching_telemetry_messenger.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index c170a0ef0..6596e89e7 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -21,7 +21,7 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): self._telemetry_messenger = telemetry_messenger self._period = period - self._run_batch_thread = True + self._should_run_batch_thread = True self._queue: queue.Queue[ITelem] = queue.Queue() # TODO: Create a "timer" or "countdown" class and inject an object instead of # using time.time() @@ -37,7 +37,7 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): self.stop() def stop(self): - self._run_batch_thread = False + self._should_run_batch_thread = False self._manage_telemetry_batches_thread.join() def send_telemetry(self, telemetry: ITelem): @@ -46,7 +46,7 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): def _manage_telemetry_batches(self): self._reset() - while self._run_batch_thread: + while self._should_run_batch_thread: try: telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) From 1d066c8e6da23d4d8c092fb7a2a2d5d26816abf6 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 28 Jun 2021 12:05:57 -0400 Subject: [PATCH 11/22] agent: Add explicit start to BatchingTelemetryMessenger My original plan was to start a thread in __init__() and stop the thread when __del__() was called. Since the running thread (object) contains a reference to the BatchingTelemetryMessenger object that launched it, the destructor will not be called until the thread is stopped. Therefore, a stop() was added to allow the BatchingTelemetryMessenger to be stopped. Since it has an explicit stop, it should also have an explicit start, rather than starting the thread in the constructor. --- monkey/infection_monkey/monkey.py | 1 + .../telemetry/messengers/batching_telemetry_messenger.py | 8 +++++--- .../messengers/test_batching_telemetry_messenger.py | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index 28e59c616..bd9062eeb 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -474,6 +474,7 @@ class InfectionMonkey(object): def run_ransomware(): telemetry_messenger = LegacyTelemetryMessengerAdapter() batching_telemetry_messenger = BatchingTelemetryMessenger(telemetry_messenger) + batching_telemetry_messenger.start() try: RansomewarePayload( WormConfiguration.ransomware, batching_telemetry_messenger diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index 6596e89e7..f5f21a760 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -28,14 +28,16 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): self._last_sent_time = time.time() self._telemetry_batches: Dict[str, IBatchableTelem] = {} + def __del__(self): + self.stop() + + def start(self): + self._should_run_batch_thread = True self._manage_telemetry_batches_thread = threading.Thread( target=self._manage_telemetry_batches ) self._manage_telemetry_batches_thread.start() - def __del__(self): - self.stop() - def stop(self): self._should_run_batch_thread = False self._manage_telemetry_batches_thread.join() diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py index 58ee96d89..65eebc582 100644 --- a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py @@ -57,6 +57,7 @@ class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem): def batching_telemetry_messenger(monkeypatch, telemetry_messenger_spy): patch_time(monkeypatch, 0) btm = BatchingTelemetryMessenger(telemetry_messenger_spy, period=0.001) + btm.start() yield btm btm.stop() From 2f62a14fbf4c56e66448454bffd04d7e925fb8a8 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 28 Jun 2021 12:21:57 -0400 Subject: [PATCH 12/22] agent: Remove start/stop from BatchingTelemetryMessenger My original plan was to start a thread in __init__() and stop the thread when __del__() was called. Since the running thread (object) contains a reference to the BatchingTelemetryMessenger object that launched it, the destructor will not be called until the thread is stopped. This resulted in adding a stop() method (fadd978) followed by adding a start() method (1d066c8e). By using an inner class to run the thread, we enable the class to be used as originally intended, reducing the burden on the user of this class. The thread is now started on construction and stopped on destruction. The user can remain blissfully unaware that anything resembling threading is going in, and can use the BatchingTelemetryMessenger just like any other ITelemetryMessenger. --- monkey/infection_monkey/monkey.py | 4 +- .../batching_telemetry_messenger.py | 107 ++++++++++-------- .../test_batching_telemetry_messenger.py | 6 +- 3 files changed, 60 insertions(+), 57 deletions(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index bd9062eeb..622c17d7d 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -474,12 +474,10 @@ class InfectionMonkey(object): def run_ransomware(): telemetry_messenger = LegacyTelemetryMessengerAdapter() batching_telemetry_messenger = BatchingTelemetryMessenger(telemetry_messenger) - batching_telemetry_messenger.start() + try: RansomewarePayload( WormConfiguration.ransomware, batching_telemetry_messenger ).run_payload() except Exception as ex: LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}") - finally: - batching_telemetry_messenger.stop() diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index f5f21a760..9541d34d1 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -18,68 +18,77 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): """ def __init__(self, telemetry_messenger: ITelemetryMessenger, period=DEFAULT_PERIOD): - self._telemetry_messenger = telemetry_messenger - self._period = period - - self._should_run_batch_thread = True self._queue: queue.Queue[ITelem] = queue.Queue() - # TODO: Create a "timer" or "countdown" class and inject an object instead of - # using time.time() - self._last_sent_time = time.time() - self._telemetry_batches: Dict[str, IBatchableTelem] = {} + self._thread = BatchingTelemetryMessenger._BatchingTelemetryMessengerThread( + self._queue, telemetry_messenger, period + ) + + self._thread.start() def __del__(self): - self.stop() - - def start(self): - self._should_run_batch_thread = True - self._manage_telemetry_batches_thread = threading.Thread( - target=self._manage_telemetry_batches - ) - self._manage_telemetry_batches_thread.start() - - def stop(self): - self._should_run_batch_thread = False - self._manage_telemetry_batches_thread.join() + self._thread.stop() def send_telemetry(self, telemetry: ITelem): self._queue.put(telemetry) - def _manage_telemetry_batches(self): - self._reset() + class _BatchingTelemetryMessengerThread: + def __init__(self, queue: queue.Queue, telemetry_messenger: ITelemetryMessenger, period): + self._queue: queue.Queue[ITelem] = queue + self._telemetry_messenger = telemetry_messenger + self._period = period - while self._should_run_batch_thread: - try: - telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) + self._should_run_batch_thread = True + # TODO: Create a "timer" or "countdown" class and inject an object instead of + # using time.time() + self._last_sent_time = time.time() + self._telemetry_batches: Dict[str, IBatchableTelem] = {} - if isinstance(telemetry, IBatchableTelem): - self._add_telemetry_to_batch(telemetry) - else: - self._telemetry_messenger.send_telemetry(telemetry) - except queue.Empty: - pass + def start(self): + self._should_run_batch_thread = True + self._manage_telemetry_batches_thread = threading.Thread( + target=self._manage_telemetry_batches + ) + self._manage_telemetry_batches_thread.start() - if self._period_elapsed(): - self._send_telemetry_batches() - self._reset() + def stop(self): + self._should_run_batch_thread = False + self._manage_telemetry_batches_thread.join() - self._send_telemetry_batches() + def _manage_telemetry_batches(self): + self._reset() - def _reset(self): - self._last_sent_time = time.time() - self._telemetry_batches = {} + while self._should_run_batch_thread: + try: + telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) - def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem): - telem_category = new_telemetry.telem_category + if isinstance(telemetry, IBatchableTelem): + self._add_telemetry_to_batch(telemetry) + else: + self._telemetry_messenger.send_telemetry(telemetry) + except queue.Empty: + pass - if telem_category in self._telemetry_batches: - self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry) - else: - self._telemetry_batches[telem_category] = new_telemetry + if self._period_elapsed(): + self._send_telemetry_batches() + self._reset() - def _period_elapsed(self): - return (time.time() - self._last_sent_time) > self._period + self._send_telemetry_batches() - def _send_telemetry_batches(self): - for batchable_telemetry in self._telemetry_batches.values(): - self._telemetry_messenger.send_telemetry(batchable_telemetry) + def _reset(self): + self._last_sent_time = time.time() + self._telemetry_batches = {} + + def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem): + telem_category = new_telemetry.telem_category + + if telem_category in self._telemetry_batches: + self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry) + else: + self._telemetry_batches[telem_category] = new_telemetry + + def _period_elapsed(self): + return (time.time() - self._last_sent_time) > self._period + + def _send_telemetry_batches(self): + for batchable_telemetry in self._telemetry_batches.values(): + self._telemetry_messenger.send_telemetry(batchable_telemetry) diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py index 65eebc582..2de3e0ffe 100644 --- a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py @@ -56,11 +56,7 @@ class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem): @pytest.fixture def batching_telemetry_messenger(monkeypatch, telemetry_messenger_spy): patch_time(monkeypatch, 0) - btm = BatchingTelemetryMessenger(telemetry_messenger_spy, period=0.001) - btm.start() - yield btm - - btm.stop() + return BatchingTelemetryMessenger(telemetry_messenger_spy, period=0.001) def test_send_immediately(batching_telemetry_messenger, telemetry_messenger_spy): From 7e3eef90cbad277d1f37f4baaaa79e2c35528a02 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 28 Jun 2021 12:27:32 -0400 Subject: [PATCH 13/22] agent: Rename get_telemetry_entries() -> get_telemetry_batch() --- monkey/infection_monkey/telemetry/batchable_telem_mixin.py | 4 ++-- monkey/infection_monkey/telemetry/i_batchable_telem.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/infection_monkey/telemetry/batchable_telem_mixin.py b/monkey/infection_monkey/telemetry/batchable_telem_mixin.py index 4189f0cf0..913d7d40e 100644 --- a/monkey/infection_monkey/telemetry/batchable_telem_mixin.py +++ b/monkey/infection_monkey/telemetry/batchable_telem_mixin.py @@ -15,8 +15,8 @@ class BatchableTelemMixin: return self._list - def get_telemetry_entries(self) -> Iterable: + def get_telemetry_batch(self) -> Iterable: return self._telemetry_entries def add_telemetry_to_batch(self, telemetry: IBatchableTelem): - self._telemetry_entries.extend(telemetry.get_telemetry_entries()) + self._telemetry_entries.extend(telemetry.get_telemetry_batch()) diff --git a/monkey/infection_monkey/telemetry/i_batchable_telem.py b/monkey/infection_monkey/telemetry/i_batchable_telem.py index 3cb82fd44..e02c41118 100644 --- a/monkey/infection_monkey/telemetry/i_batchable_telem.py +++ b/monkey/infection_monkey/telemetry/i_batchable_telem.py @@ -8,7 +8,7 @@ from infection_monkey.telemetry.i_telem import ITelem class IBatchableTelem(ITelem, metaclass=abc.ABCMeta): @abc.abstractmethod - def get_telemetry_entries(self) -> Iterable: + def get_telemetry_batch(self) -> Iterable: pass @abc.abstractmethod From 543f0031a24222c742dd16f46c67acbe43eec169 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 28 Jun 2021 12:34:24 -0400 Subject: [PATCH 14/22] agent: Fully flush BatchingTelemetryMessenger queue before stopping --- .../telemetry/messengers/batching_telemetry_messenger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index 9541d34d1..252a5e8a7 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -57,7 +57,7 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): def _manage_telemetry_batches(self): self._reset() - while self._should_run_batch_thread: + while self._should_run_batch_thread or not self._queue.empty(): try: telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) From e34599779b5e23c1a678ed5d1f23fe568bb93642 Mon Sep 17 00:00:00 2001 From: VakarisZ Date: Tue, 29 Jun 2021 09:14:43 +0300 Subject: [PATCH 15/22] Add keywords to arguments that create RansomwarePayload in monkey.py --- monkey/infection_monkey/monkey.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index 622c17d7d..b1bb61e4f 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -477,7 +477,8 @@ class InfectionMonkey(object): try: RansomewarePayload( - WormConfiguration.ransomware, batching_telemetry_messenger + config=WormConfiguration.ransomware, + telemetry_messenger=batching_telemetry_messenger, ).run_payload() except Exception as ex: LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}") From a454449ccafc637cd93fd957996f2f4c4a4126fc Mon Sep 17 00:00:00 2001 From: VakarisZ Date: Tue, 29 Jun 2021 12:09:26 +0300 Subject: [PATCH 16/22] Do small readability changes in batching_telemetry_messenger.py --- .../telemetry/messengers/batching_telemetry_messenger.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index 252a5e8a7..f47d5392e 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -19,7 +19,7 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): def __init__(self, telemetry_messenger: ITelemetryMessenger, period=DEFAULT_PERIOD): self._queue: queue.Queue[ITelem] = queue.Queue() - self._thread = BatchingTelemetryMessenger._BatchingTelemetryMessengerThread( + self._thread = self._BatchingTelemetryMessengerThread( self._queue, telemetry_messenger, period ) @@ -32,8 +32,10 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): self._queue.put(telemetry) class _BatchingTelemetryMessengerThread: - def __init__(self, queue: queue.Queue, telemetry_messenger: ITelemetryMessenger, period): - self._queue: queue.Queue[ITelem] = queue + def __init__( + self, telem_queue: queue.Queue, telemetry_messenger: ITelemetryMessenger, period: int + ): + self._queue: queue.Queue[ITelem] = telem_queue self._telemetry_messenger = telemetry_messenger self._period = period From 9d3d4611dcc5eb9ebe33d15bc08e97b8869594d8 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 29 Jun 2021 06:38:56 -0400 Subject: [PATCH 17/22] agent: Define _manage_telemetry_batches_thread in __init_() --- .../telemetry/messengers/batching_telemetry_messenger.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index f47d5392e..aa4351567 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -45,6 +45,8 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): self._last_sent_time = time.time() self._telemetry_batches: Dict[str, IBatchableTelem] = {} + self._manage_telemetry_batches_thread = None + def start(self): self._should_run_batch_thread = True self._manage_telemetry_batches_thread = threading.Thread( @@ -55,6 +57,7 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): def stop(self): self._should_run_batch_thread = False self._manage_telemetry_batches_thread.join() + self._manage_telemetry_batches_thread = None def _manage_telemetry_batches(self): self._reset() From 8cf316b64a862b3a7c87250153f8cad61edd9fa3 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 29 Jun 2021 07:36:49 -0400 Subject: [PATCH 18/22] agent: Refactor telemetry processing in BatchingTelemetryMessenger We need to ensure when a BatchingTelemetryMessenger stops, all remaining telemetries in its queue are sent. The existing logic does this, but this commit improves the readability and intent of the code, as well as adds a test for this condition. --- .../batching_telemetry_messenger.py | 31 ++++++++++++------- .../test_batching_telemetry_messenger.py | 19 ++++++++++++ 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index aa4351567..123903fb0 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -62,27 +62,30 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): def _manage_telemetry_batches(self): self._reset() - while self._should_run_batch_thread or not self._queue.empty(): - try: - telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) - - if isinstance(telemetry, IBatchableTelem): - self._add_telemetry_to_batch(telemetry) - else: - self._telemetry_messenger.send_telemetry(telemetry) - except queue.Empty: - pass + while self._should_run_batch_thread: + self._process_next_telemetry() if self._period_elapsed(): self._send_telemetry_batches() self._reset() - self._send_telemetry_batches() + self._send_remaining_telemetry_batches() def _reset(self): self._last_sent_time = time.time() self._telemetry_batches = {} + def _process_next_telemetry(self): + try: + telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) + + if isinstance(telemetry, IBatchableTelem): + self._add_telemetry_to_batch(telemetry) + else: + self._telemetry_messenger.send_telemetry(telemetry) + except queue.Empty: + pass + def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem): telem_category = new_telemetry.telem_category @@ -94,6 +97,12 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): def _period_elapsed(self): return (time.time() - self._last_sent_time) > self._period + def _send_remaining_telemetry_batches(self): + while not self._queue.empty(): + self._process_next_telemetry() + + self._send_telemetry_batches() + def _send_telemetry_batches(self): for batchable_telemetry in self._telemetry_batches.values(): self._telemetry_messenger.send_telemetry(batchable_telemetry) diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py index 2de3e0ffe..8b894a4f9 100644 --- a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py @@ -122,3 +122,22 @@ def test_send_two_batches(monkeypatch, batching_telemetry_messenger, telemetry_m assert len(telemetry_messenger_spy.telemetries) == 2 assert telemetry_messenger_spy.telemetries[1] == telem2 + + +def test_send_remaining_telem_after_stop(monkeypatch, telemetry_messenger_spy): + patch_time(monkeypatch, 0) + batching_telemetry_messenger = BatchingTelemetryMessenger( + telemetry_messenger_spy, period=PERIOD + ) + + expected_data = {"entries": [1]} + telem = BatchableTelemStub(1) + + batching_telemetry_messenger.send_telemetry(telem) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + del batching_telemetry_messenger + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data From d5a26ca6eb119f76fcef1616eadee9452b8de015 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 29 Jun 2021 07:48:18 -0400 Subject: [PATCH 19/22] agent: Refactor BatchingTelemetryMessenger tests to destroy threads --- .../test_batching_telemetry_messenger.py | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py index 8b894a4f9..e3d2f89e0 100644 --- a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py @@ -1,7 +1,5 @@ import time -import pytest - from infection_monkey.telemetry.base_telem import BaseTelem from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem @@ -53,15 +51,21 @@ class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem): return {"entries": self._telemetry_entries} -@pytest.fixture -def batching_telemetry_messenger(monkeypatch, telemetry_messenger_spy): +# Note that this function is not a fixture. This is because BatchingTelemetyMessenger +# stops its thread when it is destructed. If this were a fixture, it may live +# past the end of the test, which would allow the in the BatchingTelemetryMessenger +# instance to keep running instead of stopping +def build_batching_telemetry_messenger(monkeypatch, telemetry_messenger_spy): patch_time(monkeypatch, 0) - return BatchingTelemetryMessenger(telemetry_messenger_spy, period=0.001) + return BatchingTelemetryMessenger(telemetry_messenger_spy, period=PERIOD) -def test_send_immediately(batching_telemetry_messenger, telemetry_messenger_spy): +def test_send_immediately(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + telem = NonBatchableTelemStub() - batching_telemetry_messenger.send_telemetry(telem) release_GIL() @@ -69,7 +73,11 @@ def test_send_immediately(batching_telemetry_messenger, telemetry_messenger_spy) assert telemetry_messenger_spy.telemetries[0] == telem -def test_send_telem_batch(monkeypatch, batching_telemetry_messenger, telemetry_messenger_spy): +def test_send_telem_batch(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + expected_data = {"entries": [1, 2]} telem1 = BatchableTelemStub(1) telem2 = BatchableTelemStub(2) @@ -86,9 +94,11 @@ def test_send_telem_batch(monkeypatch, batching_telemetry_messenger, telemetry_m assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data -def test_send_different_telem_types( - monkeypatch, batching_telemetry_messenger, telemetry_messenger_spy -): +def test_send_different_telem_types(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + telem1 = BatchableTelemStub(1, "cat1") telem2 = BatchableTelemStub(2, "cat2") @@ -105,7 +115,11 @@ def test_send_different_telem_types( assert telemetry_messenger_spy.telemetries[1] == telem2 -def test_send_two_batches(monkeypatch, batching_telemetry_messenger, telemetry_messenger_spy): +def test_send_two_batches(monkeypatch, telemetry_messenger_spy): + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy + ) + telem1 = BatchableTelemStub(1, "cat1") telem2 = BatchableTelemStub(2, "cat1") @@ -125,9 +139,8 @@ def test_send_two_batches(monkeypatch, batching_telemetry_messenger, telemetry_m def test_send_remaining_telem_after_stop(monkeypatch, telemetry_messenger_spy): - patch_time(monkeypatch, 0) - batching_telemetry_messenger = BatchingTelemetryMessenger( - telemetry_messenger_spy, period=PERIOD + batching_telemetry_messenger = build_batching_telemetry_messenger( + monkeypatch, telemetry_messenger_spy ) expected_data = {"entries": [1]} From 7e7d46d4e779634b705de825c7c6b26ff4a88441 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 29 Jun 2021 09:53:32 -0400 Subject: [PATCH 20/22] agent: Improve description in BatchableTelemMixin docstring --- monkey/infection_monkey/telemetry/batchable_telem_mixin.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/monkey/infection_monkey/telemetry/batchable_telem_mixin.py b/monkey/infection_monkey/telemetry/batchable_telem_mixin.py index 913d7d40e..905e9e91b 100644 --- a/monkey/infection_monkey/telemetry/batchable_telem_mixin.py +++ b/monkey/infection_monkey/telemetry/batchable_telem_mixin.py @@ -5,7 +5,8 @@ from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem class BatchableTelemMixin: """ - Implements the IBatchableTelem interface methods using a list. + Implements the get_telemetry_batch() and add_telemetry_to_batch() methods from the + IBatchableTelem interface using a list. """ @property From f8579300b34b39290d3b2b35cd6955d5c02250ba Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 29 Jun 2021 09:55:29 -0400 Subject: [PATCH 21/22] Revert "Add keywords to arguments that create RansomwarePayload in monkey.py" This reverts commit e34599779b5e23c1a678ed5d1f23fe568bb93642. --- monkey/infection_monkey/monkey.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index b1bb61e4f..622c17d7d 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -477,8 +477,7 @@ class InfectionMonkey(object): try: RansomewarePayload( - config=WormConfiguration.ransomware, - telemetry_messenger=batching_telemetry_messenger, + WormConfiguration.ransomware, batching_telemetry_messenger ).run_payload() except Exception as ex: LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}") From 8281a9d7386fa1e51692446f4e415b77982534bc Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 29 Jun 2021 10:34:43 -0400 Subject: [PATCH 22/22] agent: Add docstring to IBatchableTelem --- monkey/infection_monkey/telemetry/i_batchable_telem.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/monkey/infection_monkey/telemetry/i_batchable_telem.py b/monkey/infection_monkey/telemetry/i_batchable_telem.py index e02c41118..3503316fa 100644 --- a/monkey/infection_monkey/telemetry/i_batchable_telem.py +++ b/monkey/infection_monkey/telemetry/i_batchable_telem.py @@ -7,6 +7,11 @@ from infection_monkey.telemetry.i_telem import ITelem class IBatchableTelem(ITelem, metaclass=abc.ABCMeta): + """ + Extends the ITelem interface and enables telemetries to be aggregated into + batches. + """ + @abc.abstractmethod def get_telemetry_batch(self) -> Iterable: pass