diff --git a/monkey/infection_monkey/send_all_events_to_island.py b/monkey/infection_monkey/send_all_events_to_island.py index 793b59501..b895cee72 100644 --- a/monkey/infection_monkey/send_all_events_to_island.py +++ b/monkey/infection_monkey/send_all_events_to_island.py @@ -1,6 +1,7 @@ import logging import queue import threading +from time import sleep from typing import Set import requests @@ -10,7 +11,6 @@ from pubsub import pub from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT from common.events import AbstractAgentEvent -from common.utils import Timer logger = logging.getLogger(__name__) @@ -64,48 +64,36 @@ class send_all_events_to_island: self._batch_and_send_thread.start() def _manage_event_batches(self): - timer = Timer() - timer.set(self._time_period) - - self._event_batch.clear() - while self._should_run_batch_and_send_thread: - self._add_next_event_to_batch() - - if timer.is_expired(): - self._send_events_to_island() - timer.reset() - self._event_batch.clear() + self._send_events_to_island() + sleep(DEFAULT_TIME_PERIOD) self._send_remaining_events() - def _add_next_event_to_batch(self): - try: - event = self._queue.get(block=True, timeout=self._time_period / WAKES_PER_PERIOD) - self._event_batch.add(event) - except queue.Empty: - pass - def _send_events_to_island(self): - for serialized_event in self._event_batch: - try: - requests.post( # noqa: DUO123 - EVENTS_API_URL % (self._server_address,), - data=serialized_event, - headers={"content-type": "application/json"}, - verify=False, - timeout=MEDIUM_REQUEST_TIMEOUT, - ) - except Exception as exc: - logger.warning( - f"Exception caught when connecting to the Island at {self._server_address}" - f": {exc}" - ) + if self._queue.empty(): + return + + events = [] + + while not self._queue.empty(): + events.append(self._queue.get(block=False)) + + try: + requests.post( # noqa: DUO123 + EVENTS_API_URL % (self._server_address,), + data=events, + headers={"content-type": "application/json"}, + verify=False, + timeout=MEDIUM_REQUEST_TIMEOUT, + ) + except Exception as exc: + logger.warning( + f"Exception caught when connecting to the Island at {self._server_address}" + f": {exc}" + ) def _send_remaining_events(self): - while not self._queue.empty(): - self._add_next_event_to_batch() - self._send_events_to_island() def stop(self):