Merge pull request #1659 from guardicore/1597-implement-scan-target-generator

1597 implement scan target generator
This commit is contained in:
Mike Salvatore 2021-12-16 09:11:23 -05:00 committed by GitHub
commit fd29393ce4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 707 additions and 17 deletions

View File

@ -4,10 +4,15 @@ import random
import socket import socket
import struct import struct
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from typing import List, Tuple
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class InvalidNetworkRangeError(Exception):
"""Raise when invalid network range is provided"""
class NetworkRange(object, metaclass=ABCMeta): class NetworkRange(object, metaclass=ABCMeta):
def __init__(self, shuffle=True): def __init__(self, shuffle=True):
self._shuffle = shuffle self._shuffle = shuffle
@ -44,23 +49,50 @@ class NetworkRange(object, metaclass=ABCMeta):
if not address_str: # Empty string if not address_str: # Empty string
return None return None
address_str = address_str.strip() address_str = address_str.strip()
if address_str.endswith("/32"):
address_str = address_str[:-3]
if NetworkRange.check_if_range(address_str): if NetworkRange.check_if_range(address_str):
return IpRange(ip_range=address_str) return IpRange(ip_range=address_str)
if -1 != address_str.find("/"): if "/" in address_str:
return CidrRange(cidr_range=address_str) return CidrRange(cidr_range=address_str)
return SingleIpRange(ip_address=address_str) return SingleIpRange(ip_address=address_str)
@staticmethod
def filter_invalid_ranges(ranges: List[str], error_msg: str) -> List[str]:
valid_ranges = []
for target_range in ranges:
try:
NetworkRange.validate_range(target_range)
except InvalidNetworkRangeError as e:
logger.error(f"{error_msg} {e}")
continue
valid_ranges.append(target_range)
return valid_ranges
@staticmethod
def validate_range(address_str: str):
try:
NetworkRange.get_range_obj(address_str)
except (ValueError, OSError) as e:
raise InvalidNetworkRangeError(e)
@staticmethod @staticmethod
def check_if_range(address_str): def check_if_range(address_str):
if -1 != address_str.find("-"): if -1 != address_str.find("-"):
ips = address_str.split("-")
try: try:
ipaddress.ip_address(ips[0]) and ipaddress.ip_address(ips[1]) NetworkRange._range_to_ips(address_str)
except ValueError: except ValueError:
return False return False
return True return True
return False return False
@staticmethod
def _range_to_ips(ip_range: str) -> Tuple[str, str]:
ips = ip_range.split("-")
ips = [ip.strip() for ip in ips]
ips = sorted(ips, key=lambda ip: socket.inet_aton(ip))
return ips[0], ips[1]
@staticmethod @staticmethod
def _ip_to_number(address): def _ip_to_number(address):
return struct.unpack(">L", socket.inet_aton(address))[0] return struct.unpack(">L", socket.inet_aton(address))[0]
@ -94,12 +126,7 @@ class IpRange(NetworkRange):
def __init__(self, ip_range=None, lower_end_ip=None, higher_end_ip=None, shuffle=True): def __init__(self, ip_range=None, lower_end_ip=None, higher_end_ip=None, shuffle=True):
super(IpRange, self).__init__(shuffle=shuffle) super(IpRange, self).__init__(shuffle=shuffle)
if ip_range is not None: if ip_range is not None:
addresses = ip_range.split("-") self._lower_end_ip, self._higher_end_ip = IpRange._range_to_ips(ip_range)
if len(addresses) != 2:
raise ValueError(
"Illegal IP range format: %s. Format is 192.168.0.5-192.168.0.20" % ip_range
)
self._lower_end_ip, self._higher_end_ip = [x.strip() for x in addresses]
elif (lower_end_ip is not None) and (higher_end_ip is not None): elif (lower_end_ip is not None) and (higher_end_ip is not None):
self._lower_end_ip = lower_end_ip.strip() self._lower_end_ip = lower_end_ip.strip()
self._higher_end_ip = higher_end_ip.strip() self._higher_end_ip = higher_end_ip.strip()
@ -163,7 +190,7 @@ class SingleIpRange(NetworkRange):
:return: A tuple in format (IP, domain_name). Eg. (192.168.55.1, www.google.com) :return: A tuple in format (IP, domain_name). Eg. (192.168.55.1, www.google.com)
""" """
# The most common use case is to enter ip/range into "Scan IP/subnet list" # The most common use case is to enter ip/range into "Scan IP/subnet list"
domain_name = "" domain_name = None
# Try casting user's input as IP # Try casting user's input as IP
try: try:
@ -174,10 +201,9 @@ class SingleIpRange(NetworkRange):
ip = socket.gethostbyname(string_) ip = socket.gethostbyname(string_)
domain_name = string_ domain_name = string_
except socket.error: except socket.error:
logger.error( raise ValueError(
"Your specified host: {} is not found as a domain name and" "Your specified host: {} is not found as a domain name and"
" it's not an IP address".format(string_) " it's not an IP address".format(string_)
) )
return None, string_
# If a string_ was entered instead of IP we presume that it was domain name and translate it # If a string_ was entered instead of IP we presume that it was domain name and translate it
return ip, domain_name return ip, domain_name

