UT: Refactor test_pypubsub_event_queue.py

This commit is contained in:
Mike Salvatore 2022-08-10 11:07:36 -04:00
parent d6bb56536d
commit 2bdc16c286
1 changed files with 58 additions and 53 deletions

View File

@ -14,81 +14,86 @@ EVENT_TAG_2 = "event tag 2"
@dataclass(frozen=True) @dataclass(frozen=True)
class TestEvent(AbstractEvent): class TestEvent1(AbstractEvent):
source: UUID = "f811ad00-5a68-4437-bd51-7b5cc1768ad5" source: UUID = "f811ad00-5a68-4437-bd51-7b5cc1768ad5"
target: Union[UUID, IPv4Address, None] = None target: Union[UUID, IPv4Address, None] = None
timestamp: float = 0.0 timestamp: float = 0.0
tags: FrozenSet = frozenset() tags: FrozenSet = frozenset()
@dataclass(frozen=True)
class TestEvent2(AbstractEvent):
source: UUID = "e810ad01-6b67-9446-fc58-9b8d717653f7"
target: Union[UUID, IPv4Address, None] = None
timestamp: float = 0.0
tags: FrozenSet = frozenset()
pypubsub_event_queue = PyPubSubEventQueue(pub) pypubsub_event_queue = PyPubSubEventQueue(pub)
@pytest.fixture() def new_subscriber():
def subscriber_1_calls(): def fn(event):
return [] fn.call_count += 1
fn.call_types.add(event.__class__)
fn.call_tags |= event.tags
fn.call_count = 0
@pytest.fixture() fn.call_types = set()
def subscriber_2_calls(): fn.call_tags = set()
return []
@pytest.fixture
def subscriber_1(subscriber_1_calls):
def fn(event, topic=pub.AUTO_TOPIC):
subscriber_1_calls.append(topic.getName())
return fn return fn
@pytest.fixture @pytest.fixture
def subscriber_2(subscriber_2_calls): def subscriber():
def fn(event, topic=pub.AUTO_TOPIC): return new_subscriber()
subscriber_2_calls.append(topic.getName())
return fn
@pytest.mark.usefixtures("subscriber_1", "subscriber_2", "subscriber_1_calls", "subscriber_2_calls") def test_subscribe_all(subscriber):
def test_topic_subscription(subscriber_1, subscriber_2, subscriber_1_calls, subscriber_2_calls):
pypubsub_event_queue.subscribe_type(TestEvent, subscriber_1)
pypubsub_event_queue.subscribe_tag(EVENT_TAG_2, subscriber_1)
pypubsub_event_queue.subscribe_tag(EVENT_TAG_1, subscriber_2)
pypubsub_event_queue.publish(TestEvent(tags={EVENT_TAG_1, EVENT_TAG_2}))
assert subscriber_1_calls == [TestEvent.__name__, EVENT_TAG_2]
assert subscriber_2_calls == [EVENT_TAG_1]
def test_subscribe_all():
subscriber_calls = []
def subscriber(event, topic=pub.AUTO_TOPIC):
subscriber_calls.append(topic.getName())
pypubsub_event_queue.subscribe_all_events(subscriber) pypubsub_event_queue.subscribe_all_events(subscriber)
pypubsub_event_queue.publish(TestEvent(tags={EVENT_TAG_1, EVENT_TAG_2}))
assert len(subscriber_calls) == 1 pypubsub_event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
assert TestEvent.__name__ not in subscriber_calls pypubsub_event_queue.publish(TestEvent1(tags={EVENT_TAG_2}))
assert EVENT_TAG_1 not in subscriber_calls pypubsub_event_queue.publish(TestEvent1(tags={"secret_tag"}))
assert EVENT_TAG_2 not in subscriber_calls pypubsub_event_queue.publish(TestEvent2())
assert subscriber.call_count == 4
assert TestEvent1 in subscriber.call_types
assert TestEvent2 in subscriber.call_types
@pytest.mark.usefixtures("subscriber_1", "subscriber_1_calls") @pytest.mark.parametrize("type_to_subscribe", [TestEvent1, TestEvent2])
def test_subscribe_types(subscriber_1, subscriber_1_calls): def test_subscribe_types(subscriber, type_to_subscribe):
pypubsub_event_queue.subscribe_type(TestEvent, subscriber_1) pypubsub_event_queue.subscribe_type(type_to_subscribe, subscriber)
pypubsub_event_queue.publish(TestEvent(tags={EVENT_TAG_1, EVENT_TAG_2}))
assert subscriber_1_calls == [TestEvent.__name__] pypubsub_event_queue.publish(TestEvent1())
pypubsub_event_queue.publish(TestEvent2())
assert subscriber.call_count == 1
assert type_to_subscribe in subscriber.call_types
@pytest.mark.usefixtures("subscriber_1", "subscriber_2", "subscriber_1_calls", "subscriber_2_calls") def test_subscribe_tags_single_type(subscriber):
def test_subscribe_tags(subscriber_1, subscriber_2, subscriber_1_calls, subscriber_2_calls): pypubsub_event_queue.subscribe_tag(EVENT_TAG_1, subscriber)
pypubsub_event_queue.subscribe_tag(EVENT_TAG_1, subscriber_1)
pypubsub_event_queue.subscribe_tag(EVENT_TAG_2, subscriber_2)
pypubsub_event_queue.publish(TestEvent(tags={EVENT_TAG_1, EVENT_TAG_2}))
assert subscriber_1_calls == [EVENT_TAG_1] pypubsub_event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
assert subscriber_2_calls == [EVENT_TAG_2] pypubsub_event_queue.publish(TestEvent2(tags={EVENT_TAG_2}))
assert subscriber.call_count == 1
assert len(subscriber.call_types) == 1
assert TestEvent1 in subscriber.call_types
assert EVENT_TAG_1 in subscriber.call_tags
def test_subscribe_tags_multiple_types(subscriber):
pypubsub_event_queue.subscribe_tag(EVENT_TAG_2, subscriber)
pypubsub_event_queue.publish(TestEvent1(tags={EVENT_TAG_1, EVENT_TAG_2}))
pypubsub_event_queue.publish(TestEvent2(tags={EVENT_TAG_2}))
assert subscriber.call_count == 2
assert len(subscriber.call_types) == 2
assert TestEvent1 in subscriber.call_types
assert TestEvent2 in subscriber.call_types
assert {EVENT_TAG_1, EVENT_TAG_2}.issubset(subscriber.call_tags)