Added zero trust service with passing sanity UTs

This commit is contained in:
Shay Nehmad 2019-08-13 14:33:18 +03:00
parent bfcd469e04
commit d4f922ab00
5 changed files with 324 additions and 122 deletions

View File

@ -61,7 +61,7 @@ TESTS_MAP = {
EXPLANATION_KEY: u"The Monkeys in the network performed malicious-looking actions, like scanning and attempting exploitation.",
FINDING_FORMAT_KEY: u"Malicious activity performed by the Monkeys. See 'events' for detailed information.",
DIRECTIVE_KEY: DIRECTIVE_ANALYZE_NETWORK_TRAFFIC,
PILLARS_KEY: [NETWORKS],
PILLARS_KEY: [NETWORKS, VISIBILITY_ANALYTICS],
POSSIBLE_STATUSES_KEY: [STATUS_UNEXECUTED, STATUS_INCONCLUSIVE]
},
TEST_ENDPOINT_SECURITY_EXISTS: {

View File

@ -6,8 +6,7 @@ from flask import jsonify
from monkey_island.cc.auth import jwt_required
from monkey_island.cc.services.reporting.report import ReportService
from monkey_island.cc.services.reporting.zero_trust_report import get_all_findings, get_pillars_grades, \
get_directives_status
from monkey_island.cc.services.reporting.zero_trust_service import ZeroTrustService
ZERO_TRUST_REPORT_TYPE = "zero_trust"
GENERAL_REPORT_TYPE = "general"
@ -27,11 +26,11 @@ class Report(flask_restful.Resource):
if report_type == GENERAL_REPORT_TYPE:
return ReportService.get_report()
elif report_type == ZERO_TRUST_REPORT_TYPE:
if report_data == REPORT_DATA_FINDINGS:
return jsonify(get_all_findings())
elif report_data == REPORT_DATA_PILLARS:
return jsonify(get_pillars_grades())
if report_data == REPORT_DATA_PILLARS:
return jsonify(ZeroTrustService.get_pillars_grades())
elif report_data == REPORT_DATA_DIRECTIVES_STATUS:
return jsonify(get_directives_status())
return jsonify(ZeroTrustService.get_directives_status())
elif report_data == REPORT_DATA_FINDINGS:
return jsonify(ZeroTrustService.get_all_findings())
flask_restful.abort(httplib.NOT_FOUND)

View File

@ -0,0 +1,200 @@
from monkey_island.cc.services.reporting.zero_trust_service import ZeroTrustService
from common.data.zero_trust_consts import *
from monkey_island.cc.models.finding import Finding
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
def save_example_findings():
# arrange
Finding.save_finding(TEST_ENDPOINT_SECURITY_EXISTS, STATUS_POSITIVE, []) # devices positive = 1
Finding.save_finding(TEST_ENDPOINT_SECURITY_EXISTS, STATUS_POSITIVE, []) # devices positive = 2
Finding.save_finding(TEST_ENDPOINT_SECURITY_EXISTS, STATUS_CONCLUSIVE, []) # devices conclusive = 1
# devices unexecuted = 1
# people inconclusive = 1
# networks inconclusive = 1
Finding.save_finding(TEST_SCHEDULED_EXECUTION, STATUS_INCONCLUSIVE, [])
# people inconclusive = 2
# networks inconclusive = 2
Finding.save_finding(TEST_SCHEDULED_EXECUTION, STATUS_INCONCLUSIVE, [])
# data conclusive 1
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_CONCLUSIVE, [])
# data conclusive 2
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_CONCLUSIVE, [])
# data conclusive 3
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_CONCLUSIVE, [])
# data conclusive 4
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_CONCLUSIVE, [])
# data conclusive 5
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_CONCLUSIVE, [])
# data inconclusive 1
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_INCONCLUSIVE, [])
# data inconclusive 2
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_INCONCLUSIVE, [])
# data positive 1
Finding.save_finding(TEST_DATA_ENDPOINT_HTTP, STATUS_POSITIVE, [])
class TestZeroTrustService(IslandTestCase):
def test_get_pillars_grades(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
save_example_findings()
expected = [
{
"Conclusive": 5,
"Inconclusive": 2,
"Positive": 1,
"Unexecuted": 1,
"pillar": "Data"
},
{
"Conclusive": 0,
"Inconclusive": 2,
"Positive": 0,
"Unexecuted": 0,
"pillar": "People"
},
{
"Conclusive": 0,
"Inconclusive": 2,
"Positive": 0,
"Unexecuted": 2,
"pillar": "Networks"
},
{
"Conclusive": 1,
"Inconclusive": 0,
"Positive": 2,
"Unexecuted": 1,
"pillar": "Devices"
},
{
"Conclusive": 0,
"Inconclusive": 0,
"Positive": 0,
"Unexecuted": 0,
"pillar": "Workloads"
},
{
"Conclusive": 0,
"Inconclusive": 0,
"Positive": 0,
"Unexecuted": 1,
"pillar": "Visibility & Analytics"
},
{
"Conclusive": 0,
"Inconclusive": 0,
"Positive": 0,
"Unexecuted": 0,
"pillar": "Automation & Orchestration"
}
]
result = ZeroTrustService.get_pillars_grades()
self.assertEquals(result, expected)
def test_get_directives_status(self):
self.fail_if_not_testing_env()
self.clean_finding_db()
save_example_findings()
expected = {
AUTOMATION_ORCHESTRATION: [],
DATA: [
{
"directive": DIRECTIVE_DATA_TRANSIT,
"status": STATUS_CONCLUSIVE,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TEST_DATA_ENDPOINT_ELASTIC
},
{
"status": STATUS_CONCLUSIVE,
"test": TEST_DATA_ENDPOINT_HTTP
}
]
}
],
DEVICES: [
{
"directive": "endpoint_security",
"status": "Conclusive",
"tests": [
{
"status": "Conclusive",
"test": "endpoint_security_exists"
},
{
"status": "Unexecuted",
"test": "machine_exploited"
}
]
}
],
NETWORKS: [
{
"directive": "segmentation",
"status": "Unexecuted",
"tests": [
{
"status": "Unexecuted",
"test": "segmentation"
}
]
},
{
"directive": "user_behaviour",
"status": STATUS_INCONCLUSIVE,
"tests": [
{
"status": STATUS_INCONCLUSIVE,
"test": TEST_SCHEDULED_EXECUTION
}
]
},
{
"directive": "analyze_network_traffic",
"status": "Unexecuted",
"tests": [
{
"status": "Unexecuted",
"test": "malicious_activity_timeline"
}
]
}
],
PEOPLE: [
{
"directive": "user_behaviour",
"status": STATUS_INCONCLUSIVE,
"tests": [
{
"status": STATUS_INCONCLUSIVE,
"test": TEST_SCHEDULED_EXECUTION
}
]
}
],
"Visibility & Analytics": [
{
"directive": DIRECTIVE_ANALYZE_NETWORK_TRAFFIC,
"status": STATUS_UNEXECUTED,
"tests": [
{
"status": STATUS_UNEXECUTED,
"test": TEST_ACTIVITY_TIMELINE
}
]
}
],
"Workloads": []
}
self.assertEquals(ZeroTrustService.get_directives_status(), expected)

