Agent: Add notify_relay_on_propagation agent event handler

This commit is contained in:
Mike Salvatore 2022-10-03 13:02:00 -04:00
parent 368ddde20f
commit 0466eb7239
4 changed files with 74 additions and 0 deletions

View File

@ -0,0 +1 @@
from .notify_relay_on_propagation import notify_relay_on_propagation

View File

@ -0,0 +1,19 @@
import logging
from typing import Optional
from common.agent_events import PropagationEvent
from infection_monkey.network.relay import TCPRelay
logger = logging.getLogger(__name__)
class notify_relay_on_propagation:
def __init__(self, tcp_relay: Optional[TCPRelay]):
self._tcp_relay = tcp_relay
def __call__(self, event: PropagationEvent):
if self._tcp_relay is None:
return
if event.success:
self._tcp_relay.add_potential_user(event.target)

View File

@ -0,0 +1,54 @@
from ipaddress import IPv4Address
from unittest.mock import MagicMock
from uuid import UUID
import pytest
from common.agent_events import PropagationEvent
from infection_monkey.agent_event_handlers import notify_relay_on_propagation
from infection_monkey.network.relay import TCPRelay
TARGET_ADDRESS = IPv4Address("192.168.1.10")
SUCCESSFUL_PROPAGATION_EVENT = PropagationEvent(
source=UUID("f811ad00-5a68-4437-bd51-7b5cc1768ad5"),
target=TARGET_ADDRESS,
tags=frozenset({"test"}),
success=True,
exploiter_name="test_exploiter",
)
FAILED_PROPAGATION_EVENT = PropagationEvent(
source=UUID("f811ad00-5a68-4437-bd51-7b5cc1768ad5"),
target=TARGET_ADDRESS,
tags=frozenset({"test"}),
success=False,
exploiter_name="test_exploiter",
error_message="everything is broken",
)
@pytest.fixture
def mock_tcp_relay():
return MagicMock(spec=TCPRelay)
def test_relay_notified_on_successful_propation(mock_tcp_relay):
handler = notify_relay_on_propagation(mock_tcp_relay)
handler(SUCCESSFUL_PROPAGATION_EVENT)
mock_tcp_relay.add_potential_user.assert_called_once_with(TARGET_ADDRESS)
def test_relay_not_notified_on_successful_propation(mock_tcp_relay):
handler = notify_relay_on_propagation(mock_tcp_relay)
handler(FAILED_PROPAGATION_EVENT)
mock_tcp_relay.add_potential_user.assert_not_called()
def test_relay_not_notified_if_none():
handler = notify_relay_on_propagation(None)
# Raises AttributeError on failure
handler(SUCCESSFUL_PROPAGATION_EVENT)