Agent: Fix typing issues

This commit is contained in:
Kekoa Kaaikala 2022-08-25 18:12:48 +00:00
parent 06ae6a8b90
commit 75ba889f57
2 changed files with 7 additions and 7 deletions
monkey/infection_monkey/network_scanning

View File

@ -1,7 +1,7 @@
import itertools import itertools
import logging import logging
import socket import socket
from typing import List from typing import Any, Dict, List
from common.network.network_range import InvalidNetworkRangeError, NetworkRange from common.network.network_range import InvalidNetworkRangeError, NetworkRange
from infection_monkey.network import NetworkAddress, NetworkInterface from infection_monkey.network import NetworkAddress, NetworkInterface
@ -39,7 +39,7 @@ def compile_scan_target_list(
def _remove_redundant_targets(targets: List[NetworkAddress]) -> List[NetworkAddress]: def _remove_redundant_targets(targets: List[NetworkAddress]) -> List[NetworkAddress]:
reverse_dns = {} reverse_dns: Dict[Any, Any] = {}
for target in targets: for target in targets:
domain_name = target.domain domain_name = target.domain
ip = target.ip ip = target.ip
@ -124,7 +124,7 @@ def _get_segmentation_check_targets(
for (subnet1, subnet2) in subnet_pairs: for (subnet1, subnet2) in subnet_pairs:
if _is_segmentation_check_required(local_ips, subnet1, subnet2): if _is_segmentation_check_required(local_ips, subnet1, subnet2):
ips = _get_ips_from_ranges_to_scan(subnet2) ips = _get_ips_from_ranges_to_scan([subnet2])
ips_to_scan.extend(ips) ips_to_scan.extend(ips)
return ips_to_scan return ips_to_scan

View File

@ -3,7 +3,7 @@ import select
import socket import socket
import time import time
from pprint import pformat from pprint import pformat
from typing import Iterable, Mapping, Tuple from typing import Collection, Iterable, Mapping, Tuple
from common.utils import Timer from common.utils import Timer
from infection_monkey.i_puppet import PortScanData, PortStatus from infection_monkey.i_puppet import PortScanData, PortStatus
@ -16,7 +16,7 @@ EMPTY_PORT_SCAN = {-1: PortScanData(-1, PortStatus.CLOSED, None, None)}
def scan_tcp_ports( def scan_tcp_ports(
host: str, ports_to_scan: Iterable[int], timeout: float host: str, ports_to_scan: Collection[int], timeout: float
) -> Mapping[int, PortScanData]: ) -> Mapping[int, PortScanData]:
try: try:
return _scan_tcp_ports(host, ports_to_scan, timeout) return _scan_tcp_ports(host, ports_to_scan, timeout)
@ -25,7 +25,7 @@ def scan_tcp_ports(
return EMPTY_PORT_SCAN return EMPTY_PORT_SCAN
def _scan_tcp_ports(host: str, ports_to_scan: Iterable[int], timeout: float): def _scan_tcp_ports(host: str, ports_to_scan: Collection[int], timeout: float):
open_ports = _check_tcp_ports(host, ports_to_scan, timeout) open_ports = _check_tcp_ports(host, ports_to_scan, timeout)
return _build_port_scan_data(ports_to_scan, open_ports) return _build_port_scan_data(ports_to_scan, open_ports)
@ -52,7 +52,7 @@ def _get_closed_port_data(port: int) -> PortScanData:
def _check_tcp_ports( def _check_tcp_ports(
ip: str, ports_to_scan: Iterable[int], timeout: float = DEFAULT_TIMEOUT ip: str, ports_to_scan: Collection[int], timeout: float = DEFAULT_TIMEOUT
) -> Mapping[int, str]: ) -> Mapping[int, str]:
""" """
Checks whether any of the given ports are open on a target IP. Checks whether any of the given ports are open on a target IP.