Agent: Make tcp/ping timeouts consistent

* Ping takes a `timeout: float` instead of `options: Dict` the same way
  that `scan_tcp_port()` does.
* Timeouts are floats instead of ints
This commit is contained in:
Mike Salvatore 2021-12-13 07:44:05 -05:00
parent 11e3c5d6e4
commit 5a1e19391d
5 changed files with 14 additions and 17 deletions

View File

@ -36,22 +36,22 @@ class IPuppet(metaclass=abc.ABCMeta):
""" """
@abc.abstractmethod @abc.abstractmethod
def ping(self, host: str, options: Dict) -> PingScanData: def ping(self, host: str, timeout: float) -> PingScanData:
""" """
Sends a ping (ICMP packet) to a remote host Sends a ping (ICMP packet) to a remote host
:param str host: The domain name or IP address of a host :param str host: The domain name or IP address of a host
:return: A tuple that contains whether or not the host responded and the host's inferred :param float timeout: The maximum amount of time (in seconds) to wait for a response
operating system :return: The data collected by attempting to ping the target host
:rtype: Tuple[bool, Optional[str]] :rtype: PingScanData
""" """
@abc.abstractmethod @abc.abstractmethod
def scan_tcp_port(self, host: str, port: int, timeout: int) -> PortScanData: def scan_tcp_port(self, host: str, port: int, timeout: float) -> PortScanData:
""" """
Scans a TCP port on a remote host Scans a TCP port on a remote host
:param str host: The domain name or IP address of a host :param str host: The domain name or IP address of a host
:param int port: A TCP port number to scan :param int port: A TCP port number to scan
:param int timeout: The maximum amount of time (in seconds) to wait for a response :param float timeout: The maximum amount of time (in seconds) to wait for a response
:return: The data collected by scanning the provided host:port combination :return: The data collected by scanning the provided host:port combination
:rtype: PortScanData :rtype: PortScanData
""" """

View File

@ -44,7 +44,8 @@ class IPScanner:
ip = ips.get_nowait() ip = ips.get_nowait()
logger.info(f"Scanning {ip}") logger.info(f"Scanning {ip}")
ping_scan_data = self._puppet.ping(ip, options["icmp"]) icmp_timeout = options["icmp"]["timeout_ms"] / 1000
ping_scan_data = self._puppet.ping(ip, icmp_timeout)
port_scan_data = self._scan_tcp_ports(ip, options["tcp"], stop) port_scan_data = self._scan_tcp_ports(ip, options["tcp"], stop)
results_callback(ip, ping_scan_data, port_scan_data) results_callback(ip, ping_scan_data, port_scan_data)
@ -59,11 +60,13 @@ class IPScanner:
) )
def _scan_tcp_ports(self, ip: str, options: Dict, stop: Event): def _scan_tcp_ports(self, ip: str, options: Dict, stop: Event):
tcp_timeout = options["timeout_ms"] / 1000
port_scan_data = {} port_scan_data = {}
for p in options["ports"]: for p in options["ports"]:
if stop.is_set(): if stop.is_set():
break break
port_scan_data[p] = self._puppet.scan_tcp_port(ip, p, options["timeout_ms"]) port_scan_data[p] = self._puppet.scan_tcp_port(ip, p, tcp_timeout)
return port_scan_data return port_scan_data

View File

@ -66,7 +66,7 @@ class MockMaster(IMaster):
for ip in ips: for ip in ips:
h = self._hosts[ip] h = self._hosts[ip]
ping_scan_data = self._puppet.ping(ip, {}) ping_scan_data = self._puppet.ping(ip, 1)
h.icmp = ping_scan_data.response_received h.icmp = ping_scan_data.response_received
if ping_scan_data.os is not None: if ping_scan_data.os is not None:
h.os["type"] = ping_scan_data.os h.os["type"] = ping_scan_data.os

View File

@ -156,8 +156,8 @@ class MockPuppet(IPuppet):
else: else:
return PostBreachData("pba command 2", ["pba result 2", False]) return PostBreachData("pba command 2", ["pba result 2", False])
def ping(self, host: str, options: Dict) -> PingScanData: def ping(self, host: str, timeout: float = 1) -> PingScanData:
logger.debug(f"run_ping({host})") logger.debug(f"run_ping({host}, {timeout})")
if host == DOT_1: if host == DOT_1:
return PingScanData(True, "windows") return PingScanData(True, "windows")

View File

@ -12,12 +12,6 @@ WINDOWS_OS = "windows"
LINUX_OS = "linux" LINUX_OS = "linux"
class MockPuppet(MockPuppet):
def __init__(self):
self.ping = MagicMock(side_effect=super().ping)
self.scan_tcp_port = MagicMock(side_effect=super().scan_tcp_port)
@pytest.fixture @pytest.fixture
def scan_config(): def scan_config():
return { return {