Refactored the use of fixtures and fixed up various tests accordingly

This commit is contained in:
VakarisZ 2021-01-27 08:53:09 +02:00
parent 20cc720c21
commit 7f690bb880
8 changed files with 58 additions and 429 deletions

View File

@ -7,13 +7,14 @@ import pytest
from monkey_island.cc.models.monkey import Monkey, MonkeyNotFoundError
from .monkey_ttl import MonkeyTtl
from ..test_common.fixtures import FixtureEnum
logger = logging.getLogger(__name__)
class TestMonkey:
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_is_dead(self):
# Arrange
alive_monkey_ttl = MonkeyTtl.create_ttl_expire_in(30)
@ -41,7 +42,7 @@ class TestMonkey:
assert mia_monkey.is_dead()
assert not alive_monkey.is_dead()
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_ttl_renewal(self):
# Arrange
monkey = Monkey(guid=str(uuid.uuid4()))
@ -52,7 +53,7 @@ class TestMonkey:
monkey.renew_ttl()
assert monkey.ttl_ref
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_single_monkey_by_id(self):
# Arrange
a_monkey = Monkey(guid=str(uuid.uuid4()))
@ -66,7 +67,7 @@ class TestMonkey:
with pytest.raises(MonkeyNotFoundError) as _:
_ = Monkey.get_single_monkey_by_id("abcdefabcdefabcdefabcdef")
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_os(self):
linux_monkey = Monkey(guid=str(uuid.uuid4()),
description="Linux shay-Virtual-Machine 4.15.0-50-generic #54-Ubuntu")
@ -82,7 +83,7 @@ class TestMonkey:
assert 1 == len([m for m in Monkey.objects() if m.get_os() == "linux"])
assert 1 == len([m for m in Monkey.objects() if m.get_os() == "unknown"])
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_tunneled_monkeys(self):
linux_monkey = Monkey(guid=str(uuid.uuid4()),
description="Linux shay-Virtual-Machine")
@ -100,9 +101,9 @@ class TestMonkey:
and unknown_monkey in tunneled_monkeys
and linux_monkey not in tunneled_monkeys
and len(tunneled_monkeys) == 2)
assert test == "Tunneling test"
assert test
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_label_by_id(self):
hostname_example = "a_hostname"
ip_example = "1.1.1.1"
@ -148,7 +149,7 @@ class TestMonkey:
assert cache_info_after_query_3.hits == 1
assert cache_info_after_query_3.misses == 2
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_is_monkey(self):
a_monkey = Monkey(guid=str(uuid.uuid4()))
a_monkey.save()

View File

@ -6,7 +6,7 @@ from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.models.zero_trust.monkey_finding_details import MonkeyFindingDetails
from monkey_island.cc.models.zero_trust.scoutsuite_finding_details import ScoutSuiteFindingDetails
from monkey_island.cc.test_common.fixtures import FixtureEnum
MONKEY_FINDING_DETAIL_MOCK = MonkeyFindingDetails()
MONKEY_FINDING_DETAIL_MOCK.events = ['mock1', 'mock2']
@ -16,6 +16,7 @@ SCOUTSUITE_FINDING_DETAIL_MOCK.scoutsuite_rules = []
class TestFinding:
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_save_finding_validation(self):
with pytest.raises(ValidationError):
_ = Finding.save_finding(test="bla bla",
@ -27,6 +28,7 @@ class TestFinding:
status="bla bla",
detail_ref=SCOUTSUITE_FINDING_DETAIL_MOCK)
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_save_finding_sanity(self):
assert len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)) == 0
@ -34,6 +36,7 @@ class TestFinding:
title="Event Title", message="event message", event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)
monkey_details_example = MonkeyFindingDetails()
monkey_details_example.events.append(event_example)
monkey_details_example.save()
Finding.save_finding(test=zero_trust_consts.TEST_SEGMENTATION,
status=zero_trust_consts.STATUS_FAILED, detail_ref=monkey_details_example)

View File

@ -5,13 +5,14 @@ from mongomock import ObjectId
from monkey_island.cc.models.edge import Edge
from monkey_island.cc.services.edge.edge import EdgeService
from monkey_island.cc.test_common.fixtures import FixtureEnum
logger = logging.getLogger(__name__)
class TestEdgeService:
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_get_or_create_edge(self):
src_id = ObjectId()
dst_id = ObjectId()

View File

@ -30,7 +30,10 @@ class TestSegmentationChecks:
# There are 2 subnets in which the monkey is NOT
zt_seg_findings = Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION,
status=zero_trust_consts.STATUS_PASSED)
assert len(zt_seg_findings) == 2
# Assert that there's only one finding with multiple events (one for each subnet)
assert len(zt_seg_findings) == 1
assert len(Finding.objects().get().details.fetch().events) == 2
# This is a monkey from 2nd subnet communicated with 1st subnet.
MonkeyZTFindingService.create_or_add_to_existing(
@ -39,7 +42,6 @@ class TestSegmentationChecks:
events=[Event.create_event(title="sdf",
message="asd",
event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)]
)
zt_seg_findings = Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION,

