Common: Decouple PyPubSubPublisherWrapper from events

This commit is contained in:
Mike Salvatore 2022-09-06 12:56:49 -04:00
parent c16c6456aa
commit ba52eae8ed
4 changed files with 40 additions and 11 deletions

View File

@ -53,7 +53,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue):
def _publish_event(self, topic: str, event: AbstractAgentEvent): def _publish_event(self, topic: str, event: AbstractAgentEvent):
logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}") logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}")
self._pypubsub_publisher_wrapped.publish(topic, event) self._pypubsub_publisher_wrapped.publish(topic, event=event)
# Appending a unique string to the topics for type and tags prevents bugs caused by collisions # Appending a unique string to the topics for type and tags prevents bugs caused by collisions
# between type names and tag names. # between type names and tag names.

View File

@ -1,5 +1,5 @@
import logging import logging
from typing import Any, Callable from typing import Callable
from pubsub.core import Publisher from pubsub.core import Publisher
@ -49,9 +49,9 @@ class PyPubSubPublisherWrapper:
# scope. Adding subscribers to self._refs prevents them from ever going out of scope. # scope. Adding subscribers to self._refs prevents them from ever going out of scope.
self._refs.append(subscriber) self._refs.append(subscriber)
def publish(self, topic_name: str, event: Any = None): def publish(self, topic_name: str, **kwargs):
# NOTE: `event` needs to match the MDS (message data specification) of the topic, # NOTE: `event` needs to match the MDS (message data specification) of the topic,
# otherwise, errors will arise. The MDS of a topic is set when the topic is created, # otherwise, errors will arise. The MDS of a topic is set when the topic is created,
# which in our case is when a subscriber subscribes to a topic (in `subscribe()`) # which in our case is when a subscriber subscribes to a topic (in `subscribe()`)
# which is new (hasn't been subscribed to before). # which is new (hasn't been subscribed to before).
self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) self._pypubsub_publisher.sendMessage(topicName=topic_name, **kwargs)

View File

@ -21,4 +21,8 @@ class PyPubSubIslandEventQueue(IIslandEventQueue):
def publish(self, topic: IslandEventTopic, event: Any = None): def publish(self, topic: IslandEventTopic, event: Any = None):
topic_name = topic.name # needs to be a string for pypubsub topic_name = topic.name # needs to be a string for pypubsub
logger.debug(f"Publishing {topic_name} event") logger.debug(f"Publishing {topic_name} event")
self._pypubsub_publisher_wrapped.publish(topic_name, event)
if event is None:
self._pypubsub_publisher_wrapped.publish(topic_name)
else:
self._pypubsub_publisher_wrapped.publish(topic_name, event=event)

View File

@ -1,4 +1,4 @@
from typing import Any, Callable from typing import Callable
import pytest import pytest
from pubsub import pub from pubsub import pub
@ -19,7 +19,7 @@ def event_queue() -> IIslandEventQueue:
@pytest.fixture @pytest.fixture
def event_queue_subscriber() -> Callable[..., None]: def event_queue_subscriber() -> Callable[..., None]:
def fn(event, topic=pub.AUTO_TOPIC): def fn(topic=pub.AUTO_TOPIC):
fn.call_count += 1 fn.call_count += 1
fn.call_topics |= {topic.getName()} fn.call_topics |= {topic.getName()}
@ -29,10 +29,12 @@ def event_queue_subscriber() -> Callable[..., None]:
return fn return fn
def test_subscribe_publish( def test_subscribe_publish__no_event_body(
event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber
): ):
event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=event_queue_subscriber) event_queue.subscribe(
topic=IslandEventTopic.RESET_AGENT_CONFIGURATION, subscriber=event_queue_subscriber
)
event_queue.subscribe( event_queue.subscribe(
topic=IslandEventTopic.CLEAR_SIMULATION_DATA, subscriber=event_queue_subscriber topic=IslandEventTopic.CLEAR_SIMULATION_DATA, subscriber=event_queue_subscriber
) )
@ -43,16 +45,39 @@ def test_subscribe_publish(
assert event_queue_subscriber.call_count == 2 assert event_queue_subscriber.call_count == 2
assert event_queue_subscriber.call_topics == { assert event_queue_subscriber.call_topics == {
IslandEventTopic.AGENT_CONNECTED.name, IslandEventTopic.RESET_AGENT_CONFIGURATION.name,
IslandEventTopic.CLEAR_SIMULATION_DATA.name, IslandEventTopic.CLEAR_SIMULATION_DATA.name,
} }
def test_subscribe_publish__with_event_body(
event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber
):
class MyCallable:
call_count = 0
event = None
def __call__(self, event):
MyCallable.call_count += 1
MyCallable.event = event
event = "my event!"
my_callable = MyCallable()
event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=my_callable)
event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED, event=event)
event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA)
event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION)
assert my_callable.call_count == 1
assert my_callable.event == event
def test_keep_subscriber_in_scope(event_queue: IIslandEventQueue): def test_keep_subscriber_in_scope(event_queue: IIslandEventQueue):
class MyCallable: class MyCallable:
called = False called = False
def __call__(self, event: Any): def __call__(self):
MyCallable.called = True MyCallable.called = True
def subscribe(): def subscribe():