From c9500cd04f13df54bd42b15ffda16d7511df70c9 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:17:26 +0530 Subject: [PATCH] Common: Add PyPubSubIslandEventQueue --- .../pypubsub_island_event_queue.py | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 monkey/common/event_queue/pypubsub_island_event_queue.py diff --git a/monkey/common/event_queue/pypubsub_island_event_queue.py b/monkey/common/event_queue/pypubsub_island_event_queue.py new file mode 100644 index 000000000..ba6ef3af4 --- /dev/null +++ b/monkey/common/event_queue/pypubsub_island_event_queue.py @@ -0,0 +1,64 @@ +import logging +from typing import Any, Callable + +from pubsub.core import Publisher + +from . import IIslandEventQueue, IslandEventTopics + +logger = logging.getLogger(__name__) + + +class PyPubSubIslandEventQueue(IIslandEventQueue): + def __init__(self, pypubsub_publisher: Publisher): + self._pypubsub_publisher = pypubsub_publisher + self._refs = [] + + def subscribe(self, topic: IslandEventTopics, subscriber: Callable[..., None]): + topic_value = topic.value # needs to be a string for pypubsub + try: + subscriber_name = subscriber.__name__ + except AttributeError: + subscriber_name = subscriber.__class__.__name__ + + logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_value}") + + # NOTE: The subscriber's signature needs to match the MDS (message data specification) of + # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic + # is created, which in our case is when a subscriber subscribes to a topic which + # is new (hasn't been subscribed to before). If the topic is being subscribed to by + # a subscriber for the first time, the topic's MDS will automatically be set + # according to that subscriber's signature. + self._pypubsub_publisher.subscribe(topicName=topic_value, listener=subscriber) + self._keep_subscriber_strongref(subscriber) + + def _keep_subscriber_strongref(self, subscriber: Callable[..., None]): + # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: + # > PyPubSub holds listeners by weak reference so that the lifetime of the + # > callable is not affected by PyPubSub: once the application no longer + # > references the callable, it can be garbage collected and PyPubSub can clean up + # > so it is no longer registered (this happens thanks to the weakref module). + # > Without this, it would be imperative to remember to unsubscribe certain + # > listeners, which is error prone; they would end up living until the + # > application exited. + # + # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable + # + # In our case, we're OK with subscribers living until the application exits. We don't + # provide an unsubscribe method (at this time) as subscriptions are expected to last + # for the life of the process. + # + # Specifically, if an instance object of a callable class is created and subscribed, + # we don't want that subscription to end if the callable instance object goes out of + # scope. Adding subscribers to self._refs prevents them from ever going out of scope. + self._refs.append(subscriber) + + def publish(self, topic: IslandEventTopics, event_data: Any = None): + topic_value = topic.value # needs to be a string for pypubsub + + logger.debug(f"Publishing {topic_value} event") + + # NOTE: `event_data` needs to match the MDS (message data specification) of the topic, + # otherwise, errors will arise. The MDS of a topic is set when the topic is created, + # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) + # which is new (hasn't been subscribed to before). + self._pypubsub_publisher.sendMessage(topicName=topic_value, event=event_data)