Island, UT: Fix file_utils paths

This commit is contained in:
Ilija Lazoroski 2022-04-30 01:39:45 +02:00 committed by Mike Salvatore
parent 8d65fa36f2
commit 86c6a55097
4 changed files with 17 additions and 16 deletions

View File

@ -3,6 +3,7 @@ import os
import platform import platform
import stat import stat
from contextlib import contextmanager from contextlib import contextmanager
from pathlib import Path
from typing import Generator from typing import Generator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -20,32 +21,32 @@ if is_windows_os():
import monkey_island.cc.server_utils.windows_permissions as windows_permissions import monkey_island.cc.server_utils.windows_permissions as windows_permissions
def create_secure_directory(path: str): def create_secure_directory(path: Path):
if not os.path.isdir(path): if not path.is_dir():
if is_windows_os(): if is_windows_os():
_create_secure_directory_windows(path) _create_secure_directory_windows(path)
else: else:
_create_secure_directory_linux(path) _create_secure_directory_linux(path)
def _create_secure_directory_linux(path: str): def _create_secure_directory_linux(path: Path):
try: try:
# Don't split directory creation and permission setting # Don't split directory creation and permission setting
# because it will temporarily create an accessible directory which anyone can use. # because it will temporarily create an accessible directory which anyone can use.
os.mkdir(path, mode=stat.S_IRWXU) path.mkdir(mode=stat.S_IRWXU)
except Exception as ex: except Exception as ex:
logger.error(f'Could not create a directory at "{path}": {str(ex)}') logger.error(f'Could not create a directory at "{path}": {str(ex)}')
raise ex raise ex
def _create_secure_directory_windows(path: str): def _create_secure_directory_windows(path: Path):
try: try:
security_attributes = win32security.SECURITY_ATTRIBUTES() security_attributes = win32security.SECURITY_ATTRIBUTES()
security_attributes.SECURITY_DESCRIPTOR = ( security_attributes.SECURITY_DESCRIPTOR = (
windows_permissions.get_security_descriptor_for_owner_only_perms() windows_permissions.get_security_descriptor_for_owner_only_perms()
) )
win32file.CreateDirectory(path, security_attributes) win32file.CreateDirectory(str(path), security_attributes)
except Exception as ex: except Exception as ex:
logger.error(f'Could not create a directory at "{path}": {str(ex)}') logger.error(f'Could not create a directory at "{path}": {str(ex)}')

View File

@ -20,7 +20,7 @@ def setup_data_dir(data_dir_path: Path):
if _is_data_dir_old(data_dir_path): if _is_data_dir_old(data_dir_path):
logger.info("Version in data directory does not match the Island's version.") logger.info("Version in data directory does not match the Island's version.")
_handle_old_data_directory(data_dir_path) _handle_old_data_directory(data_dir_path)
create_secure_directory(str(data_dir_path)) create_secure_directory(data_dir_path)
write_version(data_dir_path) write_version(data_dir_path)
logger.info(f"Data directory set up in {data_dir_path}.") logger.info(f"Data directory set up in {data_dir_path}.")

View File

@ -31,12 +31,12 @@ def start_mongodb(data_dir: Path) -> MongoDbProcess:
return mongo_db_process return mongo_db_process
def _create_db_dir(db_dir_parent_path) -> str: def _create_db_dir(db_dir_parent_path: Path) -> str:
db_dir = os.path.join(db_dir_parent_path, DB_DIR_NAME) db_dir = db_dir_parent_path / DB_DIR_NAME
logger.info(f"Database content directory: {db_dir}.") logger.info(f"Database content directory: {db_dir}.")
create_secure_directory(db_dir) create_secure_directory(db_dir)
return db_dir return str(db_dir)
def register_mongo_shutdown_callback(mongo_db_process: MongoDbProcess): def register_mongo_shutdown_callback(mongo_db_process: MongoDbProcess):

View File

@ -12,22 +12,22 @@ from monkey_island.cc.server_utils.file_utils import (
@pytest.fixture @pytest.fixture
def test_path_nested(tmpdir): def test_path_nested(tmp_path):
path = os.path.join(tmpdir, "test1", "test2", "test3") path = tmp_path / "test1" / "test2" / "test3"
return path return path
@pytest.fixture @pytest.fixture
def test_path(tmpdir): def test_path(tmp_path):
test_path = "test1" test_path = "test1"
path = os.path.join(tmpdir, test_path) path = tmp_path / test_path
return path return path
def test_create_secure_directory__already_exists(test_path): def test_create_secure_directory__already_exists(test_path):
os.mkdir(test_path) test_path.mkdir()
assert os.path.isdir(test_path) assert test_path.is_dir()
create_secure_directory(test_path) create_secure_directory(test_path)