Agent: Fix PortScanData mypy issues

This commit is contained in:
Kekoa Kaaikala 2022-09-21 20:28:25 +00:00
parent d4f6c83f56
commit e40d061091
5 changed files with 49 additions and 26 deletions

View File

@ -1,10 +1,10 @@
import abc
import threading
from collections import namedtuple
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Mapping, Sequence
from typing import Dict, Iterable, List, Mapping, Optional, Sequence, Tuple
from common import OperatingSystem
from common.credentials import Credentials
from infection_monkey.model import VictimHost
@ -26,15 +26,37 @@ class ExploiterResultData:
propagation_success: bool = False
interrupted: bool = False
os: str = ""
info: Mapping = None
attempts: Iterable = None
info: Mapping = field(default_factory=lambda: {})
attempts: Iterable = field(default_factory=lambda: [])
error_message: str = ""
PingScanData = namedtuple("PingScanData", ["response_received", "os"])
PortScanData = namedtuple("PortScanData", ["port", "status", "banner", "service"])
FingerprintData = namedtuple("FingerprintData", ["os_type", "os_version", "services"])
PostBreachData = namedtuple("PostBreachData", ["display_name", "command", "result"])
@dataclass(frozen=True)
class FingerprintData:
os_type: Optional[OperatingSystem]
os_version: str
services: Mapping = field(default_factory=lambda: {})
@dataclass(frozen=True)
class PingScanData:
response_received: bool
os: Optional[OperatingSystem]
@dataclass(frozen=True)
class PortScanData:
port: int
status: PortStatus
banner: str
service: str
@dataclass(frozen=True)
class PostBreachData:
display_name: str
command: str
result: Tuple[str, bool]
class IPuppet(metaclass=abc.ABCMeta):
@ -84,7 +106,7 @@ class IPuppet(metaclass=abc.ABCMeta):
@abc.abstractmethod
def scan_tcp_ports(
self, host: str, ports: List[int], timeout: float = 3
) -> Dict[int, PortScanData]:
) -> Mapping[int, PortScanData]:
"""
Scans a list of TCP ports on a remote host
@ -92,7 +114,7 @@ class IPuppet(metaclass=abc.ABCMeta):
:param int ports: List of TCP port numbers to scan
:param float timeout: The maximum amount of time (in seconds) to wait for a response
:return: The data collected by scanning the provided host:ports combination
:rtype: Dict[int, PortScanData]
:rtype: Mapping[int, PortScanData]
"""
@abc.abstractmethod
@ -125,6 +147,7 @@ class IPuppet(metaclass=abc.ABCMeta):
name: str,
host: VictimHost,
current_depth: int,
servers: Sequence[str],
options: Dict,
interrupt: threading.Event,
) -> ExploiterResultData:

View File