View File

@ -40,6 +40,8 @@ class NetworkScanner(object):
self._ranges += self._get_inaccessible_subnets_ips() self._ranges += self._get_inaccessible_subnets_ips()
logger.info("Base local networks to scan are: %r", self._ranges) logger.info("Base local networks to scan are: %r", self._ranges)
# TODO remove afret agent refactoring,
# it's already handled in network.scan_target_generator._get_inaccessible_subnets_ips
def _get_inaccessible_subnets_ips(self): def _get_inaccessible_subnets_ips(self):
""" """
For each of the machine's IPs, checks if it's in one of the subnets specified in the For each of the machine's IPs, checks if it's in one of the subnets specified in the
@ -109,6 +111,8 @@ class NetworkScanner(object):
return return
@staticmethod @staticmethod
# TODO remove afret agent refactoring,
# it's already handled in network.scan_target_generator._is_any_ip_in_subnet
def _is_any_ip_in_subnet(ip_addresses, subnet_str): def _is_any_ip_in_subnet(ip_addresses, subnet_str):
for ip_address in ip_addresses: for ip_address in ip_addresses:
if NetworkRange.get_range_obj(subnet_str).is_in_range(ip_address): if NetworkRange.get_range_obj(subnet_str).is_in_range(ip_address):

View File

