From 974998464064c41329cf39895235e11034a59a07 Mon Sep 17 00:00:00 2001 From: vakarisz Date: Thu, 22 Sep 2022 17:09:32 +0300 Subject: [PATCH] Agent: Fix type hints mypy found in propagator.py --- monkey/infection_monkey/master/ip_scanner.py | 3 +-- monkey/infection_monkey/master/propagator.py | 15 +++++++++++---- .../network_scanning/scan_target_generator.py | 10 +++++----- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/monkey/infection_monkey/master/ip_scanner.py b/monkey/infection_monkey/master/ip_scanner.py index bbabd7ae7..6f25f1f4b 100644 --- a/monkey/infection_monkey/master/ip_scanner.py +++ b/monkey/infection_monkey/master/ip_scanner.py @@ -8,7 +8,6 @@ from typing import Callable, Dict, Sequence from common.agent_configuration.agent_sub_configurations import ( NetworkScanConfiguration, PluginConfiguration, - ScanTargetConfiguration, ) from infection_monkey.i_puppet import ( FingerprintData, @@ -35,7 +34,7 @@ class IPScanner: def scan( self, addresses_to_scan: Sequence[NetworkAddress], - options: ScanTargetConfiguration, + options: NetworkScanConfiguration, results_callback: Callback, stop: Event, ): diff --git a/monkey/infection_monkey/master/propagator.py b/monkey/infection_monkey/master/propagator.py index 74a0a41ca..9b14bef2a 100644 --- a/monkey/infection_monkey/master/propagator.py +++ b/monkey/infection_monkey/master/propagator.py @@ -2,7 +2,7 @@ import logging from ipaddress import IPv4Interface from queue import Queue from threading import Event -from typing import List, Sequence +from typing import List, Mapping, Sequence from common.agent_configuration import ( ExploitationConfiguration, @@ -26,6 +26,7 @@ from infection_monkey.telemetry.scan_telem import ScanTelem from infection_monkey.utils.threading import create_daemon_thread from . import Exploiter, IPScanner, IPScanResults +from .ip_scan_results import FingerprinterName, Port logger = logging.getLogger() @@ -149,8 +150,12 @@ class Propagator: victim_host.os["type"] = ping_scan_data.os @staticmethod - def _process_tcp_scan_results(victim_host: VictimHost, port_scan_data: PortScanData): - for psd in filter(lambda psd: psd.status == PortStatus.OPEN, port_scan_data.values()): + def _process_tcp_scan_results( + victim_host: VictimHost, port_scan_data: Mapping[Port, PortScanData] + ): + for psd in filter( + lambda scan_data: scan_data.status == PortStatus.OPEN, port_scan_data.values() + ): victim_host.services[psd.service] = {} victim_host.services[psd.service]["display_name"] = "unknown(TCP)" victim_host.services[psd.service]["port"] = psd.port @@ -158,7 +163,9 @@ class Propagator: victim_host.services[psd.service]["banner"] = psd.banner @staticmethod - def _process_fingerprinter_results(victim_host: VictimHost, fingerprint_data: FingerprintData): + def _process_fingerprinter_results( + victim_host: VictimHost, fingerprint_data: Mapping[FingerprinterName, FingerprintData] + ): for fd in fingerprint_data.values(): # TODO: This logic preserves the existing behavior prior to introducing IMaster and # IPuppet, but it is possibly flawed. Different fingerprinters may detect diff --git a/monkey/infection_monkey/network_scanning/scan_target_generator.py b/monkey/infection_monkey/network_scanning/scan_target_generator.py index 0efbaee1c..6f66be507 100644 --- a/monkey/infection_monkey/network_scanning/scan_target_generator.py +++ b/monkey/infection_monkey/network_scanning/scan_target_generator.py @@ -2,7 +2,7 @@ import itertools import logging import socket from ipaddress import IPv4Interface -from typing import Dict, List +from typing import Dict, List, Sequence from common.network.network_range import InvalidNetworkRangeError, NetworkRange from infection_monkey.network import NetworkAddress @@ -14,10 +14,10 @@ logger = logging.getLogger(__name__) def compile_scan_target_list( - local_network_interfaces: List[IPv4Interface], - ranges_to_scan: List[str], - inaccessible_subnets: List[str], - blocklisted_ips: List[str], + local_network_interfaces: Sequence[IPv4Interface], + ranges_to_scan: Sequence[str], + inaccessible_subnets: Sequence[str], + blocklisted_ips: Sequence[str], enable_local_network_scan: bool, ) -> List[NetworkAddress]: scan_targets = _get_ips_from_subnets_to_scan(ranges_to_scan)