Added a custom segmentation finding type

This commit is contained in:
Shay Nehmad 2019-08-25 18:07:49 +03:00
parent 6ec4e613cf
commit 9dfb250d59
3 changed files with 106 additions and 0 deletions

View File

@ -32,6 +32,8 @@ class Finding(Document):
test = StringField(required=True, choices=TESTS)
status = StringField(required=True, choices=ORDERED_TEST_STATUSES)
events = EmbeddedDocumentListField(document_type=Event)
# http://docs.mongoengine.org/guide/defining-documents.html#document-inheritance
meta = {'allow_inheritance': True}
# LOGIC
def get_test_explanation(self):

View File

@ -0,0 +1,52 @@
from mongoengine import StringField
from common.data.zero_trust_consts import TEST_SEGMENTATION, STATUS_CONCLUSIVE, STATUS_POSITIVE
from monkey_island.cc.models.zero_trust.finding import Finding
def need_to_overwrite_status(saved_status, new_status):
return (saved_status == STATUS_POSITIVE) and (new_status == STATUS_CONCLUSIVE)
class SegmentationFinding(Finding):
"""
trying to add conclusive:
If the finding doesn't exist at all: create conclusive
else:
if positive, turn to conclusive
add event
trying to add positive:
If the finding doesn't exist at all: create positive
else: add event
"""
first_subnet = StringField()
second_subnet = StringField()
@staticmethod
def create_or_add_to_existing_finding(subnets, status, segmentation_event):
assert len(subnets) == 2
# Sort them so A -> B and B -> A segmentation findings will be the same one.
subnets.sort()
existing_findings = SegmentationFinding.objects(first_subnet=subnets[0], second_subnet=subnets[1])
if len(existing_findings) == 0:
# No finding exists - create.
new_finding = SegmentationFinding(
first_subnet=subnets[0],
second_subnet=subnets[1],
test=TEST_SEGMENTATION,
status=status,
events=[segmentation_event]
)
new_finding.save()
else:
# A finding exists (should be one). Add the event to it.
assert len(existing_findings) == 1
existing_finding = existing_findings[0]
existing_finding.events.append(segmentation_event)
if need_to_overwrite_status(existing_finding.status, status):
existing_finding.status = status
existing_finding.save()

View File

@ -0,0 +1,52 @@
from common.data.zero_trust_consts import STATUS_CONCLUSIVE, EVENT_TYPE_MONKEY_NETWORK
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
from monkey_island.cc.models.zero_trust.segmentation_finding import SegmentationFinding
class TestSegmentationFinding(IslandTestCase):
def test_create_or_add_to_existing_finding(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
first_segment = "1.1.1.0/24"
second_segment = "2.2.2.0-2.2.2.254"
third_segment = "3.3.3.3"
event = Event.create_event("bla", "bla", EVENT_TYPE_MONKEY_NETWORK)
SegmentationFinding.create_or_add_to_existing_finding(
subnets=[first_segment, second_segment],
status=STATUS_CONCLUSIVE,
segmentation_event=event
)
self.assertEquals(len(SegmentationFinding.objects()), 1)
self.assertEquals(len(SegmentationFinding.objects()[0].events), 1)
SegmentationFinding.create_or_add_to_existing_finding(
# !!! REVERSE ORDER
subnets=[second_segment, first_segment],
status=STATUS_CONCLUSIVE,
segmentation_event=event
)
self.assertEquals(len(SegmentationFinding.objects()), 1)
self.assertEquals(len(SegmentationFinding.objects()[0].events), 2)
SegmentationFinding.create_or_add_to_existing_finding(
# !!! REVERSE ORDER
subnets=[first_segment, third_segment],
status=STATUS_CONCLUSIVE,
segmentation_event=event
)
self.assertEquals(len(SegmentationFinding.objects()), 2)
SegmentationFinding.create_or_add_to_existing_finding(
# !!! REVERSE ORDER
subnets=[second_segment, third_segment],
status=STATUS_CONCLUSIVE,
segmentation_event=event
)
self.assertEquals(len(SegmentationFinding.objects()), 3)