@ -0,0 +1,147 @@
import itertools
import logging
import socket
from collections import namedtuple
from typing import List
from common.network.network_range import InvalidNetworkRangeError, NetworkRange
NetworkInterface = namedtuple("NetworkInterface", ("address", "netmask"))
NetworkAddress = namedtuple("NetworkAddress", ("ip", "domain"))
logger = logging.getLogger(__name__)
def compile_scan_target_list(
local_network_interfaces: List[NetworkInterface],
ranges_to_scan: List[str],
inaccessible_subnets: List[str],
blocklisted_ips: List[str],
enable_local_network_scan: bool,
) -> List[NetworkAddress]:
scan_targets = _get_ips_from_ranges_to_scan(ranges_to_scan)
if enable_local_network_scan:
scan_targets.extend(_get_ips_to_scan_from_local_interface(local_network_interfaces))
if inaccessible_subnets:
inaccessible_subnets = _get_segmentation_check_targets(
inaccessible_subnets, local_network_interfaces
)
scan_targets.extend(inaccessible_subnets)
scan_targets = _remove_interface_ips(scan_targets, local_network_interfaces)
scan_targets = _remove_blocklisted_ips(scan_targets, blocklisted_ips)
scan_targets = _remove_redundant_targets(scan_targets)
scan_targets.sort(key=lambda network_address: socket.inet_aton(network_address.ip))
return scan_targets
def _remove_redundant_targets(targets: List[NetworkAddress]) -> List[NetworkAddress]:
reverse_dns = {}
for target in targets:
domain_name = target.domain
ip = target.ip
if ip not in reverse_dns or (reverse_dns[ip] is None and domain_name is not None):
reverse_dns[ip] = domain_name
return [NetworkAddress(key, value) for (key, value) in reverse_dns.items()]
def _range_to_addresses(range_obj: NetworkRange) -> List[NetworkAddress]:
addresses = []
for address in range_obj:
if hasattr(range_obj, "domain_name"):
addresses.append(NetworkAddress(address, range_obj.domain_name))
else:
addresses.append(NetworkAddress(address, None))
return addresses
def _get_ips_from_ranges_to_scan(ranges_to_scan: List[str]) -> List[NetworkAddress]:
scan_targets = []
ranges_to_scan = NetworkRange.filter_invalid_ranges(
ranges_to_scan, "Bad network range input for targets to scan:"
)
network_ranges = [NetworkRange.get_range_obj(_range) for _range in ranges_to_scan]
for _range in network_ranges:
scan_targets.extend(_range_to_addresses(_range))
return scan_targets
def _get_ips_to_scan_from_local_interface(
interfaces: List[NetworkInterface],
) -> List[NetworkAddress]:
ranges = [f"{interface.address}{interface.netmask}" for interface in interfaces]
ranges = NetworkRange.filter_invalid_ranges(
ranges, "Local network interface returns an invalid IP:"
)
return _get_ips_from_ranges_to_scan(ranges)
def _remove_interface_ips(
scan_targets: List[NetworkAddress], interfaces: List[NetworkInterface]
) -> List[NetworkAddress]:
interface_ips = [interface.address for interface in interfaces]
return _remove_ips_from_scan_targets(scan_targets, interface_ips)
def _remove_blocklisted_ips(
scan_targets: List[NetworkAddress], blocked_ips: List[str]
) -> List[NetworkAddress]:
filtered_blocked_ips = NetworkRange.filter_invalid_ranges(
blocked_ips, "Invalid blocked IP provided:"
)
if len(filtered_blocked_ips) != len(blocked_ips):
raise InvalidNetworkRangeError("Received an invalid blocked IP. Aborting just in case.")
return _remove_ips_from_scan_targets(scan_targets, filtered_blocked_ips)
def _remove_ips_from_scan_targets(
scan_targets: List[NetworkAddress], ips_to_remove: List[str]
) -> List[NetworkAddress]:
ips_to_remove_set = set(ips_to_remove)
return [address for address in scan_targets if address.ip not in ips_to_remove_set]
def _get_segmentation_check_targets(
inaccessible_subnets: List[str], local_interfaces: List[NetworkInterface]
) -> List[NetworkAddress]:
ips_to_scan = []
local_ips = [interface.address for interface in local_interfaces]
local_ips = NetworkRange.filter_invalid_ranges(local_ips, "Invalid local IP found: ")
inaccessible_subnets = NetworkRange.filter_invalid_ranges(
inaccessible_subnets, "Invalid segmentation scan target: "
)
inaccessible_subnets = _convert_to_range_object(inaccessible_subnets)
subnet_pairs = itertools.product(inaccessible_subnets, inaccessible_subnets)
for (subnet1, subnet2) in subnet_pairs:
if _is_segmentation_check_required(local_ips, subnet1, subnet2):
ips = _get_ips_from_ranges_to_scan(subnet2)
ips_to_scan.extend(ips)
return ips_to_scan
def _convert_to_range_object(subnets: List[str]) -> List[NetworkRange]:
return [NetworkRange.get_range_obj(subnet) for subnet in subnets]
def _is_segmentation_check_required(
local_ips: List[str], subnet1: NetworkRange, subnet2: NetworkRange
):
return _is_any_ip_in_subnet(local_ips, subnet1) and not _is_any_ip_in_subnet(local_ips, subnet2)
def _is_any_ip_in_subnet(ip_addresses: List[str], subnet: NetworkRange):
for ip_address in ip_addresses:
if subnet.is_in_range(ip_address):
return True
return False

