Merge pull request #2245 from guardicore/2233-island-event-queue

Add IIslandEventQueue
This commit is contained in:
Mike Salvatore 2022-09-06 15:34:49 -04:00 committed by GitHub
commit 5972f87391
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 389 additions and 178 deletions

View File

@ -1,3 +1,4 @@
from .types import EventSubscriber from .types import AgentEventSubscriber
from .i_event_queue import IEventQueue from .pypubsub_publisher_wrapper import PyPubSubPublisherWrapper
from .pypubsub_event_queue import PyPubSubEventQueue from .i_agent_event_queue import IAgentEventQueue
from .pypubsub_agent_event_queue import PyPubSubAgentEventQueue

View File

@ -1,18 +1,18 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Type from typing import Type
from common.events import AbstractEvent from common.events import AbstractAgentEvent
from . import EventSubscriber from . import AgentEventSubscriber
class IEventQueue(ABC): class IAgentEventQueue(ABC):
""" """
Manages subscription and publishing of events Manages subscription and publishing of events in the Agent
""" """
@abstractmethod @abstractmethod
def subscribe_all_events(self, subscriber: EventSubscriber): def subscribe_all_events(self, subscriber: AgentEventSubscriber):
""" """
Subscribes a subscriber to all events Subscribes a subscriber to all events
@ -22,7 +22,9 @@ class IEventQueue(ABC):
pass pass
@abstractmethod @abstractmethod
def subscribe_type(self, event_type: Type[AbstractEvent], subscriber: EventSubscriber): def subscribe_type(
self, event_type: Type[AbstractAgentEvent], subscriber: AgentEventSubscriber
):
""" """
Subscribes a subscriber to the specified event type Subscribes a subscriber to the specified event type
@ -33,7 +35,7 @@ class IEventQueue(ABC):
pass pass
@abstractmethod @abstractmethod
def subscribe_tag(self, tag: str, subscriber: EventSubscriber): def subscribe_tag(self, tag: str, subscriber: AgentEventSubscriber):
""" """
Subscribes a subscriber to the specified event tag Subscribes a subscriber to the specified event tag
@ -44,12 +46,11 @@ class IEventQueue(ABC):
pass pass
@abstractmethod @abstractmethod
def publish(self, event: AbstractEvent): def publish(self, event: AbstractAgentEvent):
""" """
Publishes an event with the given data Publishes an event with the given data
:param event: Event to publish :param event: Event to publish
:param data: Data to pass to subscribers with the event publish
""" """
pass pass

View File

@ -0,0 +1,66 @@
import logging
from typing import Type
from pubsub.core import Publisher
from common.event_queue import PyPubSubPublisherWrapper
from common.events import AbstractAgentEvent
from . import AgentEventSubscriber, IAgentEventQueue
_ALL_EVENTS_TOPIC = "all_events_topic"
logger = logging.getLogger(__name__)
class PyPubSubAgentEventQueue(IAgentEventQueue):
def __init__(self, pypubsub_publisher: Publisher):
self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher)
def subscribe_all_events(self, subscriber: AgentEventSubscriber):
self._subscribe(_ALL_EVENTS_TOPIC, subscriber)
def subscribe_type(
self, event_type: Type[AbstractAgentEvent], subscriber: AgentEventSubscriber
):
# pypubsub.pub.subscribe needs a string as the topic/event name
event_type_topic = PyPubSubAgentEventQueue._get_type_topic(event_type)
self._subscribe(event_type_topic, subscriber)
def subscribe_tag(self, tag: str, subscriber: AgentEventSubscriber):
tag_topic = PyPubSubAgentEventQueue._get_tag_topic(tag)
self._subscribe(tag_topic, subscriber)
def _subscribe(self, topic: str, subscriber: AgentEventSubscriber):
self._pypubsub_publisher_wrapper.subscribe(topic, subscriber)
def publish(self, event: AbstractAgentEvent):
self._publish_to_all_events_topic(event)
self._publish_to_type_topic(event)
self._publish_to_tags_topics(event)
def _publish_to_all_events_topic(self, event: AbstractAgentEvent):
self._publish_event(_ALL_EVENTS_TOPIC, event)
def _publish_to_type_topic(self, event: AbstractAgentEvent):
event_type_topic = PyPubSubAgentEventQueue._get_type_topic(event.__class__)
self._publish_event(event_type_topic, event)
def _publish_to_tags_topics(self, event: AbstractAgentEvent):
for tag in event.tags:
tag_topic = PyPubSubAgentEventQueue._get_tag_topic(tag)
self._publish_event(tag_topic, event)
def _publish_event(self, topic: str, event: AbstractAgentEvent):
logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}")
self._pypubsub_publisher_wrapper.publish(topic, event=event)
# Appending a unique string to the topics for type and tags prevents bugs caused by collisions
# between type names and tag names.
@staticmethod
def _get_type_topic(event_type: Type[AbstractAgentEvent]) -> str:
return f"{event_type.__name__}-type"
@staticmethod
def _get_tag_topic(tag: str) -> str:
return f"{tag}-tag"

View File

