forked from p15670423/monkey
Merge pull request #1701 from guardicore/1603-enable-http-fingerprinting
Enable http fingerprinting
This commit is contained in:
commit
c15290415d
|
@ -9,3 +9,4 @@ from .i_puppet import (
|
||||||
PostBreachData,
|
PostBreachData,
|
||||||
UnknownPluginError,
|
UnknownPluginError,
|
||||||
)
|
)
|
||||||
|
from .i_fingerprinter import IFingerprinter
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
from abc import abstractmethod
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from . import FingerprintData, PingScanData, PortScanData
|
||||||
|
|
||||||
|
|
||||||
|
class IFingerprinter:
|
||||||
|
@abstractmethod
|
||||||
|
def get_host_fingerprint(
|
||||||
|
self,
|
||||||
|
host: str,
|
||||||
|
ping_scan_data: PingScanData,
|
||||||
|
port_scan_data: Dict[int, PortScanData],
|
||||||
|
options: Dict,
|
||||||
|
) -> FingerprintData:
|
||||||
|
"""
|
||||||
|
Attempts to gather detailed information about a host and its services
|
||||||
|
:param str host: The domain name or IP address of a host
|
||||||
|
:param PingScanData ping_scan_data: Data retrieved from the target host via ICMP
|
||||||
|
:param Dict[int, PortScanData] port_scan_data: Data retrieved from the target host via a TCP
|
||||||
|
port scan
|
||||||
|
:param Dict options: A dictionary containing options that modify the behavior of the
|
||||||
|
fingerprinter
|
||||||
|
:return: Detailed information about the target host
|
||||||
|
:rtype: FingerprintData
|
||||||
|
"""
|
||||||
|
pass
|
|
@ -83,15 +83,19 @@ class IPuppet(metaclass=abc.ABCMeta):
|
||||||
host: str,
|
host: str,
|
||||||
ping_scan_data: PingScanData,
|
ping_scan_data: PingScanData,
|
||||||
port_scan_data: Dict[int, PortScanData],
|
port_scan_data: Dict[int, PortScanData],
|
||||||
|
options: Dict,
|
||||||
) -> FingerprintData:
|
) -> FingerprintData:
|
||||||
"""
|
"""
|
||||||
Runs a fingerprinter against a remote host
|
Runs a specific fingerprinter to attempt to gather detailed information about a host and its
|
||||||
|
services
|
||||||
:param str name: The name of the fingerprinter to run
|
:param str name: The name of the fingerprinter to run
|
||||||
:param str host: The domain name or IP address of a host
|
:param str host: The domain name or IP address of a host
|
||||||
:param PingScanData ping_scan_data: Data retrieved from the target host via ICMP
|
:param PingScanData ping_scan_data: Data retrieved from the target host via ICMP
|
||||||
:param Dict[int, PortScanData] port_scan_data: Data retrieved from the target host via a TCP
|
:param Dict[int, PortScanData] port_scan_data: Data retrieved from the target host via a TCP
|
||||||
port scan
|
port scan
|
||||||
:return: The data collected by running the fingerprinter on the specified host
|
:param Dict options: A dictionary containing options that modify the behavior of the
|
||||||
|
fingerprinter
|
||||||
|
:return: Detailed information about the target host
|
||||||
:rtype: FingerprintData
|
:rtype: FingerprintData
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ import queue
|
||||||
import threading
|
import threading
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
from threading import Event
|
from threading import Event
|
||||||
from typing import Callable, Dict, List
|
from typing import Any, Callable, Dict, List
|
||||||
|
|
||||||
from infection_monkey.i_puppet import (
|
from infection_monkey.i_puppet import (
|
||||||
FingerprintData,
|
FingerprintData,
|
||||||
|
@ -87,7 +87,7 @@ class IPScanner:
|
||||||
def _run_fingerprinters(
|
def _run_fingerprinters(
|
||||||
self,
|
self,
|
||||||
ip: str,
|
ip: str,
|
||||||
fingerprinters: List[str],
|
fingerprinters: List[Dict[str, Any]],
|
||||||
ping_scan_data: PingScanData,
|
ping_scan_data: PingScanData,
|
||||||
port_scan_data: Dict[int, PortScanData],
|
port_scan_data: Dict[int, PortScanData],
|
||||||
stop: Event,
|
stop: Event,
|
||||||
|
@ -95,6 +95,8 @@ class IPScanner:
|
||||||
fingerprint_data = {}
|
fingerprint_data = {}
|
||||||
|
|
||||||
for f in interruptable_iter(fingerprinters, stop):
|
for f in interruptable_iter(fingerprinters, stop):
|
||||||
fingerprint_data[f] = self._puppet.fingerprint(f, ip, ping_scan_data, port_scan_data)
|
fingerprint_data[f["name"]] = self._puppet.fingerprint(
|
||||||
|
f["name"], ip, ping_scan_data, port_scan_data, f["options"]
|
||||||
|
)
|
||||||
|
|
||||||
return fingerprint_data
|
return fingerprint_data
|
||||||
|
|
|
@ -18,6 +18,7 @@ from infection_monkey.master.control_channel import ControlChannel
|
||||||
from infection_monkey.model import DELAY_DELETE_CMD, VictimHostFactory
|
from infection_monkey.model import DELAY_DELETE_CMD, VictimHostFactory
|
||||||
from infection_monkey.network import NetworkInterface
|
from infection_monkey.network import NetworkInterface
|
||||||
from infection_monkey.network.firewall import app as firewall
|
from infection_monkey.network.firewall import app as firewall
|
||||||
|
from infection_monkey.network.http_fingerprinter import HTTPFingerprinter
|
||||||
from infection_monkey.network.info import get_local_network_interfaces
|
from infection_monkey.network.info import get_local_network_interfaces
|
||||||
from infection_monkey.payload.ransomware.ransomware_payload import RansomwarePayload
|
from infection_monkey.payload.ransomware.ransomware_payload import RansomwarePayload
|
||||||
from infection_monkey.puppet.puppet import Puppet
|
from infection_monkey.puppet.puppet import Puppet
|
||||||
|
@ -183,6 +184,7 @@ class InfectionMonkey:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _build_puppet() -> IPuppet:
|
def _build_puppet() -> IPuppet:
|
||||||
puppet = Puppet()
|
puppet = Puppet()
|
||||||
|
puppet.load_plugin("http", HTTPFingerprinter(), PluginType.FINGERPRINTER)
|
||||||
puppet.load_plugin("ransomware", RansomwarePayload(), PluginType.PAYLOAD)
|
puppet.load_plugin("ransomware", RansomwarePayload(), PluginType.PAYLOAD)
|
||||||
|
|
||||||
return puppet
|
return puppet
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
import logging
|
||||||
|
from contextlib import closing
|
||||||
|
from typing import Dict, Iterable, Optional, Set, Tuple
|
||||||
|
|
||||||
|
from requests import head
|
||||||
|
from requests.exceptions import ConnectionError, Timeout
|
||||||
|
|
||||||
|
from infection_monkey.i_puppet import (
|
||||||
|
FingerprintData,
|
||||||
|
IFingerprinter,
|
||||||
|
PingScanData,
|
||||||
|
PortScanData,
|
||||||
|
PortStatus,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class HTTPFingerprinter(IFingerprinter):
|
||||||
|
"""
|
||||||
|
Queries potential HTTP(S) ports and attempt to determine the server software that handles the
|
||||||
|
HTTP requests.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def get_host_fingerprint(
|
||||||
|
self,
|
||||||
|
host: str,
|
||||||
|
_: PingScanData,
|
||||||
|
port_scan_data: Dict[int, PortScanData],
|
||||||
|
options: Dict,
|
||||||
|
) -> FingerprintData:
|
||||||
|
services = {}
|
||||||
|
http_ports = set(options.get("http_ports", []))
|
||||||
|
ports_to_fingerprint = _get_open_http_ports(http_ports, port_scan_data)
|
||||||
|
|
||||||
|
for port in ports_to_fingerprint:
|
||||||
|
server_header_contents, ssl = _query_potential_http_server(host, port)
|
||||||
|
|
||||||
|
if server_header_contents is not None:
|
||||||
|
services[f"tcp-{port}"] = {
|
||||||
|
"display_name": "HTTP",
|
||||||
|
"port": port,
|
||||||
|
"name": "http",
|
||||||
|
"data": (server_header_contents, ssl),
|
||||||
|
}
|
||||||
|
|
||||||
|
return FingerprintData(None, None, services)
|
||||||
|
|
||||||
|
|
||||||
|
def _query_potential_http_server(host: str, port: int) -> Tuple[Optional[str], Optional[bool]]:
|
||||||
|
# check both http and https
|
||||||
|
http = f"http://{host}:{port}"
|
||||||
|
https = f"https://{host}:{port}"
|
||||||
|
|
||||||
|
for url, ssl in ((https, True), (http, False)): # start with https and downgrade
|
||||||
|
server_header_contents = _get_server_from_headers(url)
|
||||||
|
|
||||||
|
if server_header_contents is not None:
|
||||||
|
return (server_header_contents, ssl)
|
||||||
|
|
||||||
|
return (None, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_server_from_headers(url: str) -> Optional[str]:
|
||||||
|
try:
|
||||||
|
logger.debug(f"Sending request for headers to {url}")
|
||||||
|
with closing(head(url, verify=False, timeout=1)) as req: # noqa: DUO123
|
||||||
|
server = req.headers.get("Server")
|
||||||
|
|
||||||
|
logger.debug(f'Got server string "{server}" from {url}')
|
||||||
|
return server
|
||||||
|
except Timeout:
|
||||||
|
logger.debug(f"Timeout while requesting headers from {url}")
|
||||||
|
except ConnectionError: # Someone doesn't like us
|
||||||
|
logger.debug(f"Connection error while requesting headers from {url}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_open_http_ports(
|
||||||
|
allowed_http_ports: Set, port_scan_data: Dict[int, PortScanData]
|
||||||
|
) -> Iterable[int]:
|
||||||
|
open_ports = (psd.port for psd in port_scan_data.values() if psd.status == PortStatus.OPEN)
|
||||||
|
return (port for port in open_ports if port in allowed_http_ports)
|
|
@ -1,47 +0,0 @@
|
||||||
import logging
|
|
||||||
|
|
||||||
import infection_monkey.config
|
|
||||||
from infection_monkey.network.HostFinger import HostFinger
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class HTTPFinger(HostFinger):
|
|
||||||
"""
|
|
||||||
Goal is to recognise HTTP servers, where what we currently care about is apache.
|
|
||||||
"""
|
|
||||||
|
|
||||||
_SCANNED_SERVICE = "HTTP"
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self._config = infection_monkey.config.WormConfiguration
|
|
||||||
self.HTTP = [(port, str(port)) for port in self._config.HTTP_PORTS]
|
|
||||||
|
|
||||||
def get_host_fingerprint(self, host):
|
|
||||||
from contextlib import closing
|
|
||||||
|
|
||||||
from requests import head
|
|
||||||
from requests.exceptions import ConnectionError, Timeout
|
|
||||||
|
|
||||||
for port in self.HTTP:
|
|
||||||
# check both http and https
|
|
||||||
http = "http://" + host.ip_addr + ":" + port[1]
|
|
||||||
https = "https://" + host.ip_addr + ":" + port[1]
|
|
||||||
|
|
||||||
# try http, we don't optimise for 443
|
|
||||||
for url in (https, http): # start with https and downgrade
|
|
||||||
try:
|
|
||||||
with closing(head(url, verify=False, timeout=1)) as req: # noqa: DUO123
|
|
||||||
server = req.headers.get("Server")
|
|
||||||
ssl = True if "https://" in url else False
|
|
||||||
self.init_service(host.services, ("tcp-" + port[1]), port[0])
|
|
||||||
host.services["tcp-" + port[1]]["name"] = "http"
|
|
||||||
host.services["tcp-" + port[1]]["data"] = (server, ssl)
|
|
||||||
logger.info("Port %d is open on host %s " % (port[0], host))
|
|
||||||
break # https will be the same on the same port
|
|
||||||
except Timeout:
|
|
||||||
logger.debug(f"Timeout while requesting headers from {url}")
|
|
||||||
except ConnectionError: # Someone doesn't like us
|
|
||||||
logger.debug(f"Connection error while requesting headers from {url}")
|
|
||||||
|
|
||||||
return True
|
|
|
@ -206,6 +206,7 @@ class MockPuppet(IPuppet):
|
||||||
host: str,
|
host: str,
|
||||||
ping_scan_data: PingScanData,
|
ping_scan_data: PingScanData,
|
||||||
port_scan_data: Dict[int, PortScanData],
|
port_scan_data: Dict[int, PortScanData],
|
||||||
|
options: Dict,
|
||||||
) -> FingerprintData:
|
) -> FingerprintData:
|
||||||
logger.debug(f"fingerprint({name}, {host})")
|
logger.debug(f"fingerprint({name}, {host})")
|
||||||
empty_fingerprint_data = FingerprintData(None, None, {})
|
empty_fingerprint_data = FingerprintData(None, None, {})
|
||||||
|
|
|
@ -47,8 +47,10 @@ class Puppet(IPuppet):
|
||||||
host: str,
|
host: str,
|
||||||
ping_scan_data: PingScanData,
|
ping_scan_data: PingScanData,
|
||||||
port_scan_data: Dict[int, PortScanData],
|
port_scan_data: Dict[int, PortScanData],
|
||||||
|
options: Dict,
|
||||||
) -> FingerprintData:
|
) -> FingerprintData:
|
||||||
return self._mock_puppet.fingerprint(name, host, ping_scan_data, port_scan_data)
|
fingerprinter = self._plugin_registry.get_plugin(name, PluginType.FINGERPRINTER)
|
||||||
|
return fingerprinter.get_host_fingerprint(host, ping_scan_data, port_scan_data, options)
|
||||||
|
|
||||||
def exploit_host(
|
def exploit_host(
|
||||||
self, name: str, host: str, options: Dict, interrupt: threading.Event
|
self, name: str, host: str, options: Dict, interrupt: threading.Event
|
||||||
|
|
|
@ -2,7 +2,8 @@ import collections
|
||||||
import copy
|
import copy
|
||||||
import functools
|
import functools
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, List
|
import re
|
||||||
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
from jsonschema import Draft4Validator, validators
|
from jsonschema import Draft4Validator, validators
|
||||||
|
|
||||||
|
@ -405,7 +406,7 @@ class ConfigService:
|
||||||
return ConfigService.get_config_value(EXPORT_MONKEY_TELEMS_PATH)
|
return ConfigService.get_config_value(EXPORT_MONKEY_TELEMS_PATH)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_config_propagation_credentials_from_flat_config(config):
|
def get_config_propagation_credentials_from_flat_config(config) -> Dict[str, List[str]]:
|
||||||
return {
|
return {
|
||||||
"exploit_user_list": config.get("exploit_user_list", []),
|
"exploit_user_list": config.get("exploit_user_list", []),
|
||||||
"exploit_password_list": config.get("exploit_password_list", []),
|
"exploit_password_list": config.get("exploit_password_list", []),
|
||||||
|
@ -482,8 +483,8 @@ class ConfigService:
|
||||||
config["propagation"] = formatted_propagation_config
|
config["propagation"] = formatted_propagation_config
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_network_scan_from_flat_config(config: Dict):
|
def _format_network_scan_from_flat_config(config: Dict) -> Dict[str, Any]:
|
||||||
formatted_network_scan_config = {"tcp": {}, "icmp": {}}
|
formatted_network_scan_config = {"tcp": {}, "icmp": {}, "fingerprinters": []}
|
||||||
|
|
||||||
formatted_network_scan_config["tcp"] = ConfigService._format_tcp_scan_from_flat_config(
|
formatted_network_scan_config["tcp"] = ConfigService._format_tcp_scan_from_flat_config(
|
||||||
config
|
config
|
||||||
|
@ -498,7 +499,7 @@ class ConfigService:
|
||||||
return formatted_network_scan_config
|
return formatted_network_scan_config
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_tcp_scan_from_flat_config(config: Dict):
|
def _format_tcp_scan_from_flat_config(config: Dict) -> Dict[str, Any]:
|
||||||
flat_http_ports_field = "HTTP_PORTS"
|
flat_http_ports_field = "HTTP_PORTS"
|
||||||
flat_tcp_timeout_field = "tcp_scan_timeout"
|
flat_tcp_timeout_field = "tcp_scan_timeout"
|
||||||
flat_tcp_ports_field = "tcp_target_ports"
|
flat_tcp_ports_field = "tcp_target_ports"
|
||||||
|
@ -525,7 +526,7 @@ class ConfigService:
|
||||||
return sorted(combined_ports)
|
return sorted(combined_ports)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_icmp_scan_from_flat_config(config: Dict):
|
def _format_icmp_scan_from_flat_config(config: Dict) -> Dict[str, Any]:
|
||||||
flat_ping_timeout_field = "ping_scan_timeout"
|
flat_ping_timeout_field = "ping_scan_timeout"
|
||||||
|
|
||||||
formatted_icmp_scan_config = {}
|
formatted_icmp_scan_config = {}
|
||||||
|
@ -536,16 +537,34 @@ class ConfigService:
|
||||||
return formatted_icmp_scan_config
|
return formatted_icmp_scan_config
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_fingerprinters_from_flat_config(config: Dict):
|
def _format_fingerprinters_from_flat_config(config: Dict) -> List[Dict[str, Any]]:
|
||||||
flat_fingerprinter_classes_field = "finger_classes"
|
flat_fingerprinter_classes_field = "finger_classes"
|
||||||
|
flat_http_ports_field = "HTTP_PORTS"
|
||||||
|
|
||||||
|
formatted_fingerprinters = [
|
||||||
|
{"name": f, "options": {}} for f in sorted(config[flat_fingerprinter_classes_field])
|
||||||
|
]
|
||||||
|
|
||||||
|
if "HTTPFinger" in config[flat_fingerprinter_classes_field]:
|
||||||
|
for fp in formatted_fingerprinters:
|
||||||
|
if fp["name"] == "HTTPFinger":
|
||||||
|
fp["options"] = {"http_ports": sorted(config[flat_http_ports_field])}
|
||||||
|
|
||||||
|
fp["name"] = ConfigService._translate_fingerprinter_name(fp["name"])
|
||||||
|
|
||||||
formatted_fingerprinters = config[flat_fingerprinter_classes_field]
|
|
||||||
config.pop(flat_fingerprinter_classes_field)
|
config.pop(flat_fingerprinter_classes_field)
|
||||||
|
|
||||||
return formatted_fingerprinters
|
return formatted_fingerprinters
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_targets_from_flat_config(config: Dict):
|
def _translate_fingerprinter_name(name: str) -> str:
|
||||||
|
# This translates names like "HTTPFinger" to "http". "HTTPFinger" is an old classname on the
|
||||||
|
# agent-side and is therefore unnecessarily couples the island to the fingerprinter's
|
||||||
|
# implementation within the agent. For the time being, fingerprinters will have names like
|
||||||
|
# "http", "ssh", "elastic", etc. This will be revisited when fingerprinters become plugins.
|
||||||
|
return re.sub(r"Finger", "", name).lower()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _format_targets_from_flat_config(config: Dict) -> Dict[str, Any]:
|
||||||
flat_blocked_ips_field = "blocked_ips"
|
flat_blocked_ips_field = "blocked_ips"
|
||||||
flat_inaccessible_subnets_field = "inaccessible_subnets"
|
flat_inaccessible_subnets_field = "inaccessible_subnets"
|
||||||
flat_local_network_scan_field = "local_network_scan"
|
flat_local_network_scan_field = "local_network_scan"
|
||||||
|
@ -572,7 +591,7 @@ class ConfigService:
|
||||||
return formatted_scan_targets_config
|
return formatted_scan_targets_config
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format_exploiters_from_flat_config(config: Dict):
|
def _format_exploiters_from_flat_config(config: Dict) -> Dict[str, List[Dict[str, Any]]]:
|
||||||
flat_config_exploiter_classes_field = "exploiter_classes"
|
flat_config_exploiter_classes_field = "exploiter_classes"
|
||||||
brute_force_category = "brute_force"
|
brute_force_category = "brute_force"
|
||||||
vulnerability_category = "vulnerability"
|
vulnerability_category = "vulnerability"
|
||||||
|
|
|
@ -30,7 +30,12 @@ def scan_config():
|
||||||
"icmp": {
|
"icmp": {
|
||||||
"timeout_ms": 1000,
|
"timeout_ms": 1000,
|
||||||
},
|
},
|
||||||
"fingerprinters": {"HTTPFinger", "SMBFinger", "SSHFinger"},
|
"fingerprinters": [
|
||||||
|
{"name": "HTTPFinger", "options": {}},
|
||||||
|
{"name": "SMBFinger", "options": {}},
|
||||||
|
{"name": "SSHFinger", "options": {}},
|
||||||
|
]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from infection_monkey.i_puppet import PortScanData, PortStatus
|
||||||
|
from infection_monkey.network.http_fingerprinter import HTTPFingerprinter
|
||||||
|
|
||||||
|
OPTIONS = {"http_ports": [80, 443, 8080, 9200]}
|
||||||
|
|
||||||
|
PYTHON_SERVER_HEADER = "SimpleHTTP/0.6 Python/3.6.9"
|
||||||
|
APACHE_SERVER_HEADER = "Apache/Server/Header"
|
||||||
|
|
||||||
|
SERVER_HEADERS = {
|
||||||
|
"https://127.0.0.1:443": PYTHON_SERVER_HEADER,
|
||||||
|
"http://127.0.0.1:8080": APACHE_SERVER_HEADER,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_get_server_from_headers():
|
||||||
|
return MagicMock(side_effect=lambda port: SERVER_HEADERS.get(port, None))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def patch_get_server_from_headers(monkeypatch, mock_get_server_from_headers):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"infection_monkey.network.http_fingerprinter._get_server_from_headers",
|
||||||
|
mock_get_server_from_headers,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def http_fingerprinter():
|
||||||
|
return HTTPFingerprinter()
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_http_ports_open(mock_get_server_from_headers, http_fingerprinter):
|
||||||
|
port_scan_data = {
|
||||||
|
80: PortScanData(80, PortStatus.CLOSED, "", "tcp-80"),
|
||||||
|
123: PortScanData(123, PortStatus.OPEN, "", "tcp-123"),
|
||||||
|
443: PortScanData(443, PortStatus.CLOSED, "", "tcp-443"),
|
||||||
|
8080: PortScanData(8080, PortStatus.CLOSED, "", "tcp-8080"),
|
||||||
|
}
|
||||||
|
http_fingerprinter.get_host_fingerprint("127.0.0.1", None, port_scan_data, OPTIONS)
|
||||||
|
|
||||||
|
assert not mock_get_server_from_headers.called
|
||||||
|
|
||||||
|
|
||||||
|
def test_fingerprint_only_port_443(mock_get_server_from_headers, http_fingerprinter):
|
||||||
|
port_scan_data = {
|
||||||
|
80: PortScanData(80, PortStatus.CLOSED, "", "tcp-80"),
|
||||||
|
123: PortScanData(123, PortStatus.OPEN, "", "tcp-123"),
|
||||||
|
443: PortScanData(443, PortStatus.OPEN, "", "tcp-443"),
|
||||||
|
8080: PortScanData(8080, PortStatus.CLOSED, "", "tcp-8080"),
|
||||||
|
}
|
||||||
|
fingerprint_data = http_fingerprinter.get_host_fingerprint(
|
||||||
|
"127.0.0.1", None, port_scan_data, OPTIONS
|
||||||
|
)
|
||||||
|
|
||||||
|
assert mock_get_server_from_headers.call_count == 1
|
||||||
|
mock_get_server_from_headers.assert_called_with("https://127.0.0.1:443")
|
||||||
|
|
||||||
|
assert fingerprint_data.os_type is None
|
||||||
|
assert fingerprint_data.os_version is None
|
||||||
|
assert len(fingerprint_data.services.keys()) == 1
|
||||||
|
|
||||||
|
assert fingerprint_data.services["tcp-443"]["data"][0] == PYTHON_SERVER_HEADER
|
||||||
|
assert fingerprint_data.services["tcp-443"]["data"][1] is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_open_port_no_http_server(mock_get_server_from_headers, http_fingerprinter):
|
||||||
|
port_scan_data = {
|
||||||
|
80: PortScanData(80, PortStatus.CLOSED, "", "tcp-80"),
|
||||||
|
123: PortScanData(123, PortStatus.OPEN, "", "tcp-123"),
|
||||||
|
443: PortScanData(443, PortStatus.CLOSED, "", "tcp-443"),
|
||||||
|
9200: PortScanData(9200, PortStatus.OPEN, "", "tcp-9200"),
|
||||||
|
}
|
||||||
|
fingerprint_data = http_fingerprinter.get_host_fingerprint(
|
||||||
|
"127.0.0.1", None, port_scan_data, OPTIONS
|
||||||
|
)
|
||||||
|
|
||||||
|
assert mock_get_server_from_headers.call_count == 2
|
||||||
|
mock_get_server_from_headers.assert_any_call("https://127.0.0.1:9200")
|
||||||
|
mock_get_server_from_headers.assert_any_call("http://127.0.0.1:9200")
|
||||||
|
|
||||||
|
assert fingerprint_data.os_type is None
|
||||||
|
assert fingerprint_data.os_version is None
|
||||||
|
assert len(fingerprint_data.services.keys()) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_multiple_open_ports(mock_get_server_from_headers, http_fingerprinter):
|
||||||
|
port_scan_data = {
|
||||||
|
80: PortScanData(80, PortStatus.CLOSED, "", "tcp-80"),
|
||||||
|
443: PortScanData(443, PortStatus.OPEN, "", "tcp-443"),
|
||||||
|
8080: PortScanData(8080, PortStatus.OPEN, "", "tcp-8080"),
|
||||||
|
}
|
||||||
|
fingerprint_data = http_fingerprinter.get_host_fingerprint(
|
||||||
|
"127.0.0.1", None, port_scan_data, OPTIONS
|
||||||
|
)
|
||||||
|
|
||||||
|
assert mock_get_server_from_headers.call_count == 3
|
||||||
|
mock_get_server_from_headers.assert_any_call("https://127.0.0.1:443")
|
||||||
|
mock_get_server_from_headers.assert_any_call("https://127.0.0.1:8080")
|
||||||
|
mock_get_server_from_headers.assert_any_call("http://127.0.0.1:8080")
|
||||||
|
|
||||||
|
assert fingerprint_data.os_type is None
|
||||||
|
assert fingerprint_data.os_version is None
|
||||||
|
assert len(fingerprint_data.services.keys()) == 2
|
||||||
|
|
||||||
|
assert fingerprint_data.services["tcp-443"]["data"][0] == PYTHON_SERVER_HEADER
|
||||||
|
assert fingerprint_data.services["tcp-443"]["data"][1] is True
|
||||||
|
assert fingerprint_data.services["tcp-8080"]["data"][0] == APACHE_SERVER_HEADER
|
||||||
|
assert fingerprint_data.services["tcp-8080"]["data"][1] is False
|
|
@ -147,11 +147,14 @@ def test_format_config_for_agent__network_scan(flat_monkey_config):
|
||||||
"timeout_ms": 1000,
|
"timeout_ms": 1000,
|
||||||
},
|
},
|
||||||
"fingerprinters": [
|
"fingerprinters": [
|
||||||
"SMBFinger",
|
{"name": "elastic", "options": {}},
|
||||||
"SSHFinger",
|
{
|
||||||
"HTTPFinger",
|
"name": "http",
|
||||||
"MSSQLFinger",
|
"options": {"http_ports": [80, 443, 7001, 8008, 8080, 9200]},
|
||||||
"ElasticFinger",
|
},
|
||||||
|
{"name": "mssql", "options": {}},
|
||||||
|
{"name": "smb", "options": {}},
|
||||||
|
{"name": "ssh", "options": {}},
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
ConfigService.format_flat_config_for_agent(flat_monkey_config)
|
ConfigService.format_flat_config_for_agent(flat_monkey_config)
|
||||||
|
|
Loading…
Reference in New Issue