Added zero trust report backend tests and common test data used in these tests

This commit is contained in:
VakarisZ 2021-01-27 08:54:09 +02:00
parent 7f690bb880
commit 393eed42da
9 changed files with 447 additions and 9 deletions

View File

@ -8,10 +8,6 @@ from monkey_island.cc.models.zero_trust.event import Event
class MonkeyFindingDetails(Document): class MonkeyFindingDetails(Document):
"""
This model represents additional information about monkey finding:
Events
"""
# SCHEMA # SCHEMA
events = EmbeddedDocumentListField(document_type=Event, required=False) events = EmbeddedDocumentListField(document_type=Event, required=False)

View File

@ -4,11 +4,6 @@ from monkey_island.cc.models.zero_trust.scoutsuite_rule import ScoutSuiteRule
class ScoutSuiteFindingDetails(Document): class ScoutSuiteFindingDetails(Document):
"""
This model represents additional information about monkey finding:
Events if monkey finding
Scoutsuite findings if scoutsuite finding
"""
# SCHEMA # SCHEMA
scoutsuite_rules = EmbeddedDocumentListField(document_type=ScoutSuiteRule, required=False) scoutsuite_rules = EmbeddedDocumentListField(document_type=ScoutSuiteRule, required=False)

View File

@ -0,0 +1,23 @@
from common.common_consts.zero_trust_consts import TEST_SCOUTSUITE_SERVICE_SECURITY, STATUS_FAILED, SCOUTSUITE_FINDING, \
TEST_ENDPOINT_SECURITY_EXISTS, STATUS_PASSED, MONKEY_FINDING
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.services.zero_trust.test_common.monkey_finding_data import get_monkey_details_dto
from monkey_island.cc.services.zero_trust.test_common.scoutsuite_finding_data import get_scoutsuite_details_dto
def get_scoutsuite_finding_dto() -> Finding:
scoutsuite_details = get_scoutsuite_details_dto()
scoutsuite_details.save()
return Finding(test=TEST_SCOUTSUITE_SERVICE_SECURITY,
status=STATUS_FAILED,
finding_type=SCOUTSUITE_FINDING,
details=scoutsuite_details)
def get_monkey_finding_dto() -> Finding:
monkey_details = get_monkey_details_dto()
monkey_details.save()
return Finding(test=TEST_ENDPOINT_SECURITY_EXISTS,
status=STATUS_PASSED,
finding_type=MONKEY_FINDING,
details=monkey_details)

View File

@ -0,0 +1,33 @@
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.monkey_finding_details import MonkeyFindingDetails
EVENTS = [
{
"timestamp": "2021-01-20T15:40:28.357Z",
"title": "Process list",
"message": "Monkey on gc-pc-244 scanned the process list",
"event_type": "monkey_local"
},
{
"timestamp": "2021-01-20T16:08:29.519Z",
"title": "Process list",
"message": "",
"event_type": "monkey_local"
},
]
EVENTS_DTO = [
Event(timestamp=event['timestamp'],
title=event['title'],
message=event['message'],
event_type=event['event_type']) for event in EVENTS
]
DETAILS_DTO = []
def get_monkey_details_dto() -> MonkeyFindingDetails:
monkey_details = MonkeyFindingDetails()
monkey_details.events.append(EVENTS_DTO[0])
monkey_details.events.append(EVENTS_DTO[1])
return monkey_details

View File

