Common: Implement publish() in PypubsubEventQueue

This commit is contained in:
Shreya Malviya 2022-08-08 12:49:21 +05:30
parent 1fbe9e5ad4
commit 8ae451f240
1 changed files with 24 additions and 7 deletions

View File

@ -32,12 +32,29 @@ class PypubsubEventQueue(IEventQueue):
def _subscribe_tag(tag: str, subscriber: Callable[..., Any]):
pub.subscribe(listener=subscriber, topicName=tag)
def publish(self, event: AbstractEvent, data: Any):
"""
Publishes an event with the given data
@staticmethod
def publish(event: AbstractEvent, data: Any):
# someClass.mro() returns a list of types that someClass is derived from,
# in order of resolution
# we can be sure that for any valid event, the last three items in the list will be
# <class '__main__.AbstractEvent'>, <class 'abc.ABC'>, and <class 'object'>
:param event: Event to publish
:param data: Data to pass to subscribers with the event publish
"""
# for some event, say, CredentialsStolenEvent which was derived from SecurityEvent,
# we want to publish the data to both events, so, we loop through the super
# classes of the CredentialsStolenEvent which was initially passed as an argument
# to the function, and publish to each class's type and tags (except the last 3 classes)
for event_type in event.mro()[:-3]:
PypubsubEventQueue._publish_to_type(event_type, data)
pass
# one event can have multiple tags
for tag in event_type.tags:
PypubsubEventQueue._publish_to_tag(tag, data)
@staticmethod
def _publish_to_type(event_type: AbstractEvent, data: Any):
event_type_name = event_type.__name__
pub.sendMessage(event_type_name, data)
@staticmethod
def _publish_to_tag(tag: str, data: Any):
pub.sendMessage(tag, data)