From e993998432ce24be4310c3041d4f16ae3944f0b8 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Wed, 23 Feb 2022 18:24:54 +0530 Subject: [PATCH] Agent: Make ExploiterResultData a dataclass instead of a named tuple and modify HostExploiter and the SSH exploiter accordingly --- .../infection_monkey/exploit/HostExploiter.py | 21 +---- monkey/infection_monkey/exploit/sshexec.py | 76 +++++++++---------- monkey/infection_monkey/i_puppet/i_puppet.py | 17 +++-- 3 files changed, 53 insertions(+), 61 deletions(-) diff --git a/monkey/infection_monkey/exploit/HostExploiter.py b/monkey/infection_monkey/exploit/HostExploiter.py index 7c3750b03..b74dc3871 100644 --- a/monkey/infection_monkey/exploit/HostExploiter.py +++ b/monkey/infection_monkey/exploit/HostExploiter.py @@ -85,14 +85,9 @@ class HostExploiter: return result def pre_exploit(self): - self.exploit_result = { - "exploitation_success": False, - "propagation_success": False, - "os": self.host.os.get("type"), - "info": self.exploit_info, - "attempts": self.exploit_attempts, - "error_message": "", - } + self.exploit_result = ExploiterResultData( + os=self.host.os.get("type"), info=self.exploit_info, attempts=self.exploit_attempts + ) self.set_start_time() def post_exploit(self): @@ -115,13 +110,3 @@ class HostExploiter: """ powershell = True if "powershell" in cmd.lower() else False self.exploit_info["executed_cmds"].append({"cmd": cmd, "powershell": powershell}) - - def return_exploit_result_data(self) -> ExploiterResultData: - return ExploiterResultData( - self.exploit_result["exploitation_success"], - self.exploit_result["propagation_success"], - self.exploit_result["os"], - self.exploit_result["info"], - self.exploit_result["attempts"], - self.exploit_result["error_message"], - ) diff --git a/monkey/infection_monkey/exploit/sshexec.py b/monkey/infection_monkey/exploit/sshexec.py index 3fcf4605f..a8a585bed 100644 --- a/monkey/infection_monkey/exploit/sshexec.py +++ b/monkey/infection_monkey/exploit/sshexec.py @@ -111,22 +111,22 @@ class SSHExploiter(HostExploiter): is_open, _ = check_tcp_port(self.host.ip_addr, port) if not is_open: - self.exploit_result["error_message"] = f"SSH port is closed on {self.host}, skipping" + self.exploit_result.error_message = f"SSH port is closed on {self.host}, skipping" - logger.info(self.exploit_result["error_message"]) - return self.return_exploit_result_data() + logger.info(self.exploit_result.error_message) + return self.exploit_result try: ssh = self.exploit_with_ssh_keys(port) - self.exploit_result["exploitation_success"] = True + self.exploit_result.exploitation_success = True except FailedExploitationError: try: ssh = self.exploit_with_login_creds(port) - self.exploit_result["exploitation_success"] = True + self.exploit_result.exploitation_success = True except FailedExploitationError: - self.exploit_result["error_message"] = "Exploiter SSHExploiter is giving up..." - logger.debug(self.exploit_result["error_message"]) - return self.return_exploit_result_data() + self.exploit_result.error_message = "Exploiter SSHExploiter is giving up..." + logger.debug(self.exploit_result.error_message) + return self.exploit_result if not self.host.os.get("type"): try: @@ -134,20 +134,20 @@ class SSHExploiter(HostExploiter): uname_os = stdout.read().lower().strip().decode() if "linux" in uname_os: self.host.os["type"] = "linux" - self.exploit_result["os"] = "linux" + self.exploit_result.os = "linux" else: - self.exploit_result["error_message"] = f"SSH Skipping unknown os: {uname_os}" + self.exploit_result.error_message = f"SSH Skipping unknown os: {uname_os}" if not uname_os: - logger.error(self.exploit_result["error_message"]) - return self.return_exploit_result_data() + logger.error(self.exploit_result.error_message) + return self.exploit_result except Exception as exc: - self.exploit_result[ - "error_message" - ] = f"Error running uname os command on victim {self.host}: ({exc})" + self.exploit_result.error_message = ( + f"Error running uname os command on victim {self.host}: ({exc})" + ) - logger.debug(self.exploit_result["error_message"]) - return self.return_exploit_result_data() + logger.debug(self.exploit_result.error_message) + return self.exploit_result if not self.host.os.get("machine"): try: @@ -156,20 +156,20 @@ class SSHExploiter(HostExploiter): if "" != uname_machine: self.host.os["machine"] = uname_machine except Exception as exc: - self.exploit_result[ - "error_message" - ] = f"Error running uname machine command on victim {self.host}: ({exc})" - logger.error(self.exploit_result["error_message"]) + self.exploit_result.error_message = ( + f"Error running uname machine command on victim {self.host}: ({exc})" + ) + logger.error(self.exploit_result.error_message) src_path = get_target_monkey(self.host) if not src_path: - self.exploit_result[ - "error_message" - ] = f"Can't find suitable monkey executable for host {self.host}" + self.exploit_result.error_message = ( + f"Can't find suitable monkey executable for host {self.host}" + ) - logger.info(self.exploit_result["error_message"]) - return self.return_exploit_result_data() + logger.info(self.exploit_result.error_message) + return self.exploit_result try: ftp = ssh.open_sftp() @@ -193,10 +193,10 @@ class SSHExploiter(HostExploiter): ) ftp.close() except Exception as exc: - self.exploit_result[ - "error_message" - ] = f"Error uploading file into victim {self.host}: ({exc})" - logger.error(self.exploit_result["error_message"]) + self.exploit_result.error_message = ( + f"Error uploading file into victim {self.host}: ({exc})" + ) + logger.error(self.exploit_result.error_message) status = ScanStatus.SCANNED self.telemetry_messenger.send_telemetry( @@ -205,7 +205,7 @@ class SSHExploiter(HostExploiter): ) ) if status == ScanStatus.SCANNED: - return self.return_exploit_result_data() + return self.exploit_result try: cmdline = "%s %s" % (self.options["dropper_target_path_linux"], MONKEY_ARG) @@ -220,16 +220,16 @@ class SSHExploiter(HostExploiter): cmdline, ) - self.exploit_result["propagation_success"] = True + self.exploit_result.propagation_success = True ssh.close() self.add_executed_cmd(cmdline) - return self.return_exploit_result_data() + return self.exploit_result except Exception as exc: - self.exploit_result[ - "error_message" - ] = f"Error running monkey on victim {self.host}: ({exc})" + self.exploit_result.error_message = ( + f"Error running monkey on victim {self.host}: ({exc})" + ) - logger.error(self.exploit_result["error_message"]) - return self.return_exploit_result_data() + logger.error(self.exploit_result.error_message) + return self.exploit_result diff --git a/monkey/infection_monkey/i_puppet/i_puppet.py b/monkey/infection_monkey/i_puppet/i_puppet.py index 78c0c9659..0a4cdd2dd 100644 --- a/monkey/infection_monkey/i_puppet/i_puppet.py +++ b/monkey/infection_monkey/i_puppet/i_puppet.py @@ -1,8 +1,9 @@ import abc import threading from collections import namedtuple +from dataclasses import dataclass from enum import Enum -from typing import Dict, List, Sequence +from typing import Dict, Iterable, List, Mapping, Sequence from . import Credentials, PluginType @@ -16,10 +17,16 @@ class UnknownPluginError(Exception): pass -ExploiterResultData = namedtuple( - "ExploiterResultData", - ["exploitation_success", "propagation_success", "os", "info", "attempts", "error_message"], -) +@dataclass +class ExploiterResultData: + exploitation_success: bool = False + propagation_success: bool = False + os: str = "" + info: Mapping = None + attempts: Iterable = None + error_message: str = "" + + PingScanData = namedtuple("PingScanData", ["response_received", "os"]) PortScanData = namedtuple("PortScanData", ["port", "status", "banner", "service"]) FingerprintData = namedtuple("FingerprintData", ["os_type", "os_version", "services"])