Island: Fix mypy issues for encryptors

This commit is contained in:
Kekoa Kaaikala 2022-09-21 18:19:58 +00:00
parent 421ed942fe
commit b8230ffb73
2 changed files with 15 additions and 16 deletions

View File

@ -1,7 +1,7 @@
import os
import secrets
from pathlib import Path
from typing import Union
from typing import Optional
from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
@ -12,7 +12,7 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
_KEY_FILE_NAME = "mongo_key.bin"
_encryptor: Union[None, IEncryptor] = None
_encryptor: Optional[IEncryptor] = None
# NOTE: This class is being replaced by RepositoryEncryptor
@ -73,5 +73,5 @@ def _initialize_datastore_encryptor(key_file: Path, secret: str):
_encryptor = DataStoreEncryptor(secret, key_file)
def get_datastore_encryptor() -> IEncryptor:
def get_datastore_encryptor() -> Optional[IEncryptor]:
return _encryptor

View File

@ -1,10 +1,11 @@
import secrets
from pathlib import Path
from typing import Optional
from monkey_island.cc.server_utils.encryption.encryption_key_types import EncryptionKey32Bytes
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
from . import ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError
from . import IEncryptor, ILockableEncryptor, LockedKeyError, ResetKeyError, UnlockError
from .key_based_encryptor import KeyBasedEncryptor
from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
@ -12,33 +13,32 @@ from .password_based_bytes_encryptor import PasswordBasedBytesEncryptor
class RepositoryEncryptor(ILockableEncryptor):
def __init__(self, key_file: Path):
self._key_file = key_file
self._password_based_encryptor = None
self._key_based_encryptor = None
self._key_based_encryptor: Optional[IEncryptor] = None
def unlock(self, secret: bytes):
try:
self._password_based_encryptor = PasswordBasedBytesEncryptor(secret.decode())
self._key_based_encryptor = self._initialize_key_based_encryptor()
encryptor = PasswordBasedBytesEncryptor(secret.decode())
self._key_based_encryptor = self._initialize_key_based_encryptor(encryptor)
except Exception as err:
raise UnlockError(err)
def _initialize_key_based_encryptor(self):
def _initialize_key_based_encryptor(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
if self._key_file.is_file():
return self._load_key()
return self._load_key(encryptor)
return self._create_key()
return self._create_key(encryptor)
def _load_key(self) -> KeyBasedEncryptor:
def _load_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
with open(self._key_file, "rb") as f:
encrypted_key = f.read()
plaintext_key = EncryptionKey32Bytes(self._password_based_encryptor.decrypt(encrypted_key))
plaintext_key = EncryptionKey32Bytes(encryptor.decrypt(encrypted_key))
return KeyBasedEncryptor(plaintext_key)
def _create_key(self) -> KeyBasedEncryptor:
def _create_key(self, encryptor: IEncryptor) -> KeyBasedEncryptor:
plaintext_key = EncryptionKey32Bytes(secrets.token_bytes(32))
encrypted_key = self._password_based_encryptor.encrypt(plaintext_key)
encrypted_key = encryptor.encrypt(plaintext_key)
with open_new_securely_permissioned_file(str(self._key_file), "wb") as f:
f.write(encrypted_key)
@ -54,7 +54,6 @@ class RepositoryEncryptor(ILockableEncryptor):
except Exception as err:
raise ResetKeyError(err)
self._password_based_encryptor = None
self._key_based_encryptor = None
def encrypt(self, plaintext: bytes) -> bytes: