Merge pull request #1217 from guardicore/data-dir-race-condition

Fix data dir race condition on Windows
This commit is contained in:
Mike Salvatore 2021-06-10 08:05:58 -04:00 committed by GitHub
commit b700870790
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 39 deletions

View File

@ -8,24 +8,26 @@ def is_windows_os() -> bool:
if is_windows_os(): if is_windows_os():
import win32file
import win32security
import monkey_island.cc.environment.windows_permissions as windows_permissions import monkey_island.cc.environment.windows_permissions as windows_permissions
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def create_secure_directory(path: str, create_parent_dirs: bool): def create_secure_directory(path: str):
if not os.path.isdir(path): if not os.path.isdir(path):
_create_secure_directory(path, create_parent_dirs) if is_windows_os():
set_secure_permissions(path) _create_secure_directory_windows(path)
else:
_create_secure_directory_linux(path)
def _create_secure_directory(path: str, create_parent_dirs: bool): def _create_secure_directory_linux(path: str):
try: try:
if create_parent_dirs:
# Don't split directory creation and permission setting # Don't split directory creation and permission setting
# because it will temporarily create an accessible directory which anyone can use. # because it will temporarily create an accessible directory which anyone can use.
os.makedirs(path, mode=0o700)
else:
os.mkdir(path, mode=0o700) os.mkdir(path, mode=0o700)
except Exception as ex: except Exception as ex:
LOG.error( LOG.error(
@ -35,10 +37,15 @@ def _create_secure_directory(path: str, create_parent_dirs: bool):
raise ex raise ex
def set_secure_permissions(dir_path: str): def _create_secure_directory_windows(path: str):
try: try:
if is_windows_os(): security_attributes = win32security.SECURITY_ATTRIBUTES()
windows_permissions.set_perms_to_owner_only(folder_path=dir_path) security_attributes.SECURITY_DESCRIPTOR = (
windows_permissions.get_security_descriptor_for_owner_only_perms()
)
win32file.CreateDirectory(path, security_attributes)
except Exception as ex: except Exception as ex:
LOG.error(f"Permissions could not be set successfully for {dir_path}: {str(ex)}") LOG.error(
f'Could not create a directory at "{path}": {str(ex)}'
)
raise ex raise ex

View File

@ -4,12 +4,10 @@ import win32con
import win32security import win32security
def set_perms_to_owner_only(folder_path: str) -> None: def get_security_descriptor_for_owner_only_perms() -> None:
user = get_user_pySID_object() user = get_user_pySID_object()
security_descriptor = win32security.SECURITY_DESCRIPTOR()
security_descriptor = win32security.GetFileSecurity(
folder_path, win32security.DACL_SECURITY_INFORMATION
)
dacl = win32security.ACL() dacl = win32security.ACL()
dacl.AddAccessAllowedAce( dacl.AddAccessAllowedAce(
win32security.ACL_REVISION, win32security.ACL_REVISION,
@ -17,9 +15,8 @@ def set_perms_to_owner_only(folder_path: str) -> None:
user, user,
) )
security_descriptor.SetSecurityDescriptorDacl(1, dacl, 0) security_descriptor.SetSecurityDescriptorDacl(1, dacl, 0)
win32security.SetFileSecurity(
folder_path, win32security.DACL_SECURITY_INFORMATION, security_descriptor return security_descriptor
)
def get_user_pySID_object(): def get_user_pySID_object():

View File

@ -18,7 +18,7 @@ def setup_data_dir(island_args: IslandCmdArgs) -> Tuple[IslandConfigOptions, str
def _setup_config_by_cmd_arg(server_config_path) -> Tuple[IslandConfigOptions, str]: def _setup_config_by_cmd_arg(server_config_path) -> Tuple[IslandConfigOptions, str]:
server_config_path = file_utils.expand_path(server_config_path) server_config_path = file_utils.expand_path(server_config_path)
config = server_config_handler.load_server_config_from_file(server_config_path) config = server_config_handler.load_server_config_from_file(server_config_path)
create_secure_directory(config.data_dir, create_parent_dirs=True) create_secure_directory(config.data_dir)
return config, server_config_path return config, server_config_path
@ -26,7 +26,7 @@ def _setup_default_config() -> Tuple[IslandConfigOptions, str]:
default_config = server_config_handler.load_server_config_from_file(DEFAULT_SERVER_CONFIG_PATH) default_config = server_config_handler.load_server_config_from_file(DEFAULT_SERVER_CONFIG_PATH)
default_data_dir = default_config.data_dir default_data_dir = default_config.data_dir
create_secure_directory(default_data_dir, create_parent_dirs=False) create_secure_directory(default_data_dir)
server_config_path = server_config_handler.create_default_server_config_file(default_data_dir) server_config_path = server_config_handler.create_default_server_config_file(default_data_dir)
config = server_config_handler.load_server_config_from_file(server_config_path) config = server_config_handler.load_server_config_from_file(server_config_path)

View File

@ -35,7 +35,7 @@ def _create_db_dir(db_dir_parent_path) -> str:
db_dir = os.path.join(db_dir_parent_path, DB_DIR_NAME) db_dir = os.path.join(db_dir_parent_path, DB_DIR_NAME)
logger.info(f"Database content directory: {db_dir}.") logger.info(f"Database content directory: {db_dir}.")
create_secure_directory(db_dir, create_parent_dirs=False) create_secure_directory(db_dir)
return db_dir return db_dir

View File

@ -20,38 +20,33 @@ def test_path(tmpdir):
return path return path
def test_create_secure_directory__parent_dirs(test_path_nested):
create_secure_directory(test_path_nested, create_parent_dirs=True)
assert os.path.isdir(test_path_nested)
def test_create_secure_directory__already_created(test_path): def test_create_secure_directory__already_created(test_path):
os.mkdir(test_path) os.mkdir(test_path)
assert os.path.isdir(test_path) assert os.path.isdir(test_path)
create_secure_directory(test_path, create_parent_dirs=False) create_secure_directory(test_path)
def test_create_secure_directory__no_parent_dir(test_path_nested): def test_create_secure_directory__no_parent_dir(test_path_nested):
with pytest.raises(Exception): with pytest.raises(Exception):
create_secure_directory(test_path_nested, create_parent_dirs=False) create_secure_directory(test_path_nested)
@pytest.mark.skipif(is_windows_os(), reason="Tests Posix (not Windows) permissions.") @pytest.mark.skipif(is_windows_os(), reason="Tests Posix (not Windows) permissions.")
def test_create_secure_directory__perm_linux(test_path_nested): def test_create_secure_directory__perm_linux(test_path):
create_secure_directory(test_path_nested, create_parent_dirs=True) create_secure_directory(test_path)
st = os.stat(test_path_nested) st = os.stat(test_path)
return bool(st.st_mode & stat.S_IRWXU) return bool(st.st_mode & stat.S_IRWXU)
@pytest.mark.skipif(not is_windows_os(), reason="Tests Windows (not Posix) permissions.") @pytest.mark.skipif(not is_windows_os(), reason="Tests Windows (not Posix) permissions.")
def test_create_secure_directory__perm_windows(test_path): def test_create_secure_directory__perm_windows(test_path):
import win32api # noqa: E402 import win32api
import win32security # noqa: E402 import win32security
FULL_CONTROL = 2032127 FULL_CONTROL = 2032127
ACE_TYPE_ALLOW = 0 ACE_TYPE_ALLOW = 0
create_secure_directory(test_path, create_parent_dirs=False) create_secure_directory(test_path)
user_sid, _, _ = win32security.LookupAccountName("", win32api.GetUserName()) user_sid, _, _ = win32security.LookupAccountName("", win32api.GetUserName())
security_descriptor = win32security.GetNamedSecurityInfo( security_descriptor = win32security.GetNamedSecurityInfo(