Merge pull request #1272 from guardicore/batchable-telemetry

Batchable telemetry
This commit is contained in:
Mike Salvatore 2021-06-29 10:35:23 -04:00 committed by GitHub
commit d9366a599b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 365 additions and 41 deletions

View File

@ -25,8 +25,11 @@ from infection_monkey.system_singleton import SystemSingleton
from infection_monkey.telemetry.attack.t1106_telem import T1106Telem
from infection_monkey.telemetry.attack.t1107_telem import T1107Telem
from infection_monkey.telemetry.attack.victim_host_telem import VictimHostTelem
from infection_monkey.telemetry.messengers.telemetry_messenger_wrapper import (
TelemetryMessengerWrapper,
from infection_monkey.telemetry.messengers.batching_telemetry_messenger import (
BatchingTelemetryMessenger,
)
from infection_monkey.telemetry.messengers.legacy_telemetry_messenger_adapter import (
LegacyTelemetryMessengerAdapter,
)
from infection_monkey.telemetry.scan_telem import ScanTelem
from infection_monkey.telemetry.state_telem import StateTelem
@ -469,8 +472,12 @@ class InfectionMonkey(object):
@staticmethod
def run_ransomware():
telemetry_messenger = LegacyTelemetryMessengerAdapter()
batching_telemetry_messenger = BatchingTelemetryMessenger(telemetry_messenger)
try:
telemetry_messenger = TelemetryMessengerWrapper()
RansomewarePayload(WormConfiguration.ransomware, telemetry_messenger).run_payload()
RansomewarePayload(
WormConfiguration.ransomware, batching_telemetry_messenger
).run_payload()
except Exception as ex:
LOG.error(f"An unexpected error occurred while running the ransomware payload: {ex}")

View File

@ -0,0 +1,23 @@
from typing import Iterable
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
class BatchableTelemMixin:
"""
Implements the get_telemetry_batch() and add_telemetry_to_batch() methods from the
IBatchableTelem interface using a list.
"""
@property
def _telemetry_entries(self):
if not hasattr(self, "_list"):
self._list = []
return self._list
def get_telemetry_batch(self) -> Iterable:
return self._telemetry_entries
def add_telemetry_to_batch(self, telemetry: IBatchableTelem):
self._telemetry_entries.extend(telemetry.get_telemetry_batch())

View File

@ -0,0 +1,21 @@
from __future__ import annotations
import abc
from typing import Iterable
from infection_monkey.telemetry.i_telem import ITelem
class IBatchableTelem(ITelem, metaclass=abc.ABCMeta):
"""
Extends the ITelem interface and enables telemetries to be aggregated into
batches.
"""
@abc.abstractmethod
def get_telemetry_batch(self) -> Iterable:
pass
@abc.abstractmethod
def add_telemetry_to_batch(self, telemetry: IBatchableTelem):
pass

View File

@ -0,0 +1,108 @@
import queue
import threading
import time
from typing import Dict
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
from infection_monkey.telemetry.i_telem import ITelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
DEFAULT_PERIOD = 5
WAKES_PER_PERIOD = 4
class BatchingTelemetryMessenger(ITelemetryMessenger):
"""
An ITelemetryMessenger decorator that aggregates IBatchableTelem telemetries
and periodically sends them to Monkey Island.
"""
def __init__(self, telemetry_messenger: ITelemetryMessenger, period=DEFAULT_PERIOD):
self._queue: queue.Queue[ITelem] = queue.Queue()
self._thread = self._BatchingTelemetryMessengerThread(
self._queue, telemetry_messenger, period
)
self._thread.start()
def __del__(self):
self._thread.stop()
def send_telemetry(self, telemetry: ITelem):
self._queue.put(telemetry)
class _BatchingTelemetryMessengerThread:
def __init__(
self, telem_queue: queue.Queue, telemetry_messenger: ITelemetryMessenger, period: int
):
self._queue: queue.Queue[ITelem] = telem_queue
self._telemetry_messenger = telemetry_messenger
self._period = period
self._should_run_batch_thread = True
# TODO: Create a "timer" or "countdown" class and inject an object instead of
# using time.time()
self._last_sent_time = time.time()
self._telemetry_batches: Dict[str, IBatchableTelem] = {}
self._manage_telemetry_batches_thread = None
def start(self):
self._should_run_batch_thread = True
self._manage_telemetry_batches_thread = threading.Thread(
target=self._manage_telemetry_batches
)
self._manage_telemetry_batches_thread.start()
def stop(self):
self._should_run_batch_thread = False
self._manage_telemetry_batches_thread.join()
self._manage_telemetry_batches_thread = None
def _manage_telemetry_batches(self):
self._reset()
while self._should_run_batch_thread:
self._process_next_telemetry()
if self._period_elapsed():
self._send_telemetry_batches()
self._reset()
self._send_remaining_telemetry_batches()
def _reset(self):
self._last_sent_time = time.time()
self._telemetry_batches = {}
def _process_next_telemetry(self):
try:
telemetry = self._queue.get(block=True, timeout=self._period / WAKES_PER_PERIOD)
if isinstance(telemetry, IBatchableTelem):
self._add_telemetry_to_batch(telemetry)
else:
self._telemetry_messenger.send_telemetry(telemetry)
except queue.Empty:
pass
def _add_telemetry_to_batch(self, new_telemetry: IBatchableTelem):
telem_category = new_telemetry.telem_category
if telem_category in self._telemetry_batches:
self._telemetry_batches[telem_category].add_telemetry_to_batch(new_telemetry)
else:
self._telemetry_batches[telem_category] = new_telemetry
def _period_elapsed(self):
return (time.time() - self._last_sent_time) > self._period
def _send_remaining_telemetry_batches(self):
while not self._queue.empty():
self._process_next_telemetry()
self._send_telemetry_batches()
def _send_telemetry_batches(self):
for batchable_telemetry in self._telemetry_batches.values():
self._telemetry_messenger.send_telemetry(batchable_telemetry)

View File

@ -2,6 +2,11 @@ from infection_monkey.telemetry.i_telem import ITelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
class TelemetryMessengerWrapper(ITelemetryMessenger):
class LegacyTelemetryMessengerAdapter(ITelemetryMessenger):
"""
Provides an adapter between modules that require an ITelemetryMessenger and the
legacy method for sending telemetry.
"""
def send_telemetry(self, telemetry: ITelem):
telemetry.send()

View File

@ -1,11 +1,13 @@
from typing import List, Tuple
from typing import Tuple
from common.common_consts.telem_categories import TelemCategoryEnum
from infection_monkey.telemetry.base_telem import BaseTelem
from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
class RansomwareTelem(BaseTelem):
def __init__(self, attempts: List[Tuple[str, str]]):
class RansomwareTelem(BatchableTelemMixin, IBatchableTelem, BaseTelem):
def __init__(self, entry: Tuple[str, str]):
"""
Ransomware telemetry constructor
:param attempts: List of tuples with each tuple containing the path
@ -14,9 +16,10 @@ class RansomwareTelem(BaseTelem):
containing the directory path and error string.
"""
super().__init__()
self.attempts = attempts
self._telemetry_entries.append(entry)
telem_category = TelemCategoryEnum.RANSOMWARE
def get_data(self):
return {"ransomware_attempts": self.attempts}
return {"ransomware_attempts": self._telemetry_entries}

View File

@ -0,0 +1,17 @@
import pytest
from infection_monkey.telemetry.i_telem import ITelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
class TelemetryMessengerSpy(ITelemetryMessenger):
def __init__(self):
self.telemetries = []
def send_telemetry(self, telemetry: ITelem):
self.telemetries.append(telemetry)
@pytest.fixture
def telemetry_messenger_spy():
return TelemetryMessengerSpy()

View File

@ -23,16 +23,6 @@ from tests.utils import hash_file, is_user_admin
from infection_monkey.ransomware import ransomware_payload as ransomware_payload_module
from infection_monkey.ransomware.ransomware_payload import EXTENSION, RansomewarePayload
from infection_monkey.telemetry.i_telem import ITelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
class TelemetryMessengerSpy(ITelemetryMessenger):
def __init__(self):
self.telemetries = []
def send_telemetry(self, telemetry: ITelem):
self.telemetries.append(telemetry)
def with_extension(filename):
@ -46,11 +36,6 @@ def ransomware_payload_config(ransomware_target):
}
@pytest.fixture
def telemetry_messenger_spy():
return TelemetryMessengerSpy()
@pytest.fixture
def ransomware_payload(ransomware_payload_config, telemetry_messenger_spy):
return RansomewarePayload(ransomware_payload_config, telemetry_messenger_spy)
@ -148,10 +133,10 @@ def test_telemetry_success(ransomware_payload, telemetry_messenger_spy):
telem_1 = telemetry_messenger_spy.telemetries[0]
telem_2 = telemetry_messenger_spy.telemetries[1]
assert ALL_ZEROS_PDF in telem_1.get_data()["ransomware_attempts"][0]
assert telem_1.get_data()["ransomware_attempts"][1] == ""
assert TEST_KEYBOARD_TXT in telem_2.get_data()["ransomware_attempts"][0]
assert telem_2.get_data()["ransomware_attempts"][1] == ""
assert ALL_ZEROS_PDF in telem_1.get_data()["ransomware_attempts"][0][0]
assert telem_1.get_data()["ransomware_attempts"][0][1] == ""
assert TEST_KEYBOARD_TXT in telem_2.get_data()["ransomware_attempts"][0][0]
assert telem_2.get_data()["ransomware_attempts"][0][1] == ""
def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_spy):
@ -164,5 +149,5 @@ def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_
ransomware_payload.run_payload()
telem_1 = telemetry_messenger_spy.telemetries[0]
assert "/file/not/exist" in telem_1.get_data()["ransomware_attempts"][0]
assert "No such file or directory" in telem_1.get_data()["ransomware_attempts"][1]
assert "/file/not/exist" in telem_1.get_data()["ransomware_attempts"][0][0]
assert "No such file or directory" in telem_1.get_data()["ransomware_attempts"][0][1]

