Changed import to `import common.data.zero_trust_consts as zero_trust_consts`

Looks better
This commit is contained in:
Shay Nehmad 2019-11-04 11:27:34 +02:00
parent 530e1a3b65
commit c778ae7aa1
17 changed files with 253 additions and 238 deletions

View File

@ -1,4 +1,4 @@
from common.data.zero_trust_consts import TEST_MALICIOUS_ACTIVITY_TIMELINE, STATUS_VERIFY
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.finding import Finding
@ -26,7 +26,7 @@ class AggregateFinding(Finding):
def add_malicious_activity_to_timeline(events):
AggregateFinding.create_or_add_to_existing(
test=TEST_MALICIOUS_ACTIVITY_TIMELINE,
status=STATUS_VERIFY,
test=zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE,
status=zero_trust_consts.STATUS_VERIFY,
events=events
)

View File

@ -2,7 +2,7 @@ from datetime import datetime
from mongoengine import EmbeddedDocument, DateTimeField, StringField
from common.data.zero_trust_consts import EVENT_TYPES
import common.data.zero_trust_consts as zero_trust_consts
class Event(EmbeddedDocument):
@ -19,7 +19,7 @@ class Event(EmbeddedDocument):
timestamp = DateTimeField(required=True)
title = StringField(required=True)
message = StringField()
event_type = StringField(required=True, choices=EVENT_TYPES)
event_type = StringField(required=True, choices=zero_trust_consts.EVENT_TYPES)
# LOGIC
@staticmethod

View File

