Extracted event text and creation to function

This commit is contained in:
Shay Nehmad 2019-09-02 14:43:39 +03:00
parent 5487395797
commit 02cd1ad684
1 changed files with 32 additions and 26 deletions

View File

@ -10,11 +10,31 @@ from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.segmentation_finding import SegmentationFinding
from monkey_island.cc.services.configuration.utils import get_config_network_segments_as_subnet_groups
SEGMENTATION_DONE_EVENT_TEXT = "Monkey on {hostname} is done attempting cross-segment communications " \
"from `{src_seg}` segments to `{dst_seg}` segments."
SEGMENTATION_VIOLATION_EVENT_TEXT = \
"Segmentation violation! Monkey on '{hostname}', with the {source_ip} IP address (in segment {source_seg}) " \
"managed to communicate cross segment to {target_ip} (in segment {target_seg})."
def test_segmentation_violation(current_monkey, target_ip):
# TODO - lower code duplication between this and report.py.
subnet_groups = get_config_network_segments_as_subnet_groups()
for subnet_group in subnet_groups:
subnet_pairs = itertools.product(subnet_group, subnet_group)
for subnet_pair in subnet_pairs:
source_subnet = subnet_pair[0]
target_subnet = subnet_pair[1]
if is_segmentation_violation(current_monkey, target_ip, source_subnet, target_subnet):
event = get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet)
SegmentationFinding.create_or_add_to_existing_finding(
subnets=[source_subnet, target_subnet],
status=STATUS_FAILED,
segmentation_event=event
)
def is_segmentation_violation(current_monkey, target_ip, source_subnet, target_subnet):
# type: (Monkey, str, str, str) -> bool
"""
@ -39,23 +59,6 @@ def is_segmentation_violation(current_monkey, target_ip, source_subnet, target_s
return cross_segment_ip is not None
def test_segmentation_violation(current_monkey, target_ip):
# TODO - lower code duplication between this and report.py.
subnet_groups = get_config_network_segments_as_subnet_groups()
for subnet_group in subnet_groups:
subnet_pairs = itertools.product(subnet_group, subnet_group)
for subnet_pair in subnet_pairs:
source_subnet = subnet_pair[0]
target_subnet = subnet_pair[1]
if is_segmentation_violation(current_monkey, target_ip, source_subnet, target_subnet):
event = get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet)
SegmentationFinding.create_or_add_to_existing_finding(
subnets=[source_subnet, target_subnet],
status=STATUS_FAILED,
segmentation_event=event
)
def get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet):
return Event.create_event(
title="Segmentation event",
@ -93,13 +96,16 @@ def create_or_add_findings_for_all_pairs(all_subnets, current_monkey):
SegmentationFinding.create_or_add_to_existing_finding(
subnets=list(subnet_pair),
status=STATUS_PASSED,
segmentation_event=Event.create_event(
"Segmentation test done",
message="Monkey on {hostname} is done attempting cross-segment communications from `{src_seg}` "
"segments to `{dst_seg}` segments.".format(
hostname=current_monkey.hostname,
src_seg=subnet_pair[0],
dst_seg=subnet_pair[1]),
event_type=EVENT_TYPE_ISLAND
)
segmentation_event=get_segmentation_done_event(current_monkey, subnet_pair)
)
def get_segmentation_done_event(current_monkey, subnet_pair):
return Event.create_event(
title="Segmentation test done",
message=SEGMENTATION_DONE_EVENT_TEXT.format(
hostname=current_monkey.hostname,
src_seg=subnet_pair[0],
dst_seg=subnet_pair[1]),
event_type=EVENT_TYPE_ISLAND
)