Merge pull request #2246 from guardicore/2233-pypubsubislandeventqueue

Add PyPubSubIslandEventQueue
This commit is contained in:
Mike Salvatore 2022-09-06 15:06:08 -04:00 committed by GitHub
commit 01ff1711c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 205 additions and 47 deletions

View File

@ -1,3 +1,4 @@
from .types import AgentEventSubscriber
from .pypubsub_publisher_wrapper import PyPubSubPublisherWrapper
from .i_agent_event_queue import IAgentEventQueue
from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue

View File

@ -3,6 +3,7 @@ from typing import Type
from pubsub.core import Publisher
from common.event_queue import PyPubSubPublisherWrapper
from common.events import AbstractAgentEvent
from . import AgentEventSubscriber, IAgentEventQueue
@ -14,8 +15,7 @@ logger = logging.getLogger(__name__)
class PyPubSubAgentEventQueue(IAgentEventQueue):
def __init__(self, pypubsub_publisher: Publisher):
self._pypubsub_publisher = pypubsub_publisher
self._refs = []
self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher)
def subscribe_all_events(self, subscriber: AgentEventSubscriber):
self._subscribe(_ALL_EVENTS_TOPIC, subscriber)
@ -32,35 +32,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue):
self._subscribe(tag_topic, subscriber)
def _subscribe(self, topic: str, subscriber: AgentEventSubscriber):
try:
subscriber_name = subscriber.__name__
except AttributeError:
subscriber_name = subscriber.__class__.__name__
logging.debug(f"Subscriber {subscriber_name} subscribed to {topic}")
self._pypubsub_publisher.subscribe(topicName=topic, listener=subscriber)
self._keep_subscriber_strongref(subscriber)
def _keep_subscriber_strongref(self, subscriber: AgentEventSubscriber):
# NOTE: PyPubSub stores subscribers by weak reference. From the documentation:
# > PyPubSub holds listeners by weak reference so that the lifetime of the
# > callable is not affected by PyPubSub: once the application no longer
# > references the callable, it can be garbage collected and PyPubSub can clean up
# > so it is no longer registered (this happens thanks to the weakref module).
# > Without this, it would be imperative to remember to unsubscribe certain
# > listeners, which is error prone; they would end up living until the
# > application exited.
#
# https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable
#
# In our case, we're OK with subscribers living until the application exits. We don't
# provide an unsubscribe method (at this time) as subscriptions are expected to last
# for the life of the process.
#
# Specifically, if an instance object of a callable class is created and subscribed,
# we don't want that subscription to end if the callable instance object goes out of
# scope. Adding subscribers to self._refs prevents them from ever going out of scope.
self._refs.append(subscriber)
self._pypubsub_publisher_wrapper.subscribe(topic, subscriber)
def publish(self, event: AbstractAgentEvent):
self._publish_to_all_events_topic(event)
@ -81,7 +53,7 @@ class PyPubSubAgentEventQueue(IAgentEventQueue):
def _publish_event(self, topic: str, event: AbstractAgentEvent):
logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}")
self._pypubsub_publisher.sendMessage(topic, event=event)
self._pypubsub_publisher_wrapper.publish(topic, event=event)
# Appending a unique string to the topics for type and tags prevents bugs caused by collisions
# between type names and tag names.

View File