View File

@ -1,8 +1,12 @@
from datetime import datetime
import pytest
from common.common_consts import zero_trust_consts
from monkey_island.cc.models.zero_trust.event import Event
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.services.zero_trust.monkey_findings.monkey_zt_finding_service import MonkeyZTFindingService
from monkey_island.cc.test_common.fixtures import FixtureEnum
EVENTS = [
Event.create_event(
@ -34,13 +38,33 @@ STATUS = [
class TestMonkeyZTFindingService:
def test_create_or_add_to_existing(self):
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_create_or_add_to_existing_creation(self):
# Create new finding
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=EVENTS[0])
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=[EVENTS[0]])
# Assert that it was properly created
findings = list(Finding.objects())
assert len(findings) == 1
assert findings[0].test == TESTS[0]
assert findings[0].status == STATUS[0]
finding_details = findings[0].details.fetch()
assert len(finding_details.events) == 1
assert finding_details.events[0].message == EVENTS[0].message
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_create_or_add_to_existing_addition(self):
# Create new finding
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=[EVENTS[0]])
# Assert that there's only one finding
assert len(Finding.objects()) == 1
# Add events to an existing finding
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=EVENTS[1])
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=[EVENTS[1]])
# Assert there's still only one finding, only events got appended
assert len(Finding.objects()) == 1
assert len(Finding.objects()[0].details.fetch().events) == 2
# Create new finding
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[1], status=STATUS[1], events=EVENTS[1])
MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[1], status=STATUS[1], events=[EVENTS[1]])
# Assert there was a new finding created, because test and status is different
assert len(Finding.objects()) == 2

View File

@ -2,82 +2,17 @@ import pytest
from common.common_consts import zero_trust_consts
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.models.zero_trust.scoutsuite_rule import ScoutSuiteRule
from monkey_island.cc.services.zero_trust.scoutsuite.consts.findings import PermissiveFirewallRules, \
UnencryptedData
from monkey_island.cc.services.zero_trust.scoutsuite.scoutsuite_zt_finding_service import ScoutSuiteZTFindingService
RULES = [
ScoutSuiteRule(
checked_items=179,
compliance=None,
dashboard_name='Rules',
description='Security Group Opens All Ports to All',
flagged_items=2,
items=[
'ec2.regions.eu-central-1.vpcs.vpc-0ee259b1a13c50229.security_groups.sg-035779fe5c293fc72'
'.rules.ingress.protocols.ALL.ports.1-65535.cidrs.2.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-00015526b6695f9aa.security_groups.sg-019eb67135ec81e65'
'.rules.ingress.protocols.ALL.ports.1-65535.cidrs.0.CIDR'
],
level='danger',
path='ec2.regions.id.vpcs.id.security_groups.id.rules.id.protocols.id.ports.id.cidrs.id.CIDR',
rationale='It was detected that all ports in the security group are open, and any source IP address'
' could send traffic to these ports, which creates a wider attack surface for resources '
'assigned to it. Open ports should be reduced to the minimum needed to correctly',
references=[],
remediation=None,
service='EC2'
),
ScoutSuiteRule(
checked_items=179,
compliance=[{'name': 'CIS Amazon Web Services Foundations', 'version': '1.0.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.0.0', 'reference': '4.2'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.1.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.1.0', 'reference': '4.2'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.2.0', 'reference': '4.1'},
{'name': 'CIS Amazon Web Services Foundations', 'version': '1.2.0', 'reference': '4.2'}],
dashboard_name='Rules',
description='Security Group Opens RDP Port to All',
flagged_items=7,
items=[
'ec2.regions.eu-central-1.vpcs.vpc-076500a2138ee09da.security_groups.sg-00bdef5951797199c'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-007931ba8a364e330'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-05014daf996b042dd'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-0c745fe56c66335b2'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-0f99b85cfad63d1b1'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.us-east-1.vpcs.vpc-9e56cae4.security_groups.sg-0dc253aa79062835a'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR',
'ec2.regions.us-east-1.vpcs.vpc-002d543353cd4e97d.security_groups.sg-01902f153d4f938da'
'.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR'],
level='danger',
path='ec2.regions.id.vpcs.id.security_groups.id.rules.id.protocols.id.ports.id.cidrs.id.CIDR',
rationale='The security group was found to be exposing a well-known port to all source addresses.'
' Well-known ports are commonly probed by automated scanning tools, and could be an indicator '
'of sensitive services exposed to Internet. If such services need to be expos',
references=[],
remediation='Remove the inbound rules that expose open ports',
service='EC2'
)
]
FINDINGS = [
PermissiveFirewallRules,
UnencryptedData
]
from monkey_island.cc.services.zero_trust.test_common.scoutsuite_finding_data import RULES, SCOUTSUITE_FINDINGS
from monkey_island.cc.test_common.fixtures import FixtureEnum
class TestScoutSuiteZTFindingService:
@pytest.mark.usefixtures('uses_database')
@pytest.mark.usefixtures(FixtureEnum.USES_DATABASE)
def test_process_rule(self):
# Creates new PermissiveFirewallRules finding with a rule
ScoutSuiteZTFindingService.process_rule(FINDINGS[0], RULES[0])
ScoutSuiteZTFindingService.process_rule(SCOUTSUITE_FINDINGS[0], RULES[0])
findings = list(Finding.objects())
assert len(findings) == 1
assert findings[0].finding_type == zero_trust_consts.SCOUTSUITE_FINDING
@ -87,7 +22,7 @@ class TestScoutSuiteZTFindingService:
assert details.scoutsuite_rules[0] == RULES[0]
# Rule processing should add rule to an already existing finding
ScoutSuiteZTFindingService.process_rule(FINDINGS[0], RULES[1])
ScoutSuiteZTFindingService.process_rule(SCOUTSUITE_FINDINGS[0], RULES[1])
findings = list(Finding.objects())
assert len(findings) == 1
assert findings[0].finding_type == zero_trust_consts.SCOUTSUITE_FINDING
@ -97,7 +32,7 @@ class TestScoutSuiteZTFindingService:
assert details.scoutsuite_rules[1] == RULES[1]
# New finding created
ScoutSuiteZTFindingService.process_rule(FINDINGS[1], RULES[1])
ScoutSuiteZTFindingService.process_rule(SCOUTSUITE_FINDINGS[1], RULES[1])
findings = list(Finding.objects())
assert len(findings) == 2
assert findings[1].finding_type == zero_trust_consts.SCOUTSUITE_FINDING

