diff --git a/monkey/monkey_island/cc/services/post_breach_files.py b/monkey/monkey_island/cc/services/post_breach_files.py index 94569db37..8268265a9 100644 --- a/monkey/monkey_island/cc/services/post_breach_files.py +++ b/monkey/monkey_island/cc/services/post_breach_files.py @@ -1,6 +1,7 @@ import logging import os -from pathlib import Path + +from monkey_island.cc.server_utils.file_utils import create_secure_directory logger = logging.getLogger(__name__) @@ -15,7 +16,8 @@ class PostBreachFilesService: @classmethod def initialize(cls, data_dir): cls.DATA_DIR = data_dir - Path(cls.get_custom_pba_directory()).mkdir(mode=0o0700, parents=True, exist_ok=True) + custom_pba_dir = cls.get_custom_pba_directory() + create_secure_directory(custom_pba_dir) @staticmethod def save_file(filename: str, file_contents: bytes): diff --git a/monkey/tests/monkey_island/utils.py b/monkey/tests/monkey_island/utils.py new file mode 100644 index 000000000..dd781f85d --- /dev/null +++ b/monkey/tests/monkey_island/utils.py @@ -0,0 +1,35 @@ +from monkey_island.cc.server_utils.file_utils import is_windows_os + +if is_windows_os(): + import win32api + import win32security + + FULL_CONTROL = 2032127 + ACE_ACCESS_MODE_GRANT_ACCESS = win32security.GRANT_ACCESS + ACE_INHERIT_OBJECT_AND_CONTAINER = 3 + + +def _get_acl_and_sid_from_path(path: str): + sid, _, _ = win32security.LookupAccountName("", win32api.GetUserName()) + security_descriptor = win32security.GetNamedSecurityInfo( + path, win32security.SE_FILE_OBJECT, win32security.DACL_SECURITY_INFORMATION + ) + acl = security_descriptor.GetSecurityDescriptorDacl() + return acl, sid + + +def assert_windows_permissions(path: str): + acl, user_sid = _get_acl_and_sid_from_path(path) + + assert acl.GetAceCount() == 1 + + ace = acl.GetExplicitEntriesFromAcl()[0] + + ace_access_mode = ace["AccessMode"] + ace_permissions = ace["AccessPermissions"] + ace_inheritance = ace["Inheritance"] + ace_sid = ace["Trustee"]["Identifier"] + + assert ace_sid == user_sid + assert ace_permissions == FULL_CONTROL and ace_access_mode == ACE_ACCESS_MODE_GRANT_ACCESS + assert ace_inheritance == ACE_INHERIT_OBJECT_AND_CONTAINER diff --git a/monkey/tests/unit_tests/monkey_island/cc/server_utils/test_file_utils.py b/monkey/tests/unit_tests/monkey_island/cc/server_utils/test_file_utils.py index 9a9ada29d..444e2ca17 100644 --- a/monkey/tests/unit_tests/monkey_island/cc/server_utils/test_file_utils.py +++ b/monkey/tests/unit_tests/monkey_island/cc/server_utils/test_file_utils.py @@ -2,6 +2,7 @@ import os import stat import pytest +from tests.monkey_island.utils import assert_windows_permissions from monkey_island.cc.server_utils.file_utils import ( create_secure_directory, @@ -10,14 +11,6 @@ from monkey_island.cc.server_utils.file_utils import ( open_new_securely_permissioned_file, ) -if is_windows_os(): - import win32api - import win32security - - FULL_CONTROL = 2032127 - ACE_ACCESS_MODE_GRANT_ACCESS = win32security.GRANT_ACCESS - ACE_INHERIT_OBJECT_AND_CONTAINER = 3 - def test_expand_user(patched_home_env): input_path = os.path.join("~", "test") @@ -47,15 +40,6 @@ def test_path(tmpdir): return path -def _get_acl_and_sid_from_path(path: str): - sid, _, _ = win32security.LookupAccountName("", win32api.GetUserName()) - security_descriptor = win32security.GetNamedSecurityInfo( - path, win32security.SE_FILE_OBJECT, win32security.DACL_SECURITY_INFORMATION - ) - acl = security_descriptor.GetSecurityDescriptorDacl() - return acl, sid - - def test_create_secure_directory__already_exists(test_path): os.mkdir(test_path) assert os.path.isdir(test_path) @@ -82,20 +66,7 @@ def test_create_secure_directory__perm_linux(test_path): def test_create_secure_directory__perm_windows(test_path): create_secure_directory(test_path) - acl, user_sid = _get_acl_and_sid_from_path(test_path) - - assert acl.GetAceCount() == 1 - - ace = acl.GetExplicitEntriesFromAcl()[0] - - ace_access_mode = ace["AccessMode"] - ace_permissions = ace["AccessPermissions"] - ace_inheritance = ace["Inheritance"] - ace_sid = ace["Trustee"]["Identifier"] - - assert ace_sid == user_sid - assert ace_permissions == FULL_CONTROL and ace_access_mode == ACE_ACCESS_MODE_GRANT_ACCESS - assert ace_inheritance == ACE_INHERIT_OBJECT_AND_CONTAINER + assert_windows_permissions(test_path) def test_open_new_securely_permissioned_file__already_exists(test_path): @@ -131,20 +102,7 @@ def test_open_new_securely_permissioned_file__perm_windows(test_path): with open_new_securely_permissioned_file(test_path): pass - acl, user_sid = _get_acl_and_sid_from_path(test_path) - - assert acl.GetAceCount() == 1 - - ace = acl.GetExplicitEntriesFromAcl()[0] - - ace_access_mode = ace["AccessMode"] - ace_permissions = ace["AccessPermissions"] - ace_inheritance = ace["Inheritance"] - ace_sid = ace["Trustee"]["Identifier"] - - assert ace_sid == user_sid - assert ace_permissions == FULL_CONTROL and ace_access_mode == ACE_ACCESS_MODE_GRANT_ACCESS - assert ace_inheritance == ACE_INHERIT_OBJECT_AND_CONTAINER + assert_windows_permissions(test_path) def test_open_new_securely_permissioned_file__write(test_path): diff --git a/monkey/tests/unit_tests/monkey_island/cc/services/test_post_breach_files.py b/monkey/tests/unit_tests/monkey_island/cc/services/test_post_breach_files.py index 3c3fe82fe..5a2ddaa17 100644 --- a/monkey/tests/unit_tests/monkey_island/cc/services/test_post_breach_files.py +++ b/monkey/tests/unit_tests/monkey_island/cc/services/test_post_breach_files.py @@ -1,7 +1,9 @@ import os import pytest +from tests.monkey_island.utils import assert_windows_permissions +from monkey_island.cc.server_utils.file_utils import is_windows_os from monkey_island.cc.services.post_breach_files import PostBreachFilesService @@ -32,13 +34,20 @@ def dir_is_empty(dir_path): return len(dir_contents) == 0 -@pytest.mark.skipif(os.name != "posix", reason="Tests Posix (not Windows) permissions.") -def test_custom_pba_dir_permissions(): +@pytest.mark.skipif(is_windows_os(), reason="Tests Posix (not Windows) permissions.") +def test_custom_pba_dir_permissions_linux(): st = os.stat(PostBreachFilesService.get_custom_pba_directory()) assert st.st_mode == 0o40700 +@pytest.mark.skipif(not is_windows_os(), reason="Tests Windows (not Posix) permissions.") +def test_custom_pba_dir_permissions_windows(): + pba_dir = PostBreachFilesService.get_custom_pba_directory() + + assert_windows_permissions(pba_dir) + + def test_remove_failure(monkeypatch): monkeypatch.setattr(os, "remove", lambda x: raise_(OSError("Permission denied")))