From 3c71211b799797faa980bf2aa188eff007b3b55b Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:38:16 +0530 Subject: [PATCH] Common: Use PyPubSubPublisherWrapper in PyPubSubAgentEventQueue --- .../event_queue/pypubsub_agent_event_queue.py | 36 +++---------------- 1 file changed, 4 insertions(+), 32 deletions(-) diff --git a/monkey/common/event_queue/pypubsub_agent_event_queue.py b/monkey/common/event_queue/pypubsub_agent_event_queue.py index 86dbea4af..d5cebf39a 100644 --- a/monkey/common/event_queue/pypubsub_agent_event_queue.py +++ b/monkey/common/event_queue/pypubsub_agent_event_queue.py @@ -3,6 +3,7 @@ from typing import Type from pubsub.core import Publisher +from common.event_queue import PyPubSubPublisherWrapper from common.events import AbstractAgentEvent from . import AgentEventSubscriber, IAgentEventQueue @@ -14,8 +15,7 @@ logger = logging.getLogger(__name__) class PyPubSubAgentEventQueue(IAgentEventQueue): def __init__(self, pypubsub_publisher: Publisher): - self._pypubsub_publisher = pypubsub_publisher - self._refs = [] + self._pypubsub_publisher_wrapped = PyPubSubPublisherWrapper(pypubsub_publisher) def subscribe_all_events(self, subscriber: AgentEventSubscriber): self._subscribe(_ALL_EVENTS_TOPIC, subscriber) @@ -32,35 +32,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): self._subscribe(tag_topic, subscriber) def _subscribe(self, topic: str, subscriber: AgentEventSubscriber): - try: - subscriber_name = subscriber.__name__ - except AttributeError: - subscriber_name = subscriber.__class__.__name__ - - logging.debug(f"Subscriber {subscriber_name} subscribed to {topic}") - self._pypubsub_publisher.subscribe(topicName=topic, listener=subscriber) - self._keep_subscriber_strongref(subscriber) - - def _keep_subscriber_strongref(self, subscriber: AgentEventSubscriber): - # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: - # > PyPubSub holds listeners by weak reference so that the lifetime of the - # > callable is not affected by PyPubSub: once the application no longer - # > references the callable, it can be garbage collected and PyPubSub can clean up - # > so it is no longer registered (this happens thanks to the weakref module). - # > Without this, it would be imperative to remember to unsubscribe certain - # > listeners, which is error prone; they would end up living until the - # > application exited. - # - # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable - # - # In our case, we're OK with subscribers living until the application exits. We don't - # provide an unsubscribe method (at this time) as subscriptions are expected to last - # for the life of the process. - # - # Specifically, if an instance object of a callable class is created and subscribed, - # we don't want that subscription to end if the callable instance object goes out of - # scope. Adding subscribers to self._refs prevents them from ever going out of scope. - self._refs.append(subscriber) + self._pypubsub_publisher_wrapped.subscribe(topic, subscriber) def publish(self, event: AbstractAgentEvent): self._publish_to_all_events_topic(event) @@ -81,7 +53,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): def _publish_event(self, topic: str, event: AbstractAgentEvent): logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}") - self._pypubsub_publisher.sendMessage(topic, event=event) + self._pypubsub_publisher_wrapped.publish(topic, event) # Appending a unique string to the topics for type and tags prevents bugs caused by collisions # between type names and tag names.