View File

@ -1,341 +0,0 @@
import pytest
import common.common_consts.zero_trust_consts as zero_trust_consts
import monkey_island.cc.services.zero_trust.zero_trust_service
from monkey_island.cc.models.zero_trust.finding import Finding
from monkey_island.cc.services.zero_trust.zero_trust_service import ZeroTrustService
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
EXPECTED_DICT = {
zero_trust_consts.AUTOMATION_ORCHESTRATION: [],
zero_trust_consts.DATA: [
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_DATA_CONFIDENTIALITY],
"status": zero_trust_consts.STATUS_FAILED,
"tests": [
{
"status": zero_trust_consts.STATUS_FAILED,
"test": zero_trust_consts.TESTS_MAP
[zero_trust_consts.TEST_DATA_ENDPOINT_HTTP][zero_trust_consts.TEST_EXPLANATION_KEY]
},
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP
[zero_trust_consts.TEST_DATA_ENDPOINT_ELASTIC][zero_trust_consts.TEST_EXPLANATION_KEY]
},
]
}
],
zero_trust_consts.DEVICES: [
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ENDPOINT_SECURITY],
"status": zero_trust_consts.STATUS_FAILED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP
[zero_trust_consts.TEST_MACHINE_EXPLOITED][zero_trust_consts.TEST_EXPLANATION_KEY]
},
{
"status": zero_trust_consts.STATUS_FAILED,
"test": zero_trust_consts.TESTS_MAP
[zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS][zero_trust_consts.TEST_EXPLANATION_KEY]
},
]
}
],
zero_trust_consts.NETWORKS: [
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_SEGMENTATION],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SEGMENTATION][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USER_BEHAVIOUR],
"status": zero_trust_consts.STATUS_VERIFY,
"tests": [
{
"status": zero_trust_consts.STATUS_VERIFY,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SCHEDULED_EXECUTION][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ANALYZE_NETWORK_TRAFFIC],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_TUNNELING][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
],
zero_trust_consts.PEOPLE: [
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USER_BEHAVIOUR],
"status": zero_trust_consts.STATUS_VERIFY,
"tests": [
{
"status": zero_trust_consts.STATUS_VERIFY,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SCHEDULED_EXECUTION][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
}
],
zero_trust_consts.VISIBILITY_ANALYTICS: [
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ANALYZE_NETWORK_TRAFFIC],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
{
"principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES],
"status": zero_trust_consts.STATUS_UNEXECUTED,
"tests": [
{
"status": zero_trust_consts.STATUS_UNEXECUTED,
"test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_TUNNELING][
zero_trust_consts.TEST_EXPLANATION_KEY]
}
]
},
],
zero_trust_consts.WORKLOADS: []
}
def save_example_findings():
# arrange
Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_PASSED,
[]) # devices passed = 1
Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_PASSED,
[]) # devices passed = 2
Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_FAILED,
[]) # devices failed = 1
# devices unexecuted = 1
# people verify = 1
# networks verify = 1
Finding.save_finding(zero_trust_consts.TEST_SCHEDULED_EXECUTION, zero_trust_consts.STATUS_VERIFY, [])
# people verify = 2
# networks verify = 2
Finding.save_finding(zero_trust_consts.TEST_SCHEDULED_EXECUTION, zero_trust_consts.STATUS_VERIFY, [])
# data failed 1
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data failed 2
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data failed 3
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data failed 4
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data failed 5
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, [])
# data verify 1
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_VERIFY, [])
# data verify 2
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_VERIFY, [])
# data passed 1
Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_PASSED, [])
class TestZeroTrustService(IslandTestCase):
@pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed")
def test_get_pillars_grades(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
save_example_findings()
expected = [
{
zero_trust_consts.STATUS_FAILED: 5,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 1,
zero_trust_consts.STATUS_UNEXECUTED: 1,
"pillar": "Data"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 1,
"pillar": "People"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 2,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 4,
"pillar": "Networks"
},
{
zero_trust_consts.STATUS_FAILED: 1,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 2,
zero_trust_consts.STATUS_UNEXECUTED: 1,
"pillar": "Devices"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 0,
"pillar": "Workloads"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 3,
"pillar": "Visibility & Analytics"
},
{
zero_trust_consts.STATUS_FAILED: 0,
zero_trust_consts.STATUS_VERIFY: 0,
zero_trust_consts.STATUS_PASSED: 0,
zero_trust_consts.STATUS_UNEXECUTED: 0,
"pillar": "Automation & Orchestration"
}
]
result = ZeroTrustService.get_pillars_grades()
self.assertEqual(result, expected)
@pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed")
def test_get_principles_status(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
self.maxDiff = None
save_example_findings()
expected = dict(EXPECTED_DICT) # new mutable
result = ZeroTrustService.get_principles_status()
# Compare expected and result, no order:
for pillar_name, pillar_principles_status_result in result.items():
for index, pillar_principle_status_expected in enumerate(expected.get(pillar_name)):
correct_one = None
for pillar_principle_status_result in pillar_principles_status_result:
if pillar_principle_status_result["principle"] == pillar_principle_status_expected["principle"]:
correct_one = pillar_principle_status_result
break
# Compare tests no order
self.assertTrue(compare_lists_no_order(correct_one["tests"], pillar_principle_status_expected["tests"]))
# Compare the rest
del pillar_principle_status_expected["tests"]
del correct_one["tests"]
self.assertEqual(sorted(correct_one), sorted(pillar_principle_status_expected))
@pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed")
def test_get_pillars_to_statuses(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
self.maxDiff = None
expected = {
zero_trust_consts.AUTOMATION_ORCHESTRATION: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DEVICES: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.NETWORKS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.PEOPLE: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.VISIBILITY_ANALYTICS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.WORKLOADS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DATA: zero_trust_consts.STATUS_UNEXECUTED
}
self.assertEqual(ZeroTrustService.get_pillars_to_statuses(), expected)
save_example_findings()
expected = {
zero_trust_consts.AUTOMATION_ORCHESTRATION: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DEVICES: zero_trust_consts.STATUS_FAILED,
zero_trust_consts.NETWORKS: zero_trust_consts.STATUS_VERIFY,
zero_trust_consts.PEOPLE: zero_trust_consts.STATUS_VERIFY,
zero_trust_consts.VISIBILITY_ANALYTICS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.WORKLOADS: zero_trust_consts.STATUS_UNEXECUTED,
zero_trust_consts.DATA: zero_trust_consts.STATUS_FAILED
}
self.assertEqual(ZeroTrustService.get_pillars_to_statuses(), expected)
@pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed")
def test_get_events_without_overlap(self):
monkey_island.cc.services.reporting.zero_trust_service.EVENT_FETCH_CNT = 5
self.assertListEqual([], ZeroTrustService._get_events_without_overlap(5, [1, 2, 3]))
self.assertListEqual([3], ZeroTrustService._get_events_without_overlap(6, [1, 2, 3]))
self.assertListEqual([1, 2, 3, 4, 5], ZeroTrustService._get_events_without_overlap(10, [1, 2, 3, 4, 5]))
def compare_lists_no_order(s, t):
t = list(t) # make a mutable copy
try:
for elem in s:
t.remove(elem)
except ValueError:
return False
return not t

View File

@ -0,0 +1,4 @@
class FixtureEnum:
USES_DATABASE = 'uses_database'