diff --git a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py index 4d34012d8..9dc051666 100644 --- a/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py +++ b/monkey/infection_monkey/telemetry/messengers/batching_telemetry_messenger.py @@ -1,11 +1,11 @@ import queue import threading -import time from typing import Dict from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem from infection_monkey.telemetry.i_telem import ITelem from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger +from infection_monkey.utils.timer import Timer DEFAULT_PERIOD = 5 WAKES_PER_PERIOD = 4 @@ -40,8 +40,6 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): self._period = period self._should_run_batch_thread = True - # TODO: Replace with infection_monkey.utils.timer.Timer - self._last_sent_time = time.time() self._telemetry_batches: Dict[str, IBatchableTelem] = {} self._manage_telemetry_batches_thread = None @@ -59,21 +57,20 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): self._manage_telemetry_batches_thread = None def _manage_telemetry_batches(self): - self._reset() + timer = Timer() + timer.set(self._period) + self._telemetry_batches = {} while self._should_run_batch_thread: self._process_next_telemetry() - if self._period_elapsed(): + if timer.is_expired(): self._send_telemetry_batches() - self._reset() + timer.reset() + self._telemetry_batches = {} self._send_remaining_telemetry_batches() - def _reset(self): - self._last_sent_time = time.time() - self._telemetry_batches = {} - def _process_next_telemetry(self): try: telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD) @@ -93,9 +90,6 @@ class BatchingTelemetryMessenger(ITelemetryMessenger): else: self._telemetry_batches[telem_category] = new_telemetry - def _period_elapsed(self): - return (time.time() - self._last_sent_time) > self._period - def _send_remaining_telemetry_batches(self): while not self._queue.empty(): self._process_next_telemetry()