diff --git a/monkey/monkey_island/cc/services/initialize.py b/monkey/monkey_island/cc/services/initialize.py index 79b29940f..88376ecc1 100644 --- a/monkey/monkey_island/cc/services/initialize.py +++ b/monkey/monkey_island/cc/services/initialize.py @@ -1,4 +1,5 @@ import logging +import threading from pathlib import Path from pubsub.core import Publisher @@ -108,19 +109,25 @@ def _register_conventions(container: DIContainer): def _register_event_queues(container: DIContainer): + event_queue_lock = threading.Lock() + agent_event_queue = container.resolve(PyPubSubAgentEventQueue) - container.register_instance(IAgentEventQueue, _decorate_agent_event_queue(agent_event_queue)) + decorated_agent_event_queue = _decorate_agent_event_queue(agent_event_queue, event_queue_lock) + container.register_instance(IAgentEventQueue, decorated_agent_event_queue) island_event_queue = container.resolve(PyPubSubIslandEventQueue) - container.register_instance(IIslandEventQueue, _decorate_island_event_queue(island_event_queue)) + decorated_island_event_queue = _decorate_island_event_queue( + island_event_queue, event_queue_lock + ) + container.register_instance(IIslandEventQueue, decorated_island_event_queue) -def _decorate_agent_event_queue(agent_event_queue: IAgentEventQueue): - return LockingAgentEventQueueDecorator(agent_event_queue) +def _decorate_agent_event_queue(agent_event_queue: IAgentEventQueue, lock: threading.Lock): + return LockingAgentEventQueueDecorator(agent_event_queue, lock) -def _decorate_island_event_queue(island_event_queue: IIslandEventQueue): - return LockingIslandEventQueueDecorator(island_event_queue) +def _decorate_island_event_queue(island_event_queue: IIslandEventQueue, lock: threading.Lock): + return LockingIslandEventQueueDecorator(island_event_queue, lock) def _register_repositories(container: DIContainer, data_dir: Path):