Island, UT: Implement segmentation scan targets in scan target generation

This commit is contained in:
vakarisz 2021-12-14 14:58:50 +02:00
parent 27884ad44d
commit 2329f80382
3 changed files with 146 additions and 1 deletions

View File

@ -41,6 +41,8 @@ class NetworkScanner(object):
self._ranges += self._get_inaccessible_subnets_ips() self._ranges += self._get_inaccessible_subnets_ips()
logger.info("Base local networks to scan are: %r", self._ranges) logger.info("Base local networks to scan are: %r", self._ranges)
# TODO remove afret agent refactoring,
# it's already handled in network.scan_target_generator._get_inaccessible_subnets_ips
def _get_inaccessible_subnets_ips(self): def _get_inaccessible_subnets_ips(self):
""" """
For each of the machine's IPs, checks if it's in one of the subnets specified in the For each of the machine's IPs, checks if it's in one of the subnets specified in the
@ -113,6 +115,8 @@ class NetworkScanner(object):
time.sleep(WormConfiguration.tcp_scan_interval / float(1000)) time.sleep(WormConfiguration.tcp_scan_interval / float(1000))
@staticmethod @staticmethod
# TODO remove afret agent refactoring,
# it's already handled in network.scan_target_generator._is_any_ip_in_subnet
def _is_any_ip_in_subnet(ip_addresses, subnet_str): def _is_any_ip_in_subnet(ip_addresses, subnet_str):
for ip_address in ip_addresses: for ip_address in ip_addresses:
if NetworkRange.get_range_obj(subnet_str).is_in_range(ip_address): if NetworkRange.get_range_obj(subnet_str).is_in_range(ip_address):

View File

@ -1,3 +1,4 @@
import itertools
from collections import namedtuple from collections import namedtuple
from typing import List, Set from typing import List, Set
@ -9,7 +10,6 @@ NetworkInterface = namedtuple("NetworkInterface", ("address", "netmask"))
# TODO: Validate all parameters # TODO: Validate all parameters
# TODO: Implement inaccessible_subnets
def compile_scan_target_list( def compile_scan_target_list(
local_network_interfaces: List[NetworkInterface], local_network_interfaces: List[NetworkInterface],
ranges_to_scan: List[str], ranges_to_scan: List[str],
@ -22,6 +22,12 @@ def compile_scan_target_list(
if enable_local_network_scan: if enable_local_network_scan:
scan_targets.update(_get_ips_to_scan_from_local_interface(local_network_interfaces)) scan_targets.update(_get_ips_to_scan_from_local_interface(local_network_interfaces))
if inaccessible_subnets:
inaccessible_subnets = _get_segmentation_check_targets(
inaccessible_subnets, local_network_interfaces
)
scan_targets.update(inaccessible_subnets)
_remove_interface_ips(scan_targets, local_network_interfaces) _remove_interface_ips(scan_targets, local_network_interfaces)
_remove_blocklisted_ips(scan_targets, blocklisted_ips) _remove_blocklisted_ips(scan_targets, blocklisted_ips)
@ -62,3 +68,37 @@ def _remove_ips_from_scan_targets(scan_targets: Set[str], ips_to_remove: List[st
except KeyError: except KeyError:
# We don't need to remove the ip if it's already missing from the scan_targets # We don't need to remove the ip if it's already missing from the scan_targets
pass pass
def _get_segmentation_check_targets(
inaccessible_subnets: List[str], local_interfaces: List[NetworkInterface]
):
subnets_to_scan = set()
local_ips = [interface.address for interface in local_interfaces]
inaccessible_subnets = _convert_to_range_object(inaccessible_subnets)
subnet_pairs = itertools.product(inaccessible_subnets, inaccessible_subnets)
for (subnet1, subnet2) in subnet_pairs:
if _is_segmentation_check_required(local_ips, subnet1, subnet2):
ips = _get_ips_from_ranges_to_scan(subnet2)
subnets_to_scan.update(ips)
return subnets_to_scan
def _convert_to_range_object(subnets: List[str]) -> List[NetworkRange]:
return [NetworkRange.get_range_obj(subnet) for subnet in subnets]
def _is_segmentation_check_required(
local_ips: List[str], subnet1: NetworkRange, subnet2: NetworkRange
):
return _is_any_ip_in_subnet(local_ips, subnet1) and not _is_any_ip_in_subnet(local_ips, subnet2)
def _is_any_ip_in_subnet(ip_addresses: List[str], subnet: NetworkRange):
for ip_address in ip_addresses:
if subnet.is_in_range(ip_address):
return True
return False

View File

@ -301,3 +301,104 @@ def test_local_network_interfaces_subnet_masks():
for ip in [108, 110, 145, 146]: for ip in [108, 110, 145, 146]:
assert f"172.60.145.{ip}" in scan_targets assert f"172.60.145.{ip}" in scan_targets
def test_segmentation_targets():
local_network_interfaces = [NetworkInterface("172.60.145.109", "/24")]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.144/30"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 3
for ip in [144, 145, 146]:
assert f"172.60.145.{ip}" in scan_targets
def test_segmentation_clash_with_blocked():
local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"),
]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"]
blocked = ["172.60.145.148", "172.60.145.149", "172.60.145.150"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=blocked,
enable_local_network_scan=False,
)
assert len(scan_targets) == 0
def test_segmentation_clash_with_targets():
local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"),
]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"]
targets = ["172.60.145.149", "172.60.145.150"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=targets,
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 3
for ip in [148, 149, 150]:
assert f"172.60.145.{ip}" in scan_targets
def test_segmentation_one_network():
local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"),
]
inaccessible_subnets = ["172.60.145.1/24"]
targets = ["172.60.145.149/30"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=targets,
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 3
def test_segmentation_inaccessible_networks():
local_network_interfaces = [
NetworkInterface("172.60.1.1", "/24"),
NetworkInterface("172.60.2.1", "/24"),
]
inaccessible_subnets = ["172.60.144.1/24", "172.60.146.1/24"]
scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces,
ranges_to_scan=[],
inaccessible_subnets=inaccessible_subnets,
blocklisted_ips=[],
enable_local_network_scan=False,
)
assert len(scan_targets) == 0