Island: Fix mypy issues for encryptors

Kekoa Kaaikala 2022-09-21 18:19:58 +00:00
parent e8aa231f92
commit e595a70019
2 changed files with 15 additions and 16 deletions


@ -1,7 +1,7 @@
import os import os
import secrets import secrets
from pathlib import Path from pathlib import Path
from typing import Union from typing import Optional
from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
@ -12,7 +12,7 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
_KEY_FILE_NAME = "mongo_key.bin" _KEY_FILE_NAME = "mongo_key.bin"
_encryptor: Union[None, IEncryptor] = None _encryptor: Optional[IEncryptor] = None
# NOTE: This class is being replaced by RepositoryEncryptor # NOTE: This class is being replaced by RepositoryEncryptor
@ -73,5 +73,5 @@ def _initialize_datastore_encryptor(key_file: Path, secret: str):
_encryptor = DataStoreEncryptor(secret, key_file) _encryptor = DataStoreEncryptor(secret, key_file)
def get_datastore_encryptor() -> IEncryptor: def get_datastore_encryptor() -> Optional[IEncryptor]:
return _encryptor return _encryptor
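Review bot: Widening `get_datastore_encryptor()` to return `Optional[IEncryptor]` pushes the uninitialized case onto every caller, and a caller that treats `None` as "skip encryption" would store data unencrypted. The accessor could fail closed instead and keep the `IEncryptor` return type, e.g. `if _encryptor is None: raise RuntimeError("datastore encryptor is not initialized")` before `return _encryptor`. The callers are not visible in these hunks, so whether any of them already checks for `None` needs confirming. A one-line docstring saying when the module-level `_encryptor` is still unset would help either way.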


@ -1,10 +1,11 @@
import secrets import secrets
from pathlib import Path from pathlib import Path
from typing import Optional
from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
from . import ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError from . import IEncryptor, ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError
from .key_based_encryptor import KeyBasedEncryptor from .key_based_encryptor import KeyBasedEncryptor
from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
@ -12,33 +13,32 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
class RepositoryEncryptor(ILockableEncryptor): class RepositoryEncryptor(ILockableEncryptor):
def __init__(self, key_file: Path): def __init__(self, key_file: Path):
self._key_file = key_file self._key_file = key_file
self._password_based_encryptor = None self._key_based_encryptor: Optional[IEncryptor] = None
self._key_based_encryptor = None
def unlock(self, secret: bytes): def unlock(self, secret: bytes):
try: try:
self._password_based_encryptor = PasswordBasedBytesEncryptor(secret.decode()) encryptor = PasswordBasedBytesEncryptor(secret.decode())
self._key_based_encryptor = self._initialize_key_based_encryptor() self._key_based_encryptor = self._initialize_key_based_encryptor(encryptor)
except Exception as err: except Exception as err:
raise UnlockError(err) raise UnlockError(err)
def _initialize_key_based_encryptor(self): def _initialize_key_based_encryptor(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
if self._key_file.is_file(): if self._key_file.is_file():
return self._load_key() return self._load_key(encryptor)
return self._create_key() return self._create_key(encryptor)
def _load_key(self) -> KeyBasedEncryptor: def _load_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
with open(self._key_file, "rb") as f: with open(self._key_file, "rb") as f:
encrypted_key = f.read() encrypted_key = f.read()
plaintext_key = EncryptionKey32Bytes(self._password_based_encryptor.decrypt(encrypted_key)) plaintext_key = EncryptionKey32Bytes(encryptor.decrypt(encrypted_key))
return KeyBasedEncryptor(plaintext_key) return KeyBasedEncryptor(plaintext_key)
def _create_key(self) -> KeyBasedEncryptor: def _create_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
plaintext_key = EncryptionKey32Bytes(secrets.token_bytes(32)) plaintext_key = EncryptionKey32Bytes(secrets.token_bytes(32))
encrypted_key = self._password_based_encryptor.encrypt(plaintext_key) encrypted_key = encryptor.encrypt(plaintext_key)
with open_new_securely_permissioned_file(str(self._key_file), "wb") as f: with open_new_securely_permissioned_file(str(self._key_file), "wb") as f:
f.write(encrypted_key) f.write(encrypted_key)
@ -54,7 +54,6 @@ class RepositoryEncryptor(ILockableEncryptor):
except Exception as err: except Exception as err:
raise ResetKeyError(err) raise ResetKeyError(err)
self._password_based_encryptor = None
self._key_based_encryptor = None self._key_based_encryptor = None
def encrypt(self, plaintext: bytes) -> bytes: def encrypt(self, plaintext: bytes) -> bytes:
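Review bot: Nothing in `unlock` documents why the `PasswordBasedBytesEncryptor` is now a local passed to `_load_key`/`_create_key` rather than an instance attribute, so a later refactor could quietly put it back on `self`. A short comment above the local would record that the password-derived encryptor is not kept on the instance after unlocking, e.g. `# Not stored on self: only needed to wrap/unwrap the key file during unlock.` Separately, `raise UnlockError(err)` could be written `raise UnlockError(err) from err` so the cause is explicit. The class body continues outside the visible hunks, so other uses of the removed attribute could not be checked here.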