Agent: Use Timer in BatchingTelemetryMessenger

This commit is contained in:
Shreya Malviya 2022-04-05 13:03:55 +05:30
parent cc83896724
commit 0be709a9eb
1 changed files with 7 additions and 13 deletions

View File

@ -1,11 +1,11 @@
import queue
import threading
import time
from typing import Dict
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
from infection_monkey.telemetry.i_telem import ITelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
from infection_monkey.utils.timer import Timer
DEFAULT_PERIOD = 5
WAKES_PER_PERIOD = 4
@ -40,8 +40,6 @@ class BatchingTelemetryMessenger(ITelemetryMessenger):
self._period = period
self._should_run_batch_thread = True
# TODO: Replace with infection_monkey.utils.timer.Timer
self._last_sent_time = time.time()
self._telemetry_batches: Dict[str, IBatchableTelem] = {}
self._manage_telemetry_batches_thread = None
@ -59,21 +57,20 @@ class BatchingTelemetryMessenger(ITelemetryMessenger):
self._manage_telemetry_batches_thread = None
def _manage_telemetry_batches(self):
self._reset()
timer = Timer()
timer.set(self._period)
self._telemetry_batches = {}
while self._should_run_batch_thread:
self._process_next_telemetry()
if self._period_elapsed():
if timer.is_expired():
self._send_telemetry_batches()
self._reset()
timer.reset()
self._telemetry_batches = {}
self._send_remaining_telemetry_batches()
def _reset(self):
self._last_sent_time = time.time()
self._telemetry_batches = {}
def _process_next_telemetry(self):
try:
telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD)
@ -93,9 +90,6 @@ class BatchingTelemetryMessenger(ITelemetryMessenger):
else:
self._telemetry_batches[telem_category] = new_telemetry
def _period_elapsed(self):
return (time.time() - self._last_sent_time) > self._period
def _send_remaining_telemetry_batches(self):
while not self._queue.empty():
self._process_next_telemetry()