From 67c78abee12929c26eb4f95297a29d9bf10d503d Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 11:50:43 -0400 Subject: [PATCH 1/9] Island: Add docstring to PyPubSubIslandEventQueue --- .../cc/event_queue/pypubsub_island_event_queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py index 8f7759072..b2259ce3e 100644 --- a/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py +++ b/monkey/monkey_island/cc/event_queue/pypubsub_island_event_queue.py @@ -10,6 +10,10 @@ logger = logging.getLogger(__name__) class PyPubSubIslandEventQueue(IIslandEventQueue): + """ + Implements IIslandEventQueue using pypubsub + """ + def __init__(self, pypubsub_publisher: Publisher): self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher) From 8ee14c4564d7004fdc21284fd97bb1c4da915d38 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 11:55:47 -0400 Subject: [PATCH 2/9] Island: Add LockingIslandEventQueueDecorator --- .../monkey_island/cc/event_queue/__init__.py | 1 + .../locking_island_event_queue_decorator.py | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+) create mode 100644 monkey/monkey_island/cc/event_queue/locking_island_event_queue_decorator.py diff --git a/monkey/monkey_island/cc/event_queue/__init__.py b/monkey/monkey_island/cc/event_queue/__init__.py index 6eab2b6ac..1a9ec662b 100644 --- a/monkey/monkey_island/cc/event_queue/__init__.py +++ b/monkey/monkey_island/cc/event_queue/__init__.py @@ -1,3 +1,4 @@ from .types import IslandEventSubscriber from .i_island_event_queue import IIslandEventQueue, IslandEventTopic from .pypubsub_island_event_queue import PyPubSubIslandEventQueue +from .locking_island_event_queue_decorator import LockingIslandEventQueueDecorator diff --git a/monkey/monkey_island/cc/event_queue/locking_island_event_queue_decorator.py b/monkey/monkey_island/cc/event_queue/locking_island_event_queue_decorator.py new file mode 100644 index 000000000..da20ef3b1 --- /dev/null +++ b/monkey/monkey_island/cc/event_queue/locking_island_event_queue_decorator.py @@ -0,0 +1,20 @@ +from threading import Lock + +from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic + + +class LockingIslandEventQueueDecorator(IIslandEventQueue): + """ + Makes an IIslandEventQueue thread-safe by locking publish() + """ + + def __init__(self, island_event_queue: IIslandEventQueue): + self._lock = Lock() + self._island_event_queue = island_event_queue + + def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): + self._island_event_queue.subscribe(topic, subscriber) + + def publish(self, topic: IslandEventTopic, **kwargs): + with self._lock: + self._island_event_queue.publish(topic, **kwargs) From cb7add7e59ea36dffe829efd016dc8f9f61e2906 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:01:23 -0400 Subject: [PATCH 3/9] Island: Register a thread-safe IIslandEventQueue --- .../monkey_island/cc/services/initialize.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/monkey/monkey_island/cc/services/initialize.py b/monkey/monkey_island/cc/services/initialize.py index 1cd020fd8..f1b86700e 100644 --- a/monkey/monkey_island/cc/services/initialize.py +++ b/monkey/monkey_island/cc/services/initialize.py @@ -17,7 +17,11 @@ from common.agent_event_serializers import ( from common.aws import AWSInstance from common.event_queue import IAgentEventQueue, PyPubSubAgentEventQueue from common.utils.file_utils import get_binary_io_sha256_hash -from monkey_island.cc.event_queue import IIslandEventQueue, PyPubSubIslandEventQueue +from monkey_island.cc.event_queue import ( + IIslandEventQueue, + LockingIslandEventQueueDecorator, + PyPubSubIslandEventQueue, +) from monkey_island.cc.repository import ( AgentBinaryRepository, FileAgentConfigurationRepository, @@ -72,8 +76,7 @@ def initialize_services(container: DIContainer, data_dir: Path): ILockableEncryptor, RepositoryEncryptor(data_dir / REPOSITORY_KEY_FILE_NAME) ) container.register(Publisher, Publisher) - container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue)) - container.register_instance(IIslandEventQueue, container.resolve(PyPubSubIslandEventQueue)) + _register_event_queues(container) _setup_agent_event_serializers(container) _register_repositories(container, data_dir) @@ -100,6 +103,16 @@ def _register_conventions(container: DIContainer): ) +def _register_event_queues(container: DIContainer): + container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue)) + island_event_queue = container.resolve(PyPubSubIslandEventQueue) + container.register_instance(IIslandEventQueue, _decorate_island_event_queue(island_event_queue)) + + +def _decorate_island_event_queue(island_event_queue: IIslandEventQueue): + return LockingIslandEventQueueDecorator(island_event_queue) + + def _register_repositories(container: DIContainer, data_dir: Path): container.register_instance( IFileRepository, From 5d893d64cdec472e71bdef8ab629eee6247f449e Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:13:21 -0400 Subject: [PATCH 4/9] Common: Add LockingAgentEventQueueDecorator --- monkey/common/event_queue/__init__.py | 1 + .../locking_agent_event_queue_decorator.py | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 monkey/common/event_queue/locking_agent_event_queue_decorator.py diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 1c2e287d7..63a105a06 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -2,3 +2,4 @@ from .types import AgentEventSubscriber from .pypubsub_publisher_wrapper import PyPubSubPublisherWrapper from .i_agent_event_queue import IAgentEventQueue from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue +from .locking_agent_event_queue_decorator import LockingAgentEventQueueDecorator diff --git a/monkey/common/event_queue/locking_agent_event_queue_decorator.py b/monkey/common/event_queue/locking_agent_event_queue_decorator.py new file mode 100644 index 000000000..2a5e6828b --- /dev/null +++ b/monkey/common/event_queue/locking_agent_event_queue_decorator.py @@ -0,0 +1,31 @@ +from threading import Lock +from typing import Type + +from common.agent_events import AbstractAgentEvent + +from . import AgentEventSubscriber, IAgentEventQueue + + +class LockingAgentEventQueueDecorator(IAgentEventQueue): + """ + Makes an IAgentEventQueue thread-safe by locking publish() + """ + + def __init__(self, agent_event_queue: IAgentEventQueue): + self._lock = Lock() + self._agent_event_queue = agent_event_queue + + def subscribe_all_events(self, subscriber: AgentEventSubscriber): + self._agent_event_queue.subscribe_all_events(subscriber) + + def subscribe_type( + self, event_type: Type[AbstractAgentEvent], subscriber: AgentEventSubscriber + ): + self._agent_event_queue.subscribe_type(event_type, subscriber) + + def subscribe_tag(self, tag: str, subscriber: AgentEventSubscriber): + self._agent_event_queue.subscribe_tag(tag, subscriber) + + def publish(self, event: AbstractAgentEvent): + with self._lock: + self._agent_event_queue.publish(event) From 91375cdff28e96a97d334547322771baad0741a5 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:21:28 -0400 Subject: [PATCH 5/9] Island: Register a thread-safe IAgentEventQueue --- monkey/monkey_island/cc/services/initialize.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/monkey/monkey_island/cc/services/initialize.py b/monkey/monkey_island/cc/services/initialize.py index f1b86700e..79b29940f 100644 --- a/monkey/monkey_island/cc/services/initialize.py +++ b/monkey/monkey_island/cc/services/initialize.py @@ -15,7 +15,11 @@ from common.agent_event_serializers import ( register_common_agent_event_serializers, ) from common.aws import AWSInstance -from common.event_queue import IAgentEventQueue, PyPubSubAgentEventQueue +from common.event_queue import ( + IAgentEventQueue, + LockingAgentEventQueueDecorator, + PyPubSubAgentEventQueue, +) from common.utils.file_utils import get_binary_io_sha256_hash from monkey_island.cc.event_queue import ( IIslandEventQueue, @@ -104,11 +108,17 @@ def _register_conventions(container: DIContainer): def _register_event_queues(container: DIContainer): - container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue)) + agent_event_queue = container.resolve(PyPubSubAgentEventQueue) + container.register_instance(IAgentEventQueue, _decorate_agent_event_queue(agent_event_queue)) + island_event_queue = container.resolve(PyPubSubIslandEventQueue) container.register_instance(IIslandEventQueue, _decorate_island_event_queue(island_event_queue)) +def _decorate_agent_event_queue(agent_event_queue: IAgentEventQueue): + return LockingAgentEventQueueDecorator(agent_event_queue) + + def _decorate_island_event_queue(island_event_queue: IIslandEventQueue): return LockingIslandEventQueueDecorator(island_event_queue) From 3344300f84ef7e328809e951cc234c7e649ed162 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:39:17 -0400 Subject: [PATCH 6/9] Common: Accept a lock in LockingAgentEventQueueDecorator's constructor --- .../common/event_queue/locking_agent_event_queue_decorator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/common/event_queue/locking_agent_event_queue_decorator.py b/monkey/common/event_queue/locking_agent_event_queue_decorator.py index 2a5e6828b..c3aa8097a 100644 --- a/monkey/common/event_queue/locking_agent_event_queue_decorator.py +++ b/monkey/common/event_queue/locking_agent_event_queue_decorator.py @@ -11,8 +11,8 @@ class LockingAgentEventQueueDecorator(IAgentEventQueue): Makes an IAgentEventQueue thread-safe by locking publish() """ - def __init__(self, agent_event_queue: IAgentEventQueue): - self._lock = Lock() + def __init__(self, agent_event_queue: IAgentEventQueue, lock: Lock): + self._lock = lock self._agent_event_queue = agent_event_queue def subscribe_all_events(self, subscriber: AgentEventSubscriber): From 00d72390ff277e7e0d80a3b3deca55757d444b43 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:39:57 -0400 Subject: [PATCH 7/9] Common: Accept a lock in LockingIslandEventQueueDecorator's constructor --- .../cc/event_queue/locking_island_event_queue_decorator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monkey/monkey_island/cc/event_queue/locking_island_event_queue_decorator.py b/monkey/monkey_island/cc/event_queue/locking_island_event_queue_decorator.py index da20ef3b1..200f2f1d7 100644 --- a/monkey/monkey_island/cc/event_queue/locking_island_event_queue_decorator.py +++ b/monkey/monkey_island/cc/event_queue/locking_island_event_queue_decorator.py @@ -8,8 +8,8 @@ class LockingIslandEventQueueDecorator(IIslandEventQueue): Makes an IIslandEventQueue thread-safe by locking publish() """ - def __init__(self, island_event_queue: IIslandEventQueue): - self._lock = Lock() + def __init__(self, island_event_queue: IIslandEventQueue, lock: Lock): + self._lock = lock self._island_event_queue = island_event_queue def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber): From 82e08ba157cc051e080a5bcb4d67ec410ac56257 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:40:18 -0400 Subject: [PATCH 8/9] Island: Pass the same lock to agent and island event queues Subscribers to the Agent and Island event queues manipulate some of the same data structures. Sharing the same lock between the queues allows this to happen in a thread-safe manner. --- .../monkey_island/cc/services/initialize.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/monkey/monkey_island/cc/services/initialize.py b/monkey/monkey_island/cc/services/initialize.py index 79b29940f..88376ecc1 100644 --- a/monkey/monkey_island/cc/services/initialize.py +++ b/monkey/monkey_island/cc/services/initialize.py @@ -1,4 +1,5 @@ import logging +import threading from pathlib import Path from pubsub.core import Publisher @@ -108,19 +109,25 @@ def _register_conventions(container: DIContainer): def _register_event_queues(container: DIContainer): + event_queue_lock = threading.Lock() + agent_event_queue = container.resolve(PyPubSubAgentEventQueue) - container.register_instance(IAgentEventQueue, _decorate_agent_event_queue(agent_event_queue)) + decorated_agent_event_queue = _decorate_agent_event_queue(agent_event_queue, event_queue_lock) + container.register_instance(IAgentEventQueue, decorated_agent_event_queue) island_event_queue = container.resolve(PyPubSubIslandEventQueue) - container.register_instance(IIslandEventQueue, _decorate_island_event_queue(island_event_queue)) + decorated_island_event_queue = _decorate_island_event_queue( + island_event_queue, event_queue_lock + ) + container.register_instance(IIslandEventQueue, decorated_island_event_queue) -def _decorate_agent_event_queue(agent_event_queue: IAgentEventQueue): - return LockingAgentEventQueueDecorator(agent_event_queue) +def _decorate_agent_event_queue(agent_event_queue: IAgentEventQueue, lock: threading.Lock): + return LockingAgentEventQueueDecorator(agent_event_queue, lock) -def _decorate_island_event_queue(island_event_queue: IIslandEventQueue): - return LockingIslandEventQueueDecorator(island_event_queue) +def _decorate_island_event_queue(island_event_queue: IIslandEventQueue, lock: threading.Lock): + return LockingIslandEventQueueDecorator(island_event_queue, lock) def _register_repositories(container: DIContainer, data_dir: Path): From 1b7c3be65b6de765d5c3d1f03cbc94c8c8fe1f01 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:43:23 -0400 Subject: [PATCH 9/9] Island: Add missing return types --- monkey/monkey_island/cc/services/initialize.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/monkey/monkey_island/cc/services/initialize.py b/monkey/monkey_island/cc/services/initialize.py index 88376ecc1..40839c388 100644 --- a/monkey/monkey_island/cc/services/initialize.py +++ b/monkey/monkey_island/cc/services/initialize.py @@ -122,11 +122,15 @@ def _register_event_queues(container: DIContainer): container.register_instance(IIslandEventQueue, decorated_island_event_queue) -def _decorate_agent_event_queue(agent_event_queue: IAgentEventQueue, lock: threading.Lock): +def _decorate_agent_event_queue( + agent_event_queue: IAgentEventQueue, lock: threading.Lock +) -> IAgentEventQueue: return LockingAgentEventQueueDecorator(agent_event_queue, lock) -def _decorate_island_event_queue(island_event_queue: IIslandEventQueue, lock: threading.Lock): +def _decorate_island_event_queue( + island_event_queue: IIslandEventQueue, lock: threading.Lock +) -> IIslandEventQueue: return LockingIslandEventQueueDecorator(island_event_queue, lock)