Agent: Refactor HTTPFinger to conform to IFingerprinter interface

* Remove dependency on Plugin, HostFinger, and WormConfiguration
* Improve readability
* Reduce unnecessary HTTP requests by using the PortScanData to only
  query ports we know are open.
This commit is contained in:
Mike Salvatore 2022-02-07 10:26:20 -05:00
parent 4361aa2325
commit f5ef660bd2
1 changed files with 64 additions and 31 deletions

View File

@ -1,47 +1,80 @@
import logging import logging
from contextlib import closing
from typing import Dict, Iterable, Optional, Set, Tuple
import infection_monkey.config from requests import head
from infection_monkey.network.HostFinger import HostFinger from requests.exceptions import ConnectionError, Timeout
from infection_monkey.i_puppet import (
FingerprintData,
IFingerprinter,
PingScanData,
PortScanData,
PortStatus,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class HTTPFinger(HostFinger): class HTTPFinger(IFingerprinter):
""" """
Goal is to recognise HTTP servers, where what we currently care about is apache. Goal is to recognise HTTP servers, where what we currently care about is apache.
""" """
_SCANNED_SERVICE = "HTTP" def get_host_fingerprint(
self,
host: str,
ping_scan_data: PingScanData,
port_scan_data: Dict[int, PortScanData],
options: Dict,
):
services = {}
http_ports = set(options.get("http_ports", []))
ports_to_fingerprint = _get_open_http_ports(http_ports, port_scan_data)
def __init__(self): for port in ports_to_fingerprint:
self._config = infection_monkey.config.WormConfiguration server_header_contents, ssl = _query_potential_http_server(host, port)
self.HTTP = [(port, str(port)) for port in self._config.HTTP_PORTS]
def get_host_fingerprint(self, host): if server_header_contents is not None:
from contextlib import closing services[f"tcp-{port}"] = {
"display_name": "HTTP",
"port": port,
"name": "http",
"data": (server_header_contents, ssl),
}
from requests import head return FingerprintData(None, None, services)
from requests.exceptions import ConnectionError, Timeout
for port in self.HTTP:
def _query_potential_http_server(host: str, port: int) -> Tuple[Optional[str], Optional[bool]]:
# check both http and https # check both http and https
http = "http://" + host.ip_addr + ":" + port[1] http = f"http://{host}:{port}"
https = "https://" + host.ip_addr + ":" + port[1] https = f"https://{host}:{port}"
# try http, we don't optimise for 443 # try http, we don't optimise for 443
for url in (https, http): # start with https and downgrade for url, ssl in ((https, True), (http, False)): # start with https and downgrade
server_header_contents = _get_server_from_headers(url)
if server_header_contents is not None:
return (server_header_contents, ssl)
return (None, None)
def _get_server_from_headers(url: str) -> Optional[str]:
try: try:
with closing(head(url, verify=False, timeout=1)) as req: # noqa: DUO123 with closing(head(url, verify=False, timeout=1)) as req: # noqa: DUO123
server = req.headers.get("Server") return req.headers.get("Server")
ssl = True if "https://" in url else False
self.init_service(host.services, ("tcp-" + port[1]), port[0])
host.services["tcp-" + port[1]]["name"] = "http"
host.services["tcp-" + port[1]]["data"] = (server, ssl)
logger.info("Port %d is open on host %s " % (port[0], host))
break # https will be the same on the same port
except Timeout: except Timeout:
logger.debug(f"Timeout while requesting headers from {url}") logger.debug(f"Timeout while requesting headers from {url}")
except ConnectionError: # Someone doesn't like us except ConnectionError: # Someone doesn't like us
logger.debug(f"Connection error while requesting headers from {url}") logger.debug(f"Connection error while requesting headers from {url}")
return True return None
def _get_open_http_ports(
allowed_http_ports: Set, port_scan_data: Dict[int, PortScanData]
) -> Iterable[int]:
open_ports = (psd.port for psd in port_scan_data.values() if psd.status == PortStatus.Open)
return (port for port in open_ports if port in allowed_http_ports)