Agent: Make ExploiterResultData a dataclass instead of a named tuple

and modify HostExploiter and the SSH exploiter accordingly
This commit is contained in:
Shreya Malviya 2022-02-23 18:24:54 +05:30
parent 2a8186928d
commit e993998432
3 changed files with 53 additions and 61 deletions

View File

@ -85,14 +85,9 @@ class HostExploiter:
return result
def pre_exploit(self):
self.exploit_result = {
"exploitation_success": False,
"propagation_success": False,
"os": self.host.os.get("type"),
"info": self.exploit_info,
"attempts": self.exploit_attempts,
"error_message": "",
}
self.exploit_result = ExploiterResultData(
os=self.host.os.get("type"), info=self.exploit_info, attempts=self.exploit_attempts
)
self.set_start_time()
def post_exploit(self):
@ -115,13 +110,3 @@ class HostExploiter:
"""
powershell = True if "powershell" in cmd.lower() else False
self.exploit_info["executed_cmds"].append({"cmd": cmd, "powershell": powershell})
def return_exploit_result_data(self) -> ExploiterResultData:
return ExploiterResultData(
self.exploit_result["exploitation_success"],
self.exploit_result["propagation_success"],
self.exploit_result["os"],
self.exploit_result["info"],
self.exploit_result["attempts"],
self.exploit_result["error_message"],
)

View File

@ -111,22 +111,22 @@ class SSHExploiter(HostExploiter):
is_open, _ = check_tcp_port(self.host.ip_addr, port)
if not is_open:
self.exploit_result["error_message"] = f"SSH port is closed on {self.host}, skipping"
self.exploit_result.error_message = f"SSH port is closed on {self.host}, skipping"
logger.info(self.exploit_result["error_message"])
return self.return_exploit_result_data()
logger.info(self.exploit_result.error_message)
return self.exploit_result
try:
ssh = self.exploit_with_ssh_keys(port)
self.exploit_result["exploitation_success"] = True
self.exploit_result.exploitation_success = True
except FailedExploitationError:
try:
ssh = self.exploit_with_login_creds(port)
self.exploit_result["exploitation_success"] = True
self.exploit_result.exploitation_success = True
except FailedExploitationError:
self.exploit_result["error_message"] = "Exploiter SSHExploiter is giving up..."
logger.debug(self.exploit_result["error_message"])
return self.return_exploit_result_data()
self.exploit_result.error_message = "Exploiter SSHExploiter is giving up..."
logger.debug(self.exploit_result.error_message)
return self.exploit_result
if not self.host.os.get("type"):
try:
@ -134,20 +134,20 @@ class SSHExploiter(HostExploiter):
uname_os = stdout.read().lower().strip().decode()
if "linux" in uname_os:
self.host.os["type"] = "linux"
self.exploit_result["os"] = "linux"
self.exploit_result.os = "linux"
else:
self.exploit_result["error_message"] = f"SSH Skipping unknown os: {uname_os}"
self.exploit_result.error_message = f"SSH Skipping unknown os: {uname_os}"
if not uname_os:
logger.error(self.exploit_result["error_message"])
return self.return_exploit_result_data()
logger.error(self.exploit_result.error_message)
return self.exploit_result
except Exception as exc:
self.exploit_result[
"error_message"
] = f"Error running uname os command on victim {self.host}: ({exc})"
self.exploit_result.error_message = (
f"Error running uname os command on victim {self.host}: ({exc})"
)
logger.debug(self.exploit_result["error_message"])
return self.return_exploit_result_data()
logger.debug(self.exploit_result.error_message)
return self.exploit_result
if not self.host.os.get("machine"):
try:
@ -156,20 +156,20 @@ class SSHExploiter(HostExploiter):
if "" != uname_machine:
self.host.os["machine"] = uname_machine
except Exception as exc:
self.exploit_result[
"error_message"
] = f"Error running uname machine command on victim {self.host}: ({exc})"
logger.error(self.exploit_result["error_message"])
self.exploit_result.error_message = (
f"Error running uname machine command on victim {self.host}: ({exc})"
)
logger.error(self.exploit_result.error_message)
src_path = get_target_monkey(self.host)
if not src_path:
self.exploit_result[
"error_message"
] = f"Can't find suitable monkey executable for host {self.host}"
self.exploit_result.error_message = (
f"Can't find suitable monkey executable for host {self.host}"
)
logger.info(self.exploit_result["error_message"])
return self.return_exploit_result_data()
logger.info(self.exploit_result.error_message)
return self.exploit_result
try:
ftp = ssh.open_sftp()
@ -193,10 +193,10 @@ class SSHExploiter(HostExploiter):
)
ftp.close()
except Exception as exc:
self.exploit_result[
"error_message"
] = f"Error uploading file into victim {self.host}: ({exc})"
logger.error(self.exploit_result["error_message"])
self.exploit_result.error_message = (
f"Error uploading file into victim {self.host}: ({exc})"
)
logger.error(self.exploit_result.error_message)
status = ScanStatus.SCANNED
self.telemetry_messenger.send_telemetry(
@ -205,7 +205,7 @@ class SSHExploiter(HostExploiter):
)
)
if status == ScanStatus.SCANNED:
return self.return_exploit_result_data()
return self.exploit_result
try:
cmdline = "%s %s" % (self.options["dropper_target_path_linux"], MONKEY_ARG)
@ -220,16 +220,16 @@ class SSHExploiter(HostExploiter):
cmdline,
)
self.exploit_result["propagation_success"] = True
self.exploit_result.propagation_success = True
ssh.close()
self.add_executed_cmd(cmdline)
return self.return_exploit_result_data()
return self.exploit_result
except Exception as exc:
self.exploit_result[
"error_message"
] = f"Error running monkey on victim {self.host}: ({exc})"
self.exploit_result.error_message = (
f"Error running monkey on victim {self.host}: ({exc})"
)
logger.error(self.exploit_result["error_message"])
return self.return_exploit_result_data()
logger.error(self.exploit_result.error_message)
return self.exploit_result

View File

@ -1,8 +1,9 @@
import abc
import threading
from collections import namedtuple
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Sequence
from typing import Dict, Iterable, List, Mapping, Sequence
from . import Credentials, PluginType
@ -16,10 +17,16 @@ class UnknownPluginError(Exception):
pass
ExploiterResultData = namedtuple(
"ExploiterResultData",
["exploitation_success", "propagation_success", "os", "info", "attempts", "error_message"],
)
@dataclass
class ExploiterResultData:
exploitation_success: bool = False
propagation_success: bool = False
os: str = ""
info: Mapping = None
attempts: Iterable = None
error_message: str = ""
PingScanData = namedtuple("PingScanData", ["response_received", "os"])
PortScanData = namedtuple("PortScanData", ["port", "status", "banner", "service"])
FingerprintData = namedtuple("FingerprintData", ["os_type", "os_version", "services"])