From 3dba1bc7d5bacad6b1e90cfbe631fd2996f05827 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:04:51 +0530 Subject: [PATCH 01/30] Common: Import IIslandEventQueue, IslandEventTopics in common/event_queue/__init__.py --- monkey/common/event_queue/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 26d29cb49..18412d2a7 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -1,3 +1,4 @@ from .types import AgentEventSubscriber from .i_agent_event_queue import IAgentEventQueue +from .i_island_event_queue import IIslandEventQueue, IslandEventTopics from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue From 71e9f68fe6102fbeabe265c6b7faa3c8117d0d4f Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:05:35 +0530 Subject: [PATCH 02/30] Common: Fix IslandEventTopics enum values --- monkey/common/event_queue/i_island_event_queue.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/monkey/common/event_queue/i_island_event_queue.py b/monkey/common/event_queue/i_island_event_queue.py index f7bc60c5f..8066b1385 100644 --- a/monkey/common/event_queue/i_island_event_queue.py +++ b/monkey/common/event_queue/i_island_event_queue.py @@ -1,12 +1,12 @@ from abc import ABC, abstractmethod -from enum import Enum, auto +from enum import Enum from typing import Any, Callable class IslandEventTopics(Enum): - AGENT_CONNECTED = auto() - CLEAR_SIMULATION_DATA = auto() - RESET_AGENT_CONFIGURATION = auto() + AGENT_CONNECTED = "agent_connected" + CLEAR_SIMULATION_DATA = "clear_simulation_data" + RESET_AGENT_CONFIGURATION = "reset_agent_configuration" class IIslandEventQueue(ABC): From c9500cd04f13df54bd42b15ffda16d7511df70c9 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:17:26 +0530 Subject: [PATCH 03/30] Common: Add PyPubSubIslandEventQueue --- .../pypubsub_island_event_queue.py | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 monkey/common/event_queue/pypubsub_island_event_queue.py diff --git a/monkey/common/event_queue/pypubsub_island_event_queue.py b/monkey/common/event_queue/pypubsub_island_event_queue.py new file mode 100644 index 000000000..ba6ef3af4 --- /dev/null +++ b/monkey/common/event_queue/pypubsub_island_event_queue.py @@ -0,0 +1,64 @@ +import logging +from typing import Any, Callable + +from pubsub.core import Publisher + +from . import IIslandEventQueue, IslandEventTopics + +logger = logging.getLogger(__name__) + + +class PyPubSubIslandEventQueue(IIslandEventQueue): + def __init__(self, pypubsub_publisher: Publisher): + self._pypubsub_publisher = pypubsub_publisher + self._refs = [] + + def subscribe(self, topic: IslandEventTopics, subscriber: Callable[..., None]): + topic_value = topic.value # needs to be a string for pypubsub + try: + subscriber_name = subscriber.__name__ + except AttributeError: + subscriber_name = subscriber.__class__.__name__ + + logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_value}") + + # NOTE: The subscriber's signature needs to match the MDS (message data specification) of + # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic + # is created, which in our case is when a subscriber subscribes to a topic which + # is new (hasn't been subscribed to before). If the topic is being subscribed to by + # a subscriber for the first time, the topic's MDS will automatically be set + # according to that subscriber's signature. + self._pypubsub_publisher.subscribe(topicName=topic_value, listener=subscriber) + self._keep_subscriber_strongref(subscriber) + + def _keep_subscriber_strongref(self, subscriber: Callable[..., None]): + # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: + # > PyPubSub holds listeners by weak reference so that the lifetime of the + # > callable is not affected by PyPubSub: once the application no longer + # > references the callable, it can be garbage collected and PyPubSub can clean up + # > so it is no longer registered (this happens thanks to the weakref module). + # > Without this, it would be imperative to remember to unsubscribe certain + # > listeners, which is error prone; they would end up living until the + # > application exited. + # + # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable + # + # In our case, we're OK with subscribers living until the application exits. We don't + # provide an unsubscribe method (at this time) as subscriptions are expected to last + # for the life of the process. + # + # Specifically, if an instance object of a callable class is created and subscribed, + # we don't want that subscription to end if the callable instance object goes out of + # scope. Adding subscribers to self._refs prevents them from ever going out of scope. + self._refs.append(subscriber) + + def publish(self, topic: IslandEventTopics, event_data: Any = None): + topic_value = topic.value # needs to be a string for pypubsub + + logger.debug(f"Publishing {topic_value} event") + + # NOTE: `event_data` needs to match the MDS (message data specification) of the topic, + # otherwise, errors will arise. The MDS of a topic is set when the topic is created, + # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) + # which is new (hasn't been subscribed to before). + self._pypubsub_publisher.sendMessage(topicName=topic_value, event=event_data) From 4219b6cbd4b3447427090dfe135abf6d11cd6c38 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:19:53 +0530 Subject: [PATCH 04/30] Common: Rename IslandEventTopics -> IslandEventTopic --- monkey/common/event_queue/__init__.py | 2 +- monkey/common/event_queue/i_island_event_queue.py | 6 +++--- monkey/common/event_queue/pypubsub_island_event_queue.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 18412d2a7..dabc9527e 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -1,4 +1,4 @@ from .types import AgentEventSubscriber from .i_agent_event_queue import IAgentEventQueue -from .i_island_event_queue import IIslandEventQueue, IslandEventTopics +from .i_island_event_queue import IIslandEventQueue, IslandEventTopic from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue diff --git a/monkey/common/event_queue/i_island_event_queue.py b/monkey/common/event_queue/i_island_event_queue.py index 8066b1385..a5614c37f 100644 --- a/monkey/common/event_queue/i_island_event_queue.py +++ b/monkey/common/event_queue/i_island_event_queue.py @@ -3,7 +3,7 @@ from enum import Enum from typing import Any, Callable -class IslandEventTopics(Enum): +class IslandEventTopic(Enum): AGENT_CONNECTED = "agent_connected" CLEAR_SIMULATION_DATA = "clear_simulation_data" RESET_AGENT_CONFIGURATION = "reset_agent_configuration" @@ -15,7 +15,7 @@ class IIslandEventQueue(ABC): """ @abstractmethod - def subscribe(self, topic: IslandEventTopics, subscriber: Callable[..., None]): + def subscribe(self, topic: IslandEventTopic, subscriber: Callable[..., None]): """ Subscribes a subscriber to the specified event topic @@ -26,7 +26,7 @@ class IIslandEventQueue(ABC): pass @abstractmethod - def publish(self, topic: IslandEventTopics, event_data: Any = None): + def publish(self, topic: IslandEventTopic, event_data: Any = None): """ Publishes an event topic with the given data diff --git a/monkey/common/event_queue/pypubsub_island_event_queue.py b/monkey/common/event_queue/pypubsub_island_event_queue.py index ba6ef3af4..25be20633 100644 --- a/monkey/common/event_queue/pypubsub_island_event_queue.py +++ b/monkey/common/event_queue/pypubsub_island_event_queue.py @@ -3,7 +3,7 @@ from typing import Any, Callable from pubsub.core import Publisher -from . import IIslandEventQueue, IslandEventTopics +from . import IIslandEventQueue, IslandEventTopic logger = logging.getLogger(__name__) @@ -13,7 +13,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): self._pypubsub_publisher = pypubsub_publisher self._refs = [] - def subscribe(self, topic: IslandEventTopics, subscriber: Callable[..., None]): + def subscribe(self, topic: IslandEventTopic, subscriber: Callable[..., None]): topic_value = topic.value # needs to be a string for pypubsub try: subscriber_name = subscriber.__name__ @@ -52,7 +52,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): # scope. Adding subscribers to self._refs prevents them from ever going out of scope. self._refs.append(subscriber) - def publish(self, topic: IslandEventTopics, event_data: Any = None): + def publish(self, topic: IslandEventTopic, event_data: Any = None): topic_value = topic.value # needs to be a string for pypubsub logger.debug(f"Publishing {topic_value} event") From 342a4959b3f2cb5fe51c50be5a1c9312d7d7510c Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:20:53 +0530 Subject: [PATCH 05/30] Common: Import PyPubSubIslandEventQueue in common/event_queue/__init__.py --- monkey/common/event_queue/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index dabc9527e..04b67e625 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -2,3 +2,4 @@ from .types import AgentEventSubscriber from .i_agent_event_queue import IAgentEventQueue from .i_island_event_queue import IIslandEventQueue, IslandEventTopic from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue +from .pypubsub_island_event_queue import PyPubSubIslandEventQueue From fb4bfb7be14b31b7437ac99edc6a664527862ceb Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:25:59 +0530 Subject: [PATCH 06/30] Project: Fix PyPubSubIslandEventQueue entries to Vulture allowlist --- vulture_allowlist.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/vulture_allowlist.py b/vulture_allowlist.py index 976810c9d..fc05f4c59 100644 --- a/vulture_allowlist.py +++ b/vulture_allowlist.py @@ -8,6 +8,7 @@ from common.agent_configuration.agent_sub_configurations import ( ScanTargetConfiguration, ) from common.credentials import Credentials +from common.event_queue import IslandEventTopic, PyPubSubIslandEventQueue from common.utils import IJSONSerializable from infection_monkey.exploit.log4shell_utils.ldap_server import LDAPServerFactory from monkey_island.cc.models import Report @@ -317,9 +318,7 @@ CC_TUNNEL Credentials.from_json IJSONSerializable.from_json -# revisit when implementing PyPubSubIslandEventQueue -AGENT_CONNECTED -CLEAR_SIMULATION_DATA -RESET_AGENT_CONFIGURATION -IIslandEventQueue -event_data +PyPubSubIslandEventQueue +IslandEventTopic.AGENT_CONNECTED +IslandEventTopic.CLEAR_SIMULATION_DATA +IslandEventTopic.RESET_AGENT_CONFIGURATION From ac2217ce8a8ce4901e38f2ed736f0e2c9b45fe53 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:30:45 +0530 Subject: [PATCH 07/30] Common: Add IslandEventSubecriber type --- monkey/common/event_queue/__init__.py | 2 +- monkey/common/event_queue/i_island_event_queue.py | 6 ++++-- monkey/common/event_queue/pypubsub_island_event_queue.py | 8 ++++---- monkey/common/event_queue/types.py | 1 + 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 04b67e625..3f1e149b6 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -1,4 +1,4 @@ -from .types import AgentEventSubscriber +from .types import AgentEventSubscriber, IslandEventSubscriber from .i_agent_event_queue import IAgentEventQueue from .i_island_event_queue import IIslandEventQueue, IslandEventTopic from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue diff --git a/monkey/common/event_queue/i_island_event_queue.py b/monkey/common/event_queue/i_island_event_queue.py index a5614c37f..65eb94ee3 100644 --- a/monkey/common/event_queue/i_island_event_queue.py +++ b/monkey/common/event_queue/i_island_event_queue.py @@ -1,6 +1,8 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Callable +from typing import Any + +from . import IslandEventSubscriber class IslandEventTopic(Enum): @@ -15,7 +17,7 @@ class IIslandEventQueue(ABC): """ @abstractmethod - def subscribe(self, topic: IslandEventTopic, subscriber: Callable[..., None]): + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): """ Subscribes a subscriber to the specified event topic diff --git a/monkey/common/event_queue/pypubsub_island_event_queue.py b/monkey/common/event_queue/pypubsub_island_event_queue.py index 25be20633..d5d1df7cd 100644 --- a/monkey/common/event_queue/pypubsub_island_event_queue.py +++ b/monkey/common/event_queue/pypubsub_island_event_queue.py @@ -1,9 +1,9 @@ import logging -from typing import Any, Callable +from typing import Any from pubsub.core import Publisher -from . import IIslandEventQueue, IslandEventTopic +from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic logger = logging.getLogger(__name__) @@ -13,7 +13,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): self._pypubsub_publisher = pypubsub_publisher self._refs = [] - def subscribe(self, topic: IslandEventTopic, subscriber: Callable[..., None]): + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): topic_value = topic.value # needs to be a string for pypubsub try: subscriber_name = subscriber.__name__ @@ -31,7 +31,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): self._pypubsub_publisher.subscribe(topicName=topic_value, listener=subscriber) self._keep_subscriber_strongref(subscriber) - def _keep_subscriber_strongref(self, subscriber: Callable[..., None]): + def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber): # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: # > PyPubSub holds listeners by weak reference so that the lifetime of the # > callable is not affected by PyPubSub: once the application no longer diff --git a/monkey/common/event_queue/types.py b/monkey/common/event_queue/types.py index 6b57ae712..bd0b23dff 100644 --- a/monkey/common/event_queue/types.py +++ b/monkey/common/event_queue/types.py @@ -3,3 +3,4 @@ from typing import Callable from common.events import AbstractAgentEvent AgentEventSubscriber = Callable[[AbstractAgentEvent], None] +IslandEventSubscriber = Callable[..., None] From 265e083571c3392e908d5c6c0818016f9035567d Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 13:33:36 +0530 Subject: [PATCH 08/30] UT: Rename test_pypubsub_event_queue.py -> test_pypubsub_agent_event_queue.py --- ...pypubsub_event_queue.py => test_pypubsub_agent_event_queue.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename monkey/tests/unit_tests/common/event_queue/{test_pypubsub_event_queue.py => test_pypubsub_agent_event_queue.py} (100%) diff --git a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_event_queue.py b/monkey/tests/unit_tests/common/event_queue/test_pypubsub_agent_event_queue.py similarity index 100% rename from monkey/tests/unit_tests/common/event_queue/test_pypubsub_event_queue.py rename to monkey/tests/unit_tests/common/event_queue/test_pypubsub_agent_event_queue.py From 71c7a9a5336c7c94e0ed4baa11e27c6e47a7d4cd Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 17:47:01 +0530 Subject: [PATCH 09/30] Common: Change parameter name event_data -> event in Island event queue --- monkey/common/event_queue/i_island_event_queue.py | 4 ++-- monkey/common/event_queue/pypubsub_island_event_queue.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/monkey/common/event_queue/i_island_event_queue.py b/monkey/common/event_queue/i_island_event_queue.py index 65eb94ee3..c8e5115a9 100644 --- a/monkey/common/event_queue/i_island_event_queue.py +++ b/monkey/common/event_queue/i_island_event_queue.py @@ -28,12 +28,12 @@ class IIslandEventQueue(ABC): pass @abstractmethod - def publish(self, topic: IslandEventTopic, event_data: Any = None): + def publish(self, topic: IslandEventTopic, event: Any = None): """ Publishes an event topic with the given data :param topic: Event topic to publish - :param event_data: Event data to pass to subscribers with the event publish + :param event: Event to pass to subscribers with the event publish """ pass diff --git a/monkey/common/event_queue/pypubsub_island_event_queue.py b/monkey/common/event_queue/pypubsub_island_event_queue.py index d5d1df7cd..8898b343a 100644 --- a/monkey/common/event_queue/pypubsub_island_event_queue.py +++ b/monkey/common/event_queue/pypubsub_island_event_queue.py @@ -52,7 +52,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): # scope. Adding subscribers to self._refs prevents them from ever going out of scope. self._refs.append(subscriber) - def publish(self, topic: IslandEventTopic, event_data: Any = None): + def publish(self, topic: IslandEventTopic, event: Any = None): topic_value = topic.value # needs to be a string for pypubsub logger.debug(f"Publishing {topic_value} event") @@ -61,4 +61,4 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): # otherwise, errors will arise. The MDS of a topic is set when the topic is created, # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) # which is new (hasn't been subscribed to before). - self._pypubsub_publisher.sendMessage(topicName=topic_value, event=event_data) + self._pypubsub_publisher.sendMessage(topicName=topic_value, event=event) From 237f6d01b6581a446bc3124fa0382430a4fcd201 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 17:53:36 +0530 Subject: [PATCH 10/30] UT: Add tests for PyPubSubIslandEventQueue --- .../test_pypubsub_island_event_queue.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py diff --git a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py b/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py new file mode 100644 index 000000000..cf51780f7 --- /dev/null +++ b/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py @@ -0,0 +1,67 @@ +from typing import Any, Callable + +import pytest +from pubsub import pub +from pubsub.core import Publisher + +from common.event_queue import ( + IIslandEventQueue, + IslandEventSubscriber, + IslandEventTopic, + PyPubSubIslandEventQueue, +) + + +@pytest.fixture +def event_queue() -> IIslandEventQueue: + return PyPubSubIslandEventQueue(Publisher()) + + +@pytest.fixture +def event_queue_subscriber() -> Callable[..., None]: + def fn(event, topic=pub.AUTO_TOPIC): + fn.call_count += 1 + fn.call_topics |= {topic.getName()} + + fn.call_count = 0 + fn.call_topics = set() + + return fn + + +def test_subscribe_publish( + event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber +): + event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=event_queue_subscriber) + event_queue.subscribe( + topic=IslandEventTopic.CLEAR_SIMULATION_DATA, subscriber=event_queue_subscriber + ) + + event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED) + event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA) + event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION) + + assert event_queue_subscriber.call_count == 2 + assert event_queue_subscriber.call_topics == { + IslandEventTopic.AGENT_CONNECTED.value, + IslandEventTopic.CLEAR_SIMULATION_DATA.value, + } + + +def test_keep_subscriber_in_scope(event_queue: IIslandEventQueue): + class MyCallable: + called = False + + def __call__(self, event: Any): + MyCallable.called = True + + def subscribe(): + # fn will go out of scope after subscribe() returns. + fn = MyCallable() + event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=fn) + + subscribe() + + event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED) + + assert MyCallable.called From 27c8a1019b95143f5be345d330d82b4007781910 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 17:59:17 +0530 Subject: [PATCH 11/30] Island: Register IIslandEventQueue instance in DI container --- monkey/monkey_island/cc/services/initialize.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/monkey/monkey_island/cc/services/initialize.py b/monkey/monkey_island/cc/services/initialize.py index b1caab657..5f5fc76c1 100644 --- a/monkey/monkey_island/cc/services/initialize.py +++ b/monkey/monkey_island/cc/services/initialize.py @@ -14,6 +14,7 @@ from common.aws import AWSInstance from common.common_consts.telem_categories import TelemCategoryEnum from common.event_queue import IAgentEventQueue, PyPubSubAgentEventQueue from common.utils.file_utils import get_binary_io_sha256_hash +from monkey.common.event_queue import IIslandEventQueue, PyPubSubIslandEventQueue from monkey_island.cc.repository import ( AgentBinaryRepository, FileAgentConfigurationRepository, @@ -64,6 +65,7 @@ def initialize_services(container: DIContainer, data_dir: Path): ) container.register(Publisher, Publisher) container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue)) + container.register_instance(IIslandEventQueue, container.resolve(PyPubSubIslandEventQueue)) _register_repositories(container, data_dir) _register_services(container) From 1a09f26fd9d16f86258802821fde319d1690ac6b Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 18:07:25 +0530 Subject: [PATCH 12/30] Common: Modify IslandEventTopic enum to not have values --- monkey/common/event_queue/i_island_event_queue.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/monkey/common/event_queue/i_island_event_queue.py b/monkey/common/event_queue/i_island_event_queue.py index c8e5115a9..f5e8ea5c3 100644 --- a/monkey/common/event_queue/i_island_event_queue.py +++ b/monkey/common/event_queue/i_island_event_queue.py @@ -4,11 +4,9 @@ from typing import Any from . import IslandEventSubscriber - -class IslandEventTopic(Enum): - AGENT_CONNECTED = "agent_connected" - CLEAR_SIMULATION_DATA = "clear_simulation_data" - RESET_AGENT_CONFIGURATION = "reset_agent_configuration" +IslandEventTopic = Enum( + "IslandEventTopic", ["AGENT_CONNECTED", "CLEAR_SIMULATION_DATA", "RESET_AGENT_CONFIGURATION"] +) class IIslandEventQueue(ABC): From 004337583a9880fb559727f508f41841c44cb735 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 18:11:08 +0530 Subject: [PATCH 13/30] Common: Use IslandEventTopic enum's names for pypubsub topics --- .../event_queue/pypubsub_island_event_queue.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/monkey/common/event_queue/pypubsub_island_event_queue.py b/monkey/common/event_queue/pypubsub_island_event_queue.py index 8898b343a..64c1b25ba 100644 --- a/monkey/common/event_queue/pypubsub_island_event_queue.py +++ b/monkey/common/event_queue/pypubsub_island_event_queue.py @@ -14,13 +14,13 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): self._refs = [] def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): - topic_value = topic.value # needs to be a string for pypubsub + topic_name = topic.name # needs to be a string for pypubsub try: subscriber_name = subscriber.__name__ except AttributeError: subscriber_name = subscriber.__class__.__name__ - logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_value}") + logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") # NOTE: The subscriber's signature needs to match the MDS (message data specification) of # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic @@ -28,7 +28,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): # is new (hasn't been subscribed to before). If the topic is being subscribed to by # a subscriber for the first time, the topic's MDS will automatically be set # according to that subscriber's signature. - self._pypubsub_publisher.subscribe(topicName=topic_value, listener=subscriber) + self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) self._keep_subscriber_strongref(subscriber) def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber): @@ -53,12 +53,12 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): self._refs.append(subscriber) def publish(self, topic: IslandEventTopic, event: Any = None): - topic_value = topic.value # needs to be a string for pypubsub + topic_name = topic.name # needs to be a string for pypubsub - logger.debug(f"Publishing {topic_value} event") + logger.debug(f"Publishing {topic_name} event") # NOTE: `event_data` needs to match the MDS (message data specification) of the topic, # otherwise, errors will arise. The MDS of a topic is set when the topic is created, # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) # which is new (hasn't been subscribed to before). - self._pypubsub_publisher.sendMessage(topicName=topic_value, event=event) + self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) From 5da8b424b57fe3df0f968c7fa5a45eceefafe1da Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 18:13:25 +0530 Subject: [PATCH 14/30] UT: Use IslandEventTopic enum's names for pypubsub topics --- .../common/event_queue/test_pypubsub_island_event_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py b/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py index cf51780f7..d026a288a 100644 --- a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py +++ b/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py @@ -43,8 +43,8 @@ def test_subscribe_publish( assert event_queue_subscriber.call_count == 2 assert event_queue_subscriber.call_topics == { - IslandEventTopic.AGENT_CONNECTED.value, - IslandEventTopic.CLEAR_SIMULATION_DATA.value, + IslandEventTopic.AGENT_CONNECTED.name, + IslandEventTopic.CLEAR_SIMULATION_DATA.name, } From 502a875fdd6d22f350d0cd70cc02c54567989e12 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 18:57:56 +0530 Subject: [PATCH 15/30] Common: Move Island event queue stuff out of common/ --- monkey/common/event_queue/__init__.py | 4 +- .../event_queue/i_island_event_queue.py | 37 ----------- .../pypubsub_island_event_queue.py | 64 ------------------- monkey/common/event_queue/types.py | 1 - .../monkey_island/cc/event_queue/__init__.py | 3 + 5 files changed, 4 insertions(+), 105 deletions(-) delete mode 100644 monkey/common/event_queue/i_island_event_queue.py delete mode 100644 monkey/common/event_queue/pypubsub_island_event_queue.py create mode 100644 monkey/monkey_island/cc/event_queue/__init__.py diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 3f1e149b6..26d29cb49 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -1,5 +1,3 @@ -from .types import AgentEventSubscriber, IslandEventSubscriber +from .types import AgentEventSubscriber from .i_agent_event_queue import IAgentEventQueue -from .i_island_event_queue import IIslandEventQueue, IslandEventTopic from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue -from .pypubsub_island_event_queue import PyPubSubIslandEventQueue diff --git a/monkey/common/event_queue/i_island_event_queue.py b/monkey/common/event_queue/i_island_event_queue.py deleted file mode 100644 index f5e8ea5c3..000000000 --- a/monkey/common/event_queue/i_island_event_queue.py +++ /dev/null @@ -1,37 +0,0 @@ -from abc import ABC, abstractmethod -from enum import Enum -from typing import Any - -from . import IslandEventSubscriber - -IslandEventTopic = Enum( - "IslandEventTopic", ["AGENT_CONNECTED", "CLEAR_SIMULATION_DATA", "RESET_AGENT_CONFIGURATION"] -) - - -class IIslandEventQueue(ABC): - """ - Manages subscription and publishing of events in the Island - """ - - @abstractmethod - def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): - """ - Subscribes a subscriber to the specified event topic - - :param topic: Event topic to which the subscriber should subscribe - :param subscriber: A subscriber that will receive events - """ - - pass - - @abstractmethod - def publish(self, topic: IslandEventTopic, event: Any = None): - """ - Publishes an event topic with the given data - - :param topic: Event topic to publish - :param event: Event to pass to subscribers with the event publish - """ - - pass diff --git a/monkey/common/event_queue/pypubsub_island_event_queue.py b/monkey/common/event_queue/pypubsub_island_event_queue.py deleted file mode 100644 index 64c1b25ba..000000000 --- a/monkey/common/event_queue/pypubsub_island_event_queue.py +++ /dev/null @@ -1,64 +0,0 @@ -import logging -from typing import Any - -from pubsub.core import Publisher - -from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic - -logger = logging.getLogger(__name__) - - -class PyPubSubIslandEventQueue(IIslandEventQueue): - def __init__(self, pypubsub_publisher: Publisher): - self._pypubsub_publisher = pypubsub_publisher - self._refs = [] - - def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): - topic_name = topic.name # needs to be a string for pypubsub - try: - subscriber_name = subscriber.__name__ - except AttributeError: - subscriber_name = subscriber.__class__.__name__ - - logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") - - # NOTE: The subscriber's signature needs to match the MDS (message data specification) of - # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic - # is created, which in our case is when a subscriber subscribes to a topic which - # is new (hasn't been subscribed to before). If the topic is being subscribed to by - # a subscriber for the first time, the topic's MDS will automatically be set - # according to that subscriber's signature. - self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) - self._keep_subscriber_strongref(subscriber) - - def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber): - # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: - # > PyPubSub holds listeners by weak reference so that the lifetime of the - # > callable is not affected by PyPubSub: once the application no longer - # > references the callable, it can be garbage collected and PyPubSub can clean up - # > so it is no longer registered (this happens thanks to the weakref module). - # > Without this, it would be imperative to remember to unsubscribe certain - # > listeners, which is error prone; they would end up living until the - # > application exited. - # - # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable - # - # In our case, we're OK with subscribers living until the application exits. We don't - # provide an unsubscribe method (at this time) as subscriptions are expected to last - # for the life of the process. - # - # Specifically, if an instance object of a callable class is created and subscribed, - # we don't want that subscription to end if the callable instance object goes out of - # scope. Adding subscribers to self._refs prevents them from ever going out of scope. - self._refs.append(subscriber) - - def publish(self, topic: IslandEventTopic, event: Any = None): - topic_name = topic.name # needs to be a string for pypubsub - - logger.debug(f"Publishing {topic_name} event") - - # NOTE: `event_data` needs to match the MDS (message data specification) of the topic, - # otherwise, errors will arise. The MDS of a topic is set when the topic is created, - # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) - # which is new (hasn't been subscribed to before). - self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) diff --git a/monkey/common/event_queue/types.py b/monkey/common/event_queue/types.py index bd0b23dff..6b57ae712 100644 --- a/monkey/common/event_queue/types.py +++ b/monkey/common/event_queue/types.py @@ -3,4 +3,3 @@ from typing import Callable from common.events import AbstractAgentEvent AgentEventSubscriber = Callable[[AbstractAgentEvent], None] -IslandEventSubscriber = Callable[..., None] diff --git a/monkey/monkey_island/cc/event_queue/__init__.py b/monkey/monkey_island/cc/event_queue/__init__.py new file mode 100644 index 000000000..6eab2b6ac --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/__init__.py @@ -0,0 +1,3 @@ +from .types import IslandEventSubscriber +from .i_island_event_queue import IIslandEventQueue, IslandEventTopic +from .pypubsub_island_event_queue import PyPubSubIslandEventQueue From f2e7a3d66f12c285efa52bd0599110b835c62fc3 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 18:58:33 +0530 Subject: [PATCH 16/30] Island: Add Island event queue stuff to monkey_island/ --- .../cc/event_queue/i_island_event_queue.py | 37 +++++++++++ .../pypubsub_island_event_queue.py | 64 +++++++++++++++++++ monkey/monkey_island/cc/event_queue/types.py | 3 + 3 files changed, 104 insertions(+) create mode 100644 monkey/monkey_island/cc/event_queue/i_island_event_queue.py create mode 100644 monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py create mode 100644 monkey/monkey_island/cc/event_queue/types.py diff --git a/monkey/monkey_island/cc/event_queue/i_island_event_queue.py b/monkey/monkey_island/cc/event_queue/i_island_event_queue.py new file mode 100644 index 000000000..f5e8ea5c3 --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/i_island_event_queue.py @@ -0,0 +1,37 @@ +from abc import ABC, abstractmethod +from enum import Enum +from typing import Any + +from . import IslandEventSubscriber + +IslandEventTopic = Enum( + "IslandEventTopic", ["AGENT_CONNECTED", "CLEAR_SIMULATION_DATA", "RESET_AGENT_CONFIGURATION"] +) + + +class IIslandEventQueue(ABC): + """ + Manages subscription and publishing of events in the Island + """ + + @abstractmethod + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): + """ + Subscribes a subscriber to the specified event topic + + :param topic: Event topic to which the subscriber should subscribe + :param subscriber: A subscriber that will receive events + """ + + pass + + @abstractmethod + def publish(self, topic: IslandEventTopic, event: Any = None): + """ + Publishes an event topic with the given data + + :param topic: Event topic to publish + :param event: Event to pass to subscribers with the event publish + """ + + pass diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py new file mode 100644 index 000000000..64c1b25ba --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -0,0 +1,64 @@ +import logging +from typing import Any + +from pubsub.core import Publisher + +from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic + +logger = logging.getLogger(__name__) + + +class PyPubSubIslandEventQueue(IIslandEventQueue): + def __init__(self, pypubsub_publisher: Publisher): + self._pypubsub_publisher = pypubsub_publisher + self._refs = [] + + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): + topic_name = topic.name # needs to be a string for pypubsub + try: + subscriber_name = subscriber.__name__ + except AttributeError: + subscriber_name = subscriber.__class__.__name__ + + logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") + + # NOTE: The subscriber's signature needs to match the MDS (message data specification) of + # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic + # is created, which in our case is when a subscriber subscribes to a topic which + # is new (hasn't been subscribed to before). If the topic is being subscribed to by + # a subscriber for the first time, the topic's MDS will automatically be set + # according to that subscriber's signature. + self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) + self._keep_subscriber_strongref(subscriber) + + def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber): + # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: + # > PyPubSub holds listeners by weak reference so that the lifetime of the + # > callable is not affected by PyPubSub: once the application no longer + # > references the callable, it can be garbage collected and PyPubSub can clean up + # > so it is no longer registered (this happens thanks to the weakref module). + # > Without this, it would be imperative to remember to unsubscribe certain + # > listeners, which is error prone; they would end up living until the + # > application exited. + # + # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable + # + # In our case, we're OK with subscribers living until the application exits. We don't + # provide an unsubscribe method (at this time) as subscriptions are expected to last + # for the life of the process. + # + # Specifically, if an instance object of a callable class is created and subscribed, + # we don't want that subscription to end if the callable instance object goes out of + # scope. Adding subscribers to self._refs prevents them from ever going out of scope. + self._refs.append(subscriber) + + def publish(self, topic: IslandEventTopic, event: Any = None): + topic_name = topic.name # needs to be a string for pypubsub + + logger.debug(f"Publishing {topic_name} event") + + # NOTE: `event_data` needs to match the MDS (message data specification) of the topic, + # otherwise, errors will arise. The MDS of a topic is set when the topic is created, + # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) + # which is new (hasn't been subscribed to before). + self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) diff --git a/monkey/monkey_island/cc/event_queue/types.py b/monkey/monkey_island/cc/event_queue/types.py new file mode 100644 index 000000000..3a1588dbb --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/types.py @@ -0,0 +1,3 @@ +from typing import Callable + +IslandEventSubscriber = Callable[..., None] From 8f35a435915418e207185b24de43be01d61f09f8 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:00:29 +0530 Subject: [PATCH 17/30] Project: Fix import path in Vulture allowlist --- vulture_allowlist.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vulture_allowlist.py b/vulture_allowlist.py index fc05f4c59..a672f319e 100644 --- a/vulture_allowlist.py +++ b/vulture_allowlist.py @@ -8,9 +8,9 @@ from common.agent_configuration.agent_sub_configurations import ( ScanTargetConfiguration, ) from common.credentials import Credentials -from common.event_queue import IslandEventTopic, PyPubSubIslandEventQueue from common.utils import IJSONSerializable from infection_monkey.exploit.log4shell_utils.ldap_server import LDAPServerFactory +from monkey_island.cc.event_queue import IslandEventTopic, PyPubSubIslandEventQueue from monkey_island.cc.models import Report from monkey_island.cc.models.networkmap import Arc, NetworkMap from monkey_island.cc.repository.attack.IMitigationsRepository import IMitigationsRepository From 38c6d53cc545ebd77ee2a5b19eeb6c79f90a92fc Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:05:50 +0530 Subject: [PATCH 18/30] UT: Move test_pypubsub_island_event_queue.py out of common/ and in to monkey_island/ --- .../cc}/event_queue/test_pypubsub_island_event_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename monkey/tests/unit_tests/{common => monkey_island/cc}/event_queue/test_pypubsub_island_event_queue.py (97%) diff --git a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py similarity index 97% rename from monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py rename to monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py index d026a288a..d2a2fa84f 100644 --- a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_island_event_queue.py +++ b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py @@ -4,7 +4,7 @@ import pytest from pubsub import pub from pubsub.core import Publisher -from common.event_queue import ( +from monkey_island.cc.event_queue import ( IIslandEventQueue, IslandEventSubscriber, IslandEventTopic, From 3cf332a0790cfc1855423ce67c722d78bd04fe1a Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:37:35 +0530 Subject: [PATCH 19/30] Common: Add PyPubSubPublisherWrapper --- monkey/common/event_queue/__init__.py | 1 + .../event_queue/pypubsub_publisher_wrapper.py | 47 +++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 monkey/common/event_queue/pypubsub_publisher_wrapper.py diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 26d29cb49..1c2e287d7 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -1,3 +1,4 @@ from .types import AgentEventSubscriber +from .pypubsub_publisher_wrapper import PyPubSubPublisherWrapper from .i_agent_event_queue import IAgentEventQueue from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue diff --git a/monkey/common/event_queue/pypubsub_publisher_wrapper.py b/monkey/common/event_queue/pypubsub_publisher_wrapper.py new file mode 100644 index 000000000..3f9b75d79 --- /dev/null +++ b/monkey/common/event_queue/pypubsub_publisher_wrapper.py @@ -0,0 +1,47 @@ +import logging +from typing import Any, Callable + +from pubsub.core import Publisher + +logger = logging.getLogger(__name__) + + +class PyPubSubPublisherWrapper: + def __init__(self, pypubsub_publisher: Publisher): + self._pypubsub_publisher = pypubsub_publisher + self._refs = [] + + def subscribe(self, topic_name: str, subscriber: Callable): + try: + subscriber_name = subscriber.__name__ + except AttributeError: + subscriber_name = subscriber.__class__.__name__ + + logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") + + self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) + self._keep_subscriber_strongref(subscriber) + + def _keep_subscriber_strongref(self, subscriber: Callable): + # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: + # > PyPubSub holds listeners by weak reference so that the lifetime of the + # > callable is not affected by PyPubSub: once the application no longer + # > references the callable, it can be garbage collected and PyPubSub can clean up + # > so it is no longer registered (this happens thanks to the weakref module). + # > Without this, it would be imperative to remember to unsubscribe certain + # > listeners, which is error prone; they would end up living until the + # > application exited. + # + # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable + # + # In our case, we're OK with subscribers living until the application exits. We don't + # provide an unsubscribe method (at this time) as subscriptions are expected to last + # for the life of the process. + # + # Specifically, if an instance object of a callable class is created and subscribed, + # we don't want that subscription to end if the callable instance object goes out of + # scope. Adding subscribers to self._refs prevents them from ever going out of scope. + self._refs.append(subscriber) + + def publish(self, topic_name: str, event: Any = None): + self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) From 3c71211b799797faa980bf2aa188eff007b3b55b Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:38:16 +0530 Subject: [PATCH 20/30] Common: Use PyPubSubPublisherWrapper in PyPubSubAgentEventQueue --- .../event_queue/pypubsub_agent_event_queue.py | 36 +++---------------- 1 file changed, 4 insertions(+), 32 deletions(-) diff --git a/monkey/common/event_queue/pypubsub_agent_event_queue.py b/monkey/common/event_queue/pypubsub_agent_event_queue.py index 86dbea4af..d5cebf39a 100644 --- a/monkey/common/event_queue/pypubsub_agent_event_queue.py +++ b/monkey/common/event_queue/pypubsub_agent_event_queue.py @@ -3,6 +3,7 @@ from typing import Type from pubsub.core import Publisher +from common.event_queue import PyPubSubPublisherWrapper from common.events import AbstractAgentEvent from . import AgentEventSubscriber, IAgentEventQueue @@ -14,8 +15,7 @@ logger = logging.getLogger(__name__) class PyPubSubAgentEventQueue(IAgentEventQueue): def __init__(self, pypubsub_publisher: Publisher): - self._pypubsub_publisher = pypubsub_publisher - self._refs = [] + self._pypubsub_publisher_wrapped = PyPubSubPublisherWrapper(pypubsub_publisher) def subscribe_all_events(self, subscriber: AgentEventSubscriber): self._subscribe(_ALL_EVENTS_TOPIC, subscriber) @@ -32,35 +32,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): self._subscribe(tag_topic, subscriber) def _subscribe(self, topic: str, subscriber: AgentEventSubscriber): - try: - subscriber_name = subscriber.__name__ - except AttributeError: - subscriber_name = subscriber.__class__.__name__ - - logging.debug(f"Subscriber {subscriber_name} subscribed to {topic}") - self._pypubsub_publisher.subscribe(topicName=topic, listener=subscriber) - self._keep_subscriber_strongref(subscriber) - - def _keep_subscriber_strongref(self, subscriber: AgentEventSubscriber): - # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: - # > PyPubSub holds listeners by weak reference so that the lifetime of the - # > callable is not affected by PyPubSub: once the application no longer - # > references the callable, it can be garbage collected and PyPubSub can clean up - # > so it is no longer registered (this happens thanks to the weakref module). - # > Without this, it would be imperative to remember to unsubscribe certain - # > listeners, which is error prone; they would end up living until the - # > application exited. - # - # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable - # - # In our case, we're OK with subscribers living until the application exits. We don't - # provide an unsubscribe method (at this time) as subscriptions are expected to last - # for the life of the process. - # - # Specifically, if an instance object of a callable class is created and subscribed, - # we don't want that subscription to end if the callable instance object goes out of - # scope. Adding subscribers to self._refs prevents them from ever going out of scope. - self._refs.append(subscriber) + self._pypubsub_publisher_wrapped.subscribe(topic, subscriber) def publish(self, event: AbstractAgentEvent): self._publish_to_all_events_topic(event) @@ -81,7 +53,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): def _publish_event(self, topic: str, event: AbstractAgentEvent): logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}") - self._pypubsub_publisher.sendMessage(topic, event=event) + self._pypubsub_publisher_wrapped.publish(topic, event) # Appending a unique string to the topics for type and tags prevents bugs caused by collisions # between type names and tag names. From 70468c37fb34a91aee9dbed870fc41ab46884595 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:38:48 +0530 Subject: [PATCH 21/30] Island: Use PyPubSubPublisherWrapper in PyPubSubIslandEventQueue --- .../pypubsub_island_event_queue.py | 37 +++---------------- 1 file changed, 5 insertions(+), 32 deletions(-) diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py index 64c1b25ba..478228d72 100644 --- a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -3,6 +3,8 @@ from typing import Any from pubsub.core import Publisher +from common.event_queue import PyPubSubPublisherWrapper + from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic logger = logging.getLogger(__name__) @@ -10,17 +12,10 @@ logger = logging.getLogger(__name__) class PyPubSubIslandEventQueue(IIslandEventQueue): def __init__(self, pypubsub_publisher: Publisher): - self._pypubsub_publisher = pypubsub_publisher - self._refs = [] + self._pypubsub_publisher_wrapped = PyPubSubPublisherWrapper(pypubsub_publisher) def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): topic_name = topic.name # needs to be a string for pypubsub - try: - subscriber_name = subscriber.__name__ - except AttributeError: - subscriber_name = subscriber.__class__.__name__ - - logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") # NOTE: The subscriber's signature needs to match the MDS (message data specification) of # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic @@ -28,29 +23,7 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): # is new (hasn't been subscribed to before). If the topic is being subscribed to by # a subscriber for the first time, the topic's MDS will automatically be set # according to that subscriber's signature. - self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) - self._keep_subscriber_strongref(subscriber) - - def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber): - # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: - # > PyPubSub holds listeners by weak reference so that the lifetime of the - # > callable is not affected by PyPubSub: once the application no longer - # > references the callable, it can be garbage collected and PyPubSub can clean up - # > so it is no longer registered (this happens thanks to the weakref module). - # > Without this, it would be imperative to remember to unsubscribe certain - # > listeners, which is error prone; they would end up living until the - # > application exited. - # - # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable - # - # In our case, we're OK with subscribers living until the application exits. We don't - # provide an unsubscribe method (at this time) as subscriptions are expected to last - # for the life of the process. - # - # Specifically, if an instance object of a callable class is created and subscribed, - # we don't want that subscription to end if the callable instance object goes out of - # scope. Adding subscribers to self._refs prevents them from ever going out of scope. - self._refs.append(subscriber) + self._pypubsub_publisher_wrapped.subscribe(topic_name, subscriber) def publish(self, topic: IslandEventTopic, event: Any = None): topic_name = topic.name # needs to be a string for pypubsub @@ -61,4 +34,4 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): # otherwise, errors will arise. The MDS of a topic is set when the topic is created, # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) # which is new (hasn't been subscribed to before). - self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) + self._pypubsub_publisher_wrapped.publish(topic_name, event) From 69813f8cd4727bb364c16af094622e64c8eff1a6 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:41:42 +0530 Subject: [PATCH 22/30] Common: Add explanatory comments about pypubsub's internal working in PyPubSubPublisherWrapper --- .../common/event_queue/pypubsub_publisher_wrapper.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/monkey/common/event_queue/pypubsub_publisher_wrapper.py b/monkey/common/event_queue/pypubsub_publisher_wrapper.py index 3f9b75d79..510b929a8 100644 --- a/monkey/common/event_queue/pypubsub_publisher_wrapper.py +++ b/monkey/common/event_queue/pypubsub_publisher_wrapper.py @@ -19,6 +19,12 @@ class PyPubSubPublisherWrapper: logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") + # NOTE: The subscriber's signature needs to match the MDS (message data specification) of + # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic + # is created, which in our case is when a subscriber subscribes to a topic which + # is new (hasn't been subscribed to before). If the topic is being subscribed to by + # a subscriber for the first time, the topic's MDS will automatically be set + # according to that subscriber's signature. self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) self._keep_subscriber_strongref(subscriber) @@ -44,4 +50,8 @@ class PyPubSubPublisherWrapper: self._refs.append(subscriber) def publish(self, topic_name: str, event: Any = None): + # NOTE: `event` needs to match the MDS (message data specification) of the topic, + # otherwise, errors will arise. The MDS of a topic is set when the topic is created, + # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) + # which is new (hasn't been subscribed to before). self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) From c16c6456aa33dd6c4e15f9ff1b89de36f70cb354 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Tue, 6 Sep 2022 19:41:59 +0530 Subject: [PATCH 23/30] Island: Remove unneeded comments from PyPubSubIslandEventQueue --- .../cc/event_queue/pypubsub_island_event_queue.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py index 478228d72..e18c12f2a 100644 --- a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -16,22 +16,9 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): topic_name = topic.name # needs to be a string for pypubsub - - # NOTE: The subscriber's signature needs to match the MDS (message data specification) of - # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic - # is created, which in our case is when a subscriber subscribes to a topic which - # is new (hasn't been subscribed to before). If the topic is being subscribed to by - # a subscriber for the first time, the topic's MDS will automatically be set - # according to that subscriber's signature. self._pypubsub_publisher_wrapped.subscribe(topic_name, subscriber) def publish(self, topic: IslandEventTopic, event: Any = None): topic_name = topic.name # needs to be a string for pypubsub - logger.debug(f"Publishing {topic_name} event") - - # NOTE: `event_data` needs to match the MDS (message data specification) of the topic, - # otherwise, errors will arise. The MDS of a topic is set when the topic is created, - # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) - # which is new (hasn't been subscribed to before). self._pypubsub_publisher_wrapped.publish(topic_name, event) From ba52eae8ed9fbeeed46efc99c71361929faf2438 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 6 Sep 2022 12:56:49 -0400 Subject: [PATCH 24/30] Common: Decouple PyPubSubPublisherWrapper from events --- .../event_queue/pypubsub_agent_event_queue.py | 2 +- .../event_queue/pypubsub_publisher_wrapper.py | 6 +-- .../pypubsub_island_event_queue.py | 6 ++- .../test_pypubsub_island_event_queue.py | 37 ++++++++++++++++--- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/monkey/common/event_queue/pypubsub_agent_event_queue.py b/monkey/common/event_queue/pypubsub_agent_event_queue.py index d5cebf39a..cf75c5b6a 100644 --- a/monkey/common/event_queue/pypubsub_agent_event_queue.py +++ b/monkey/common/event_queue/pypubsub_agent_event_queue.py @@ -53,7 +53,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): def _publish_event(self, topic: str, event: AbstractAgentEvent): logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}") - self._pypubsub_publisher_wrapped.publish(topic, event) + self._pypubsub_publisher_wrapped.publish(topic, event=event) # Appending a unique string to the topics for type and tags prevents bugs caused by collisions # between type names and tag names. diff --git a/monkey/common/event_queue/pypubsub_publisher_wrapper.py b/monkey/common/event_queue/pypubsub_publisher_wrapper.py index 510b929a8..71c7dda48 100644 --- a/monkey/common/event_queue/pypubsub_publisher_wrapper.py +++ b/monkey/common/event_queue/pypubsub_publisher_wrapper.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Callable +from typing import Callable from pubsub.core import Publisher @@ -49,9 +49,9 @@ class PyPubSubPublisherWrapper: # scope. Adding subscribers to self._refs prevents them from ever going out of scope. self._refs.append(subscriber) - def publish(self, topic_name: str, event: Any = None): + def publish(self, topic_name: str, **kwargs): # NOTE: `event` needs to match the MDS (message data specification) of the topic, # otherwise, errors will arise. The MDS of a topic is set when the topic is created, # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) # which is new (hasn't been subscribed to before). - self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) + self._pypubsub_publisher.sendMessage(topicName=topic_name, **kwargs) diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py index e18c12f2a..55d4f9846 100644 --- a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -21,4 +21,8 @@ class PyPubSubIslandEventQueue(IIslandEventQueue): def publish(self, topic: IslandEventTopic, event: Any = None): topic_name = topic.name # needs to be a string for pypubsub logger.debug(f"Publishing {topic_name} event") - self._pypubsub_publisher_wrapped.publish(topic_name, event) + + if event is None: + self._pypubsub_publisher_wrapped.publish(topic_name) + else: + self._pypubsub_publisher_wrapped.publish(topic_name, event=event) diff --git a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py index d2a2fa84f..2546359fc 100644 --- a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py +++ b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py @@ -1,4 +1,4 @@ -from typing import Any, Callable +from typing import Callable import pytest from pubsub import pub @@ -19,7 +19,7 @@ def event_queue() -> IIslandEventQueue: @pytest.fixture def event_queue_subscriber() -> Callable[..., None]: - def fn(event, topic=pub.AUTO_TOPIC): + def fn(topic=pub.AUTO_TOPIC): fn.call_count += 1 fn.call_topics |= {topic.getName()} @@ -29,10 +29,12 @@ def event_queue_subscriber() -> Callable[..., None]: return fn -def test_subscribe_publish( +def test_subscribe_publish__no_event_body( event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber ): - event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=event_queue_subscriber) + event_queue.subscribe( + topic=IslandEventTopic.RESET_AGENT_CONFIGURATION, subscriber=event_queue_subscriber + ) event_queue.subscribe( topic=IslandEventTopic.CLEAR_SIMULATION_DATA, subscriber=event_queue_subscriber ) @@ -43,16 +45,39 @@ def test_subscribe_publish( assert event_queue_subscriber.call_count == 2 assert event_queue_subscriber.call_topics == { - IslandEventTopic.AGENT_CONNECTED.name, + IslandEventTopic.RESET_AGENT_CONFIGURATION.name, IslandEventTopic.CLEAR_SIMULATION_DATA.name, } +def test_subscribe_publish__with_event_body( + event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber +): + class MyCallable: + call_count = 0 + event = None + + def __call__(self, event): + MyCallable.call_count += 1 + MyCallable.event = event + + event = "my event!" + my_callable = MyCallable() + event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=my_callable) + + event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED, event=event) + event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA) + event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION) + + assert my_callable.call_count == 1 + assert my_callable.event == event + + def test_keep_subscriber_in_scope(event_queue: IIslandEventQueue): class MyCallable: called = False - def __call__(self, event: Any): + def __call__(self): MyCallable.called = True def subscribe(): From e1e119c27aaba2f15d5762ebedf9c916f96d4d58 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 6 Sep 2022 12:57:24 -0400 Subject: [PATCH 25/30] Common: Add missing type hint for PyPubSubPublisherWrapper._refs --- monkey/common/event_queue/pypubsub_publisher_wrapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/common/event_queue/pypubsub_publisher_wrapper.py b/monkey/common/event_queue/pypubsub_publisher_wrapper.py index 71c7dda48..74b4b9a0c 100644 --- a/monkey/common/event_queue/pypubsub_publisher_wrapper.py +++ b/monkey/common/event_queue/pypubsub_publisher_wrapper.py @@ -1,5 +1,5 @@ import logging -from typing import Callable +from typing import Callable, List from pubsub.core import Publisher @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) class PyPubSubPublisherWrapper: def __init__(self, pypubsub_publisher: Publisher): self._pypubsub_publisher = pypubsub_publisher - self._refs = [] + self._refs: List[Callable] = [] def subscribe(self, topic_name: str, subscriber: Callable): try: From 59c58b3115346764125ff2ca8d21581c3b110ddb Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 6 Sep 2022 12:59:22 -0400 Subject: [PATCH 26/30] UT: Replace fn() with SubscriberSpy callable --- .../event_queue/test_pypubsub_island_event_queue.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py index 2546359fc..e3cce926f 100644 --- a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py +++ b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py @@ -19,14 +19,15 @@ def event_queue() -> IIslandEventQueue: @pytest.fixture def event_queue_subscriber() -> Callable[..., None]: - def fn(topic=pub.AUTO_TOPIC): - fn.call_count += 1 - fn.call_topics |= {topic.getName()} + class SubscriberSpy: + call_count = 0 + call_topics = set() - fn.call_count = 0 - fn.call_topics = set() + def __call__(self, topic=pub.AUTO_TOPIC): + self.call_count += 1 + self.call_topics |= {topic.getName()} - return fn + return SubscriberSpy() def test_subscribe_publish__no_event_body( From 377bb293fe49ac6df91433a33b277707c24af2dd Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 6 Sep 2022 13:00:01 -0400 Subject: [PATCH 27/30] UT: Use `self` instead of class name --- .../cc/event_queue/test_pypubsub_island_event_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py index e3cce926f..206009727 100644 --- a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py +++ b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py @@ -59,8 +59,8 @@ def test_subscribe_publish__with_event_body( event = None def __call__(self, event): - MyCallable.call_count += 1 - MyCallable.event = event + self.call_count += 1 + self.event = event event = "my event!" my_callable = MyCallable() From b16d19e0ed856e204ee72e36f0c7a402bddc6a68 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 6 Sep 2022 15:00:24 -0400 Subject: [PATCH 28/30] Common: Rename _pypubsub_publisher_wrappe{d,r} --- monkey/common/event_queue/pypubsub_agent_event_queue.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monkey/common/event_queue/pypubsub_agent_event_queue.py b/monkey/common/event_queue/pypubsub_agent_event_queue.py index cf75c5b6a..246b743d1 100644 --- a/monkey/common/event_queue/pypubsub_agent_event_queue.py +++ b/monkey/common/event_queue/pypubsub_agent_event_queue.py @@ -15,7 +15,7 @@ logger = logging.getLogger(__name__) class PyPubSubAgentEventQueue(IAgentEventQueue): def __init__(self, pypubsub_publisher: Publisher): - self._pypubsub_publisher_wrapped = PyPubSubPublisherWrapper(pypubsub_publisher) + self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher) def subscribe_all_events(self, subscriber: AgentEventSubscriber): self._subscribe(_ALL_EVENTS_TOPIC, subscriber) @@ -32,7 +32,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): self._subscribe(tag_topic, subscriber) def _subscribe(self, topic: str, subscriber: AgentEventSubscriber): - self._pypubsub_publisher_wrapped.subscribe(topic, subscriber) + self._pypubsub_publisher_wrapper.subscribe(topic, subscriber) def publish(self, event: AbstractAgentEvent): self._publish_to_all_events_topic(event) @@ -53,7 +53,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): def _publish_event(self, topic: str, event: AbstractAgentEvent): logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}") - self._pypubsub_publisher_wrapped.publish(topic, event=event) + self._pypubsub_publisher_wrapper.publish(topic, event=event) # Appending a unique string to the topics for type and tags prevents bugs caused by collisions # between type names and tag names. From 1036189fcc26b17b874a7ace08b4d472ec163bc0 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 6 Sep 2022 15:01:10 -0400 Subject: [PATCH 29/30] Island: Rename _pypubsub_publisher_wrappe{d,r} --- .../cc/event_queue/pypubsub_island_event_queue.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py index 55d4f9846..3cf1399c9 100644 --- a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -12,17 +12,17 @@ logger = logging.getLogger(__name__) class PyPubSubIslandEventQueue(IIslandEventQueue): def __init__(self, pypubsub_publisher: Publisher): - self._pypubsub_publisher_wrapped = PyPubSubPublisherWrapper(pypubsub_publisher) + self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher) def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): topic_name = topic.name # needs to be a string for pypubsub - self._pypubsub_publisher_wrapped.subscribe(topic_name, subscriber) + self._pypubsub_publisher_wrapper.subscribe(topic_name, subscriber) def publish(self, topic: IslandEventTopic, event: Any = None): topic_name = topic.name # needs to be a string for pypubsub logger.debug(f"Publishing {topic_name} event") if event is None: - self._pypubsub_publisher_wrapped.publish(topic_name) + self._pypubsub_publisher_wrapper.publish(topic_name) else: - self._pypubsub_publisher_wrapped.publish(topic_name, event=event) + self._pypubsub_publisher_wrapper.publish(topic_name, event=event) From 4e4331c5c32fab9b714fad652b4064ef4ddad0de Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Tue, 6 Sep 2022 15:05:29 -0400 Subject: [PATCH 30/30] Common: s/event/kwargs in PyPubSubPublisherWrapper comment --- monkey/common/event_queue/pypubsub_publisher_wrapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monkey/common/event_queue/pypubsub_publisher_wrapper.py b/monkey/common/event_queue/pypubsub_publisher_wrapper.py index 74b4b9a0c..ee2377edd 100644 --- a/monkey/common/event_queue/pypubsub_publisher_wrapper.py +++ b/monkey/common/event_queue/pypubsub_publisher_wrapper.py @@ -50,7 +50,7 @@ class PyPubSubPublisherWrapper: self._refs.append(subscriber) def publish(self, topic_name: str, **kwargs): - # NOTE: `event` needs to match the MDS (message data specification) of the topic, + # NOTE: `kwargs` needs to match the MDS (message data specification) of the topic, # otherwise, errors will arise. The MDS of a topic is set when the topic is created, # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) # which is new (hasn't been subscribed to before).