diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 26d29cb49..1c2e287d7 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -1,3 +1,4 @@ from .types import AgentEventSubscriber +from .pypubsub_publisher_wrapper import PyPubSubPublisherWrapper from .i_agent_event_queue import IAgentEventQueue from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue diff --git a/monkey/common/event_queue/pypubsub_agent_event_queue.py b/monkey/common/event_queue/pypubsub_agent_event_queue.py index 86dbea4af..246b743d1 100644 --- a/monkey/common/event_queue/pypubsub_agent_event_queue.py +++ b/monkey/common/event_queue/pypubsub_agent_event_queue.py @@ -3,6 +3,7 @@ from typing import Type from pubsub.core import Publisher +from common.event_queue import PyPubSubPublisherWrapper from common.events import AbstractAgentEvent from . import AgentEventSubscriber, IAgentEventQueue @@ -14,8 +15,7 @@ logger = logging.getLogger(__name__) class PyPubSubAgentEventQueue(IAgentEventQueue): def __init__(self, pypubsub_publisher: Publisher): - self._pypubsub_publisher = pypubsub_publisher - self._refs = [] + self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher) def subscribe_all_events(self, subscriber: AgentEventSubscriber): self._subscribe(_ALL_EVENTS_TOPIC, subscriber) @@ -32,35 +32,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): self._subscribe(tag_topic, subscriber) def _subscribe(self, topic: str, subscriber: AgentEventSubscriber): - try: - subscriber_name = subscriber.__name__ - except AttributeError: - subscriber_name = subscriber.__class__.__name__ - - logging.debug(f"Subscriber {subscriber_name} subscribed to {topic}") - self._pypubsub_publisher.subscribe(topicName=topic, listener=subscriber) - self._keep_subscriber_strongref(subscriber) - - def _keep_subscriber_strongref(self, subscriber: AgentEventSubscriber): - # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: - # > PyPubSub holds listeners by weak reference so that the lifetime of the - # > callable is not affected by PyPubSub: once the application no longer - # > references the callable, it can be garbage collected and PyPubSub can clean up - # > so it is no longer registered (this happens thanks to the weakref module). - # > Without this, it would be imperative to remember to unsubscribe certain - # > listeners, which is error prone; they would end up living until the - # > application exited. - # - # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable - # - # In our case, we're OK with subscribers living until the application exits. We don't - # provide an unsubscribe method (at this time) as subscriptions are expected to last - # for the life of the process. - # - # Specifically, if an instance object of a callable class is created and subscribed, - # we don't want that subscription to end if the callable instance object goes out of - # scope. Adding subscribers to self._refs prevents them from ever going out of scope. - self._refs.append(subscriber) + self._pypubsub_publisher_wrapper.subscribe(topic, subscriber) def publish(self, event: AbstractAgentEvent): self._publish_to_all_events_topic(event) @@ -81,7 +53,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue): def _publish_event(self, topic: str, event: AbstractAgentEvent): logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}") - self._pypubsub_publisher.sendMessage(topic, event=event) + self._pypubsub_publisher_wrapper.publish(topic, event=event) # Appending a unique string to the topics for type and tags prevents bugs caused by collisions # between type names and tag names. diff --git a/monkey/common/event_queue/pypubsub_publisher_wrapper.py b/monkey/common/event_queue/pypubsub_publisher_wrapper.py new file mode 100644 index 000000000..ee2377edd --- /dev/null +++ b/monkey/common/event_queue/pypubsub_publisher_wrapper.py @@ -0,0 +1,57 @@ +import logging +from typing import Callable, List + +from pubsub.core import Publisher + +logger = logging.getLogger(__name__) + + +class PyPubSubPublisherWrapper: + def __init__(self, pypubsub_publisher: Publisher): + self._pypubsub_publisher = pypubsub_publisher + self._refs: List[Callable] = [] + + def subscribe(self, topic_name: str, subscriber: Callable): + try: + subscriber_name = subscriber.__name__ + except AttributeError: + subscriber_name = subscriber.__class__.__name__ + + logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") + + # NOTE: The subscriber's signature needs to match the MDS (message data specification) of + # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic + # is created, which in our case is when a subscriber subscribes to a topic which + # is new (hasn't been subscribed to before). If the topic is being subscribed to by + # a subscriber for the first time, the topic's MDS will automatically be set + # according to that subscriber's signature. + self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) + self._keep_subscriber_strongref(subscriber) + + def _keep_subscriber_strongref(self, subscriber: Callable): + # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: + # > PyPubSub holds listeners by weak reference so that the lifetime of the + # > callable is not affected by PyPubSub: once the application no longer + # > references the callable, it can be garbage collected and PyPubSub can clean up + # > so it is no longer registered (this happens thanks to the weakref module). + # > Without this, it would be imperative to remember to unsubscribe certain + # > listeners, which is error prone; they would end up living until the + # > application exited. + # + # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable + # + # In our case, we're OK with subscribers living until the application exits. We don't + # provide an unsubscribe method (at this time) as subscriptions are expected to last + # for the life of the process. + # + # Specifically, if an instance object of a callable class is created and subscribed, + # we don't want that subscription to end if the callable instance object goes out of + # scope. Adding subscribers to self._refs prevents them from ever going out of scope. + self._refs.append(subscriber) + + def publish(self, topic_name: str, **kwargs): + # NOTE: `kwargs` needs to match the MDS (message data specification) of the topic, + # otherwise, errors will arise. The MDS of a topic is set when the topic is created, + # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) + # which is new (hasn't been subscribed to before). + self._pypubsub_publisher.sendMessage(topicName=topic_name, **kwargs) diff --git a/monkey/monkey_island/cc/event_queue/__init__.py b/monkey/monkey_island/cc/event_queue/__init__.py new file mode 100644 index 000000000..6eab2b6ac --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/__init__.py @@ -0,0 +1,3 @@ +from .types import IslandEventSubscriber +from .i_island_event_queue import IIslandEventQueue, IslandEventTopic +from .pypubsub_island_event_queue import PyPubSubIslandEventQueue diff --git a/monkey/common/event_queue/i_island_event_queue.py b/monkey/monkey_island/cc/event_queue/i_island_event_queue.py similarity index 56% rename from monkey/common/event_queue/i_island_event_queue.py rename to monkey/monkey_island/cc/event_queue/i_island_event_queue.py index f7bc60c5f..f5e8ea5c3 100644 --- a/monkey/common/event_queue/i_island_event_queue.py +++ b/monkey/monkey_island/cc/event_queue/i_island_event_queue.py @@ -1,12 +1,12 @@ from abc import ABC, abstractmethod -from enum import Enum, auto -from typing import Any, Callable +from enum import Enum +from typing import Any +from . import IslandEventSubscriber -class IslandEventTopics(Enum): - AGENT_CONNECTED = auto() - CLEAR_SIMULATION_DATA = auto() - RESET_AGENT_CONFIGURATION = auto() +IslandEventTopic = Enum( + "IslandEventTopic", ["AGENT_CONNECTED", "CLEAR_SIMULATION_DATA", "RESET_AGENT_CONFIGURATION"] +) class IIslandEventQueue(ABC): @@ -15,7 +15,7 @@ class IIslandEventQueue(ABC): """ @abstractmethod - def subscribe(self, topic: IslandEventTopics, subscriber: Callable[..., None]): + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): """ Subscribes a subscriber to the specified event topic @@ -26,12 +26,12 @@ class IIslandEventQueue(ABC): pass @abstractmethod - def publish(self, topic: IslandEventTopics, event_data: Any = None): + def publish(self, topic: IslandEventTopic, event: Any = None): """ Publishes an event topic with the given data :param topic: Event topic to publish - :param event_data: Event data to pass to subscribers with the event publish + :param event: Event to pass to subscribers with the event publish """ pass diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py new file mode 100644 index 000000000..3cf1399c9 --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -0,0 +1,28 @@ +import logging +from typing import Any + +from pubsub.core import Publisher + +from common.event_queue import PyPubSubPublisherWrapper + +from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic + +logger = logging.getLogger(__name__) + + +class PyPubSubIslandEventQueue(IIslandEventQueue): + def __init__(self, pypubsub_publisher: Publisher): + self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher) + + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): + topic_name = topic.name # needs to be a string for pypubsub + self._pypubsub_publisher_wrapper.subscribe(topic_name, subscriber) + + def publish(self, topic: IslandEventTopic, event: Any = None): + topic_name = topic.name # needs to be a string for pypubsub + logger.debug(f"Publishing {topic_name} event") + + if event is None: + self._pypubsub_publisher_wrapper.publish(topic_name) + else: + self._pypubsub_publisher_wrapper.publish(topic_name, event=event) diff --git a/monkey/monkey_island/cc/event_queue/types.py b/monkey/monkey_island/cc/event_queue/types.py new file mode 100644 index 000000000..3a1588dbb --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/types.py @@ -0,0 +1,3 @@ +from typing import Callable + +IslandEventSubscriber = Callable[..., None] diff --git a/monkey/monkey_island/cc/services/initialize.py b/monkey/monkey_island/cc/services/initialize.py index b1caab657..5f5fc76c1 100644 --- a/monkey/monkey_island/cc/services/initialize.py +++ b/monkey/monkey_island/cc/services/initialize.py @@ -14,6 +14,7 @@ from common.aws import AWSInstance from common.common_consts.telem_categories import TelemCategoryEnum from common.event_queue import IAgentEventQueue, PyPubSubAgentEventQueue from common.utils.file_utils import get_binary_io_sha256_hash +from monkey.common.event_queue import IIslandEventQueue, PyPubSubIslandEventQueue from monkey_island.cc.repository import ( AgentBinaryRepository, FileAgentConfigurationRepository, @@ -64,6 +65,7 @@ def initialize_services(container: DIContainer, data_dir: Path): ) container.register(Publisher, Publisher) container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue)) + container.register_instance(IIslandEventQueue, container.resolve(PyPubSubIslandEventQueue)) _register_repositories(container, data_dir) _register_services(container) diff --git a/monkey/tests/unit_tests/common/event_queue/test_pypubsub_event_queue.py b/monkey/tests/unit_tests/common/event_queue/test_pypubsub_agent_event_queue.py similarity index 100% rename from monkey/tests/unit_tests/common/event_queue/test_pypubsub_event_queue.py rename to monkey/tests/unit_tests/common/event_queue/test_pypubsub_agent_event_queue.py diff --git a/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py new file mode 100644 index 000000000..206009727 --- /dev/null +++ b/monkey/tests/unit_tests/monkey_island/cc/event_queue/test_pypubsub_island_event_queue.py @@ -0,0 +1,93 @@ +from typing import Callable + +import pytest +from pubsub import pub +from pubsub.core import Publisher + +from monkey_island.cc.event_queue import ( + IIslandEventQueue, + IslandEventSubscriber, + IslandEventTopic, + PyPubSubIslandEventQueue, +) + + +@pytest.fixture +def event_queue() -> IIslandEventQueue: + return PyPubSubIslandEventQueue(Publisher()) + + +@pytest.fixture +def event_queue_subscriber() -> Callable[..., None]: + class SubscriberSpy: + call_count = 0 + call_topics = set() + + def __call__(self, topic=pub.AUTO_TOPIC): + self.call_count += 1 + self.call_topics |= {topic.getName()} + + return SubscriberSpy() + + +def test_subscribe_publish__no_event_body( + event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber +): + event_queue.subscribe( + topic=IslandEventTopic.RESET_AGENT_CONFIGURATION, subscriber=event_queue_subscriber + ) + event_queue.subscribe( + topic=IslandEventTopic.CLEAR_SIMULATION_DATA, subscriber=event_queue_subscriber + ) + + event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED) + event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA) + event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION) + + assert event_queue_subscriber.call_count == 2 + assert event_queue_subscriber.call_topics == { + IslandEventTopic.RESET_AGENT_CONFIGURATION.name, + IslandEventTopic.CLEAR_SIMULATION_DATA.name, + } + + +def test_subscribe_publish__with_event_body( + event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber +): + class MyCallable: + call_count = 0 + event = None + + def __call__(self, event): + self.call_count += 1 + self.event = event + + event = "my event!" + my_callable = MyCallable() + event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=my_callable) + + event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED, event=event) + event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA) + event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION) + + assert my_callable.call_count == 1 + assert my_callable.event == event + + +def test_keep_subscriber_in_scope(event_queue: IIslandEventQueue): + class MyCallable: + called = False + + def __call__(self): + MyCallable.called = True + + def subscribe(): + # fn will go out of scope after subscribe() returns. + fn = MyCallable() + event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=fn) + + subscribe() + + event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED) + + assert MyCallable.called diff --git a/vulture_allowlist.py b/vulture_allowlist.py index 976810c9d..a672f319e 100644 --- a/vulture_allowlist.py +++ b/vulture_allowlist.py @@ -10,6 +10,7 @@ from common.agent_configuration.agent_sub_configurations import ( from common.credentials import Credentials from common.utils import IJSONSerializable from infection_monkey.exploit.log4shell_utils.ldap_server import LDAPServerFactory +from monkey_island.cc.event_queue import IslandEventTopic, PyPubSubIslandEventQueue from monkey_island.cc.models import Report from monkey_island.cc.models.networkmap import Arc, NetworkMap from monkey_island.cc.repository.attack.IMitigationsRepository import IMitigationsRepository @@ -317,9 +318,7 @@ CC_TUNNEL Credentials.from_json IJSONSerializable.from_json -# revisit when implementing PyPubSubIslandEventQueue -AGENT_CONNECTED -CLEAR_SIMULATION_DATA -RESET_AGENT_CONFIGURATION -IIslandEventQueue -event_data +PyPubSubIslandEventQueue +IslandEventTopic.AGENT_CONNECTED +IslandEventTopic.CLEAR_SIMULATION_DATA +IslandEventTopic.RESET_AGENT_CONFIGURATION