From e4d38631b82fa2dd63b3959bfabb4366bfa5309f Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Wed, 6 Jul 2022 20:45:49 -0400 Subject: [PATCH] Common: Serialize/Deserialize Credentials --- monkey/common/credentials/credentials.py | 125 +++++++++++++++++- .../common/credentials/test_credentials.py | 57 ++++++++ 2 files changed, 180 insertions(+), 2 deletions(-) create mode 100644 monkey/tests/unit_tests/common/credentials/test_credentials.py diff --git a/monkey/common/credentials/credentials.py b/monkey/common/credentials/credentials.py index d5591f6d7..f60c71fbe 100644 --- a/monkey/common/credentials/credentials.py +++ b/monkey/common/credentials/credentials.py @@ -1,10 +1,131 @@ -from dataclasses import dataclass -from typing import Tuple +from __future__ import annotations +from dataclasses import dataclass +from typing import Any, Mapping, Tuple + +from marshmallow import INCLUDE, Schema, fields, post_load, pre_dump +from marshmallow_enum import EnumField + +from . import CredentialComponentType, LMHash, NTHash, Password, SSHKeypair, Username from .i_credential_component import ICredentialComponent +from .lm_hash import LMHashSchema +from .nt_hash import NTHashSchema +from .password import PasswordSchema +from .ssh_keypair import SSHKeypairSchema +from .username import UsernameSchema + +CREDENTIL_COMPINENT_TYPE_TO_CLASS = { + CredentialComponentType.LM_HASH: LMHash, + CredentialComponentType.NT_HASH: NTHash, + CredentialComponentType.PASSWORD: Password, + CredentialComponentType.SSH_KEYPAIR: SSHKeypair, + CredentialComponentType.USERNAME: Username, +} +CREDENTIL_COMPINENT_TYPE_TO_CLASS_SCHEMA = { + CredentialComponentType.LM_HASH: LMHashSchema(), + CredentialComponentType.NT_HASH: NTHashSchema(), + CredentialComponentType.PASSWORD: PasswordSchema(), + CredentialComponentType.SSH_KEYPAIR: SSHKeypairSchema(), + CredentialComponentType.USERNAME: UsernameSchema(), +} + + +class GenericCredentialComponentSchema(Schema): + class Meta: + unknown = INCLUDE + + credential_type = EnumField(CredentialComponentType) + + """ + @post_load + def _string_to_enum(self, data, **kwargs) -> Mapping[str, Any]: + data["credential_type"] = CredentialComponentType[data["credential_type"]] + """ + + +class CredentialsSchema(Schema): + # Use fields.List instead of fields.Tuple because marshmallow requires fields.Tuple to have a + # fixed length. + # identities = fields.List(fields.Nested(GenericCredentialComponentSchema)) + # secrets = fields.List(fields.Nested(GenericCredentialComponentSchema)) + identities = fields.List(fields.Mapping()) + secrets = fields.List(fields.Mapping()) + + @post_load + def _make_credentials(self, data, **kwargs) -> Mapping[str, Tuple(Mapping)]: + from pprint import pprint + + for component in data["identities"]: + pprint(component) + data["identities"] = tuple( + [ + CredentialsSchema._build_credential_component(component) + for component in data["identities"] + ] + ) + data["secrets"] = tuple( + [ + CredentialsSchema._build_credential_component(component) + for component in data["secrets"] + ] + ) + + return data + + @staticmethod + def _build_credential_component(data: Mapping[str, Any]): + credential_component_type = CredentialComponentType[data["credential_type"]] + credential_component_class = CREDENTIL_COMPINENT_TYPE_TO_CLASS[credential_component_type] + credential_component_schema = CREDENTIL_COMPINENT_TYPE_TO_CLASS_SCHEMA[ + credential_component_type + ] + + data["credential_type"] = data["credential_type"] + return credential_component_class(**credential_component_schema.load(data)) + + @pre_dump + def _serialize_credentials( + self, credentials: Credentials, **kwargs + ) -> Mapping[str, Tuple[Mapping[str, Any]]]: + data = {} + data["identities"] = tuple( + [ + CredentialsSchema._serialize_credential_component(component) + for component in credentials.identities + ] + ) + data["secrets"] = tuple( + [ + CredentialsSchema._serialize_credential_component(component) + for component in credentials.secrets + ] + ) + + return data + + @staticmethod + def _serialize_credential_component(credential_component: ICredentialComponent): + credential_component_schema = CREDENTIL_COMPINENT_TYPE_TO_CLASS_SCHEMA[ + credential_component.credential_type + ] + return credential_component_schema.dump(credential_component) @dataclass(frozen=True) class Credentials: identities: Tuple[ICredentialComponent] secrets: Tuple[ICredentialComponent] + + @staticmethod + def from_mapping(credentials: Mapping) -> Credentials: + deserialized_data = CredentialsSchema().load(credentials) + return Credentials(**deserialized_data) + + @staticmethod + def from_json(credentials: str) -> Credentials: + deserialized_data = CredentialsSchema().loads(credentials) + return Credentials(**deserialized_data) + + @staticmethod + def to_json(credentials: Credentials) -> str: + return CredentialsSchema().dumps(credentials) diff --git a/monkey/tests/unit_tests/common/credentials/test_credentials.py b/monkey/tests/unit_tests/common/credentials/test_credentials.py new file mode 100644 index 000000000..4496a2f05 --- /dev/null +++ b/monkey/tests/unit_tests/common/credentials/test_credentials.py @@ -0,0 +1,57 @@ +import json + +from common.credentials import Credentials, LMHash, NTHash, Password, SSHKeypair, Username + +USER1 = "test_user_1" +USER2 = "test_user_2" +PASSWORD = "12435" +LM_HASH = "AEBD4DE384C7EC43AAD3B435B51404EE" +NT_HASH = "7A21990FCD3D759941E45C490F143D5F" +PUBLIC_KEY = "MY_PUBLIC_KEY" +PRIVATE_KEY = "MY_PRIVATE_KEY" + +CREDENTIALS_DICT = { + "identities": [ + {"credential_type": "USERNAME", "username": USER1}, + {"credential_type": "USERNAME", "username": USER2}, + ], + "secrets": [ + {"credential_type": "PASSWORD", "password": PASSWORD}, + {"credential_type": "LM_HASH", "lm_hash": LM_HASH}, + {"credential_type": "NT_HASH", "nt_hash": NT_HASH}, + { + "credential_type": "SSH_KEYPAIR", + "public_key": PUBLIC_KEY, + "private_key": PRIVATE_KEY, + }, + ], +} + +CREDENTIALS_JSON = json.dumps(CREDENTIALS_DICT) + +IDENTITIES = (Username(USER1), Username(USER2)) +SECRETS = ( + Password(PASSWORD), + LMHash(LM_HASH), + NTHash(NT_HASH), + SSHKeypair(PRIVATE_KEY, PUBLIC_KEY), +) +CREDENTIALS_OBJECT = Credentials(IDENTITIES, SECRETS) + + +def test_credentials_serialization_json(): + serialized_credentials = Credentials.to_json(CREDENTIALS_OBJECT) + + assert json.loads(serialized_credentials) == CREDENTIALS_DICT + + +def test_credentials_deserialization__from_mapping(): + deserialized_credentials = Credentials.from_mapping(CREDENTIALS_DICT) + + assert deserialized_credentials == CREDENTIALS_OBJECT + + +def test_credentials_deserialization__from_json(): + deserialized_credentials = Credentials.from_json(CREDENTIALS_JSON) + + assert deserialized_credentials == CREDENTIALS_OBJECT