@ -0,0 +1,57 @@
import logging
from typing import Callable, List
from pubsub.core import Publisher
logger = logging.getLogger(__name__)
class PyPubSubPublisherWrapper:
def __init__(self, pypubsub_publisher: Publisher):
self._pypubsub_publisher = pypubsub_publisher
self._refs: List[Callable] = []
def subscribe(self, topic_name: str, subscriber: Callable):
try:
subscriber_name = subscriber.__name__
except AttributeError:
subscriber_name = subscriber.__class__.__name__
logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}")
# NOTE: The subscriber's signature needs to match the MDS (message data specification) of
# the topic, otherwise, errors will arise. The MDS of a topic is set when the topic
# is created, which in our case is when a subscriber subscribes to a topic which
# is new (hasn't been subscribed to before). If the topic is being subscribed to by
# a subscriber for the first time, the topic's MDS will automatically be set
# according to that subscriber's signature.
self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber)
self._keep_subscriber_strongref(subscriber)
def _keep_subscriber_strongref(self, subscriber: Callable):
# NOTE: PyPubSub stores subscribers by weak reference. From the documentation:
# > PyPubSub holds listeners by weak reference so that the lifetime of the
# > callable is not affected by PyPubSub: once the application no longer
# > references the callable, it can be garbage collected and PyPubSub can clean up
# > so it is no longer registered (this happens thanks to the weakref module).
# > Without this, it would be imperative to remember to unsubscribe certain
# > listeners, which is error prone; they would end up living until the
# > application exited.
#
# https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable
#
# In our case, we're OK with subscribers living until the application exits. We don't
# provide an unsubscribe method (at this time) as subscriptions are expected to last
# for the life of the process.
#
# Specifically, if an instance object of a callable class is created and subscribed,
# we don't want that subscription to end if the callable instance object goes out of
# scope. Adding subscribers to self._refs prevents them from ever going out of scope.
self._refs.append(subscriber)
def publish(self, topic_name: str, **kwargs):
# NOTE: `kwargs` needs to match the MDS (message data specification) of the topic,
# otherwise, errors will arise. The MDS of a topic is set when the topic is created,
# which in our case is when a subscriber subscribes to a topic (in `subscribe()`)
# which is new (hasn't been subscribed to before).
self._pypubsub_publisher.sendMessage(topicName=topic_name, **kwargs)

View File

@ -0,0 +1,3 @@
from .types import IslandEventSubscriber
from .i_island_event_queue import IIslandEventQueue, IslandEventTopic
from .pypubsub_island_event_queue import PyPubSubIslandEventQueue

View File

@ -1,12 +1,12 @@
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Any, Callable
from enum import Enum
from typing import Any
from . import IslandEventSubscriber
class IslandEventTopics(Enum):
AGENT_CONNECTED = auto()
CLEAR_SIMULATION_DATA = auto()
RESET_AGENT_CONFIGURATION = auto()
IslandEventTopic = Enum(
"IslandEventTopic", ["AGENT_CONNECTED", "CLEAR_SIMULATION_DATA", "RESET_AGENT_CONFIGURATION"]
)
class IIslandEventQueue(ABC):
@ -15,7 +15,7 @@ class IIslandEventQueue(ABC):
"""
@abstractmethod
def subscribe(self, topic: IslandEventTopics, subscriber: Callable[..., None]):
def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber):
"""
Subscribes a subscriber to the specified event topic
@ -26,12 +26,12 @@ class IIslandEventQueue(ABC):
pass
@abstractmethod
def publish(self, topic: IslandEventTopics, event_data: Any = None):
def publish(self, topic: IslandEventTopic, event: Any = None):
"""
Publishes an event topic with the given data
:param topic: Event topic to publish
:param event_data: Event data to pass to subscribers with the event publish
:param event: Event to pass to subscribers with the event publish
"""
pass

View File

@ -0,0 +1,28 @@
import logging
from typing import Any
from pubsub.core import Publisher
from common.event_queue import PyPubSubPublisherWrapper
from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic
logger = logging.getLogger(__name__)
class PyPubSubIslandEventQueue(IIslandEventQueue):
def __init__(self, pypubsub_publisher: Publisher):
self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher)
def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber):
topic_name = topic.name # needs to be a string for pypubsub
self._pypubsub_publisher_wrapper.subscribe(topic_name, subscriber)
def publish(self, topic: IslandEventTopic, event: Any = None):
topic_name = topic.name # needs to be a string for pypubsub
logger.debug(f"Publishing {topic_name} event")
if event is None:
self._pypubsub_publisher_wrapper.publish(topic_name)
else:
self._pypubsub_publisher_wrapper.publish(topic_name, event=event)

View File

@ -0,0 +1,3 @@
from typing import Callable
IslandEventSubscriber = Callable[..., None]