@ -5,7 +5,7 @@ Define a Document Schema for Zero Trust findings.
from mongoengine import Document, StringField, EmbeddedDocumentListField
from common.data.zero_trust_consts import ORDERED_TEST_STATUSES, TESTS, TESTS_MAP, TEST_EXPLANATION_KEY, PILLARS_KEY
import common.data.zero_trust_consts as zero_trust_consts
# Dummy import for mongoengine.
# noinspection PyUnresolvedReferences
from monkey_island.cc.models.zero_trust.event import Event
@ -30,18 +30,18 @@ class Finding(Document):
times, or complex action we will perform - somewhat like an API.
"""
# SCHEMA
test = StringField(required=True, choices=TESTS)
status = StringField(required=True, choices=ORDERED_TEST_STATUSES)
test = StringField(required=True, choices=zero_trust_consts.TESTS)
status = StringField(required=True, choices=zero_trust_consts.ORDERED_TEST_STATUSES)
events = EmbeddedDocumentListField(document_type=Event)
# http://docs.mongoengine.org/guide/defining-documents.html#document-inheritance
meta = {'allow_inheritance': True}
# LOGIC
def get_test_explanation(self):
return TESTS_MAP[self.test][TEST_EXPLANATION_KEY]
return zero_trust_consts.TESTS_MAP[self.test][zero_trust_consts.TEST_EXPLANATION_KEY]
def get_pillars(self):
return TESTS_MAP[self.test][PILLARS_KEY]
return zero_trust_consts.TESTS_MAP[self.test][zero_trust_consts.PILLARS_KEY]
# Creation methods
@staticmethod

View File

@ -1,11 +1,11 @@
from mongoengine import StringField
from common.data.zero_trust_consts import TEST_SEGMENTATION, STATUS_FAILED, STATUS_PASSED
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.finding import Finding
def need_to_overwrite_status(saved_status, new_status):
return (saved_status == STATUS_PASSED) and (new_status == STATUS_FAILED)
return (saved_status == zero_trust_consts.STATUS_PASSED) and (new_status == zero_trust_consts.STATUS_FAILED)
class SegmentationFinding(Finding):
@ -35,7 +35,7 @@ class SegmentationFinding(Finding):
new_finding = SegmentationFinding(
first_subnet=subnets[0],
second_subnet=subnets[1],
test=TEST_SEGMENTATION,
test=zero_trust_consts.TEST_SEGMENTATION,
status=status,
events=[segmentation_event]
)

View File

@ -1,4 +1,4 @@
from common.data.zero_trust_consts import TEST_MALICIOUS_ACTIVITY_TIMELINE, STATUS_VERIFY, EVENT_TYPE_MONKEY_NETWORK
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.aggregate_finding import AggregateFinding
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.finding import Finding
@ -10,9 +10,9 @@ class TestAggregateFinding(IslandTestCase):
self.fail_if_not_testing_env()
self.clean_finding_db()
test = TEST_MALICIOUS_ACTIVITY_TIMELINE
status = STATUS_VERIFY
events = [Event.create_event("t", "t", EVENT_TYPE_MONKEY_NETWORK)]
test = zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE
status = zero_trust_consts.STATUS_VERIFY
events = [Event.create_event("t", "t", zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)]
self.assertEqual(len(Finding.objects(test=test, status=status)), 0)
AggregateFinding.create_or_add_to_existing(test, status, events)
@ -29,9 +29,9 @@ class TestAggregateFinding(IslandTestCase):
self.fail_if_not_testing_env()
self.clean_finding_db()
test = TEST_MALICIOUS_ACTIVITY_TIMELINE
status = STATUS_VERIFY
event = Event.create_event("t", "t", EVENT_TYPE_MONKEY_NETWORK)
test = zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE
status = zero_trust_consts.STATUS_VERIFY
event = Event.create_event("t", "t", zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)
events = [event]
self.assertEqual(len(Finding.objects(test=test, status=status)), 0)

View File

@ -1,6 +1,6 @@
from mongoengine import ValidationError
from common.data.zero_trust_consts import EVENT_TYPE_MONKEY_NETWORK
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
@ -14,7 +14,7 @@ class TestEvent(IslandTestCase):
_ = Event.create_event(
title=None, # title required
message="bla bla",
event_type=EVENT_TYPE_MONKEY_NETWORK
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK
)
with self.assertRaises(ValidationError):
@ -28,5 +28,5 @@ class TestEvent(IslandTestCase):
_ = Event.create_event(
title="skjs",
message="bla bla",
event_type=EVENT_TYPE_MONKEY_NETWORK
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK
)

View File

@ -1,6 +1,6 @@
from mongoengine import ValidationError
from common.data.zero_trust_consts import STATUS_FAILED, TEST_SEGMENTATION, EVENT_TYPE_MONKEY_NETWORK
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
@ -20,20 +20,20 @@ class TestFinding(IslandTestCase):
self.clean_finding_db()
with self.assertRaises(ValidationError):
_ = Finding.save_finding(test="bla bla", status=STATUS_FAILED, events=[])
_ = Finding.save_finding(test="bla bla", status=zero_trust_consts.STATUS_FAILED, events=[])
with self.assertRaises(ValidationError):
_ = Finding.save_finding(test=TEST_SEGMENTATION, status="bla bla", events=[])
_ = Finding.save_finding(test=zero_trust_consts.TEST_SEGMENTATION, status="bla bla", events=[])
def test_save_finding_sanity(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
self.assertEqual(len(Finding.objects(test=TEST_SEGMENTATION)), 0)
self.assertEqual(len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)), 0)
event_example = Event.create_event(
title="Event Title", message="event message", event_type=EVENT_TYPE_MONKEY_NETWORK)
Finding.save_finding(test=TEST_SEGMENTATION, status=STATUS_FAILED, events=[event_example])
title="Event Title", message="event message", event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)
Finding.save_finding(test=zero_trust_consts.TEST_SEGMENTATION, status=zero_trust_consts.STATUS_FAILED, events=[event_example])
self.assertEqual(len(Finding.objects(test=TEST_SEGMENTATION)), 1)
self.assertEqual(len(Finding.objects(status=STATUS_FAILED)), 1)
self.assertEqual(len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)), 1)
self.assertEqual(len(Finding.objects(status=zero_trust_consts.STATUS_FAILED)), 1)

View File

@ -1,4 +1,4 @@
from common.data.zero_trust_consts import STATUS_FAILED, EVENT_TYPE_MONKEY_NETWORK
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
from monkey_island.cc.models.zero_trust.segmentation_finding import SegmentationFinding
@ -12,11 +12,11 @@ class TestSegmentationFinding(IslandTestCase):
first_segment = "1.1.1.0/24"
second_segment = "2.2.2.0-2.2.2.254"
third_segment = "3.3.3.3"
event = Event.create_event("bla", "bla", EVENT_TYPE_MONKEY_NETWORK)
event = Event.create_event("bla", "bla", zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)
SegmentationFinding.create_or_add_to_existing_finding(
subnets=[first_segment, second_segment],
status=STATUS_FAILED,
status=zero_trust_consts.STATUS_FAILED,
segmentation_event=event
)
@ -26,7 +26,7 @@ class TestSegmentationFinding(IslandTestCase):
SegmentationFinding.create_or_add_to_existing_finding(
# !!! REVERSE ORDER
subnets=[second_segment, first_segment],
status=STATUS_FAILED,
status=zero_trust_consts.STATUS_FAILED,
segmentation_event=event
)
@ -36,7 +36,7 @@ class TestSegmentationFinding(IslandTestCase):
SegmentationFinding.create_or_add_to_existing_finding(
# !!! REVERSE ORDER
subnets=[first_segment, third_segment],
status=STATUS_FAILED,
status=zero_trust_consts.STATUS_FAILED,
segmentation_event=event
)
@ -45,7 +45,7 @@ class TestSegmentationFinding(IslandTestCase):
SegmentationFinding.create_or_add_to_existing_finding(
# !!! REVERSE ORDER
subnets=[second_segment, third_segment],
status=STATUS_FAILED,
status=zero_trust_consts.STATUS_FAILED,
segmentation_event=event
)

View File

@ -1,185 +1,197 @@
from common.data.zero_trust_consts import AUTOMATION_ORCHESTRATION, DATA, PRINCIPLES, PRINCIPLE_DATA_TRANSIT, STATUS_FAILED, \
TESTS_MAP, TEST_DATA_ENDPOINT_HTTP, TEST_EXPLANATION_KEY, STATUS_UNEXECUTED, TEST_DATA_ENDPOINT_ELASTIC, DEVICES, \
PRINCIPLE_ENDPOINT_SECURITY, TEST_MACHINE_EXPLOITED, TEST_ENDPOINT_SECURITY_EXISTS, NETWORKS, PRINCIPLE_SEGMENTATION, \
TEST_SEGMENTATION, PRINCIPLE_USER_BEHAVIOUR, STATUS_VERIFY, TEST_SCHEDULED_EXECUTION, PRINCIPLE_USERS_MAC_POLICIES, \
TEST_COMMUNICATE_AS_NEW_USER, PRINCIPLE_ANALYZE_NETWORK_TRAFFIC, TEST_MALICIOUS_ACTIVITY_TIMELINE, \
PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES, TEST_TUNNELING, PEOPLE, VISIBILITY_ANALYTICS, WORKLOADS, STATUS_PASSED
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.services.reporting.zero_trust_service import ZeroTrustService
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
EXPECTED_DICT = {
AUTOMATION_ORCHESTRATION: [],
DATA: [
zero_trust_consts.AUTOMATION_ORCHESTRATION: [],
zero_trust_consts.DATA: [
{
"principle": PRINCIPLES[PRINCIPLE_DATA_TRANSIT],
"status": STATUS_FAILED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_DATA_TRANSIT],
"status": zero_trust_consts.STATUS_FAILED,
"tests": [
{
"status": STATUS_FAILED,
"test": TESTS_MAP[TEST_DATA_ENDPOINT_HTTP][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_FAILED,
"test": zero_trust_consts.TESTS_MAP
[zero_trust_consts.TEST_DATA_ENDPOINT_HTTP][zero_trust_consts.TEST_EXPLANATION_KEY]
},
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_DATA_ENDPOINT_ELASTIC][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP
[zero_trust_consts.TEST_DATA_ENDPOINT_ELASTIC][zero_trust_consts.TEST_EXPLANATION_KEY]
},
]
}
],
DEVICES: [
zero_trust_consts.DEVICES: [
{
"principle": PRINCIPLES[PRINCIPLE_ENDPOINT_SECURITY],
"status": STATUS_FAILED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ENDPOINT_SECURITY],
"status": zero_trust_consts.STATUS_FAILED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_MACHINE_EXPLOITED][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP
[zero_trust_consts.TEST_MACHINE_EXPLOITED][zero_trust_consts.TEST_EXPLANATION_KEY]
},
{
"status": STATUS_FAILED,
"test": TESTS_MAP[TEST_ENDPOINT_SECURITY_EXISTS][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_FAILED,
"test": zero_trust_consts.TESTS_MAP
[zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS][zero_trust_consts.TEST_EXPLANATION_KEY]
},
]
}
],
NETWORKS: [
zero_trust_consts.NETWORKS: [
{
"principle": PRINCIPLES[PRINCIPLE_SEGMENTATION],
"status": STATUS_UNEXECUTED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_SEGMENTATION],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_SEGMENTATION][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SEGMENTATION][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": PRINCIPLES[PRINCIPLE_USER_BEHAVIOUR],
"status": STATUS_VERIFY,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USER_BEHAVIOUR],
"status": zero_trust_consts.STATUS_VERIFY,
"tests": [
{
"status": STATUS_VERIFY,
"test": TESTS_MAP[TEST_SCHEDULED_EXECUTION][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_VERIFY,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SCHEDULED_EXECUTION][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": PRINCIPLES[PRINCIPLE_USERS_MAC_POLICIES],
"status": STATUS_UNEXECUTED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_COMMUNICATE_AS_NEW_USER][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": PRINCIPLES[PRINCIPLE_ANALYZE_NETWORK_TRAFFIC],
"status": STATUS_UNEXECUTED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ANALYZE_NETWORK_TRAFFIC],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_MALICIOUS_ACTIVITY_TIMELINE][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": PRINCIPLES[PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES],
"status": STATUS_UNEXECUTED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_TUNNELING][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_TUNNELING][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
],
PEOPLE: [
zero_trust_consts.PEOPLE: [
{
"principle": PRINCIPLES[PRINCIPLE_USER_BEHAVIOUR],
"status": STATUS_VERIFY,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USER_BEHAVIOUR],
"status": zero_trust_consts.STATUS_VERIFY,
"tests": [
{
"status": STATUS_VERIFY,
"test": TESTS_MAP[TEST_SCHEDULED_EXECUTION][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_VERIFY,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SCHEDULED_EXECUTION][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": PRINCIPLES[PRINCIPLE_USERS_MAC_POLICIES],
"status": STATUS_UNEXECUTED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_COMMUNICATE_AS_NEW_USER][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
}
],
VISIBILITY_ANALYTICS: [
zero_trust_consts.VISIBILITY_ANALYTICS: [
{
"principle": PRINCIPLES[PRINCIPLE_USERS_MAC_POLICIES],
"status": STATUS_UNEXECUTED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_COMMUNICATE_AS_NEW_USER][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": PRINCIPLES[PRINCIPLE_ANALYZE_NETWORK_TRAFFIC],
"status": STATUS_UNEXECUTED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ANALYZE_NETWORK_TRAFFIC],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_MALICIOUS_ACTIVITY_TIMELINE][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": PRINCIPLES[PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES],
"status": STATUS_UNEXECUTED,
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TESTS_MAP[TEST_TUNNELING][TEST_EXPLANATION_KEY]
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_TUNNELING][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
],
WORKLOADS: []
zero_trust_consts.WORKLOADS: []
}
def save_example_findings():
# arrange
Finding.save_finding(TEST_ENDPOINT_SECURITY_EXISTS, STATUS_PASSED, []) # devices passed = 1
Finding.save_finding(TEST_ENDPOINT_SECURITY_EXISTS, STATUS_PASSED, []) # devices passed = 2
Finding.save_finding(TEST_ENDPOINT_SECURITY_EXISTS, STATUS_FAILED, []) # devices failed = 1
Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_PASSED,
[]) # devices passed = 1
Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_PASSED,
[]) # devices passed = 2
Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_FAILED,
[]) # devices failed = 1
# devices unexecuted = 1
# people verify = 1
# networks verify = 1
Finding.save_finding(TEST_SCHEDULED_EXECUTION, STATUS_VERIFY, [])
Finding.save_finding(zero_trust_consts.TEST_SCHEDULED_EXECUTION, zero_trust_consts.STATUS_VERIFY, [])
# people verify = 2
# networks verify = 2
Finding.save_finding(TEST_SCHEDULED_EXECUTION, STATUS_VERIFY, [])
Finding.save_finding(zero_trust_consts.TEST_SCHEDULED_EXECUTION, zero_trust_consts.STATUS_VERIFY, [])
# data failed 1
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_FAILED, [])
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data failed 2
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_FAILED, [])
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data failed 3
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_FAILED, [])
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data failed 4
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_FAILED, [])
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data failed 5
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_FAILED, [])
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data verify 1
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_VERIFY, [])
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_VERIFY, [])
# data verify 2
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_VERIFY, [])
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_VERIFY, [])
# data passed 1
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_PASSED, [])
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_PASSED, [])
class TestZeroTrustService(IslandTestCase):
@ -191,52 +203,52 @@ class TestZeroTrustService(IslandTestCase):
expected = [
{
STATUS_FAILED: 5,
STATUS_VERIFY: 2,
STATUS_PASSED: 1,
STATUS_UNEXECUTED: 1,
zero_trust_consts.STATUS_FAILED: 5,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 1,
zero_trust_consts.STATUS_UNEXECUTED: 1,
"pillar": "Data"
},
{
STATUS_FAILED: 0,
STATUS_VERIFY: 2,
STATUS_PASSED: 0,
STATUS_UNEXECUTED: 1,
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 1,
"pillar": "People"
},
{
STATUS_FAILED: 0,
STATUS_VERIFY: 2,
STATUS_PASSED: 0,
STATUS_UNEXECUTED: 4,
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 4,
"pillar": "Networks"
},
{
STATUS_FAILED: 1,
STATUS_VERIFY: 0,
STATUS_PASSED: 2,
STATUS_UNEXECUTED: 1,
zero_trust_consts.STATUS_FAILED: 1,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 2,
zero_trust_consts.STATUS_UNEXECUTED: 1,
"pillar": "Devices"
},
{
STATUS_FAILED: 0,
STATUS_VERIFY: 0,
STATUS_PASSED: 0,
STATUS_UNEXECUTED: 0,
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 0,
"pillar": "Workloads"
},
{
STATUS_FAILED: 0,
STATUS_VERIFY: 0,
STATUS_PASSED: 0,
STATUS_UNEXECUTED: 3,
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 3,
"pillar": "Visibility & Analytics"
},
{
STATUS_FAILED: 0,
STATUS_VERIFY: 0,
STATUS_PASSED: 0,
STATUS_UNEXECUTED: 0,
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 0,
"pillar": "Automation & Orchestration"
}
]
@ -279,13 +291,13 @@ class TestZeroTrustService(IslandTestCase):
self.maxDiff = None
expected = {
AUTOMATION_ORCHESTRATION: STATUS_UNEXECUTED,
DEVICES: STATUS_UNEXECUTED,
NETWORKS: STATUS_UNEXECUTED,
PEOPLE: STATUS_UNEXECUTED,
VISIBILITY_ANALYTICS: STATUS_UNEXECUTED,
WORKLOADS: STATUS_UNEXECUTED,
DATA: STATUS_UNEXECUTED
zero_trust_consts.AUTOMATION_ORCHESTRATION: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DEVICES: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.NETWORKS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.PEOPLE: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.VISIBILITY_ANALYTICS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.WORKLOADS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DATA: zero_trust_consts.STATUS_UNEXECUTED
}
self.assertEqual(ZeroTrustService.get_pillars_to_statuses(), expected)
@ -293,13 +305,13 @@ class TestZeroTrustService(IslandTestCase):
save_example_findings()
expected = {
AUTOMATION_ORCHESTRATION: STATUS_UNEXECUTED,
DEVICES: STATUS_FAILED,
NETWORKS: STATUS_VERIFY,
PEOPLE: STATUS_VERIFY,
VISIBILITY_ANALYTICS: STATUS_UNEXECUTED,
WORKLOADS: STATUS_UNEXECUTED,
DATA: STATUS_FAILED
zero_trust_consts.AUTOMATION_ORCHESTRATION: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DEVICES: zero_trust_consts.STATUS_FAILED,
zero_trust_consts.NETWORKS: zero_trust_consts.STATUS_VERIFY,
zero_trust_consts.PEOPLE: zero_trust_consts.STATUS_VERIFY,
zero_trust_consts.VISIBILITY_ANALYTICS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.WORKLOADS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DATA: zero_trust_consts.STATUS_FAILED
}
self.assertEqual(ZeroTrustService.get_pillars_to_statuses(), expected)

View File

@ -1,8 +1,7 @@
import json
from common.data.zero_trust_consts import PILLARS, STATUS_FAILED, STATUS_VERIFY, STATUS_PASSED, STATUS_UNEXECUTED, \
PILLARS_TO_TESTS, TESTS_MAP, PILLARS_KEY, PRINCIPLES_TO_TESTS, PRINCIPLES_TO_PILLARS, PRINCIPLES, ORDERED_TEST_STATUSES, \
TEST_EXPLANATION_KEY, FINDING_EXPLANATION_BY_STATUS_KEY
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.finding import Finding
@ -10,7 +9,7 @@ class ZeroTrustService(object):
@staticmethod
def get_pillars_grades():
pillars_grades = []
for pillar in PILLARS:
for pillar in zero_trust_consts.PILLARS:
pillars_grades.append(ZeroTrustService.__get_pillar_grade(pillar))
return pillars_grades
@ -19,13 +18,13 @@ class ZeroTrustService(object):
all_findings = Finding.objects()
pillar_grade = {
"pillar": pillar,
STATUS_FAILED: 0,
STATUS_VERIFY: 0,
STATUS_PASSED: 0,
STATUS_UNEXECUTED: 0
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 0
}
tests_of_this_pillar = PILLARS_TO_TESTS[pillar]
tests_of_this_pillar = zero_trust_consts.PILLARS_TO_TESTS[pillar]
test_unexecuted = {}
for test in tests_of_this_pillar:
@ -33,11 +32,11 @@ class ZeroTrustService(object):
for finding in all_findings:
test_unexecuted[finding.test] = False
test_info = TESTS_MAP[finding.test]
if pillar in test_info[PILLARS_KEY]:
test_info = zero_trust_consts.TESTS_MAP[finding.test]
if pillar in test_info[zero_trust_consts.PILLARS_KEY]:
pillar_grade[finding.status] += 1
pillar_grade[STATUS_UNEXECUTED] = sum(1 for condition in list(test_unexecuted.values()) if condition)
pillar_grade[zero_trust_consts.STATUS_UNEXECUTED] = sum(1 for condition in list(test_unexecuted.values()) if condition)
return pillar_grade
@ -46,14 +45,14 @@ class ZeroTrustService(object):
all_principles_statuses = {}
# init with empty lists
for pillar in PILLARS:
for pillar in zero_trust_consts.PILLARS:
all_principles_statuses[pillar] = []
for principle, principle_tests in list(PRINCIPLES_TO_TESTS.items()):
for pillar in PRINCIPLES_TO_PILLARS[principle]:
for principle, principle_tests in list(zero_trust_consts.PRINCIPLES_TO_TESTS.items()):
for pillar in zero_trust_consts.PRINCIPLES_TO_PILLARS[principle]:
all_principles_statuses[pillar].append(
{
"principle": PRINCIPLES[principle],
"principle": zero_trust_consts.PRINCIPLES[principle],
"tests": ZeroTrustService.__get_tests_status(principle_tests),
"status": ZeroTrustService.__get_principle_status(principle_tests)
}
@ -63,13 +62,13 @@ class ZeroTrustService(object):
@staticmethod
def __get_principle_status(principle_tests):
worst_status = STATUS_UNEXECUTED
worst_status = zero_trust_consts.STATUS_UNEXECUTED
all_statuses = set()
for test in principle_tests:
all_statuses |= set(Finding.objects(test=test).distinct("status"))
for status in all_statuses:
if ORDERED_TEST_STATUSES.index(status) < ORDERED_TEST_STATUSES.index(worst_status):
if zero_trust_consts.ORDERED_TEST_STATUSES.index(status) < zero_trust_consts.ORDERED_TEST_STATUSES.index(worst_status):
worst_status = status
return worst_status
@ -81,7 +80,7 @@ class ZeroTrustService(object):
test_findings = Finding.objects(test=test)
results.append(
{
"test": TESTS_MAP[test][TEST_EXPLANATION_KEY],
"test": zero_trust_consts.TESTS_MAP[test][zero_trust_consts.TEST_EXPLANATION_KEY],
"status": ZeroTrustService.__get_lcd_worst_status_for_test(test_findings)
}
)
@ -94,9 +93,9 @@ class ZeroTrustService(object):
:return: the "worst" (i.e. most severe) status out of the given findings.
lcd stands for lowest common denominator.
"""
current_worst_status = STATUS_UNEXECUTED
current_worst_status = zero_trust_consts.STATUS_UNEXECUTED
for finding in all_findings_for_test:
if ORDERED_TEST_STATUSES.index(finding.status) < ORDERED_TEST_STATUSES.index(current_worst_status):
if zero_trust_consts.ORDERED_TEST_STATUSES.index(finding.status) < zero_trust_consts.ORDERED_TEST_STATUSES.index(current_worst_status):
current_worst_status = finding.status
return current_worst_status
@ -109,11 +108,11 @@ class ZeroTrustService(object):
@staticmethod
def __get_enriched_finding(finding):
test_info = TESTS_MAP[finding.test]
test_info = zero_trust_consts.TESTS_MAP[finding.test]
enriched_finding = {
"test": test_info[FINDING_EXPLANATION_BY_STATUS_KEY][finding.status],
"test": test_info[zero_trust_consts.FINDING_EXPLANATION_BY_STATUS_KEY][finding.status],
"test_key": finding.test,
"pillars": test_info[PILLARS_KEY],
"pillars": test_info[zero_trust_consts.PILLARS_KEY],
"status": finding.status,
"events": ZeroTrustService.__get_events_as_dict(finding.events)
}
@ -126,12 +125,12 @@ class ZeroTrustService(object):
@staticmethod
def get_statuses_to_pillars():
results = {
STATUS_FAILED: [],
STATUS_VERIFY: [],
STATUS_PASSED: [],
STATUS_UNEXECUTED: []
zero_trust_consts.STATUS_FAILED: [],
zero_trust_consts.STATUS_VERIFY: [],
zero_trust_consts.STATUS_PASSED: [],
zero_trust_consts.STATUS_UNEXECUTED: []
}
for pillar in PILLARS:
for pillar in zero_trust_consts.PILLARS:
results[ZeroTrustService.__get_status_of_single_pillar(pillar)].append(pillar)
return results
@ -139,7 +138,7 @@ class ZeroTrustService(object):
@staticmethod
def get_pillars_to_statuses():
results = {}
for pillar in PILLARS:
for pillar in zero_trust_consts.PILLARS:
results[pillar] = ZeroTrustService.__get_status_of_single_pillar(pillar)
return results
@ -147,7 +146,7 @@ class ZeroTrustService(object):
@staticmethod
def __get_status_of_single_pillar(pillar):
grade = ZeroTrustService.__get_pillar_grade(pillar)
for status in ORDERED_TEST_STATUSES:
for status in zero_trust_consts.ORDERED_TEST_STATUSES:
if grade[status] > 0:
return status
return STATUS_UNEXECUTED
return zero_trust_consts.STATUS_UNEXECUTED