@ -12,7 +12,7 @@ from infection_monkey.network.tools import BANNER_READ, DEFAULT_TIMEOUT, tcp_por
logger = logging.getLogger(__name__)
POLL_INTERVAL = 0.5
EMPTY_PORT_SCAN = {-1: PortScanData(-1, PortStatus.CLOSED, None, None)}
EMPTY_PORT_SCAN = {-1: PortScanData(-1, PortStatus.CLOSED, "", "")}
def scan_tcp_ports(
@ -48,7 +48,7 @@ def _build_port_scan_data(
def _get_closed_port_data(port: int) -> PortScanData:
return PortScanData(port, PortStatus.CLOSED, None, None)
return PortScanData(port, PortStatus.CLOSED, "", "")
def _check_tcp_ports(

View File

@ -77,14 +77,14 @@ class MockPuppet(IPuppet):
) -> Dict[int, PortScanData]:
logger.debug(f"run_scan_tcp_port({host}, {ports}, {timeout})")
dot_1_results = {
22: PortScanData(22, PortStatus.CLOSED, None, None),
22: PortScanData(22, PortStatus.CLOSED, "", ""),
445: PortScanData(445, PortStatus.OPEN, "SMB BANNER", "tcp-445"),
3389: PortScanData(3389, PortStatus.OPEN, "", "tcp-3389"),
}
dot_3_results = {
22: PortScanData(22, PortStatus.OPEN, "SSH BANNER", "tcp-22"),
443: PortScanData(443, PortStatus.OPEN, "HTTPS BANNER", "tcp-443"),
3389: PortScanData(3389, PortStatus.CLOSED, "", None),
3389: PortScanData(3389, PortStatus.CLOSED, "", ""),
}
if host == DOT_1:
@ -104,7 +104,7 @@ class MockPuppet(IPuppet):
options: Dict,
) -> FingerprintData:
logger.debug(f"fingerprint({name}, {host})")
empty_fingerprint_data = FingerprintData(None, None, {})
empty_fingerprint_data = FingerprintData(None, "", {})
dot_1_results = {
"SMBFinger": FingerprintData(
@ -118,7 +118,7 @@ class MockPuppet(IPuppet):
),
"HTTPFinger": FingerprintData(
None,
None,
"",
{
"tcp-80": {"name": "http", "data": ("SERVER_HEADERS", False)},
"tcp-443": {"name": "http", "data": ("SERVER_HEADERS_2", True)},
@ -248,4 +248,4 @@ class MockPuppet(IPuppet):
def _get_empty_results(port: int):
return PortScanData(port, PortStatus.CLOSED, None, None)
return PortScanData(port, PortStatus.CLOSED, "", "")

View File

@ -35,12 +35,12 @@ def mock_victim_host_factory():
return MockVictimHostFactory()
empty_fingerprint_data = FingerprintData(None, None, {})
empty_fingerprint_data = FingerprintData(None, "", {})
dot_1_scan_results = IPScanResults(
PingScanData(True, "windows"),
{
22: PortScanData(22, PortStatus.CLOSED, None, None),
22: PortScanData(22, PortStatus.CLOSED, "", ""),
445: PortScanData(445, PortStatus.OPEN, "SMB BANNER", "tcp-445"),
3389: PortScanData(3389, PortStatus.OPEN, "", "tcp-3389"),
},
@ -56,7 +56,7 @@ dot_3_scan_results = IPScanResults(
{
22: PortScanData(22, PortStatus.OPEN, "SSH BANNER", "tcp-22"),
443: PortScanData(443, PortStatus.OPEN, "HTTPS BANNER", "tcp-443"),
3389: PortScanData(3389, PortStatus.CLOSED, "", None),
3389: PortScanData(3389, PortStatus.CLOSED, "", ""),
},
{
"SSHFinger": FingerprintData(
@ -64,7 +64,7 @@ dot_3_scan_results = IPScanResults(
),
"HTTPFinger": FingerprintData(
None,
None,
"",
{
"tcp-80": {"name": "http", "data": ("SERVER_HEADERS", False)},
"tcp-443": {"name": "http", "data": ("SERVER_HEADERS_2", True)},
@ -77,9 +77,9 @@ dot_3_scan_results = IPScanResults(
dead_host_scan_results = IPScanResults(
PingScanData(False, None),
{
22: PortScanData(22, PortStatus.CLOSED, None, None),
443: PortScanData(443, PortStatus.CLOSED, None, None),
3389: PortScanData(3389, PortStatus.CLOSED, "", None),
22: PortScanData(22, PortStatus.CLOSED, "", ""),
443: PortScanData(443, PortStatus.CLOSED, "", ""),
3389: PortScanData(3389, PortStatus.CLOSED, "", ""),
},
{},
)

View File

@ -34,7 +34,7 @@ def test_tcp_successful(monkeypatch, patch_check_tcp_ports, open_ports_data):
for port in closed_ports:
assert port_scan_data[port].port == port
assert port_scan_data[port].status == PortStatus.CLOSED
assert port_scan_data[port].banner is None
assert not port_scan_data[port].banner
@pytest.mark.parametrize("open_ports_data", [{}])