Refactor DocumentEncryptor class into a series of methods.

DocumentEncryptor class serves no purpose because it holds no state, sensitive_fields can be passed as a parameter to methods
This commit is contained in:
VakarisZ 2021-09-22 15:36:51 +03:00
parent f3865d022b
commit 854ce4e1e1
6 changed files with 52 additions and 52 deletions

View File

@ -3,7 +3,13 @@ from __future__ import annotations
from bson import json_util from bson import json_util
from mongoengine import DictField, Document from mongoengine import DictField, Document
from monkey_island.cc.models.utils import report_encryptor from monkey_island.cc.models.utils import document_encryptor
from monkey_island.cc.models.utils.document_encryptor import SensitiveField
from monkey_island.cc.models.utils.field_encryptors.string_list_encryptor import StringListEncryptor
sensitive_fields = [
SensitiveField(path="overview.config_passwords", field_encryptor=StringListEncryptor)
]
class Report(Document): class Report(Document):
@ -18,7 +24,7 @@ class Report(Document):
@staticmethod @staticmethod
def save_report(report_dict: dict): def save_report(report_dict: dict):
report_dict = _encode_dot_char_before_mongo_insert(report_dict) report_dict = _encode_dot_char_before_mongo_insert(report_dict)
report_dict = report_encryptor.encrypt(report_dict) report_dict = document_encryptor.encrypt(sensitive_fields, report_dict)
Report.objects.delete() Report.objects.delete()
Report( Report(
overview=report_dict["overview"], overview=report_dict["overview"],
@ -30,7 +36,9 @@ class Report(Document):
@staticmethod @staticmethod
def get_report() -> dict: def get_report() -> dict:
report_dict = Report.objects.first().to_mongo() report_dict = Report.objects.first().to_mongo()
return _decode_dot_char_before_mongo_insert(report_encryptor.decrypt(report_dict)) return _decode_dot_char_before_mongo_insert(
document_encryptor.decrypt(sensitive_fields, report_dict)
)
def _encode_dot_char_before_mongo_insert(report_dict): def _encode_dot_char_before_mongo_insert(report_dict):

View File

@ -1,51 +1,45 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from typing import Callable, List, Type from typing import Callable, List, Type
import dpath.util import dpath.util
from monkey_island.cc.models.utils.field_types.field_type_abc import FieldTypeABC from monkey_island.cc.models.utils.field_encryptors.i_field_encryptor import IFieldEncryptor
class FieldNotFoundError(Exception):
pass
@dataclass @dataclass
class SensitiveField: class SensitiveField:
path: str path: str
path_separator = "." path_separator = "."
field_type: Type[FieldTypeABC] field_encryptor: Type[IFieldEncryptor]
class DocumentEncryptor(ABC): def encrypt(sensitive_fields: List[SensitiveField], document_dict: dict) -> dict:
@property for sensitive_field in sensitive_fields:
@abstractmethod _apply_operation_to_document_field(
def sensitive_fields(self) -> List[SensitiveField]: document_dict, sensitive_field, sensitive_field.field_encryptor.encrypt
pass
@classmethod
def encrypt(cls, document_dict: dict) -> dict:
for sensitive_field in cls.sensitive_fields:
DocumentEncryptor._apply_operation_to_document_field(
document_dict, sensitive_field, sensitive_field.field_type.encrypt
) )
return document_dict return document_dict
@classmethod
def decrypt(cls, document_dict: dict) -> dict: def decrypt(sensitive_fields: List[SensitiveField], document_dict: dict) -> dict:
for sensitive_field in cls.sensitive_fields: for sensitive_field in sensitive_fields:
DocumentEncryptor._apply_operation_to_document_field( _apply_operation_to_document_field(
document_dict, sensitive_field, sensitive_field.field_type.decrypt document_dict, sensitive_field, sensitive_field.field_encryptor.decrypt
) )
return document_dict return document_dict
@staticmethod
def _apply_operation_to_document_field( def _apply_operation_to_document_field(
report: dict, sensitive_field: SensitiveField, operation: Callable report: dict, sensitive_field: SensitiveField, operation: Callable
): ):
field_value = dpath.util.get( field_value = dpath.util.get(report, sensitive_field.path, sensitive_field.path_separator, None)
report, sensitive_field.path, sensitive_field.path_separator, None
)
if field_value is None: if field_value is None:
raise Exception( raise FieldNotFoundError(
f"Can't encrypt object because the path {sensitive_field.path} doesn't exist." f"Can't encrypt object because the path {sensitive_field.path} doesn't exist."
) )

View File

@ -4,8 +4,8 @@ from typing import List
import pytest import pytest
from monkey_island.cc.models import Report from monkey_island.cc.models import Report
from monkey_island.cc.models.utils.document_encryptor import SensitiveField
from monkey_island.cc.models.utils.field_encryptors.i_field_encryptor import IFieldEncryptor from monkey_island.cc.models.utils.field_encryptors.i_field_encryptor import IFieldEncryptor
from monkey_island.cc.models.utils.report_encryptor import SensitiveField
MOCK_SENSITIVE_FIELD_CONTENTS = ["the_string", "the_string2"] MOCK_SENSITIVE_FIELD_CONTENTS = ["the_string", "the_string2"]
MOCK_REPORT_DICT = { MOCK_REPORT_DICT = {
@ -19,32 +19,30 @@ MOCK_REPORT_DICT = {
} }
class MockFieldEncryptor(IFieldEncryptor): class MockStringListEncryptor(IFieldEncryptor):
plaintext = [] plaintext = []
@staticmethod @staticmethod
def encrypt(value: List[str]) -> List[str]: def encrypt(value: List[str]) -> List[str]:
return [MockFieldEncryptor._encrypt(v) for v in value] return [MockStringListEncryptor._encrypt(v) for v in value]
@staticmethod @staticmethod
def _encrypt(value: str) -> str: def _encrypt(value: str) -> str:
MockFieldEncryptor.plaintext.append(value) MockStringListEncryptor.plaintext.append(value)
return f"ENCRYPTED_{str(len(MockFieldEncryptor.plaintext) - 1)}" return f"ENCRYPTED_{str(len(MockStringListEncryptor.plaintext) - 1)}"
@staticmethod @staticmethod
def decrypt(value: List[str]) -> List[str]: def decrypt(value: List[str]) -> List[str]:
return MockFieldEncryptor.plaintext return MockStringListEncryptor.plaintext
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def patch_sensitive_fields(monkeypatch): def patch_sensitive_fields(monkeypatch):
mock_sensitive_fields = [ mock_sensitive_fields = [
SensitiveField("overview.foo.the_key", MockFieldEncryptor), SensitiveField("overview.foo.the_key", MockStringListEncryptor),
SensitiveField("overview.bar.the_key", MockFieldEncryptor), SensitiveField("overview.bar.the_key", MockStringListEncryptor),
] ]
monkeypatch.setattr( monkeypatch.setattr("monkey_island.cc.models.report.sensitive_fields", mock_sensitive_fields)
"monkey_island.cc.models.utils.report_encryptor.sensitive_fields", mock_sensitive_fields
)
@pytest.mark.usefixtures("uses_database") @pytest.mark.usefixtures("uses_database")