diff --git a/monkey/monkey_island/cc/services/telemetry/zero_trust_tests/segmentation.py b/monkey/monkey_island/cc/services/telemetry/zero_trust_tests/segmentation.py index a10b89262..552192c23 100644 --- a/monkey/monkey_island/cc/services/telemetry/zero_trust_tests/segmentation.py +++ b/monkey/monkey_island/cc/services/telemetry/zero_trust_tests/segmentation.py @@ -10,11 +10,31 @@ from monkey_island.cc.models.zero_trust.event import Event from monkey_island.cc.models.zero_trust.segmentation_finding import SegmentationFinding from monkey_island.cc.services.configuration.utils import get_config_network_segments_as_subnet_groups +SEGMENTATION_DONE_EVENT_TEXT = "Monkey on {hostname} is done attempting cross-segment communications " \ + "from `{src_seg}` segments to `{dst_seg}` segments." + SEGMENTATION_VIOLATION_EVENT_TEXT = \ "Segmentation violation! Monkey on '{hostname}', with the {source_ip} IP address (in segment {source_seg}) " \ "managed to communicate cross segment to {target_ip} (in segment {target_seg})." +def test_segmentation_violation(current_monkey, target_ip): + # TODO - lower code duplication between this and report.py. + subnet_groups = get_config_network_segments_as_subnet_groups() + for subnet_group in subnet_groups: + subnet_pairs = itertools.product(subnet_group, subnet_group) + for subnet_pair in subnet_pairs: + source_subnet = subnet_pair[0] + target_subnet = subnet_pair[1] + if is_segmentation_violation(current_monkey, target_ip, source_subnet, target_subnet): + event = get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet) + SegmentationFinding.create_or_add_to_existing_finding( + subnets=[source_subnet, target_subnet], + status=STATUS_FAILED, + segmentation_event=event + ) + + def is_segmentation_violation(current_monkey, target_ip, source_subnet, target_subnet): # type: (Monkey, str, str, str) -> bool """ @@ -39,23 +59,6 @@ def is_segmentation_violation(current_monkey, target_ip, source_subnet, target_s return cross_segment_ip is not None -def test_segmentation_violation(current_monkey, target_ip): - # TODO - lower code duplication between this and report.py. - subnet_groups = get_config_network_segments_as_subnet_groups() - for subnet_group in subnet_groups: - subnet_pairs = itertools.product(subnet_group, subnet_group) - for subnet_pair in subnet_pairs: - source_subnet = subnet_pair[0] - target_subnet = subnet_pair[1] - if is_segmentation_violation(current_monkey, target_ip, source_subnet, target_subnet): - event = get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet) - SegmentationFinding.create_or_add_to_existing_finding( - subnets=[source_subnet, target_subnet], - status=STATUS_FAILED, - segmentation_event=event - ) - - def get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet): return Event.create_event( title="Segmentation event", @@ -93,13 +96,16 @@ def create_or_add_findings_for_all_pairs(all_subnets, current_monkey): SegmentationFinding.create_or_add_to_existing_finding( subnets=list(subnet_pair), status=STATUS_PASSED, - segmentation_event=Event.create_event( - "Segmentation test done", - message="Monkey on {hostname} is done attempting cross-segment communications from `{src_seg}` " - "segments to `{dst_seg}` segments.".format( - hostname=current_monkey.hostname, - src_seg=subnet_pair[0], - dst_seg=subnet_pair[1]), - event_type=EVENT_TYPE_ISLAND - ) + segmentation_event=get_segmentation_done_event(current_monkey, subnet_pair) ) + + +def get_segmentation_done_event(current_monkey, subnet_pair): + return Event.create_event( + title="Segmentation test done", + message=SEGMENTATION_DONE_EVENT_TEXT.format( + hostname=current_monkey.hostname, + src_seg=subnet_pair[0], + dst_seg=subnet_pair[1]), + event_type=EVENT_TYPE_ISLAND + )