forked from p15670423/monkey
Re-structured ZT files and separated class responsibilities better, also further refactor towards ZT findings being extendable with different types of details.
This commit is contained in:
parent
9952f69198
commit
3490be1d8f
|
@ -1,45 +0,0 @@
|
||||||
from typing import List
|
|
||||||
|
|
||||||
import common.common_consts.zero_trust_consts as zero_trust_consts
|
|
||||||
from monkey_island.cc.models.zero_trust.event import Event
|
|
||||||
from monkey_island.cc.models.zero_trust.finding import Finding
|
|
||||||
from monkey_island.cc.models.zero_trust.finding_details import FindingDetails
|
|
||||||
|
|
||||||
|
|
||||||
class AggregateFinding(Finding):
|
|
||||||
@staticmethod
|
|
||||||
def create_or_add_to_existing(test, status, events):
|
|
||||||
"""
|
|
||||||
Create a new finding or add the events to an existing one if it's the same (same meaning same status and same
|
|
||||||
test).
|
|
||||||
|
|
||||||
:raises: Assertion error if this is used when there's more then one finding which fits the query - this is not
|
|
||||||
when this function should be used.
|
|
||||||
"""
|
|
||||||
existing_findings = Finding.objects(test=test, status=status)
|
|
||||||
assert (len(existing_findings) < 2), "More than one finding exists for {}:{}".format(test, status)
|
|
||||||
|
|
||||||
if len(existing_findings) == 0:
|
|
||||||
AggregateFinding.create_new_finding(test, status, events)
|
|
||||||
else:
|
|
||||||
# Now we know for sure this is the only one
|
|
||||||
AggregateFinding.add_events(existing_findings[0], events)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create_new_finding(test: str, status: str, events: List[Event]):
|
|
||||||
details = FindingDetails()
|
|
||||||
details.events = events
|
|
||||||
details.save()
|
|
||||||
Finding.save_finding(test, status, details)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def add_events(finding: Finding, events: List[Event]):
|
|
||||||
finding.details.fetch().add_events(events)
|
|
||||||
|
|
||||||
|
|
||||||
def add_malicious_activity_to_timeline(events):
|
|
||||||
AggregateFinding.create_or_add_to_existing(
|
|
||||||
test=zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE,
|
|
||||||
status=zero_trust_consts.STATUS_VERIFY,
|
|
||||||
events=events
|
|
||||||
)
|
|
|
@ -4,13 +4,14 @@ Define a Document Schema for Zero Trust findings.
|
||||||
"""
|
"""
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from mongoengine import Document, EmbeddedDocumentListField, StringField, LazyReferenceField
|
from mongoengine import Document, StringField, GenericLazyReferenceField
|
||||||
|
|
||||||
import common.common_consts.zero_trust_consts as zero_trust_consts
|
import common.common_consts.zero_trust_consts as zero_trust_consts
|
||||||
# Dummy import for mongoengine.
|
# Dummy import for mongoengine.
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
from monkey_island.cc.models.zero_trust.event import Event
|
from monkey_island.cc.models.zero_trust.event import Event
|
||||||
from monkey_island.cc.models.zero_trust.finding_details import FindingDetails
|
from monkey_island.cc.models.zero_trust.monkey_finding_details import MonkeyFindingDetails
|
||||||
|
from monkey_island.cc.models.zero_trust.scoutsuite_finding_details import ScoutsuiteFindingDetails
|
||||||
|
|
||||||
|
|
||||||
class Finding(Document):
|
class Finding(Document):
|
||||||
|
@ -34,7 +35,7 @@ class Finding(Document):
|
||||||
# SCHEMA
|
# SCHEMA
|
||||||
test = StringField(required=True, choices=zero_trust_consts.TESTS)
|
test = StringField(required=True, choices=zero_trust_consts.TESTS)
|
||||||
status = StringField(required=True, choices=zero_trust_consts.ORDERED_TEST_STATUSES)
|
status = StringField(required=True, choices=zero_trust_consts.ORDERED_TEST_STATUSES)
|
||||||
details = LazyReferenceField(document_type=FindingDetails, required=True)
|
details = GenericLazyReferenceField(choices=[MonkeyFindingDetails, ScoutsuiteFindingDetails], required=True)
|
||||||
# http://docs.mongoengine.org/guide/defining-documents.html#document-inheritance
|
# http://docs.mongoengine.org/guide/defining-documents.html#document-inheritance
|
||||||
meta = {'allow_inheritance': True}
|
meta = {'allow_inheritance': True}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from mongoengine import DateTimeField, Document, StringField, EmbeddedDocumentListField
|
||||||
|
|
||||||
|
from monkey_island.cc.models.zero_trust.event import Event
|
||||||
|
|
||||||
|
class MonkeyFindingDetails(Document):
|
||||||
|
"""
|
||||||
|
This model represents additional information about monkey finding:
|
||||||
|
Events
|
||||||
|
"""
|
||||||
|
|
||||||
|
# SCHEMA
|
||||||
|
events = EmbeddedDocumentListField(document_type=Event, required=False)
|
||||||
|
|
||||||
|
# LOGIC
|
||||||
|
def add_events(self, events: List[Event]) -> None:
|
||||||
|
self.update(push_all__events=events)
|
|
@ -1,14 +1,11 @@
|
||||||
from datetime import datetime
|
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from mongoengine import DateTimeField, Document, StringField, EmbeddedDocumentListField
|
from mongoengine import DateTimeField, Document, StringField, EmbeddedDocumentListField
|
||||||
|
|
||||||
import common.common_consts.zero_trust_consts as zero_trust_consts
|
|
||||||
from monkey_island.cc.models.zero_trust.event import Event
|
|
||||||
from monkey_island.cc.models.zero_trust.scoutsuite_finding import ScoutsuiteFinding
|
from monkey_island.cc.models.zero_trust.scoutsuite_finding import ScoutsuiteFinding
|
||||||
|
|
||||||
|
|
||||||
class FindingDetails(Document):
|
class ScoutsuiteFindingDetails(Document):
|
||||||
"""
|
"""
|
||||||
This model represents additional information about monkey finding:
|
This model represents additional information about monkey finding:
|
||||||
Events if monkey finding
|
Events if monkey finding
|
||||||
|
@ -16,12 +13,7 @@ class FindingDetails(Document):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# SCHEMA
|
# SCHEMA
|
||||||
events = EmbeddedDocumentListField(document_type=Event, required=False)
|
|
||||||
scoutsuite_findings = EmbeddedDocumentListField(document_type=ScoutsuiteFinding, required=False)
|
scoutsuite_findings = EmbeddedDocumentListField(document_type=ScoutsuiteFinding, required=False)
|
||||||
|
|
||||||
# LOGIC
|
|
||||||
def add_events(self, events: List[Event]) -> None:
|
|
||||||
self.update(push_all__events=events)
|
|
||||||
|
|
||||||
def add_scoutsuite_findings(self, scoutsuite_findings: List[ScoutsuiteFinding]) -> None:
|
def add_scoutsuite_findings(self, scoutsuite_findings: List[ScoutsuiteFinding]) -> None:
|
||||||
self.update(push_all__scoutsuite_findings=scoutsuite_findings)
|
self.update(push_all__scoutsuite_findings=scoutsuite_findings)
|
|
@ -4,8 +4,7 @@ import mongomock
|
||||||
from packaging import version
|
from packaging import version
|
||||||
|
|
||||||
import common.common_consts.zero_trust_consts as zero_trust_consts
|
import common.common_consts.zero_trust_consts as zero_trust_consts
|
||||||
from monkey_island.cc.models.zero_trust.aggregate_finding import \
|
from monkey_island.cc.services.zero_trust.monkey_finding_service import MonkeyFindingService
|
||||||
AggregateFinding
|
|
||||||
from monkey_island.cc.models.zero_trust.event import Event
|
from monkey_island.cc.models.zero_trust.event import Event
|
||||||
from monkey_island.cc.models.zero_trust.finding import Finding
|
from monkey_island.cc.models.zero_trust.finding import Finding
|
||||||
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
|
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
|
||||||
|
@ -24,12 +23,12 @@ class TestAggregateFinding(IslandTestCase):
|
||||||
events = [Event.create_event("t", "t", zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)]
|
events = [Event.create_event("t", "t", zero_trust_consts.EVENT_TYPE_MONKEY_NETWORK)]
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)), 0)
|
self.assertEqual(len(Finding.objects(test=test, status=status)), 0)
|
||||||
|
|
||||||
AggregateFinding.create_or_add_to_existing(test, status, events)
|
MonkeyFindingService.create_or_add_to_existing(test, status, events)
|
||||||
|
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
|
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 1)
|
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 1)
|
||||||
|
|
||||||
AggregateFinding.create_or_add_to_existing(test, status, events)
|
MonkeyFindingService.create_or_add_to_existing(test, status, events)
|
||||||
|
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
|
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 2)
|
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 2)
|
||||||
|
@ -51,7 +50,7 @@ class TestAggregateFinding(IslandTestCase):
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
|
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 1)
|
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 1)
|
||||||
|
|
||||||
AggregateFinding.create_or_add_to_existing(test, status, events)
|
MonkeyFindingService.create_or_add_to_existing(test, status, events)
|
||||||
|
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
|
self.assertEqual(len(Finding.objects(test=test, status=status)), 1)
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 2)
|
self.assertEqual(len(Finding.objects(test=test, status=status)[0].events), 2)
|
||||||
|
@ -61,4 +60,4 @@ class TestAggregateFinding(IslandTestCase):
|
||||||
self.assertEqual(len(Finding.objects(test=test, status=status)), 2)
|
self.assertEqual(len(Finding.objects(test=test, status=status)), 2)
|
||||||
|
|
||||||
with self.assertRaises(AssertionError):
|
with self.assertRaises(AssertionError):
|
||||||
AggregateFinding.create_or_add_to_existing(test, status, events)
|
MonkeyFindingService.create_or_add_to_existing(test, status, events)
|
||||||
|
|
|
@ -5,7 +5,7 @@ from flask import jsonify
|
||||||
|
|
||||||
from monkey_island.cc.resources.auth.auth import jwt_required
|
from monkey_island.cc.resources.auth.auth import jwt_required
|
||||||
from monkey_island.cc.services.reporting.report import ReportService
|
from monkey_island.cc.services.reporting.report import ReportService
|
||||||
from monkey_island.cc.services.reporting.zero_trust_service import \
|
from monkey_island.cc.services.zero_trust.zero_trust_service import \
|
||||||
ZeroTrustService
|
ZeroTrustService
|
||||||
|
|
||||||
ZERO_TRUST_REPORT_TYPE = "zero_trust"
|
ZERO_TRUST_REPORT_TYPE = "zero_trust"
|
||||||
|
|
|
@ -3,12 +3,11 @@ import json
|
||||||
import flask_restful
|
import flask_restful
|
||||||
|
|
||||||
from monkey_island.cc.resources.auth.auth import jwt_required
|
from monkey_island.cc.resources.auth.auth import jwt_required
|
||||||
from monkey_island.cc.services.reporting.zero_trust_service import \
|
from monkey_island.cc.services.zero_trust.monkey_finding_service import MonkeyFindingService
|
||||||
ZeroTrustService
|
|
||||||
|
|
||||||
|
|
||||||
class ZeroTrustFindingEvent(flask_restful.Resource):
|
class ZeroTrustFindingEvent(flask_restful.Resource):
|
||||||
|
|
||||||
@jwt_required
|
@jwt_required
|
||||||
def get(self, finding_id: str):
|
def get(self, finding_id: str):
|
||||||
return {'events_json': json.dumps(ZeroTrustService.get_events_by_finding(finding_id), default=str)}
|
return {'events_json': json.dumps(MonkeyFindingService.get_events_by_finding(finding_id), default=str)}
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from bson import ObjectId
|
||||||
|
|
||||||
|
from monkey_island.cc.models.zero_trust.monkey_finding_details import MonkeyFindingDetails
|
||||||
|
|
||||||
|
# How many events of a single finding to return to UI.
|
||||||
|
# 50 will return 50 latest and 50 oldest events from a finding
|
||||||
|
EVENT_FETCH_CNT = 50
|
||||||
|
|
||||||
|
|
||||||
|
class EventsService:
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def fetch_events_for_display(finding_id: ObjectId):
|
||||||
|
pipeline = [{'$match': {'_id': finding_id}},
|
||||||
|
{'$addFields': {'oldest_events': {'$slice': ['$events', EVENT_FETCH_CNT]},
|
||||||
|
'latest_events': {'$slice': ['$events', -1 * EVENT_FETCH_CNT]},
|
||||||
|
'event_count': {'$size': '$events'}}},
|
||||||
|
{'$unset': ['events']}]
|
||||||
|
details = MonkeyFindingDetails.objects.aggregate(*pipeline).next()
|
||||||
|
details['latest_events'] = EventsService._get_events_without_overlap(details['event_count'],
|
||||||
|
details['latest_events'])
|
||||||
|
return details
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_events_without_overlap(event_count: int, events: List[object]) -> List[object]:
|
||||||
|
overlap_count = event_count - EVENT_FETCH_CNT
|
||||||
|
if overlap_count >= EVENT_FETCH_CNT:
|
||||||
|
return events
|
||||||
|
elif overlap_count <= 0:
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
return events[-1 * overlap_count:]
|
|
@ -0,0 +1,23 @@
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from common.common_consts import zero_trust_consts
|
||||||
|
from monkey_island.cc.models.zero_trust.finding import Finding
|
||||||
|
|
||||||
|
|
||||||
|
class FindingService:
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_all_findings() -> List[Finding]:
|
||||||
|
return list(Finding.objects)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_enriched_finding(finding):
|
||||||
|
test_info = zero_trust_consts.TESTS_MAP[finding['test']]
|
||||||
|
enriched_finding = {
|
||||||
|
'finding_id': str(finding['_id']),
|
||||||
|
'test': test_info[zero_trust_consts.FINDING_EXPLANATION_BY_STATUS_KEY][finding['status']],
|
||||||
|
'test_key': finding['test'],
|
||||||
|
'pillars': test_info[zero_trust_consts.PILLARS_KEY],
|
||||||
|
'status': finding['status'],
|
||||||
|
}
|
||||||
|
return enriched_finding
|
|
@ -0,0 +1,68 @@
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from bson import ObjectId
|
||||||
|
|
||||||
|
from common.common_consts import zero_trust_consts
|
||||||
|
from monkey_island.cc.models.zero_trust.event import Event
|
||||||
|
from monkey_island.cc.models.zero_trust.finding import Finding
|
||||||
|
from monkey_island.cc.models.zero_trust.monkey_finding_details import MonkeyFindingDetails
|
||||||
|
from monkey_island.cc.services.zero_trust.finding_service import FindingService
|
||||||
|
|
||||||
|
|
||||||
|
class MonkeyFindingService:
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create_or_add_to_existing(test, status, events):
|
||||||
|
"""
|
||||||
|
Create a new finding or add the events to an existing one if it's the same (same meaning same status and same
|
||||||
|
test).
|
||||||
|
|
||||||
|
:raises: Assertion error if this is used when there's more then one finding which fits the query - this is not
|
||||||
|
when this function should be used.
|
||||||
|
"""
|
||||||
|
existing_findings = Finding.objects(test=test, status=status)
|
||||||
|
assert (len(existing_findings) < 2), "More than one finding exists for {}:{}".format(test, status)
|
||||||
|
|
||||||
|
if len(existing_findings) == 0:
|
||||||
|
MonkeyFindingService.create_new_finding(test, status, events)
|
||||||
|
else:
|
||||||
|
# Now we know for sure this is the only one
|
||||||
|
MonkeyFindingService.add_events(existing_findings[0], events)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create_new_finding(test: str, status: str, events: List[Event]):
|
||||||
|
details = MonkeyFindingDetails()
|
||||||
|
details.events = events
|
||||||
|
details.save()
|
||||||
|
Finding.save_finding(test, status, details)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def add_events(finding: Finding, events: List[Event]):
|
||||||
|
finding.details.fetch().add_events(events)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_all_monkey_findings():
|
||||||
|
findings = FindingService.get_all_findings()
|
||||||
|
for i in range(len(findings)):
|
||||||
|
details = MonkeyFindingService.fetch_events_for_display(findings[i].details.id)
|
||||||
|
findings[i] = findings[i].to_mongo()
|
||||||
|
findings[i] = FindingService.get_enriched_finding(findings[i])
|
||||||
|
findings[i]['details'] = details
|
||||||
|
return findings
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_events_by_finding(finding_id: str) -> List[object]:
|
||||||
|
finding = Finding.objects.get(id=finding_id)
|
||||||
|
pipeline = [{'$match': {'_id': ObjectId(finding.details.id)}},
|
||||||
|
{'$unwind': '$events'},
|
||||||
|
{'$project': {'events': '$events'}},
|
||||||
|
{'$replaceRoot': {'newRoot': '$events'}}]
|
||||||
|
return list(MonkeyFindingDetails.objects.aggregate(*pipeline))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def add_malicious_activity_to_timeline(events):
|
||||||
|
MonkeyFindingService.create_or_add_to_existing(
|
||||||
|
test=zero_trust_consts.TEST_MALICIOUS_ACTIVITY_TIMELINE,
|
||||||
|
status=zero_trust_consts.STATUS_VERIFY,
|
||||||
|
events=events
|
||||||
|
)
|
|
@ -0,0 +1,3 @@
|
||||||
|
|
||||||
|
class ScoutsuiteFindingService:
|
||||||
|
pass
|
|
@ -1,7 +1,7 @@
|
||||||
import common.common_consts.zero_trust_consts as zero_trust_consts
|
import common.common_consts.zero_trust_consts as zero_trust_consts
|
||||||
import monkey_island.cc.services.reporting.zero_trust_service
|
import monkey_island.cc.services.zero_trust.zero_trust_service
|
||||||
from monkey_island.cc.models.zero_trust.finding import Finding
|
from monkey_island.cc.models.zero_trust.finding import Finding
|
||||||
from monkey_island.cc.services.reporting.zero_trust_service import \
|
from monkey_island.cc.services.zero_trust.zero_trust_service import \
|
||||||
ZeroTrustService
|
ZeroTrustService
|
||||||
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
|
from monkey_island.cc.testing.IslandTestCase import IslandTestCase
|
||||||
|
|
|
@ -5,14 +5,8 @@ from bson.objectid import ObjectId
|
||||||
import common.common_consts.zero_trust_consts as zero_trust_consts
|
import common.common_consts.zero_trust_consts as zero_trust_consts
|
||||||
from monkey_island.cc.models.zero_trust.finding import Finding
|
from monkey_island.cc.models.zero_trust.finding import Finding
|
||||||
|
|
||||||
# How many events of a single finding to return to UI.
|
|
||||||
# 50 will return 50 latest and 50 oldest events from a finding
|
|
||||||
from monkey_island.cc.models.zero_trust.finding_details import FindingDetails
|
|
||||||
|
|
||||||
EVENT_FETCH_CNT = 50
|
class ZeroTrustService:
|
||||||
|
|
||||||
|
|
||||||
class ZeroTrustService(object):
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_pillars_grades():
|
def get_pillars_grades():
|
||||||
pillars_grades = []
|
pillars_grades = []
|
||||||
|
@ -110,54 +104,6 @@ class ZeroTrustService(object):
|
||||||
|
|
||||||
return current_worst_status
|
return current_worst_status
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_all_monkey_findings():
|
|
||||||
findings = list(Finding.objects)
|
|
||||||
for finding in findings:
|
|
||||||
details = finding.details.fetch()
|
|
||||||
finding.details = details
|
|
||||||
enriched_findings = [ZeroTrustService.__get_enriched_finding(f) for f in findings]
|
|
||||||
all_finding_details = ZeroTrustService._parse_finding_details_for_ui()
|
|
||||||
return enriched_findings
|
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_finding_details_for_ui() -> List[FindingDetails]:
|
|
||||||
"""
|
|
||||||
We don't need to return all events to UI, we only display N first and N last events.
|
|
||||||
This code returns a list of FindingDetails with ONLY the events which are relevant to UI.
|
|
||||||
"""
|
|
||||||
pipeline = [{'$addFields': {'oldest_events': {'$slice': ['$events', EVENT_FETCH_CNT]},
|
|
||||||
'latest_events': {'$slice': ['$events', -1 * EVENT_FETCH_CNT]},
|
|
||||||
'event_count': {'$size': '$events'}}},
|
|
||||||
{'$unset': ['events']}]
|
|
||||||
all_details = list(FindingDetails.objects.aggregate(*pipeline))
|
|
||||||
for details in all_details:
|
|
||||||
details['latest_events'] = ZeroTrustService._get_events_without_overlap(details['event_count'],
|
|
||||||
details['latest_events'])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_events_without_overlap(event_count: int, events: List[object]) -> List[object]:
|
|
||||||
overlap_count = event_count - EVENT_FETCH_CNT
|
|
||||||
if overlap_count >= EVENT_FETCH_CNT:
|
|
||||||
return events
|
|
||||||
elif overlap_count <= 0:
|
|
||||||
return []
|
|
||||||
else:
|
|
||||||
return events[-1 * overlap_count:]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def __get_enriched_finding(finding):
|
|
||||||
test_info = zero_trust_consts.TESTS_MAP[finding['test']]
|
|
||||||
enriched_finding = {
|
|
||||||
'finding_id': str(finding['_id']),
|
|
||||||
'test': test_info[zero_trust_consts.FINDING_EXPLANATION_BY_STATUS_KEY][finding['status']],
|
|
||||||
'test_key': finding['test'],
|
|
||||||
'pillars': test_info[zero_trust_consts.PILLARS_KEY],
|
|
||||||
'status': finding['status'],
|
|
||||||
}
|
|
||||||
return enriched_finding
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_statuses_to_pillars():
|
def get_statuses_to_pillars():
|
||||||
results = {
|
results = {
|
||||||
|
@ -187,11 +133,3 @@ class ZeroTrustService(object):
|
||||||
if grade[status] > 0:
|
if grade[status] > 0:
|
||||||
return status
|
return status
|
||||||
return zero_trust_consts.STATUS_UNEXECUTED
|
return zero_trust_consts.STATUS_UNEXECUTED
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_events_by_finding(finding_id: str) -> List[object]:
|
|
||||||
pipeline = [{'$match': {'_id': ObjectId(finding_id)}},
|
|
||||||
{'$unwind': '$events'},
|
|
||||||
{'$project': {'events': '$events'}},
|
|
||||||
{'$replaceRoot': {'newRoot': '$events'}}]
|
|
||||||
return list(Finding.objects.aggregate(*pipeline))
|
|
Loading…
Reference in New Issue