From b8230ffb734fcce49779883708aeefc769adf179 Mon Sep 17 00:00:00 2001 From: Kekoa Kaaikala Date: Wed, 21 Sep 2022 18:19:58 +0000 Subject: [PATCH] Island: Fix mypy issues for encryptors --- .../encryption/data_store_encryptor.py | 6 ++--- .../encryption/repository_encryptor.py | 25 +++++++++---------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/monkey/monkey_island/cc/server_utils/encryption/data_store_encryptor.py b/monkey/monkey_island/cc/server_utils/encryption/data_store_encryptor.py index ed491d888..8027e252b 100644 --- a/monkey/monkey_island/cc/server_utils/encryption/data_store_encryptor.py +++ b/monkey/monkey_island/cc/server_utils/encryption/data_store_encryptor.py @@ -1,7 +1,7 @@ import os import secrets from pathlib import Path -from typing import Union +from typing import Optional from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file @@ -12,7 +12,7 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor _KEY_FILE_NAME = "mongo_key.bin" -_encryptor: Union[None, IEncryptor] = None +_encryptor: Optional[IEncryptor] = None # NOTE: This class is being replaced by RepositoryEncryptor @@ -73,5 +73,5 @@ def _initialize_datastore_encryptor(key_file: Path, secret: str): _encryptor = DataStoreEncryptor(secret, key_file) -def get_datastore_encryptor() -> IEncryptor: +def get_datastore_encryptor() -> Optional[IEncryptor]: return _encryptor diff --git a/monkey/monkey_island/cc/server_utils/encryption/repository_encryptor.py b/monkey/monkey_island/cc/server_utils/encryption/repository_encryptor.py index 48970fa81..097d460ef 100644 --- a/monkey/monkey_island/cc/server_utils/encryption/repository_encryptor.py +++ b/monkey/monkey_island/cc/server_utils/encryption/repository_encryptor.py @@ -1,10 +1,11 @@ import secrets from pathlib import Path +from typing import Optional from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file -from . import ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError +from . import IEncryptor, ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError from .key_based_encryptor import KeyBasedEncryptor from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor @@ -12,33 +13,32 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor class RepositoryEncryptor(ILockableEncryptor): def __init__(self, key_file: Path): self._key_file = key_file - self._password_based_encryptor = None - self._key_based_encryptor = None + self._key_based_encryptor: Optional[IEncryptor] = None def unlock(self, secret: bytes): try: - self._password_based_encryptor = PasswordBasedBytesEncryptor(secret.decode()) - self._key_based_encryptor = self._initialize_key_based_encryptor() + encryptor = PasswordBasedBytesEncryptor(secret.decode()) + self._key_based_encryptor = self._initialize_key_based_encryptor(encryptor) except Exception as err: raise UnlockError(err) - def _initialize_key_based_encryptor(self): + def _initialize_key_based_encryptor(self, encryptor: IEncryptor) -> KeyBasedEncryptor: if self._key_file.is_file(): - return self._load_key() + return self._load_key(encryptor) - return self._create_key() + return self._create_key(encryptor) - def _load_key(self) -> KeyBasedEncryptor: + def _load_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor: with open(self._key_file, "rb") as f: encrypted_key = f.read() - plaintext_key = EncryptionKey32Bytes(self._password_based_encryptor.decrypt(encrypted_key)) + plaintext_key = EncryptionKey32Bytes(encryptor.decrypt(encrypted_key)) return KeyBasedEncryptor(plaintext_key) - def _create_key(self) -> KeyBasedEncryptor: + def _create_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor: plaintext_key = EncryptionKey32Bytes(secrets.token_bytes(32)) - encrypted_key = self._password_based_encryptor.encrypt(plaintext_key) + encrypted_key = encryptor.encrypt(plaintext_key) with open_new_securely_permissioned_file(str(self._key_file), "wb") as f: f.write(encrypted_key) @@ -54,7 +54,6 @@ class RepositoryEncryptor(ILockableEncryptor): except Exception as err: raise ResetKeyError(err) - self._password_based_encryptor = None self._key_based_encryptor = None def encrypt(self, plaintext: bytes) -> bytes: