agent: Refactor telemetry processing in BatchingTelemetryMessenger

We need to ensure when a BatchingTelemetryMessenger stops, all remaining
telemetries in its queue are sent. The existing logic does this, but
this commit improves the readability and intent of the code, as well as
adds a test for this condition.
This commit is contained in:
Mike Salvatore 2021-06-29 07:36:49 -04:00
parent 9d3d4611dc
commit 8cf316b64a
2 changed files with 39 additions and 11 deletions

View File

@ -62,7 +62,20 @@ class BatchingTelemetryMessenger(ITelemetryMessenger):
def _manage_telemetry_batches(self):
self._reset()
while self._should_run_batch_thread or not self._queue.empty():
while self._should_run_batch_thread:
self._process_next_telemetry()
if self._period_elapsed():
self._send_telemetry_batches()
self._reset()
self._send_remaining_telemetry_batches()
def _reset(self):
self._last_sent_time = time.time()
self._telemetry_batches = {}
def _process_next_telemetry(self):
try:
telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD)
@ -73,16 +86,6 @@ class BatchingTelemetryMessenger(ITelemetryMessenger):
except queue.Empty:
pass
if self._period_elapsed():
self._send_telemetry_batches()
self._reset()
self._send_telemetry_batches()
def _reset(self):
self._last_sent_time = time.time()
self._telemetry_batches = {}
def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem):
telem_category = new_telemetry.telem_category
@ -94,6 +97,12 @@ class BatchingTelemetryMessenger(ITelemetryMessenger):
def _period_elapsed(self):
return (time.time() - self._last_sent_time) > self._period
def _send_remaining_telemetry_batches(self):
while not self._queue.empty():
self._process_next_telemetry()
self._send_telemetry_batches()
def _send_telemetry_batches(self):
for batchable_telemetry in self._telemetry_batches.values():
self._telemetry_messenger.send_telemetry(batchable_telemetry)

View File

@ -122,3 +122,22 @@ def test_send_two_batches(monkeypatch, batching_telemetry_messenger, telemetry_m
assert len(telemetry_messenger_spy.telemetries) == 2
assert telemetry_messenger_spy.telemetries[1] == telem2
def test_send_remaining_telem_after_stop(monkeypatch, telemetry_messenger_spy):
patch_time(monkeypatch, 0)
batching_telemetry_messenger = BatchingTelemetryMessenger(
telemetry_messenger_spy, period=PERIOD
)
expected_data = {"entries": [1]}
telem = BatchableTelemStub(1)
batching_telemetry_messenger.send_telemetry(telem)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 0
del batching_telemetry_messenger
assert len(telemetry_messenger_spy.telemetries) == 1
assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data