Agent: Fix type hints mypy found in propagator.py

This commit is contained in:
vakarisz 2022-09-22 17:09:32 +03:00
parent 13f7301db9
commit 9749984640
3 changed files with 17 additions and 11 deletions

View File

@ -8,7 +8,6 @@ from typing import Callable, Dict, Sequence
from common.agent_configuration.agent_sub_configurations import ( from common.agent_configuration.agent_sub_configurations import (
NetworkScanConfiguration, NetworkScanConfiguration,
PluginConfiguration, PluginConfiguration,
ScanTargetConfiguration,
) )
from infection_monkey.i_puppet import ( from infection_monkey.i_puppet import (
FingerprintData, FingerprintData,
@ -35,7 +34,7 @@ class IPScanner:
def scan( def scan(
self, self,
addresses_to_scan: Sequence[NetworkAddress], addresses_to_scan: Sequence[NetworkAddress],
options: ScanTargetConfiguration, options: NetworkScanConfiguration,
results_callback: Callback, results_callback: Callback,
stop: Event, stop: Event,
): ):

View File

@ -2,7 +2,7 @@ import logging
from ipaddress import IPv4Interface from ipaddress import IPv4Interface
from queue import Queue from queue import Queue
from threading import Event from threading import Event
from typing import List, Sequence from typing import List, Mapping, Sequence
from common.agent_configuration import ( from common.agent_configuration import (
ExploitationConfiguration, ExploitationConfiguration,
@ -26,6 +26,7 @@ from infection_monkey.telemetry.scan_telem import ScanTelem
from infection_monkey.utils.threading import create_daemon_thread from infection_monkey.utils.threading import create_daemon_thread
from . import Exploiter, IPScanner, IPScanResults from . import Exploiter, IPScanner, IPScanResults
from .ip_scan_results import FingerprinterName, Port
logger = logging.getLogger() logger = logging.getLogger()
@ -149,8 +150,12 @@ class Propagator:
victim_host.os["type"] = ping_scan_data.os victim_host.os["type"] = ping_scan_data.os
@staticmethod @staticmethod
def _process_tcp_scan_results(victim_host: VictimHost, port_scan_data: PortScanData): def _process_tcp_scan_results(
for psd in filter(lambda psd: psd.status == PortStatus.OPEN, port_scan_data.values()): victim_host: VictimHost, port_scan_data: Mapping[Port, PortScanData]
):
for psd in filter(
lambda scan_data: scan_data.status == PortStatus.OPEN, port_scan_data.values()
):
victim_host.services[psd.service] = {} victim_host.services[psd.service] = {}
victim_host.services[psd.service]["display_name"] = "unknown(TCP)" victim_host.services[psd.service]["display_name"] = "unknown(TCP)"
victim_host.services[psd.service]["port"] = psd.port victim_host.services[psd.service]["port"] = psd.port
@ -158,7 +163,9 @@ class Propagator:
victim_host.services[psd.service]["banner"] = psd.banner victim_host.services[psd.service]["banner"] = psd.banner
@staticmethod @staticmethod
def _process_fingerprinter_results(victim_host: VictimHost, fingerprint_data: FingerprintData): def _process_fingerprinter_results(
victim_host: VictimHost, fingerprint_data: Mapping[FingerprinterName, FingerprintData]
):
for fd in fingerprint_data.values(): for fd in fingerprint_data.values():
# TODO: This logic preserves the existing behavior prior to introducing IMaster and # TODO: This logic preserves the existing behavior prior to introducing IMaster and
# IPuppet, but it is possibly flawed. Different fingerprinters may detect # IPuppet, but it is possibly flawed. Different fingerprinters may detect

View File

@ -2,7 +2,7 @@ import itertools
import logging import logging
import socket import socket
from ipaddress import IPv4Interface from ipaddress import IPv4Interface
from typing import Dict, List from typing import Dict, List, Sequence
from common.network.network_range import InvalidNetworkRangeError, NetworkRange from common.network.network_range import InvalidNetworkRangeError, NetworkRange
from infection_monkey.network import NetworkAddress from infection_monkey.network import NetworkAddress
@ -14,10 +14,10 @@ logger = logging.getLogger(__name__)
def compile_scan_target_list( def compile_scan_target_list(
local_network_interfaces: List[IPv4Interface], local_network_interfaces: Sequence[IPv4Interface],
ranges_to_scan: List[str], ranges_to_scan: Sequence[str],
inaccessible_subnets: List[str], inaccessible_subnets: Sequence[str],
blocklisted_ips: List[str], blocklisted_ips: Sequence[str],
enable_local_network_scan: bool, enable_local_network_scan: bool,
) -> List[NetworkAddress]: ) -> List[NetworkAddress]:
scan_targets = _get_ips_from_subnets_to_scan(ranges_to_scan) scan_targets = _get_ips_from_subnets_to_scan(ranges_to_scan)