Merge pull request #2222 from guardicore/2135-use-ipaddress

2135 use ipaddress
This commit is contained in:
Mike Salvatore 2022-08-30 10:04:21 -04:00 committed by GitHub
commit 4cbefedad2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 88 additions and 86 deletions

View File

@ -39,7 +39,7 @@ repos:
rev: v0.971 rev: v0.971
hooks: hooks:
- id: mypy - id: mypy
additional_dependencies: [types-paramiko, types-python-dateutil, types-requests] additional_dependencies: [types-ipaddress, types-paramiko, types-python-dateutil, types-requests]
exclude: "vulture_allowlist.py" exclude: "vulture_allowlist.py"
args: [--ignore-missing-imports] args: [--ignore-missing-imports]
- repo: https://github.com/koalaman/shellcheck-precommit - repo: https://github.com/koalaman/shellcheck-precommit

View File

@ -1,6 +1,7 @@
import logging import logging
import threading import threading
import time import time
from ipaddress import IPv4Interface
from typing import Any, Callable, Iterable, List, Optional from typing import Any, Callable, Iterable, List, Optional
from common.agent_configuration import CustomPBAConfiguration, PluginConfiguration from common.agent_configuration import CustomPBAConfiguration, PluginConfiguration
@ -10,7 +11,6 @@ from infection_monkey.i_control_channel import IControlChannel, IslandCommunicat
from infection_monkey.i_master import IMaster from infection_monkey.i_master import IMaster
from infection_monkey.i_puppet import IPuppet from infection_monkey.i_puppet import IPuppet
from infection_monkey.model import VictimHostFactory from infection_monkey.model import VictimHostFactory
from infection_monkey.network import NetworkInterface
from infection_monkey.telemetry.credentials_telem import CredentialsTelem from infection_monkey.telemetry.credentials_telem import CredentialsTelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
from infection_monkey.telemetry.post_breach_telem import PostBreachTelem from infection_monkey.telemetry.post_breach_telem import PostBreachTelem
@ -39,7 +39,7 @@ class AutomatedMaster(IMaster):
telemetry_messenger: ITelemetryMessenger, telemetry_messenger: ITelemetryMessenger,
victim_host_factory: VictimHostFactory, victim_host_factory: VictimHostFactory,
control_channel: IControlChannel, control_channel: IControlChannel,
local_network_interfaces: List[NetworkInterface], local_network_interfaces: List[IPv4Interface],
credentials_store: IPropagationCredentialsRepository, credentials_store: IPropagationCredentialsRepository,
): ):
self._current_depth = current_depth self._current_depth = current_depth

View File

@ -1,5 +1,6 @@
import logging import logging
from dataclasses import replace from dataclasses import replace
from ipaddress import IPv4Interface
from queue import Queue from queue import Queue
from threading import Event from threading import Event
from typing import List, Sequence from typing import List, Sequence
@ -18,7 +19,7 @@ from infection_monkey.i_puppet import (
PortStatus, PortStatus,
) )
from infection_monkey.model import VictimHost, VictimHostFactory from infection_monkey.model import VictimHost, VictimHostFactory
from infection_monkey.network import NetworkAddress, NetworkInterface from infection_monkey.network import NetworkAddress
from infection_monkey.network_scanning.scan_target_generator import compile_scan_target_list from infection_monkey.network_scanning.scan_target_generator import compile_scan_target_list
from infection_monkey.telemetry.exploit_telem import ExploitTelem from infection_monkey.telemetry.exploit_telem import ExploitTelem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
@ -37,7 +38,7 @@ class Propagator:
ip_scanner: IPScanner, ip_scanner: IPScanner,
exploiter: Exploiter, exploiter: Exploiter,
victim_host_factory: VictimHostFactory, victim_host_factory: VictimHostFactory,
local_network_interfaces: List[NetworkInterface], local_network_interfaces: List[IPv4Interface],
): ):
self._telemetry_messenger = telemetry_messenger self._telemetry_messenger = telemetry_messenger
self._ip_scanner = ip_scanner self._ip_scanner = ip_scanner