View File

@ -1,7 +1,6 @@
import json
from common.data.zero_trust_consts import EVENT_TYPE_MONKEY_LOCAL, \
STATUS_PASSED, STATUS_FAILED, TEST_ENDPOINT_SECURITY_EXISTS
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models import Monkey
from monkey_island.cc.models.zero_trust.aggregate_finding import AggregateFinding
from monkey_island.cc.models.zero_trust.event import Event
@ -14,7 +13,7 @@ def test_antivirus_existence(telemetry_json):
process_list_event = Event.create_event(
title="Process list",
message="Monkey on {} scanned the process list".format(current_monkey.hostname),
event_type=EVENT_TYPE_MONKEY_LOCAL)
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_LOCAL)
events = [process_list_event]
av_processes = filter_av_processes(telemetry_json)
@ -24,15 +23,15 @@ def test_antivirus_existence(telemetry_json):
title="Found AV process",
message="The process '{}' was recognized as an Anti Virus process. Process "
"details: {}".format(process[1]['name'], json.dumps(process[1])),
event_type=EVENT_TYPE_MONKEY_LOCAL
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_LOCAL
))
if len(av_processes) > 0:
test_status = STATUS_PASSED
test_status = zero_trust_consts.STATUS_PASSED
else:
test_status = STATUS_FAILED
test_status = zero_trust_consts.STATUS_FAILED
AggregateFinding.create_or_add_to_existing(
test=TEST_ENDPOINT_SECURITY_EXISTS, status=test_status, events=events
test=zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, status=test_status, events=events
)

