From 9dfb250d59ccd73986674a177d94c4d97bdd00e6 Mon Sep 17 00:00:00 2001 From: Shay Nehmad Date: Sun, 25 Aug 2019 18:07:49 +0300 Subject: [PATCH] Added a custom segmentation finding type --- .../cc/models/zero_trust/finding.py | 2 + .../models/zero_trust/segmentation_finding.py | 52 +++++++++++++++++++ .../zero_trust/test_segmentation_finding.py | 52 +++++++++++++++++++ 3 files changed, 106 insertions(+) create mode 100644 monkey/monkey_island/cc/models/zero_trust/segmentation_finding.py create mode 100644 monkey/monkey_island/cc/models/zero_trust/test_segmentation_finding.py diff --git a/monkey/monkey_island/cc/models/zero_trust/finding.py b/monkey/monkey_island/cc/models/zero_trust/finding.py index 1869d6f18..5454ad9e1 100644 --- a/monkey/monkey_island/cc/models/zero_trust/finding.py +++ b/monkey/monkey_island/cc/models/zero_trust/finding.py @@ -32,6 +32,8 @@ class Finding(Document): test = StringField(required=True, choices=TESTS) status = StringField(required=True, choices=ORDERED_TEST_STATUSES) events = EmbeddedDocumentListField(document_type=Event) + # http://docs.mongoengine.org/guide/defining-documents.html#document-inheritance + meta = {'allow_inheritance': True} # LOGIC def get_test_explanation(self): diff --git a/monkey/monkey_island/cc/models/zero_trust/segmentation_finding.py b/monkey/monkey_island/cc/models/zero_trust/segmentation_finding.py new file mode 100644 index 000000000..428af72cb --- /dev/null +++ b/monkey/monkey_island/cc/models/zero_trust/segmentation_finding.py @@ -0,0 +1,52 @@ +from mongoengine import StringField + +from common.data.zero_trust_consts import TEST_SEGMENTATION, STATUS_CONCLUSIVE, STATUS_POSITIVE +from monkey_island.cc.models.zero_trust.finding import Finding + + +def need_to_overwrite_status(saved_status, new_status): + return (saved_status == STATUS_POSITIVE) and (new_status == STATUS_CONCLUSIVE) + + +class SegmentationFinding(Finding): + """ + trying to add conclusive: + If the finding doesn't exist at all: create conclusive + else: + if positive, turn to conclusive + add event + + trying to add positive: + If the finding doesn't exist at all: create positive + else: add event + """ + first_subnet = StringField() + second_subnet = StringField() + + @staticmethod + def create_or_add_to_existing_finding(subnets, status, segmentation_event): + assert len(subnets) == 2 + + # Sort them so A -> B and B -> A segmentation findings will be the same one. + subnets.sort() + + existing_findings = SegmentationFinding.objects(first_subnet=subnets[0], second_subnet=subnets[1]) + + if len(existing_findings) == 0: + # No finding exists - create. + new_finding = SegmentationFinding( + first_subnet=subnets[0], + second_subnet=subnets[1], + test=TEST_SEGMENTATION, + status=status, + events=[segmentation_event] + ) + new_finding.save() + else: + # A finding exists (should be one). Add the event to it. + assert len(existing_findings) == 1 + existing_finding = existing_findings[0] + existing_finding.events.append(segmentation_event) + if need_to_overwrite_status(existing_finding.status, status): + existing_finding.status = status + existing_finding.save() diff --git a/monkey/monkey_island/cc/models/zero_trust/test_segmentation_finding.py b/monkey/monkey_island/cc/models/zero_trust/test_segmentation_finding.py new file mode 100644 index 000000000..ad3ff9b97 --- /dev/null +++ b/monkey/monkey_island/cc/models/zero_trust/test_segmentation_finding.py @@ -0,0 +1,52 @@ +from common.data.zero_trust_consts import STATUS_CONCLUSIVE, EVENT_TYPE_MONKEY_NETWORK +from monkey_island.cc.models.zero_trust.event import Event +from monkey_island.cc.testing.IslandTestCase import IslandTestCase +from monkey_island.cc.models.zero_trust.segmentation_finding import SegmentationFinding + + +class TestSegmentationFinding(IslandTestCase): + def test_create_or_add_to_existing_finding(self): + self.fail_if_not_testing_env() + self.clean_finding_db() + + first_segment = "1.1.1.0/24" + second_segment = "2.2.2.0-2.2.2.254" + third_segment = "3.3.3.3" + event = Event.create_event("bla", "bla", EVENT_TYPE_MONKEY_NETWORK) + + SegmentationFinding.create_or_add_to_existing_finding( + subnets=[first_segment, second_segment], + status=STATUS_CONCLUSIVE, + segmentation_event=event + ) + + self.assertEquals(len(SegmentationFinding.objects()), 1) + self.assertEquals(len(SegmentationFinding.objects()[0].events), 1) + + SegmentationFinding.create_or_add_to_existing_finding( + # !!! REVERSE ORDER + subnets=[second_segment, first_segment], + status=STATUS_CONCLUSIVE, + segmentation_event=event + ) + + self.assertEquals(len(SegmentationFinding.objects()), 1) + self.assertEquals(len(SegmentationFinding.objects()[0].events), 2) + + SegmentationFinding.create_or_add_to_existing_finding( + # !!! REVERSE ORDER + subnets=[first_segment, third_segment], + status=STATUS_CONCLUSIVE, + segmentation_event=event + ) + + self.assertEquals(len(SegmentationFinding.objects()), 2) + + SegmentationFinding.create_or_add_to_existing_finding( + # !!! REVERSE ORDER + subnets=[second_segment, third_segment], + status=STATUS_CONCLUSIVE, + segmentation_event=event + ) + + self.assertEquals(len(SegmentationFinding.objects()), 3)