View File

@ -3,6 +3,7 @@ import logging
import os import os
import subprocess import subprocess
import sys import sys
from ipaddress import IPv4Interface
from pathlib import Path, WindowsPath from pathlib import Path, WindowsPath
from typing import List from typing import List
@ -39,7 +40,6 @@ from infection_monkey.i_puppet import IPuppet, PluginType
from infection_monkey.master import AutomatedMaster from infection_monkey.master import AutomatedMaster
from infection_monkey.master.control_channel import ControlChannel from infection_monkey.master.control_channel import ControlChannel
from infection_monkey.model import VictimHostFactory from infection_monkey.model import VictimHostFactory
from infection_monkey.network import NetworkInterface
from infection_monkey.network.firewall import app as firewall from infection_monkey.network.firewall import app as firewall
from infection_monkey.network.info import get_local_network_interfaces from infection_monkey.network.info import get_local_network_interfaces
from infection_monkey.network_scanning.elasticsearch_fingerprinter import ElasticSearchFingerprinter from infection_monkey.network_scanning.elasticsearch_fingerprinter import ElasticSearchFingerprinter
@ -239,10 +239,10 @@ class InfectionMonkey:
) )
@staticmethod @staticmethod
def _get_local_network_interfaces(): def _get_local_network_interfaces() -> List[IPv4Interface]:
local_network_interfaces = get_local_network_interfaces() local_network_interfaces = get_local_network_interfaces()
for i in local_network_interfaces: for interface in local_network_interfaces:
logger.debug(f"Found local interface {i.address}{i.netmask}") logger.debug(f"Found local interface {str(interface)}")
return local_network_interfaces return local_network_interfaces
@ -364,7 +364,7 @@ class InfectionMonkey:
return puppet return puppet
def _build_victim_host_factory( def _build_victim_host_factory(
self, local_network_interfaces: List[NetworkInterface] self, local_network_interfaces: List[IPv4Interface]
) -> VictimHostFactory: ) -> VictimHostFactory:
on_island = self._running_on_island(local_network_interfaces) on_island = self._running_on_island(local_network_interfaces)
logger.debug(f"This agent is running on the island: {on_island}") logger.debug(f"This agent is running on the island: {on_island}")
@ -373,9 +373,9 @@ class InfectionMonkey:
self._monkey_inbound_tunnel, self._cmd_island_ip, self._cmd_island_port, on_island self._monkey_inbound_tunnel, self._cmd_island_ip, self._cmd_island_port, on_island
) )
def _running_on_island(self, local_network_interfaces: List[NetworkInterface]) -> bool: def _running_on_island(self, local_network_interfaces: List[IPv4Interface]) -> bool:
server_ip, _ = address_to_ip_port(self._control_client.server_address) server_ip, _ = address_to_ip_port(self._control_client.server_address)
return server_ip in {interface.address for interface in local_network_interfaces} return server_ip in {str(interface.ip) for interface in local_network_interfaces}
def _is_another_monkey_running(self): def _is_another_monkey_running(self):
return not self._singleton.try_lock() return not self._singleton.try_lock()

View File

@ -1 +1 @@
from .info import NetworkAddress, NetworkInterface from .info import NetworkAddress

View File

