From e51a717bdf63cfec790ad524c74c2d76e981e01b Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Mon, 12 Sep 2022 19:16:00 +0530 Subject: [PATCH] Agent: Add a very crude outline of using a queue and threading in `push_all_events_to_island` --- .../infection_monkey/push_events_to_island.py | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/monkey/infection_monkey/push_events_to_island.py b/monkey/infection_monkey/push_events_to_island.py index 8aebacac2..b84d418be 100644 --- a/monkey/infection_monkey/push_events_to_island.py +++ b/monkey/infection_monkey/push_events_to_island.py @@ -1,4 +1,5 @@ import logging +import queue import requests @@ -15,18 +16,40 @@ class push_all_events_to_island: def __init__(self, server_address: str): self._server_address = server_address + self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue() + self._send_to_island_thread = self.batch_events_thread(self._queue, self._server_address) + + self._send_to_island_thread.start() + def __call__(self, event: AbstractAgentEvent, topic=pub.AUTO_TOPIC): topic_name = topic.getName() - logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}") + self._queue.put(self._serialize_event(event, topic_name)) - requests.post( # noqa: DUO123 - "https://%s/api/events" % (self._server_address,), - data=self._serialize_event(event, topic_name), - headers={"content-type": "application/json"}, - verify=False, - timeout=MEDIUM_REQUEST_TIMEOUT, - ) + logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}") def _serialize_event(self, event: AbstractAgentEvent, topic_name: str): pass + + class batch_events_thread: + def __init__(self, queue_of_events: queue.Queue, server_address: str): + self._queue = queue_of_events + self._server_address = server_address + + def start(self): + pass + + def _manage_next_event(self): + pass + + def _send_event_to_island(self, serialized_event): + requests.post( # noqa: DUO123 + "https://%s/api/events" % (self._server_address,), + data=serialized_event, + headers={"content-type": "application/json"}, + verify=False, + timeout=MEDIUM_REQUEST_TIMEOUT, + ) + + def stop(self): + pass