forked from p15670423/monkey
agent: Add BatchedTelemetryMessenger
This telemetry messenger is a decorator that aggregates batchable telemetries and sends them to the island periodically.
This commit is contained in:
parent
691e01e9c1
commit
fadd978050
|
@ -0,0 +1,82 @@
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
|
||||||
|
from infection_monkey.telemetry.i_telem import ITelem
|
||||||
|
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
|
||||||
|
|
||||||
|
WAKES_PER_PERIOD = 4
|
||||||
|
|
||||||
|
|
||||||
|
class BatchedTelemetryMessenger(ITelemetryMessenger):
|
||||||
|
"""
|
||||||
|
An ITelemetryMessenger decorator that aggregates IBatchableTelem telemetries
|
||||||
|
and periodically sends them to Monkey Island.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, telemetry_messenger: ITelemetryMessenger, period=5):
|
||||||
|
self._telemetry_messenger = telemetry_messenger
|
||||||
|
self._period = period
|
||||||
|
|
||||||
|
self._run_batch_thread = True
|
||||||
|
self._queue: queue.Queue[ITelem] = queue.Queue()
|
||||||
|
# TODO: Create a "timer" or "countdown" class and inject an object instead of
|
||||||
|
# using time.time()
|
||||||
|
self._last_sent_time = time.time()
|
||||||
|
self._telemetry_batches: Dict[str, IBatchableTelem] = {}
|
||||||
|
|
||||||
|
self._manage_telemetry_batches_thread = threading.Thread(
|
||||||
|
target=self._manage_telemetry_batches
|
||||||
|
)
|
||||||
|
self._manage_telemetry_batches_thread.start()
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._run_batch_thread = False
|
||||||
|
self._manage_telemetry_batches_thread.join()
|
||||||
|
|
||||||
|
def send_telemetry(self, telemetry: ITelem):
|
||||||
|
self._queue.put(telemetry)
|
||||||
|
|
||||||
|
def _manage_telemetry_batches(self):
|
||||||
|
self._reset()
|
||||||
|
|
||||||
|
while self._run_batch_thread:
|
||||||
|
try:
|
||||||
|
telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD)
|
||||||
|
|
||||||
|
if isinstance(telemetry, IBatchableTelem):
|
||||||
|
self._add_telemetry_to_batch(telemetry)
|
||||||
|
else:
|
||||||
|
self._telemetry_messenger.send_telemetry(telemetry)
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if self._period_elapsed():
|
||||||
|
self._send_telemetry_batches()
|
||||||
|
self._reset()
|
||||||
|
|
||||||
|
self._send_telemetry_batches()
|
||||||
|
|
||||||
|
def _reset(self):
|
||||||
|
self._last_sent_time = time.time()
|
||||||
|
self._telemetry_batches = {}
|
||||||
|
|
||||||
|
def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem):
|
||||||
|
telem_category = new_telemetry.telem_category
|
||||||
|
|
||||||
|
if telem_category in self._telemetry_batches:
|
||||||
|
self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry)
|
||||||
|
else:
|
||||||
|
self._telemetry_batches[telem_category] = new_telemetry
|
||||||
|
|
||||||
|
def _period_elapsed(self):
|
||||||
|
return (time.time() - self._last_sent_time) > self._period
|
||||||
|
|
||||||
|
def _send_telemetry_batches(self):
|
||||||
|
for batchable_telemetry in self._telemetry_batches.values():
|
||||||
|
self._telemetry_messenger.send_telemetry(batchable_telemetry)
|
|
@ -0,0 +1,127 @@
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from infection_monkey.telemetry.base_telem import BaseTelem
|
||||||
|
from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin
|
||||||
|
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
|
||||||
|
from infection_monkey.telemetry.messengers.batched_telemetry_messenger import (
|
||||||
|
BatchedTelemetryMessenger,
|
||||||
|
)
|
||||||
|
|
||||||
|
PERIOD = 0.001
|
||||||
|
|
||||||
|
|
||||||
|
def release_GIL():
|
||||||
|
time.sleep(PERIOD)
|
||||||
|
|
||||||
|
|
||||||
|
def advance_clock_to_next_period(monkeypatch):
|
||||||
|
patch_time(monkeypatch, time.time() + (PERIOD * 1.01))
|
||||||
|
|
||||||
|
|
||||||
|
def patch_time(monkeypatch, new_time: float):
|
||||||
|
monkeypatch.setattr(time, "time", lambda: new_time)
|
||||||
|
|
||||||
|
|
||||||
|
class NonBatchableTelemStub(BaseTelem):
|
||||||
|
telem_category = "NonBatchableTelemStub"
|
||||||
|
|
||||||
|
def send(self, log_data=True):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def get_data(self) -> dict:
|
||||||
|
return {"1": {"i": "a", "ii": "b"}}
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return self.get_data() == other.get_data() and self.telem_category == other.telem_category
|
||||||
|
|
||||||
|
|
||||||
|
class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem):
|
||||||
|
def __init__(self, value, telem_category="cat1"):
|
||||||
|
self._telemetry_entries.append(value)
|
||||||
|
self._telem_category = telem_category
|
||||||
|
|
||||||
|
@property
|
||||||
|
def telem_category(self):
|
||||||
|
return self._telem_category
|
||||||
|
|
||||||
|
def send(self, log_data=True):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def get_data(self) -> dict:
|
||||||
|
return {"entries": self._telemetry_entries}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def batched_telemetry_messenger(monkeypatch, telemetry_messenger_spy):
|
||||||
|
patch_time(monkeypatch, 0)
|
||||||
|
btm = BatchedTelemetryMessenger(telemetry_messenger_spy, period=0.001)
|
||||||
|
yield btm
|
||||||
|
|
||||||
|
btm.stop()
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_immediately(batched_telemetry_messenger, telemetry_messenger_spy):
|
||||||
|
telem = NonBatchableTelemStub()
|
||||||
|
|
||||||
|
batched_telemetry_messenger.send_telemetry(telem)
|
||||||
|
release_GIL()
|
||||||
|
|
||||||
|
assert len(telemetry_messenger_spy.telemetries) == 1
|
||||||
|
assert telemetry_messenger_spy.telemetries[0] == telem
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_telem_batch(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy):
|
||||||
|
expected_data = {"entries": [1, 2]}
|
||||||
|
telem1 = BatchableTelemStub(1)
|
||||||
|
telem2 = BatchableTelemStub(2)
|
||||||
|
|
||||||
|
batched_telemetry_messenger.send_telemetry(telem1)
|
||||||
|
batched_telemetry_messenger.send_telemetry(telem2)
|
||||||
|
release_GIL()
|
||||||
|
|
||||||
|
assert len(telemetry_messenger_spy.telemetries) == 0
|
||||||
|
advance_clock_to_next_period(monkeypatch)
|
||||||
|
release_GIL()
|
||||||
|
|
||||||
|
assert len(telemetry_messenger_spy.telemetries) == 1
|
||||||
|
assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_different_telem_types(
|
||||||
|
monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy
|
||||||
|
):
|
||||||
|
telem1 = BatchableTelemStub(1, "cat1")
|
||||||
|
telem2 = BatchableTelemStub(2, "cat2")
|
||||||
|
|
||||||
|
batched_telemetry_messenger.send_telemetry(telem1)
|
||||||
|
batched_telemetry_messenger.send_telemetry(telem2)
|
||||||
|
release_GIL()
|
||||||
|
|
||||||
|
assert len(telemetry_messenger_spy.telemetries) == 0
|
||||||
|
advance_clock_to_next_period(monkeypatch)
|
||||||
|
release_GIL()
|
||||||
|
|
||||||
|
assert len(telemetry_messenger_spy.telemetries) == 2
|
||||||
|
assert telemetry_messenger_spy.telemetries[0] == telem1
|
||||||
|
assert telemetry_messenger_spy.telemetries[1] == telem2
|
||||||
|
|
||||||
|
|
||||||
|
def test_send_two_batches(monkeypatch, batched_telemetry_messenger, telemetry_messenger_spy):
|
||||||
|
telem1 = BatchableTelemStub(1, "cat1")
|
||||||
|
telem2 = BatchableTelemStub(2, "cat1")
|
||||||
|
|
||||||
|
batched_telemetry_messenger.send_telemetry(telem1)
|
||||||
|
advance_clock_to_next_period(monkeypatch)
|
||||||
|
release_GIL()
|
||||||
|
|
||||||
|
batched_telemetry_messenger.send_telemetry(telem2)
|
||||||
|
release_GIL()
|
||||||
|
assert len(telemetry_messenger_spy.telemetries) == 1
|
||||||
|
|
||||||
|
advance_clock_to_next_period(monkeypatch)
|
||||||
|
release_GIL()
|
||||||
|
|
||||||
|
assert len(telemetry_messenger_spy.telemetries) == 2
|
||||||
|
assert telemetry_messenger_spy.telemetries[1] == telem2
|
Loading…
Reference in New Issue