UT: Simplify MimikatzCredentialCollector's event publishing test

This commit is contained in:
Shreya Malviya 2022-08-17 14:41:26 +05:30
parent 8f789b9d60
commit eb17b20625
1 changed files with 18 additions and 17 deletions

View File

@ -2,10 +2,10 @@ from typing import Sequence
from unittest.mock import MagicMock, Mock from unittest.mock import MagicMock, Mock
import pytest import pytest
from pubsub.core import Publisher
from common.credentials import Credentials, LMHash, NTHash, Password, Username from common.credentials import Credentials, LMHash, NTHash, Password, Username
from common.event_queue import IEventQueue, PyPubSubEventQueue from common.event_queue import IEventQueue
from common.events import CredentialsStolenEvent
from infection_monkey.credential_collectors import MimikatzCredentialCollector from infection_monkey.credential_collectors import MimikatzCredentialCollector
from infection_monkey.credential_collectors.mimikatz_collector.mimikatz_credential_collector import ( # noqa: E501 from infection_monkey.credential_collectors.mimikatz_collector.mimikatz_credential_collector import ( # noqa: E501
MIMIKATZ_EVENT_TAGS, MIMIKATZ_EVENT_TAGS,
@ -119,22 +119,23 @@ def test_pypykatz_result_parsing_no_secrets(monkeypatch):
assert collected_credentials == expected_credentials assert collected_credentials == expected_credentials
@pytest.fixture def test_mimikatz_credentials_stolen_event_published(monkeypatch):
def event_queue() -> IEventQueue: mock_event_queue = MagicMock(spec=IEventQueue)
return PyPubSubEventQueue(Publisher()) mock_subscriber = Mock()
def test_pypykatz_credentials_stolen_event_published(
monkeypatch, event_queue, event_queue_subscriber
):
for event_tag in MIMIKATZ_EVENT_TAGS:
event_queue.subscribe_tag(event_tag, event_queue_subscriber)
mimikatz_credential_collector = MimikatzCredentialCollector(event_queue)
monkeypatch.setattr( monkeypatch.setattr(
"infection_monkey.credential_collectors.mimikatz_collector.pypykatz_handler", Mock() "infection_monkey.credential_collectors.mimikatz_collector.pypykatz_handler", Mock()
) )
mimikatz_credential_collector.collect_credentials()
assert event_queue_subscriber.call_count == 3 for event_tag in MIMIKATZ_EVENT_TAGS:
assert event_queue_subscriber.call_tags == MIMIKATZ_EVENT_TAGS mock_event_queue.subscribe_tag(event_tag, mock_subscriber)
mimikatz_credential_collector = MimikatzCredentialCollector(mock_event_queue)
collected_credentials = mimikatz_credential_collector.collect_credentials()
mock_event_queue.publish.assert_called_once()
mock_event_queue_call_args = mock_event_queue.publish.call_args[0][0]
assert isinstance(mock_event_queue_call_args, CredentialsStolenEvent)
assert mock_event_queue_call_args.tags == frozenset(MIMIKATZ_EVENT_TAGS)
assert mock_event_queue_call_args.stolen_credentials == collected_credentials