View File

@ -1,114 +0,0 @@
import json
from common.data.zero_trust_consts import *
from monkey_island.cc.models.finding import Finding
def get_all_findings():
all_findings = Finding.objects()
enriched_findings = [get_enriched_finding(f) for f in all_findings]
return enriched_findings
def get_events_as_dict(events):
return [json.loads(event.to_json()) for event in events]
def get_enriched_finding(finding):
test_info = TESTS_MAP[finding.test]
enriched_finding = {
# TODO add test explanation per status.
"test": test_info[EXPLANATION_KEY],
"pillars": test_info[PILLARS_KEY],
"status": finding.status,
"events": get_events_as_dict(finding.events)
}
return enriched_finding
def get_lcd_worst_status_for_test(all_findings_for_test):
current_status = STATUS_UNEXECUTED
for finding in all_findings_for_test:
if TEST_STATUSES.index(finding.status) < TEST_STATUSES.index(current_status):
current_status = finding.status
return current_status
def get_tests_status(directive_tests):
results = []
for test in directive_tests:
test_findings = Finding.objects(test=test)
results.append(
{
"test": test,
"status": get_lcd_worst_status_for_test(test_findings)
}
)
return results
def get_directive_status(directive_tests):
worst_status = STATUS_UNEXECUTED
all_statuses = set()
for test in directive_tests:
all_statuses |= set(Finding.objects(test=test).distinct("status"))
for status in all_statuses:
if TEST_STATUSES.index(status) < TEST_STATUSES.index(worst_status):
worst_status = status
return worst_status
def get_directives_status():
all_directive_statuses = {}
# init with empty lists
for pillar in PILLARS:
all_directive_statuses[pillar] = []
for directive, directive_tests in DIRECTIVES_TO_TESTS.items():
for pillar in DIRECTIVES_TO_PILLARS[directive]:
all_directive_statuses[pillar].append(
{
"directive": directive,
"tests": get_tests_status(directive_tests),
"status": get_directive_status(directive_tests)
}
)
return all_directive_statuses
def get_pillar_grade(pillar, all_findings):
pillar_grade = {
"pillar": pillar,
STATUS_CONCLUSIVE: 0,
STATUS_INCONCLUSIVE: 0,
STATUS_POSITIVE: 0,
STATUS_UNEXECUTED: 0
}
tests_of_this_pillar = PILLARS_TO_TESTS[pillar]
test_unexecuted = {}
for test in tests_of_this_pillar:
test_unexecuted[test] = True
for finding in all_findings:
test_unexecuted[finding.test] = False
test_info = TESTS_MAP[finding.test]
if pillar in test_info[PILLARS_KEY]:
pillar_grade[finding.status] += 1
pillar_grade[STATUS_UNEXECUTED] = sum(1 for condition in test_unexecuted.values() if condition)
return pillar_grade
def get_pillars_grades():
pillars_grades = []
all_findings = Finding.objects()
for pillar in PILLARS:
pillars_grades.append(get_pillar_grade(pillar, all_findings))
return pillars_grades