View File

@ -14,6 +14,7 @@ from common.aws import AWSInstance
from common.common_consts.telem_categories import TelemCategoryEnum
from common.event_queue import IAgentEventQueue, PyPubSubAgentEventQueue
from common.utils.file_utils import get_binary_io_sha256_hash
from monkey.common.event_queue import IIslandEventQueue, PyPubSubIslandEventQueue
from monkey_island.cc.repository import (
AgentBinaryRepository,
FileAgentConfigurationRepository,
@ -64,6 +65,7 @@ def initialize_services(container: DIContainer, data_dir: Path):
)
container.register(Publisher, Publisher)
container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue))
container.register_instance(IIslandEventQueue, container.resolve(PyPubSubIslandEventQueue))
_register_repositories(container, data_dir)
_register_services(container)

View File

@ -0,0 +1,93 @@
from typing import Callable
import pytest
from pubsub import pub
from pubsub.core import Publisher
from monkey_island.cc.event_queue import (
IIslandEventQueue,
IslandEventSubscriber,
IslandEventTopic,
PyPubSubIslandEventQueue,
)
@pytest.fixture
def event_queue() -> IIslandEventQueue:
return PyPubSubIslandEventQueue(Publisher())
@pytest.fixture
def event_queue_subscriber() -> Callable[..., None]:
class SubscriberSpy:
call_count = 0
call_topics = set()
def __call__(self, topic=pub.AUTO_TOPIC):
self.call_count += 1
self.call_topics |= {topic.getName()}
return SubscriberSpy()
def test_subscribe_publish__no_event_body(
event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber
):
event_queue.subscribe(
topic=IslandEventTopic.RESET_AGENT_CONFIGURATION, subscriber=event_queue_subscriber
)
event_queue.subscribe(
topic=IslandEventTopic.CLEAR_SIMULATION_DATA, subscriber=event_queue_subscriber
)
event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED)
event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA)
event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION)
assert event_queue_subscriber.call_count == 2
assert event_queue_subscriber.call_topics == {
IslandEventTopic.RESET_AGENT_CONFIGURATION.name,
IslandEventTopic.CLEAR_SIMULATION_DATA.name,
}
def test_subscribe_publish__with_event_body(
event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber
):
class MyCallable:
call_count = 0
event = None
def __call__(self, event):
self.call_count += 1
self.event = event
event = "my event!"
my_callable = MyCallable()
event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=my_callable)
event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED, event=event)
event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA)
event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION)
assert my_callable.call_count == 1
assert my_callable.event == event
def test_keep_subscriber_in_scope(event_queue: IIslandEventQueue):
class MyCallable:
called = False
def __call__(self):
MyCallable.called = True
def subscribe():
# fn will go out of scope after subscribe() returns.
fn = MyCallable()
event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=fn)
subscribe()
event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED)
assert MyCallable.called

View File

@ -10,6 +10,7 @@ from common.agent_configuration.agent_sub_configurations import (
from common.credentials import Credentials
from common.utils import IJSONSerializable
from infection_monkey.exploit.log4shell_utils.ldap_server import LDAPServerFactory
from monkey_island.cc.event_queue import IslandEventTopic, PyPubSubIslandEventQueue
from monkey_island.cc.models import Report
from monkey_island.cc.models.networkmap import Arc, NetworkMap
from monkey_island.cc.repository.attack.IMitigationsRepository import IMitigationsRepository
@ -317,9 +318,7 @@ CC_TUNNEL
Credentials.from_json
IJSONSerializable.from_json
# revisit when implementing PyPubSubIslandEventQueue
AGENT_CONNECTED
CLEAR_SIMULATION_DATA
RESET_AGENT_CONFIGURATION
IIslandEventQueue
event_data
PyPubSubIslandEventQueue
IslandEventTopic.AGENT_CONNECTED
IslandEventTopic.CLEAR_SIMULATION_DATA
IslandEventTopic.RESET_AGENT_CONFIGURATION