diff --git a/monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py new file mode 100644 index 000000000..16551ef11 --- /dev/null +++ b/monkey/infection_monkey/telemetry/messengers/batched_telemetry_messenger.py @@ -0,0 +1,82 @@ +import queue +import threading +import time +from typing import Dict + +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem +from infection_monkey.telemetry.i_telem import ITelem +from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger + +WAKES_PER_PERIOD = 4 + + +class BatchedTelemetryMessenger(ITelemetryMessenger): + """ + An ITelemetryMessenger decorator that aggregates IBatchableTelem telemetries + and periodically sends them to Monkey Island. + """ + + def __init__(self, telemetry_messenger: ITelemetryMessenger, period=5): + self._telemetry_messenger = telemetry_messenger + self._period = period + + self._run_batch_thread = True + self._queue: queue.Queue[ITelem] = queue.Queue() + # TODO: Create a "timer" or "countdown" class and inject an object instead of + # using time.time() + self._last_sent_time = time.time() + self._telemetry_batches: Dict[str, IBatchableTelem] = {} + + self._manage_telemetry_batches_thread = threading.Thread( + target=self._manage_telemetry_batches + ) + self._manage_telemetry_batches_thread.start() + + def __del__(self): + self.stop() + + def stop(self): + self._run_batch_thread = False + self._manage_telemetry_batches_thread.join() + + def send_telemetry(self, telemetry: ITelem): + self._queue.put(telemetry) + + def _manage_telemetry_batches(self): + self._reset() + + while self._run_batch_thread: + try: + telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) + + if isinstance(telemetry, IBatchableTelem): + self._add_telemetry_to_batch(telemetry) + else: + self._telemetry_messenger.send_telemetry(telemetry) + except queue.Empty: + pass + + if self._period_elapsed(): + self._send_telemetry_batches() + self._reset() + + self._send_telemetry_batches() + + def _reset(self): + self._last_sent_time = time.time() + self._telemetry_batches = {} + + def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem): + telem_category = new_telemetry.telem_category + + if telem_category in self._telemetry_batches: + self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry) + else: + self._telemetry_batches[telem_category] = new_telemetry + + def _period_elapsed(self): + return (time.time() - self._last_sent_time) > self._period + + def _send_telemetry_batches(self): + for batchable_telemetry in self._telemetry_batches.values(): + self._telemetry_messenger.send_telemetry(batchable_telemetry) diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py new file mode 100644 index 000000000..84377ba7c --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batched_telemetry_messenger.py @@ -0,0 +1,127 @@ +import time + +import pytest + +from infection_monkey.telemetry.base_telem import BaseTelem +from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin +from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem +from infection_monkey.telemetry.messengers.batched_telemetry_messenger import ( + BatchedTelemetryMessenger, +) + +PERIOD = 0.001 + + +def release_GIL(): + time.sleep(PERIOD) + + +def advance_clock_to_next_period(monkeypatch): + patch_time(monkeypatch, time.time() + (PERIOD * 1.01)) + + +def patch_time(monkeypatch, new_time: float): + monkeypatch.setattr(time, "time", lambda: new_time) + + +class NonBatchableTelemStub(BaseTelem): + telem_category = "NonBatchableTelemStub" + + def send(self, log_data=True): + raise NotImplementedError + + def get_data(self) -> dict: + return {"1": {"i": "a", "ii": "b"}} + + def __eq__(self, other): + return self.get_data() == other.get_data() and self.telem_category == other.telem_category + + +class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem): + def __init__(self, value, telem_category="cat1"): + self._telemetry_entries.append(value) + self._telem_category = telem_category + + @property + def telem_category(self): + return self._telem_category + + def send(self, log_data=True): + raise NotImplementedError + + def get_data(self) -> dict: + return {"entries": self._telemetry_entries} + + +@pytest.fixture +def batched_telemetry_messenger(monkeypatch, telemetry_messenger_spy): + patch_time(monkeypatch, 0) + btm = BatchedTelemetryMessenger(telemetry_messenger_spy, period=0.001) + yield btm + + btm.stop() + + +def test_send_immediately(batched_telemetry_messenger, telemetry_messenger_spy): + telem = NonBatchableTelemStub() + + batched_telemetry_messenger.send_telemetry(telem) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0] == telem + + +def test_send_telem_batch(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy): + expected_data = {"entries": [1, 2]} + telem1 = BatchableTelemStub(1) + telem2 = BatchableTelemStub(2) + + batched_telemetry_messenger.send_telemetry(telem1) + batched_telemetry_messenger.send_telemetry(telem2) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data + + +def test_send_different_telem_types( + monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy +): + telem1 = BatchableTelemStub(1, "cat1") + telem2 = BatchableTelemStub(2, "cat2") + + batched_telemetry_messenger.send_telemetry(telem1) + batched_telemetry_messenger.send_telemetry(telem2) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 2 + assert telemetry_messenger_spy.telemetries[0] == telem1 + assert telemetry_messenger_spy.telemetries[1] == telem2 + + +def test_send_two_batches(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy): + telem1 = BatchableTelemStub(1, "cat1") + telem2 = BatchableTelemStub(2, "cat1") + + batched_telemetry_messenger.send_telemetry(telem1) + advance_clock_to_next_period(monkeypatch) + release_GIL() + + batched_telemetry_messenger.send_telemetry(telem2) + release_GIL() + assert len(telemetry_messenger_spy.telemetries) == 1 + + advance_clock_to_next_period(monkeypatch) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 2 + assert telemetry_messenger_spy.telemetries[1] == telem2