Agent: Return PingScanData from IPuppet.ping()

This commit is contained in:
Mike Salvatore 2021-12-10 09:46:13 -05:00
parent 75cfa252c9
commit 8d361777bc
4 changed files with 19 additions and 17 deletions

View File

@ -2,7 +2,7 @@ import abc
import threading import threading
from collections import namedtuple from collections import namedtuple
from enum import Enum from enum import Enum
from typing import Dict, Optional, Tuple from typing import Dict
class PortStatus(Enum): class PortStatus(Enum):
@ -11,6 +11,7 @@ class PortStatus(Enum):
ExploiterResultData = namedtuple("ExploiterResultData", ["result", "info", "attempts"]) ExploiterResultData = namedtuple("ExploiterResultData", ["result", "info", "attempts"])
PingScanData = namedtuple("PingScanData", ["response_received", "os"])
PortScanData = namedtuple("PortScanData", ["port", "status", "banner", "service"]) PortScanData = namedtuple("PortScanData", ["port", "status", "banner", "service"])
PostBreachData = namedtuple("PostBreachData", ["command", "result"]) PostBreachData = namedtuple("PostBreachData", ["command", "result"])
@ -35,7 +36,7 @@ class IPuppet(metaclass=abc.ABCMeta):
""" """
@abc.abstractmethod @abc.abstractmethod
def ping(self, host: str, options: Dict) -> Tuple[bool, Optional[str]]: def ping(self, host: str, options: Dict) -> PingScanData:
""" """
Sends a ping (ICMP packet) to a remote host Sends a ping (ICMP packet) to a remote host
:param str host: The domain name or IP address of a host :param str host: The domain name or IP address of a host

View File

@ -61,11 +61,11 @@ class IPScanner:
logger.debug(f"Detected the stop signal, scanning thread {threading.get_ident()} exiting") logger.debug(f"Detected the stop signal, scanning thread {threading.get_ident()} exiting")
def _ping_ip(self, ip: str, victim_host: VictimHost, options: Dict): def _ping_ip(self, ip: str, victim_host: VictimHost, options: Dict):
(response_received, os) = self._puppet.ping(ip, options) ping_scan_data = self._puppet.ping(ip, options)
victim_host.icmp = response_received victim_host.icmp = ping_scan_data.response_received
if os is not None: if ping_scan_data.os is not None:
victim_host.os["type"] = os victim_host.os["type"] = ping_scan_data.os
def _scan_tcp_ports(self, ip: str, victim_host: VictimHost, options: Dict, stop: Event): def _scan_tcp_ports(self, ip: str, victim_host: VictimHost, options: Dict, stop: Event):
for p in options["ports"]: for p in options["ports"]:

View File

@ -66,10 +66,10 @@ class MockMaster(IMaster):
for ip in ips: for ip in ips:
h = self._hosts[ip] h = self._hosts[ip]
(response_received, os) = self._puppet.ping(ip) ping_scan_data = self._puppet.ping(ip, {})
h.icmp = response_received h.icmp = ping_scan_data.response_received
if os is not None: if ping_scan_data.os is not None:
h.os["type"] = os h.os["type"] = ping_scan_data.os
for p in ports: for p in ports:
port_scan_data = self._puppet.scan_tcp_port(ip, p) port_scan_data = self._puppet.scan_tcp_port(ip, p)

View File

@ -1,10 +1,11 @@
import logging import logging
import threading import threading
from typing import Dict, Optional, Tuple from typing import Dict, Tuple
from infection_monkey.i_puppet import ( from infection_monkey.i_puppet import (
ExploiterResultData, ExploiterResultData,
IPuppet, IPuppet,
PingScanData,
PortScanData, PortScanData,
PortStatus, PortStatus,
PostBreachData, PostBreachData,
@ -155,21 +156,21 @@ class MockPuppet(IPuppet):
else: else:
return PostBreachData("pba command 2", ["pba result 2", False]) return PostBreachData("pba command 2", ["pba result 2", False])
def ping(self, host: str, options: Dict) -> Tuple[bool, Optional[str]]: def ping(self, host: str, options: Dict) -> PingScanData:
logger.debug(f"run_ping({host})") logger.debug(f"run_ping({host})")
if host == DOT_1: if host == DOT_1:
return (True, "windows") return PingScanData(True, "windows")
if host == DOT_2: if host == DOT_2:
return (False, None) return PingScanData(False, None)
if host == DOT_3: if host == DOT_3:
return (True, "linux") return PingScanData(True, "linux")
if host == DOT_4: if host == DOT_4:
return (False, None) return PingScanData(False, None)
return (False, None) return PingScanData(False, None)
def scan_tcp_port(self, host: str, port: int, timeout: int = 3) -> PortScanData: def scan_tcp_port(self, host: str, port: int, timeout: int = 3) -> PortScanData:
logger.debug(f"run_scan_tcp_port({host}, {port}, {timeout})") logger.debug(f"run_scan_tcp_port({host}, {port}, {timeout})")