forked from p15670423/monkey
agent: Add BatchedTelemetryMessenger
This telemetry messenger is a decorator that aggregates batchable telemetries and sends them to the island periodically.
This commit is contained in:
parent
691e01e9c1
commit
fadd978050
|
@ -0,0 +1,82 @@
|
|||
import queue
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict
|
||||
|
||||
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
|
||||
from infection_monkey.telemetry.i_telem import ITelem
|
||||
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
|
||||
|
||||
WAKES_PER_PERIOD = 4
|
||||
|
||||
|
||||
class BatchedTelemetryMessenger(ITelemetryMessenger):
|
||||
"""
|
||||
An ITelemetryMessenger decorator that aggregates IBatchableTelem telemetries
|
||||
and periodically sends them to Monkey Island.
|
||||
"""
|
||||
|
||||
def __init__(self, telemetry_messenger: ITelemetryMessenger, period=5):
|
||||
self._telemetry_messenger = telemetry_messenger
|
||||
self._period = period
|
||||
|
||||
self._run_batch_thread = True
|
||||
self._queue: queue.Queue[ITelem] = queue.Queue()
|
||||
# TODO: Create a "timer" or "countdown" class and inject an object instead of
|
||||
# using time.time()
|
||||
self._last_sent_time = time.time()
|
||||
self._telemetry_batches: Dict[str, IBatchableTelem] = {}
|
||||
|
||||
self._manage_telemetry_batches_thread = threading.Thread(
|
||||
target=self._manage_telemetry_batches
|
||||
)
|
||||
self._manage_telemetry_batches_thread.start()
|
||||
|
||||
def __del__(self):
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
self._run_batch_thread = False
|
||||
self._manage_telemetry_batches_thread.join()
|
||||
|
||||
def send_telemetry(self, telemetry: ITelem):
|
||||
self._queue.put(telemetry)
|
||||
|
||||
def _manage_telemetry_batches(self):
|
||||
self._reset()
|
||||
|
||||
while self._run_batch_thread:
|
||||
try:
|
||||
telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD)
|
||||
|
||||
if isinstance(telemetry, IBatchableTelem):
|
||||
self._add_telemetry_to_batch(telemetry)
|
||||
else:
|
||||
self._telemetry_messenger.send_telemetry(telemetry)
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
if self._period_elapsed():
|
||||
self._send_telemetry_batches()
|
||||
self._reset()
|
||||
|
||||
self._send_telemetry_batches()
|
||||
|
||||
def _reset(self):
|
||||
self._last_sent_time = time.time()
|
||||
self._telemetry_batches = {}
|
||||
|
||||
def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem):
|
||||
telem_category = new_telemetry.telem_category
|
||||
|
||||
if telem_category in self._telemetry_batches:
|
||||
self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry)
|
||||
else:
|
||||
self._telemetry_batches[telem_category] = new_telemetry
|
||||
|
||||
def _period_elapsed(self):
|
||||
return (time.time() - self._last_sent_time) > self._period
|
||||
|
||||
def _send_telemetry_batches(self):
|
||||
for batchable_telemetry in self._telemetry_batches.values():
|
||||
self._telemetry_messenger.send_telemetry(batchable_telemetry)
|
|
@ -0,0 +1,127 @@
|
|||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from infection_monkey.telemetry.base_telem import BaseTelem
|
||||
from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin
|
||||
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
|
||||
from infection_monkey.telemetry.messengers.batched_telemetry_messenger import (
|
||||
BatchedTelemetryMessenger,
|
||||
)
|
||||
|
||||
PERIOD = 0.001
|
||||
|
||||
|
||||
def release_GIL():
|
||||
time.sleep(PERIOD)
|
||||
|
||||
|
||||
def advance_clock_to_next_period(monkeypatch):
|
||||
patch_time(monkeypatch, time.time() + (PERIOD * 1.01))
|
||||
|
||||
|
||||
def patch_time(monkeypatch, new_time: float):
|
||||
monkeypatch.setattr(time, "time", lambda: new_time)
|
||||
|
||||
|
||||
class NonBatchableTelemStub(BaseTelem):
|
||||
telem_category = "NonBatchableTelemStub"
|
||||
|
||||
def send(self, log_data=True):
|
||||
raise NotImplementedError
|
||||
|
||||
def get_data(self) -> dict:
|
||||
return {"1": {"i": "a", "ii": "b"}}
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.get_data() == other.get_data() and self.telem_category == other.telem_category
|
||||
|
||||
|
||||
class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem):
|
||||
def __init__(self, value, telem_category="cat1"):
|
||||
self._telemetry_entries.append(value)
|
||||
self._telem_category = telem_category
|
||||
|
||||
@property
|
||||
def telem_category(self):
|
||||
return self._telem_category
|
||||
|
||||
def send(self, log_data=True):
|
||||
raise NotImplementedError
|
||||
|
||||
def get_data(self) -> dict:
|
||||
return {"entries": self._telemetry_entries}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def batched_telemetry_messenger(monkeypatch, telemetry_messenger_spy):
|
||||
patch_time(monkeypatch, 0)
|
||||
btm = BatchedTelemetryMessenger(telemetry_messenger_spy, period=0.001)
|
||||
yield btm
|
||||
|
||||
btm.stop()
|
||||
|
||||
|
||||
def test_send_immediately(batched_telemetry_messenger, telemetry_messenger_spy):
|
||||
telem = NonBatchableTelemStub()
|
||||
|
||||
batched_telemetry_messenger.send_telemetry(telem)
|
||||
release_GIL()
|
||||
|
||||
assert len(telemetry_messenger_spy.telemetries) == 1
|
||||
assert telemetry_messenger_spy.telemetries[0] == telem
|
||||
|
||||
|
||||
def test_send_telem_batch(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy):
|
||||
expected_data = {"entries": [1, 2]}
|
||||
telem1 = BatchableTelemStub(1)
|
||||
telem2 = BatchableTelemStub(2)
|
||||
|
||||
batched_telemetry_messenger.send_telemetry(telem1)
|
||||
batched_telemetry_messenger.send_telemetry(telem2)
|
||||
release_GIL()
|
||||
|
||||
assert len(telemetry_messenger_spy.telemetries) == 0
|
||||
advance_clock_to_next_period(monkeypatch)
|
||||
release_GIL()
|
||||
|
||||
assert len(telemetry_messenger_spy.telemetries) == 1
|
||||
assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data
|
||||
|
||||
|
||||
def test_send_different_telem_types(
|
||||
monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy
|
||||
):
|
||||
telem1 = BatchableTelemStub(1, "cat1")
|
||||
telem2 = BatchableTelemStub(2, "cat2")
|
||||
|
||||
batched_telemetry_messenger.send_telemetry(telem1)
|
||||
batched_telemetry_messenger.send_telemetry(telem2)
|
||||
release_GIL()
|
||||
|
||||
assert len(telemetry_messenger_spy.telemetries) == 0
|
||||
advance_clock_to_next_period(monkeypatch)
|
||||
release_GIL()
|
||||
|
||||
assert len(telemetry_messenger_spy.telemetries) == 2
|
||||
assert telemetry_messenger_spy.telemetries[0] == telem1
|
||||
assert telemetry_messenger_spy.telemetries[1] == telem2
|
||||
|
||||
|
||||
def test_send_two_batches(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy):
|
||||
telem1 = BatchableTelemStub(1, "cat1")
|
||||
telem2 = BatchableTelemStub(2, "cat1")
|
||||
|
||||
batched_telemetry_messenger.send_telemetry(telem1)
|
||||
advance_clock_to_next_period(monkeypatch)
|
||||
release_GIL()
|
||||
|
||||
batched_telemetry_messenger.send_telemetry(telem2)
|
||||
release_GIL()
|
||||
assert len(telemetry_messenger_spy.telemetries) == 1
|
||||
|
||||
advance_clock_to_next_period(monkeypatch)
|
||||
release_GIL()
|
||||
|
||||
assert len(telemetry_messenger_spy.telemetries) == 2
|
||||
assert telemetry_messenger_spy.telemetries[1] == telem2
|
Loading…
Reference in New Issue