Agent: Remove disused ExploitInterceptingTelemetryMessenger

This commit is contained in:
Mike Salvatore 2022-10-03 13:08:52 -04:00
parent 2ad972548b
commit d3ff56138f
2 changed files with 0 additions and 103 deletions

View File

@ -1,41 +0,0 @@
from functools import singledispatch
from ipaddress import IPv4Address
from infection_monkey.network.relay import TCPRelay
from infection_monkey.telemetry.exploit_telem import ExploitTelem
from infection_monkey.telemetry.i_telem import ITelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
class ExploitInterceptingTelemetryMessenger(ITelemetryMessenger):
def __init__(self, telemetry_messenger: ITelemetryMessenger, relay: TCPRelay):
self._telemetry_messenger = telemetry_messenger
self._relay = relay
def send_telemetry(self, telemetry: ITelem):
_send_telemetry(telemetry, self._telemetry_messenger, self._relay)
# Note: We can use @singledispatchmethod instead of @singledispatch if we migrate to Python 3.8 or
# later.
@singledispatch
def _send_telemetry(
telemetry: ITelem,
telemetry_messenger: ITelemetryMessenger,
relay: TCPRelay,
):
telemetry_messenger.send_telemetry(telemetry)
@_send_telemetry.register
def _(
telemetry: ExploitTelem,
telemetry_messenger: ITelemetryMessenger,
relay: TCPRelay,
):
if telemetry.propagation_result is True:
if relay:
address = IPv4Address(str(telemetry.host["ip_addr"]))
relay.add_potential_user(address)
telemetry_messenger.send_telemetry(telemetry)

View File

@ -1,62 +0,0 @@
from unittest.mock import MagicMock
from infection_monkey.i_puppet.i_puppet import ExploiterResultData
from infection_monkey.model.host import VictimHost
from infection_monkey.telemetry.exploit_telem import ExploitTelem
from infection_monkey.telemetry.messengers.exploit_intercepting_telemetry_messenger import (
ExploitInterceptingTelemetryMessenger,
)
class MockExploitTelem(ExploitTelem):
def __init__(self, propagation_success):
erd = ExploiterResultData()
erd.propagation_success = propagation_success
super().__init__("TestExploiter", VictimHost("127.0.0.1"), erd)
def get_data(self):
return {}
def test_generic_telemetry(TestTelem):
mock_telemetry_messenger = MagicMock()
mock_relay = MagicMock()
telemetry_messenger = ExploitInterceptingTelemetryMessenger(
mock_telemetry_messenger, mock_relay
)
telemetry_messenger.send_telemetry(TestTelem())
assert mock_telemetry_messenger.send_telemetry.called
assert not mock_relay.add_potential_user.called
def test_propagation_successful_exploit_telemetry():
mock_telemetry_messenger = MagicMock()
mock_relay = MagicMock()
mock_exploit_telem = MockExploitTelem(True)
telemetry_messenger = ExploitInterceptingTelemetryMessenger(
mock_telemetry_messenger, mock_relay
)
telemetry_messenger.send_telemetry(mock_exploit_telem)
assert mock_telemetry_messenger.send_telemetry.called
assert mock_relay.add_potential_user.called
def test_propagation_failed_exploit_telemetry():
mock_telemetry_messenger = MagicMock()
mock_relay = MagicMock()
mock_exploit_telem = MockExploitTelem(False)
telemetry_messenger = ExploitInterceptingTelemetryMessenger(
mock_telemetry_messenger, mock_relay
)
telemetry_messenger.send_telemetry(mock_exploit_telem)
assert mock_telemetry_messenger.send_telemetry.called
assert not mock_relay.add_potential_user.called