From cb7add7e59ea36dffe829efd016dc8f9f61e2906 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:01:23 -0400 Subject: [PATCH] Island: Register a thread-safe IIslandEventQueue --- .../monkey_island/cc/services/initialize.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/monkey/monkey_island/cc/services/initialize.py b/monkey/monkey_island/cc/services/initialize.py index 1cd020fd8..f1b86700e 100644 --- a/monkey/monkey_island/cc/services/initialize.py +++ b/monkey/monkey_island/cc/services/initialize.py @@ -17,7 +17,11 @@ from common.agent_event_serializers import ( from common.aws import AWSInstance from common.event_queue import IAgentEventQueue, PyPubSubAgentEventQueue from common.utils.file_utils import get_binary_io_sha256_hash -from monkey_island.cc.event_queue import IIslandEventQueue, PyPubSubIslandEventQueue +from monkey_island.cc.event_queue import ( + IIslandEventQueue, + LockingIslandEventQueueDecorator, + PyPubSubIslandEventQueue, +) from monkey_island.cc.repository import ( AgentBinaryRepository, FileAgentConfigurationRepository, @@ -72,8 +76,7 @@ def initialize_services(container: DIContainer, data_dir: Path): ILockableEncryptor, RepositoryEncryptor(data_dir / REPOSITORY_KEY_FILE_NAME) ) container.register(Publisher, Publisher) - container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue)) - container.register_instance(IIslandEventQueue, container.resolve(PyPubSubIslandEventQueue)) + _register_event_queues(container) _setup_agent_event_serializers(container) _register_repositories(container, data_dir) @@ -100,6 +103,16 @@ def _register_conventions(container: DIContainer): ) +def _register_event_queues(container: DIContainer): + container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue)) + island_event_queue = container.resolve(PyPubSubIslandEventQueue) + container.register_instance(IIslandEventQueue, _decorate_island_event_queue(island_event_queue)) + + +def _decorate_island_event_queue(island_event_queue: IIslandEventQueue): + return LockingIslandEventQueueDecorator(island_event_queue) + + def _register_repositories(container: DIContainer, data_dir: Path): container.register_instance( IFileRepository,