Agent: Restructure SendAllAgentEventsToIsland and _AgentEventsToIslandSender

This commit is contained in:
Shreya Malviya 2022-09-14 14:31:55 +05:30
parent 404f6d954f
commit fe63b28ee1
1 changed files with 58 additions and 50 deletions

View File

@ -22,72 +22,80 @@ class SendAllAgentEventsToIsland:
def __init__(self, server_address: str): def __init__(self, server_address: str):
self._server_address = server_address self._server_address = server_address
self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue()
self._send_to_island_thread = self._BatchAndSendEventsThread( self._agent_events_to_island_sender = _AgentEventsToIslandSender(
self._queue, self._server_address, DEFAULT_TIME_PERIOD_SECONDS self._server_address, DEFAULT_TIME_PERIOD_SECONDS
) )
self._send_to_island_thread.start() self._agent_events_to_island_sender.start()
def __del__(self):
self._agent_events_to_island_sender.stop()
def send_event(self, event: AbstractAgentEvent): def send_event(self, event: AbstractAgentEvent):
self._queue.put(self._serialize_event(event)) serialized_event = self._serialize_event(event)
self._agent_events_to_island_sender.add_event_to_queue(serialized_event)
logger.debug( logger.debug(
f"Sending event of type {type(event).__name__} to the Island at {self._server_address}" f"Sending event of type {type(event).__name__} to the Island at {self._server_address}"
) )
def _serialize_event(self, event: AbstractAgentEvent): def _serialize_event(self, event: AbstractAgentEvent):
# get serializer from registry and serialize
pass pass
class _BatchAndSendEventsThread:
"""
Handles the batching and sending of the Agent's events to the Island
"""
def __init__(self, queue_of_events: queue.Queue, server_address: str, time_period: int): class _AgentEventsToIslandSender:
self._queue = queue_of_events """
self._server_address = server_address Handles the batching and sending of the Agent's events to the Island
self._time_period = time_period """
self._stop_batch_and_send_thread = threading.Event() def __init__(self, server_address: str, time_period: int):
self._server_address = server_address
self._time_period = time_period
def start(self): self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue()
self._batch_and_send_thread = threading.Thread( self._stop_batch_and_send_thread = threading.Event()
name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches
)
self._batch_and_send_thread.start()
def _manage_event_batches(self): def start(self):
while not self._stop_batch_and_send_thread.is_set(): self._batch_and_send_thread = threading.Thread(
self._send_events_to_island() name="SendEventsToIslandInBatchesThread", target=self._manage_event_batches
sleep(self._time_period) )
self._batch_and_send_thread.start()
self._send_remaining_events() def add_event_to_queue(self, serialized_event: AbstractAgentEvent):
self._queue.put(serialized_event)
def _send_events_to_island(self): def _manage_event_batches(self):
if self._queue.empty(): while not self._stop_batch_and_send_thread.is_set():
return
events = []
while not self._queue.empty():
events.append(self._queue.get(block=False))
try:
requests.post( # noqa: DUO123
EVENTS_API_URL % (self._server_address,),
json=events,
verify=False,
timeout=MEDIUM_REQUEST_TIMEOUT,
)
except Exception as exc:
logger.warning(
f"Exception caught when connecting to the Island at {self._server_address}"
f": {exc}"
)
def _send_remaining_events(self):
self._send_events_to_island() self._send_events_to_island()
sleep(self._time_period)
def stop(self): self._send_remaining_events()
self._stop_batch_and_send_thread.set()
self._batch_and_send_thread.join() def _send_events_to_island(self):
if self._queue.empty():
return
events = []
while not self._queue.empty():
events.append(self._queue.get(block=False))
try:
requests.post( # noqa: DUO123
EVENTS_API_URL % (self._server_address,),
json=events,
verify=False,
timeout=MEDIUM_REQUEST_TIMEOUT,
)
except Exception as exc:
logger.warning(
f"Exception caught when connecting to the Island at {self._server_address}"
f": {exc}"
)
def _send_remaining_events(self):
self._send_events_to_island()
def stop(self):
self._stop_batch_and_send_thread.set()
self._batch_and_send_thread.join()