@ -1,92 +0,0 @@
import logging
from typing import Type
from pubsub.core import Publisher
from common.events import AbstractEvent
from . import EventSubscriber, IEventQueue
_ALL_EVENTS_TOPIC = "all_events_topic"
logger = logging.getLogger(__name__)
class PyPubSubEventQueue(IEventQueue):
def __init__(self, pypubsub_publisher: Publisher):
self._pypubsub_publisher = pypubsub_publisher
self._refs = []
def subscribe_all_events(self, subscriber: EventSubscriber):
self._subscribe(_ALL_EVENTS_TOPIC, subscriber)
def subscribe_type(self, event_type: Type[AbstractEvent], subscriber: EventSubscriber):
# pypubsub.pub.subscribe needs a string as the topic/event name
event_type_topic = PyPubSubEventQueue._get_type_topic(event_type)
self._subscribe(event_type_topic, subscriber)
def subscribe_tag(self, tag: str, subscriber: EventSubscriber):
tag_topic = PyPubSubEventQueue._get_tag_topic(tag)
self._subscribe(tag_topic, subscriber)
def _subscribe(self, topic: str, subscriber: EventSubscriber):
try:
subscriber_name = subscriber.__name__
except AttributeError:
subscriber_name = subscriber.__class__.__name__
logging.debug(f"Subscriber {subscriber_name} subscribed to {topic}")
self._pypubsub_publisher.subscribe(topicName=topic, listener=subscriber)
self._keep_subscriber_strongref(subscriber)
def _keep_subscriber_strongref(self, subscriber: EventSubscriber):
# NOTE: PyPubSub stores subscribers by weak reference. From the documentation:
# > PyPubSub holds listeners by weak reference so that the lifetime of the
# > callable is not affected by PyPubSub: once the application no longer
# > references the callable, it can be garbage collected and PyPubSub can clean up
# > so it is no longer registered (this happens thanks to the weakref module).
# > Without this, it would be imperative to remember to unsubscribe certain
# > listeners, which is error prone; they would end up living until the
# > application exited.
#
# https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable
#
# In our case, we're OK with subscribers living until the application exits. We don't
# provide an unsubscribe method (at this time) as subscriptions are expected to last
# for the life of the process.
#
# Specifically, if an instance object of a callable class is created and subscribed,
# we don't want that subscription to end if the callable instance object goes out of
# scope. Adding subscribers to self._refs prevents them from ever going out of scope.
self._refs.append(subscriber)
def publish(self, event: AbstractEvent):
self._publish_to_all_events_topic(event)
self._publish_to_type_topic(event)
self._publish_to_tags_topics(event)
def _publish_to_all_events_topic(self, event: AbstractEvent):
self._publish_event(_ALL_EVENTS_TOPIC, event)
def _publish_to_type_topic(self, event: AbstractEvent):
event_type_topic = PyPubSubEventQueue._get_type_topic(event.__class__)
self._publish_event(event_type_topic, event)
def _publish_to_tags_topics(self, event: AbstractEvent):
for tag in event.tags:
tag_topic = PyPubSubEventQueue._get_tag_topic(tag)
self._publish_event(tag_topic, event)
def _publish_event(self, topic: str, event: AbstractEvent):
logger.debug(f"Publishing a {event.__class__.__name__} event to {topic}")
self._pypubsub_publisher.sendMessage(topic, event=event)
# Appending a unique string to the topics for type and tags prevents bugs caused by collisions
# between type names and tag names.
@staticmethod
def _get_type_topic(event_type: Type[AbstractEvent]) -> str:
return f"{event_type.__name__}-type"
@staticmethod
def _get_tag_topic(tag: str) -> str:
return f"{tag}-tag"

View File

@ -0,0 +1,57 @@
import logging
from typing import Callable, List
from pubsub.core import Publisher
logger = logging.getLogger(__name__)
class PyPubSubPublisherWrapper:
def __init__(self, pypubsub_publisher: Publisher):
self._pypubsub_publisher = pypubsub_publisher
self._refs: List[Callable] = []
def subscribe(self, topic_name: str, subscriber: Callable):
try:
subscriber_name = subscriber.__name__
except AttributeError:
subscriber_name = subscriber.__class__.__name__
logging.debug(f"Subscriber {subscriber_name} subscribed to {topic_name}")
# NOTE: The subscriber's signature needs to match the MDS (message data specification) of
# the topic, otherwise, errors will arise. The MDS of a topic is set when the topic
# is created, which in our case is when a subscriber subscribes to a topic which
# is new (hasn't been subscribed to before). If the topic is being subscribed to by
# a subscriber for the first time, the topic's MDS will automatically be set
# according to that subscriber's signature.
self._pypubsub_publisher.subscribe(topicName=topic_name, listener=subscriber)
self._keep_subscriber_strongref(subscriber)
def _keep_subscriber_strongref(self, subscriber: Callable):
# NOTE: PyPubSub stores subscribers by weak reference. From the documentation:
# > PyPubSub holds listeners by weak reference so that the lifetime of the
# > callable is not affected by PyPubSub: once the application no longer
# > references the callable, it can be garbage collected and PyPubSub can clean up
# > so it is no longer registered (this happens thanks to the weakref module).
# > Without this, it would be imperative to remember to unsubscribe certain
# > listeners, which is error prone; they would end up living until the
# > application exited.
#
# https://pypubsub.readthedocs.io/en/v4.0.3/usage/usage_basic_tasks.html?#callable
#
# In our case, we're OK with subscribers living until the application exits. We don't
# provide an unsubscribe method (at this time) as subscriptions are expected to last
# for the life of the process.
#
# Specifically, if an instance object of a callable class is created and subscribed,
# we don't want that subscription to end if the callable instance object goes out of
# scope. Adding subscribers to self._refs prevents them from ever going out of scope.
self._refs.append(subscriber)
def publish(self, topic_name: str, **kwargs):
# NOTE: `kwargs` needs to match the MDS (message data specification) of the topic,
# otherwise, errors will arise. The MDS of a topic is set when the topic is created,
# which in our case is when a subscriber subscribes to a topic (in `subscribe()`)
# which is new (hasn't been subscribed to before).
self._pypubsub_publisher.sendMessage(topicName=topic_name, **kwargs)

View File

@ -1,5 +1,5 @@
from typing import Callable from typing import Callable
from common.events import AbstractEvent from common.events import AbstractAgentEvent
EventSubscriber = Callable[[AbstractEvent], None] AgentEventSubscriber = Callable[[AbstractAgentEvent], None]

