Agent: Remove CredentialsInterceptingTelemetryMessenger

This commit is contained in:
Ilija Lazoroski 2022-08-15 14:45:39 +02:00
parent 2edaf52140
commit 8dd6c5b7c2
2 changed files with 0 additions and 92 deletions

View File

@ -1,40 +0,0 @@
from functools import singledispatch
from infection_monkey.credential_repository import IPropagationCredentialsRepository
from infection_monkey.telemetry.credentials_telem import CredentialsTelem
from infection_monkey.telemetry.i_telem import ITelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
class CredentialsInterceptingTelemetryMessenger(ITelemetryMessenger):
def __init__(
self,
telemetry_messenger: ITelemetryMessenger,
credentials_store: IPropagationCredentialsRepository,
):
self._telemetry_messenger = telemetry_messenger
self._credentials_store = credentials_store
def send_telemetry(self, telemetry: ITelem):
_send_telemetry(telemetry, self._telemetry_messenger, self._credentials_store)
# Note: We can use @singledispatchmethod instead of @singledispatch if we migrate to Python 3.8 or
# later.
@singledispatch
def _send_telemetry(
telemetry: ITelem,
telemetry_messenger: ITelemetryMessenger,
credentials_store: IPropagationCredentialsRepository,
):
telemetry_messenger.send_telemetry(telemetry)
@_send_telemetry.register
def _(
telemetry: CredentialsTelem,
telemetry_messenger: ITelemetryMessenger,
credentials_store: IPropagationCredentialsRepository,
):
credentials_store.add_credentials(telemetry.credentials)
telemetry_messenger.send_telemetry(telemetry)

View File

@ -1,52 +0,0 @@
from unittest.mock import MagicMock
from common.credentials import Credentials, Password, SSHKeypair, Username
from infection_monkey.telemetry.credentials_telem import CredentialsTelem
from infection_monkey.telemetry.messengers.credentials_intercepting_telemetry_messenger import (
CredentialsInterceptingTelemetryMessenger,
)
TELEM_CREDENTIALS = [
Credentials(
Username("user1"),
SSHKeypair(public_key="some_public_key", private_key="some_private_key"),
),
Credentials(Username("root"), Password("password")),
]
class MockCredentialsTelem(CredentialsTelem):
def __init(self, credentials):
super().__init__(credentials)
def get_data(self):
return {}
def test_credentials_generic_telemetry(TestTelem):
mock_telemetry_messenger = MagicMock()
mock_credentials_repository = MagicMock()
telemetry_messenger = CredentialsInterceptingTelemetryMessenger(
mock_telemetry_messenger, mock_credentials_repository
)
telemetry_messenger.send_telemetry(TestTelem())
assert mock_telemetry_messenger.send_telemetry.called
assert not mock_credentials_repository.add_credentials.called
def test_successful_intercepting_credentials_telemetry():
mock_telemetry_messenger = MagicMock()
mock_credentials_repository = MagicMock()
mock_empty_credentials_telem = MockCredentialsTelem(TELEM_CREDENTIALS)
telemetry_messenger = CredentialsInterceptingTelemetryMessenger(
mock_telemetry_messenger, mock_credentials_repository
)
telemetry_messenger.send_telemetry(mock_empty_credentials_telem)
assert mock_telemetry_messenger.send_telemetry.called
assert mock_credentials_repository.add_credentials.called