Common: Implement unsubscribing functions in PypubsubEventQueue

This commit is contained in:
Shreya Malviya 2022-08-08 13:42:55 +05:30
parent 0578a6fcc8
commit b78282ef20
1 changed files with 16 additions and 0 deletions

View File

@ -33,3 +33,19 @@ class PypubsubEventQueue(IEventQueue):
# publish to tags' topics # publish to tags' topics
for tag in event.tags: for tag in event.tags:
pub.sendMessage(tag, data) pub.sendMessage(tag, data)
@staticmethod
def unsubscribe_all(subscriber: Callable[..., Any]):
pub.unsubscribe(listener=subscriber, topicName=ALL_TOPICS)
@staticmethod
def unsubscribe_types(types: Sequence[AbstractEvent], subscriber: Callable[..., Any]):
for event_type in types:
# pypubsub.pub.subscribe needs a string as the topic/event name
event_type_name = event_type.__name__
pub.unsubscribe(listener=subscriber, topicName=event_type_name)
@staticmethod
def unsubscribe_tags(tags: Sequence[str], subscriber: Callable[..., Any]):
for tag in tags:
pub.unsubscribe(listener=subscriber, topicName=tag)