Agent: Refactor ping_scanner to remove unnecessary inheritance

This commit is contained in:
Mike Salvatore 2022-01-31 19:53:59 -05:00
parent 635496a4be
commit 9f2fe5e513
3 changed files with 237 additions and 57 deletions

View File

@ -1 +1,2 @@
from .scan_target_generator import NetworkAddress, NetworkInterface from .scan_target_generator import NetworkAddress, NetworkInterface
from .ping_scanner import ping

View File

@ -1,79 +1,83 @@
import logging import logging
import math
import os import os
import re import re
import subprocess import subprocess
import sys import sys
import infection_monkey.config from infection_monkey.i_puppet import PingScanData
from infection_monkey.network.HostFinger import HostFinger
from infection_monkey.network.HostScanner import HostScanner
PING_COUNT_FLAG = "-n" if "win32" == sys.platform else "-c" TTL_REGEX = re.compile(r"TTL=([0-9]+)\b", re.IGNORECASE)
PING_TIMEOUT_FLAG = "-w" if "win32" == sys.platform else "-W" LINUX_TTL = 64 # Windows TTL is 128
TTL_REGEX_STR = r"(?<=TTL\=)[0-9]+" PING_EXIT_TIMEOUT = 10
LINUX_TTL = 64
WINDOWS_TTL = 128
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PingScanner(HostScanner, HostFinger): def ping(host: str, timeout: float) -> PingScanData:
_SCANNED_SERVICE = "" if "win32" == sys.platform:
timeout = math.floor(timeout * 1000)
def __init__(self): ping_command_output = _run_ping_command(host, timeout)
self._timeout = infection_monkey.config.WormConfiguration.ping_scan_timeout
if not "win32" == sys.platform:
self._timeout /= 1000
self._devnull = open(os.devnull, "w") ping_scan_data = _process_ping_command_output(ping_command_output)
self._ttl_regex = re.compile(TTL_REGEX_STR, re.IGNORECASE) logger.debug(f"{host} - {ping_scan_data}")
def is_host_alive(self, host): return ping_scan_data
ping_cmd = self._build_ping_command(host.ip_addr)
logger.debug(f"Running ping command: {' '.join(ping_cmd)}")
return 0 == subprocess.call(
ping_cmd,
stdout=self._devnull,
stderr=self._devnull,
)
def get_host_fingerprint(self, host): def _run_ping_command(host: str, timeout: float) -> str:
ping_cmd = self._build_ping_command(host.ip_addr) ping_cmd = _build_ping_command(host, timeout)
logger.debug(f"Running ping command: {' '.join(ping_cmd)}") logger.debug(f"Running ping command: {' '.join(ping_cmd)}")
# If stdout is not connected to a terminal (i.e. redirected to a pipe or file), the result # If stdout is not connected to a terminal (i.e. redirected to a pipe or file), the result
# of os.device_encoding(1) will be None. Setting errors="backslashreplace" prevents a crash # of os.device_encoding(1) will be None. Setting errors="backslashreplace" prevents a crash
# in this case. See #1175 and #1403 for more information. # in this case. See #1175 and #1403 for more information.
encoding = os.device_encoding(1) encoding = os.device_encoding(1)
sub_proc = subprocess.Popen( sub_proc = subprocess.Popen(
ping_cmd, ping_cmd,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, text=True,
encoding=encoding, encoding=encoding,
errors="backslashreplace", errors="backslashreplace",
) )
logger.debug(f"Retrieving ping command output using {encoding} encoding") logger.debug(f"Retrieving ping command output using {encoding} encoding")
output = " ".join(sub_proc.communicate())
regex_result = self._ttl_regex.search(output)
if regex_result:
try:
ttl = int(regex_result.group(0))
if ttl <= LINUX_TTL:
host.os["type"] = "linux"
else: # as far we we know, could also be OSX/BSD but lets handle that when it
# comes up.
host.os["type"] = "windows"
host.icmp = True try:
# The underlying ping command should timeout within the specified timeout. Setting the
# timeout parameter on communicate() is a failsafe mechanism for ensuring this does not
# block indefinitely.
output = " ".join(sub_proc.communicate(timeout=(timeout + PING_EXIT_TIMEOUT)))
logger.debug(output)
except subprocess.TimeoutExpired as te:
logger.error(te)
return ""
return True return output
except Exception as exc:
logger.debug("Error parsing ping fingerprint: %s", exc)
return False
def _build_ping_command(self, ip_addr): def _process_ping_command_output(ping_command_output: str) -> PingScanData:
return ["ping", PING_COUNT_FLAG, "1", PING_TIMEOUT_FLAG, str(self._timeout), ip_addr] ttl_match = TTL_REGEX.search(ping_command_output)
if not ttl_match:
return PingScanData(False, None)
# It should be impossible for this next line to raise any errors, since the TTL_REGEX won't
# match at all if the group isn't found or the contents of the group are not only digits.
ttl = int(ttl_match.group(1))
operating_system = None
if ttl <= LINUX_TTL:
operating_system = "linux"
else: # as far we we know, could also be OSX/BSD, but lets handle that when it comes up.
operating_system = "windows"
return PingScanData(True, operating_system)
def _build_ping_command(host: str, timeout: float):
ping_count_flag = "-n" if "win32" == sys.platform else "-c"
ping_timeout_flag = "-w" if "win32" == sys.platform else "-W"
return ["ping", ping_count_flag, "1", ping_timeout_flag, str(timeout), host]

View File