View File

@ -1,5 +1,4 @@
from common.data.zero_trust_consts import EVENT_TYPE_MONKEY_NETWORK, STATUS_FAILED, TEST_COMMUNICATE_AS_NEW_USER, \
STATUS_PASSED
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.aggregate_finding import AggregateFinding
from monkey_island.cc.models.zero_trust.event import Event
@ -10,9 +9,9 @@ COMM_AS_NEW_USER_SUCCEEDED_FORMAT = \
def test_new_user_communication(current_monkey, success, message):
AggregateFinding.create_or_add_to_existing(
test=TEST_COMMUNICATE_AS_NEW_USER,
test=zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER,
# If the monkey succeeded to create a user, then the test failed.
status=STATUS_FAILED if success else STATUS_PASSED,
status=zero_trust_consts.STATUS_FAILED if success else zero_trust_consts.STATUS_PASSED,
events=[
get_attempt_event(current_monkey),
get_result_event(current_monkey, message, success)
@ -24,7 +23,7 @@ def get_attempt_event(current_monkey):
tried_to_communicate_event = Event.create_event(
title="Communicate as new user",
message="Monkey on {} tried to create a new user and communicate from it.".format(current_monkey.hostname),
event_type=EVENT_TYPE_MONKEY_NETWORK)
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)
return tried_to_communicate_event
@ -34,4 +33,4 @@ def get_result_event(current_monkey, message, success):
return Event.create_event(
title="Communicate as new user",
message=message_format.format(current_monkey.hostname, message),
event_type=EVENT_TYPE_MONKEY_NETWORK)
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)

