From fe63b28ee1d43cf05b9c4bfc142a909508e31bc9 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 14 Sep 2022 14:31:55 +0530 Subject: [PATCH] Agent: Restructure SendAllAgentEventsToIsland and _AgentEventsToIslandSender --- .../send_all_events_to_island.py | 108 ++++++++++-------- 1 file changed, 58 insertions(+), 50 deletions(-) diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 668f508e0..8b20c36b7 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -22,72 +22,80 @@ class SendAllAgentEventsToIsland: def __init__(self, server_address: str): self._server_address = server_address - self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() - self._send_to_island_thread = self._BatchAndSendEventsThread( - self._queue, self._server_address, DEFAULT_TIME_PERIOD_SECONDS + self._agent_events_to_island_sender = _AgentEventsToIslandSender( + self._server_address, DEFAULT_TIME_PERIOD_SECONDS ) - self._send_to_island_thread.start() + self._agent_events_to_island_sender.start() + + def __del__(self): + self._agent_events_to_island_sender.stop() def send_event(self, event: AbstractAgentEvent): - self._queue.put(self._serialize_event(event)) + serialized_event = self._serialize_event(event) + self._agent_events_to_island_sender.add_event_to_queue(serialized_event) logger.debug( f"Sending event of type {type(event).__name__} to the Island at {self._server_address}" ) def _serialize_event(self, event: AbstractAgentEvent): + # get serializer from registry and serialize pass - class _BatchAndSendEventsThread: - """ - Handles the batching and sending of the Agent's events to the Island - """ - def __init__(self, queue_of_events: queue.Queue, server_address: str, time_period: int): - self._queue = queue_of_events - self._server_address = server_address - self._time_period = time_period +class _AgentEventsToIslandSender: + """ + Handles the batching and sending of the Agent's events to the Island + """ - self._stop_batch_and_send_thread = threading.Event() + def __init__(self, server_address: str, time_period: int): + self._server_address = server_address + self._time_period = time_period - def start(self): - self._batch_and_send_thread = threading.Thread( - name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches - ) - self._batch_and_send_thread.start() + self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() + self._stop_batch_and_send_thread = threading.Event() - def _manage_event_batches(self): - while not self._stop_batch_and_send_thread.is_set(): - self._send_events_to_island() - sleep(self._time_period) + def start(self): + self._batch_and_send_thread = threading.Thread( + name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches + ) + self._batch_and_send_thread.start() - self._send_remaining_events() + def add_event_to_queue(self, serialized_event: AbstractAgentEvent): + self._queue.put(serialized_event) - def _send_events_to_island(self): - if self._queue.empty(): - return - - events = [] - - while not self._queue.empty(): - events.append(self._queue.get(block=False)) - - try: - requests.post( # noqa: DUO123 - EVENTS_API_URL % (self._server_address,), - json=events, - verify=False, - timeout=MEDIUM_REQUEST_TIMEOUT, - ) - except Exception as exc: - logger.warning( - f"Exception caught when connecting to the Island at {self._server_address}" - f": {exc}" - ) - - def _send_remaining_events(self): + def _manage_event_batches(self): + while not self._stop_batch_and_send_thread.is_set(): self._send_events_to_island() + sleep(self._time_period) - def stop(self): - self._stop_batch_and_send_thread.set() - self._batch_and_send_thread.join() + self._send_remaining_events() + + def _send_events_to_island(self): + if self._queue.empty(): + return + + events = [] + + while not self._queue.empty(): + events.append(self._queue.get(block=False)) + + try: + requests.post( # noqa: DUO123 + EVENTS_API_URL % (self._server_address,), + json=events, + verify=False, + timeout=MEDIUM_REQUEST_TIMEOUT, + ) + except Exception as exc: + logger.warning( + f"Exception caught when connecting to the Island at {self._server_address}" + f": {exc}" + ) + + def _send_remaining_events(self): + self._send_events_to_island() + + def stop(self): + self._stop_batch_and_send_thread.set() + self._batch_and_send_thread.join()