View File

@ -1,7 +1,7 @@
from typing import Type, Union from typing import Type, Union
from common.event_serializers import IEventSerializer from common.event_serializers import IEventSerializer
from common.events import AbstractEvent from common.events import AbstractAgentEvent
class EventSerializerRegistry: class EventSerializerRegistry:
@ -21,9 +21,11 @@ class EventSerializerRegistry:
def __init__(self): def __init__(self):
self._registry = {} self._registry = {}
def __setitem__(self, event_class: Type[AbstractEvent], event_serializer: IEventSerializer): def __setitem__(
if not issubclass(event_class, AbstractEvent): self, event_class: Type[AbstractAgentEvent], event_serializer: IEventSerializer
raise TypeError(f"Event class must be of type: {AbstractEvent.__name__}") ):
if not issubclass(event_class, AbstractAgentEvent):
raise TypeError(f"Event class must be of type: {AbstractAgentEvent.__name__}")
if not isinstance(event_serializer, IEventSerializer): if not isinstance(event_serializer, IEventSerializer):
raise TypeError(f"Event serializer must be of type: {IEventSerializer.__name__}") raise TypeError(f"Event serializer must be of type: {IEventSerializer.__name__}")
@ -31,10 +33,10 @@ class EventSerializerRegistry:
self._registry[event_class] = event_serializer self._registry[event_class] = event_serializer
self._registry[event_class.__name__] = event_serializer self._registry[event_class.__name__] = event_serializer
def __getitem__(self, event_class: Union[str, Type[AbstractEvent]]) -> IEventSerializer: def __getitem__(self, event_class: Union[str, Type[AbstractAgentEvent]]) -> IEventSerializer:
if not (isinstance(event_class, str) or issubclass(event_class, AbstractEvent)): if not (isinstance(event_class, str) or issubclass(event_class, AbstractAgentEvent)):
raise TypeError( raise TypeError(
f"Registry get key {event_class} must be of type: {AbstractEvent.__name__} or " f"Registry get key {event_class} must be of type: {AbstractAgentEvent.__name__} or "
f"{str.__name__}" f"{str.__name__}"
) )

View File

