agent: Implement IBatchableTelem in RansomwareTelem

This allows encryption attempt telmetries to be batched into one
telemetry object so they can be sent to the island in batches.
This commit is contained in:
Mike Salvatore 2021-06-24 12:19:43 -04:00
parent 8e40e44263
commit a0b43a17a2
3 changed files with 23 additions and 21 deletions

View File

@ -1,11 +1,13 @@
from typing import List, Tuple
from typing import Tuple
from common.common_consts.telem_categories import TelemCategoryEnum
from infection_monkey.telemetry.base_telem import BaseTelem
from infection_monkey.telemetry.batchable_telem_mixin import BatchableTelemMixin
from infection_monkey.telemetry.i_batchable_telem import IBatchableTelem
class RansomwareTelem(BaseTelem):
def __init__(self, attempts: List[Tuple[str, str]]):
class RansomwareTelem(BatchableTelemMixin, IBatchableTelem, BaseTelem):
def __init__(self, entry: Tuple[str, str]):
"""
Ransomware telemetry constructor
:param attempts: List of tuples with each tuple containing the path
@ -14,9 +16,10 @@ class RansomwareTelem(BaseTelem):
containing the directory path and error string.
"""
super().__init__()
self.attempts = attempts
self._telemetry_entries.append(entry)
telem_category = TelemCategoryEnum.RANSOMWARE
def get_data(self):
return {"ransomware_attempts": self.attempts}
return {"ransomware_attempts": self._telemetry_entries}

View File

@ -148,10 +148,10 @@ def test_telemetry_success(ransomware_payload, telemetry_messenger_spy):
telem_1 = telemetry_messenger_spy.telemetries[0]
telem_2 = telemetry_messenger_spy.telemetries[1]
assert ALL_ZEROS_PDF in telem_1.get_data()["ransomware_attempts"][0]
assert telem_1.get_data()["ransomware_attempts"][1] == ""
assert TEST_KEYBOARD_TXT in telem_2.get_data()["ransomware_attempts"][0]
assert telem_2.get_data()["ransomware_attempts"][1] == ""
assert ALL_ZEROS_PDF in telem_1.get_data()["ransomware_attempts"][0][0]
assert telem_1.get_data()["ransomware_attempts"][0][1] == ""
assert TEST_KEYBOARD_TXT in telem_2.get_data()["ransomware_attempts"][0][0]
assert telem_2.get_data()["ransomware_attempts"][0][1] == ""
def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_spy):
@ -164,5 +164,5 @@ def test_telemetry_failure(monkeypatch, ransomware_payload, telemetry_messenger_
ransomware_payload.run_payload()
telem_1 = telemetry_messenger_spy.telemetries[0]
assert "/file/not/exist" in telem_1.get_data()["ransomware_attempts"][0]
assert "No such file or directory" in telem_1.get_data()["ransomware_attempts"][1]
assert "/file/not/exist" in telem_1.get_data()["ransomware_attempts"][0][0]
assert "No such file or directory" in telem_1.get_data()["ransomware_attempts"][0][1]

View File

@ -1,20 +1,19 @@
import json
import pytest
from infection_monkey.telemetry.ransomware_telem import RansomwareTelem
ATTEMPTS = [("<file>", "<encryption attempt result>")]
ENCRYPTION_ATTEMPTS = [("<file1>", "<encryption attempt result>"), ("<file2>", "")]
@pytest.fixture
def ransomware_telem_test_instance():
return RansomwareTelem(ATTEMPTS)
def test_ransomware_telem_send(spy_send_telemetry):
ransomware_telem_1 = RansomwareTelem(ENCRYPTION_ATTEMPTS[0])
ransomware_telem_2 = RansomwareTelem(ENCRYPTION_ATTEMPTS[1])
ransomware_telem_1.add_telemetry_to_batch(ransomware_telem_2)
ransomware_telem_1.send()
expected_data = {"ransomware_attempts": ENCRYPTION_ATTEMPTS}
expected_data = json.dumps(expected_data, cls=ransomware_telem_1.json_encoder)
def test_ransomware_telem_send(ransomware_telem_test_instance, spy_send_telemetry):
ransomware_telem_test_instance.send()
expected_data = {"ransomware_attempts": ATTEMPTS}
expected_data = json.dumps(expected_data, cls=ransomware_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "ransomware"