island: Refactor get_file_descriptor_for_new_secure_file as contextmgr
get_file_descriptor_for_new_secure_file() has been refactored as a contextmanager. Additionally, it has been renamed to open_new_securely_permissioned_file(). The function can now be used similarly to open(). Example: with open_new_securely_permissioned_file(file_path, "wb") as f: f.write(data)
parent
b39440e871
commit
51aa0d1564
|
@ -6,7 +6,7 @@ import os
|
|||
from Crypto import Random # noqa: DUO133 # nosec: B413
|
||||
from Crypto.Cipher import AES # noqa: DUO133 # nosec: B413
|
||||
|
||||
from monkey_island.cc.server_utils.file_utils import get_file_descriptor_for_new_secure_file
|
||||
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
|
||||
|
||||
__author__ = "itay.mizeretz"
|
||||
|
||||
|
@ -27,8 +27,7 @@ class Encryptor:
|
|||
|
||||
def _init_key(self, password_file_path: str):
|
||||
self._cipher_key = Random.new().read(self._BLOCK_SIZE)
|
||||
get_file_descriptor_for_new_secure_file(path=password_file_path)
|
||||
with open(password_file_path, "wb") as f:
|
||||
with open_new_securely_permissioned_file(password_file_path, "wb") as f:
|
||||
f.write(self._cipher_key)
|
||||
|
||||
def _load_existing_key(self, password_file):
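Review bot: In `_init_key`, `self._cipher_key` is assigned before the key file is created, so if `open_new_securely_permissioned_file` raises, the instance is left holding a key that was never persisted. The tests in this commit expect it to raise when the path already exists or the parent directory is missing. Generating into a local and assigning only after the write keeps memory and disk consistent: `key = Random.new().read(self._BLOCK_SIZE)`, then `f.write(key)` inside the `with`, then `self._cipher_key = key`. A comment such as `# Refuses to overwrite an existing key file; do not replace with plain open()` would record why the secure helper is required here. Whether callers reuse the instance after a failure is not visible in this hunk.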
|
||||
|
|
|
@ -2,6 +2,8 @@ import logging
|
|||
import os
|
||||
import platform
|
||||
import stat
|
||||
from contextlib import contextmanager
|
||||
from typing import Generator
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -54,11 +56,15 @@ def _create_secure_directory_windows(path: str):
|
|||
raise ex
|
||||
|
||||
|
||||
def get_file_descriptor_for_new_secure_file(path: str) -> int:
|
||||
@contextmanager
|
||||
def open_new_securely_permissioned_file(path: str, mode: str = "w") -> Generator:
|
||||
if is_windows_os():
|
||||
return _get_file_descriptor_for_new_secure_file_windows(path)
|
||||
fd = _get_file_descriptor_for_new_secure_file_windows(path)
|
||||
else:
|
||||
return _get_file_descriptor_for_new_secure_file_linux(path)
|
||||
fd = _get_file_descriptor_for_new_secure_file_linux(path)
|
||||
|
||||
with open(fd, mode) as f:
|
||||
yield f
|
||||
|
||||
|
||||
def _get_file_descriptor_for_new_secure_file_linux(path: str) -> int:
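Review bot: In `open_new_securely_permissioned_file`, the descriptor returned by the platform helper is closed only once `open(fd, mode)` has succeeded. If that call rejects an invalid `mode` string, the descriptor leaks and the newly created empty file stays on disk, so a retry would presumably fail as already existing. Closing the descriptor on that path, as sketched below, fixes the leak. The function also lacks a docstring stating that the file must not already exist, that it is created with restricted permissions, and that `mode` must be a write mode compatible with how the helpers open the descriptor. The helper bodies are outside this hunk, so that last constraint is an assumption to confirm.

```python
def open_new_securely_permissioned_file(path: str, mode: str = "w") -> Generator:
    # … unchanged: platform check and fd creation …
    try:
        f = open(fd, mode)
    except ValueError:
        # Invalid mode: open() never took ownership of fd, so close it here.
        os.close(fd)
        raise

    with f:
        yield f
```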
|
||||
|
|
|
@ -6,8 +6,8 @@ import pytest
|
|||
from monkey_island.cc.server_utils.file_utils import (
|
||||
create_secure_directory,
|
||||
expand_path,
|
||||
get_file_descriptor_for_new_secure_file,
|
||||
is_windows_os,
|
||||
open_new_securely_permissioned_file,
|
||||
)
|
||||
|
||||
if is_windows_os():
|
||||
|
@ -98,22 +98,26 @@ def test_create_secure_directory__perm_windows(test_path):
|
|||
assert ace_inheritance == ACE_INHERIT_OBJECT_AND_CONTAINER
|
||||
|
||||
|
||||
def test_get_file_descriptor_for_new_secure_file__already_exists(test_path):
|
||||
def test_open_new_securely_permissioned_file__already_exists(test_path):
|
||||
os.close(os.open(test_path, os.O_CREAT, stat.S_IRWXU))
|
||||
assert os.path.isfile(test_path)
|
||||
|
||||
with pytest.raises(Exception):
|
||||
get_file_descriptor_for_new_secure_file(test_path)
|
||||
with open_new_securely_permissioned_file(test_path):
|
||||
pass
|
||||
|
||||
|
||||
def test_get_file_descriptor_for_new_secure_file__no_parent_dir(test_path_nested):
|
||||
def test_open_new_securely_permissioned_file__no_parent_dir(test_path_nested):
|
||||
with pytest.raises(Exception):
|
||||
get_file_descriptor_for_new_secure_file(test_path_nested)
|
||||
with open_new_securely_permissioned_file(test_path_nested):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.skipif(is_windows_os(), reason="Tests Posix (not Windows) permissions.")
|
||||
def test_get_file_descriptor_for_new_secure_file__perm_linux(test_path):
|
||||
os.close(get_file_descriptor_for_new_secure_file(test_path))
|
||||
def test_open_new_securely_permissioned_file__perm_linux(test_path):
|
||||
with open_new_securely_permissioned_file(test_path):
|
||||
pass
|
||||
|
||||
st = os.stat(test_path)
|
||||
|
||||
expected_mode = stat.S_IRUSR | stat.S_IWUSR
|
||||
|
@ -123,8 +127,9 @@ def test_get_file_descriptor_for_new_secure_file__perm_linux(test_path):
|
|||
|
||||
|
||||
@pytest.mark.skipif(not is_windows_os(), reason="Tests Windows (not Posix) permissions.")
|
||||
def test_get_file_descriptor_for_new_secure_file__perm_windows(test_path):
|
||||
os.close(get_file_descriptor_for_new_secure_file(test_path))
|
||||
def test_open_new_securely_permissioned_file__perm_windows(test_path):
|
||||
with open_new_securely_permissioned_file(test_path):
|
||||
pass
|
||||
|
||||
acl, user_sid = _get_acl_and_sid_from_path(test_path)
|
||||
|
||||
|
@ -140,3 +145,12 @@ def test_get_file_descriptor_for_new_secure_file__perm_windows(test_path):
|
|||
assert ace_sid == user_sid
|
||||
assert ace_permissions == FULL_CONTROL and ace_access_mode == ACE_ACCESS_MODE_GRANT_ACCESS
|
||||
assert ace_inheritance == ACE_INHERIT_OBJECT_AND_CONTAINER
|
||||
|
||||
|
||||
def test_open_new_securely_permissioned_file__write(test_path):
|
||||
TEST_STR = b"Hello World"
|
||||
with open_new_securely_permissioned_file(test_path, "wb") as f:
|
||||
f.write(TEST_STR)
|
||||
|
||||
with open(test_path, "rb") as f:
|
||||
assert f.read() == TEST_STR
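Review bot: `pytest.raises(Exception)` in the `__already_exists` and `__no_parent_dir` tests is broad enough that any failure satisfies it, including a `TypeError` from calling the renamed function with the wrong arguments. These tests would therefore keep passing even if the refuse-to-overwrite behaviour stopped being what raises. Narrowing to the exception the helpers actually raise would pin that security property, presumably `with pytest.raises(FileExistsError):` for the existing-file case on POSIX. The Windows helper's exception type is not visible in this section and needs confirming before narrowing there.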
|
||||
|
|