diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 1c2e287d7..63a105a06 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -2,3 +2,4 @@ from .types import AgentEventSubscriber from .pypubsub_publisher_wrapper import PyPubSubPublisherWrapper from .i_agent_event_queue import IAgentEventQueue from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue +from .locking_agent_event_queue_decorator import LockingAgentEventQueueDecorator diff --git a/monkey/common/event_queue/locking_agent_event_queue_decorator.py b/monkey/common/event_queue/locking_agent_event_queue_decorator.py new file mode 100644 index 000000000..2a5e6828b --- /dev/null +++ b/monkey/common/event_queue/locking_agent_event_queue_decorator.py @@ -0,0 +1,31 @@ +from threading import Lock +from typing import Type + +from common.agent_events import AbstractAgentEvent + +from . import AgentEventSubscriber, IAgentEventQueue + + +class LockingAgentEventQueueDecorator(IAgentEventQueue): + """ + Makes an IAgentEventQueue thread-safe by locking publish() + """ + + def __init__(self, agent_event_queue: IAgentEventQueue): + self._lock = Lock() + self._agent_event_queue = agent_event_queue + + def subscribe_all_events(self, subscriber: AgentEventSubscriber): + self._agent_event_queue.subscribe_all_events(subscriber) + + def subscribe_type( + self, event_type: Type[AbstractAgentEvent], subscriber: AgentEventSubscriber + ): + self._agent_event_queue.subscribe_type(event_type, subscriber) + + def subscribe_tag(self, tag: str, subscriber: AgentEventSubscriber): + self._agent_event_queue.subscribe_tag(tag, subscriber) + + def publish(self, event: AbstractAgentEvent): + with self._lock: + self._agent_event_queue.publish(event)