View File

@ -0,0 +1,156 @@
import time
from infection_monkey.telemetry.base_telem import BaseTelem
from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
from infection_monkey.telemetry.messengers.batching_telemetry_messenger import (
BatchingTelemetryMessenger,
)
PERIOD = 0.001
def release_GIL():
time.sleep(PERIOD)
def advance_clock_to_next_period(monkeypatch):
patch_time(monkeypatch, time.time() + (PERIOD * 1.01))
def patch_time(monkeypatch, new_time: float):
monkeypatch.setattr(time, "time", lambda: new_time)
class NonBatchableTelemStub(BaseTelem):
telem_category = "NonBatchableTelemStub"
def send(self, log_data=True):
raise NotImplementedError
def get_data(self) -> dict:
return {"1": {"i": "a", "ii": "b"}}
def __eq__(self, other):
return self.get_data() == other.get_data() and self.telem_category == other.telem_category
class BatchableTelemStub(BatchableTelemMixin, BaseTelem, IBatchableTelem):
def __init__(self, value, telem_category="cat1"):
self._telemetry_entries.append(value)
self._telem_category = telem_category
@property
def telem_category(self):
return self._telem_category
def send(self, log_data=True):
raise NotImplementedError
def get_data(self) -> dict:
return {"entries": self._telemetry_entries}
# Note that this function is not a fixture. This is because BatchingTelemetyMessenger
# stops its thread when it is destructed. If this were a fixture, it may live
# past the end of the test, which would allow the in the BatchingTelemetryMessenger
# instance to keep running instead of stopping
def build_batching_telemetry_messenger(monkeypatch, telemetry_messenger_spy):
patch_time(monkeypatch, 0)
return BatchingTelemetryMessenger(telemetry_messenger_spy, period=PERIOD)
def test_send_immediately(monkeypatch, telemetry_messenger_spy):
batching_telemetry_messenger = build_batching_telemetry_messenger(
monkeypatch, telemetry_messenger_spy
)
telem = NonBatchableTelemStub()
batching_telemetry_messenger.send_telemetry(telem)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 1
assert telemetry_messenger_spy.telemetries[0] == telem
def test_send_telem_batch(monkeypatch, telemetry_messenger_spy):
batching_telemetry_messenger = build_batching_telemetry_messenger(
monkeypatch, telemetry_messenger_spy
)
expected_data = {"entries": [1, 2]}
telem1 = BatchableTelemStub(1)
telem2 = BatchableTelemStub(2)
batching_telemetry_messenger.send_telemetry(telem1)
batching_telemetry_messenger.send_telemetry(telem2)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 0
advance_clock_to_next_period(monkeypatch)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 1
assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data
def test_send_different_telem_types(monkeypatch, telemetry_messenger_spy):
batching_telemetry_messenger = build_batching_telemetry_messenger(
monkeypatch, telemetry_messenger_spy
)
telem1 = BatchableTelemStub(1, "cat1")
telem2 = BatchableTelemStub(2, "cat2")
batching_telemetry_messenger.send_telemetry(telem1)
batching_telemetry_messenger.send_telemetry(telem2)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 0
advance_clock_to_next_period(monkeypatch)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 2
assert telemetry_messenger_spy.telemetries[0] == telem1
assert telemetry_messenger_spy.telemetries[1] == telem2
def test_send_two_batches(monkeypatch, telemetry_messenger_spy):
batching_telemetry_messenger = build_batching_telemetry_messenger(
monkeypatch, telemetry_messenger_spy
)
telem1 = BatchableTelemStub(1, "cat1")
telem2 = BatchableTelemStub(2, "cat1")
batching_telemetry_messenger.send_telemetry(telem1)
advance_clock_to_next_period(monkeypatch)
release_GIL()
batching_telemetry_messenger.send_telemetry(telem2)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 1
advance_clock_to_next_period(monkeypatch)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 2
assert telemetry_messenger_spy.telemetries[1] == telem2
def test_send_remaining_telem_after_stop(monkeypatch, telemetry_messenger_spy):
batching_telemetry_messenger = build_batching_telemetry_messenger(
monkeypatch, telemetry_messenger_spy
)
expected_data = {"entries": [1]}
telem = BatchableTelemStub(1)
batching_telemetry_messenger.send_telemetry(telem)
release_GIL()
assert len(telemetry_messenger_spy.telemetries) == 0
del batching_telemetry_messenger
assert len(telemetry_messenger_spy.telemetries) == 1
assert telemetry_messenger_spy.telemetries[0].get_data() == expected_data

View File

@ -1,20 +1,19 @@
import json
import pytest
from infection_monkey.telemetry.ransomware_telem import RansomwareTelem
ATTEMPTS = [("<file>", "<encryption attempt result>")]
ENCRYPTION_ATTEMPTS = [("<file1>", "<encryption attempt result>"), ("<file2>", "")]
@pytest.fixture
def ransomware_telem_test_instance():
return RansomwareTelem(ATTEMPTS)
def test_ransomware_telem_send(spy_send_telemetry):
ransomware_telem_1 = RansomwareTelem(ENCRYPTION_ATTEMPTS[0])
ransomware_telem_2 = RansomwareTelem(ENCRYPTION_ATTEMPTS[1])
ransomware_telem_1.add_telemetry_to_batch(ransomware_telem_2)
ransomware_telem_1.send()
expected_data = {"ransomware_attempts": ENCRYPTION_ATTEMPTS}
expected_data = json.dumps(expected_data, cls=ransomware_telem_1.json_encoder)
def test_ransomware_telem_send(ransomware_telem_test_instance, spy_send_telemetry):
ransomware_telem_test_instance.send()
expected_data = {"ransomware_attempts": ATTEMPTS}
expected_data = json.dumps(expected_data, cls=ransomware_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "ransomware"