@ -0,0 +1,76 @@
from monkey_island.cc.models.zero_trust.scoutsuite_finding_details import ScoutSuiteFindingDetails
from monkey_island.cc.models.zero_trust.scoutsuite_rule import ScoutSuiteRule
from monkey_island.cc.services.zero_trust.scoutsuite.consts.findings import PermissiveFirewallRules, UnencryptedData
SCOUTSUITE_FINDINGS = [
PermissiveFirewallRules,
UnencryptedData
]
RULES = [
ScoutSuiteRule(
checked_items=179,
compliance=None,
dashboard_name='Rules',
description='Security Group Opens All Ports to All',
flagged_items=2,
items=[
'ec2.regions.eu-central-1.vpcs.vpc-0ee259b1a13c50229.security_groups.sg-035779fe5c293fc72'
'.rules.ingress.protocols.ALL.ports.1-65535.cidrs.2.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-00015526b6695f9aa.security_groups.sg-019eb67135ec81e65'
'.rules.ingress.protocols.ALL.ports.1-65535.cidrs.0.CIDR'
],
level='danger',
path='ec2.regions.id.vpcs.id.security_groups.id.rules.id.protocols.id.ports.id.cidrs.id.CIDR',
rationale='It was detected that all ports in the security group are open, and any source IP address'
' could send traffic to these ports, which creates a wider attack surface for resources '
'assigned to it. Open ports should be reduced to the minimum needed to correctly',
references=[],
remediation=None,
service='EC2'
),
ScoutSuiteRule(
checked_items=179,
compliance=[{'name': 'CIS Amazon Web Services Foundations', 'version': '1.0.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.0.0', 'reference': '4.2'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.1.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.1.0', 'reference': '4.2'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.2.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.2.0', 'reference': '4.2'}],
dashboard_name='Rules',
description='Security Group Opens RDP Port to All',
flagged_items=7,
items=[
'ec2.regions.eu-central-1.vpcs.vpc-076500a2138ee09da.security_groups.sg-00bdef5951797199c'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-007931ba8a364e330'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-05014daf996b042dd'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-0c745fe56c66335b2'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-0f99b85cfad63d1b1'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.us-east-1.vpcs.vpc-9e56cae4.security_groups.sg-0dc253aa79062835a'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.us-east-1.vpcs.vpc-002d543353cd4e97d.security_groups.sg-01902f153d4f938da'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR'],
level='danger',
path='ec2.regions.id.vpcs.id.security_groups.id.rules.id.protocols.id.ports.id.cidrs.id.CIDR',
rationale='The security group was found to be exposing a well-known port to all source addresses.'
' Well-known ports are commonly probed by automated scanning tools, and could be an indicator '
'of sensitive services exposed to Internet. If such services need to be expos',
references=[],
remediation='Remove the inbound rules that expose open ports',
service='EC2'
)
]
def get_scoutsuite_details_dto() -> ScoutSuiteFindingDetails:
scoutsuite_details = ScoutSuiteFindingDetails()
scoutsuite_details.scoutsuite_rules.append(RULES[0])
scoutsuite_details.scoutsuite_rules.append(RULES[1])
return scoutsuite_details

View File

@ -0,0 +1,57 @@
from common.common_consts import zero_trust_consts
from monkey_island.cc.services.zero_trust.test_common.finding_data import get_monkey_finding_dto, \
get_scoutsuite_finding_dto
def save_example_findings():
# devices passed = 1
_save_finding_with_status('scoutsuite', zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS,
zero_trust_consts.STATUS_PASSED)
# devices passed = 2
_save_finding_with_status('scoutsuite', zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS,
zero_trust_consts.STATUS_PASSED)
# devices failed = 1
_save_finding_with_status('monkey', zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS,
zero_trust_consts.STATUS_FAILED)
# people verify = 1
# networks verify = 1
_save_finding_with_status('scoutsuite', zero_trust_consts.TEST_SCHEDULED_EXECUTION,
zero_trust_consts.STATUS_VERIFY)
# people verify = 2
# networks verify = 2
_save_finding_with_status('monkey', zero_trust_consts.TEST_SCHEDULED_EXECUTION,
zero_trust_consts.STATUS_VERIFY)
# data failed 1
_save_finding_with_status('monkey', zero_trust_consts.TEST_DATA_ENDPOINT_HTTP,
zero_trust_consts.STATUS_FAILED)
# data failed 2
_save_finding_with_status('scoutsuite', zero_trust_consts.TEST_SCOUTSUITE_UNENCRYPTED_DATA,
zero_trust_consts.STATUS_FAILED)
# data failed 3
_save_finding_with_status('monkey', zero_trust_consts.TEST_DATA_ENDPOINT_HTTP,
zero_trust_consts.STATUS_FAILED)
# data failed 4
_save_finding_with_status('monkey', zero_trust_consts.TEST_DATA_ENDPOINT_HTTP,
zero_trust_consts.STATUS_FAILED)
# data failed 5
_save_finding_with_status('scoutsuite', zero_trust_consts.TEST_SCOUTSUITE_UNENCRYPTED_DATA,
zero_trust_consts.STATUS_FAILED)
# data verify 1
_save_finding_with_status('monkey', zero_trust_consts.TEST_DATA_ENDPOINT_HTTP,
zero_trust_consts.STATUS_VERIFY)
# data verify 2
_save_finding_with_status('monkey', zero_trust_consts.TEST_DATA_ENDPOINT_HTTP,
zero_trust_consts.STATUS_VERIFY)
# data passed 1
_save_finding_with_status('scoutsuite', zero_trust_consts.TEST_SCOUTSUITE_UNENCRYPTED_DATA,
zero_trust_consts.STATUS_PASSED)
def _save_finding_with_status(finding_type: str, test: str, status: str):
if finding_type == 'scoutsuite':
finding = get_scoutsuite_finding_dto()
else:
finding = get_monkey_finding_dto()
finding.test = test
finding.status = status
finding.save()

View File

@ -0,0 +1,51 @@
from unittest.mock import MagicMock
import pytest
from common.common_consts.zero_trust_consts import TESTS_MAP, TEST_SCOUTSUITE_SERVICE_SECURITY, STATUS_FAILED, \
SCOUTSUITE_FINDING, DEVICES, NETWORKS, MONKEY_FINDING, STATUS_PASSED, TEST_ENDPOINT_SECURITY_EXISTS
from monkey_island.cc.services.zero_trust.monkey_findings.monkey_zt_details_service import MonkeyZTDetailsService
from monkey_island.cc.services.zero_trust.test_common.finding_data import get_scoutsuite_finding_dto, get_monkey_finding_dto
from monkey_island.cc.services.zero_trust.zero_trust_report.finding_service import FindingService, EnrichedFinding
from monkey_island.cc.test_common.fixtures.fixture_enum import FixtureEnum
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_all_findings():
get_scoutsuite_finding_dto().save()
get_monkey_finding_dto().save()
# This method fails due to mongomock not being able to simulate $unset, so don't test details
MonkeyZTDetailsService.fetch_details_for_display = MagicMock(return_value=None)
findings = FindingService.get_all_findings()
description = TESTS_MAP[TEST_SCOUTSUITE_SERVICE_SECURITY]['finding_explanation'][STATUS_FAILED]
expected_finding0 = EnrichedFinding(finding_id=findings[0].finding_id,
finding_type=SCOUTSUITE_FINDING,
pillars=[DEVICES, NETWORKS],
status=STATUS_FAILED,
test=description,
test_key=TEST_SCOUTSUITE_SERVICE_SECURITY,
details=None)
description = TESTS_MAP[TEST_ENDPOINT_SECURITY_EXISTS]['finding_explanation'][STATUS_PASSED]
expected_finding1 = EnrichedFinding(finding_id=findings[1].finding_id,
finding_type=MONKEY_FINDING,
pillars=[DEVICES],
status=STATUS_PASSED,
test=description,
test_key=TEST_ENDPOINT_SECURITY_EXISTS,
details=None)
# Don't test details
details = []
for finding in findings:
details.append(finding.details)
finding.details = None
assert findings[0] == expected_finding0
assert findings[1] == expected_finding1

View File

@ -0,0 +1,113 @@
from typing import List
import pytest
from common.common_consts import zero_trust_consts
from common.common_consts.zero_trust_consts import DATA, PEOPLE, NETWORKS, WORKLOADS, VISIBILITY_ANALYTICS, \
AUTOMATION_ORCHESTRATION, DEVICES
from monkey_island.cc.services.zero_trust.zero_trust_report.pillar_service import PillarService
from monkey_island.cc.services.zero_trust.zero_trust_report.test_common.example_finding_data import \
save_example_findings
from monkey_island.cc.test_common.fixtures import FixtureEnum
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_pillars_grades():
save_example_findings()
expected_grades = _get_expected_pillar_grades()
computed_grades = PillarService.get_pillars_grades()
assert expected_grades == computed_grades
def _get_expected_pillar_grades() -> List[dict]:
return [
{
zero_trust_consts.STATUS_FAILED: 5,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 1,
# 2 different tests of DATA pillar were executed in _save_findings()
zero_trust_consts.STATUS_UNEXECUTED: _get_cnt_of_tests_in_pillar(DATA) - 2,
"pillar": "Data"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 0,
# 1 test of PEOPLE pillar were executed in _save_findings()
zero_trust_consts.STATUS_UNEXECUTED: _get_cnt_of_tests_in_pillar(PEOPLE) - 1,
"pillar": "People"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 0,
# 1 different tests of NETWORKS pillar were executed in _save_findings()
zero_trust_consts.STATUS_UNEXECUTED: _get_cnt_of_tests_in_pillar(NETWORKS) - 1,
"pillar": "Networks"
},
{
zero_trust_consts.STATUS_FAILED: 1,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 2,
# 1 different tests of DEVICES pillar were executed in _save_findings()
zero_trust_consts.STATUS_UNEXECUTED: _get_cnt_of_tests_in_pillar(DEVICES) - 1,
"pillar": "Devices"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
# 0 different tests of WORKLOADS pillar were executed in _save_findings()
zero_trust_consts.STATUS_UNEXECUTED: _get_cnt_of_tests_in_pillar(WORKLOADS),
"pillar": "Workloads"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
# 0 different tests of VISIBILITY_ANALYTICS pillar were executed in _save_findings()
zero_trust_consts.STATUS_UNEXECUTED: _get_cnt_of_tests_in_pillar(VISIBILITY_ANALYTICS),
"pillar": "Visibility & Analytics"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
# 0 different tests of AUTOMATION_ORCHESTRATION pillar were executed in _save_findings()
zero_trust_consts.STATUS_UNEXECUTED: _get_cnt_of_tests_in_pillar(AUTOMATION_ORCHESTRATION),
"pillar": "Automation & Orchestration"
}
]
def _get_cnt_of_tests_in_pillar(pillar: str):
tests_in_pillar = [value for (key, value) in zero_trust_consts.TESTS_MAP.items() if pillar in value['pillars']]
return len(tests_in_pillar)
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_pillars_to_statuses():
# Test empty database
expected = {
zero_trust_consts.AUTOMATION_ORCHESTRATION: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DEVICES: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.NETWORKS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.PEOPLE: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.VISIBILITY_ANALYTICS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.WORKLOADS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DATA: zero_trust_consts.STATUS_UNEXECUTED
}
assert PillarService.get_pillars_to_statuses() == expected
# Test with example finding set
save_example_findings()
expected = {
zero_trust_consts.AUTOMATION_ORCHESTRATION: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DEVICES: zero_trust_consts.STATUS_FAILED,
zero_trust_consts.NETWORKS: zero_trust_consts.STATUS_VERIFY,
zero_trust_consts.PEOPLE: zero_trust_consts.STATUS_VERIFY,
zero_trust_consts.VISIBILITY_ANALYTICS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.WORKLOADS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DATA: zero_trust_consts.STATUS_FAILED
}
assert PillarService.get_pillars_to_statuses() == expected

View File

@ -0,0 +1,94 @@
import pytest
from common.common_consts import zero_trust_consts
from monkey_island.cc.services.zero_trust.test_common.finding_data import get_monkey_finding_dto, \
get_scoutsuite_finding_dto
from monkey_island.cc.services.zero_trust.zero_trust_report.principle_service import PrincipleService
from monkey_island.cc.test_common.fixtures import FixtureEnum
EXPECTED_DICT = {
'test_pillar1': [
{
"principle": 'Test principle description2',
"status": zero_trust_consts.STATUS_FAILED,
"tests": [
{
"status": zero_trust_consts.STATUS_PASSED,
"test": "You ran a test2"
},
{
"status": zero_trust_consts.STATUS_FAILED,
"test": "You ran a test3"
}
]
}
],
'test_pillar2': [
{
"principle": "Test principle description",
"status": zero_trust_consts.STATUS_PASSED,
"tests": [
{
"status": zero_trust_consts.STATUS_PASSED,
"test": "You ran a test1"
}
]
},
{
"principle": "Test principle description2",
"status": zero_trust_consts.STATUS_FAILED,
"tests": [
{
"status": zero_trust_consts.STATUS_PASSED,
"test": "You ran a test2"
},
{
"status": zero_trust_consts.STATUS_FAILED,
"test": "You ran a test3"
},
]
}
]
}
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_principles_status():
TEST_PILLAR1 = 'test_pillar1'
TEST_PILLAR2 = 'test_pillar2'
zero_trust_consts.PILLARS = (TEST_PILLAR1, TEST_PILLAR2)
principles_to_tests = {'network_policies': ['segmentation'],
'endpoint_security': ['tunneling', 'scoutsuite_service_security']}
zero_trust_consts.PRINCIPLES_TO_TESTS = principles_to_tests
principles_to_pillars = {'network_policies': {'test_pillar2'},
'endpoint_security': {'test_pillar1', 'test_pillar2'}}
zero_trust_consts.PRINCIPLES_TO_PILLARS = principles_to_pillars
principles = {'network_policies': 'Test principle description', 'endpoint_security': 'Test principle description2'}
zero_trust_consts.PRINCIPLES = principles
tests_map = {'segmentation': {'explanation': 'You ran a test1'},
'tunneling': {'explanation': 'You ran a test2'},
'scoutsuite_service_security': {'explanation': 'You ran a test3'}}
zero_trust_consts.TESTS_MAP = tests_map
monkey_finding = get_monkey_finding_dto()
monkey_finding.test = 'segmentation'
monkey_finding.save()
monkey_finding = get_monkey_finding_dto()
monkey_finding.test = 'tunneling'
monkey_finding.save()
scoutsuite_finding = get_scoutsuite_finding_dto()
scoutsuite_finding.test = 'scoutsuite_service_security'
scoutsuite_finding.save()
expected = dict(EXPECTED_DICT) # new mutable
result = PrincipleService.get_principles_status()
assert result == expected