Common: Move Island event queue stuff out of common/
This commit is contained in:
parent
5da8b424b5
commit
502a875fdd
|
@ -1,5 +1,3 @@
|
||||||
from .types import AgentEventSubscriber, IslandEventSubscriber
|
from .types import AgentEventSubscriber
|
||||||
from .i_agent_event_queue import IAgentEventQueue
|
from .i_agent_event_queue import IAgentEventQueue
|
||||||
from .i_island_event_queue import IIslandEventQueue, IslandEventTopic
|
|
||||||
from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue
|
from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue
|
||||||
from .pypubsub_island_event_queue import PyPubSubIslandEventQueue
|
|
||||||
|
|
|
@ -1,37 +0,0 @@
|
||||||
from abc import ABC, abstractmethod
|
|
||||||
from enum import Enum
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from . import IslandEventSubscriber
|
|
||||||
|
|
||||||
IslandEventTopic = Enum(
|
|
||||||
"IslandEventTopic", ["AGENT_CONNECTED", "CLEAR_SIMULATION_DATA", "RESET_AGENT_CONFIGURATION"]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class IIslandEventQueue(ABC):
|
|
||||||
"""
|
|
||||||
Manages subscription and publishing of events in the Island
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber):
|
|
||||||
"""
|
|
||||||
Subscribes a subscriber to the specified event topic
|
|
||||||
|
|
||||||
:param topic: Event topic to which the subscriber should subscribe
|
|
||||||
:param subscriber: A subscriber that will receive events
|
|
||||||
"""
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def publish(self, topic: IslandEventTopic, event: Any = None):
|
|
||||||
"""
|
|
||||||
Publishes an event topic with the given data
|
|
||||||
|
|
||||||
:param topic: Event topic to publish
|
|
||||||
:param event: Event to pass to subscribers with the event publish
|
|
||||||
"""
|
|
||||||
|
|
||||||
pass
|
|
|
@ -1,64 +0,0 @@
|
||||||
import logging
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from pubsub.core import Publisher
|
|
||||||
|
|
||||||
from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class PyPubSubIslandEventQueue(IIslandEventQueue):
|
|
||||||
def __init__(self, pypubsub_publisher: Publisher):
|
|
||||||
self._pypubsub_publisher = pypubsub_publisher
|
|
||||||
self._refs = []
|
|
||||||
|
|
||||||
def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber):
|
|
||||||
topic_name = topic.name # needs to be a string for pypubsub
|
|
||||||
try:
|
|
||||||
subscriber_name = subscriber.__name__
|
|
||||||
except AttributeError:
|
|
||||||
subscriber_name = subscriber.__class__.__name__
|
|
||||||
|
|
||||||
logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}")
|
|
||||||
|
|
||||||
# NOTE: The subscriber's signature needs to match the MDS (message data specification) of
|
|
||||||
# the topic, otherwise, errors will arise. The MDS of a topic is set when the topic
|
|
||||||
# is created, which in our case is when a subscriber subscribes to a topic which
|
|
||||||
# is new (hasn't been subscribed to before). If the topic is being subscribed to by
|
|
||||||
# a subscriber for the first time, the topic's MDS will automatically be set
|
|
||||||
# according to that subscriber's signature.
|
|
||||||
self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber)
|
|
||||||
self._keep_subscriber_strongref(subscriber)
|
|
||||||
|
|
||||||
def _keep_subscriber_strongref(self, subscriber: IslandEventSubscriber):
|
|
||||||
# NOTE: PyPubSub stores subscribers by weak reference. From the documentation:
|
|
||||||
# > PyPubSub holds listeners by weak reference so that the lifetime of the
|
|
||||||
# > callable is not affected by PyPubSub: once the application no longer
|
|
||||||
# > references the callable, it can be garbage collected and PyPubSub can clean up
|
|
||||||
# > so it is no longer registered (this happens thanks to the weakref module).
|
|
||||||
# > Without this, it would be imperative to remember to unsubscribe certain
|
|
||||||
# > listeners, which is error prone; they would end up living until the
|
|
||||||
# > application exited.
|
|
||||||
#
|
|
||||||
# https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable
|
|
||||||
#
|
|
||||||
# In our case, we're OK with subscribers living until the application exits. We don't
|
|
||||||
# provide an unsubscribe method (at this time) as subscriptions are expected to last
|
|
||||||
# for the life of the process.
|
|
||||||
#
|
|
||||||
# Specifically, if an instance object of a callable class is created and subscribed,
|
|
||||||
# we don't want that subscription to end if the callable instance object goes out of
|
|
||||||
# scope. Adding subscribers to self._refs prevents them from ever going out of scope.
|
|
||||||
self._refs.append(subscriber)
|
|
||||||
|
|
||||||
def publish(self, topic: IslandEventTopic, event: Any = None):
|
|
||||||
topic_name = topic.name # needs to be a string for pypubsub
|
|
||||||
|
|
||||||
logger.debug(f"Publishing {topic_name} event")
|
|
||||||
|
|
||||||
# NOTE: `event_data` needs to match the MDS (message data specification) of the topic,
|
|
||||||
# otherwise, errors will arise. The MDS of a topic is set when the topic is created,
|
|
||||||
# which in our case is when a subscriber subscribes to a topic (in `subscribe()`)
|
|
||||||
# which is new (hasn't been subscribed to before).
|
|
||||||
self._pypubsub_publisher.sendMessage(topicName=topic_name, event=event)
|
|
|
@ -3,4 +3,3 @@ from typing import Callable
|
||||||
from common.events import AbstractAgentEvent
|
from common.events import AbstractAgentEvent
|
||||||
|
|
||||||
AgentEventSubscriber = Callable[[AbstractAgentEvent], None]
|
AgentEventSubscriber = Callable[[AbstractAgentEvent], None]
|
||||||
IslandEventSubscriber = Callable[..., None]
|
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
from .types import IslandEventSubscriber
|
||||||
|
from .i_island_event_queue import IIslandEventQueue, IslandEventTopic
|
||||||
|
from .pypubsub_island_event_queue import PyPubSubIslandEventQueue
|
Loading…
Reference in New Issue