Common: Store strongrefs to subscribers
This commit is contained in:
parent
9ab2c0bc6a
commit
a28cd97c0d
|
@ -15,6 +15,7 @@ logger = logging.getLogger(__name__)
|
||||||
class PyPubSubEventQueue(IEventQueue):
|
class PyPubSubEventQueue(IEventQueue):
|
||||||
def __init__(self, pypubsub_publisher: Publisher):
|
def __init__(self, pypubsub_publisher: Publisher):
|
||||||
self._pypubsub_publisher = pypubsub_publisher
|
self._pypubsub_publisher = pypubsub_publisher
|
||||||
|
self._refs = []
|
||||||
|
|
||||||
def subscribe_all_events(self, subscriber: EventSubscriber):
|
def subscribe_all_events(self, subscriber: EventSubscriber):
|
||||||
self._subscribe(_ALL_EVENTS_TOPIC, subscriber)
|
self._subscribe(_ALL_EVENTS_TOPIC, subscriber)
|
||||||
|
@ -36,6 +37,28 @@ class PyPubSubEventQueue(IEventQueue):
|
||||||
|
|
||||||
logging.debug(f"Subscriber {subscriber_name} subscribed to {topic}")
|
logging.debug(f"Subscriber {subscriber_name} subscribed to {topic}")
|
||||||
self._pypubsub_publisher.subscribe(topicName=topic, listener=subscriber)
|
self._pypubsub_publisher.subscribe(topicName=topic, listener=subscriber)
|
||||||
|
self._keep_subscriber_strongref(subscriber)
|
||||||
|
|
||||||
|
def _keep_subscriber_strongref(self, subscriber: EventSubscriber):
|
||||||
|
# NOTE: PyPubSub stores subscribers by weak reference. From the documentation:
|
||||||
|
# > PyPubSub holds listeners by weak reference so that the lifetime of the
|
||||||
|
# > callable is not affected by PyPubSub: once the application no longer
|
||||||
|
# > references the callable, it can be garbage collected and PyPubSub can clean up
|
||||||
|
# > so it is no longer registered (this happens thanks to the weakref module).
|
||||||
|
# > Without this, it would be imperative to remember to unsubscribe certain
|
||||||
|
# > listeners, which is error prone; they would end up living until the
|
||||||
|
# > application exited.
|
||||||
|
#
|
||||||
|
# https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable
|
||||||
|
#
|
||||||
|
# In our case, we're OK with subscribers living until the application exits. We don't
|
||||||
|
# provide an unsubscribe method (at this time) as subscriptions are expected to last
|
||||||
|
# for the life of the process.
|
||||||
|
#
|
||||||
|
# Specifically, if an instance object of a callable class is created and subscribed,
|
||||||
|
# we don't want that subscription to end if the callable instance object goes out of
|
||||||
|
# scope. Adding subscribers to self._refs prevents them from ever going out of scope.
|
||||||
|
self._refs.append(subscriber)
|
||||||
|
|
||||||
def publish(self, event: AbstractEvent):
|
def publish(self, event: AbstractEvent):
|
||||||
self._publish_to_all_events_topic(event)
|
self._publish_to_all_events_topic(event)
|
||||||
|
|
|
@ -111,3 +111,22 @@ def test_type_tag_collision(event_queue: IEventQueue, event_queue_subscriber: Ev
|
||||||
event_queue.publish(TestEvent2(tags=frozenset({TestEvent1.__name__})))
|
event_queue.publish(TestEvent2(tags=frozenset({TestEvent1.__name__})))
|
||||||
|
|
||||||
assert event_queue_subscriber.call_count == 0
|
assert event_queue_subscriber.call_count == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_keep_subscriber_in_scope(event_queue: IEventQueue):
|
||||||
|
class MyCallable:
|
||||||
|
called = False
|
||||||
|
|
||||||
|
def __call__(self, event: AbstractEvent):
|
||||||
|
MyCallable.called = True
|
||||||
|
|
||||||
|
def subscribe():
|
||||||
|
# fn will go out of scope after subscribe() returns.
|
||||||
|
fn = MyCallable()
|
||||||
|
event_queue.subscribe_all_events(fn)
|
||||||
|
|
||||||
|
subscribe()
|
||||||
|
|
||||||
|
event_queue.publish(TestEvent2())
|
||||||
|
|
||||||
|
assert MyCallable.called
|
||||||
|
|
Loading…
Reference in New Issue