UT: Add event_queue() fixture
This commit is contained in:
parent
2bdc16c286
commit
68dafbfb9d
|
@ -29,9 +29,6 @@ class TestEvent2(AbstractEvent):
|
|||
tags: FrozenSet = frozenset()
|
||||
|
||||
|
||||
pypubsub_event_queue = PyPubSubEventQueue(pub)
|
||||
|
||||
|
||||
def new_subscriber():
|
||||
def fn(event):
|
||||
fn.call_count += 1
|
||||
|
@ -50,13 +47,18 @@ def subscriber():
|
|||
return new_subscriber()
|
||||
|
||||
|
||||
def test_subscribe_all(subscriber):
|
||||
pypubsub_event_queue.subscribe_all_events(subscriber)
|
||||
@pytest.fixture
|
||||
def event_queue():
|
||||
return PyPubSubEventQueue(pub)
|
||||
|
||||
pypubsub_event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
|
||||
pypubsub_event_queue.publish(TestEvent1(tags={EVENT_TAG_2}))
|
||||
pypubsub_event_queue.publish(TestEvent1(tags={"secret_tag"}))
|
||||
pypubsub_event_queue.publish(TestEvent2())
|
||||
|
||||
def test_subscribe_all(event_queue, subscriber):
|
||||
event_queue.subscribe_all_events(subscriber)
|
||||
|
||||
event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
|
||||
event_queue.publish(TestEvent1(tags={EVENT_TAG_2}))
|
||||
event_queue.publish(TestEvent1(tags={"secret_tag"}))
|
||||
event_queue.publish(TestEvent2())
|
||||
|
||||
assert subscriber.call_count == 4
|
||||
assert TestEvent1 in subscriber.call_types
|
||||
|
@ -64,21 +66,21 @@ def test_subscribe_all(subscriber):
|
|||
|
||||
|
||||
@pytest.mark.parametrize("type_to_subscribe", [TestEvent1, TestEvent2])
|
||||
def test_subscribe_types(subscriber, type_to_subscribe):
|
||||
pypubsub_event_queue.subscribe_type(type_to_subscribe, subscriber)
|
||||
def test_subscribe_types(event_queue, subscriber, type_to_subscribe):
|
||||
event_queue.subscribe_type(type_to_subscribe, subscriber)
|
||||
|
||||
pypubsub_event_queue.publish(TestEvent1())
|
||||
pypubsub_event_queue.publish(TestEvent2())
|
||||
event_queue.publish(TestEvent1())
|
||||
event_queue.publish(TestEvent2())
|
||||
|
||||
assert subscriber.call_count == 1
|
||||
assert type_to_subscribe in subscriber.call_types
|
||||
|
||||
|
||||
def test_subscribe_tags_single_type(subscriber):
|
||||
pypubsub_event_queue.subscribe_tag(EVENT_TAG_1, subscriber)
|
||||
def test_subscribe_tags_single_type(event_queue, subscriber):
|
||||
event_queue.subscribe_tag(EVENT_TAG_1, subscriber)
|
||||
|
||||
pypubsub_event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
|
||||
pypubsub_event_queue.publish(TestEvent2(tags={EVENT_TAG_2}))
|
||||
event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
|
||||
event_queue.publish(TestEvent2(tags={EVENT_TAG_2}))
|
||||
|
||||
assert subscriber.call_count == 1
|
||||
assert len(subscriber.call_types) == 1
|
||||
|
@ -86,11 +88,11 @@ def test_subscribe_tags_single_type(subscriber):
|
|||
assert EVENT_TAG_1 in subscriber.call_tags
|
||||
|
||||
|
||||
def test_subscribe_tags_multiple_types(subscriber):
|
||||
pypubsub_event_queue.subscribe_tag(EVENT_TAG_2, subscriber)
|
||||
def test_subscribe_tags_multiple_types(event_queue, subscriber):
|
||||
event_queue.subscribe_tag(EVENT_TAG_2, subscriber)
|
||||
|
||||
pypubsub_event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
|
||||
pypubsub_event_queue.publish(TestEvent2(tags={EVENT_TAG_2}))
|
||||
event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
|
||||
event_queue.publish(TestEvent2(tags={EVENT_TAG_2}))
|
||||
|
||||
assert subscriber.call_count == 2
|
||||
assert len(subscriber.call_types) == 2
|
||||
|
|
Loading…
Reference in New Issue