From ba52eae8ed9fbeeed46efc99c71361929faf2438 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 6 Sep 2022 12:56:49 -0400 Subject: [PATCH] Common: Decouple PyPubSubPublisherWrapper from events --- .../event_queue/pypubsub_agent_event_queue.py | 2 +- .../event_queue/pypubsub_publisher_wrapper.py | 6 +-- .../pypubsub_island_event_queue.py | 6 ++- .../test_pypubsub_island_event_queue.py | 37 ++++++++++++++++--- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/monkey/common/event_queue/pypubsub_agent_event_queue.py b/monkey/common/event_queue/pypubsub_agent_event_queue.py index d5cebf39a..cf75c5b6a 100644 --- a/monkey/common/event_queue/pypubsub_agent_event_queue.py +++ b/monkey/common/event_queue/pypubsub_agent_event_queue.py @@ -53,7 +53,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): def _publish_event(self, topic: str, event: AbstractAgentEvent): logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}") - self._pypubsub_publisher_wrapped.publish(topic, event) + self._pypubsub_publisher_wrapped.publish(topic, event=event) # Appending a unique string to the topics for type and tags prevents bugs caused by collisions # between type names and tag names. diff --git a/monkey/common/event_queue/pypubsub_publisher_wrapper.py b/monkey/common/event_queue/pypubsub_publisher_wrapper.py index 510b929a8..71c7dda48 100644 --- a/monkey/common/event_queue/pypubsub_publisher_wrapper.py +++ b/monkey/common/event_queue/pypubsub_publisher_wrapper.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Callable +from typing import Callable from pubsub.core import Publisher @@ -49,9 +49,9 @@ class PyPubSubPublisherWrapper: # scope. Adding subscribers to self._refs prevents them from ever going out of scope. self._refs.append(subscriber) - def publish(self, topic_name: str, event: Any = None): + def publish(self, topic_name: str, **kwargs): # NOTE: `event` needs to match the MDS (message data specification) of the topic, # otherwise, errors will arise. The MDS of a topic is set when the topic is created, # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) # which is new (hasn't been subscribed to before). - self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) + self._pypubsub_publisher.sendMessage(topicName=topic_name, **kwargs) diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py index e18c12f2a..55d4f9846 100644 --- a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -21,4 +21,8 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): def publish(self, topic: IslandEventTopic, event: Any = None): topic_name = topic.name # needs to be a string for pypubsub logger.debug(f"Publishing {topic_name} event") - self._pypubsub_publisher_wrapped.publish(topic_name, event) + + if event is None: + self._pypubsub_publisher_wrapped.publish(topic_name) + else: + self._pypubsub_publisher_wrapped.publish(topic_name, event=event) diff --git a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py index d2a2fa84f..2546359fc 100644 --- a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py +++ b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py @@ -1,4 +1,4 @@ -from typing import Any, Callable +from typing import Callable import pytest from pubsub import pub @@ -19,7 +19,7 @@ def event_queue() -> IIslandEventQueue: @pytest.fixture def event_queue_subscriber() -> Callable[..., None]: - def fn(event, topic=pub.AUTO_TOPIC): + def fn(topic=pub.AUTO_TOPIC): fn.call_count += 1 fn.call_topics |= {topic.getName()} @@ -29,10 +29,12 @@ def event_queue_subscriber() -> Callable[..., None]: return fn -def test_subscribe_publish( +def test_subscribe_publish__no_event_body( event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber ): - event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=event_queue_subscriber) + event_queue.subscribe( + topic=IslandEventTopic.RESET_AGENT_CONFIGURATION, subscriber=event_queue_subscriber + ) event_queue.subscribe( topic=IslandEventTopic.CLEAR_SIMULATION_DATA, subscriber=event_queue_subscriber ) @@ -43,16 +45,39 @@ def test_subscribe_publish( assert event_queue_subscriber.call_count == 2 assert event_queue_subscriber.call_topics == { - IslandEventTopic.AGENT_CONNECTED.name, + IslandEventTopic.RESET_AGENT_CONFIGURATION.name, IslandEventTopic.CLEAR_SIMULATION_DATA.name, } +def test_subscribe_publish__with_event_body( + event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber +): + class MyCallable: + call_count = 0 + event = None + + def __call__(self, event): + MyCallable.call_count += 1 + MyCallable.event = event + + event = "my event!" + my_callable = MyCallable() + event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=my_callable) + + event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED, event=event) + event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA) + event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION) + + assert my_callable.call_count == 1 + assert my_callable.event == event + + def test_keep_subscriber_in_scope(event_queue: IIslandEventQueue): class MyCallable: called = False - def __call__(self, event: Any): + def __call__(self): MyCallable.called = True def subscribe():