View File

@ -1,8 +1,7 @@
import json
from common.data.network_consts import ES_SERVICE
from common.data.zero_trust_consts import STATUS_PASSED, EVENT_TYPE_MONKEY_NETWORK, STATUS_FAILED, TEST_DATA_ENDPOINT_HTTP, \
TEST_DATA_ENDPOINT_ELASTIC
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models import Monkey
from monkey_island.cc.models.zero_trust.aggregate_finding import AggregateFinding, add_malicious_activity_to_timeline
from monkey_island.cc.models.zero_trust.event import Event
@ -13,8 +12,8 @@ HTTP_SERVERS_SERVICES_NAMES = ['tcp-80']
def test_open_data_endpoints(telemetry_json):
services = telemetry_json["data"]["machine"]["services"]
current_monkey = Monkey.get_single_monkey_by_guid(telemetry_json['monkey_guid'])
found_http_server_status = STATUS_PASSED
found_elastic_search_server = STATUS_PASSED
found_http_server_status = zero_trust_consts.STATUS_PASSED
found_elastic_search_server = zero_trust_consts.STATUS_PASSED
events = [
Event.create_event(
@ -22,7 +21,7 @@ def test_open_data_endpoints(telemetry_json):
message="Monkey on {} tried to perform a network scan, the target was {}.".format(
current_monkey.hostname,
telemetry_json["data"]["machine"]["ip_addr"]),
event_type=EVENT_TYPE_MONKEY_NETWORK,
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK,
timestamp=telemetry_json["timestamp"]
)
]
@ -31,10 +30,10 @@ def test_open_data_endpoints(telemetry_json):
events.append(Event.create_event(
title="Scan telemetry analysis",
message="Scanned service: {}.".format(service_name),
event_type=EVENT_TYPE_MONKEY_NETWORK
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK
))
if service_name in HTTP_SERVERS_SERVICES_NAMES:
found_http_server_status = STATUS_FAILED
found_http_server_status = zero_trust_consts.STATUS_FAILED
events.append(Event.create_event(
title="Scan telemetry analysis",
message="Service {} on {} recognized as an open data endpoint! Service details: {}".format(
@ -42,10 +41,10 @@ def test_open_data_endpoints(telemetry_json):
telemetry_json["data"]["machine"]["ip_addr"],
json.dumps(service_data)
),
event_type=EVENT_TYPE_MONKEY_NETWORK
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK
))
if service_name == ES_SERVICE:
found_elastic_search_server = STATUS_FAILED
found_elastic_search_server = zero_trust_consts.STATUS_FAILED
events.append(Event.create_event(
title="Scan telemetry analysis",
message="Service {} on {} recognized as an open data endpoint! Service details: {}".format(
@ -53,17 +52,17 @@ def test_open_data_endpoints(telemetry_json):
telemetry_json["data"]["machine"]["ip_addr"],
json.dumps(service_data)
),
event_type=EVENT_TYPE_MONKEY_NETWORK
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK
))
AggregateFinding.create_or_add_to_existing(
test=TEST_DATA_ENDPOINT_HTTP,
test=zero_trust_consts.TEST_DATA_ENDPOINT_HTTP,
status=found_http_server_status,
events=events
)
AggregateFinding.create_or_add_to_existing(
test=TEST_DATA_ENDPOINT_ELASTIC,
test=zero_trust_consts.TEST_DATA_ENDPOINT_ELASTIC,
status=found_elastic_search_server,
events=events
)

