diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index aa4351567..123903fb0 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -62,27 +62,30 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): def _manage_telemetry_batches(self): self._reset() - while self._should_run_batch_thread or not self._queue.empty(): - try: - telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) - - if isinstance(telemetry, IBatchableTelem): - self._add_telemetry_to_batch(telemetry) - else: - self._telemetry_messenger.send_telemetry(telemetry) - except queue.Empty: - pass + while self._should_run_batch_thread: + self._process_next_telemetry() if self._period_elapsed(): self._send_telemetry_batches() self._reset() - self._send_telemetry_batches() + self._send_remaining_telemetry_batches() def _reset(self): self._last_sent_time = time.time() self._telemetry_batches = {} + def _process_next_telemetry(self): + try: + telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) + + if isinstance(telemetry, IBatchableTelem): + self._add_telemetry_to_batch(telemetry) + else: + self._telemetry_messenger.send_telemetry(telemetry) + except queue.Empty: + pass + def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem): telem_category = new_telemetry.telem_category @@ -94,6 +97,12 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): def _period_elapsed(self): return (time.time() - self._last_sent_time) > self._period + def _send_remaining_telemetry_batches(self): + while not self._queue.empty(): + self._process_next_telemetry() + + self._send_telemetry_batches() + def _send_telemetry_batches(self): for batchable_telemetry in self._telemetry_batches.values(): self._telemetry_messenger.send_telemetry(batchable_telemetry) diff --git a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py index 2de3e0ffe..8b894a4f9 100644 --- a/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py +++ b/monkey/tests/unit_tests/infection_monkey/telemetry/messengers/test_batching_telemetry_messenger.py @@ -122,3 +122,22 @@ def test_send_two_batches(monkeypatch, batching_telemetry_messenger, telemetry_m assert len(telemetry_messenger_spy.telemetries) == 2 assert telemetry_messenger_spy.telemetries[1] == telem2 + + +def test_send_remaining_telem_after_stop(monkeypatch, telemetry_messenger_spy): + patch_time(monkeypatch, 0) + batching_telemetry_messenger = BatchingTelemetryMessenger( + telemetry_messenger_spy, period=PERIOD + ) + + expected_data = {"entries": [1]} + telem = BatchableTelemStub(1) + + batching_telemetry_messenger.send_telemetry(telem) + release_GIL() + + assert len(telemetry_messenger_spy.telemetries) == 0 + del batching_telemetry_messenger + + assert len(telemetry_messenger_spy.telemetries) == 1 + assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data