forked from p15670423/monkey
cc: rework EnvironmentConfig test
1. Rewrote in pytest 2. Removed reduntant tests 3. Added tests for add_user() and get_users()
This commit is contained in:
parent
1d73f6e860
commit
986219bd86
|
@ -3,7 +3,7 @@ import logging
|
|||
import monkey_island.cc.resources.auth.user_store as user_store
|
||||
from monkey_island.cc.environment import (EnvironmentConfig, aws, password,
|
||||
standard, testing)
|
||||
from monkey_island.cc.consts import DEFAULT_SERVER_CONFIG_PATH
|
||||
from monkey_island.cc.server_utils.consts import DEFAULT_SERVER_CONFIG_PATH
|
||||
|
||||
__author__ = 'itay.mizeretz'
|
||||
|
||||
|
|
|
@ -1,92 +1,123 @@
|
|||
import json
|
||||
import os
|
||||
from typing import Dict
|
||||
from unittest import TestCase
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
import monkey_island.cc.test_common.environment.server_config_mocks as config_mocks
|
||||
from monkey_island.cc.server_utils.consts import MONKEY_ISLAND_ABS_PATH
|
||||
from monkey_island.cc.environment.environment_config import EnvironmentConfig
|
||||
from monkey_island.cc.environment.user_creds import UserCreds
|
||||
|
||||
|
||||
def get_server_config_file_path_test_version():
|
||||
return os.path.join(os.getcwd(), 'test_config.json')
|
||||
|
||||
|
||||
class TestEnvironmentConfig(TestCase):
|
||||
@pytest.fixture
|
||||
def config_file(tmpdir):
|
||||
return os.path.join(tmpdir, "test_config.json")
|
||||
|
||||
def test_get_from_json(self):
|
||||
self._test_get_from_json(config_mocks.CONFIG_WITH_CREDENTIALS)
|
||||
self._test_get_from_json(config_mocks.CONFIG_NO_CREDENTIALS)
|
||||
self._test_get_from_json(config_mocks.CONFIG_PARTIAL_CREDENTIALS)
|
||||
|
||||
def _test_get_from_json(self, config: Dict):
|
||||
config_json = json.dumps(config)
|
||||
env_config_object = EnvironmentConfig.get_from_json(config_json)
|
||||
self.assertEqual(config['server_config'], env_config_object.server_config)
|
||||
self.assertEqual(config['deployment'], env_config_object.deployment)
|
||||
if 'user' in config:
|
||||
self.assertEqual(config['user'], env_config_object.user_creds.username)
|
||||
if 'password_hash' in config:
|
||||
self.assertEqual(config['password_hash'], env_config_object.user_creds.password_hash)
|
||||
if 'aws' in config:
|
||||
self.assertEqual(config['aws'], env_config_object.aws)
|
||||
def test_get_with_credentials(config_file):
|
||||
test_conf = config_mocks.CONFIG_WITH_CREDENTIALS
|
||||
|
||||
def test_save_to_file(self):
|
||||
self._test_save_to_file(config_mocks.CONFIG_WITH_CREDENTIALS)
|
||||
self._test_save_to_file(config_mocks.CONFIG_NO_CREDENTIALS)
|
||||
self._test_save_to_file(config_mocks.CONFIG_PARTIAL_CREDENTIALS)
|
||||
_write_test_config_to_tmp(config_file, test_conf)
|
||||
config_dict = EnvironmentConfig.get_from_file(config_file).to_dict()
|
||||
|
||||
@patch.object(target=EnvironmentConfig, attribute="get_config_file_path",
|
||||
new=MagicMock(return_value=get_server_config_file_path_test_version()))
|
||||
def _test_save_to_file(self, config: Dict):
|
||||
user_creds = UserCreds.get_from_dict(config)
|
||||
env_config = EnvironmentConfig(server_config=config['server_config'],
|
||||
deployment=config['deployment'],
|
||||
user_creds=user_creds)
|
||||
env_config.server_config_path = get_server_config_file_path_test_version()
|
||||
assert len(config_dict.keys()) == 4
|
||||
assert config_dict["server_config"] == test_conf["server_config"]
|
||||
assert config_dict["deployment"] == test_conf["deployment"]
|
||||
assert config_dict["user"] == test_conf["user"]
|
||||
assert config_dict["password_hash"] == test_conf["password_hash"]
|
||||
|
||||
env_config.save_to_file()
|
||||
file_path = get_server_config_file_path_test_version()
|
||||
with open(file_path, 'r') as f:
|
||||
content_from_file = f.read()
|
||||
os.remove(file_path)
|
||||
|
||||
self.assertDictEqual(config, json.loads(content_from_file))
|
||||
def test_get_with_no_credentials(config_file):
|
||||
test_conf = config_mocks.CONFIG_NO_CREDENTIALS
|
||||
|
||||
def test_get_from_dict(self):
|
||||
config_dict = config_mocks.CONFIG_WITH_CREDENTIALS
|
||||
env_conf = EnvironmentConfig.get_from_dict(config_dict)
|
||||
self.assertEqual(env_conf.server_config, config_dict['server_config'])
|
||||
self.assertEqual(env_conf.deployment, config_dict['deployment'])
|
||||
self.assertEqual(env_conf.user_creds.username, config_dict['user'])
|
||||
self.assertEqual(env_conf.aws, None)
|
||||
_write_test_config_to_tmp(config_file, test_conf)
|
||||
config_dict = EnvironmentConfig.get_from_file(config_file).to_dict()
|
||||
|
||||
config_dict = config_mocks.CONFIG_BOGUS_VALUES
|
||||
env_conf = EnvironmentConfig.get_from_dict(config_dict)
|
||||
self.assertEqual(env_conf.server_config, config_dict['server_config'])
|
||||
self.assertEqual(env_conf.deployment, config_dict['deployment'])
|
||||
self.assertEqual(env_conf.user_creds.username, config_dict['user'])
|
||||
self.assertEqual(env_conf.aws, config_dict['aws'])
|
||||
assert len(config_dict.keys()) == 2
|
||||
assert config_dict["server_config"] == test_conf["server_config"]
|
||||
assert config_dict["deployment"] == test_conf["deployment"]
|
||||
|
||||
def test_to_dict(self):
|
||||
conf_json1 = json.dumps(config_mocks.CONFIG_WITH_CREDENTIALS)
|
||||
self._test_to_dict(EnvironmentConfig.get_from_json(conf_json1))
|
||||
|
||||
conf_json2 = json.dumps(config_mocks.CONFIG_NO_CREDENTIALS)
|
||||
self._test_to_dict(EnvironmentConfig.get_from_json(conf_json2))
|
||||
def test_get_with_partial_credentials(config_file):
|
||||
test_conf = config_mocks.CONFIG_PARTIAL_CREDENTIALS
|
||||
|
||||
conf_json3 = json.dumps(config_mocks.CONFIG_PARTIAL_CREDENTIALS)
|
||||
self._test_to_dict(EnvironmentConfig.get_from_json(conf_json3))
|
||||
_write_test_config_to_tmp(config_file, test_conf)
|
||||
config_dict = EnvironmentConfig.get_from_file(config_file).to_dict()
|
||||
|
||||
def _test_to_dict(self, env_config_object: EnvironmentConfig):
|
||||
test_dict = {'server_config': env_config_object.server_config,
|
||||
'deployment': env_config_object.deployment}
|
||||
user_creds = env_config_object.user_creds
|
||||
if user_creds.username:
|
||||
test_dict.update({'user': user_creds.username})
|
||||
if user_creds.password_hash:
|
||||
test_dict.update({'password_hash': user_creds.password_hash})
|
||||
assert len(config_dict.keys()) == 3
|
||||
assert config_dict["server_config"] == test_conf["server_config"]
|
||||
assert config_dict["deployment"] == test_conf["deployment"]
|
||||
assert config_dict["user"] == test_conf["user"]
|
||||
|
||||
self.assertDictEqual(test_dict, env_config_object.to_dict())
|
||||
|
||||
def _write_test_config_to_tmp(config_file, config: Dict):
|
||||
with open(config_file, "wt") as f:
|
||||
json.dump(config, f)
|
||||
|
||||
|
||||
def test_save_to_file(config_file):
|
||||
server_config = "standard"
|
||||
deployment = "develop"
|
||||
user = "test_user"
|
||||
password_hash = "abcdef"
|
||||
aws = "test"
|
||||
|
||||
environment_config = EnvironmentConfig(
|
||||
server_config, deployment, UserCreds(user, password_hash), aws
|
||||
)
|
||||
environment_config.server_config_path = config_file
|
||||
|
||||
environment_config.save_to_file()
|
||||
with open(config_file, "r") as f:
|
||||
from_file = json.load(f)
|
||||
|
||||
assert len(from_file.keys()) == 5
|
||||
assert from_file["server_config"] == server_config
|
||||
assert from_file["deployment"] == deployment
|
||||
assert from_file["user"] == user
|
||||
assert from_file["password_hash"] == password_hash
|
||||
assert from_file["aws"] == aws
|
||||
|
||||
|
||||
def test_add_user(config_file):
|
||||
server_config = "standard"
|
||||
deployment = "develop"
|
||||
user = "test_user"
|
||||
password_hash = "abcdef"
|
||||
|
||||
new_user = "new_user"
|
||||
new_password_hash = "fedcba"
|
||||
new_user_creds = UserCreds(new_user, new_password_hash)
|
||||
|
||||
environment_config = EnvironmentConfig(
|
||||
server_config, deployment, UserCreds(user, password_hash)
|
||||
)
|
||||
environment_config.server_config_path = config_file
|
||||
|
||||
environment_config.add_user(new_user_creds)
|
||||
|
||||
with open(config_file, "r") as f:
|
||||
from_file = json.load(f)
|
||||
|
||||
assert len(from_file.keys()) == 4
|
||||
assert from_file["user"] == new_user
|
||||
assert from_file["password_hash"] == new_password_hash
|
||||
|
||||
|
||||
def test_get_users():
|
||||
server_config = "standard"
|
||||
deployment = "develop"
|
||||
user = "test_user"
|
||||
password_hash = "abcdef"
|
||||
|
||||
environment_config = EnvironmentConfig(
|
||||
server_config, deployment, UserCreds(user, password_hash)
|
||||
)
|
||||
|
||||
users = environment_config.get_users()
|
||||
assert len(users) == 1
|
||||
assert users[0].id == 1
|
||||
assert users[0].username == user
|
||||
assert users[0].secret == password_hash
|
||||
|
|
|
@ -21,7 +21,7 @@ json_setup_logging(default_path=Path(MONKEY_ISLAND_ABS_PATH, 'cc', 'island_logge
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
import monkey_island.cc.environment.environment_singleton as env_singleton # noqa: E402
|
||||
from monkey_island.cc.consts import DEFAULT_SERVER_CONFIG_PATH
|
||||
from monkey_island.cc.server_utils.consts import DEFAULT_SERVER_CONFIG_PATH
|
||||
from common.version import get_version # noqa: E402
|
||||
from monkey_island.cc.app import init_app # noqa: E402
|
||||
from monkey_island.cc.server_utils.bootloader_server import BootloaderHttpServer # noqa: E402
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import platform
|
||||
import monkey_island.cc.consts as consts
|
||||
import monkey_island.cc.server_utils.consts as consts
|
||||
|
||||
|
||||
def test_default_server_config_file_path():
|
||||
|
|
Loading…
Reference in New Issue