View File

@ -1,4 +1,4 @@
from common.data.zero_trust_consts import EVENT_TYPE_MONKEY_NETWORK, STATUS_PASSED, STATUS_FAILED, TEST_MACHINE_EXPLOITED
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.aggregate_finding import AggregateFinding, add_malicious_activity_to_timeline
from monkey_island.cc.models.zero_trust.event import Event
@ -11,11 +11,11 @@ def test_machine_exploited(current_monkey, exploit_successful, exploiter, target
current_monkey.hostname,
target_ip,
exploiter),
event_type=EVENT_TYPE_MONKEY_NETWORK,
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK,
timestamp=timestamp
)
]
status = STATUS_PASSED
status = zero_trust_consts.STATUS_PASSED
if exploit_successful:
events.append(
Event.create_event(
@ -24,13 +24,13 @@ def test_machine_exploited(current_monkey, exploit_successful, exploiter, target
current_monkey.hostname,
target_ip,
exploiter),
event_type=EVENT_TYPE_MONKEY_NETWORK,
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK,
timestamp=timestamp)
)
status = STATUS_FAILED
status = zero_trust_consts.STATUS_FAILED
AggregateFinding.create_or_add_to_existing(
test=TEST_MACHINE_EXPLOITED,
test=zero_trust_consts.TEST_MACHINE_EXPLOITED,
status=status,
events=events
)

