Common: Create and partially implement PypubsubEventQueue

This commit is contained in:
Shreya Malviya 2022-08-05 19:11:33 +05:30
parent 40219aa5ff
commit bb76410bac
1 changed files with 10 additions and 11 deletions

View File

@ -1,25 +1,25 @@
from typing import Any, Callable, Sequence
from pubsub import ALL_TOPICS, pub
from common.events import AbstractEvent
from .i_event_queue import IEventQueue
from pubsub import pub, ALL_TOPICS
class PypubsubEventQueue(IEventQueue):
@staticmethod
def subscribe_all(subscriber: Callable[..., Any]):
pub.subscribe(listener=subscriber, topicName=ALL_TOPICS)
def subscribe_types(self, types: Sequence[AbstractEvent], subscriber: Callable[..., Any]):
"""
Subscribes a subscriber to all specifed event types
@staticmethod
def subscribe_types(types: Sequence[AbstractEvent], subscriber: Callable[..., Any]):
for event_type in types:
PypubsubEventQueue._subscribe_type(event_type, subscriber)
:param types: Event types to which the subscriber should subscribe
:param subscriber: Callable that should subscribe to events
"""
pass
def _subscribe_type(event_type: AbstractEvent, subscriber: Callable[..., Any]):
event_type_name = event_type.__name__
pub.subscribe(subscriber, event_type_name)
def subscribe_tags(self, tags: Sequence[str], subscriber: Callable[..., Any]):
"""
@ -40,4 +40,3 @@ class PypubsubEventQueue(IEventQueue):
"""
pass