View File

@ -0,0 +1,35 @@
from common.network.network_range import NetworkRange
def test_range_filtering():
invalid_ranges = [
# Invalid IP segment
"172.60.999.109",
"172.60.-1.109",
"172.60.999.109 - 172.60.1.109",
"172.60.999.109/32",
"172.60.999.109/24",
# Invalid CIDR
"172.60.1.109/33",
"172.60.1.109/-1",
# Typos
"172.60.9.109 -t 172.60.1.109",
"172.60..9.109",
"172.60,9.109",
" 172.60 .9.109 ",
]
valid_ranges = [
" 172.60.9.109 ",
"172.60.9.109 - 172.60.1.109",
"172.60.9.109- 172.60.1.109",
"0.0.0.0",
"localhost",
]
invalid_ranges.extend(valid_ranges)
remaining = NetworkRange.filter_invalid_ranges(invalid_ranges, "Test error:")
for _range in remaining:
assert _range in valid_ranges
assert len(remaining) == len(valid_ranges)

View File

@ -39,8 +39,3 @@ class TestVictimHostGenerator(TestCase):
victims = list(generator.generate_victims_from_range(self.local_host_range)) victims = list(generator.generate_victims_from_range(self.local_host_range))
self.assertEqual(len(victims), 1) self.assertEqual(len(victims), 1)
self.assertEqual(victims[0].domain_name, "localhost") self.assertEqual(victims[0].domain_name, "localhost")
# don't generate for other victims
victims = list(generator.generate_victims_from_range(self.random_single_ip_range))
self.assertEqual(len(victims), 1)
self.assertEqual(victims[0].domain_name, "")

View File