View File

@ -1,6 +1,6 @@
import itertools
from common.data.zero_trust_consts import STATUS_FAILED, EVENT_TYPE_MONKEY_NETWORK, STATUS_PASSED
import common.data.zero_trust_consts as zero_trust_consts
from common.network.network_range import NetworkRange
from common.network.segmentation_utils import get_ip_in_src_and_not_in_dst, get_ip_if_in_subnet
from monkey_island.cc.models import Monkey
@ -28,7 +28,7 @@ def test_segmentation_violation(current_monkey, target_ip):
event = get_segmentation_violation_event(current_monkey, source_subnet, target_ip, target_subnet)
SegmentationFinding.create_or_add_to_existing_finding(
subnets=[source_subnet, target_subnet],
status=STATUS_FAILED,
status=zero_trust_consts.STATUS_FAILED,
segmentation_event=event
)
@ -66,7 +66,7 @@ def get_segmentation_violation_event(current_monkey, source_subnet, target_ip, t
target_ip=target_ip,
target_seg=target_subnet
),
event_type=EVENT_TYPE_MONKEY_NETWORK
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK
)
@ -92,7 +92,7 @@ def create_or_add_findings_for_all_pairs(all_subnets, current_monkey):
for subnet_pair in all_subnets_pairs_for_this_monkey:
SegmentationFinding.create_or_add_to_existing_finding(
subnets=list(subnet_pair),
status=STATUS_PASSED,
status=zero_trust_consts.STATUS_PASSED,
segmentation_event=get_segmentation_done_event(current_monkey, subnet_pair)
)
@ -104,5 +104,5 @@ def get_segmentation_done_event(current_monkey, subnet_pair):
hostname=current_monkey.hostname,
src_seg=subnet_pair[0],
dst_seg=subnet_pair[1]),
event_type=EVENT_TYPE_MONKEY_NETWORK
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK
)

