UT: Test that stored events are encrypted

This commit is contained in:
Kekoa Kaaikala 2022-09-20 14:05:51 +00:00
parent dea7b4f74a
commit a83186f532
1 changed files with 72 additions and 1 deletions

View File

@ -1,10 +1,12 @@
import uuid
from typing import List
from typing import Any, Iterable, List, Mapping
from unittest.mock import MagicMock
import mongomock
import pytest
from pydantic import Field
from pymongo import MongoClient
from tests.unit_tests.monkey_island.cc.repository.mongo import get_all_collections_in_mongo
from common.agent_event_serializers import (
AgentEventSerializerRegistry,
@ -18,6 +20,11 @@ from monkey_island.cc.repository import (
RetrievalError,
StorageError,
)
from monkey_island.cc.repository.agent_event_encryption import (
ENCRYPTED_PREFIX,
SERIALIZED_EVENT_FIELDS,
)
from monkey_island.cc.repository.consts import MONGO_OBJECT_ID_KEY
class FakeAgentEvent(AbstractAgentEvent):
@ -103,6 +110,15 @@ def test_mongo_agent_event_repository__save_event(mongo_repository: IAgentEventR
assert event in events
def test_mongo_agent_event_repository__saved_events_are_encrypted(
mongo_repository: IAgentEventRepository, mongo_client
):
event = FakeAgentEvent(source=uuid.uuid4())
mongo_repository.save_event(event)
assert_events_are_encrypted(mongo_client, [event])
def test_mongo_agent_event_repository__save_event_raises(
error_raising_mongo_repository: IAgentEventRepository,
):
@ -186,3 +202,58 @@ def test_mongo_agent_event_repository__reset_raises(
):
with pytest.raises(RemovalError):
error_raising_mongo_repository.reset()
def get_all_events_in_mongo(
mongo_client: MongoClient,
) -> Iterable[Mapping[str, Mapping[str, Any]]]:
events = []
for collection in get_all_collections_in_mongo(mongo_client):
mongo_events = collection.find({}, {MONGO_OBJECT_ID_KEY: False})
for mongo_event in mongo_events:
events.append(mongo_event)
return events
def is_encrypted_event(original_event: AbstractAgentEvent, other_event) -> bool:
"""
Checks if an event is an encrypted version of the original
- The number of fields match
- The AbstractAgentEvent fields match
- The remaining fields have a matching encrypted_ prefix
- The remaining fields are the encrypted version of the original fields
"""
event = original_event.dict(simplify=True)
# Note: The serializer adds a "type" field
event["type"] = type(original_event).__name__
if len(event.keys()) != len(other_event.keys()):
return False
for field in event.keys():
if field in SERIALIZED_EVENT_FIELDS:
if event[field] != other_event[field]:
return False
else:
encrypted_field = ENCRYPTED_PREFIX + field
if (
encrypted_field not in other_event.keys()
or event[field] == other_event[encrypted_field]
):
return False
return True
def assert_events_are_encrypted(
mongo_client: MongoClient, original_events: Iterable[AbstractAgentEvent]
):
stored_events = get_all_events_in_mongo(mongo_client)
for event in original_events:
assert any([is_encrypted_event(event, se) for se in stored_events])