From 70468c37fb34a91aee9dbed870fc41ab46884595 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:38:48 +0530 Subject: [PATCH] Island: Use PyPubSubPublisherWrapper in PyPubSubIslandEventQueue --- .../pypubsub_island_event_queue.py | 37 +++---------------- 1 file changed, 5 insertions(+), 32 deletions(-) diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py index 64c1b25ba..478228d72 100644 --- a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -3,6 +3,8 @@ from typing import Any from pubsub.core import Publisher +from common.event_queue import PyPubSubPublisherWrapper + from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic logger = logging.getLogger(__name__) @@ -10,17 +12,10 @@ logger = logging.getLogger(__name__) class PyPubSubIslandEventQueue(IIslandEventQueue): def __init__(self, pypubsub_publisher: Publisher): - self._pypubsub_publisher = pypubsub_publisher - self._refs = [] + self._pypubsub_publisher_wrapped = PyPubSubPublisherWrapper(pypubsub_publisher) def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): topic_name = topic.name # needs to be a string for pypubsub - try: - subscriber_name = subscriber.__name__ - except AttributeError: - subscriber_name = subscriber.__class__.__name__ - - logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") # NOTE: The subscriber's signature needs to match the MDS (message data specification) of # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic @@ -28,29 +23,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): # is new (hasn't been subscribed to before). If the topic is being subscribed to by # a subscriber for the first time, the topic's MDS will automatically be set # according to that subscriber's signature. - self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) - self._keep_subscriber_strongref(subscriber) - - def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber): - # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: - # > PyPubSub holds listeners by weak reference so that the lifetime of the - # > callable is not affected by PyPubSub: once the application no longer - # > references the callable, it can be garbage collected and PyPubSub can clean up - # > so it is no longer registered (this happens thanks to the weakref module). - # > Without this, it would be imperative to remember to unsubscribe certain - # > listeners, which is error prone; they would end up living until the - # > application exited. - # - # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable - # - # In our case, we're OK with subscribers living until the application exits. We don't - # provide an unsubscribe method (at this time) as subscriptions are expected to last - # for the life of the process. - # - # Specifically, if an instance object of a callable class is created and subscribed, - # we don't want that subscription to end if the callable instance object goes out of - # scope. Adding subscribers to self._refs prevents them from ever going out of scope. - self._refs.append(subscriber) + self._pypubsub_publisher_wrapped.subscribe(topic_name, subscriber) def publish(self, topic: IslandEventTopic, event: Any = None): topic_name = topic.name # needs to be a string for pypubsub @@ -61,4 +34,4 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): # otherwise, errors will arise. The MDS of a topic is set when the topic is created, # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) # which is new (hasn't been subscribed to before). - self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) + self._pypubsub_publisher_wrapped.publish(topic_name, event)