View File

@ -1,7 +1,6 @@
import uuid
from common.data.zero_trust_consts import TEST_SEGMENTATION, STATUS_PASSED, STATUS_FAILED, \
EVENT_TYPE_MONKEY_NETWORK
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models import Monkey
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.finding import Finding
@ -26,21 +25,29 @@ class TestSegmentationTests(IslandTestCase):
ip_addresses=[FIRST_SUBNET])
# no findings
self.assertEqual(len(Finding.objects(test=TEST_SEGMENTATION)), 0)
self.assertEqual(len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)), 0)
# This is like the monkey is done and sent done telem
create_or_add_findings_for_all_pairs(all_subnets, monkey)
# There are 2 subnets in which the monkey is NOT
self.assertEqual(len(Finding.objects(test=TEST_SEGMENTATION, status=STATUS_PASSED)), 2)
self.assertEqual(
len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION, status=zero_trust_consts.STATUS_PASSED)),
2)
# This is a monkey from 2nd subnet communicated with 1st subnet.
SegmentationFinding.create_or_add_to_existing_finding(
[FIRST_SUBNET, SECOND_SUBNET],
STATUS_FAILED,
Event.create_event(title="sdf", message="asd", event_type=EVENT_TYPE_MONKEY_NETWORK)
zero_trust_consts.STATUS_FAILED,
Event.create_event(title="sdf", message="asd", event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)
)
self.assertEqual(len(Finding.objects(test=TEST_SEGMENTATION, status=STATUS_PASSED)), 1)
self.assertEqual(len(Finding.objects(test=TEST_SEGMENTATION, status=STATUS_FAILED)), 1)
self.assertEqual(len(Finding.objects(test=TEST_SEGMENTATION)), 2)
self.assertEqual(
len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION, status=zero_trust_consts.STATUS_PASSED)),
1)
self.assertEqual(
len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION, status=zero_trust_consts.STATUS_FAILED)),
1)
self.assertEqual(
len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)),
2)

View File

@ -1,4 +1,4 @@
from common.data.zero_trust_consts import TEST_TUNNELING, STATUS_FAILED, EVENT_TYPE_MONKEY_NETWORK
import common.data.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models import Monkey
from monkey_island.cc.models.zero_trust.aggregate_finding import AggregateFinding, add_malicious_activity_to_timeline
from monkey_island.cc.models.zero_trust.event import Event
@ -14,13 +14,13 @@ def test_tunneling_violation(tunnel_telemetry_json):
title="Tunneling event",
message="Monkey on {hostname} tunneled traffic through {proxy}.".format(
hostname=current_monkey.hostname, proxy=tunnel_host_ip),
event_type=EVENT_TYPE_MONKEY_NETWORK,
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK,
timestamp=tunnel_telemetry_json['timestamp']
)]
AggregateFinding.create_or_add_to_existing(
test=TEST_TUNNELING,
status=STATUS_FAILED,
test=zero_trust_consts.TEST_TUNNELING,
status=zero_trust_consts.STATUS_FAILED,
events=tunneling_events
)