Agent: Use default value for exploiter name

This commit is contained in:
Kekoa Kaaikala 2022-10-03 20:20:15 +00:00
parent 0b72e4ef9a
commit 3e86766aaf
2 changed files with 104 additions and 4 deletions

View File

@ -132,7 +132,7 @@ class HostExploiter:
self, self,
target: str, target: str,
propagation_success: bool, propagation_success: bool,
exploiter_name: str, exploiter_name: str = "",
tags: frozenset = frozenset(), tags: frozenset = frozenset(),
error_message: str = "", error_message: str = "",
): ):
@ -140,7 +140,7 @@ class HostExploiter:
source=get_agent_id(), source=get_agent_id(),
target=IPv4Address(target), target=IPv4Address(target),
success=propagation_success, success=propagation_success,
exploiter_name=exploiter_name, exploiter_name=exploiter_name or self.__class__.__name__,
error_message=error_message, error_message=error_message,
tags=tags, tags=tags,
) )
@ -150,7 +150,7 @@ class HostExploiter:
self, self,
target: str, target: str,
exploitation_success: bool, exploitation_success: bool,
exploiter_name: str, exploiter_name: str = "",
tags: frozenset = frozenset(), tags: frozenset = frozenset(),
error_message: str = "", error_message: str = "",
): ):
@ -158,7 +158,7 @@ class HostExploiter:
source=get_agent_id(), source=get_agent_id(),
target=IPv4Address(target), target=IPv4Address(target),
success=exploitation_success, success=exploitation_success,
exploiter_name=exploiter_name, exploiter_name=exploiter_name or self.__class__.__name__,
error_message=error_message, error_message=error_message,
tags=tags, tags=tags,
) )

View File

@ -0,0 +1,100 @@
import threading
from ipaddress import IPv4Address
from unittest.mock import MagicMock
from uuid import UUID
import pytest
from common.agent_events import ExploitationEvent, PropagationEvent
from common.event_queue import IAgentEventQueue
from infection_monkey.exploit import IAgentBinaryRepository
from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.model import VictimHost
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
AGENT_ID = UUID("faaca0a2-6270-46dc-b8c9-592880d6d5cd")
TARGET = IPv4Address("10.10.10.2")
VICTIM_HOST = VictimHost("10.10.10.1")
class FakeExploiter(HostExploiter):
_EXPLOITED_SERVICE = "Fake"
def _exploit_host(self):
pass
@pytest.fixture(autouse=True)
def mock_get_agent_id(monkeypatch):
monkeypatch.setattr("infection_monkey.utils.ids", lambda _: AGENT_ID)
@pytest.fixture
def agent_binary_repository() -> IAgentBinaryRepository:
return MagicMock(spec=IAgentBinaryRepository)
@pytest.fixture
def agent_event_queue() -> IAgentEventQueue:
return MagicMock(spec=IAgentEventQueue)
@pytest.fixture
def telemetry_messenger() -> ITelemetryMessenger:
return MagicMock(spec=ITelemetryMessenger)
@pytest.fixture
def exploiter() -> HostExploiter:
return FakeExploiter()
def test_publish_exploitation_event__uses_exploiter_name_by_default(
exploiter: HostExploiter,
agent_binary_repository: IAgentBinaryRepository,
agent_event_queue: IAgentEventQueue,
telemetry_messenger: ITelemetryMessenger,
):
exploiter.exploit_host(
host=VICTIM_HOST,
servers=[],
current_depth=0,
telemetry_messenger=telemetry_messenger,
agent_event_queue=agent_event_queue,
agent_binary_repository=agent_binary_repository,
options={},
interrupt=threading.Event(),
)
exploiter.publish_exploitation_event(target=str(TARGET), exploitation_success=True)
expected_event = ExploitationEvent(
source=AGENT_ID,
target=TARGET,
success=True,
exploiter_name=FakeExploiter.__name__,
)
assert agent_event_queue.publish.called_with(expected_event) # type: ignore[attr-defined]
def test_publish_propagation_event__uses_exploiter_name_by_default(
exploiter: HostExploiter,
agent_binary_repository: IAgentBinaryRepository,
agent_event_queue: IAgentEventQueue,
telemetry_messenger: ITelemetryMessenger,
):
exploiter.exploit_host(
host=VICTIM_HOST,
servers=[],
current_depth=0,
telemetry_messenger=telemetry_messenger,
agent_event_queue=agent_event_queue,
agent_binary_repository=agent_binary_repository,
options={},
interrupt=threading.Event(),
)
exploiter.publish_propagation_event(target=str(TARGET), propagation_success=True)
expected_event = PropagationEvent(
source=AGENT_ID, target=TARGET, success=True, exploiter_name=FakeExploiter.__name__
)
assert agent_event_queue.publish.called_with(expected_event) # type: ignore[attr-defined]