agent: Add BatchedTelemetryMessenger

This telemetry messenger is a decorator that aggregates batchable
telemetries and sends them to the island periodically.
This commit is contained in:
Mike Salvatore 2021-06-27 21:02:35 -04:00
parent 691e01e9c1
commit fadd978050
2 changed files with 209 additions and 0 deletions

View File

@ -0,0 +1,82 @@
import queue
import threading
import time
from typing import Dict
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
from infection_monkey.telemetry.i_telem import ITelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
WAKES_PER_PERIOD = 4
class BatchedTelemetryMessenger(ITelemetryMessenger):
"""
An ITelemetryMessenger decorator that aggregates IBatchableTelem telemetries
and periodically sends them to Monkey Island.
"""
def __init__(self, telemetry_messenger: ITelemetryMessenger, period=5):
self._telemetry_messenger = telemetry_messenger
self._period = period
self._run_batch_thread = True
self._queue: queue.Queue[ITelem] = queue.Queue()
# TODO: Create a "timer" or "countdown" class and inject an object instead of
# using time.time()
self._last_sent_time = time.time()
self._telemetry_batches: Dict[str, IBatchableTelem] = {}
self._manage_telemetry_batches_thread = threading.Thread(
target=self._manage_telemetry_batches
)
self._manage_telemetry_batches_thread.start()
def __del__(self):
self.stop()
def stop(self):
self._run_batch_thread = False
self._manage_telemetry_batches_thread.join()
def send_telemetry(self, telemetry: ITelem):
self._queue.put(telemetry)
def _manage_telemetry_batches(self):
self._reset()
while self._run_batch_thread:
try:
telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD)
if isinstance(telemetry, IBatchableTelem):
self._add_telemetry_to_batch(telemetry)
else:
self._telemetry_messenger.send_telemetry(telemetry)
except queue.Empty:
pass
if self._period_elapsed():
self._send_telemetry_batches()
self._reset()
self._send_telemetry_batches()
def _reset(self):
self._last_sent_time = time.time()
self._telemetry_batches = {}
def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem):
telem_category = new_telemetry.telem_category
if telem_category in self._telemetry_batches:
self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry)
else:
self._telemetry_batches[telem_category] = new_telemetry
def _period_elapsed(self):
return (time.time() - self._last_sent_time) > self._period
def _send_telemetry_batches(self):
for batchable_telemetry in self._telemetry_batches.values():
self._telemetry_messenger.send_telemetry(batchable_telemetry)

View File

@ -0,0 +1,127 @@
import time
import pytest
from infection_monkey.telemetry.base_telem import BaseTelem
from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
from infection_monkey.telemetry.messengers.batched_telemetry_messenger import (
BatchedTelemetryMessenger,
)
PERIOD = 0.001
def release_GIL():
time.sleep(PERIOD)
def advance_clock_to_next_period(monkeypatch):
patch_time(monkeypatch, time.time() + (PERIOD * 1.01))
def patch_time(monkeypatch, new_time: float):
monkeypatch.setattr(time, "time", lambda: new_time)
class NonBatchableTelemStub(BaseTelem):
telem_category = "NonBatchableTelemStub"
def send(self, log_data=True):
raise NotImplementedError
def get_data(self) -> dict:
return {"1": {"i": "a", "ii": "b"}}
def __eq__(self, other):
return self.get_data() == other.get_data() and self.telem_category == other.telem_category
class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem):
def __init__(self, value, telem_category="cat1"):
self._telemetry_entries.append(value)
self._telem_category = telem_category
@property
def telem_category(self):
return self._telem_category
def send(self, log_data=True):
raise NotImplementedError
def get_data(self) -> dict:
return {"entries": self._telemetry_entries}
@pytest.fixture
def batched_telemetry_messenger(monkeypatch, telemetry_messenger_spy):
patch_time(monkeypatch, 0)
btm = BatchedTelemetryMessenger(telemetry_messenger_spy, period=0.001)
yield btm
btm.stop()
def test_send_immediately(batched_telemetry_messenger, telemetry_messenger_spy):
telem = NonBatchableTelemStub()
batched_telemetry_messenger.send_telemetry(telem)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 1
assert telemetry_messenger_spy.telemetries[0] == telem
def test_send_telem_batch(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy):
expected_data = {"entries": [1, 2]}
telem1 = BatchableTelemStub(1)
telem2 = BatchableTelemStub(2)
batched_telemetry_messenger.send_telemetry(telem1)
batched_telemetry_messenger.send_telemetry(telem2)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 0
advance_clock_to_next_period(monkeypatch)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 1
assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data
def test_send_different_telem_types(
monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy
):
telem1 = BatchableTelemStub(1, "cat1")
telem2 = BatchableTelemStub(2, "cat2")
batched_telemetry_messenger.send_telemetry(telem1)
batched_telemetry_messenger.send_telemetry(telem2)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 0
advance_clock_to_next_period(monkeypatch)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 2
assert telemetry_messenger_spy.telemetries[0] == telem1
assert telemetry_messenger_spy.telemetries[1] == telem2
def test_send_two_batches(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy):
telem1 = BatchableTelemStub(1, "cat1")
telem2 = BatchableTelemStub(2, "cat1")
batched_telemetry_messenger.send_telemetry(telem1)
advance_clock_to_next_period(monkeypatch)
release_GIL()
batched_telemetry_messenger.send_telemetry(telem2)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 1
advance_clock_to_next_period(monkeypatch)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 2
assert telemetry_messenger_spy.telemetries[1] == telem2