@ -0,0 +1,483 @@
from itertools import chain
import pytest
from common.network.network_range import InvalidNetworkRangeError
from infection_monkey.network.scan_target_generator import (
NetworkAddress,
NetworkInterface,
compile_scan_target_list,
)
def compile_ranges_only(ranges):
return compile_scan_target_list(
local_network_interfaces=[],
ranges_to_scan=ranges,
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=False,
)
def test_single_subnet():
scan_targets = compile_ranges_only(["10.0.0.0/24"])
assert len(scan_targets) == 255
for i in range(0, 255):
assert NetworkAddress(f"10.0.0.{i}", None) in scan_targets
@pytest.mark.parametrize("single_ip", ["10.0.0.2", "10.0.0.2/32", "10.0.0.2-10.0.0.2"])
def test_single_ip(single_ip):
print(single_ip)
scan_targets = compile_ranges_only([single_ip])
assert len(scan_targets) == 1
assert NetworkAddress("10.0.0.2", None) in scan_targets
assert NetworkAddress("10.0.0.2", None) == scan_targets[0]
def test_multiple_subnet():
scan_targets = compile_ranges_only(["10.0.0.0/24", "192.168.56.8/29"])
assert len(scan_targets) == 262
for i in range(0, 255):
assert NetworkAddress(f"10.0.0.{i}", None) in scan_targets
for i in range(8, 15):
assert NetworkAddress(f"192.168.56.{i}", None) in scan_targets
def test_middle_of_range_subnet():
scan_targets = compile_ranges_only(["192.168.56.4/29"])
assert len(scan_targets) == 7
for i in range(0, 7):
assert NetworkAddress(f"192.168.56.{i}", None) in scan_targets
@pytest.mark.parametrize(
"ip_range",
["192.168.56.25-192.168.56.33", "192.168.56.25 - 192.168.56.33", "192.168.56.33-192.168.56.25"],
)
def test_ip_range(ip_range):
scan_targets = compile_ranges_only([ip_range])
assert len(scan_targets) == 9
for i in range(25, 34):
assert NetworkAddress(f"192.168.56.{i}", None) in scan_targets
def test_no_duplicates():
scan_targets = compile_ranges_only(["192.168.56.0/29", "192.168.56.2", "192.168.56.4"])
assert len(scan_targets) == 7
for i in range(0, 7):
assert NetworkAddress(f"192.168.56.{i}", None) in scan_targets
def test_blocklisted_ips():
blocklisted_ips = ["10.0.0.5", "10.0.0.32", "10.0.0.119", "192.168.1.33"]
scan_targets = compile_scan_target_list(
local_network_interfaces=[],
ranges_to_scan=["10.0.0.0/24"],
inaccessible_subnets=[],
blocklisted_ips=blocklisted_ips,
enable_local_network_scan=False,
)
assert len(scan_targets) == 252
for blocked_ip in blocklisted_ips:
assert blocked_ip not in scan_targets
@pytest.mark.parametrize("ranges_to_scan", [["10.0.0.5"], []])
def test_only_ip_blocklisted(ranges_to_scan):
blocklisted_ips = ["10.0.0.5"]
scan_targets = compile_scan_target_list(
local_network_interfaces=[],
ranges_to_scan=ranges_to_scan,
inaccessible_subnets=[],
blocklisted_ips=blocklisted_ips,
enable_local_network_scan=False,
)
assert len(scan_targets) == 0
def test_local_network_interface_ips_removed_from_targets():
local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"),
NetworkInterface("10.0.0.32", "/24"),
NetworkInterface("10.0.0.119", "/24"),
NetworkInterface("192.168.1.33", "/24"),
]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=["10.0.0.0/24"],
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 252
for interface in local_network_interfaces:
assert interface.address not in scan_targets
def test_no_redundant_targets():
local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"),
]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=["127.0.0.0", "127.0.0.1", "localhost"],
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 2
assert NetworkAddress(ip="127.0.0.0", domain=None) in scan_targets
assert NetworkAddress(ip="127.0.0.1", domain="localhost") in scan_targets
@pytest.mark.parametrize("ranges_to_scan", [["10.0.0.5"], []])
def test_only_scan_ip_is_local(ranges_to_scan):
local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"),
NetworkInterface("10.0.0.32", "/24"),
NetworkInterface("10.0.0.119", "/24"),
NetworkInterface("192.168.1.33", "/24"),
]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=ranges_to_scan,
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 0
def test_local_network_interface_ips_and_blocked_ips_removed_from_targets():
local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"),
NetworkInterface("10.0.0.32", "/24"),
NetworkInterface("10.0.0.119", "/24"),
NetworkInterface("192.168.1.33", "/24"),
]
blocked_ips = ["10.0.0.63", "192.168.1.77", "0.0.0.0"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=["10.0.0.0/24", "192.168.1.0/24"],
inaccessible_subnets=[],
blocklisted_ips=blocked_ips,
enable_local_network_scan=False,
)
assert len(scan_targets) == (2 * (256 - 1)) - len(local_network_interfaces) - (
len(blocked_ips) - 1
)
for interface in local_network_interfaces:
assert interface.address not in scan_targets
for ip in blocked_ips:
assert ip not in scan_targets
def test_local_subnet_added():
local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=True,
)
assert len(scan_targets) == 254
for ip in chain(range(0, 5), range(6, 255)):
assert NetworkAddress(f"10.0.0.{ip}", None) in scan_targets
def test_multiple_local_subnets_added():
local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"),
NetworkInterface("172.33.66.99", "/24"),
]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=True,
)
assert len(scan_targets) == 2 * (255 - 1)
for ip in chain(range(0, 5), range(6, 255)):
assert NetworkAddress(f"10.0.0.{ip}", None) in scan_targets
for ip in chain(range(0, 99), range(100, 255)):
assert NetworkAddress(f"172.33.66.{ip}", None) in scan_targets
def test_blocklisted_ips_missing_from_local_subnets():
local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"),
NetworkInterface("172.33.66.99", "/24"),
]
blocklisted_ips = ["10.0.0.12", "10.0.0.13", "172.33.66.25"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=[],
blocklisted_ips=blocklisted_ips,
enable_local_network_scan=True,
)
assert len(scan_targets) == 2 * (255 - 1) - len(blocklisted_ips)
for ip in blocklisted_ips:
assert ip not in scan_targets
def test_local_subnets_and_ranges_added():
local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=["172.33.66.40/30"],
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=True,
)
assert len(scan_targets) == 254 + 3
for ip in range(0, 5):
assert NetworkAddress(f"10.0.0.{ip}", None) in scan_targets
for ip in range(6, 255):
assert NetworkAddress(f"10.0.0.{ip}", None) in scan_targets
for ip in range(40, 43):
assert NetworkAddress(f"172.33.66.{ip}", None) in scan_targets
def test_local_network_interfaces_specified_but_disabled():
local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=["172.33.66.40/30"],
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 3
for ip in range(40, 43):
assert NetworkAddress(f"172.33.66.{ip}", None) in scan_targets
def test_local_network_interfaces_subnet_masks():
local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"),
NetworkInterface("172.60.145.144", "/30"),
]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=[],
blocklisted_ips=[],
enable_local_network_scan=True,
)
assert len(scan_targets) == 4
for ip in [108, 110, 145, 146]:
assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets
def test_segmentation_targets():
local_network_interfaces = [NetworkInterface("172.60.145.109", "/24")]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.144/30"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 3
for ip in [144, 145, 146]:
assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets
def test_segmentation_clash_with_blocked():
local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"),
]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"]
blocked = ["172.60.145.148", "172.60.145.149", "172.60.145.150"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=blocked,
enable_local_network_scan=False,
)
assert len(scan_targets) == 0
def test_segmentation_clash_with_targets():
local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"),
]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"]
targets = ["172.60.145.149", "172.60.145.150"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=targets,
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 3
for ip in [148, 149, 150]:
assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets
def test_segmentation_one_network():
local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"),
]
inaccessible_subnets = ["172.60.145.1/24"]
targets = ["172.60.145.149/30"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=targets,
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 3
def test_segmentation_inaccessible_networks():
local_network_interfaces = [
NetworkInterface("172.60.1.1", "/24"),
NetworkInterface("172.60.2.1", "/24"),
]
inaccessible_subnets = ["172.60.144.1/24", "172.60.146.1/24"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 0
def test_invalid_inputs():
local_network_interfaces = [
NetworkInterface("172.60.999.109", "/30"),
NetworkInterface("172.60.145.109", "/30"),
]
inaccessible_subnets = [
"172.60.145.1 - 172.60.145.1111",
"172.60.147.888/30" "172.60.147.8/30",
"172.60.147.148/30",
]
targets = ["172.60.145.149/33", "1.-1.1.1", "1.a.2.2", "172.60.145.151/30"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=targets,
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 3
for ip in [148, 149, 150]:
assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets
def test_invalid_blocklisted_ip():
local_network_interfaces = [NetworkInterface("172.60.145.109", "/30")]
inaccessible_subnets = ["172.60.147.8/30", "172.60.147.148/30"]
targets = ["172.60.145.151/30"]
blocklisted = ["172.60.145.153", "172.60.145.753"]
with pytest.raises(InvalidNetworkRangeError):
compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=targets,
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=blocklisted,
enable_local_network_scan=False,
)
def test_sorted_scan_targets():
expected_results = [f"10.1.0.{i}" for i in range(0, 255)]
expected_results.extend([f"10.2.0.{i}" for i in range(0, 255)])
expected_results.extend([f"10.10.0.{i}" for i in range(0, 255)])
expected_results.extend([f"10.20.0.{i}" for i in range(0, 255)])
scan_targets = compile_scan_target_list(
[], ["10.1.0.0/24", "10.10.0.0/24", "10.20.0.0/24", "10.2.0.0/24"], [], [], False
)
actual_results = [network_address.ip for network_address in scan_targets]
assert expected_results == actual_results