From 1462e832b0a21b1248dd28060b6068c9fb021455 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Mon, 12 Sep 2022 18:54:53 +0530 Subject: [PATCH 01/40] Agent: Add subscriber to push all events to the Island --- .../infection_monkey/push_events_to_island.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 monkey/infection_monkey/push_events_to_island.py diff --git a/monkey/infection_monkey/push_events_to_island.py b/monkey/infection_monkey/push_events_to_island.py new file mode 100644 index 000000000..6eae55ae7 --- /dev/null +++ b/monkey/infection_monkey/push_events_to_island.py @@ -0,0 +1,28 @@ +import logging + +import requests + +# TODO: shouldn't leak implementation information; can we do this some other way? +from pubsub import pub + +from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT +from common.events import AbstractAgentEvent + +logger = logging.getLogger(__name__) + + +class push_all_events_to_island: + def __init__(self, server_address: str): + self._server_address = server_address + + def __call__(self, event: AbstractAgentEvent, topic=pub.AUTO_TOPIC): + requests.post( # noqa: DUO123 + "https://%s/api/events" % (self._server_address,), + data=self._serialize_event(event, topic.getName()), + headers={"content-type": "application/json"}, + verify=False, + timeout=MEDIUM_REQUEST_TIMEOUT, + ) + + def _serialize_event(self, event: AbstractAgentEvent, topic_name: str): + pass From a06c6a622ee720ede430459376499daee27adbe2 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Mon, 12 Sep 2022 18:57:39 +0530 Subject: [PATCH 02/40] Agent: Add log statements in class `push_all_events_to_island`'s `__call__()` --- monkey/infection_monkey/push_events_to_island.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/monkey/infection_monkey/push_events_to_island.py b/monkey/infection_monkey/push_events_to_island.py index 6eae55ae7..8aebacac2 100644 --- a/monkey/infection_monkey/push_events_to_island.py +++ b/monkey/infection_monkey/push_events_to_island.py @@ -16,9 +16,13 @@ class push_all_events_to_island: self._server_address = server_address def __call__(self, event: AbstractAgentEvent, topic=pub.AUTO_TOPIC): + topic_name = topic.getName() + + logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}") + requests.post( # noqa: DUO123 "https://%s/api/events" % (self._server_address,), - data=self._serialize_event(event, topic.getName()), + data=self._serialize_event(event, topic_name), headers={"content-type": "application/json"}, verify=False, timeout=MEDIUM_REQUEST_TIMEOUT, From e51a717bdf63cfec790ad524c74c2d76e981e01b Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Mon, 12 Sep 2022 19:16:00 +0530 Subject: [PATCH 03/40] Agent: Add a very crude outline of using a queue and threading in `push_all_events_to_island` --- .../infection_monkey/push_events_to_island.py | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/monkey/infection_monkey/push_events_to_island.py b/monkey/infection_monkey/push_events_to_island.py index 8aebacac2..b84d418be 100644 --- a/monkey/infection_monkey/push_events_to_island.py +++ b/monkey/infection_monkey/push_events_to_island.py @@ -1,4 +1,5 @@ import logging +import queue import requests @@ -15,18 +16,40 @@ class push_all_events_to_island: def __init__(self, server_address: str): self._server_address = server_address + self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() + self._send_to_island_thread = self.batch_events_thread(self._queue, self._server_address) + + self._send_to_island_thread.start() + def __call__(self, event: AbstractAgentEvent, topic=pub.AUTO_TOPIC): topic_name = topic.getName() - logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}") + self._queue.put(self._serialize_event(event, topic_name)) - requests.post( # noqa: DUO123 - "https://%s/api/events" % (self._server_address,), - data=self._serialize_event(event, topic_name), - headers={"content-type": "application/json"}, - verify=False, - timeout=MEDIUM_REQUEST_TIMEOUT, - ) + logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}") def _serialize_event(self, event: AbstractAgentEvent, topic_name: str): pass + + class batch_events_thread: + def __init__(self, queue_of_events: queue.Queue, server_address: str): + self._queue = queue_of_events + self._server_address = server_address + + def start(self): + pass + + def _manage_next_event(self): + pass + + def _send_event_to_island(self, serialized_event): + requests.post( # noqa: DUO123 + "https://%s/api/events" % (self._server_address,), + data=serialized_event, + headers={"content-type": "application/json"}, + verify=False, + timeout=MEDIUM_REQUEST_TIMEOUT, + ) + + def stop(self): + pass From 5542f67ceb99f0a6e84639485afca966f1e80584 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 12:46:01 +0530 Subject: [PATCH 04/40] Agent: Finish implementing threading/batching in `push_all_events_to_island` --- .../infection_monkey/push_events_to_island.py | 82 +++++++++++++++---- 1 file changed, 65 insertions(+), 17 deletions(-) diff --git a/monkey/infection_monkey/push_events_to_island.py b/monkey/infection_monkey/push_events_to_island.py index b84d418be..8828b2ca4 100644 --- a/monkey/infection_monkey/push_events_to_island.py +++ b/monkey/infection_monkey/push_events_to_island.py @@ -1,5 +1,6 @@ import logging import queue +import threading import requests @@ -8,22 +9,27 @@ from pubsub import pub from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT from common.events import AbstractAgentEvent +from common.utils import Timer logger = logging.getLogger(__name__) +DEFAULT_TIME_PERIOD = 5 +WAKES_PER_PERIOD = 4 + + class push_all_events_to_island: def __init__(self, server_address: str): self._server_address = server_address - self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() - self._send_to_island_thread = self.batch_events_thread(self._queue, self._server_address) + self._send_to_island_thread = self.batch_events_thread( + self._queue, self._server_address, DEFAULT_TIME_PERIOD + ) self._send_to_island_thread.start() def __call__(self, event: AbstractAgentEvent, topic=pub.AUTO_TOPIC): topic_name = topic.getName() - self._queue.put(self._serialize_event(event, topic_name)) logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}") @@ -32,24 +38,66 @@ class push_all_events_to_island: pass class batch_events_thread: - def __init__(self, queue_of_events: queue.Queue, server_address: str): + def __init__(self, queue_of_events: queue.Queue, server_address: str, time_period: int): self._queue = queue_of_events self._server_address = server_address + self._time_period = time_period + + self._event_batch = set() + self._should_run_batch_thread = True def start(self): - pass - - def _manage_next_event(self): - pass - - def _send_event_to_island(self, serialized_event): - requests.post( # noqa: DUO123 - "https://%s/api/events" % (self._server_address,), - data=serialized_event, - headers={"content-type": "application/json"}, - verify=False, - timeout=MEDIUM_REQUEST_TIMEOUT, + self._should_run_batch_thread = True + self._batch_events_thread = threading.Thread( + name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches ) + self._batch_events_thread.start() + + def _manage_event_batches(self): + timer = Timer() + timer.set(self._time_period) + + self._event_batch = {} + + while self._should_run_batch_thread: + self._add_next_event_to_batch() + + if timer.is_expired(): + self._send_events_to_island() + timer.reset() + self._event_batch = {} + + self._send_remaining_events() + + def _add_next_event_to_batch(self): + try: + event = self._queue.get(block=True, timeout=self._time_period / WAKES_PER_PERIOD) + self._event_batch.add(event) + except queue.Empty: + pass + + def _send_events_to_island(self): + for serialized_event in self._event_batch: + try: + requests.post( # noqa: DUO123 + "https://%s/api/events" % (self._server_address,), + data=serialized_event, + headers={"content-type": "application/json"}, + verify=False, + timeout=MEDIUM_REQUEST_TIMEOUT, + ) + except Exception as exc: + logger.warning( + f"Exception caught when connecting to the Island at {self.server_address}" + f": {exc}" + ) + + def _send_remaining_events(self): + while not self._queue.empty(): + self._add_next_event_to_batch() + + self._send_events_to_island() def stop(self): - pass + self._should_run_batch_thread = False + self._batch_events_thread.join() From 9475c86fba8a1dc9a997846298cad7373582961d Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 12:49:54 +0530 Subject: [PATCH 05/40] Agent: Rename `push_all_events_to_island` -> `send_all_events_to_island` for consistency --- monkey/infection_monkey/push_events_to_island.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/infection_monkey/push_events_to_island.py b/monkey/infection_monkey/push_events_to_island.py index 8828b2ca4..62f7c7ac2 100644 --- a/monkey/infection_monkey/push_events_to_island.py +++ b/monkey/infection_monkey/push_events_to_island.py @@ -18,7 +18,7 @@ DEFAULT_TIME_PERIOD = 5 WAKES_PER_PERIOD = 4 -class push_all_events_to_island: +class send_all_events_to_island: def __init__(self, server_address: str): self._server_address = server_address self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() @@ -32,7 +32,7 @@ class push_all_events_to_island: topic_name = topic.getName() self._queue.put(self._serialize_event(event, topic_name)) - logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}") + logger.debug(f"Sending event of type {topic_name} to the Island at {self._server_address}") def _serialize_event(self, event: AbstractAgentEvent, topic_name: str): pass From 6520fe2c901077a8688f32d7c1f5d60df6fb7a93 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 12:51:47 +0530 Subject: [PATCH 06/40] Agent: Rename push_events_to_island.py -> send_all_events_to_island.py for consistency --- .../{push_events_to_island.py => send_all_events_to_island.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename monkey/infection_monkey/{push_events_to_island.py => send_all_events_to_island.py} (100%) diff --git a/monkey/infection_monkey/push_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py similarity index 100% rename from monkey/infection_monkey/push_events_to_island.py rename to monkey/infection_monkey/send_all_events_to_island.py From 8f4aefda743b94d4b7e4555316e604a89337b19e Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 12:57:04 +0530 Subject: [PATCH 07/40] Agent: Rename some variables in `send_all_events_to_island` --- .../send_all_events_to_island.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 62f7c7ac2..8abab42f9 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -23,7 +23,7 @@ class send_all_events_to_island: self._server_address = server_address self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() - self._send_to_island_thread = self.batch_events_thread( + self._send_to_island_thread = self._batch_and_send_events_thread( self._queue, self._server_address, DEFAULT_TIME_PERIOD ) self._send_to_island_thread.start() @@ -37,21 +37,21 @@ class send_all_events_to_island: def _serialize_event(self, event: AbstractAgentEvent, topic_name: str): pass - class batch_events_thread: + class _batch_and_send_events_thread: def __init__(self, queue_of_events: queue.Queue, server_address: str, time_period: int): self._queue = queue_of_events self._server_address = server_address self._time_period = time_period self._event_batch = set() - self._should_run_batch_thread = True + self._should_run_batch_and_send_thread = True def start(self): - self._should_run_batch_thread = True - self._batch_events_thread = threading.Thread( + self._should_run_batch_and_send_thread = True + self._batch_and_send_thread = threading.Thread( name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches ) - self._batch_events_thread.start() + self._batch_and_send_thread.start() def _manage_event_batches(self): timer = Timer() @@ -59,7 +59,7 @@ class send_all_events_to_island: self._event_batch = {} - while self._should_run_batch_thread: + while self._should_run_batch_and_send_thread: self._add_next_event_to_batch() if timer.is_expired(): @@ -99,5 +99,5 @@ class send_all_events_to_island: self._send_events_to_island() def stop(self): - self._should_run_batch_thread = False - self._batch_events_thread.join() + self._should_run_batch_and_send_thread = False + self._batch_and_send_thread.join() From 1abf323b14a5fa430a56d0ab2d48b49a2982f2a3 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 12:58:00 +0530 Subject: [PATCH 08/40] Agent: Add type annotation for `send_all_events_to_island._batch_and_send_events_thread._event_batch` --- monkey/infection_monkey/send_all_events_to_island.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 8abab42f9..f42a18bba 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -1,6 +1,7 @@ import logging import queue import threading +from typing import Set import requests @@ -43,7 +44,7 @@ class send_all_events_to_island: self._server_address = server_address self._time_period = time_period - self._event_batch = set() + self._event_batch: Set = set() self._should_run_batch_and_send_thread = True def start(self): From de68f21fdfbc7ba7757beeee85df71e393edb660 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 13:38:27 +0530 Subject: [PATCH 09/40] Agent: Subscribe `send_all_events_to_island` to all events --- monkey/infection_monkey/monkey.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index 8a5c4e26b..bca1d2516 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -67,6 +67,7 @@ from infection_monkey.post_breach.actions.use_signed_scripts import SignedScript from infection_monkey.post_breach.actions.use_trap_command import TrapCommand from infection_monkey.post_breach.custom_pba import CustomPBA from infection_monkey.puppet.puppet import Puppet +from infection_monkey.send_all_events_to_island import send_all_events_to_island from infection_monkey.system_singleton import SystemSingleton from infection_monkey.telemetry.attack.t1106_telem import T1106Telem from infection_monkey.telemetry.attack.t1107_telem import T1107Telem @@ -219,7 +220,9 @@ class InfectionMonkey: ) event_queue = PyPubSubAgentEventQueue(Publisher()) - InfectionMonkey._subscribe_events(event_queue, propagation_credentials_repository) + InfectionMonkey._subscribe_events( + event_queue, propagation_credentials_repository, self._control_client.server_address + ) puppet = self._build_puppet(propagation_credentials_repository, event_queue) @@ -243,6 +246,7 @@ class InfectionMonkey: def _subscribe_events( event_queue: IAgentEventQueue, propagation_credentials_repository: IPropagationCredentialsRepository, + server_address: str, ): event_queue.subscribe_type( CredentialsStolenEvent, @@ -250,6 +254,7 @@ class InfectionMonkey: propagation_credentials_repository ), ) + event_queue.subscribe_all_events(send_all_events_to_island(server_address)) @staticmethod def _get_local_network_interfaces() -> List[IPv4Interface]: From 39f8bafd46360d9f8e46ad15c3ad426e10b993b5 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 17:08:49 +0530 Subject: [PATCH 10/40] Agent: Add docstrings in send_all_events_to_island.py --- monkey/infection_monkey/send_all_events_to_island.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index f42a18bba..77e19ef59 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -20,6 +20,10 @@ WAKES_PER_PERIOD = 4 class send_all_events_to_island: + """ + Sends information about the events carried out by the Agent to the Island in batches + """ + def __init__(self, server_address: str): self._server_address = server_address self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() @@ -39,6 +43,10 @@ class send_all_events_to_island: pass class _batch_and_send_events_thread: + """ + Handles the batching and sending of the Agent's events to the Island + """ + def __init__(self, queue_of_events: queue.Queue, server_address: str, time_period: int): self._queue = queue_of_events self._server_address = server_address From 239b2e2550173526b6d834dc89d62c29ee1afaf6 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 17:51:10 +0530 Subject: [PATCH 11/40] Agent: Extract API URL to variable EVENTS_API_URL in send_all_events_to_island.py --- monkey/infection_monkey/send_all_events_to_island.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 77e19ef59..35f24892c 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -17,6 +17,7 @@ logger = logging.getLogger(__name__) DEFAULT_TIME_PERIOD = 5 WAKES_PER_PERIOD = 4 +EVENTS_API_URL = "https://%s/api/events" class send_all_events_to_island: @@ -89,7 +90,7 @@ class send_all_events_to_island: for serialized_event in self._event_batch: try: requests.post( # noqa: DUO123 - "https://%s/api/events" % (self._server_address,), + EVENTS_API_URL % (self._server_address,), data=serialized_event, headers={"content-type": "application/json"}, verify=False, From 1d60cb160b03fc9ed463399f3d6020e0cb6b4f03 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 18:51:25 +0530 Subject: [PATCH 12/40] Agent: Clear event batches set and use correct variable name in send_all_events_to_island.py --- monkey/infection_monkey/send_all_events_to_island.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 35f24892c..793b59501 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -67,7 +67,7 @@ class send_all_events_to_island: timer = Timer() timer.set(self._time_period) - self._event_batch = {} + self._event_batch.clear() while self._should_run_batch_and_send_thread: self._add_next_event_to_batch() @@ -75,7 +75,7 @@ class send_all_events_to_island: if timer.is_expired(): self._send_events_to_island() timer.reset() - self._event_batch = {} + self._event_batch.clear() self._send_remaining_events() @@ -98,7 +98,7 @@ class send_all_events_to_island: ) except Exception as exc: logger.warning( - f"Exception caught when connecting to the Island at {self.server_address}" + f"Exception caught when connecting to the Island at {self._server_address}" f": {exc}" ) From 83c7f1085994de9423cea0f6778e84668676e8d7 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 19:04:44 +0530 Subject: [PATCH 13/40] Agent: Simplify batching and sending logic in send_all_events_to_island.py --- .../send_all_events_to_island.py | 60 ++++++++----------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 793b59501..b895cee72 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -1,6 +1,7 @@ import logging import queue import threading +from time import sleep from typing import Set import requests @@ -10,7 +11,6 @@ from pubsub import pub from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT from common.events import AbstractAgentEvent -from common.utils import Timer logger = logging.getLogger(__name__) @@ -64,48 +64,36 @@ class send_all_events_to_island: self._batch_and_send_thread.start() def _manage_event_batches(self): - timer = Timer() - timer.set(self._time_period) - - self._event_batch.clear() - while self._should_run_batch_and_send_thread: - self._add_next_event_to_batch() - - if timer.is_expired(): - self._send_events_to_island() - timer.reset() - self._event_batch.clear() + self._send_events_to_island() + sleep(DEFAULT_TIME_PERIOD) self._send_remaining_events() - def _add_next_event_to_batch(self): - try: - event = self._queue.get(block=True, timeout=self._time_period / WAKES_PER_PERIOD) - self._event_batch.add(event) - except queue.Empty: - pass - def _send_events_to_island(self): - for serialized_event in self._event_batch: - try: - requests.post( # noqa: DUO123 - EVENTS_API_URL % (self._server_address,), - data=serialized_event, - headers={"content-type": "application/json"}, - verify=False, - timeout=MEDIUM_REQUEST_TIMEOUT, - ) - except Exception as exc: - logger.warning( - f"Exception caught when connecting to the Island at {self._server_address}" - f": {exc}" - ) + if self._queue.empty(): + return + + events = [] + + while not self._queue.empty(): + events.append(self._queue.get(block=False)) + + try: + requests.post( # noqa: DUO123 + EVENTS_API_URL % (self._server_address,), + data=events, + headers={"content-type": "application/json"}, + verify=False, + timeout=MEDIUM_REQUEST_TIMEOUT, + ) + except Exception as exc: + logger.warning( + f"Exception caught when connecting to the Island at {self._server_address}" + f": {exc}" + ) def _send_remaining_events(self): - while not self._queue.empty(): - self._add_next_event_to_batch() - self._send_events_to_island() def stop(self): From 5ecc3e992ae07c02d54730c89a78191fe9a2d2a4 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 19:28:48 +0530 Subject: [PATCH 14/40] Agent: Remove send_all_events_to_island's dependency on pubsub --- .../infection_monkey/send_all_events_to_island.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index b895cee72..90d009b37 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -6,9 +6,6 @@ from typing import Set import requests -# TODO: shouldn't leak implementation information; can we do this some other way? -from pubsub import pub - from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT from common.events import AbstractAgentEvent @@ -34,13 +31,13 @@ class send_all_events_to_island: ) self._send_to_island_thread.start() - def __call__(self, event: AbstractAgentEvent, topic=pub.AUTO_TOPIC): - topic_name = topic.getName() - self._queue.put(self._serialize_event(event, topic_name)) + def __call__(self, event: AbstractAgentEvent): + self._queue.put(self._serialize_event(event)) + logger.debug( + f"Sending event of type {type(event).__name__} to the Island at {self._server_address}" + ) - logger.debug(f"Sending event of type {topic_name} to the Island at {self._server_address}") - - def _serialize_event(self, event: AbstractAgentEvent, topic_name: str): + def _serialize_event(self, event: AbstractAgentEvent): pass class _batch_and_send_events_thread: From 39a7ae0964b589df1a81cff612c82109dcbc924c Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 19:32:52 +0530 Subject: [PATCH 15/40] Agent: Rename DEFAULT_TIME_PERIOD -> DEFAULT_TIME_PERIOD_SECONDS in send_all_events_to_island.py --- monkey/infection_monkey/send_all_events_to_island.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 90d009b37..ecd895464 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -12,8 +12,8 @@ from common.events import AbstractAgentEvent logger = logging.getLogger(__name__) -DEFAULT_TIME_PERIOD = 5 WAKES_PER_PERIOD = 4 +DEFAULT_TIME_PERIOD_SECONDS = 5 EVENTS_API_URL = "https://%s/api/events" @@ -27,7 +27,7 @@ class send_all_events_to_island: self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() self._send_to_island_thread = self._batch_and_send_events_thread( - self._queue, self._server_address, DEFAULT_TIME_PERIOD + self._queue, self._server_address, DEFAULT_TIME_PERIOD_SECONDS ) self._send_to_island_thread.start() @@ -63,7 +63,7 @@ class send_all_events_to_island: def _manage_event_batches(self): while self._should_run_batch_and_send_thread: self._send_events_to_island() - sleep(DEFAULT_TIME_PERIOD) + sleep(self._time_period) self._send_remaining_events() From 4381716e549c7b4af8bed50fee7ca5f37373bb74 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 19:33:38 +0530 Subject: [PATCH 16/40] Agent: Remove unused variables in send_all_events_to_island.py --- monkey/infection_monkey/send_all_events_to_island.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index ecd895464..4e03a5239 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -2,7 +2,6 @@ import logging import queue import threading from time import sleep -from typing import Set import requests @@ -12,7 +11,6 @@ from common.events import AbstractAgentEvent logger = logging.getLogger(__name__) -WAKES_PER_PERIOD = 4 DEFAULT_TIME_PERIOD_SECONDS = 5 EVENTS_API_URL = "https://%s/api/events" @@ -50,7 +48,6 @@ class send_all_events_to_island: self._server_address = server_address self._time_period = time_period - self._event_batch: Set = set() self._should_run_batch_and_send_thread = True def start(self): From a561195508ec617266b5b13c78f8fbac254c9ff8 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 13 Sep 2022 19:35:44 +0530 Subject: [PATCH 17/40] Agent: Use `json` instead of `data` in POST request in send_all_events_to_island.py --- monkey/infection_monkey/send_all_events_to_island.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 4e03a5239..cd3c449c1 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -76,8 +76,7 @@ class send_all_events_to_island: try: requests.post( # noqa: DUO123 EVENTS_API_URL % (self._server_address,), - data=events, - headers={"content-type": "application/json"}, + json=events, verify=False, timeout=MEDIUM_REQUEST_TIMEOUT, ) From 5152b9a3cc926bf5ae6e2f4bcb9fce53f264e900 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 12:29:07 +0530 Subject: [PATCH 18/40] Agent: Use threading.Event instead of flag in send_all_events_to_island.py --- monkey/infection_monkey/send_all_events_to_island.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index cd3c449c1..a7260b495 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -48,17 +48,16 @@ class send_all_events_to_island: self._server_address = server_address self._time_period = time_period - self._should_run_batch_and_send_thread = True + self._stop_batch_and_send_thread = threading.Event() def start(self): - self._should_run_batch_and_send_thread = True self._batch_and_send_thread = threading.Thread( name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches ) self._batch_and_send_thread.start() def _manage_event_batches(self): - while self._should_run_batch_and_send_thread: + while not self._stop_batch_and_send_thread.is_set(): self._send_events_to_island() sleep(self._time_period) @@ -90,5 +89,5 @@ class send_all_events_to_island: self._send_events_to_island() def stop(self): - self._should_run_batch_and_send_thread = False + self._stop_batch_and_send_thread.set() self._batch_and_send_thread.join() From 1ad13db758da9ca75d53a88733cab64d29840423 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 13:33:34 +0530 Subject: [PATCH 19/40] Agent: Rename send_all_events_to_island -> SendAllAgentEventsToIsland --- monkey/infection_monkey/monkey.py | 4 ++-- monkey/infection_monkey/send_all_events_to_island.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index bca1d2516..708d92a59 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -67,7 +67,7 @@ from infection_monkey.post_breach.actions.use_signed_scripts import SignedScript from infection_monkey.post_breach.actions.use_trap_command import TrapCommand from infection_monkey.post_breach.custom_pba import CustomPBA from infection_monkey.puppet.puppet import Puppet -from infection_monkey.send_all_events_to_island import send_all_events_to_island +from infection_monkey.SendAllAgentEventsToIsland import SendAllAgentEventsToIsland from infection_monkey.system_singleton import SystemSingleton from infection_monkey.telemetry.attack.t1106_telem import T1106Telem from infection_monkey.telemetry.attack.t1107_telem import T1107Telem @@ -254,7 +254,7 @@ class InfectionMonkey: propagation_credentials_repository ), ) - event_queue.subscribe_all_events(send_all_events_to_island(server_address)) + event_queue.subscribe_all_events(SendAllAgentEventsToIsland(server_address)) @staticmethod def _get_local_network_interfaces() -> List[IPv4Interface]: diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index a7260b495..8b7d7614f 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -15,7 +15,7 @@ DEFAULT_TIME_PERIOD_SECONDS = 5 EVENTS_API_URL = "https://%s/api/events" -class send_all_events_to_island: +class SendAllAgentEventsToIsland: """ Sends information about the events carried out by the Agent to the Island in batches """ From 8256322a2959a507a81a60a62b6cc427d7200a75 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 13:34:34 +0530 Subject: [PATCH 20/40] Agent: Rename _batch_and_send_events_thread -> _BatchAndSendEventsThread --- monkey/infection_monkey/send_all_events_to_island.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 8b7d7614f..7d3e04280 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -24,7 +24,7 @@ class SendAllAgentEventsToIsland: self._server_address = server_address self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() - self._send_to_island_thread = self._batch_and_send_events_thread( + self._send_to_island_thread = self._BatchAndSendEventsThread( self._queue, self._server_address, DEFAULT_TIME_PERIOD_SECONDS ) self._send_to_island_thread.start() @@ -38,7 +38,7 @@ class SendAllAgentEventsToIsland: def _serialize_event(self, event: AbstractAgentEvent): pass - class _batch_and_send_events_thread: + class _BatchAndSendEventsThread: """ Handles the batching and sending of the Agent's events to the Island """ From 404f6d954f494fdd1c27a8ded6463514791f5444 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 13:46:45 +0530 Subject: [PATCH 21/40] Agent: Replace SendAllAgentEventsToIsland's __call__() with send_event() --- monkey/infection_monkey/monkey.py | 2 +- monkey/infection_monkey/send_all_events_to_island.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index 708d92a59..f621dc3ec 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -254,7 +254,7 @@ class InfectionMonkey: propagation_credentials_repository ), ) - event_queue.subscribe_all_events(SendAllAgentEventsToIsland(server_address)) + event_queue.subscribe_all_events(SendAllAgentEventsToIsland(server_address).send_event) @staticmethod def _get_local_network_interfaces() -> List[IPv4Interface]: diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 7d3e04280..668f508e0 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -29,7 +29,7 @@ class SendAllAgentEventsToIsland: ) self._send_to_island_thread.start() - def __call__(self, event: AbstractAgentEvent): + def send_event(self, event: AbstractAgentEvent): self._queue.put(self._serialize_event(event)) logger.debug( f"Sending event of type {type(event).__name__} to the Island at {self._server_address}" From fe63b28ee1d43cf05b9c4bfc142a909508e31bc9 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 14:31:55 +0530 Subject: [PATCH 22/40] Agent: Restructure SendAllAgentEventsToIsland and _AgentEventsToIslandSender --- .../send_all_events_to_island.py | 108 ++++++++++-------- 1 file changed, 58 insertions(+), 50 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 668f508e0..8b20c36b7 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -22,72 +22,80 @@ class SendAllAgentEventsToIsland: def __init__(self, server_address: str): self._server_address = server_address - self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() - self._send_to_island_thread = self._BatchAndSendEventsThread( - self._queue, self._server_address, DEFAULT_TIME_PERIOD_SECONDS + self._agent_events_to_island_sender = _AgentEventsToIslandSender( + self._server_address, DEFAULT_TIME_PERIOD_SECONDS ) - self._send_to_island_thread.start() + self._agent_events_to_island_sender.start() + + def __del__(self): + self._agent_events_to_island_sender.stop() def send_event(self, event: AbstractAgentEvent): - self._queue.put(self._serialize_event(event)) + serialized_event = self._serialize_event(event) + self._agent_events_to_island_sender.add_event_to_queue(serialized_event) logger.debug( f"Sending event of type {type(event).__name__} to the Island at {self._server_address}" ) def _serialize_event(self, event: AbstractAgentEvent): + # get serializer from registry and serialize pass - class _BatchAndSendEventsThread: - """ - Handles the batching and sending of the Agent's events to the Island - """ - def __init__(self, queue_of_events: queue.Queue, server_address: str, time_period: int): - self._queue = queue_of_events - self._server_address = server_address - self._time_period = time_period +class _AgentEventsToIslandSender: + """ + Handles the batching and sending of the Agent's events to the Island + """ - self._stop_batch_and_send_thread = threading.Event() + def __init__(self, server_address: str, time_period: int): + self._server_address = server_address + self._time_period = time_period - def start(self): - self._batch_and_send_thread = threading.Thread( - name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches - ) - self._batch_and_send_thread.start() + self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() + self._stop_batch_and_send_thread = threading.Event() - def _manage_event_batches(self): - while not self._stop_batch_and_send_thread.is_set(): - self._send_events_to_island() - sleep(self._time_period) + def start(self): + self._batch_and_send_thread = threading.Thread( + name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches + ) + self._batch_and_send_thread.start() - self._send_remaining_events() + def add_event_to_queue(self, serialized_event: AbstractAgentEvent): + self._queue.put(serialized_event) - def _send_events_to_island(self): - if self._queue.empty(): - return - - events = [] - - while not self._queue.empty(): - events.append(self._queue.get(block=False)) - - try: - requests.post( # noqa: DUO123 - EVENTS_API_URL % (self._server_address,), - json=events, - verify=False, - timeout=MEDIUM_REQUEST_TIMEOUT, - ) - except Exception as exc: - logger.warning( - f"Exception caught when connecting to the Island at {self._server_address}" - f": {exc}" - ) - - def _send_remaining_events(self): + def _manage_event_batches(self): + while not self._stop_batch_and_send_thread.is_set(): self._send_events_to_island() + sleep(self._time_period) - def stop(self): - self._stop_batch_and_send_thread.set() - self._batch_and_send_thread.join() + self._send_remaining_events() + + def _send_events_to_island(self): + if self._queue.empty(): + return + + events = [] + + while not self._queue.empty(): + events.append(self._queue.get(block=False)) + + try: + requests.post( # noqa: DUO123 + EVENTS_API_URL % (self._server_address,), + json=events, + verify=False, + timeout=MEDIUM_REQUEST_TIMEOUT, + ) + except Exception as exc: + logger.warning( + f"Exception caught when connecting to the Island at {self._server_address}" + f": {exc}" + ) + + def _send_remaining_events(self): + self._send_events_to_island() + + def stop(self): + self._stop_batch_and_send_thread.set() + self._batch_and_send_thread.join() From 37c4362b6033855ea58adebb601b1bf68bc9736e Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 15:43:38 +0530 Subject: [PATCH 23/40] Agent: Add default argument and type hint in send_all_events_to_island.py --- monkey/infection_monkey/send_all_events_to_island.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 8b20c36b7..4ae418d3f 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -6,6 +6,7 @@ from time import sleep import requests from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT +from common.event_serializers.i_event_serializer import JSONSerializable from common.events import AbstractAgentEvent logger = logging.getLogger(__name__) @@ -23,9 +24,7 @@ class SendAllAgentEventsToIsland: def __init__(self, server_address: str): self._server_address = server_address - self._agent_events_to_island_sender = _AgentEventsToIslandSender( - self._server_address, DEFAULT_TIME_PERIOD_SECONDS - ) + self._agent_events_to_island_sender = AgentEventsToIslandSender(self._server_address) self._agent_events_to_island_sender.start() def __del__(self): @@ -43,12 +42,12 @@ class SendAllAgentEventsToIsland: pass -class _AgentEventsToIslandSender: +class AgentEventsToIslandSender: """ Handles the batching and sending of the Agent's events to the Island """ - def __init__(self, server_address: str, time_period: int): + def __init__(self, server_address: str, time_period: int = DEFAULT_TIME_PERIOD_SECONDS): self._server_address = server_address self._time_period = time_period @@ -61,7 +60,7 @@ class _AgentEventsToIslandSender: ) self._batch_and_send_thread.start() - def add_event_to_queue(self, serialized_event: AbstractAgentEvent): + def add_event_to_queue(self, serialized_event: JSONSerializable): self._queue.put(serialized_event) def _manage_event_batches(self): From aeda96db0615aadbffe980687bf146f36af031c1 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 15:45:40 +0530 Subject: [PATCH 24/40] UT: Add test for AgentEventsToIslandSender --- .../test_send_all_events_to_island.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py diff --git a/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py b/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py new file mode 100644 index 000000000..0569015c4 --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py @@ -0,0 +1,33 @@ +import time + +import pytest +import requests_mock + +from infection_monkey.send_all_events_to_island import AgentEventsToIslandSender +from monkey.infection_monkey.send_all_events_to_island import EVENTS_API_URL + +SERVER = "1.1.1.1:9999" + + +@pytest.fixture +def event_sender(): + return AgentEventsToIslandSender(SERVER) + + +# @pytest.mark.skipif(os.name != "posix", reason="This test is racey on Windows") +def test_send_events(event_sender): + with requests_mock.Mocker() as mock: + mock.post(EVENTS_API_URL % SERVER) + + event_sender.start() + + for _ in range(5): + event_sender.add_event_to_queue({}) + time.sleep(1) + assert mock.call_count == 5 + + event_sender.add_event_to_queue({}) + time.sleep(1) + assert mock.call_count == 6 + + event_sender.stop() From bb8b4742af02f2ebba6f477013dad18f6a3bd637 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 16:00:56 +0530 Subject: [PATCH 25/40] Agent: Implement event serializer logic for SendAllAgentEventsToIsland --- monkey/infection_monkey/monkey.py | 12 +++++++++--- monkey/infection_monkey/send_all_events_to_island.py | 10 +++++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index f621dc3ec..bfdbc5004 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -177,7 +177,7 @@ class InfectionMonkey: if firewall.is_enabled(): firewall.add_firewall_rule() - _ = self._setup_agent_event_serializers() + self._agent_event_serializer_registry = self._setup_agent_event_serializers() control_channel = ControlChannel( self._control_client.server_address, GUID, self._control_client.proxies @@ -221,7 +221,10 @@ class InfectionMonkey: event_queue = PyPubSubAgentEventQueue(Publisher()) InfectionMonkey._subscribe_events( - event_queue, propagation_credentials_repository, self._control_client.server_address + event_queue, + propagation_credentials_repository, + self._control_client.server_address, + self._agent_event_serializer_registry, ) puppet = self._build_puppet(propagation_credentials_repository, event_queue) @@ -247,6 +250,7 @@ class InfectionMonkey: event_queue: IAgentEventQueue, propagation_credentials_repository: IPropagationCredentialsRepository, server_address: str, + agent_event_serializer_registry: EventSerializerRegistry, ): event_queue.subscribe_type( CredentialsStolenEvent, @@ -254,7 +258,9 @@ class InfectionMonkey: propagation_credentials_repository ), ) - event_queue.subscribe_all_events(SendAllAgentEventsToIsland(server_address).send_event) + event_queue.subscribe_all_events( + SendAllAgentEventsToIsland(server_address, agent_event_serializer_registry).send_event + ) @staticmethod def _get_local_network_interfaces() -> List[IPv4Interface]: diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 4ae418d3f..3bdb08db9 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -6,6 +6,7 @@ from time import sleep import requests from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT +from common.event_serializers import EventSerializerRegistry from common.event_serializers.i_event_serializer import JSONSerializable from common.events import AbstractAgentEvent @@ -21,8 +22,11 @@ class SendAllAgentEventsToIsland: Sends information about the events carried out by the Agent to the Island in batches """ - def __init__(self, server_address: str): + def __init__( + self, server_address: str, agent_event_serializer_registry: EventSerializerRegistry + ): self._server_address = server_address + self._agent_event_serializer_registry = agent_event_serializer_registry self._agent_events_to_island_sender = AgentEventsToIslandSender(self._server_address) self._agent_events_to_island_sender.start() @@ -38,8 +42,8 @@ class SendAllAgentEventsToIsland: ) def _serialize_event(self, event: AbstractAgentEvent): - # get serializer from registry and serialize - pass + serializer = self._agent_event_serializer_registry[event.__class__] + return serializer.serialize(event) class AgentEventsToIslandSender: From c72d34b7ad886ebe443f821a419c3e190fb19c11 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 16:12:39 +0530 Subject: [PATCH 26/40] Agent: Fix import in monkey.py --- monkey/infection_monkey/monkey.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index bfdbc5004..f4dd2cc4a 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -67,7 +67,7 @@ from infection_monkey.post_breach.actions.use_signed_scripts import SignedScript from infection_monkey.post_breach.actions.use_trap_command import TrapCommand from infection_monkey.post_breach.custom_pba import CustomPBA from infection_monkey.puppet.puppet import Puppet -from infection_monkey.SendAllAgentEventsToIsland import SendAllAgentEventsToIsland +from infection_monkey.send_all_events_to_island import SendAllAgentEventsToIsland from infection_monkey.system_singleton import SystemSingleton from infection_monkey.telemetry.attack.t1106_telem import T1106Telem from infection_monkey.telemetry.attack.t1107_telem import T1107Telem From 97a9f2d156af888215684d44c3c72845021d5fae Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 16:14:53 +0530 Subject: [PATCH 27/40] UT: Fix import in test_send_all_events_to_island.py --- .../infection_monkey/test_send_all_events_to_island.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py b/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py index 0569015c4..dc05d6b86 100644 --- a/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py +++ b/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py @@ -3,8 +3,7 @@ import time import pytest import requests_mock -from infection_monkey.send_all_events_to_island import AgentEventsToIslandSender -from monkey.infection_monkey.send_all_events_to_island import EVENTS_API_URL +from infection_monkey.send_all_events_to_island import EVENTS_API_URL, AgentEventsToIslandSender SERVER = "1.1.1.1:9999" From 17e9c0018044db53f2c427adbc5e75c4ae864ccc Mon Sep 17 00:00:00 2001 From: Ilija Lazoroski Date: Wed, 14 Sep 2022 15:09:00 +0200 Subject: [PATCH 28/40] UT: Add another test for AgentEventsToIslandSender --- .../test_send_all_events_to_island.py | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py b/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py index dc05d6b86..21c21b2a3 100644 --- a/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py +++ b/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py @@ -10,10 +10,9 @@ SERVER = "1.1.1.1:9999" @pytest.fixture def event_sender(): - return AgentEventsToIslandSender(SERVER) + return AgentEventsToIslandSender(SERVER, time_period=0.001) -# @pytest.mark.skipif(os.name != "posix", reason="This test is racey on Windows") def test_send_events(event_sender): with requests_mock.Mocker() as mock: mock.post(EVENTS_API_URL % SERVER) @@ -22,11 +21,27 @@ def test_send_events(event_sender): for _ in range(5): event_sender.add_event_to_queue({}) - time.sleep(1) - assert mock.call_count == 5 + time.sleep(0.1) + assert mock.call_count == 1 event_sender.add_event_to_queue({}) - time.sleep(1) - assert mock.call_count == 6 + time.sleep(0.1) + assert mock.call_count == 2 event_sender.stop() + + +def test_send_remaining_events(event_sender): + with requests_mock.Mocker() as mock: + mock.post(EVENTS_API_URL % SERVER) + + event_sender.start() + + for _ in range(5): + event_sender.add_event_to_queue({}) + time.sleep(0.1) + assert mock.call_count == 1 + + event_sender.add_event_to_queue({}) + event_sender.stop() + assert mock.call_count == 2 From 4eabf6e77b5bc27b9201bb4c38767ddacd675a15 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:45:12 +0530 Subject: [PATCH 29/40] Agent: Use create_daemon_thread() instead of threading.Thread() in AgentEventsToIslandSender --- monkey/infection_monkey/send_all_events_to_island.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 3bdb08db9..479e5bbe5 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -9,6 +9,7 @@ from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT from common.event_serializers import EventSerializerRegistry from common.event_serializers.i_event_serializer import JSONSerializable from common.events import AbstractAgentEvent +from infection_monkey.utils.threading import create_daemon_thread logger = logging.getLogger(__name__) @@ -59,8 +60,8 @@ class AgentEventsToIslandSender: self._stop_batch_and_send_thread = threading.Event() def start(self): - self._batch_and_send_thread = threading.Thread( - name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches + self._batch_and_send_thread = create_daemon_thread( + target=self._manage_event_batches, name="SendEventsToIslandInBatchesThread" ) self._batch_and_send_thread.start() From 0775beda7a5c4f246e3d3701a3defef97dc00f9c Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:47:53 +0530 Subject: [PATCH 30/40] Agent: Rename SendAllAgentEventsToIsland -> AgentEventForwarder --- monkey/infection_monkey/monkey.py | 4 ++-- monkey/infection_monkey/send_all_events_to_island.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index f4dd2cc4a..3e6f0ce5e 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -67,7 +67,7 @@ from infection_monkey.post_breach.actions.use_signed_scripts import SignedScript from infection_monkey.post_breach.actions.use_trap_command import TrapCommand from infection_monkey.post_breach.custom_pba import CustomPBA from infection_monkey.puppet.puppet import Puppet -from infection_monkey.send_all_events_to_island import SendAllAgentEventsToIsland +from infection_monkey.send_all_events_to_island import AgentEventForwarder from infection_monkey.system_singleton import SystemSingleton from infection_monkey.telemetry.attack.t1106_telem import T1106Telem from infection_monkey.telemetry.attack.t1107_telem import T1107Telem @@ -259,7 +259,7 @@ class InfectionMonkey: ), ) event_queue.subscribe_all_events( - SendAllAgentEventsToIsland(server_address, agent_event_serializer_registry).send_event + AgentEventForwarder(server_address, agent_event_serializer_registry).send_event ) @staticmethod diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 479e5bbe5..8118c3a27 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -18,7 +18,7 @@ DEFAULT_TIME_PERIOD_SECONDS = 5 EVENTS_API_URL = "https://%s/api/events" -class SendAllAgentEventsToIsland: +class AgentEventForwarder: """ Sends information about the events carried out by the Agent to the Island in batches """ From 04b1caeb4ca83caedab20719b906c04d84e00d59 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:49:35 +0530 Subject: [PATCH 31/40] Agent: Rename send_all_events_to_island.py -> agent_event_forwarder.py --- .../{send_all_events_to_island.py => agent_event_forwarder.py} | 0 monkey/infection_monkey/monkey.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename monkey/infection_monkey/{send_all_events_to_island.py => agent_event_forwarder.py} (100%) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/agent_event_forwarder.py similarity index 100% rename from monkey/infection_monkey/send_all_events_to_island.py rename to monkey/infection_monkey/agent_event_forwarder.py diff --git a/monkey/infection_monkey/monkey.py b/monkey/infection_monkey/monkey.py index 3e6f0ce5e..e1ed8792e 100644 --- a/monkey/infection_monkey/monkey.py +++ b/monkey/infection_monkey/monkey.py @@ -20,6 +20,7 @@ from common.network.network_utils import address_to_ip_port from common.utils.argparse_types import positive_int from common.utils.attack_utils import ScanStatus, UsageEnum from common.version import get_version +from infection_monkey.agent_event_forwarder import AgentEventForwarder from infection_monkey.config import GUID from infection_monkey.control import ControlClient from infection_monkey.credential_collectors import ( @@ -67,7 +68,6 @@ from infection_monkey.post_breach.actions.use_signed_scripts import SignedScript from infection_monkey.post_breach.actions.use_trap_command import TrapCommand from infection_monkey.post_breach.custom_pba import CustomPBA from infection_monkey.puppet.puppet import Puppet -from infection_monkey.send_all_events_to_island import AgentEventForwarder from infection_monkey.system_singleton import SystemSingleton from infection_monkey.telemetry.attack.t1106_telem import T1106Telem from infection_monkey.telemetry.attack.t1107_telem import T1107Telem From 984ce2e8bc6d6479f06fa238cd054c913d0ddc40 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:50:33 +0530 Subject: [PATCH 32/40] UT: Use agent_event_forwarder instead send_all_events_to_island in import --- .../infection_monkey/test_send_all_events_to_island.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py b/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py index 21c21b2a3..0056a0052 100644 --- a/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py +++ b/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py @@ -3,7 +3,7 @@ import time import pytest import requests_mock -from infection_monkey.send_all_events_to_island import EVENTS_API_URL, AgentEventsToIslandSender +from infection_monkey.agent_event_forwarder import EVENTS_API_URL, AgentEventsToIslandSender SERVER = "1.1.1.1:9999" From 83a828ada5cd1c39c8235336f8f4eb73adecbbc2 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:51:15 +0530 Subject: [PATCH 33/40] UT: Rename test_send_all_events_to_island.py -> test_agent_event_forwarder.py --- ...send_all_events_to_island.py => test_agent_event_forwarder.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename monkey/tests/unit_tests/infection_monkey/{test_send_all_events_to_island.py => test_agent_event_forwarder.py} (100%) diff --git a/monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py similarity index 100% rename from monkey/tests/unit_tests/infection_monkey/test_send_all_events_to_island.py rename to monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py From c24fd9ddab30bef49428bbe7d8f545245e9fde9b Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:52:10 +0530 Subject: [PATCH 34/40] Agent: Rename AgentEventsToIslandSender -> BatchingAgentEventForwarder --- monkey/infection_monkey/agent_event_forwarder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/infection_monkey/agent_event_forwarder.py b/monkey/infection_monkey/agent_event_forwarder.py index 8118c3a27..55733601b 100644 --- a/monkey/infection_monkey/agent_event_forwarder.py +++ b/monkey/infection_monkey/agent_event_forwarder.py @@ -29,7 +29,7 @@ class AgentEventForwarder: self._server_address = server_address self._agent_event_serializer_registry = agent_event_serializer_registry - self._agent_events_to_island_sender = AgentEventsToIslandSender(self._server_address) + self._agent_events_to_island_sender = BatchingAgentEventForwarder(self._server_address) self._agent_events_to_island_sender.start() def __del__(self): @@ -47,7 +47,7 @@ class AgentEventForwarder: return serializer.serialize(event) -class AgentEventsToIslandSender: +class BatchingAgentEventForwarder: """ Handles the batching and sending of the Agent's events to the Island """ From 7fed97530fab311990b54c68726775a6e0928cda Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:52:33 +0530 Subject: [PATCH 35/40] UT: Use BatchingAgentEventForwarder instead of AgentEventsToIslandSender --- .../unit_tests/infection_monkey/test_agent_event_forwarder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py index 0056a0052..5762c456f 100644 --- a/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py +++ b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py @@ -3,14 +3,14 @@ import time import pytest import requests_mock -from infection_monkey.agent_event_forwarder import EVENTS_API_URL, AgentEventsToIslandSender +from infection_monkey.agent_event_forwarder import EVENTS_API_URL, BatchingAgentEventForwarder SERVER = "1.1.1.1:9999" @pytest.fixture def event_sender(): - return AgentEventsToIslandSender(SERVER, time_period=0.001) + return BatchingAgentEventForwarder(SERVER, time_period=0.001) def test_send_events(event_sender): From 5c1613bc7998f1d280da3ab09576dfc86b1ce6ba Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:54:36 +0530 Subject: [PATCH 36/40] Agent: Add log statement when sending Agent events to Island --- monkey/infection_monkey/agent_event_forwarder.py | 1 + 1 file changed, 1 insertion(+) diff --git a/monkey/infection_monkey/agent_event_forwarder.py b/monkey/infection_monkey/agent_event_forwarder.py index 55733601b..a23b16df1 100644 --- a/monkey/infection_monkey/agent_event_forwarder.py +++ b/monkey/infection_monkey/agent_event_forwarder.py @@ -85,6 +85,7 @@ class BatchingAgentEventForwarder: events.append(self._queue.get(block=False)) try: + logger.debug(f"Sending events to Island at {self._server_address}: {events}") requests.post( # noqa: DUO123 EVENTS_API_URL % (self._server_address,), json=events, From 218363d40ed96ca705608eee62d6f56d916b9e65 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 19:59:19 +0530 Subject: [PATCH 37/40] UT: Reduce sleep time from 0.1 to 0.05 in test_agent_event_forwarder.py --- .../infection_monkey/test_agent_event_forwarder.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py index 5762c456f..ed73ec669 100644 --- a/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py +++ b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py @@ -21,11 +21,11 @@ def test_send_events(event_sender): for _ in range(5): event_sender.add_event_to_queue({}) - time.sleep(0.1) + time.sleep(0.05) assert mock.call_count == 1 event_sender.add_event_to_queue({}) - time.sleep(0.1) + time.sleep(0.05) assert mock.call_count == 2 event_sender.stop() @@ -39,7 +39,7 @@ def test_send_remaining_events(event_sender): for _ in range(5): event_sender.add_event_to_queue({}) - time.sleep(0.1) + time.sleep(0.05) assert mock.call_count == 1 event_sender.add_event_to_queue({}) From 9471db378f0522aa7d20bfc487aa1198c9c897ca Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Wed, 14 Sep 2022 12:07:36 -0400 Subject: [PATCH 38/40] UT: Reduce sleep time from 0.05 to 0.01 in test_agent_event_forwarder --- .../infection_monkey/test_agent_event_forwarder.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py index ed73ec669..289f42eb2 100644 --- a/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py +++ b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py @@ -21,11 +21,11 @@ def test_send_events(event_sender): for _ in range(5): event_sender.add_event_to_queue({}) - time.sleep(0.05) + time.sleep(0.01) assert mock.call_count == 1 event_sender.add_event_to_queue({}) - time.sleep(0.05) + time.sleep(0.01) assert mock.call_count == 2 event_sender.stop() @@ -39,7 +39,7 @@ def test_send_remaining_events(event_sender): for _ in range(5): event_sender.add_event_to_queue({}) - time.sleep(0.05) + time.sleep(0.01) assert mock.call_count == 1 event_sender.add_event_to_queue({}) From fbd5bb9a9ef276fd90f9bf8302e74488380bee99 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Wed, 14 Sep 2022 12:08:45 -0400 Subject: [PATCH 39/40] Agent: Rename _agent_events_to_island_sender --- monkey/infection_monkey/agent_event_forwarder.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/monkey/infection_monkey/agent_event_forwarder.py b/monkey/infection_monkey/agent_event_forwarder.py index a23b16df1..f8e4dfef7 100644 --- a/monkey/infection_monkey/agent_event_forwarder.py +++ b/monkey/infection_monkey/agent_event_forwarder.py @@ -29,15 +29,15 @@ class AgentEventForwarder: self._server_address = server_address self._agent_event_serializer_registry = agent_event_serializer_registry - self._agent_events_to_island_sender = BatchingAgentEventForwarder(self._server_address) - self._agent_events_to_island_sender.start() + self._batching_agent_event_forwarder = BatchingAgentEventForwarder(self._server_address) + self._batching_agent_event_forwarder.start() def __del__(self): - self._agent_events_to_island_sender.stop() + self._batching_agent_event_forwarder.stop() def send_event(self, event: AbstractAgentEvent): serialized_event = self._serialize_event(event) - self._agent_events_to_island_sender.add_event_to_queue(serialized_event) + self._batching_agent_event_forwarder.add_event_to_queue(serialized_event) logger.debug( f"Sending event of type {type(event).__name__} to the Island at {self._server_address}" ) From d179a5563fed56fb3cc262419c2ab638ed14e58b Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Wed, 14 Sep 2022 12:11:51 -0400 Subject: [PATCH 40/40] UT: Add note about potentially slow or racey tests --- .../infection_monkey/test_agent_event_forwarder.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py index 289f42eb2..e0bbcb8c0 100644 --- a/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py +++ b/monkey/tests/unit_tests/infection_monkey/test_agent_event_forwarder.py @@ -13,6 +13,11 @@ def event_sender(): return BatchingAgentEventForwarder(SERVER, time_period=0.001) +# NOTE: If these tests are too slow or end up being racey, we can redesign AgentEventForwarder to +# handle threading and simply command BatchingAgentEventForwarder when to send events. +# BatchingAgentEventForwarder would have unit tests, but AgentEventForwarder would not. + + def test_send_events(event_sender): with requests_mock.Mocker() as mock: mock.post(EVENTS_API_URL % SERVER)