From 5d893d64cdec472e71bdef8ab629eee6247f449e Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Thu, 29 Sep 2022 12:13:21 -0400 Subject: [PATCH] Common: Add LockingAgentEventQueueDecorator --- monkey/common/event_queue/__init__.py | 1 + .../locking_agent_event_queue_decorator.py | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 monkey/common/event_queue/locking_agent_event_queue_decorator.py diff --git a/monkey/common/event_queue/__init__.py b/monkey/common/event_queue/__init__.py index 1c2e287d7..63a105a06 100644 --- a/monkey/common/event_queue/__init__.py +++ b/monkey/common/event_queue/__init__.py @@ -2,3 +2,4 @@ from .types import AgentEventSubscriber from .pypubsub_publisher_wrapper import PyPubSubPublisherWrapper from .i_agent_event_queue import IAgentEventQueue from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue +from .locking_agent_event_queue_decorator import LockingAgentEventQueueDecorator diff --git a/monkey/common/event_queue/locking_agent_event_queue_decorator.py b/monkey/common/event_queue/locking_agent_event_queue_decorator.py new file mode 100644 index 000000000..2a5e6828b --- /dev/null +++ b/monkey/common/event_queue/locking_agent_event_queue_decorator.py @@ -0,0 +1,31 @@ +from threading import Lock +from typing import Type + +from common.agent_events import AbstractAgentEvent + +from . import AgentEventSubscriber, IAgentEventQueue + + +class LockingAgentEventQueueDecorator(IAgentEventQueue): + """ + Makes an IAgentEventQueue thread-safe by locking publish() + """ + + def __init__(self, agent_event_queue: IAgentEventQueue): + self._lock = Lock() + self._agent_event_queue = agent_event_queue + + def subscribe_all_events(self, subscriber: AgentEventSubscriber): + self._agent_event_queue.subscribe_all_events(subscriber) + + def subscribe_type( + self, event_type: Type[AbstractAgentEvent], subscriber: AgentEventSubscriber + ): + self._agent_event_queue.subscribe_type(event_type, subscriber) + + def subscribe_tag(self, tag: str, subscriber: AgentEventSubscriber): + self._agent_event_queue.subscribe_tag(tag, subscriber) + + def publish(self, event: AbstractAgentEvent): + with self._lock: + self._agent_event_queue.publish(event)