diff --git a/monkey/common/network/network_range.py b/monkey/common/network/network_range.py index 1ab37ba57..326e365ce 100644 --- a/monkey/common/network/network_range.py +++ b/monkey/common/network/network_range.py @@ -4,10 +4,15 @@ import random import socket import struct from abc import ABCMeta, abstractmethod +from typing import List, Tuple logger = logging.getLogger(__name__) +class InvalidNetworkRangeError(Exception): + """Raise when invalid network range is provided""" + + class NetworkRange(object, metaclass=ABCMeta): def __init__(self, shuffle=True): self._shuffle = shuffle @@ -44,23 +49,50 @@ class NetworkRange(object, metaclass=ABCMeta): if not address_str: # Empty string return None address_str = address_str.strip() + if address_str.endswith("/32"): + address_str = address_str[:-3] if NetworkRange.check_if_range(address_str): return IpRange(ip_range=address_str) - if -1 != address_str.find("/"): + if "/" in address_str: return CidrRange(cidr_range=address_str) return SingleIpRange(ip_address=address_str) + @staticmethod + def filter_invalid_ranges(ranges: List[str], error_msg: str) -> List[str]: + valid_ranges = [] + for target_range in ranges: + try: + NetworkRange.validate_range(target_range) + except InvalidNetworkRangeError as e: + logger.error(f"{error_msg} {e}") + continue + valid_ranges.append(target_range) + return valid_ranges + + @staticmethod + def validate_range(address_str: str): + try: + NetworkRange.get_range_obj(address_str) + except (ValueError, OSError) as e: + raise InvalidNetworkRangeError(e) + @staticmethod def check_if_range(address_str): if -1 != address_str.find("-"): - ips = address_str.split("-") try: - ipaddress.ip_address(ips[0]) and ipaddress.ip_address(ips[1]) + NetworkRange._range_to_ips(address_str) except ValueError: return False return True return False + @staticmethod + def _range_to_ips(ip_range: str) -> Tuple[str, str]: + ips = ip_range.split("-") + ips = [ip.strip() for ip in ips] + ips = sorted(ips, key=lambda ip: socket.inet_aton(ip)) + return ips[0], ips[1] + @staticmethod def _ip_to_number(address): return struct.unpack(">L", socket.inet_aton(address))[0] @@ -94,12 +126,7 @@ class IpRange(NetworkRange): def __init__(self, ip_range=None, lower_end_ip=None, higher_end_ip=None, shuffle=True): super(IpRange, self).__init__(shuffle=shuffle) if ip_range is not None: - addresses = ip_range.split("-") - if len(addresses) != 2: - raise ValueError( - "Illegal IP range format: %s. Format is 192.168.0.5-192.168.0.20" % ip_range - ) - self._lower_end_ip, self._higher_end_ip = [x.strip() for x in addresses] + self._lower_end_ip, self._higher_end_ip = IpRange._range_to_ips(ip_range) elif (lower_end_ip is not None) and (higher_end_ip is not None): self._lower_end_ip = lower_end_ip.strip() self._higher_end_ip = higher_end_ip.strip() @@ -163,7 +190,7 @@ class SingleIpRange(NetworkRange): :return: A tuple in format (IP, domain_name). Eg. (192.168.55.1, www.google.com) """ # The most common use case is to enter ip/range into "Scan IP/subnet list" - domain_name = "" + domain_name = None # Try casting user's input as IP try: @@ -174,10 +201,9 @@ class SingleIpRange(NetworkRange): ip = socket.gethostbyname(string_) domain_name = string_ except socket.error: - logger.error( + raise ValueError( "Your specified host: {} is not found as a domain name and" " it's not an IP address".format(string_) ) - return None, string_ # If a string_ was entered instead of IP we presume that it was domain name and translate it return ip, domain_name diff --git a/monkey/infection_monkey/network/network_scanner.py b/monkey/infection_monkey/network/network_scanner.py index 340763957..e8df06bd4 100644 --- a/monkey/infection_monkey/network/network_scanner.py +++ b/monkey/infection_monkey/network/network_scanner.py @@ -40,6 +40,8 @@ class NetworkScanner(object): self._ranges += self._get_inaccessible_subnets_ips() logger.info("Base local networks to scan are: %r", self._ranges) + # TODO remove afret agent refactoring, + # it's already handled in network.scan_target_generator._get_inaccessible_subnets_ips def _get_inaccessible_subnets_ips(self): """ For each of the machine's IPs, checks if it's in one of the subnets specified in the @@ -109,6 +111,8 @@ class NetworkScanner(object): return @staticmethod + # TODO remove afret agent refactoring, + # it's already handled in network.scan_target_generator._is_any_ip_in_subnet def _is_any_ip_in_subnet(ip_addresses, subnet_str): for ip_address in ip_addresses: if NetworkRange.get_range_obj(subnet_str).is_in_range(ip_address): diff --git a/monkey/infection_monkey/network/scan_target_generator.py b/monkey/infection_monkey/network/scan_target_generator.py new file mode 100644 index 000000000..6cec82223 --- /dev/null +++ b/monkey/infection_monkey/network/scan_target_generator.py @@ -0,0 +1,147 @@ +import itertools +import logging +import socket +from collections import namedtuple +from typing import List + +from common.network.network_range import InvalidNetworkRangeError, NetworkRange + +NetworkInterface = namedtuple("NetworkInterface", ("address", "netmask")) +NetworkAddress = namedtuple("NetworkAddress", ("ip", "domain")) + +logger = logging.getLogger(__name__) + + +def compile_scan_target_list( + local_network_interfaces: List[NetworkInterface], + ranges_to_scan: List[str], + inaccessible_subnets: List[str], + blocklisted_ips: List[str], + enable_local_network_scan: bool, +) -> List[NetworkAddress]: + scan_targets = _get_ips_from_ranges_to_scan(ranges_to_scan) + + if enable_local_network_scan: + scan_targets.extend(_get_ips_to_scan_from_local_interface(local_network_interfaces)) + + if inaccessible_subnets: + inaccessible_subnets = _get_segmentation_check_targets( + inaccessible_subnets, local_network_interfaces + ) + scan_targets.extend(inaccessible_subnets) + + scan_targets = _remove_interface_ips(scan_targets, local_network_interfaces) + scan_targets = _remove_blocklisted_ips(scan_targets, blocklisted_ips) + scan_targets = _remove_redundant_targets(scan_targets) + scan_targets.sort(key=lambda network_address: socket.inet_aton(network_address.ip)) + + return scan_targets + + +def _remove_redundant_targets(targets: List[NetworkAddress]) -> List[NetworkAddress]: + reverse_dns = {} + for target in targets: + domain_name = target.domain + ip = target.ip + if ip not in reverse_dns or (reverse_dns[ip] is None and domain_name is not None): + reverse_dns[ip] = domain_name + return [NetworkAddress(key, value) for (key, value) in reverse_dns.items()] + + +def _range_to_addresses(range_obj: NetworkRange) -> List[NetworkAddress]: + addresses = [] + for address in range_obj: + if hasattr(range_obj, "domain_name"): + addresses.append(NetworkAddress(address, range_obj.domain_name)) + else: + addresses.append(NetworkAddress(address, None)) + return addresses + + +def _get_ips_from_ranges_to_scan(ranges_to_scan: List[str]) -> List[NetworkAddress]: + scan_targets = [] + + ranges_to_scan = NetworkRange.filter_invalid_ranges( + ranges_to_scan, "Bad network range input for targets to scan:" + ) + + network_ranges = [NetworkRange.get_range_obj(_range) for _range in ranges_to_scan] + + for _range in network_ranges: + scan_targets.extend(_range_to_addresses(_range)) + return scan_targets + + +def _get_ips_to_scan_from_local_interface( + interfaces: List[NetworkInterface], +) -> List[NetworkAddress]: + ranges = [f"{interface.address}{interface.netmask}" for interface in interfaces] + + ranges = NetworkRange.filter_invalid_ranges( + ranges, "Local network interface returns an invalid IP:" + ) + return _get_ips_from_ranges_to_scan(ranges) + + +def _remove_interface_ips( + scan_targets: List[NetworkAddress], interfaces: List[NetworkInterface] +) -> List[NetworkAddress]: + interface_ips = [interface.address for interface in interfaces] + return _remove_ips_from_scan_targets(scan_targets, interface_ips) + + +def _remove_blocklisted_ips( + scan_targets: List[NetworkAddress], blocked_ips: List[str] +) -> List[NetworkAddress]: + filtered_blocked_ips = NetworkRange.filter_invalid_ranges( + blocked_ips, "Invalid blocked IP provided:" + ) + if len(filtered_blocked_ips) != len(blocked_ips): + raise InvalidNetworkRangeError("Received an invalid blocked IP. Aborting just in case.") + return _remove_ips_from_scan_targets(scan_targets, filtered_blocked_ips) + + +def _remove_ips_from_scan_targets( + scan_targets: List[NetworkAddress], ips_to_remove: List[str] +) -> List[NetworkAddress]: + ips_to_remove_set = set(ips_to_remove) + return [address for address in scan_targets if address.ip not in ips_to_remove_set] + + +def _get_segmentation_check_targets( + inaccessible_subnets: List[str], local_interfaces: List[NetworkInterface] +) -> List[NetworkAddress]: + ips_to_scan = [] + local_ips = [interface.address for interface in local_interfaces] + + local_ips = NetworkRange.filter_invalid_ranges(local_ips, "Invalid local IP found: ") + inaccessible_subnets = NetworkRange.filter_invalid_ranges( + inaccessible_subnets, "Invalid segmentation scan target: " + ) + + inaccessible_subnets = _convert_to_range_object(inaccessible_subnets) + subnet_pairs = itertools.product(inaccessible_subnets, inaccessible_subnets) + + for (subnet1, subnet2) in subnet_pairs: + if _is_segmentation_check_required(local_ips, subnet1, subnet2): + ips = _get_ips_from_ranges_to_scan(subnet2) + ips_to_scan.extend(ips) + + return ips_to_scan + + +def _convert_to_range_object(subnets: List[str]) -> List[NetworkRange]: + return [NetworkRange.get_range_obj(subnet) for subnet in subnets] + + +def _is_segmentation_check_required( + local_ips: List[str], subnet1: NetworkRange, subnet2: NetworkRange +): + return _is_any_ip_in_subnet(local_ips, subnet1) and not _is_any_ip_in_subnet(local_ips, subnet2) + + +def _is_any_ip_in_subnet(ip_addresses: List[str], subnet: NetworkRange): + for ip_address in ip_addresses: + if subnet.is_in_range(ip_address): + return True + return False diff --git a/monkey/tests/unit_tests/common/network/test_network_range.py b/monkey/tests/unit_tests/common/network/test_network_range.py new file mode 100644 index 000000000..0abb793d1 --- /dev/null +++ b/monkey/tests/unit_tests/common/network/test_network_range.py @@ -0,0 +1,35 @@ +from common.network.network_range import NetworkRange + + +def test_range_filtering(): + invalid_ranges = [ + # Invalid IP segment + "172.60.999.109", + "172.60.-1.109", + "172.60.999.109 - 172.60.1.109", + "172.60.999.109/32", + "172.60.999.109/24", + # Invalid CIDR + "172.60.1.109/33", + "172.60.1.109/-1", + # Typos + "172.60.9.109 -t 172.60.1.109", + "172.60..9.109", + "172.60,9.109", + " 172.60 .9.109 ", + ] + + valid_ranges = [ + " 172.60.9.109 ", + "172.60.9.109 - 172.60.1.109", + "172.60.9.109- 172.60.1.109", + "0.0.0.0", + "localhost", + ] + + invalid_ranges.extend(valid_ranges) + + remaining = NetworkRange.filter_invalid_ranges(invalid_ranges, "Test error:") + for _range in remaining: + assert _range in valid_ranges + assert len(remaining) == len(valid_ranges) diff --git a/monkey/tests/unit_tests/infection_monkey/model/test_victim_host_generator.py b/monkey/tests/unit_tests/infection_monkey/model/test_victim_host_generator.py index c60992fee..0133102eb 100644 --- a/monkey/tests/unit_tests/infection_monkey/model/test_victim_host_generator.py +++ b/monkey/tests/unit_tests/infection_monkey/model/test_victim_host_generator.py @@ -39,8 +39,3 @@ class TestVictimHostGenerator(TestCase): victims = list(generator.generate_victims_from_range(self.local_host_range)) self.assertEqual(len(victims), 1) self.assertEqual(victims[0].domain_name, "localhost") - - # don't generate for other victims - victims = list(generator.generate_victims_from_range(self.random_single_ip_range)) - self.assertEqual(len(victims), 1) - self.assertEqual(victims[0].domain_name, "") diff --git a/monkey/tests/unit_tests/infection_monkey/network/test_scan_target_generator.py b/monkey/tests/unit_tests/infection_monkey/network/test_scan_target_generator.py new file mode 100644 index 000000000..03febe44c --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/network/test_scan_target_generator.py @@ -0,0 +1,483 @@ +from itertools import chain + +import pytest + +from common.network.network_range import InvalidNetworkRangeError +from infection_monkey.network.scan_target_generator import ( + NetworkAddress, + NetworkInterface, + compile_scan_target_list, +) + + +def compile_ranges_only(ranges): + return compile_scan_target_list( + local_network_interfaces=[], + ranges_to_scan=ranges, + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + +def test_single_subnet(): + scan_targets = compile_ranges_only(["10.0.0.0/24"]) + + assert len(scan_targets) == 255 + + for i in range(0, 255): + assert NetworkAddress(f"10.0.0.{i}", None) in scan_targets + + +@pytest.mark.parametrize("single_ip", ["10.0.0.2", "10.0.0.2/32", "10.0.0.2-10.0.0.2"]) +def test_single_ip(single_ip): + print(single_ip) + scan_targets = compile_ranges_only([single_ip]) + + assert len(scan_targets) == 1 + assert NetworkAddress("10.0.0.2", None) in scan_targets + assert NetworkAddress("10.0.0.2", None) == scan_targets[0] + + +def test_multiple_subnet(): + scan_targets = compile_ranges_only(["10.0.0.0/24", "192.168.56.8/29"]) + + assert len(scan_targets) == 262 + + for i in range(0, 255): + assert NetworkAddress(f"10.0.0.{i}", None) in scan_targets + + for i in range(8, 15): + assert NetworkAddress(f"192.168.56.{i}", None) in scan_targets + + +def test_middle_of_range_subnet(): + scan_targets = compile_ranges_only(["192.168.56.4/29"]) + + assert len(scan_targets) == 7 + + for i in range(0, 7): + assert NetworkAddress(f"192.168.56.{i}", None) in scan_targets + + +@pytest.mark.parametrize( + "ip_range", + ["192.168.56.25-192.168.56.33", "192.168.56.25 - 192.168.56.33", "192.168.56.33-192.168.56.25"], +) +def test_ip_range(ip_range): + scan_targets = compile_ranges_only([ip_range]) + + assert len(scan_targets) == 9 + + for i in range(25, 34): + assert NetworkAddress(f"192.168.56.{i}", None) in scan_targets + + +def test_no_duplicates(): + scan_targets = compile_ranges_only(["192.168.56.0/29", "192.168.56.2", "192.168.56.4"]) + + assert len(scan_targets) == 7 + + for i in range(0, 7): + assert NetworkAddress(f"192.168.56.{i}", None) in scan_targets + + +def test_blocklisted_ips(): + blocklisted_ips = ["10.0.0.5", "10.0.0.32", "10.0.0.119", "192.168.1.33"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=[], + ranges_to_scan=["10.0.0.0/24"], + inaccessible_subnets=[], + blocklisted_ips=blocklisted_ips, + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 252 + for blocked_ip in blocklisted_ips: + assert blocked_ip not in scan_targets + + +@pytest.mark.parametrize("ranges_to_scan", [["10.0.0.5"], []]) +def test_only_ip_blocklisted(ranges_to_scan): + blocklisted_ips = ["10.0.0.5"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=[], + ranges_to_scan=ranges_to_scan, + inaccessible_subnets=[], + blocklisted_ips=blocklisted_ips, + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 0 + + +def test_local_network_interface_ips_removed_from_targets(): + local_network_interfaces = [ + NetworkInterface("10.0.0.5", "/24"), + NetworkInterface("10.0.0.32", "/24"), + NetworkInterface("10.0.0.119", "/24"), + NetworkInterface("192.168.1.33", "/24"), + ] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=["10.0.0.0/24"], + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 252 + for interface in local_network_interfaces: + assert interface.address not in scan_targets + + +def test_no_redundant_targets(): + local_network_interfaces = [ + NetworkInterface("10.0.0.5", "/24"), + ] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=["127.0.0.0", "127.0.0.1", "localhost"], + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 2 + assert NetworkAddress(ip="127.0.0.0", domain=None) in scan_targets + assert NetworkAddress(ip="127.0.0.1", domain="localhost") in scan_targets + + +@pytest.mark.parametrize("ranges_to_scan", [["10.0.0.5"], []]) +def test_only_scan_ip_is_local(ranges_to_scan): + local_network_interfaces = [ + NetworkInterface("10.0.0.5", "/24"), + NetworkInterface("10.0.0.32", "/24"), + NetworkInterface("10.0.0.119", "/24"), + NetworkInterface("192.168.1.33", "/24"), + ] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=ranges_to_scan, + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 0 + + +def test_local_network_interface_ips_and_blocked_ips_removed_from_targets(): + local_network_interfaces = [ + NetworkInterface("10.0.0.5", "/24"), + NetworkInterface("10.0.0.32", "/24"), + NetworkInterface("10.0.0.119", "/24"), + NetworkInterface("192.168.1.33", "/24"), + ] + blocked_ips = ["10.0.0.63", "192.168.1.77", "0.0.0.0"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=["10.0.0.0/24", "192.168.1.0/24"], + inaccessible_subnets=[], + blocklisted_ips=blocked_ips, + enable_local_network_scan=False, + ) + + assert len(scan_targets) == (2 * (256 - 1)) - len(local_network_interfaces) - ( + len(blocked_ips) - 1 + ) + + for interface in local_network_interfaces: + assert interface.address not in scan_targets + + for ip in blocked_ips: + assert ip not in scan_targets + + +def test_local_subnet_added(): + local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=[], + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=True, + ) + + assert len(scan_targets) == 254 + + for ip in chain(range(0, 5), range(6, 255)): + assert NetworkAddress(f"10.0.0.{ip}", None) in scan_targets + + +def test_multiple_local_subnets_added(): + local_network_interfaces = [ + NetworkInterface("10.0.0.5", "/24"), + NetworkInterface("172.33.66.99", "/24"), + ] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=[], + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=True, + ) + + assert len(scan_targets) == 2 * (255 - 1) + + for ip in chain(range(0, 5), range(6, 255)): + assert NetworkAddress(f"10.0.0.{ip}", None) in scan_targets + + for ip in chain(range(0, 99), range(100, 255)): + assert NetworkAddress(f"172.33.66.{ip}", None) in scan_targets + + +def test_blocklisted_ips_missing_from_local_subnets(): + local_network_interfaces = [ + NetworkInterface("10.0.0.5", "/24"), + NetworkInterface("172.33.66.99", "/24"), + ] + blocklisted_ips = ["10.0.0.12", "10.0.0.13", "172.33.66.25"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=[], + inaccessible_subnets=[], + blocklisted_ips=blocklisted_ips, + enable_local_network_scan=True, + ) + + assert len(scan_targets) == 2 * (255 - 1) - len(blocklisted_ips) + + for ip in blocklisted_ips: + assert ip not in scan_targets + + +def test_local_subnets_and_ranges_added(): + local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=["172.33.66.40/30"], + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=True, + ) + + assert len(scan_targets) == 254 + 3 + + for ip in range(0, 5): + assert NetworkAddress(f"10.0.0.{ip}", None) in scan_targets + for ip in range(6, 255): + assert NetworkAddress(f"10.0.0.{ip}", None) in scan_targets + + for ip in range(40, 43): + assert NetworkAddress(f"172.33.66.{ip}", None) in scan_targets + + +def test_local_network_interfaces_specified_but_disabled(): + local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=["172.33.66.40/30"], + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 3 + + for ip in range(40, 43): + assert NetworkAddress(f"172.33.66.{ip}", None) in scan_targets + + +def test_local_network_interfaces_subnet_masks(): + local_network_interfaces = [ + NetworkInterface("172.60.145.109", "/30"), + NetworkInterface("172.60.145.144", "/30"), + ] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=[], + inaccessible_subnets=[], + blocklisted_ips=[], + enable_local_network_scan=True, + ) + + assert len(scan_targets) == 4 + + for ip in [108, 110, 145, 146]: + assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets + + +def test_segmentation_targets(): + local_network_interfaces = [NetworkInterface("172.60.145.109", "/24")] + + inaccessible_subnets = ["172.60.145.108/30", "172.60.145.144/30"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=[], + inaccessible_subnets=inaccessible_subnets, + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 3 + + for ip in [144, 145, 146]: + assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets + + +def test_segmentation_clash_with_blocked(): + local_network_interfaces = [ + NetworkInterface("172.60.145.109", "/30"), + ] + + inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"] + + blocked = ["172.60.145.148", "172.60.145.149", "172.60.145.150"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=[], + inaccessible_subnets=inaccessible_subnets, + blocklisted_ips=blocked, + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 0 + + +def test_segmentation_clash_with_targets(): + local_network_interfaces = [ + NetworkInterface("172.60.145.109", "/30"), + ] + + inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"] + + targets = ["172.60.145.149", "172.60.145.150"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=targets, + inaccessible_subnets=inaccessible_subnets, + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 3 + + for ip in [148, 149, 150]: + assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets + + +def test_segmentation_one_network(): + local_network_interfaces = [ + NetworkInterface("172.60.145.109", "/30"), + ] + + inaccessible_subnets = ["172.60.145.1/24"] + + targets = ["172.60.145.149/30"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=targets, + inaccessible_subnets=inaccessible_subnets, + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 3 + + +def test_segmentation_inaccessible_networks(): + local_network_interfaces = [ + NetworkInterface("172.60.1.1", "/24"), + NetworkInterface("172.60.2.1", "/24"), + ] + + inaccessible_subnets = ["172.60.144.1/24", "172.60.146.1/24"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=[], + inaccessible_subnets=inaccessible_subnets, + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 0 + + +def test_invalid_inputs(): + local_network_interfaces = [ + NetworkInterface("172.60.999.109", "/30"), + NetworkInterface("172.60.145.109", "/30"), + ] + + inaccessible_subnets = [ + "172.60.145.1 - 172.60.145.1111", + "172.60.147.888/30" "172.60.147.8/30", + "172.60.147.148/30", + ] + + targets = ["172.60.145.149/33", "1.-1.1.1", "1.a.2.2", "172.60.145.151/30"] + + scan_targets = compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=targets, + inaccessible_subnets=inaccessible_subnets, + blocklisted_ips=[], + enable_local_network_scan=False, + ) + + assert len(scan_targets) == 3 + + for ip in [148, 149, 150]: + assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets + + +def test_invalid_blocklisted_ip(): + local_network_interfaces = [NetworkInterface("172.60.145.109", "/30")] + + inaccessible_subnets = ["172.60.147.8/30", "172.60.147.148/30"] + + targets = ["172.60.145.151/30"] + + blocklisted = ["172.60.145.153", "172.60.145.753"] + + with pytest.raises(InvalidNetworkRangeError): + compile_scan_target_list( + local_network_interfaces=local_network_interfaces, + ranges_to_scan=targets, + inaccessible_subnets=inaccessible_subnets, + blocklisted_ips=blocklisted, + enable_local_network_scan=False, + ) + + +def test_sorted_scan_targets(): + expected_results = [f"10.1.0.{i}" for i in range(0, 255)] + expected_results.extend([f"10.2.0.{i}" for i in range(0, 255)]) + expected_results.extend([f"10.10.0.{i}" for i in range(0, 255)]) + expected_results.extend([f"10.20.0.{i}" for i in range(0, 255)]) + + scan_targets = compile_scan_target_list( + [], ["10.1.0.0/24", "10.10.0.0/24", "10.20.0.0/24", "10.2.0.0/24"], [], [], False + ) + + actual_results = [network_address.ip for network_address in scan_targets] + + assert expected_results == actual_results