Agent: Add a very crude outline of using a queue and threading in `push_all_events_to_island`

This commit is contained in:
Shreya Malviya 2022-09-12 19:16:00 +05:30
parent a06c6a622e
commit e51a717bdf
1 changed files with 31 additions and 8 deletions

View File

@ -1,4 +1,5 @@
import logging import logging
import queue
import requests import requests
@ -15,18 +16,40 @@ class push_all_events_to_island:
def __init__(self, server_address: str): def __init__(self, server_address: str):
self._server_address = server_address self._server_address = server_address
self._queue: queue.Queue[AbstractAgentEvent] = queue.Queue()
self._send_to_island_thread = self.batch_events_thread(self._queue, self._server_address)
self._send_to_island_thread.start()
def __call__(self, event: AbstractAgentEvent, topic=pub.AUTO_TOPIC): def __call__(self, event: AbstractAgentEvent, topic=pub.AUTO_TOPIC):
topic_name = topic.getName() topic_name = topic.getName()
logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}") self._queue.put(self._serialize_event(event, topic_name))
requests.post( # noqa: DUO123 logger.debug(f"Pushing event of type {topic_name} to the Island at {self._server_address}")
"https://%s/api/events" % (self._server_address,),
data=self._serialize_event(event, topic_name),
headers={"content-type": "application/json"},
verify=False,
timeout=MEDIUM_REQUEST_TIMEOUT,
)
def _serialize_event(self, event: AbstractAgentEvent, topic_name: str): def _serialize_event(self, event: AbstractAgentEvent, topic_name: str):
pass pass
class batch_events_thread:
def __init__(self, queue_of_events: queue.Queue, server_address: str):
self._queue = queue_of_events
self._server_address = server_address
def start(self):
pass
def _manage_next_event(self):
pass
def _send_event_to_island(self, serialized_event):
requests.post( # noqa: DUO123
"https://%s/api/events" % (self._server_address,),
data=serialized_event,
headers={"content-type": "application/json"},
verify=False,
timeout=MEDIUM_REQUEST_TIMEOUT,
)
def stop(self):
pass