Agent: Simplify batching and sending logic in send_all_events_to_island.py

This commit is contained in:
Shreya Malviya 2022-09-13 19:04:44 +05:30
parent 1d60cb160b
commit 83c7f10859
1 changed files with 24 additions and 36 deletions

View File

@ -1,6 +1,7 @@
import logging import logging
import queue import queue
import threading import threading
from time import sleep
from typing import Set from typing import Set
import requests import requests
@ -10,7 +11,6 @@ from pubsub import pub
from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT
from common.events import AbstractAgentEvent from common.events import AbstractAgentEvent
from common.utils import Timer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -64,48 +64,36 @@ class send_all_events_to_island:
self._batch_and_send_thread.start() self._batch_and_send_thread.start()
def _manage_event_batches(self): def _manage_event_batches(self):
timer = Timer()
timer.set(self._time_period)
self._event_batch.clear()
while self._should_run_batch_and_send_thread: while self._should_run_batch_and_send_thread:
self._add_next_event_to_batch() self._send_events_to_island()
sleep(DEFAULT_TIME_PERIOD)
if timer.is_expired():
self._send_events_to_island()
timer.reset()
self._event_batch.clear()
self._send_remaining_events() self._send_remaining_events()
def _add_next_event_to_batch(self):
try:
event = self._queue.get(block=True, timeout=self._time_period / WAKES_PER_PERIOD)
self._event_batch.add(event)
except queue.Empty:
pass
def _send_events_to_island(self): def _send_events_to_island(self):
for serialized_event in self._event_batch: if self._queue.empty():
try: return
requests.post( # noqa: DUO123
EVENTS_API_URL % (self._server_address,), events = []
data=serialized_event,
headers={"content-type": "application/json"}, while not self._queue.empty():
verify=False, events.append(self._queue.get(block=False))
timeout=MEDIUM_REQUEST_TIMEOUT,
) try:
except Exception as exc: requests.post( # noqa: DUO123
logger.warning( EVENTS_API_URL % (self._server_address,),
f"Exception caught when connecting to the Island at {self._server_address}" data=events,
f": {exc}" headers={"content-type": "application/json"},
) verify=False,
timeout=MEDIUM_REQUEST_TIMEOUT,
)
except Exception as exc:
logger.warning(
f"Exception caught when connecting to the Island at {self._server_address}"
f": {exc}"
)
def _send_remaining_events(self): def _send_remaining_events(self):
while not self._queue.empty():
self._add_next_event_to_batch()
self._send_events_to_island() self._send_events_to_island()
def stop(self): def stop(self):