UT: Use event_queue_subscriber fixture in PyPubSubEventQueue tests

This commit is contained in:
Shreya Malviya 2022-08-16 19:15:55 +05:30
parent 2fbe9f3a4a
commit b6c7001294
1 changed files with 31 additions and 43 deletions

View File

@ -31,81 +31,69 @@ class TestEvent2(AbstractEvent):
tags: FrozenSet = frozenset() tags: FrozenSet = frozenset()
def new_subscriber() -> EventSubscriber:
def fn(event: AbstractEvent):
fn.call_count += 1
fn.call_types.add(event.__class__)
fn.call_tags |= event.tags
fn.call_count = 0
fn.call_types = set()
fn.call_tags = set()
return fn
@pytest.fixture
def subscriber() -> EventSubscriber:
return new_subscriber()
@pytest.fixture @pytest.fixture
def event_queue() -> IEventQueue: def event_queue() -> IEventQueue:
return PyPubSubEventQueue(Publisher()) return PyPubSubEventQueue(Publisher())
def test_subscribe_all(event_queue: IEventQueue, subscriber: EventSubscriber): def test_subscribe_all(event_queue: IEventQueue, event_queue_subscriber: EventSubscriber):
event_queue.subscribe_all_events(subscriber) event_queue.subscribe_all_events(event_queue_subscriber)
event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_1, EVENT_TAG_2}))) event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_1, EVENT_TAG_2})))
event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_2}))) event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_2})))
event_queue.publish(TestEvent1(tags=frozenset({"secret_tag"}))) event_queue.publish(TestEvent1(tags=frozenset({"secret_tag"})))
event_queue.publish(TestEvent2()) event_queue.publish(TestEvent2())
assert subscriber.call_count == 4 assert event_queue_subscriber.call_count == 4
assert TestEvent1 in subscriber.call_types assert TestEvent1 in event_queue_subscriber.call_types
assert TestEvent2 in subscriber.call_types assert TestEvent2 in event_queue_subscriber.call_types
@pytest.mark.parametrize("type_to_subscribe", [TestEvent1, TestEvent2]) @pytest.mark.parametrize("type_to_subscribe", [TestEvent1, TestEvent2])
def test_subscribe_types(event_queue: IEventQueue, subscriber: EventSubscriber, type_to_subscribe): def test_subscribe_types(
event_queue.subscribe_type(type_to_subscribe, subscriber) event_queue: IEventQueue, event_queue_subscriber: EventSubscriber, type_to_subscribe
):
event_queue.subscribe_type(type_to_subscribe, event_queue_subscriber)
event_queue.publish(TestEvent1()) event_queue.publish(TestEvent1())
event_queue.publish(TestEvent2()) event_queue.publish(TestEvent2())
assert subscriber.call_count == 1 assert event_queue_subscriber.call_count == 1
assert type_to_subscribe in subscriber.call_types assert type_to_subscribe in event_queue_subscriber.call_types
def test_subscribe_tags_single_type(event_queue: IEventQueue, subscriber: EventSubscriber): def test_subscribe_tags_single_type(
event_queue.subscribe_tag(EVENT_TAG_1, subscriber) event_queue: IEventQueue, event_queue_subscriber: EventSubscriber
):
event_queue.subscribe_tag(EVENT_TAG_1, event_queue_subscriber)
event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_1, EVENT_TAG_2}))) event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_1, EVENT_TAG_2})))
event_queue.publish(TestEvent2(tags=frozenset({EVENT_TAG_2}))) event_queue.publish(TestEvent2(tags=frozenset({EVENT_TAG_2})))
assert subscriber.call_count == 1 assert event_queue_subscriber.call_count == 1
assert len(subscriber.call_types) == 1 assert len(event_queue_subscriber.call_types) == 1
assert TestEvent1 in subscriber.call_types assert TestEvent1 in event_queue_subscriber.call_types
assert EVENT_TAG_1 in subscriber.call_tags assert EVENT_TAG_1 in event_queue_subscriber.call_tags
def test_subscribe_tags_multiple_types(event_queue: IEventQueue, subscriber: EventSubscriber): def test_subscribe_tags_multiple_types(
event_queue.subscribe_tag(EVENT_TAG_2, subscriber) event_queue: IEventQueue, event_queue_subscriber: EventSubscriber
):
event_queue.subscribe_tag(EVENT_TAG_2, event_queue_subscriber)
event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_1, EVENT_TAG_2}))) event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_1, EVENT_TAG_2})))
event_queue.publish(TestEvent2(tags=frozenset({EVENT_TAG_2}))) event_queue.publish(TestEvent2(tags=frozenset({EVENT_TAG_2})))
assert subscriber.call_count == 2 assert event_queue_subscriber.call_count == 2
assert len(subscriber.call_types) == 2 assert len(event_queue_subscriber.call_types) == 2
assert TestEvent1 in subscriber.call_types assert TestEvent1 in event_queue_subscriber.call_types
assert TestEvent2 in subscriber.call_types assert TestEvent2 in event_queue_subscriber.call_types
assert {EVENT_TAG_1, EVENT_TAG_2}.issubset(subscriber.call_tags) assert {EVENT_TAG_1, EVENT_TAG_2}.issubset(event_queue_subscriber.call_tags)
def test_type_tag_collision(event_queue: IEventQueue, subscriber: EventSubscriber): def test_type_tag_collision(event_queue: IEventQueue, event_queue_subscriber: EventSubscriber):
event_queue.subscribe_type(TestEvent1, subscriber) event_queue.subscribe_type(TestEvent1, event_queue_subscriber)
event_queue.publish(TestEvent2(tags=frozenset({TestEvent1.__name__}))) event_queue.publish(TestEvent2(tags=frozenset({TestEvent1.__name__})))
assert subscriber.call_count == 0 assert event_queue_subscriber.call_count == 0