Added UT's to monkey_zt_finding_service.py and scoutsuite_zt_finding_service.py

This commit is contained in:
VakarisZ 2021-01-20 10:55:15 +02:00
parent 1b35b8fb4a
commit d31e9064c8
4 changed files with 174 additions and 74 deletions

View File

@ -1,63 +0,0 @@
import unittest
import mongomock
from packaging import version
import common.common_consts.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.services.zero_trust.monkey_findings.monkey_zt_finding_service import MonkeyZTFindingService
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
class TestAggregateFinding(IslandTestCase):
@unittest.skipIf(version.parse(mongomock.__version__) <= version.parse("3.19.0"),
"mongomock version doesn't support this test")
def test_create_or_add_to_existing(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
test = zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE
status = zero_trust_consts.STATUS_VERIFY
events = [Event.create_event("t", "t", zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)]
self.assertEqual(len(Finding.objects(test=test, status=status)), 0)
MonkeyZTFindingService.create_or_add_to_existing(test, status, events)
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 1)
MonkeyZTFindingService.create_or_add_to_existing(test, status, events)
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 2)
@unittest.skipIf(version.parse(mongomock.__version__) <= version.parse("3.19.0"),
"mongomock version doesn't support this test")
def test_create_or_add_to_existing_2_tests_already_exist(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
test = zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE
status = zero_trust_consts.STATUS_VERIFY
event = Event.create_event("t", "t", zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)
events = [event]
self.assertEqual(len(Finding.objects(test=test, status=status)), 0)
Finding.save_finding(test, status, events)
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 1)
MonkeyZTFindingService.create_or_add_to_existing(test, status, events)
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 2)
Finding.save_finding(test, status, events)
self.assertEqual(len(Finding.objects(test=test, status=status)), 2)
with self.assertRaises(AssertionError):
MonkeyZTFindingService.create_or_add_to_existing(test, status, events)

View File

@ -4,33 +4,39 @@ from mongoengine import ValidationError
import common.common_consts.zero_trust_consts as zero_trust_consts import common.common_consts.zero_trust_consts as zero_trust_consts
from monkey_island.cc.models.zero_trust.event import Event from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.finding import Finding from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.models.zero_trust.monkey_finding_details import MonkeyFindingDetails
from monkey_island.cc.models.zero_trust.scoutsuite_finding_details import ScoutSuiteFindingDetails
from monkey_island.cc.testing.IslandTestCase import IslandTestCase from monkey_island.cc.testing.IslandTestCase import IslandTestCase
class TestFinding(IslandTestCase): MONKEY_FINDING_DETAIL_MOCK = MonkeyFindingDetails()
""" MONKEY_FINDING_DETAIL_MOCK.events = ['mock1', 'mock2']
Make sure to set server environment to `testing` in server.json! Otherwise this will mess up your mongo instance and SCOUTSUITE_FINDING_DETAIL_MOCK = ScoutSuiteFindingDetails()
won't work. SCOUTSUITE_FINDING_DETAIL_MOCK.scoutsuite_rules = []
Also, the working directory needs to be the working directory from which you usually run the island so the
server.json file is found and loaded. class TestFinding(IslandTestCase):
"""
def test_save_finding_validation(self): def test_save_finding_validation(self):
with self.assertRaises(ValidationError): with self.assertRaises(ValidationError):
_ = Finding.save_finding(test="bla bla", status=zero_trust_consts.STATUS_FAILED, events=[]) _ = Finding.save_finding(test="bla bla",
status=zero_trust_consts.STATUS_FAILED,
detail_ref=MONKEY_FINDING_DETAIL_MOCK)
with self.assertRaises(ValidationError): with self.assertRaises(ValidationError):
_ = Finding.save_finding(test=zero_trust_consts.TEST_SEGMENTATION, status="bla bla", events=[]) _ = Finding.save_finding(test=zero_trust_consts.TEST_SEGMENTATION,
status="bla bla",
detail_ref=SCOUTSUITE_FINDING_DETAIL_MOCK)
@pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed")
def test_save_finding_sanity(self): def test_save_finding_sanity(self):
self.assertEqual(len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)), 0) self.assertEqual(len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)), 0)
event_example = Event.create_event( event_example = Event.create_event(
title="Event Title", message="event message", event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK) title="Event Title", message="event message", event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)
monkey_details_example = MonkeyFindingDetails()
monkey_details_example.events.append(event_example)
Finding.save_finding(test=zero_trust_consts.TEST_SEGMENTATION, Finding.save_finding(test=zero_trust_consts.TEST_SEGMENTATION,
status=zero_trust_consts.STATUS_FAILED, events=[event_example]) status=zero_trust_consts.STATUS_FAILED, detail_ref=monkey_details_example)
self.assertEqual(len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)), 1) self.assertEqual(len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)), 1)
self.assertEqual(len(Finding.objects(status=zero_trust_consts.STATUS_FAILED)), 1) self.assertEqual(len(Finding.objects(status=zero_trust_consts.STATUS_FAILED)), 1)

View File

