diff --git a/monkey/monkey_island/cc/event_queue/i_island_event_queue.py b/monkey/monkey_island/cc/event_queue/i_island_event_queue.py new file mode 100644 index 000000000..f5e8ea5c3 --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/i_island_event_queue.py @@ -0,0 +1,37 @@ +from abc import ABC, abstractmethod +from enum import Enum +from typing import Any + +from . import IslandEventSubscriber + +IslandEventTopic = Enum( + "IslandEventTopic", ["AGENT_CONNECTED", "CLEAR_SIMULATION_DATA", "RESET_AGENT_CONFIGURATION"] +) + + +class IIslandEventQueue(ABC): + """ + Manages subscription and publishing of events in the Island + """ + + @abstractmethod + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): + """ + Subscribes a subscriber to the specified event topic + + :param topic: Event topic to which the subscriber should subscribe + :param subscriber: A subscriber that will receive events + """ + + pass + + @abstractmethod + def publish(self, topic: IslandEventTopic, event: Any = None): + """ + Publishes an event topic with the given data + + :param topic: Event topic to publish + :param event: Event to pass to subscribers with the event publish + """ + + pass diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py new file mode 100644 index 000000000..64c1b25ba --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -0,0 +1,64 @@ +import logging +from typing import Any + +from pubsub.core import Publisher + +from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic + +logger = logging.getLogger(__name__) + + +class PyPubSubIslandEventQueue(IIslandEventQueue): + def __init__(self, pypubsub_publisher: Publisher): + self._pypubsub_publisher = pypubsub_publisher + self._refs = [] + + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): + topic_name = topic.name # needs to be a string for pypubsub + try: + subscriber_name = subscriber.__name__ + except AttributeError: + subscriber_name = subscriber.__class__.__name__ + + logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}") + + # NOTE: The subscriber's signature needs to match the MDS (message data specification) of + # the topic, otherwise, errors will arise. The MDS of a topic is set when the topic + # is created, which in our case is when a subscriber subscribes to a topic which + # is new (hasn't been subscribed to before). If the topic is being subscribed to by + # a subscriber for the first time, the topic's MDS will automatically be set + # according to that subscriber's signature. + self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber) + self._keep_subscriber_strongref(subscriber) + + def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber): + # NOTE: PyPubSub stores subscribers by weak reference. From the documentation: + # > PyPubSub holds listeners by weak reference so that the lifetime of the + # > callable is not affected by PyPubSub: once the application no longer + # > references the callable, it can be garbage collected and PyPubSub can clean up + # > so it is no longer registered (this happens thanks to the weakref module). + # > Without this, it would be imperative to remember to unsubscribe certain + # > listeners, which is error prone; they would end up living until the + # > application exited. + # + # https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable + # + # In our case, we're OK with subscribers living until the application exits. We don't + # provide an unsubscribe method (at this time) as subscriptions are expected to last + # for the life of the process. + # + # Specifically, if an instance object of a callable class is created and subscribed, + # we don't want that subscription to end if the callable instance object goes out of + # scope. Adding subscribers to self._refs prevents them from ever going out of scope. + self._refs.append(subscriber) + + def publish(self, topic: IslandEventTopic, event: Any = None): + topic_name = topic.name # needs to be a string for pypubsub + + logger.debug(f"Publishing {topic_name} event") + + # NOTE: `event_data` needs to match the MDS (message data specification) of the topic, + # otherwise, errors will arise. The MDS of a topic is set when the topic is created, + # which in our case is when a subscriber subscribes to a topic (in `subscribe()`) + # which is new (hasn't been subscribed to before). + self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event) diff --git a/monkey/monkey_island/cc/event_queue/types.py b/monkey/monkey_island/cc/event_queue/types.py new file mode 100644 index 000000000..3a1588dbb --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/types.py @@ -0,0 +1,3 @@ +from typing import Callable + +IslandEventSubscriber = Callable[..., None]