@ -1,8 +1,8 @@
import itertools import itertools
import socket import socket
import struct import struct
from collections import namedtuple from dataclasses import dataclass
from ipaddress import IPv4Network from ipaddress import IPv4Interface
from random import randint # noqa: DUO102 from random import randint # noqa: DUO102
from typing import List from typing import List
@ -18,20 +18,15 @@ SIOCGIFNETMASK = 0x891B # get network PA mask
RTF_UP = 0x0001 # Route usable RTF_UP = 0x0001 # Route usable
RTF_REJECT = 0x0200 RTF_REJECT = 0x0200
# TODO: We can probably replace both of these namedtuples with classes in Python's ipaddress
# library: https://docs.python.org/3/library/ipaddress.html @dataclass
NetworkInterface = namedtuple("NetworkInterface", ("address", "netmask")) class NetworkAddress:
NetworkAddress = namedtuple("NetworkAddress", ("ip", "domain")) ip: str
domain: str
def get_local_network_interfaces() -> List[NetworkInterface]: def get_local_network_interfaces() -> List[IPv4Interface]:
network_interfaces = [] return [IPv4Interface(f"{i['addr']}/{i['netmask']}") for i in get_host_subnets()]
for i in get_host_subnets():
netmask_bits = IPv4Network(f"{i['addr']}/{i['netmask']}", strict=False).prefixlen
cidr_netmask = f"/{netmask_bits}"
network_interfaces.append(NetworkInterface(i["addr"], cidr_netmask))
return network_interfaces
def get_host_subnets(): def get_host_subnets():

View File