@ -0,0 +1,175 @@
import subprocess
from unittest.mock import MagicMock
import pytest
from infection_monkey.network import ping
LINUX_SUCCESS_OUTPUT = """
PING 192.168.1.1 (192.168.1.1) 56(84) bytes of data.
64 bytes from 192.168.1.1: icmp_seq=1 ttl=64 time=0.057 ms
--- 192.168.1.1 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 0.057/0.057/0.057/0.000 ms
"""
LINUX_NO_RESPONSE_OUTPUT = """
PING test-fake-domain.com (127.0.0.1) 56(84) bytes of data.
--- test-fake-domain.com ping statistics ---
1 packets transmitted, 0 received, 100% packet loss, time 0ms
"""
WINDOWS_SUCCESS_OUTPUT = """
Pinging 10.0.0.1 with 32 bytes of data:
Reply from 10.0.0.1: bytes=32 time=2ms TTL=127
Ping statistics for 10.0.0.1:
Packets: Sent = 1, Received = 1, Lost = 0 (0% loss),
Approximate round trip times in milli-seconds:
Minimum = 2ms, Maximum = 2ms, Average = 2ms
"""
WINDOWS_NO_RESPONSE_OUTPUT = """
Pinging 10.0.0.99 with 32 bytes of data:
Request timed out.
Ping statistics for 10.0.0.99:
Packets: Sent = 1, Received = 0, Lost = 1 (100% loss),
"""
MALFORMED_OUTPUT = """
WUBBA LUBBA DUB DUBttl=1a1 time=0.201 ms
TTL=b10
TTL=1C
ttl=2d2!
"""
@pytest.fixture
def patch_subprocess_running_ping(monkeypatch):
def inner(mock_obj):
monkeypatch.setattr("subprocess.Popen", MagicMock(return_value=mock_obj))
return inner
@pytest.fixture
def patch_subprocess_running_ping_with_ping_output(patch_subprocess_running_ping):
def inner(ping_output):
mock_ping = MagicMock()
mock_ping.communicate = MagicMock(return_value=(ping_output, ""))
patch_subprocess_running_ping(mock_ping)
return inner
@pytest.fixture
def patch_subprocess_running_ping_to_raise_timeout_expired(patch_subprocess_running_ping):
mock_ping = MagicMock()
mock_ping.communicate = MagicMock(side_effect=subprocess.TimeoutExpired(["test-ping"], 10))
patch_subprocess_running_ping(mock_ping)
@pytest.fixture
def set_os_linux(monkeypatch):
monkeypatch.setattr("sys.platform", "linux")
@pytest.fixture
def set_os_windows(monkeypatch):
monkeypatch.setattr("sys.platform", "win32")
@pytest.mark.usefixtures("set_os_linux")
def test_linux_ping_success(patch_subprocess_running_ping_with_ping_output):
patch_subprocess_running_ping_with_ping_output(LINUX_SUCCESS_OUTPUT)
result = ping("192.168.1.1", 1.0)
assert result.response_received
assert result.os == "linux"
@pytest.mark.usefixtures("set_os_linux")
def test_linux_ping_no_response(patch_subprocess_running_ping_with_ping_output):
patch_subprocess_running_ping_with_ping_output(LINUX_NO_RESPONSE_OUTPUT)
result = ping("192.168.1.1", 1.0)
assert not result.response_received
assert result.os is None
@pytest.mark.usefixtures("set_os_windows")
def test_windows_ping_success(patch_subprocess_running_ping_with_ping_output):
patch_subprocess_running_ping_with_ping_output(WINDOWS_SUCCESS_OUTPUT)
result = ping("192.168.1.1", 1.0)
assert result.response_received
assert result.os == "windows"
@pytest.mark.usefixtures("set_os_windows")
def test_windows_ping_no_response(patch_subprocess_running_ping_with_ping_output):
patch_subprocess_running_ping_with_ping_output(WINDOWS_NO_RESPONSE_OUTPUT)
result = ping("192.168.1.1", 1.0)
assert not result.response_received
assert result.os is None
def test_malformed_ping_command_response(patch_subprocess_running_ping_with_ping_output):
patch_subprocess_running_ping_with_ping_output(MALFORMED_OUTPUT)
result = ping("192.168.1.1", 1.0)
assert not result.response_received
assert result.os is None
@pytest.mark.usefixtures("patch_subprocess_running_ping_to_raise_timeout_expired")
def test_timeout_expired():
result = ping("192.168.1.1", 1.0)
assert not result.response_received
assert result.os is None
@pytest.fixture
def ping_command_spy(monkeypatch):
ping_stub = MagicMock()
monkeypatch.setattr("subprocess.Popen", ping_stub)
return ping_stub
@pytest.fixture
def assert_expected_timeout(ping_command_spy):
def inner(timeout_flag, timeout_input, expected_timeout):
ping("192.168.1.1", timeout_input)
assert ping_command_spy.call_args is not None
ping_command = ping_command_spy.call_args[0][0]
assert timeout_flag in ping_command
timeout_flag_index = ping_command.index(timeout_flag)
assert ping_command[timeout_flag_index + 1] == expected_timeout
return inner
@pytest.mark.usefixtures("set_os_windows")
def test_windows_timeout(assert_expected_timeout):
timeout_flag = "-w"
timeout = 1.42379
assert_expected_timeout(timeout_flag, timeout, str(1423))
@pytest.mark.usefixtures("set_os_linux")
def test_linux_timeout(assert_expected_timeout):
timeout_flag = "-W"
timeout = 1.42379
assert_expected_timeout(timeout_flag, timeout, str(timeout))