UT: Add test to check CredentialStolenEvent is published in MimikatzCredentialCollector

This commit is contained in:
Shreya Malviya 2022-08-16 19:07:06 +05:30
parent f453ff21fd
commit 95a3be0273
1 changed files with 37 additions and 2 deletions

View File

@ -1,11 +1,19 @@
from typing import Sequence from typing import Sequence
from unittest.mock import MagicMock from unittest.mock import MagicMock, Mock
import pytest import pytest
from pubsub.core import Publisher
from common.credentials import Credentials, LMHash, NTHash, Password, Username from common.credentials import Credentials, LMHash, NTHash, Password, Username
from common.event_queue import IEventQueue from common.event_queue import IEventQueue, PyPubSubEventQueue
from common.events import AbstractEvent
from infection_monkey.credential_collectors import MimikatzCredentialCollector from infection_monkey.credential_collectors import MimikatzCredentialCollector
from infection_monkey.credential_collectors.mimikatz_collector.mimikatz_credential_collector import (
MIMIKATZ_CREDENTIAL_COLLECTOR_TAG,
MIMIKATZ_EVENT_TAGS,
T1003_ATTACK_TECHNIQUE_TAG,
T1005_ATTACK_TECHNIQUE_TAG,
)
from infection_monkey.credential_collectors.mimikatz_collector.windows_credentials import ( from infection_monkey.credential_collectors.mimikatz_collector.windows_credentials import (
WindowsCredentials, WindowsCredentials,
) )
@ -113,3 +121,30 @@ def test_pypykatz_result_parsing_no_secrets(monkeypatch):
collected_credentials = collect_credentials() collected_credentials = collect_credentials()
assert len(collected_credentials) == 1 assert len(collected_credentials) == 1
assert collected_credentials == expected_credentials assert collected_credentials == expected_credentials
@pytest.fixture
def event_queue() -> IEventQueue:
return PyPubSubEventQueue(Publisher())
def test_pypykatz_credentials_stolen_event_published(monkeypatch, event_queue):
def subscriber(event: AbstractEvent):
subscriber.call_count += 1
subscriber.call_tags |= event.tags
subscriber.call_count = 0
subscriber.call_tags = set()
event_queue.subscribe_tag(MIMIKATZ_CREDENTIAL_COLLECTOR_TAG, subscriber)
event_queue.subscribe_tag(T1003_ATTACK_TECHNIQUE_TAG, subscriber)
event_queue.subscribe_tag(T1005_ATTACK_TECHNIQUE_TAG, subscriber)
mimikatz_credential_collector = MimikatzCredentialCollector(event_queue)
monkeypatch.setattr(
"infection_monkey.credential_collectors.mimikatz_collector.pypykatz_handler", Mock()
)
mimikatz_credential_collector.collect_credentials()
assert subscriber.call_count == 3
assert subscriber.call_tags == MIMIKATZ_EVENT_TAGS