diff --git a/monkey/monkey_island/cc/models/test_monkey.py b/monkey/monkey_island/cc/models/test_monkey.py index fad2ea94e..7860de20e 100644 --- a/monkey/monkey_island/cc/models/test_monkey.py +++ b/monkey/monkey_island/cc/models/test_monkey.py @@ -7,13 +7,14 @@ import pytest from monkey_island.cc.models.monkey import Monkey, MonkeyNotFoundError from .monkey_ttl import MonkeyTtl +from ..test_common.fixtures import FixtureEnum logger = logging.getLogger(__name__) class TestMonkey: - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_is_dead(self): # Arrange alive_monkey_ttl = MonkeyTtl.create_ttl_expire_in(30) @@ -41,7 +42,7 @@ class TestMonkey: assert mia_monkey.is_dead() assert not alive_monkey.is_dead() - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_ttl_renewal(self): # Arrange monkey = Monkey(guid=str(uuid.uuid4())) @@ -52,7 +53,7 @@ class TestMonkey: monkey.renew_ttl() assert monkey.ttl_ref - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_get_single_monkey_by_id(self): # Arrange a_monkey = Monkey(guid=str(uuid.uuid4())) @@ -66,7 +67,7 @@ class TestMonkey: with pytest.raises(MonkeyNotFoundError) as _: _ = Monkey.get_single_monkey_by_id("abcdefabcdefabcdefabcdef") - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_get_os(self): linux_monkey = Monkey(guid=str(uuid.uuid4()), description="Linux shay-Virtual-Machine 4.15.0-50-generic #54-Ubuntu") @@ -82,7 +83,7 @@ class TestMonkey: assert 1 == len([m for m in Monkey.objects() if m.get_os() == "linux"]) assert 1 == len([m for m in Monkey.objects() if m.get_os() == "unknown"]) - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_get_tunneled_monkeys(self): linux_monkey = Monkey(guid=str(uuid.uuid4()), description="Linux shay-Virtual-Machine") @@ -100,9 +101,9 @@ class TestMonkey: and unknown_monkey in tunneled_monkeys and linux_monkey not in tunneled_monkeys and len(tunneled_monkeys) == 2) - assert test == "Tunneling test" + assert test - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_get_label_by_id(self): hostname_example = "a_hostname" ip_example = "1.1.1.1" @@ -148,7 +149,7 @@ class TestMonkey: assert cache_info_after_query_3.hits == 1 assert cache_info_after_query_3.misses == 2 - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_is_monkey(self): a_monkey = Monkey(guid=str(uuid.uuid4())) a_monkey.save() diff --git a/monkey/monkey_island/cc/models/zero_trust/test_finding.py b/monkey/monkey_island/cc/models/zero_trust/test_finding.py index fd78e2671..4df4b7bab 100644 --- a/monkey/monkey_island/cc/models/zero_trust/test_finding.py +++ b/monkey/monkey_island/cc/models/zero_trust/test_finding.py @@ -6,7 +6,7 @@ from monkey_island.cc.models.zero_trust.event import Event from monkey_island.cc.models.zero_trust.finding import Finding from monkey_island.cc.models.zero_trust.monkey_finding_details import MonkeyFindingDetails from monkey_island.cc.models.zero_trust.scoutsuite_finding_details import ScoutSuiteFindingDetails - +from monkey_island.cc.test_common.fixtures import FixtureEnum MONKEY_FINDING_DETAIL_MOCK = MonkeyFindingDetails() MONKEY_FINDING_DETAIL_MOCK.events = ['mock1', 'mock2'] @@ -16,6 +16,7 @@ SCOUTSUITE_FINDING_DETAIL_MOCK.scoutsuite_rules = [] class TestFinding: + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_save_finding_validation(self): with pytest.raises(ValidationError): _ = Finding.save_finding(test="bla bla", @@ -27,6 +28,7 @@ class TestFinding: status="bla bla", detail_ref=SCOUTSUITE_FINDING_DETAIL_MOCK) + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_save_finding_sanity(self): assert len(Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION)) == 0 @@ -34,6 +36,7 @@ class TestFinding: title="Event Title", message="event message", event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK) monkey_details_example = MonkeyFindingDetails() monkey_details_example.events.append(event_example) + monkey_details_example.save() Finding.save_finding(test=zero_trust_consts.TEST_SEGMENTATION, status=zero_trust_consts.STATUS_FAILED, detail_ref=monkey_details_example) diff --git a/monkey/monkey_island/cc/services/edge/test_edge.py b/monkey/monkey_island/cc/services/edge/test_edge.py index 26ab82311..f327bc2d1 100644 --- a/monkey/monkey_island/cc/services/edge/test_edge.py +++ b/monkey/monkey_island/cc/services/edge/test_edge.py @@ -5,13 +5,14 @@ from mongomock import ObjectId from monkey_island.cc.models.edge import Edge from monkey_island.cc.services.edge.edge import EdgeService +from monkey_island.cc.test_common.fixtures import FixtureEnum logger = logging.getLogger(__name__) class TestEdgeService: - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_get_or_create_edge(self): src_id = ObjectId() dst_id = ObjectId() diff --git a/monkey/monkey_island/cc/services/telemetry/zero_trust_checks/test_segmentation.py b/monkey/monkey_island/cc/services/telemetry/zero_trust_checks/test_segmentation.py index b29f9e3c6..ca58549d1 100644 --- a/monkey/monkey_island/cc/services/telemetry/zero_trust_checks/test_segmentation.py +++ b/monkey/monkey_island/cc/services/telemetry/zero_trust_checks/test_segmentation.py @@ -30,7 +30,10 @@ class TestSegmentationChecks: # There are 2 subnets in which the monkey is NOT zt_seg_findings = Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION, status=zero_trust_consts.STATUS_PASSED) - assert len(zt_seg_findings) == 2 + + # Assert that there's only one finding with multiple events (one for each subnet) + assert len(zt_seg_findings) == 1 + assert len(Finding.objects().get().details.fetch().events) == 2 # This is a monkey from 2nd subnet communicated with 1st subnet. MonkeyZTFindingService.create_or_add_to_existing( @@ -39,7 +42,6 @@ class TestSegmentationChecks: events=[Event.create_event(title="sdf", message="asd", event_type=zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)] - ) zt_seg_findings = Finding.objects(test=zero_trust_consts.TEST_SEGMENTATION, diff --git a/monkey/monkey_island/cc/services/zero_trust/monkey_findings/test_monkey_zt_finding_service.py b/monkey/monkey_island/cc/services/zero_trust/monkey_findings/test_monkey_zt_finding_service.py index cadb88aed..c3db9ea39 100644 --- a/monkey/monkey_island/cc/services/zero_trust/monkey_findings/test_monkey_zt_finding_service.py +++ b/monkey/monkey_island/cc/services/zero_trust/monkey_findings/test_monkey_zt_finding_service.py @@ -1,8 +1,12 @@ from datetime import datetime +import pytest + from common.common_consts import zero_trust_consts from monkey_island.cc.models.zero_trust.event import Event +from monkey_island.cc.models.zero_trust.finding import Finding from monkey_island.cc.services.zero_trust.monkey_findings.monkey_zt_finding_service import MonkeyZTFindingService +from monkey_island.cc.test_common.fixtures import FixtureEnum EVENTS = [ Event.create_event( @@ -34,13 +38,33 @@ STATUS = [ class TestMonkeyZTFindingService: - def test_create_or_add_to_existing(self): - + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) + def test_create_or_add_to_existing_creation(self): # Create new finding - MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=EVENTS[0]) + MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=[EVENTS[0]]) + # Assert that it was properly created + findings = list(Finding.objects()) + assert len(findings) == 1 + assert findings[0].test == TESTS[0] + assert findings[0].status == STATUS[0] + finding_details = findings[0].details.fetch() + assert len(finding_details.events) == 1 + assert finding_details.events[0].message == EVENTS[0].message + + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) + def test_create_or_add_to_existing_addition(self): + # Create new finding + MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=[EVENTS[0]]) + # Assert that there's only one finding + assert len(Finding.objects()) == 1 # Add events to an existing finding - MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=EVENTS[1]) + MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[0], status=STATUS[0], events=[EVENTS[1]]) + # Assert there's still only one finding, only events got appended + assert len(Finding.objects()) == 1 + assert len(Finding.objects()[0].details.fetch().events) == 2 # Create new finding - MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[1], status=STATUS[1], events=EVENTS[1]) + MonkeyZTFindingService.create_or_add_to_existing(test=TESTS[1], status=STATUS[1], events=[EVENTS[1]]) + # Assert there was a new finding created, because test and status is different + assert len(Finding.objects()) == 2 diff --git a/monkey/monkey_island/cc/services/zero_trust/scoutsuite/test_scoutsuite_zt_finding_service.py b/monkey/monkey_island/cc/services/zero_trust/scoutsuite/test_scoutsuite_zt_finding_service.py index e3a8de1bc..0350bd2f3 100644 --- a/monkey/monkey_island/cc/services/zero_trust/scoutsuite/test_scoutsuite_zt_finding_service.py +++ b/monkey/monkey_island/cc/services/zero_trust/scoutsuite/test_scoutsuite_zt_finding_service.py @@ -2,82 +2,17 @@ import pytest from common.common_consts import zero_trust_consts from monkey_island.cc.models.zero_trust.finding import Finding -from monkey_island.cc.models.zero_trust.scoutsuite_rule import ScoutSuiteRule -from monkey_island.cc.services.zero_trust.scoutsuite.consts.findings import PermissiveFirewallRules, \ - UnencryptedData from monkey_island.cc.services.zero_trust.scoutsuite.scoutsuite_zt_finding_service import ScoutSuiteZTFindingService - -RULES = [ - ScoutSuiteRule( - checked_items=179, - compliance=None, - dashboard_name='Rules', - description='Security Group Opens All Ports to All', - flagged_items=2, - items=[ - 'ec2.regions.eu-central-1.vpcs.vpc-0ee259b1a13c50229.security_groups.sg-035779fe5c293fc72' - '.rules.ingress.protocols.ALL.ports.1-65535.cidrs.2.CIDR', - 'ec2.regions.eu-central-1.vpcs.vpc-00015526b6695f9aa.security_groups.sg-019eb67135ec81e65' - '.rules.ingress.protocols.ALL.ports.1-65535.cidrs.0.CIDR' - ], - level='danger', - path='ec2.regions.id.vpcs.id.security_groups.id.rules.id.protocols.id.ports.id.cidrs.id.CIDR', - rationale='It was detected that all ports in the security group are open, and any source IP address' - ' could send traffic to these ports, which creates a wider attack surface for resources ' - 'assigned to it. Open ports should be reduced to the minimum needed to correctly', - references=[], - remediation=None, - service='EC2' - ), - ScoutSuiteRule( - checked_items=179, - compliance=[{'name': 'CIS Amazon Web Services Foundations', 'version': '1.0.0', 'reference': '4.1'}, - {'name': 'CIS Amazon Web Services Foundations', 'version': '1.0.0', 'reference': '4.2'}, - {'name': 'CIS Amazon Web Services Foundations', 'version': '1.1.0', 'reference': '4.1'}, - {'name': 'CIS Amazon Web Services Foundations', 'version': '1.1.0', 'reference': '4.2'}, - {'name': 'CIS Amazon Web Services Foundations', 'version': '1.2.0', 'reference': '4.1'}, - {'name': 'CIS Amazon Web Services Foundations', 'version': '1.2.0', 'reference': '4.2'}], - dashboard_name='Rules', - description='Security Group Opens RDP Port to All', - flagged_items=7, - items=[ - 'ec2.regions.eu-central-1.vpcs.vpc-076500a2138ee09da.security_groups.sg-00bdef5951797199c' - '.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR', - 'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-007931ba8a364e330' - '.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR', - 'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-05014daf996b042dd' - '.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR', - 'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-0c745fe56c66335b2' - '.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR', - 'ec2.regions.eu-central-1.vpcs.vpc-d33026b8.security_groups.sg-0f99b85cfad63d1b1' - '.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR', - 'ec2.regions.us-east-1.vpcs.vpc-9e56cae4.security_groups.sg-0dc253aa79062835a' - '.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR', - 'ec2.regions.us-east-1.vpcs.vpc-002d543353cd4e97d.security_groups.sg-01902f153d4f938da' - '.rules.ingress.protocols.TCP.ports.3389.cidrs.0.CIDR'], - level='danger', - path='ec2.regions.id.vpcs.id.security_groups.id.rules.id.protocols.id.ports.id.cidrs.id.CIDR', - rationale='The security group was found to be exposing a well-known port to all source addresses.' - ' Well-known ports are commonly probed by automated scanning tools, and could be an indicator ' - 'of sensitive services exposed to Internet. If such services need to be expos', - references=[], - remediation='Remove the inbound rules that expose open ports', - service='EC2' - ) -] - -FINDINGS = [ - PermissiveFirewallRules, - UnencryptedData -] +from monkey_island.cc.services.zero_trust.test_common.scoutsuite_finding_data import RULES, SCOUTSUITE_FINDINGS +from monkey_island.cc.test_common.fixtures import FixtureEnum class TestScoutSuiteZTFindingService: - @pytest.mark.usefixtures('uses_database') + @pytest.mark.usefixtures(FixtureEnum.USES_DATABASE) def test_process_rule(self): # Creates new PermissiveFirewallRules finding with a rule - ScoutSuiteZTFindingService.process_rule(FINDINGS[0], RULES[0]) + ScoutSuiteZTFindingService.process_rule(SCOUTSUITE_FINDINGS[0], RULES[0]) findings = list(Finding.objects()) assert len(findings) == 1 assert findings[0].finding_type == zero_trust_consts.SCOUTSUITE_FINDING @@ -87,7 +22,7 @@ class TestScoutSuiteZTFindingService: assert details.scoutsuite_rules[0] == RULES[0] # Rule processing should add rule to an already existing finding - ScoutSuiteZTFindingService.process_rule(FINDINGS[0], RULES[1]) + ScoutSuiteZTFindingService.process_rule(SCOUTSUITE_FINDINGS[0], RULES[1]) findings = list(Finding.objects()) assert len(findings) == 1 assert findings[0].finding_type == zero_trust_consts.SCOUTSUITE_FINDING @@ -97,7 +32,7 @@ class TestScoutSuiteZTFindingService: assert details.scoutsuite_rules[1] == RULES[1] # New finding created - ScoutSuiteZTFindingService.process_rule(FINDINGS[1], RULES[1]) + ScoutSuiteZTFindingService.process_rule(SCOUTSUITE_FINDINGS[1], RULES[1]) findings = list(Finding.objects()) assert len(findings) == 2 assert findings[1].finding_type == zero_trust_consts.SCOUTSUITE_FINDING diff --git a/monkey/monkey_island/cc/services/zero_trust/zero_trust_report/test_zero_trust_service.py b/monkey/monkey_island/cc/services/zero_trust/zero_trust_report/test_zero_trust_service.py deleted file mode 100644 index 8c8adb133..000000000 --- a/monkey/monkey_island/cc/services/zero_trust/zero_trust_report/test_zero_trust_service.py +++ /dev/null @@ -1,341 +0,0 @@ -import pytest - -import common.common_consts.zero_trust_consts as zero_trust_consts -import monkey_island.cc.services.zero_trust.zero_trust_service -from monkey_island.cc.models.zero_trust.finding import Finding -from monkey_island.cc.services.zero_trust.zero_trust_service import ZeroTrustService -from monkey_island.cc.testing.IslandTestCase import IslandTestCase - -EXPECTED_DICT = { - zero_trust_consts.AUTOMATION_ORCHESTRATION: [], - zero_trust_consts.DATA: [ - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_DATA_CONFIDENTIALITY], - "status": zero_trust_consts.STATUS_FAILED, - "tests": [ - { - "status": zero_trust_consts.STATUS_FAILED, - "test": zero_trust_consts.TESTS_MAP - [zero_trust_consts.TEST_DATA_ENDPOINT_HTTP][zero_trust_consts.TEST_EXPLANATION_KEY] - }, - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP - [zero_trust_consts.TEST_DATA_ENDPOINT_ELASTIC][zero_trust_consts.TEST_EXPLANATION_KEY] - }, - ] - } - ], - zero_trust_consts.DEVICES: [ - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ENDPOINT_SECURITY], - "status": zero_trust_consts.STATUS_FAILED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP - [zero_trust_consts.TEST_MACHINE_EXPLOITED][zero_trust_consts.TEST_EXPLANATION_KEY] - }, - { - "status": zero_trust_consts.STATUS_FAILED, - "test": zero_trust_consts.TESTS_MAP - [zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS][zero_trust_consts.TEST_EXPLANATION_KEY] - }, - ] - } - ], - zero_trust_consts.NETWORKS: [ - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_SEGMENTATION], - "status": zero_trust_consts.STATUS_UNEXECUTED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SEGMENTATION][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USER_BEHAVIOUR], - "status": zero_trust_consts.STATUS_VERIFY, - "tests": [ - { - "status": zero_trust_consts.STATUS_VERIFY, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SCHEDULED_EXECUTION][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES], - "status": zero_trust_consts.STATUS_UNEXECUTED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ANALYZE_NETWORK_TRAFFIC], - "status": zero_trust_consts.STATUS_UNEXECUTED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES], - "status": zero_trust_consts.STATUS_UNEXECUTED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_TUNNELING][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - ], - zero_trust_consts.PEOPLE: [ - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USER_BEHAVIOUR], - "status": zero_trust_consts.STATUS_VERIFY, - "tests": [ - { - "status": zero_trust_consts.STATUS_VERIFY, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_SCHEDULED_EXECUTION][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES], - "status": zero_trust_consts.STATUS_UNEXECUTED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - } - ], - zero_trust_consts.VISIBILITY_ANALYTICS: [ - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_USERS_MAC_POLICIES], - "status": zero_trust_consts.STATUS_UNEXECUTED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_COMMUNICATE_AS_NEW_USER][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_ANALYZE_NETWORK_TRAFFIC], - "status": zero_trust_consts.STATUS_UNEXECUTED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - { - "principle": zero_trust_consts.PRINCIPLES[zero_trust_consts.PRINCIPLE_RESTRICTIVE_NETWORK_POLICIES], - "status": zero_trust_consts.STATUS_UNEXECUTED, - "tests": [ - { - "status": zero_trust_consts.STATUS_UNEXECUTED, - "test": zero_trust_consts.TESTS_MAP[zero_trust_consts.TEST_TUNNELING][ - zero_trust_consts.TEST_EXPLANATION_KEY] - } - ] - }, - ], - zero_trust_consts.WORKLOADS: [] -} - - -def save_example_findings(): - # arrange - Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_PASSED, - []) # devices passed = 1 - Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_PASSED, - []) # devices passed = 2 - Finding.save_finding(zero_trust_consts.TEST_ENDPOINT_SECURITY_EXISTS, zero_trust_consts.STATUS_FAILED, - []) # devices failed = 1 - # devices unexecuted = 1 - # people verify = 1 - # networks verify = 1 - Finding.save_finding(zero_trust_consts.TEST_SCHEDULED_EXECUTION, zero_trust_consts.STATUS_VERIFY, []) - # people verify = 2 - # networks verify = 2 - Finding.save_finding(zero_trust_consts.TEST_SCHEDULED_EXECUTION, zero_trust_consts.STATUS_VERIFY, []) - # data failed 1 - Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, []) - # data failed 2 - Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, []) - # data failed 3 - Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, []) - # data failed 4 - Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, []) - # data failed 5 - Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_FAILED, []) - # data verify 1 - Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_VERIFY, []) - # data verify 2 - Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_VERIFY, []) - # data passed 1 - Finding.save_finding(zero_trust_consts.TEST_DATA_ENDPOINT_HTTP, zero_trust_consts.STATUS_PASSED, []) - - -class TestZeroTrustService(IslandTestCase): - - @pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed") - def test_get_pillars_grades(self): - self.fail_if_not_testing_env() - self.clean_finding_db() - - save_example_findings() - - expected = [ - { - zero_trust_consts.STATUS_FAILED: 5, - zero_trust_consts.STATUS_VERIFY: 2, - zero_trust_consts.STATUS_PASSED: 1, - zero_trust_consts.STATUS_UNEXECUTED: 1, - "pillar": "Data" - }, - { - zero_trust_consts.STATUS_FAILED: 0, - zero_trust_consts.STATUS_VERIFY: 2, - zero_trust_consts.STATUS_PASSED: 0, - zero_trust_consts.STATUS_UNEXECUTED: 1, - "pillar": "People" - }, - { - zero_trust_consts.STATUS_FAILED: 0, - zero_trust_consts.STATUS_VERIFY: 2, - zero_trust_consts.STATUS_PASSED: 0, - zero_trust_consts.STATUS_UNEXECUTED: 4, - "pillar": "Networks" - }, - { - zero_trust_consts.STATUS_FAILED: 1, - zero_trust_consts.STATUS_VERIFY: 0, - zero_trust_consts.STATUS_PASSED: 2, - zero_trust_consts.STATUS_UNEXECUTED: 1, - "pillar": "Devices" - }, - { - zero_trust_consts.STATUS_FAILED: 0, - zero_trust_consts.STATUS_VERIFY: 0, - zero_trust_consts.STATUS_PASSED: 0, - zero_trust_consts.STATUS_UNEXECUTED: 0, - "pillar": "Workloads" - }, - { - zero_trust_consts.STATUS_FAILED: 0, - zero_trust_consts.STATUS_VERIFY: 0, - zero_trust_consts.STATUS_PASSED: 0, - zero_trust_consts.STATUS_UNEXECUTED: 3, - "pillar": "Visibility & Analytics" - }, - { - zero_trust_consts.STATUS_FAILED: 0, - zero_trust_consts.STATUS_VERIFY: 0, - zero_trust_consts.STATUS_PASSED: 0, - zero_trust_consts.STATUS_UNEXECUTED: 0, - "pillar": "Automation & Orchestration" - } - ] - - result = ZeroTrustService.get_pillars_grades() - - self.assertEqual(result, expected) - - @pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed") - def test_get_principles_status(self): - self.fail_if_not_testing_env() - self.clean_finding_db() - - self.maxDiff = None - - save_example_findings() - - expected = dict(EXPECTED_DICT) # new mutable - - result = ZeroTrustService.get_principles_status() - # Compare expected and result, no order: - for pillar_name, pillar_principles_status_result in result.items(): - for index, pillar_principle_status_expected in enumerate(expected.get(pillar_name)): - correct_one = None - for pillar_principle_status_result in pillar_principles_status_result: - if pillar_principle_status_result["principle"] == pillar_principle_status_expected["principle"]: - correct_one = pillar_principle_status_result - break - - # Compare tests no order - self.assertTrue(compare_lists_no_order(correct_one["tests"], pillar_principle_status_expected["tests"])) - # Compare the rest - del pillar_principle_status_expected["tests"] - del correct_one["tests"] - self.assertEqual(sorted(correct_one), sorted(pillar_principle_status_expected)) - - @pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed") - def test_get_pillars_to_statuses(self): - self.fail_if_not_testing_env() - self.clean_finding_db() - - self.maxDiff = None - - expected = { - zero_trust_consts.AUTOMATION_ORCHESTRATION: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.DEVICES: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.NETWORKS: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.PEOPLE: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.VISIBILITY_ANALYTICS: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.WORKLOADS: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.DATA: zero_trust_consts.STATUS_UNEXECUTED - } - - self.assertEqual(ZeroTrustService.get_pillars_to_statuses(), expected) - - save_example_findings() - - expected = { - zero_trust_consts.AUTOMATION_ORCHESTRATION: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.DEVICES: zero_trust_consts.STATUS_FAILED, - zero_trust_consts.NETWORKS: zero_trust_consts.STATUS_VERIFY, - zero_trust_consts.PEOPLE: zero_trust_consts.STATUS_VERIFY, - zero_trust_consts.VISIBILITY_ANALYTICS: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.WORKLOADS: zero_trust_consts.STATUS_UNEXECUTED, - zero_trust_consts.DATA: zero_trust_consts.STATUS_FAILED - } - - self.assertEqual(ZeroTrustService.get_pillars_to_statuses(), expected) - - @pytest.mark.skip(reason="Broken during ScoutSuite refactoring, need to be fixed") - def test_get_events_without_overlap(self): - monkey_island.cc.services.reporting.zero_trust_service.EVENT_FETCH_CNT = 5 - self.assertListEqual([], ZeroTrustService._get_events_without_overlap(5, [1, 2, 3])) - self.assertListEqual([3], ZeroTrustService._get_events_without_overlap(6, [1, 2, 3])) - self.assertListEqual([1, 2, 3, 4, 5], ZeroTrustService._get_events_without_overlap(10, [1, 2, 3, 4, 5])) - - -def compare_lists_no_order(s, t): - t = list(t) # make a mutable copy - try: - for elem in s: - t.remove(elem) - except ValueError: - return False - return not t diff --git a/monkey/monkey_island/cc/test_common/fixtures/fixture_enum.py b/monkey/monkey_island/cc/test_common/fixtures/fixture_enum.py new file mode 100644 index 000000000..00ab2905f --- /dev/null +++ b/monkey/monkey_island/cc/test_common/fixtures/fixture_enum.py @@ -0,0 +1,4 @@ + + +class FixtureEnum: + USES_DATABASE = 'uses_database'