Island: Pass the same lock to agent and island event queues

Subscribers to the Agent and Island event queues manipulate some of the
same data structures. Sharing the same lock between the queues allows
this to happen in a thread-safe manner.
This commit is contained in:
Mike Salvatore 2022-09-29 12:40:18 -04:00
parent 00d72390ff
commit 82e08ba157
1 changed files with 13 additions and 6 deletions

View File

@ -1,4 +1,5 @@
import logging import logging
import threading
from pathlib import Path from pathlib import Path
from pubsub.core import Publisher from pubsub.core import Publisher
@ -108,19 +109,25 @@ def _register_conventions(container: DIContainer):
def _register_event_queues(container: DIContainer): def _register_event_queues(container: DIContainer):
event_queue_lock = threading.Lock()
agent_event_queue = container.resolve(PyPubSubAgentEventQueue) agent_event_queue = container.resolve(PyPubSubAgentEventQueue)
container.register_instance(IAgentEventQueue, _decorate_agent_event_queue(agent_event_queue)) decorated_agent_event_queue = _decorate_agent_event_queue(agent_event_queue, event_queue_lock)
container.register_instance(IAgentEventQueue, decorated_agent_event_queue)
island_event_queue = container.resolve(PyPubSubIslandEventQueue) island_event_queue = container.resolve(PyPubSubIslandEventQueue)
container.register_instance(IIslandEventQueue, _decorate_island_event_queue(island_event_queue)) decorated_island_event_queue = _decorate_island_event_queue(
island_event_queue, event_queue_lock
)
container.register_instance(IIslandEventQueue, decorated_island_event_queue)
def _decorate_agent_event_queue(agent_event_queue: IAgentEventQueue): def _decorate_agent_event_queue(agent_event_queue: IAgentEventQueue, lock: threading.Lock):
return LockingAgentEventQueueDecorator(agent_event_queue) return LockingAgentEventQueueDecorator(agent_event_queue, lock)
def _decorate_island_event_queue(island_event_queue: IIslandEventQueue): def _decorate_island_event_queue(island_event_queue: IIslandEventQueue, lock: threading.Lock):
return LockingIslandEventQueueDecorator(island_event_queue) return LockingIslandEventQueueDecorator(island_event_queue, lock)
def _register_repositories(container: DIContainer, data_dir: Path): def _register_repositories(container: DIContainer, data_dir: Path):