forked from p15670423/monkey
Added positive segmentation findings
This commit is contained in:
parent
fbb82f412b
commit
a9ba3273dd
|
@ -1,7 +1,8 @@
|
|||
import itertools
|
||||
from six import text_type
|
||||
|
||||
from common.data.zero_trust_consts import STATUS_CONCLUSIVE, EVENT_TYPE_MONKEY_NETWORK
|
||||
from common.data.zero_trust_consts import STATUS_CONCLUSIVE, EVENT_TYPE_MONKEY_NETWORK, STATUS_POSITIVE, \
|
||||
EVENT_TYPE_ISLAND
|
||||
from common.network.network_range import NetworkRange
|
||||
from common.network.segmentation_utils import get_ip_in_src_and_not_in_dst, get_ip_if_in_subnet
|
||||
from monkey_island.cc.models import Monkey
|
||||
|
@ -29,15 +30,11 @@ def is_segmentation_violation(current_monkey, target_ip, source_subnet, target_s
|
|||
return cross_segment_ip is not None
|
||||
|
||||
|
||||
def test_segmentation_violation(telemetry_json):
|
||||
"""
|
||||
|
||||
:param telemetry_json: A SCAN telemetry sent from a Monkey.
|
||||
"""
|
||||
def test_segmentation_violation(scan_telemetry_json):
|
||||
# TODO - lower code duplication between this and report.py.
|
||||
# TODO - single machine
|
||||
current_monkey = Monkey.get_single_monkey_by_guid(telemetry_json['monkey_guid'])
|
||||
target_ip = telemetry_json['data']['machine']['ip_addr']
|
||||
current_monkey = Monkey.get_single_monkey_by_guid(scan_telemetry_json['monkey_guid'])
|
||||
target_ip = scan_telemetry_json['data']['machine']['ip_addr']
|
||||
subnet_groups = get_config_network_segments_as_subnet_groups()
|
||||
for subnet_group in subnet_groups:
|
||||
subnet_pairs = itertools.product(subnet_group, subnet_group)
|
||||
|
@ -67,6 +64,37 @@ def get_segmentation_violation_event(current_monkey, source_subnet, target_ip, t
|
|||
)
|
||||
|
||||
|
||||
def test_positive_findings_for_unreached_segments(telemetry_json):
|
||||
current_monkey = Monkey.get_single_monkey_by_guid(telemetry_json['monkey_guid'])
|
||||
subnet_groups = get_config_network_segments_as_subnet_groups()
|
||||
def test_positive_findings_for_unreached_segments(state_telemetry_json):
|
||||
flat_all_subnets = [item for sublist in get_config_network_segments_as_subnet_groups() for item in sublist]
|
||||
current_monkey = Monkey.get_single_monkey_by_guid(state_telemetry_json['monkey_guid'])
|
||||
create_or_add_findings_for_all_pairs(flat_all_subnets, current_monkey)
|
||||
|
||||
|
||||
def create_or_add_findings_for_all_pairs(all_subnets, current_monkey):
|
||||
# Filter the subnets that this monkey is part of.
|
||||
this_monkey_subnets = []
|
||||
for subnet in all_subnets:
|
||||
if get_ip_if_in_subnet(current_monkey.ip_addresses, NetworkRange.get_range_obj(subnet)) is not None:
|
||||
this_monkey_subnets.append(subnet)
|
||||
|
||||
# Get all the other subnets.
|
||||
other_subnets = list(set(all_subnets) - set(this_monkey_subnets))
|
||||
|
||||
# Calculate the cartesian product - (this monkey subnets X other subnets). These pairs are the pairs that the monkey
|
||||
# should have tested.
|
||||
all_subnets_pairs_for_this_monkey = itertools.product(this_monkey_subnets, other_subnets)
|
||||
|
||||
for subnet_pair in all_subnets_pairs_for_this_monkey:
|
||||
SegmentationFinding.create_or_add_to_existing_finding(
|
||||
subnets=list(subnet_pair),
|
||||
status=STATUS_POSITIVE,
|
||||
segmentation_event=Event.create_event(
|
||||
"Segmentation test done",
|
||||
message="Monkey on {hostname} is done attempting cross-segment communications from `{src_seg}` "
|
||||
"segments to `{dst_seg}` segments.".format(
|
||||
hostname=current_monkey.hostname,
|
||||
src_seg=subnet_pair[0],
|
||||
dst_seg=subnet_pair[1]),
|
||||
event_type=EVENT_TYPE_ISLAND
|
||||
)
|
||||
)
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
import uuid
|
||||
|
||||
from common.data.zero_trust_consts import TEST_SEGMENTATION, STATUS_POSITIVE, STATUS_CONCLUSIVE, \
|
||||
EVENT_TYPE_MONKEY_NETWORK
|
||||
from monkey_island.cc.models import Monkey
|
||||
from monkey_island.cc.models.zero_trust.event import Event
|
||||
from monkey_island.cc.models.zero_trust.finding import Finding
|
||||
from monkey_island.cc.models.zero_trust.segmentation_finding import SegmentationFinding
|
||||
from monkey_island.cc.services.telemetry.zero_trust_tests.segmentation import create_or_add_findings_for_all_pairs
|
||||
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
|
||||
|
||||
FIRST_SUBNET = "1.1.1.1"
|
||||
SECOND_SUBNET = "2.2.2.0/24"
|
||||
THIRD_SUBNET = "3.3.3.3-3.3.3.200"
|
||||
|
||||
|
||||
class TestSegmentationTests(IslandTestCase):
|
||||
def test_create_findings_for_all_done_pairs(self):
|
||||
self.fail_if_not_testing_env()
|
||||
self.clean_finding_db()
|
||||
|
||||
all_subnets = [FIRST_SUBNET, SECOND_SUBNET, THIRD_SUBNET]
|
||||
|
||||
monkey = Monkey(
|
||||
guid=str(uuid.uuid4()),
|
||||
ip_addresses=[FIRST_SUBNET])
|
||||
|
||||
# no findings
|
||||
self.assertEquals(len(Finding.objects(test=TEST_SEGMENTATION)), 0)
|
||||
|
||||
# This is like the monkey is done and sent done telem
|
||||
create_or_add_findings_for_all_pairs(all_subnets, monkey)
|
||||
|
||||
# There are 2 subnets in which the monkey is NOT
|
||||
self.assertEquals(len(Finding.objects(test=TEST_SEGMENTATION, status=STATUS_POSITIVE)), 2)
|
||||
|
||||
# This is a monkey from 2nd subnet communicated with 1st subnet.
|
||||
SegmentationFinding.create_or_add_to_existing_finding(
|
||||
[FIRST_SUBNET, SECOND_SUBNET],
|
||||
STATUS_CONCLUSIVE,
|
||||
Event.create_event(title="sdf", message="asd", event_type=EVENT_TYPE_MONKEY_NETWORK)
|
||||
)
|
||||
|
||||
print("Printing all segmentation findings")
|
||||
all_findings = Finding.objects(test=TEST_SEGMENTATION)
|
||||
for f in all_findings:
|
||||
print(f.to_json())
|
||||
|
||||
self.assertEquals(len(Finding.objects(test=TEST_SEGMENTATION, status=STATUS_POSITIVE)), 1)
|
||||
self.assertEquals(len(Finding.objects(test=TEST_SEGMENTATION, status=STATUS_CONCLUSIVE)), 1)
|
||||
self.assertEquals(len(Finding.objects(test=TEST_SEGMENTATION)), 2)
|
Loading…
Reference in New Issue