diff --git a/monkey/common/event_queue/pypubsub_event_queue.py b/monkey/common/event_queue/pypubsub_event_queue.py index ae5220cb4..6dde760a8 100644 --- a/monkey/common/event_queue/pypubsub_event_queue.py +++ b/monkey/common/event_queue/pypubsub_event_queue.py @@ -15,6 +15,7 @@ logger = logging.getLogger(__name__) class PyPubSubEventQueue(IEventQueue): def __init__(self, pypubsub_publisher: Publisher): self._pypubsub_publisher = pypubsub_publisher + self._refs = [] def subscribe_all_events(self, subscriber: EventSubscriber): self._subscribe(_ALL_EVENTS_TOPIC, subscriber) @@ -36,6 +37,28 @@ class PyPubSubEventQueue(IEventQueue): logging.debug(f"Subscriber {subscriber_name} subscribed to {topic}") self._pypubsub_publisher.subscribe(topicName=topic, listener=subscriber) + self._keep_subscriber_strongref(subscriber) + + def _keep_subscriber_strongref(self, subscriber: EventSubscriber): + # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: + # > PyPubSub holds listeners by weak reference so that the lifetime of the + # > callable is not affected by PyPubSub: once the application no longer + # > references the callable, it can be garbage collected and PyPubSub can clean up + # > so it is no longer registered (this happens thanks to the weakref module). + # > Without this, it would be imperative to remember to unsubscribe certain + # > listeners, which is error prone; they would end up living until the + # > application exited. + # + # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable + # + # In our case, we're OK with subscribers living until the application exits. We don't + # provide an unsubscribe method (at this time) as subscriptions are expected to last + # for the life of the process. + # + # Specifically, if an instance object of a callable class is created and subscribed, + # we don't want that subscription to end if the callable instance object goes out of + # scope. Adding subscribers to self._refs prevents them from ever going out of scope. + self._refs.append(subscriber) def publish(self, event: AbstractEvent): self._publish_to_all_events_topic(event) diff --git a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_event_queue.py b/monkey/tests/unit_tests/common/event_queue/test_pypubsub_event_queue.py index 006a27916..f1648d2f1 100644 --- a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_event_queue.py +++ b/monkey/tests/unit_tests/common/event_queue/test_pypubsub_event_queue.py @@ -111,3 +111,22 @@ def test_type_tag_collision(event_queue: IEventQueue, event_queue_subscriber: Ev event_queue.publish(TestEvent2(tags=frozenset({TestEvent1.__name__}))) assert event_queue_subscriber.call_count == 0 + + +def test_keep_subscriber_in_scope(event_queue: IEventQueue): + class MyCallable: + called = False + + def __call__(self, event: AbstractEvent): + MyCallable.called = True + + def subscribe(): + # fn will go out of scope after subscribe() returns. + fn = MyCallable() + event_queue.subscribe_all_events(fn) + + subscribe() + + event_queue.publish(TestEvent2()) + + assert MyCallable.called