@ -1,10 +1,11 @@
import itertools import itertools
import logging import logging
import socket import socket
from typing import List from ipaddress import IPv4Interface
from typing import Dict, List
from common.network.network_range import InvalidNetworkRangeError, NetworkRange from common.network.network_range import InvalidNetworkRangeError, NetworkRange
from infection_monkey.network import NetworkAddress, NetworkInterface from infection_monkey.network import NetworkAddress
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -13,13 +14,13 @@ logger = logging.getLogger(__name__)
def compile_scan_target_list( def compile_scan_target_list(
local_network_interfaces: List[NetworkInterface], local_network_interfaces: List[IPv4Interface],
ranges_to_scan: List[str], ranges_to_scan: List[str],
inaccessible_subnets: List[str], inaccessible_subnets: List[str],
blocklisted_ips: List[str], blocklisted_ips: List[str],
enable_local_network_scan: bool, enable_local_network_scan: bool,
) -> List[NetworkAddress]: ) -> List[NetworkAddress]:
scan_targets = _get_ips_from_ranges_to_scan(ranges_to_scan) scan_targets = _get_ips_from_subnets_to_scan(ranges_to_scan)
if enable_local_network_scan: if enable_local_network_scan:
scan_targets.extend(_get_ips_to_scan_from_local_interface(local_network_interfaces)) scan_targets.extend(_get_ips_to_scan_from_local_interface(local_network_interfaces))
@ -39,7 +40,7 @@ def compile_scan_target_list(
def _remove_redundant_targets(targets: List[NetworkAddress]) -> List[NetworkAddress]: def _remove_redundant_targets(targets: List[NetworkAddress]) -> List[NetworkAddress]:
reverse_dns = {} reverse_dns: Dict[str, str] = {}
for target in targets: for target in targets:
domain_name = target.domain domain_name = target.domain
ip = target.ip ip = target.ip
@ -58,14 +59,17 @@ def _range_to_addresses(range_obj: NetworkRange) -> List[NetworkAddress]:
return addresses return addresses
def _get_ips_from_ranges_to_scan(ranges_to_scan: List[str]) -> List[NetworkAddress]: def _get_ips_from_subnets_to_scan(subnets_to_scan: List[str]) -> List[NetworkAddress]:
scan_targets = []
ranges_to_scan = NetworkRange.filter_invalid_ranges( ranges_to_scan = NetworkRange.filter_invalid_ranges(
ranges_to_scan, "Bad network range input for targets to scan:" subnets_to_scan, "Bad network range input for targets to scan:"
) )
network_ranges = [NetworkRange.get_range_obj(_range) for _range in ranges_to_scan] network_ranges = [NetworkRange.get_range_obj(_range) for _range in ranges_to_scan]
return _get_ips_from_ranges_to_scan(network_ranges)
def _get_ips_from_ranges_to_scan(network_ranges: List[NetworkRange]) -> List[NetworkAddress]:
scan_targets = []
for _range in network_ranges: for _range in network_ranges:
scan_targets.extend(_range_to_addresses(_range)) scan_targets.extend(_range_to_addresses(_range))
@ -73,20 +77,20 @@ def _get_ips_from_ranges_to_scan(ranges_to_scan: List[str]) -> List[NetworkAddre
def _get_ips_to_scan_from_local_interface( def _get_ips_to_scan_from_local_interface(
interfaces: List[NetworkInterface], interfaces: List[IPv4Interface],
) -> List[NetworkAddress]: ) -> List[NetworkAddress]:
ranges = [f"{interface.address}{interface.netmask}" for interface in interfaces] ranges = [str(interface) for interface in interfaces]
ranges = NetworkRange.filter_invalid_ranges( ranges = NetworkRange.filter_invalid_ranges(
ranges, "Local network interface returns an invalid IP:" ranges, "Local network interface returns an invalid IP:"
) )
return _get_ips_from_ranges_to_scan(ranges) return _get_ips_from_subnets_to_scan(ranges)
def _remove_interface_ips( def _remove_interface_ips(
scan_targets: List[NetworkAddress], interfaces: List[NetworkInterface] scan_targets: List[NetworkAddress], interfaces: List[IPv4Interface]
) -> List[NetworkAddress]: ) -> List[NetworkAddress]:
interface_ips = [interface.address for interface in interfaces] interface_ips = [str(interface.ip) for interface in interfaces]
return _remove_ips_from_scan_targets(scan_targets, interface_ips) return _remove_ips_from_scan_targets(scan_targets, interface_ips)
@ -109,22 +113,22 @@ def _remove_ips_from_scan_targets(
def _get_segmentation_check_targets( def _get_segmentation_check_targets(
inaccessible_subnets: List[str], local_interfaces: List[NetworkInterface] inaccessible_subnets: List[str], local_interfaces: List[IPv4Interface]
) -> List[NetworkAddress]: ) -> List[NetworkAddress]:
ips_to_scan = [] ips_to_scan = []
local_ips = [interface.address for interface in local_interfaces] local_ips = [str(interface.ip) for interface in local_interfaces]
local_ips = NetworkRange.filter_invalid_ranges(local_ips, "Invalid local IP found: ") local_ips = NetworkRange.filter_invalid_ranges(local_ips, "Invalid local IP found: ")
inaccessible_subnets = NetworkRange.filter_invalid_ranges( inaccessible_subnets = NetworkRange.filter_invalid_ranges(
inaccessible_subnets, "Invalid segmentation scan target: " inaccessible_subnets, "Invalid segmentation scan target: "
) )
inaccessible_subnets = _convert_to_range_object(inaccessible_subnets) inaccessible_ranges = _convert_to_range_object(inaccessible_subnets)
subnet_pairs = itertools.product(inaccessible_subnets, inaccessible_subnets) subnet_pairs = itertools.product(inaccessible_ranges, inaccessible_ranges)
for (subnet1, subnet2) in subnet_pairs: for (subnet1, subnet2) in subnet_pairs:
if _is_segmentation_check_required(local_ips, subnet1, subnet2): if _is_segmentation_check_required(local_ips, subnet1, subnet2):
ips = _get_ips_from_ranges_to_scan(subnet2) ips = _get_ips_from_ranges_to_scan([subnet2])
ips_to_scan.extend(ips) ips_to_scan.extend(ips)
return ips_to_scan return ips_to_scan

View File

@ -3,7 +3,7 @@ import select
import socket import socket
import time import time
from pprint import pformat from pprint import pformat
from typing import Iterable, Mapping, Tuple from typing import Collection, Iterable, Mapping, Tuple
from common.utils import Timer from common.utils import Timer
from infection_monkey.i_puppet import PortScanData, PortStatus from infection_monkey.i_puppet import PortScanData, PortStatus
@ -16,7 +16,7 @@ EMPTY_PORT_SCAN = {-1: PortScanData(-1, PortStatus.CLOSED, None, None)}
def scan_tcp_ports( def scan_tcp_ports(
host: str, ports_to_scan: Iterable[int], timeout: float host: str, ports_to_scan: Collection[int], timeout: float
) -> Mapping[int, PortScanData]: ) -> Mapping[int, PortScanData]:
try: try:
return _scan_tcp_ports(host, ports_to_scan, timeout) return _scan_tcp_ports(host, ports_to_scan, timeout)
@ -25,7 +25,7 @@ def scan_tcp_ports(
return EMPTY_PORT_SCAN return EMPTY_PORT_SCAN
def _scan_tcp_ports(host: str, ports_to_scan: Iterable[int], timeout: float): def _scan_tcp_ports(host: str, ports_to_scan: Collection[int], timeout: float):
open_ports = _check_tcp_ports(host, ports_to_scan, timeout) open_ports = _check_tcp_ports(host, ports_to_scan, timeout)
return _build_port_scan_data(ports_to_scan, open_ports) return _build_port_scan_data(ports_to_scan, open_ports)
@ -52,7 +52,7 @@ def _get_closed_port_data(port: int) -> PortScanData:
def _check_tcp_ports( def _check_tcp_ports(
ip: str, ports_to_scan: Iterable[int], timeout: float = DEFAULT_TIMEOUT ip: str, ports_to_scan: Collection[int], timeout: float = DEFAULT_TIMEOUT
) -> Mapping[int, str]: ) -> Mapping[int, str]:
""" """
Checks whether any of the given ports are open on a target IP. Checks whether any of the given ports are open on a target IP.

View File

@ -1,3 +1,4 @@
from ipaddress import IPv4Interface
from threading import Event from threading import Event
from unittest.mock import MagicMock from unittest.mock import MagicMock
@ -17,7 +18,7 @@ from infection_monkey.i_puppet import (
) )
from infection_monkey.master import IPScanResults, Propagator from infection_monkey.master import IPScanResults, Propagator
from infection_monkey.model import VictimHost, VictimHostFactory from infection_monkey.model import VictimHost, VictimHostFactory
from infection_monkey.network import NetworkAddress, NetworkInterface from infection_monkey.network import NetworkAddress
from infection_monkey.telemetry.exploit_telem import ExploitTelem from infection_monkey.telemetry.exploit_telem import ExploitTelem
@ -294,7 +295,7 @@ def test_exploiter_result_processing(
def test_scan_target_generation( def test_scan_target_generation(
telemetry_messenger_spy, mock_ip_scanner, mock_victim_host_factory, default_agent_configuration telemetry_messenger_spy, mock_ip_scanner, mock_victim_host_factory, default_agent_configuration
): ):
local_network_interfaces = [NetworkInterface("10.0.0.9", "/29")] local_network_interfaces = [IPv4Interface("10.0.0.9/29")]
p = Propagator( p = Propagator(
telemetry_messenger_spy, telemetry_messenger_spy,
mock_ip_scanner, mock_ip_scanner,

View File

@ -1,9 +1,10 @@
from ipaddress import IPv4Interface
from itertools import chain from itertools import chain
import pytest import pytest
from common.network.network_range import InvalidNetworkRangeError from common.network.network_range import InvalidNetworkRangeError
from infection_monkey.network import NetworkAddress, NetworkInterface from infection_monkey.network import NetworkAddress
from infection_monkey.network_scanning.scan_target_generator import compile_scan_target_list from infection_monkey.network_scanning.scan_target_generator import compile_scan_target_list
@ -112,10 +113,10 @@ def test_only_ip_blocklisted(ranges_to_scan):
def test_local_network_interface_ips_removed_from_targets(): def test_local_network_interface_ips_removed_from_targets():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"), IPv4Interface("10.0.0.5/24"),
NetworkInterface("10.0.0.32", "/24"), IPv4Interface("10.0.0.32/24"),
NetworkInterface("10.0.0.119", "/24"), IPv4Interface("10.0.0.119/24"),
NetworkInterface("192.168.1.33", "/24"), IPv4Interface("192.168.1.33/24"),
] ]
scan_targets = compile_scan_target_list( scan_targets = compile_scan_target_list(
@ -128,12 +129,12 @@ def test_local_network_interface_ips_removed_from_targets():
assert len(scan_targets) == 252 assert len(scan_targets) == 252
for interface in local_network_interfaces: for interface in local_network_interfaces:
assert interface.address not in scan_targets assert str(interface.ip) not in scan_targets
def test_no_redundant_targets(): def test_no_redundant_targets():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"), IPv4Interface("10.0.0.5/24"),
] ]
scan_targets = compile_scan_target_list( scan_targets = compile_scan_target_list(
@ -152,10 +153,10 @@ def test_no_redundant_targets():
@pytest.mark.parametrize("ranges_to_scan", [["10.0.0.5"], []]) @pytest.mark.parametrize("ranges_to_scan", [["10.0.0.5"], []])
def test_only_scan_ip_is_local(ranges_to_scan): def test_only_scan_ip_is_local(ranges_to_scan):
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"), IPv4Interface("10.0.0.5/24"),
NetworkInterface("10.0.0.32", "/24"), IPv4Interface("10.0.0.32/24"),
NetworkInterface("10.0.0.119", "/24"), IPv4Interface("10.0.0.119/24"),
NetworkInterface("192.168.1.33", "/24"), IPv4Interface("192.168.1.33/24"),
] ]
scan_targets = compile_scan_target_list( scan_targets = compile_scan_target_list(
@ -171,10 +172,10 @@ def test_only_scan_ip_is_local(ranges_to_scan):
def test_local_network_interface_ips_and_blocked_ips_removed_from_targets(): def test_local_network_interface_ips_and_blocked_ips_removed_from_targets():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"), IPv4Interface("10.0.0.5/24"),
NetworkInterface("10.0.0.32", "/24"), IPv4Interface("10.0.0.32/24"),
NetworkInterface("10.0.0.119", "/24"), IPv4Interface("10.0.0.119/24"),
NetworkInterface("192.168.1.33", "/24"), IPv4Interface("192.168.1.33/24"),
] ]
blocked_ips = ["10.0.0.63", "192.168.1.77", "0.0.0.0"] blocked_ips = ["10.0.0.63", "192.168.1.77", "0.0.0.0"]
@ -191,14 +192,14 @@ def test_local_network_interface_ips_and_blocked_ips_removed_from_targets():
) )
for interface in local_network_interfaces: for interface in local_network_interfaces:
assert interface.address not in scan_targets assert str(interface.ip) not in scan_targets
for ip in blocked_ips: for ip in blocked_ips:
assert ip not in scan_targets assert ip not in scan_targets
def test_local_subnet_added(): def test_local_subnet_added():
local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")] local_network_interfaces = [IPv4Interface("10.0.0.5/24")]
scan_targets = compile_scan_target_list( scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces, local_network_interfaces=local_network_interfaces,
@ -216,8 +217,8 @@ def test_local_subnet_added():
def test_multiple_local_subnets_added(): def test_multiple_local_subnets_added():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"), IPv4Interface("10.0.0.5/24"),
NetworkInterface("172.33.66.99", "/24"), IPv4Interface("172.33.66.99/24"),
] ]
scan_targets = compile_scan_target_list( scan_targets = compile_scan_target_list(
@ -239,8 +240,8 @@ def test_multiple_local_subnets_added():
def test_blocklisted_ips_missing_from_local_subnets(): def test_blocklisted_ips_missing_from_local_subnets():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("10.0.0.5", "/24"), IPv4Interface("10.0.0.5/24"),
NetworkInterface("172.33.66.99", "/24"), IPv4Interface("172.33.66.99/24"),
] ]
blocklisted_ips = ["10.0.0.12", "10.0.0.13", "172.33.66.25"] blocklisted_ips = ["10.0.0.12", "10.0.0.13", "172.33.66.25"]
@ -259,7 +260,7 @@ def test_blocklisted_ips_missing_from_local_subnets():
def test_local_subnets_and_ranges_added(): def test_local_subnets_and_ranges_added():
local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")] local_network_interfaces = [IPv4Interface("10.0.0.5/24")]
scan_targets = compile_scan_target_list( scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces, local_network_interfaces=local_network_interfaces,
@ -281,7 +282,7 @@ def test_local_subnets_and_ranges_added():
def test_local_network_interfaces_specified_but_disabled(): def test_local_network_interfaces_specified_but_disabled():
local_network_interfaces = [NetworkInterface("10.0.0.5", "/24")] local_network_interfaces = [IPv4Interface("10.0.0.5/24")]
scan_targets = compile_scan_target_list( scan_targets = compile_scan_target_list(
local_network_interfaces=local_network_interfaces, local_network_interfaces=local_network_interfaces,
@ -299,8 +300,8 @@ def test_local_network_interfaces_specified_but_disabled():
def test_local_network_interfaces_subnet_masks(): def test_local_network_interfaces_subnet_masks():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"), IPv4Interface("172.60.145.109/30"),
NetworkInterface("172.60.145.144", "/30"), IPv4Interface("172.60.145.144/30"),
] ]
scan_targets = compile_scan_target_list( scan_targets = compile_scan_target_list(
@ -318,7 +319,7 @@ def test_local_network_interfaces_subnet_masks():
def test_segmentation_targets(): def test_segmentation_targets():
local_network_interfaces = [NetworkInterface("172.60.145.109", "/24")] local_network_interfaces = [IPv4Interface("172.60.145.109/24")]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.144/30"] inaccessible_subnets = ["172.60.145.108/30", "172.60.145.144/30"]
@ -338,7 +339,7 @@ def test_segmentation_targets():
def test_segmentation_clash_with_blocked(): def test_segmentation_clash_with_blocked():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"), IPv4Interface("172.60.145.109/30"),
] ]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"] inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"]
@ -358,7 +359,7 @@ def test_segmentation_clash_with_blocked():
def test_segmentation_clash_with_targets(): def test_segmentation_clash_with_targets():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"), IPv4Interface("172.60.145.109/30"),
] ]
inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"] inaccessible_subnets = ["172.60.145.108/30", "172.60.145.149/30"]
@ -381,7 +382,7 @@ def test_segmentation_clash_with_targets():
def test_segmentation_one_network(): def test_segmentation_one_network():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("172.60.145.109", "/30"), IPv4Interface("172.60.145.109/30"),
] ]
inaccessible_subnets = ["172.60.145.1/24"] inaccessible_subnets = ["172.60.145.1/24"]
@ -401,8 +402,8 @@ def test_segmentation_one_network():
def test_segmentation_inaccessible_networks(): def test_segmentation_inaccessible_networks():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("172.60.1.1", "/24"), IPv4Interface("172.60.1.1/24"),
NetworkInterface("172.60.2.1", "/24"), IPv4Interface("172.60.2.1/24"),
] ]
inaccessible_subnets = ["172.60.144.1/24", "172.60.146.1/24"] inaccessible_subnets = ["172.60.144.1/24", "172.60.146.1/24"]
@ -420,8 +421,7 @@ def test_segmentation_inaccessible_networks():
def test_invalid_inputs(): def test_invalid_inputs():
local_network_interfaces = [ local_network_interfaces = [
NetworkInterface("172.60.999.109", "/30"), IPv4Interface("172.60.145.109/30"),
NetworkInterface("172.60.145.109", "/30"),
] ]
inaccessible_subnets = [ inaccessible_subnets = [
@ -447,7 +447,7 @@ def test_invalid_inputs():
def test_invalid_blocklisted_ip(): def test_invalid_blocklisted_ip():
local_network_interfaces = [NetworkInterface("172.60.145.109", "/30")] local_network_interfaces = [IPv4Interface("172.60.145.109/30")]
inaccessible_subnets = ["172.60.147.8/30", "172.60.147.148/30"] inaccessible_subnets = ["172.60.147.8/30", "172.60.147.148/30"]

View File

@ -218,6 +218,7 @@ stop_time
parent_id parent_id
cc_server cc_server
hardware_id hardware_id
network_interfaces
connections connections
# TODO DELETE AFTER RESOURCE REFACTORING # TODO DELETE AFTER RESOURCE REFACTORING