Refactored secure directory creation into a separate method. Data dir creation and db dir creation now use that method. Added unit tests for secure directory creation.

This commit is contained in:
VakarisZ 2021-05-27 10:36:37 +03:00
parent 5aeab3a56c
commit cb14a4ea9b
6 changed files with 108 additions and 42 deletions

View File

@ -0,0 +1,7 @@
import os
import stat
def set_perms_to_owner_only(path: str):
# Read, write, and execute by owner
os.chmod(path, stat.S_IRWXU)

View File

@ -1,5 +1,48 @@
import logging
import os
import platform
def is_windows_os() -> bool:
return platform.system() == "Windows"
if is_windows_os():
from monkey_island.cc.environment.windows_permissions import ( # noqa: E402
set_full_folder_access,
)
else:
from monkey_island.cc.environment.linux_permissions import set_perms_to_owner_only # noqa: E402
LOG = logging.getLogger(__name__)
def create_secure_directory(path: str, create_parent_dirs: bool):
if not os.path.isdir(path):
create_directory(path, create_parent_dirs)
set_secure_permissions(path)
def create_directory(path: str, create_parent_dirs: bool):
try:
if create_parent_dirs:
os.makedirs(path)
else:
os.mkdir(path)
except Exception as ex:
LOG.error(
f'Could not create a directory at "{path}" (maybe `$HOME` could not be '
f"resolved?): {str(ex)}"
)
raise ex
def set_secure_permissions(dir_path: str):
try:
if is_windows_os():
set_full_folder_access(folder_path=dir_path)
else:
set_perms_to_owner_only(path=dir_path)
except Exception as ex:
LOG.error(f"Permissions could not be " f"set successfully for {dir_path}: {str(ex)}")
raise ex

View File

@ -1,10 +1,7 @@
from monkey_island.cc.environment.utils import is_windows_os
if is_windows_os():
import ntsecuritycon
import win32api
import win32con
import win32security
import ntsecuritycon
import win32api
import win32con
import win32security
def set_full_folder_access(folder_path: str) -> None:

View File

@ -3,6 +3,7 @@ import os
import subprocess
from typing import List
from monkey_island.cc.environment.utils import create_secure_directory
from monkey_island.cc.server_utils.consts import MONGO_EXECUTABLE_PATH
logger = logging.getLogger(__name__)
@ -28,9 +29,7 @@ class MongoDbRunner:
def _create_db_dir(self) -> str:
db_path = os.path.join(self.db_dir_parent_path, DB_DIR_NAME)
logger.info(f"Database content directory: {db_path}.")
if not os.path.isdir(db_path):
logger.info("Database content directory not found, creating one.")
os.mkdir(os.path.join(self.db_dir_parent_path, DB_DIR_NAME))
create_secure_directory(db_path)
return db_path
def _start_mongodb_process(self, db_dir_path: str):

View File

@ -0,0 +1,52 @@
import os
import shutil
import stat
import pytest
from monkey_island.cc.environment.utils import create_secure_directory, is_windows_os
@pytest.fixture
def test_path_nested(tmpdir):
nested_path = "/test1/test2/test3"
path = os.path.join(tmpdir, nested_path)
yield path
try:
shutil.rmtree(os.path.join(tmpdir, "/test1"))
except Exception:
pass
@pytest.fixture
def test_path(tmpdir):
test_path = "/test1"
path = os.path.join(tmpdir, test_path)
yield path
try:
shutil.rmtree(path)
except Exception:
pass
def test_create_secure_directory__parent_dirs(test_path_nested):
create_secure_directory(test_path_nested, create_parent_dirs=True)
assert os.path.isdir(test_path_nested)
def test_create_secure_directory__already_created(test_path):
os.mkdir(test_path)
assert os.path.isdir(test_path)
create_secure_directory(test_path, create_parent_dirs=False)
def test_create_secure_directory__no_parent_dir(test_path_nested):
with pytest.raises(Exception):
create_secure_directory(test_path_nested, create_parent_dirs=False)
@pytest.mark.skipif(is_windows_os(), reason="Tests Posix (not Windows) permissions.")
def test_create_secure_directory__perm_linux(test_path_nested):
create_secure_directory(test_path_nested, create_parent_dirs=True)
st = os.stat(test_path_nested)
return bool(st.st_mode & stat.S_IRWXU)

View File

@ -1,32 +0,0 @@
import os
import pytest
from monkey_island.cc.setup.mongo_process_runner import MongoDbRunner
TEST_DIR_NAME = "test_dir"
@pytest.fixture(autouse=True)
def fake_db_dir(monkeypatch):
monkeypatch.setattr("monkey_island.cc.setup.mongo_process_runner.DB_DIR_NAME", TEST_DIR_NAME)
@pytest.fixture
def expected_path(tmpdir):
expected_path = os.path.join(tmpdir, TEST_DIR_NAME)
return expected_path
def test_create_db_dir(tmpdir, expected_path):
db_path = MongoDbRunner(tmpdir, tmpdir)._create_db_dir()
assert os.path.isdir(expected_path)
assert db_path == expected_path
def test_create_db_dir__already_created(tmpdir, expected_path):
os.mkdir(expected_path)
db_path = MongoDbRunner(tmpdir, tmpdir)._create_db_dir()
assert os.path.isdir(expected_path)
assert db_path == expected_path