Island: Use PyPubSubPublisherWrapper in PyPubSubIslandEventQueue

This commit is contained in:
Shreya Malviya 2022-09-06 19:38:48 +05:30
parent 3c71211b79
commit 70468c37fb
1 changed files with 5 additions and 32 deletions

View File

@ -3,6 +3,8 @@ from typing import Any
from pubsub.core import Publisher
from common.event_queue import PyPubSubPublisherWrapper
from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic
logger = logging.getLogger(__name__)
@ -10,17 +12,10 @@ logger = logging.getLogger(__name__)
class PyPubSubIslandEventQueue(IIslandEventQueue):
def __init__(self, pypubsub_publisher: Publisher):
self._pypubsub_publisher = pypubsub_publisher
self._refs = []
self._pypubsub_publisher_wrapped = PyPubSubPublisherWrapper(pypubsub_publisher)
def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber):
topic_name = topic.name # needs to be a string for pypubsub
try:
subscriber_name = subscriber.__name__
except AttributeError:
subscriber_name = subscriber.__class__.__name__
logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}")
# NOTE: The subscriber's signature needs to match the MDS (message data specification) of
# the topic, otherwise, errors will arise. The MDS of a topic is set when the topic
@ -28,29 +23,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue):
# is new (hasn't been subscribed to before). If the topic is being subscribed to by
# a subscriber for the first time, the topic's MDS will automatically be set
# according to that subscriber's signature.
self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber)
self._keep_subscriber_strongref(subscriber)
def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber):
# NOTE: PyPubSub stores subscribers by weak reference. From the documentation:
# > PyPubSub holds listeners by weak reference so that the lifetime of the
# > callable is not affected by PyPubSub: once the application no longer
# > references the callable, it can be garbage collected and PyPubSub can clean up
# > so it is no longer registered (this happens thanks to the weakref module).
# > Without this, it would be imperative to remember to unsubscribe certain
# > listeners, which is error prone; they would end up living until the
# > application exited.
#
# https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable
#
# In our case, we're OK with subscribers living until the application exits. We don't
# provide an unsubscribe method (at this time) as subscriptions are expected to last
# for the life of the process.
#
# Specifically, if an instance object of a callable class is created and subscribed,
# we don't want that subscription to end if the callable instance object goes out of
# scope. Adding subscribers to self._refs prevents them from ever going out of scope.
self._refs.append(subscriber)
self._pypubsub_publisher_wrapped.subscribe(topic_name, subscriber)
def publish(self, topic: IslandEventTopic, event: Any = None):
topic_name = topic.name # needs to be a string for pypubsub
@ -61,4 +34,4 @@ class PyPubSubIslandEventQueue(IIslandEventQueue):
# otherwise, errors will arise. The MDS of a topic is set when the topic is created,
# which in our case is when a subscriber subscribes to a topic (in `subscribe()`)
# which is new (hasn't been subscribed to before).
self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event)
self._pypubsub_publisher_wrapped.publish(topic_name, event)