Common: Serialize/Deserialize Credentials

This commit is contained in:
Mike Salvatore 2022-07-06 20:45:49 -04:00
parent 82ce091063
commit e4d38631b8
2 changed files with 180 additions and 2 deletions

View File

@ -1,10 +1,131 @@
from dataclasses import dataclass from __future__ import annotations
from typing import Tuple
from dataclasses import dataclass
from typing import Any, Mapping, Tuple
from marshmallow import INCLUDE, Schema, fields, post_load, pre_dump
from marshmallow_enum import EnumField
from . import CredentialComponentType, LMHash, NTHash, Password, SSHKeypair, Username
from .i_credential_component import ICredentialComponent from .i_credential_component import ICredentialComponent
from .lm_hash import LMHashSchema
from .nt_hash import NTHashSchema
from .password import PasswordSchema
from .ssh_keypair import SSHKeypairSchema
from .username import UsernameSchema
CREDENTIL_COMPINENT_TYPE_TO_CLASS = {
CredentialComponentType.LM_HASH: LMHash,
CredentialComponentType.NT_HASH: NTHash,
CredentialComponentType.PASSWORD: Password,
CredentialComponentType.SSH_KEYPAIR: SSHKeypair,
CredentialComponentType.USERNAME: Username,
}
CREDENTIL_COMPINENT_TYPE_TO_CLASS_SCHEMA = {
CredentialComponentType.LM_HASH: LMHashSchema(),
CredentialComponentType.NT_HASH: NTHashSchema(),
CredentialComponentType.PASSWORD: PasswordSchema(),
CredentialComponentType.SSH_KEYPAIR: SSHKeypairSchema(),
CredentialComponentType.USERNAME: UsernameSchema(),
}
class GenericCredentialComponentSchema(Schema):
class Meta:
unknown = INCLUDE
credential_type = EnumField(CredentialComponentType)
"""
@post_load
def _string_to_enum(self, data, **kwargs) -> Mapping[str, Any]:
data["credential_type"] = CredentialComponentType[data["credential_type"]]
"""
class CredentialsSchema(Schema):
# Use fields.List instead of fields.Tuple because marshmallow requires fields.Tuple to have a
# fixed length.
# identities = fields.List(fields.Nested(GenericCredentialComponentSchema))
# secrets = fields.List(fields.Nested(GenericCredentialComponentSchema))
identities = fields.List(fields.Mapping())
secrets = fields.List(fields.Mapping())
@post_load
def _make_credentials(self, data, **kwargs) -> Mapping[str, Tuple(Mapping)]:
from pprint import pprint
for component in data["identities"]:
pprint(component)
data["identities"] = tuple(
[
CredentialsSchema._build_credential_component(component)
for component in data["identities"]
]
)
data["secrets"] = tuple(
[
CredentialsSchema._build_credential_component(component)
for component in data["secrets"]
]
)
return data
@staticmethod
def _build_credential_component(data: Mapping[str, Any]):
credential_component_type = CredentialComponentType[data["credential_type"]]
credential_component_class = CREDENTIL_COMPINENT_TYPE_TO_CLASS[credential_component_type]
credential_component_schema = CREDENTIL_COMPINENT_TYPE_TO_CLASS_SCHEMA[
credential_component_type
]
data["credential_type"] = data["credential_type"]
return credential_component_class(**credential_component_schema.load(data))
@pre_dump
def _serialize_credentials(
self, credentials: Credentials, **kwargs
) -> Mapping[str, Tuple[Mapping[str, Any]]]:
data = {}
data["identities"] = tuple(
[
CredentialsSchema._serialize_credential_component(component)
for component in credentials.identities
]
)
data["secrets"] = tuple(
[
CredentialsSchema._serialize_credential_component(component)
for component in credentials.secrets
]
)
return data
@staticmethod
def _serialize_credential_component(credential_component: ICredentialComponent):
credential_component_schema = CREDENTIL_COMPINENT_TYPE_TO_CLASS_SCHEMA[
credential_component.credential_type
]
return credential_component_schema.dump(credential_component)
@dataclass(frozen=True) @dataclass(frozen=True)
class Credentials: class Credentials:
identities: Tuple[ICredentialComponent] identities: Tuple[ICredentialComponent]
secrets: Tuple[ICredentialComponent] secrets: Tuple[ICredentialComponent]
@staticmethod
def from_mapping(credentials: Mapping) -> Credentials:
deserialized_data = CredentialsSchema().load(credentials)
return Credentials(**deserialized_data)
@staticmethod
def from_json(credentials: str) -> Credentials:
deserialized_data = CredentialsSchema().loads(credentials)
return Credentials(**deserialized_data)
@staticmethod
def to_json(credentials: Credentials) -> str:
return CredentialsSchema().dumps(credentials)

View File

@ -0,0 +1,57 @@
import json
from common.credentials import Credentials, LMHash, NTHash, Password, SSHKeypair, Username
USER1 = "test_user_1"
USER2 = "test_user_2"
PASSWORD = "12435"
LM_HASH = "AEBD4DE384C7EC43AAD3B435B51404EE"
NT_HASH = "7A21990FCD3D759941E45C490F143D5F"
PUBLIC_KEY = "MY_PUBLIC_KEY"
PRIVATE_KEY = "MY_PRIVATE_KEY"
CREDENTIALS_DICT = {
"identities": [
{"credential_type": "USERNAME", "username": USER1},
{"credential_type": "USERNAME", "username": USER2},
],
"secrets": [
{"credential_type": "PASSWORD", "password": PASSWORD},
{"credential_type": "LM_HASH", "lm_hash": LM_HASH},
{"credential_type": "NT_HASH", "nt_hash": NT_HASH},
{
"credential_type": "SSH_KEYPAIR",
"public_key": PUBLIC_KEY,
"private_key": PRIVATE_KEY,
},
],
}
CREDENTIALS_JSON = json.dumps(CREDENTIALS_DICT)
IDENTITIES = (Username(USER1), Username(USER2))
SECRETS = (
Password(PASSWORD),
LMHash(LM_HASH),
NTHash(NT_HASH),
SSHKeypair(PRIVATE_KEY, PUBLIC_KEY),
)
CREDENTIALS_OBJECT = Credentials(IDENTITIES, SECRETS)
def test_credentials_serialization_json():
serialized_credentials = Credentials.to_json(CREDENTIALS_OBJECT)
assert json.loads(serialized_credentials) == CREDENTIALS_DICT
def test_credentials_deserialization__from_mapping():
deserialized_credentials = Credentials.from_mapping(CREDENTIALS_DICT)
assert deserialized_credentials == CREDENTIALS_OBJECT
def test_credentials_deserialization__from_json():
deserialized_credentials = Credentials.from_json(CREDENTIALS_JSON)
assert deserialized_credentials == CREDENTIALS_OBJECT