@ -1,7 +1,7 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Dict, List, Union from typing import Dict, List, Union
from common.events import AbstractEvent from common.events import AbstractAgentEvent
JSONSerializable = Union[ JSONSerializable = Union[
Dict[str, "JSONSerializable"], List["JSONSerializable"], int, str, float, bool, None Dict[str, "JSONSerializable"], List["JSONSerializable"], int, str, float, bool, None
@ -14,7 +14,7 @@ class IEventSerializer(ABC):
""" """
@abstractmethod @abstractmethod
def serialize(self, event: AbstractEvent) -> JSONSerializable: def serialize(self, event: AbstractAgentEvent) -> JSONSerializable:
""" """
Serializes an event Serializes an event
@ -24,7 +24,7 @@ class IEventSerializer(ABC):
pass pass
@abstractmethod @abstractmethod
def deserialize(self, serialized_event: JSONSerializable) -> AbstractEvent: def deserialize(self, serialized_event: JSONSerializable) -> AbstractAgentEvent:
""" """
Deserializes an event Deserializes an event

View File

@ -1,2 +1,2 @@
from .abstract_event import AbstractEvent from .abstract_agent_event import AbstractAgentEvent
from .credentials_stolen_events import CredentialsStolenEvent from .credentials_stolen_events import CredentialsStolenEvent

View File

@ -7,13 +7,13 @@ from uuid import UUID, getnode
@dataclass(frozen=True) @dataclass(frozen=True)
class AbstractEvent(ABC): class AbstractAgentEvent(ABC):
""" """
An event that was initiated or observed by an agent An event that was initiated or observed by an agent
Agents perform actions and collect data. These actions and data are represented as "events". Agents perform actions and collect data. These actions and data are represented as "events".
Subtypes of `AbstractEvent` will have additional properties that provide context and information Subtypes of `AbstractAgentEvent` will have additional properties that provide context and
about the event. information about the event.
Attributes: Attributes:
:param source: The UUID of the agent that observed the event :param source: The UUID of the agent that observed the event

View File

@ -3,11 +3,11 @@ from typing import Sequence
from common.credentials import Credentials from common.credentials import Credentials
from . import AbstractEvent from . import AbstractAgentEvent
@dataclass(frozen=True) @dataclass(frozen=True)
class CredentialsStolenEvent(AbstractEvent): class CredentialsStolenEvent(AbstractAgentEvent):
""" """
An event that occurs when an agent collects credentials from the victim An event that occurs when an agent collects credentials from the victim

View File

@ -2,7 +2,7 @@ import logging
from typing import Sequence from typing import Sequence
from common.credentials import Credentials, LMHash, NTHash, Password, Username from common.credentials import Credentials, LMHash, NTHash, Password, Username
from common.event_queue import IEventQueue from common.event_queue import IAgentEventQueue
from common.events import CredentialsStolenEvent from common.events import CredentialsStolenEvent
from infection_monkey.i_puppet import ICredentialCollector from infection_monkey.i_puppet import ICredentialCollector
from infection_monkey.model import USERNAME_PREFIX from infection_monkey.model import USERNAME_PREFIX
@ -27,7 +27,7 @@ MIMIKATZ_EVENT_TAGS = frozenset(
class MimikatzCredentialCollector(ICredentialCollector): class MimikatzCredentialCollector(ICredentialCollector):
def __init__(self, event_queue: IEventQueue): def __init__(self, event_queue: IAgentEventQueue):
self._event_queue = event_queue self._event_queue = event_queue
def collect_credentials(self, options=None) -> Sequence[Credentials]: def collect_credentials(self, options=None) -> Sequence[Credentials]:

View File

@ -2,7 +2,7 @@ import logging
from typing import Sequence from typing import Sequence
from common.credentials import Credentials from common.credentials import Credentials
from common.event_queue import IEventQueue from common.event_queue import IAgentEventQueue
from infection_monkey.credential_collectors.ssh_collector import ssh_handler from infection_monkey.credential_collectors.ssh_collector import ssh_handler
from infection_monkey.i_puppet import ICredentialCollector from infection_monkey.i_puppet import ICredentialCollector
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
@ -15,7 +15,7 @@ class SSHCredentialCollector(ICredentialCollector):
SSH keys credential collector SSH keys credential collector
""" """
def __init__(self, telemetry_messenger: ITelemetryMessenger, event_queue: IEventQueue): def __init__(self, telemetry_messenger: ITelemetryMessenger, event_queue: IAgentEventQueue):
self._telemetry_messenger = telemetry_messenger self._telemetry_messenger = telemetry_messenger
self._event_queue = event_queue self._event_queue = event_queue

View File

@ -4,7 +4,7 @@ import os
from typing import Dict, Iterable, Sequence from typing import Dict, Iterable, Sequence
from common.credentials import Credentials, SSHKeypair, Username from common.credentials import Credentials, SSHKeypair, Username
from common.event_queue import IEventQueue from common.event_queue import IAgentEventQueue
from common.events import CredentialsStolenEvent from common.events import CredentialsStolenEvent
from common.utils.attack_utils import ScanStatus from common.utils.attack_utils import ScanStatus
from infection_monkey.telemetry.attack.t1005_telem import T1005Telem from infection_monkey.telemetry.attack.t1005_telem import T1005Telem
@ -31,7 +31,7 @@ SSH_COLLECTOR_EVENT_TAGS = frozenset(
def get_ssh_info( def get_ssh_info(
telemetry_messenger: ITelemetryMessenger, event_queue: IEventQueue telemetry_messenger: ITelemetryMessenger, event_queue: IAgentEventQueue
) -> Iterable[Dict]: ) -> Iterable[Dict]:
# TODO: Remove this check when this is turned into a plugin. # TODO: Remove this check when this is turned into a plugin.
if is_windows_os(): if is_windows_os():
@ -80,7 +80,9 @@ def _get_ssh_struct(name: str, home_dir: str) -> Dict:
def _get_ssh_files( def _get_ssh_files(
user_info: Iterable[Dict], telemetry_messenger: ITelemetryMessenger, event_queue: IEventQueue user_info: Iterable[Dict],
telemetry_messenger: ITelemetryMessenger,
event_queue: IAgentEventQueue,
) -> Iterable[Dict]: ) -> Iterable[Dict]:
for info in user_info: for info in user_info:
path = info["home_dir"] path = info["home_dir"]
@ -165,7 +167,9 @@ def to_credentials(ssh_info: Iterable[Dict]) -> Sequence[Credentials]:
return ssh_credentials return ssh_credentials
def _publish_credentials_stolen_event(collected_credentials: Credentials, event_queue: IEventQueue): def _publish_credentials_stolen_event(
collected_credentials: Credentials, event_queue: IAgentEventQueue
):
credentials_stolen_event = CredentialsStolenEvent( credentials_stolen_event = CredentialsStolenEvent(
tags=SSH_COLLECTOR_EVENT_TAGS, tags=SSH_COLLECTOR_EVENT_TAGS,
stolen_credentials=[collected_credentials], stolen_credentials=[collected_credentials],

View File

@ -4,7 +4,7 @@ from abc import abstractmethod
from datetime import datetime from datetime import datetime
from typing import Dict from typing import Dict
from common.event_queue import IEventQueue from common.event_queue import IAgentEventQueue
from common.utils.exceptions import FailedExploitationError from common.utils.exceptions import FailedExploitationError
from infection_monkey.i_puppet import ExploiterResultData from infection_monkey.i_puppet import ExploiterResultData
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
@ -60,7 +60,7 @@ class HostExploiter:
host, host,
current_depth: int, current_depth: int,
telemetry_messenger: ITelemetryMessenger, telemetry_messenger: ITelemetryMessenger,
event_queue: IEventQueue, event_queue: IAgentEventQueue,
agent_binary_repository: IAgentBinaryRepository, agent_binary_repository: IAgentBinaryRepository,
options: Dict, options: Dict,
interrupt: threading.Event, interrupt: threading.Event,

View File

@ -1,7 +1,7 @@
import threading import threading
from typing import Dict, Type from typing import Dict, Type
from common.event_queue import IEventQueue from common.event_queue import IAgentEventQueue
from infection_monkey.model import VictimHost from infection_monkey.model import VictimHost
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
@ -22,7 +22,7 @@ class ExploiterWrapper:
self, self,
exploit_class: Type[HostExploiter], exploit_class: Type[HostExploiter],
telemetry_messenger: ITelemetryMessenger, telemetry_messenger: ITelemetryMessenger,
event_queue: IEventQueue, event_queue: IAgentEventQueue,
agent_binary_repository: IAgentBinaryRepository, agent_binary_repository: IAgentBinaryRepository,
): ):
self._exploit_class = exploit_class self._exploit_class = exploit_class
@ -47,7 +47,7 @@ class ExploiterWrapper:
def __init__( def __init__(
self, self,
telemetry_messenger: ITelemetryMessenger, telemetry_messenger: ITelemetryMessenger,
event_queue: IEventQueue, event_queue: IAgentEventQueue,
agent_binary_repository: IAgentBinaryRepository, agent_binary_repository: IAgentBinaryRepository,
): ):
self._telemetry_messenger = telemetry_messenger self._telemetry_messenger = telemetry_messenger

View File

@ -10,7 +10,7 @@ from typing import List
from pubsub.core import Publisher from pubsub.core import Publisher
import infection_monkey.tunnel as tunnel import infection_monkey.tunnel as tunnel
from common.event_queue import IEventQueue, PyPubSubEventQueue from common.event_queue import IAgentEventQueue, PyPubSubAgentEventQueue
from common.events import CredentialsStolenEvent from common.events import CredentialsStolenEvent
from common.network.network_utils import address_to_ip_port from common.network.network_utils import address_to_ip_port
from common.utils.argparse_types import positive_int from common.utils.argparse_types import positive_int
@ -205,7 +205,7 @@ class InfectionMonkey:
control_channel control_channel
) )
event_queue = PyPubSubEventQueue(Publisher()) event_queue = PyPubSubAgentEventQueue(Publisher())
InfectionMonkey._subscribe_events(event_queue, propagation_credentials_repository) InfectionMonkey._subscribe_events(event_queue, propagation_credentials_repository)
puppet = self._build_puppet(propagation_credentials_repository, event_queue) puppet = self._build_puppet(propagation_credentials_repository, event_queue)
@ -228,7 +228,7 @@ class InfectionMonkey:
@staticmethod @staticmethod
def _subscribe_events( def _subscribe_events(
event_queue: IEventQueue, event_queue: IAgentEventQueue,
propagation_credentials_repository: IPropagationCredentialsRepository, propagation_credentials_repository: IPropagationCredentialsRepository,
): ):
event_queue.subscribe_type( event_queue.subscribe_type(
@ -249,7 +249,7 @@ class InfectionMonkey:
def _build_puppet( def _build_puppet(
self, self,
propagation_credentials_repository: IPropagationCredentialsRepository, propagation_credentials_repository: IPropagationCredentialsRepository,
event_queue: IEventQueue, event_queue: IAgentEventQueue,
) -> IPuppet: ) -> IPuppet:
puppet = Puppet() puppet = Puppet()

View File

@ -0,0 +1,3 @@
from .types import IslandEventSubscriber
from .i_island_event_queue import IIslandEventQueue, IslandEventTopic
from .pypubsub_island_event_queue import PyPubSubIslandEventQueue

View File

@ -0,0 +1,37 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any
from . import IslandEventSubscriber
IslandEventTopic = Enum(
"IslandEventTopic", ["AGENT_CONNECTED", "CLEAR_SIMULATION_DATA", "RESET_AGENT_CONFIGURATION"]
)
class IIslandEventQueue(ABC):
"""
Manages subscription and publishing of events in the Island
"""
@abstractmethod
def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber):
"""
Subscribes a subscriber to the specified event topic
:param topic: Event topic to which the subscriber should subscribe
:param subscriber: A subscriber that will receive events
"""
pass
@abstractmethod
def publish(self, topic: IslandEventTopic, event: Any = None):
"""
Publishes an event topic with the given data
:param topic: Event topic to publish
:param event: Event data to publish
"""
pass

View File

@ -0,0 +1,28 @@
import logging
from typing import Any
from pubsub.core import Publisher
from common.event_queue import PyPubSubPublisherWrapper
from . import IIslandEventQueue, IslandEventSubscriber, IslandEventTopic
logger = logging.getLogger(__name__)
class PyPubSubIslandEventQueue(IIslandEventQueue):
def __init__(self, pypubsub_publisher: Publisher):
self._pypubsub_publisher_wrapper = PyPubSubPublisherWrapper(pypubsub_publisher)
def subscribe(self, topic: IslandEventTopic, subscriber: IslandEventSubscriber):
topic_name = topic.name # needs to be a string for pypubsub
self._pypubsub_publisher_wrapper.subscribe(topic_name, subscriber)
def publish(self, topic: IslandEventTopic, event: Any = None):
topic_name = topic.name # needs to be a string for pypubsub
logger.debug(f"Publishing {topic_name} event")
if event is None:
self._pypubsub_publisher_wrapper.publish(topic_name)
else:
self._pypubsub_publisher_wrapper.publish(topic_name, event=event)

View File

@ -0,0 +1,3 @@
from typing import Callable
IslandEventSubscriber = Callable[..., None]

View File

@ -12,8 +12,9 @@ from common.agent_configuration import (
) )
from common.aws import AWSInstance from common.aws import AWSInstance
from common.common_consts.telem_categories import TelemCategoryEnum from common.common_consts.telem_categories import TelemCategoryEnum
from common.event_queue import IEventQueue, PyPubSubEventQueue from common.event_queue import IAgentEventQueue, PyPubSubAgentEventQueue
from common.utils.file_utils import get_binary_io_sha256_hash from common.utils.file_utils import get_binary_io_sha256_hash
from monkey_island.cc.event_queue import IIslandEventQueue, PyPubSubIslandEventQueue
from monkey_island.cc.repository import ( from monkey_island.cc.repository import (
AgentBinaryRepository, AgentBinaryRepository,
FileAgentConfigurationRepository, FileAgentConfigurationRepository,
@ -63,7 +64,8 @@ def initialize_services(container: DIContainer, data_dir: Path):
ILockableEncryptor, RepositoryEncryptor(data_dir / REPOSITORY_KEY_FILE_NAME) ILockableEncryptor, RepositoryEncryptor(data_dir / REPOSITORY_KEY_FILE_NAME)
) )
container.register(Publisher, Publisher) container.register(Publisher, Publisher)
container.register_instance(IEventQueue, container.resolve(PyPubSubEventQueue)) container.register_instance(IAgentEventQueue, container.resolve(PyPubSubAgentEventQueue))
container.register_instance(IIslandEventQueue, container.resolve(PyPubSubIslandEventQueue))
_register_repositories(container, data_dir) _register_repositories(container, data_dir)
_register_services(container) _register_services(container)

View File

@ -6,15 +6,15 @@ from uuid import UUID
import pytest import pytest
from pubsub.core import Publisher from pubsub.core import Publisher
from common.event_queue import EventSubscriber, IEventQueue, PyPubSubEventQueue from common.event_queue import AgentEventSubscriber, IAgentEventQueue, PyPubSubAgentEventQueue
from common.events import AbstractEvent from common.events import AbstractAgentEvent
EVENT_TAG_1 = "event tag 1" EVENT_TAG_1 = "event tag 1"
EVENT_TAG_2 = "event tag 2" EVENT_TAG_2 = "event tag 2"
@dataclass(frozen=True) @dataclass(frozen=True)
class TestEvent1(AbstractEvent): class TestEvent1(AbstractAgentEvent):
__test__ = False __test__ = False
source: UUID = UUID("f811ad00-5a68-4437-bd51-7b5cc1768ad5") source: UUID = UUID("f811ad00-5a68-4437-bd51-7b5cc1768ad5")
target: Union[UUID, IPv4Address, None] = None target: Union[UUID, IPv4Address, None] = None
@ -23,7 +23,7 @@ class TestEvent1(AbstractEvent):
@dataclass(frozen=True) @dataclass(frozen=True)
class TestEvent2(AbstractEvent): class TestEvent2(AbstractAgentEvent):
__test__ = False __test__ = False
source: UUID = UUID("e810ad01-6b67-9446-fc58-9b8d717653f7") source: UUID = UUID("e810ad01-6b67-9446-fc58-9b8d717653f7")
target: Union[UUID, IPv4Address, None] = None target: Union[UUID, IPv4Address, None] = None
@ -32,13 +32,13 @@ class TestEvent2(AbstractEvent):
@pytest.fixture @pytest.fixture
def event_queue() -> IEventQueue: def event_queue() -> IAgentEventQueue:
return PyPubSubEventQueue(Publisher()) return PyPubSubAgentEventQueue(Publisher())
@pytest.fixture @pytest.fixture
def event_queue_subscriber() -> Callable[[AbstractEvent], None]: def event_queue_subscriber() -> Callable[[AbstractAgentEvent], None]:
def fn(event: AbstractEvent): def fn(event: AbstractAgentEvent):
fn.call_count += 1 fn.call_count += 1
fn.call_types.add(event.__class__) fn.call_types.add(event.__class__)
fn.call_tags |= event.tags fn.call_tags |= event.tags
@ -50,7 +50,7 @@ def event_queue_subscriber() -> Callable[[AbstractEvent], None]:
return fn return fn
def test_subscribe_all(event_queue: IEventQueue, event_queue_subscriber: EventSubscriber): def test_subscribe_all(event_queue: IAgentEventQueue, event_queue_subscriber: AgentEventSubscriber):
event_queue.subscribe_all_events(event_queue_subscriber) event_queue.subscribe_all_events(event_queue_subscriber)
event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_1, EVENT_TAG_2}))) event_queue.publish(TestEvent1(tags=frozenset({EVENT_TAG_1, EVENT_TAG_2})))
@ -65,7 +65,7 @@ def test_subscribe_all(event_queue: IEventQueue, event_queue_subscriber: EventSu
@pytest.mark.parametrize("type_to_subscribe", [TestEvent1, TestEvent2]) @pytest.mark.parametrize("type_to_subscribe", [TestEvent1, TestEvent2])
def test_subscribe_types( def test_subscribe_types(
event_queue: IEventQueue, event_queue_subscriber: EventSubscriber, type_to_subscribe event_queue: IAgentEventQueue, event_queue_subscriber: AgentEventSubscriber, type_to_subscribe
): ):
event_queue.subscribe_type(type_to_subscribe, event_queue_subscriber) event_queue.subscribe_type(type_to_subscribe, event_queue_subscriber)
@ -77,7 +77,7 @@ def test_subscribe_types(
def test_subscribe_tags_single_type( def test_subscribe_tags_single_type(
event_queue: IEventQueue, event_queue_subscriber: EventSubscriber event_queue: IAgentEventQueue, event_queue_subscriber: AgentEventSubscriber
): ):
event_queue.subscribe_tag(EVENT_TAG_1, event_queue_subscriber) event_queue.subscribe_tag(EVENT_TAG_1, event_queue_subscriber)
@ -91,7 +91,7 @@ def test_subscribe_tags_single_type(
def test_subscribe_tags_multiple_types( def test_subscribe_tags_multiple_types(
event_queue: IEventQueue, event_queue_subscriber: EventSubscriber event_queue: IAgentEventQueue, event_queue_subscriber: AgentEventSubscriber
): ):
event_queue.subscribe_tag(EVENT_TAG_2, event_queue_subscriber) event_queue.subscribe_tag(EVENT_TAG_2, event_queue_subscriber)
@ -105,7 +105,9 @@ def test_subscribe_tags_multiple_types(
assert {EVENT_TAG_1, EVENT_TAG_2}.issubset(event_queue_subscriber.call_tags) assert {EVENT_TAG_1, EVENT_TAG_2}.issubset(event_queue_subscriber.call_tags)
def test_type_tag_collision(event_queue: IEventQueue, event_queue_subscriber: EventSubscriber): def test_type_tag_collision(
event_queue: IAgentEventQueue, event_queue_subscriber: AgentEventSubscriber
):
event_queue.subscribe_type(TestEvent1, event_queue_subscriber) event_queue.subscribe_type(TestEvent1, event_queue_subscriber)
event_queue.publish(TestEvent2(tags=frozenset({TestEvent1.__name__}))) event_queue.publish(TestEvent2(tags=frozenset({TestEvent1.__name__})))
@ -113,11 +115,11 @@ def test_type_tag_collision(event_queue: IEventQueue, event_queue_subscriber: Ev
assert event_queue_subscriber.call_count == 0 assert event_queue_subscriber.call_count == 0
def test_keep_subscriber_in_scope(event_queue: IEventQueue): def test_keep_subscriber_in_scope(event_queue: IAgentEventQueue):
class MyCallable: class MyCallable:
called = False called = False
def __call__(self, event: AbstractEvent): def __call__(self, event: AbstractAgentEvent):
MyCallable.called = True MyCallable.called = True
def subscribe(): def subscribe():

View File

@ -4,21 +4,21 @@ from unittest.mock import MagicMock
import pytest import pytest
from common.event_serializers import EventSerializerRegistry, IEventSerializer from common.event_serializers import EventSerializerRegistry, IEventSerializer
from common.events import AbstractEvent from common.events import AbstractAgentEvent
@dataclass(frozen=True) @dataclass(frozen=True)
class SomeEvent(AbstractEvent): class SomeEvent(AbstractAgentEvent):
some_param: int = field(default=435) some_param: int = field(default=435)
@dataclass(frozen=True) @dataclass(frozen=True)
class OtherEvent(AbstractEvent): class OtherEvent(AbstractAgentEvent):
other_param: float = field(default=123.456) other_param: float = field(default=123.456)
@dataclass(frozen=True) @dataclass(frozen=True)
class NoneEvent(AbstractEvent): class NoneEvent(AbstractAgentEvent):
none_param: float = field(default=1.0) none_param: float = field(default=1.0)

View File

@ -4,7 +4,7 @@ from unittest.mock import MagicMock
import pytest import pytest
from common.credentials import Credentials, LMHash, NTHash, Password, Username from common.credentials import Credentials, LMHash, NTHash, Password, Username
from common.event_queue import IEventQueue from common.event_queue import IAgentEventQueue
from common.events import CredentialsStolenEvent from common.events import CredentialsStolenEvent
from infection_monkey.credential_collectors import MimikatzCredentialCollector from infection_monkey.credential_collectors import MimikatzCredentialCollector
from infection_monkey.credential_collectors.mimikatz_collector.mimikatz_credential_collector import ( # noqa: E501 from infection_monkey.credential_collectors.mimikatz_collector.mimikatz_credential_collector import ( # noqa: E501
@ -24,7 +24,7 @@ def patch_pypykatz(win_creds: [WindowsCredentials], monkeypatch):
def collect_credentials() -> Sequence[Credentials]: def collect_credentials() -> Sequence[Credentials]:
mock_event_queue = MagicMock(spec=IEventQueue) mock_event_queue = MagicMock(spec=IAgentEventQueue)
return MimikatzCredentialCollector(mock_event_queue).collect_credentials() return MimikatzCredentialCollector(mock_event_queue).collect_credentials()
@ -120,7 +120,7 @@ def test_pypykatz_result_parsing_no_secrets(monkeypatch):
def test_mimikatz_credentials_stolen_event_published(monkeypatch): def test_mimikatz_credentials_stolen_event_published(monkeypatch):
mock_event_queue = MagicMock(spec=IEventQueue) mock_event_queue = MagicMock(spec=IAgentEventQueue)
patch_pypykatz([], monkeypatch) patch_pypykatz([], monkeypatch)
mimikatz_credential_collector = MimikatzCredentialCollector(mock_event_queue) mimikatz_credential_collector = MimikatzCredentialCollector(mock_event_queue)
@ -134,7 +134,7 @@ def test_mimikatz_credentials_stolen_event_published(monkeypatch):
def test_mimikatz_credentials_stolen_event_tags(monkeypatch): def test_mimikatz_credentials_stolen_event_tags(monkeypatch):
mock_event_queue = MagicMock(spec=IEventQueue) mock_event_queue = MagicMock(spec=IAgentEventQueue)
patch_pypykatz([], monkeypatch) patch_pypykatz([], monkeypatch)
mimikatz_credential_collector = MimikatzCredentialCollector(mock_event_queue) mimikatz_credential_collector = MimikatzCredentialCollector(mock_event_queue)
@ -146,7 +146,7 @@ def test_mimikatz_credentials_stolen_event_tags(monkeypatch):
def test_mimikatz_credentials_stolen_event_stolen_credentials(monkeypatch): def test_mimikatz_credentials_stolen_event_stolen_credentials(monkeypatch):
mock_event_queue = MagicMock(spec=IEventQueue) mock_event_queue = MagicMock(spec=IAgentEventQueue)
win_creds = [ win_creds = [
WindowsCredentials( WindowsCredentials(
username="user2", password="secret2", lm_hash="0182BD0BD4444BF8FC83B5D9042EED2E" username="user2", password="secret2", lm_hash="0182BD0BD4444BF8FC83B5D9042EED2E"

View File

@ -3,7 +3,7 @@ from unittest.mock import MagicMock
import pytest import pytest
from common.credentials import Credentials, SSHKeypair, Username from common.credentials import Credentials, SSHKeypair, Username
from common.event_queue import IEventQueue from common.event_queue import IAgentEventQueue
from infection_monkey.credential_collectors import SSHCredentialCollector from infection_monkey.credential_collectors import SSHCredentialCollector
@ -25,7 +25,7 @@ def patch_ssh_handler(ssh_creds, monkeypatch):
def test_ssh_credentials_empty_results(monkeypatch, ssh_creds, patch_telemetry_messenger): def test_ssh_credentials_empty_results(monkeypatch, ssh_creds, patch_telemetry_messenger):
patch_ssh_handler(ssh_creds, monkeypatch) patch_ssh_handler(ssh_creds, monkeypatch)
collected = SSHCredentialCollector( collected = SSHCredentialCollector(
patch_telemetry_messenger, MagicMock(spec=IEventQueue) patch_telemetry_messenger, MagicMock(spec=IAgentEventQueue)
).collect_credentials() ).collect_credentials()
assert not collected assert not collected
@ -71,6 +71,6 @@ def test_ssh_info_result_parsing(monkeypatch, patch_telemetry_messenger):
Credentials(identity=None, secret=ssh_keypair3), Credentials(identity=None, secret=ssh_keypair3),
] ]
collected = SSHCredentialCollector( collected = SSHCredentialCollector(
patch_telemetry_messenger, MagicMock(spec=IEventQueue) patch_telemetry_messenger, MagicMock(spec=IAgentEventQueue)
).collect_credentials() ).collect_credentials()
assert expected == collected assert expected == collected

View File

@ -0,0 +1,93 @@
from typing import Callable
import pytest
from pubsub import pub
from pubsub.core import Publisher
from monkey_island.cc.event_queue import (
IIslandEventQueue,
IslandEventSubscriber,
IslandEventTopic,
PyPubSubIslandEventQueue,
)
@pytest.fixture
def event_queue() -> IIslandEventQueue:
return PyPubSubIslandEventQueue(Publisher())
@pytest.fixture
def event_queue_subscriber() -> Callable[..., None]:
class SubscriberSpy:
call_count = 0
call_topics = set()
def __call__(self, topic=pub.AUTO_TOPIC):
self.call_count += 1
self.call_topics |= {topic.getName()}
return SubscriberSpy()
def test_subscribe_publish__no_event_body(
event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber
):
event_queue.subscribe(
topic=IslandEventTopic.RESET_AGENT_CONFIGURATION, subscriber=event_queue_subscriber
)
event_queue.subscribe(
topic=IslandEventTopic.CLEAR_SIMULATION_DATA, subscriber=event_queue_subscriber
)
event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED)
event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA)
event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION)
assert event_queue_subscriber.call_count == 2
assert event_queue_subscriber.call_topics == {
IslandEventTopic.RESET_AGENT_CONFIGURATION.name,
IslandEventTopic.CLEAR_SIMULATION_DATA.name,
}
def test_subscribe_publish__with_event_body(
event_queue: IIslandEventQueue, event_queue_subscriber: IslandEventSubscriber
):
class MyCallable:
call_count = 0
event = None
def __call__(self, event):
self.call_count += 1
self.event = event
event = "my event!"
my_callable = MyCallable()
event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=my_callable)
event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED, event=event)
event_queue.publish(topic=IslandEventTopic.CLEAR_SIMULATION_DATA)
event_queue.publish(topic=IslandEventTopic.RESET_AGENT_CONFIGURATION)
assert my_callable.call_count == 1
assert my_callable.event == event
def test_keep_subscriber_in_scope(event_queue: IIslandEventQueue):
class MyCallable:
called = False
def __call__(self):
MyCallable.called = True
def subscribe():
# fn will go out of scope after subscribe() returns.
fn = MyCallable()
event_queue.subscribe(topic=IslandEventTopic.AGENT_CONNECTED, subscriber=fn)
subscribe()
event_queue.publish(topic=IslandEventTopic.AGENT_CONNECTED)
assert MyCallable.called

View File

@ -10,6 +10,7 @@ from common.agent_configuration.agent_sub_configurations import (
from common.credentials import Credentials from common.credentials import Credentials
from common.utils import IJSONSerializable from common.utils import IJSONSerializable
from infection_monkey.exploit.log4shell_utils.ldap_server import LDAPServerFactory from infection_monkey.exploit.log4shell_utils.ldap_server import LDAPServerFactory
from monkey_island.cc.event_queue import IslandEventTopic, PyPubSubIslandEventQueue
from monkey_island.cc.models import Report from monkey_island.cc.models import Report
from monkey_island.cc.models.networkmap import Arc, NetworkMap from monkey_island.cc.models.networkmap import Arc, NetworkMap
from monkey_island.cc.repository.attack.IMitigationsRepository import IMitigationsRepository from monkey_island.cc.repository.attack.IMitigationsRepository import IMitigationsRepository
@ -277,16 +278,15 @@ key_list
simulation simulation
netmap netmap
validate_windows_filename_not_reserved validate_windows_filename_not_reserved
subscribe_all # common\event_queue\i_event_queue.py subscribe_all # common\event_queue\i_agent_event_queue.py
subscribe_type # common\event_queue\i_event_queue.py subscribe_type # common\event_queue\i_agent_event_queue.py
subscribe_tag # common\event_queue\i_event_queue.py subscribe_tag # common\event_queue\i_agent_event_queue.py
publish # common\event_queue\i_event_queue.py publish # common\event_queue\i_agent_event_queue.py
subscribe_all # common\event_queue\pypubsub_event_queue.py subscribe_all # common\event_queue\pypubsub_agent_event_queue.py
subscribe_type # common\event_queue\pypubsub_event_queue.py subscribe_type # common\event_queue\pypubsub_agent_event_queue.py
subscribe_tag # common\event_queue\pypubsub_event_queue.py subscribe_tag # common\event_queue\pypubsub_agent_event_queue.py
publish # common\event_queue\pypubsub_event_queue.py publish # common\event_queue\pypubsub_agent_event_queue.py
PyPubSubEventQueue # common\event_queue\pypubsub_event_queue.py subscribe_all_events # common\event_queue\pypubsub_agent_event_queue.py
subscribe_all_events # common\event_queue\pypubsub_event_queue.py
# TODO: Remove once #2179 is closed # TODO: Remove once #2179 is closed
@ -316,3 +316,7 @@ CC_TUNNEL
Credentials.from_json Credentials.from_json
IJSONSerializable.from_json IJSONSerializable.from_json
IslandEventTopic.AGENT_CONNECTED
IslandEventTopic.CLEAR_SIMULATION_DATA
IslandEventTopic.RESET_AGENT_CONFIGURATION