View File

@ -0,0 +1,117 @@
import json
from common.data.zero_trust_consts import *
from monkey_island.cc.models.finding import Finding
class ZeroTrustService(object):
@staticmethod
def get_pillars_grades():
pillars_grades = []
all_findings = Finding.objects()
for pillar in PILLARS:
pillars_grades.append(ZeroTrustService.__get_pillar_grade(pillar, all_findings))
return pillars_grades
@staticmethod
def __get_pillar_grade(pillar, all_findings):
pillar_grade = {
"pillar": pillar,
STATUS_CONCLUSIVE: 0,
STATUS_INCONCLUSIVE: 0,
STATUS_POSITIVE: 0,
STATUS_UNEXECUTED: 0
}
tests_of_this_pillar = PILLARS_TO_TESTS[pillar]
test_unexecuted = {}
for test in tests_of_this_pillar:
test_unexecuted[test] = True
for finding in all_findings:
test_unexecuted[finding.test] = False
test_info = TESTS_MAP[finding.test]
if pillar in test_info[PILLARS_KEY]:
pillar_grade[finding.status] += 1
pillar_grade[STATUS_UNEXECUTED] = sum(1 for condition in test_unexecuted.values() if condition)
return pillar_grade
@staticmethod
def get_directives_status():
all_directive_statuses = {}
# init with empty lists
for pillar in PILLARS:
all_directive_statuses[pillar] = []
for directive, directive_tests in DIRECTIVES_TO_TESTS.items():
for pillar in DIRECTIVES_TO_PILLARS[directive]:
all_directive_statuses[pillar].append(
{
"directive": directive,
"tests": ZeroTrustService.__get_tests_status(directive_tests),
"status": ZeroTrustService.__get_directive_status(directive_tests)
}
)
return all_directive_statuses
@staticmethod
def __get_directive_status(directive_tests):
worst_status = STATUS_UNEXECUTED
all_statuses = set()
for test in directive_tests:
all_statuses |= set(Finding.objects(test=test).distinct("status"))
for status in all_statuses:
if TEST_STATUSES.index(status) < TEST_STATUSES.index(worst_status):
worst_status = status
return worst_status
@staticmethod
def __get_tests_status(directive_tests):
results = []
for test in directive_tests:
test_findings = Finding.objects(test=test)
results.append(
{
"test": test,
"status": ZeroTrustService.__get_lcd_worst_status_for_test(test_findings)
}
)
return results
@staticmethod
def __get_lcd_worst_status_for_test(all_findings_for_test):
current_status = STATUS_UNEXECUTED
for finding in all_findings_for_test:
if TEST_STATUSES.index(finding.status) < TEST_STATUSES.index(current_status):
current_status = finding.status
return current_status
@staticmethod
def get_all_findings():
all_findings = Finding.objects()
enriched_findings = [ZeroTrustService.__get_enriched_finding(f) for f in all_findings]
return enriched_findings
@staticmethod
def __get_enriched_finding(finding):
test_info = TESTS_MAP[finding.test]
enriched_finding = {
# TODO add test explanation per status.
"test": test_info[EXPLANATION_KEY],
"pillars": test_info[PILLARS_KEY],
"status": finding.status,
"events": ZeroTrustService.__get_events_as_dict(finding.events)
}
return enriched_finding
@staticmethod
def __get_events_as_dict(events):
return [json.loads(event.to_json()) for event in events]