From 8d361777bc53f1cfae32c433d04f55f7b9b1f597 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Fri, 10 Dec 2021 09:46:13 -0500 Subject: [PATCH] Agent: Return PingScanData from IPuppet.ping() --- monkey/infection_monkey/i_puppet.py | 5 +++-- monkey/infection_monkey/master/ip_scanner.py | 8 ++++---- monkey/infection_monkey/master/mock_master.py | 8 ++++---- monkey/infection_monkey/puppet/mock_puppet.py | 15 ++++++++------- 4 files changed, 19 insertions(+), 17 deletions(-) diff --git a/monkey/infection_monkey/i_puppet.py b/monkey/infection_monkey/i_puppet.py index 03ce3999f..49040dd9f 100644 --- a/monkey/infection_monkey/i_puppet.py +++ b/monkey/infection_monkey/i_puppet.py @@ -2,7 +2,7 @@ import abc import threading from collections import namedtuple from enum import Enum -from typing import Dict, Optional, Tuple +from typing import Dict class PortStatus(Enum): @@ -11,6 +11,7 @@ class PortStatus(Enum): ExploiterResultData = namedtuple("ExploiterResultData", ["result", "info", "attempts"]) +PingScanData = namedtuple("PingScanData", ["response_received", "os"]) PortScanData = namedtuple("PortScanData", ["port", "status", "banner", "service"]) PostBreachData = namedtuple("PostBreachData", ["command", "result"]) @@ -35,7 +36,7 @@ class IPuppet(metaclass=abc.ABCMeta): """ @abc.abstractmethod - def ping(self, host: str, options: Dict) -> Tuple[bool, Optional[str]]: + def ping(self, host: str, options: Dict) -> PingScanData: """ Sends a ping (ICMP packet) to a remote host :param str host: The domain name or IP address of a host diff --git a/monkey/infection_monkey/master/ip_scanner.py b/monkey/infection_monkey/master/ip_scanner.py index 4f438ccf3..419931064 100644 --- a/monkey/infection_monkey/master/ip_scanner.py +++ b/monkey/infection_monkey/master/ip_scanner.py @@ -61,11 +61,11 @@ class IPScanner: logger.debug(f"Detected the stop signal, scanning thread {threading.get_ident()} exiting") def _ping_ip(self, ip: str, victim_host: VictimHost, options: Dict): - (response_received, os) = self._puppet.ping(ip, options) + ping_scan_data = self._puppet.ping(ip, options) - victim_host.icmp = response_received - if os is not None: - victim_host.os["type"] = os + victim_host.icmp = ping_scan_data.response_received + if ping_scan_data.os is not None: + victim_host.os["type"] = ping_scan_data.os def _scan_tcp_ports(self, ip: str, victim_host: VictimHost, options: Dict, stop: Event): for p in options["ports"]: diff --git a/monkey/infection_monkey/master/mock_master.py b/monkey/infection_monkey/master/mock_master.py index e78519a43..8c8ecebdd 100644 --- a/monkey/infection_monkey/master/mock_master.py +++ b/monkey/infection_monkey/master/mock_master.py @@ -66,10 +66,10 @@ class MockMaster(IMaster): for ip in ips: h = self._hosts[ip] - (response_received, os) = self._puppet.ping(ip) - h.icmp = response_received - if os is not None: - h.os["type"] = os + ping_scan_data = self._puppet.ping(ip, {}) + h.icmp = ping_scan_data.response_received + if ping_scan_data.os is not None: + h.os["type"] = ping_scan_data.os for p in ports: port_scan_data = self._puppet.scan_tcp_port(ip, p) diff --git a/monkey/infection_monkey/puppet/mock_puppet.py b/monkey/infection_monkey/puppet/mock_puppet.py index a606e7043..a7f7fa324 100644 --- a/monkey/infection_monkey/puppet/mock_puppet.py +++ b/monkey/infection_monkey/puppet/mock_puppet.py @@ -1,10 +1,11 @@ import logging import threading -from typing import Dict, Optional, Tuple +from typing import Dict, Tuple from infection_monkey.i_puppet import ( ExploiterResultData, IPuppet, + PingScanData, PortScanData, PortStatus, PostBreachData, @@ -155,21 +156,21 @@ class MockPuppet(IPuppet): else: return PostBreachData("pba command 2", ["pba result 2", False]) - def ping(self, host: str, options: Dict) -> Tuple[bool, Optional[str]]: + def ping(self, host: str, options: Dict) -> PingScanData: logger.debug(f"run_ping({host})") if host == DOT_1: - return (True, "windows") + return PingScanData(True, "windows") if host == DOT_2: - return (False, None) + return PingScanData(False, None) if host == DOT_3: - return (True, "linux") + return PingScanData(True, "linux") if host == DOT_4: - return (False, None) + return PingScanData(False, None) - return (False, None) + return PingScanData(False, None) def scan_tcp_port(self, host: str, port: int, timeout: int = 3) -> PortScanData: logger.debug(f"run_scan_tcp_port({host}, {port}, {timeout})")