@ -0,0 +1,52 @@
from datetime import datetime
from typing import List
from bson import ObjectId
from common.common_consts import zero_trust_consts
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.models.zero_trust.monkey_finding_details import MonkeyFindingDetails
from monkey_island.cc.services.zero_trust.monkey_findings.monkey_zt_finding_service import MonkeyZTFindingService
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
EVENTS = [
Event.create_event(
title='Process list',
message='Monkey on gc-pc-244 scanned the process list',
event_type='monkey_local',
timestamp=datetime.strptime('2021-01-19 12:07:17.802138', '%Y-%m-%d %H:%M:%S.%f')
),
Event.create_event(
title='Communicate as new user',
message='Monkey on gc-pc-244 couldn\'t communicate as new user. '
'Details: System error 5 has occurred. Access is denied.',
event_type='monkey_network',
timestamp=datetime.strptime('2021-01-19 12:22:42.246020', '%Y-%m-%d %H:%M:%S.%f')
)
]
TESTS = [
zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS,
zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER
]
STATUS = [
zero_trust_consts.STATUS_PASSED,
zero_trust_consts.STATUS_FAILED,
zero_trust_consts.STATUS_VERIFY
]
class TestMonkeyZTFindingService(IslandTestCase):
def test_create_or_add_to_existing(self):
# Create new finding
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=EVENTS[0])
# Add events to an existing finding
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=EVENTS[1])
# Create new finding
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[1], status=STATUS[1], events=EVENTS[1])

View File

@ -0,0 +1,105 @@
from common.common_consts import zero_trust_consts
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.models.zero_trust.scoutsuite_rule import ScoutSuiteRule
from monkey_island.cc.services.zero_trust.scoutsuite.consts.findings import PermissiveFirewallRules, \
UnencryptedData
from monkey_island.cc.services.zero_trust.scoutsuite.scoutsuite_zt_finding_service import ScoutSuiteZTFindingService
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
RULES = [
ScoutSuiteRule(
checked_items=179,
compliance=None,
dashboard_name='Rules',
description='Security Group Opens All Ports to All',
flagged_items=2,
items=[
'ec2.regions.eu-central-1.vpcs.vpc-0ee259b1a13c50229.security_groups.sg-035779fe5c293fc72'
'.rules.ingress.protocols.ALL.ports.1-65535.cidrs.2.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-00015526b6695f9aa.security_groups.sg-019eb67135ec81e65'
'.rules.ingress.protocols.ALL.ports.1-65535.cidrs.0.CIDR'
],
level='danger',
path='ec2.regions.id.vpcs.id.security_groups.id.rules.id.protocols.id.ports.id.cidrs.id.CIDR',
rationale='It was detected that all ports in the security group are open, and any source IP address'
' could send traffic to these ports, which creates a wider attack surface for resources '
'assigned to it. Open ports should be reduced to the minimum needed to correctly',
references=[],
remediation=None,
service='EC2'
),
ScoutSuiteRule(
checked_items=179,
compliance=[{'name': 'CIS Amazon Web Services Foundations', 'version': '1.0.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.0.0', 'reference': '4.2'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.1.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.1.0', 'reference': '4.2'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.2.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.2.0', 'reference': '4.2'}],
dashboard_name='Rules',
description='Security Group Opens RDP Port to All',
flagged_items=7,
items=[
'ec2.regions.eu-central-1.vpcs.vpc-076500a2138ee09da.security_groups.sg-00bdef5951797199c'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-007931ba8a364e330'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-05014daf996b042dd'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-0c745fe56c66335b2'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-0f99b85cfad63d1b1'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.us-east-1.vpcs.vpc-9e56cae4.security_groups.sg-0dc253aa79062835a'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.us-east-1.vpcs.vpc-002d543353cd4e97d.security_groups.sg-01902f153d4f938da'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR'],
level='danger',
path='ec2.regions.id.vpcs.id.security_groups.id.rules.id.protocols.id.ports.id.cidrs.id.CIDR',
rationale='The security group was found to be exposing a well-known port to all source addresses.'
' Well-known ports are commonly probed by automated scanning tools, and could be an indicator '
'of sensitive services exposed to Internet. If such services need to be expos',
references=[],
remediation='Remove the inbound rules that expose open ports',
service='EC2'
)
]
FINDINGS = [
PermissiveFirewallRules,
UnencryptedData
]
class TestScoutSuiteZTFindingService(IslandTestCase):
def test_process_rule(self):
# Creates new PermissiveFirewallRules finding with a rule
ScoutSuiteZTFindingService.process_rule(FINDINGS[0], RULES[0])
findings = list(Finding.objects())
self.assertEqual(len(findings), 1)
self.assertEqual(findings[0].finding_type, zero_trust_consts.SCOUTSUITE_FINDING)
# Assert that details were created properly
details = findings[0].details.fetch()
self.assertEqual(len(details.scoutsuite_rules), 1)
self.assertEqual(details.scoutsuite_rules[0], RULES[0])
# Rule processing should add rule to an already existing finding
ScoutSuiteZTFindingService.process_rule(FINDINGS[0], RULES[1])
findings = list(Finding.objects())
self.assertEqual(len(findings), 1)
self.assertEqual(findings[0].finding_type, zero_trust_consts.SCOUTSUITE_FINDING)
# Assert that details were created properly
details = findings[0].details.fetch()
self.assertEqual(len(details.scoutsuite_rules), 2)
self.assertEqual(details.scoutsuite_rules[1], RULES[1])
# New finding created
ScoutSuiteZTFindingService.process_rule(FINDINGS[1], RULES[1])
findings = list(Finding.objects())
self.assertEqual(len(findings), 2)
self.assertEqual(findings[1].finding_type, zero_trust_consts.SCOUTSUITE_FINDING)
# Assert that details were created properly
details = findings[1].details.fetch()
self.assertEqual(len(details.scoutsuite_rules), 1)
self.assertEqual(details.scoutsuite_rules[0], RULES[1])