Agent: Move _filter_invalid_ranges to NetworkRanges

This commit is contained in:
Mike Salvatore 2021-12-16 08:56:35 -05:00
parent bfed27301a
commit c8469f5521
4 changed files with 57 additions and 53 deletions

View File

@ -4,7 +4,7 @@ import random
import socket
import struct
from abc import ABCMeta, abstractmethod
from typing import Tuple
from typing import List, Tuple
logger = logging.getLogger(__name__)
@ -57,6 +57,18 @@ class NetworkRange(object, metaclass=ABCMeta):
return CidrRange(cidr_range=address_str)
return SingleIpRange(ip_address=address_str)
@staticmethod
def filter_invalid_ranges(ranges: List[str], error_msg: str) -> List[str]:
valid_ranges = []
for target_range in ranges:
try:
NetworkRange.validate_range(target_range)
except InvalidNetworkRangeError as e:
logger.error(f"{error_msg} {e}")
continue
valid_ranges.append(target_range)
return valid_ranges
@staticmethod
def validate_range(address_str: str):
try:

View File

@ -60,7 +60,7 @@ def _range_to_addresses(range_obj: NetworkRange) -> List[NetworkAddress]:
def _get_ips_from_ranges_to_scan(ranges_to_scan: List[str]) -> List[NetworkAddress]:
scan_targets = []
ranges_to_scan = _filter_invalid_ranges(
ranges_to_scan = NetworkRange.filter_invalid_ranges(
ranges_to_scan, "Bad network range input for targets to scan:"
)
@ -76,7 +76,9 @@ def _get_ips_to_scan_from_local_interface(
) -> List[NetworkAddress]:
ranges = [f"{interface.address}{interface.netmask}" for interface in interfaces]
ranges = _filter_invalid_ranges(ranges, "Local network interface returns an invalid IP:")
ranges = NetworkRange.filter_invalid_ranges(
ranges, "Local network interface returns an invalid IP:"
)
return _get_ips_from_ranges_to_scan(ranges)
@ -90,7 +92,9 @@ def _remove_interface_ips(
def _remove_blocklisted_ips(
scan_targets: List[NetworkAddress], blocked_ips: List[str]
) -> List[NetworkAddress]:
filtered_blocked_ips = _filter_invalid_ranges(blocked_ips, "Invalid blocked IP provided:")
filtered_blocked_ips = NetworkRange.filter_invalid_ranges(
blocked_ips, "Invalid blocked IP provided:"
)
if len(filtered_blocked_ips) != len(blocked_ips):
raise InvalidNetworkRangeError("Received an invalid blocked IP. Aborting just in case.")
return _remove_ips_from_scan_targets(scan_targets, filtered_blocked_ips)
@ -109,8 +113,8 @@ def _get_segmentation_check_targets(
ips_to_scan = []
local_ips = [interface.address for interface in local_interfaces]
local_ips = _filter_invalid_ranges(local_ips, "Invalid local IP found: ")
inaccessible_subnets = _filter_invalid_ranges(
local_ips = NetworkRange.filter_invalid_ranges(local_ips, "Invalid local IP found: ")
inaccessible_subnets = NetworkRange.filter_invalid_ranges(
inaccessible_subnets, "Invalid segmentation scan target: "
)
@ -125,18 +129,6 @@ def _get_segmentation_check_targets(
return ips_to_scan
def _filter_invalid_ranges(ranges: List[str], error_msg: str) -> List[str]:
valid_ranges = []
for target_range in ranges:
try:
NetworkRange.validate_range(target_range)
except InvalidNetworkRangeError as e:
logger.error(f"{error_msg} {e}")
continue
valid_ranges.append(target_range)
return valid_ranges
def _convert_to_range_object(subnets: List[str]) -> List[NetworkRange]:
return [NetworkRange.get_range_obj(subnet) for subnet in subnets]

View File

@ -0,0 +1,35 @@
from common.network.network_range import NetworkRange
def test_range_filtering():
invalid_ranges = [
# Invalid IP segment
"172.60.999.109",
"172.60.-1.109",
"172.60.999.109 - 172.60.1.109",
"172.60.999.109/32",
"172.60.999.109/24",
# Invalid CIDR
"172.60.1.109/33",
"172.60.1.109/-1",
# Typos
"172.60.9.109 -t 172.60.1.109",
"172.60..9.109",
"172.60,9.109",
" 172.60 .9.109 ",
]
valid_ranges = [
" 172.60.9.109 ",
"172.60.9.109 - 172.60.1.109",
"172.60.9.109- 172.60.1.109",
"0.0.0.0",
"localhost",
]
invalid_ranges.extend(valid_ranges)
remaining = NetworkRange.filter_invalid_ranges(invalid_ranges, "Test error:")
for _range in remaining:
assert _range in valid_ranges
assert len(remaining) == len(valid_ranges)

View File

@ -6,7 +6,6 @@ from common.network.network_range import InvalidNetworkRangeError
from infection_monkey.network.scan_target_generator import (
NetworkAddress,
NetworkInterface,
_filter_invalid_ranges,
compile_scan_target_list,
)
@ -450,40 +449,6 @@ def test_invalid_inputs():
assert NetworkAddress(f"172.60.145.{ip}", None) in scan_targets
def test_range_filtering():
invalid_ranges = [
# Invalid IP segment
"172.60.999.109",
"172.60.-1.109",
"172.60.999.109 - 172.60.1.109",
"172.60.999.109/32",
"172.60.999.109/24",
# Invalid CIDR
"172.60.1.109/33",
"172.60.1.109/-1",
# Typos
"172.60.9.109 -t 172.60.1.109",
"172.60..9.109",
"172.60,9.109",
" 172.60 .9.109 ",
]
valid_ranges = [
" 172.60.9.109 ",
"172.60.9.109 - 172.60.1.109",
"172.60.9.109- 172.60.1.109",
"0.0.0.0",
"localhost",
]
invalid_ranges.extend(valid_ranges)
remaining = _filter_invalid_ranges(invalid_ranges, "Test error:")
for _range in remaining:
assert _range in valid_ranges
assert len(remaining) == len(valid_ranges)
def test_invalid_blocklisted_ip():
local_network_interfaces = [NetworkInterface("172.60.145.109", "/30")]