Merge pull request #1619 from guardicore/1206-remove-credentials-from-server-config

Remove credentials from server config
This commit is contained in:
Mike Salvatore 2021-11-18 15:24:43 -05:00 committed by GitHub
commit 5e60066480
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 451 additions and 282 deletions

View File

@ -7,6 +7,7 @@ Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased]
### Added
- credentials.json file for storing Monkey Island user login information. #1206
### Changed
- "Communicate as Backdoor User" PBA's HTTP requests to request headers only and

View File

@ -12,7 +12,7 @@ Below are some of the most common questions we receive about the Infection Monke
- [How can I use an old data directory?](#how-can-i-use-an-old-data-directory)
- [How long does a single Infection Monkey agent run? Is there a time limit?](#how-long-does-a-single-infection-monkey-agent-run-is-there-a-time-limit)
- [Is the Infection Monkey a malware/virus?](#is-the-infection-monkey-a-malwarevirus)
- [Reset/enable the Monkey Island password](#resetenable-the-monkey-island-password)
- [Reset the Monkey Island password](#reset-the-monkey-island-password)
- [Should I run the Infection Monkey continuously?](#should-i-run-the-infection-monkey-continuously)
- [Which queries does the Infection Monkey perform to the internet exactly?](#which-queries-does-the-infection-monkey-perform-to-the-internet-exactly)
- [Logging and how to find logs](#logging-and-how-to-find-logs)
@ -71,7 +71,7 @@ downloaded installer](/usage/file-checksums) first. Then, create a new folder
and disable antivirus scan for that folder. Lastly, re-install the Infection
Monkey in the newly created folder.
## Reset/enable the Monkey Island password
## Reset the Monkey Island password
{{% notice warning %}}
@ -82,43 +82,18 @@ However, you can save the Monkey's existing configuration by logging in with you
### On Windows and Linux (AppImage)
When you first access the Monkey Island server, you'll be prompted to create an account.
To reset the credentials, edit the `server_config.json` file manually
Creating an account will write your credentials in `credentials.json` file
under [data directory]({{< ref "/reference/data_directory" >}}).
To reset the credentials:
1. **Remove** the `credentials.json` file manually
(located in the [data directory]({{< ref "/reference/data_directory" >}})).
In order to reset the credentials, the following edits need to be made:
1. Delete the `user` field. It will look like this:
```json
{
...
"user": "username",
...
}
```
1. Delete the `password_hash` field. It will look like this:
```json
{
...
"password_hash": "$2b$12$d050I/MsR5.F5E15Sm7EkunmmwMkUKaZE0P0tJXG.M9tF.Kmkd342",
...
}
```
1. Set `server_config` to `password`. It should look like this:
```json
{
...
"environment": {
...
"server_config": "password",
...
},
...
}
```
1. Restart the Monkey Island process:
2. Restart the Monkey Island process:
* On Linux, simply kill the Monkey Island process and execute the AppImage.
* On Windows, restart the program.
1. Go to the Monkey Island's URL and create a new account.
3. Go to the Monkey Island's URL and create a new account.
If you are still unable to log into Monkey Island after following the above
steps, you can perform a complete factory reset by removing the entire [data

View File

@ -10,12 +10,12 @@ class InvalidRegistrationCredentialsError(Exception):
""" Raise when server config file changed and island needs to restart """
class RegistrationNotNeededError(Exception):
class AlreadyRegisteredError(Exception):
""" Raise to indicate the reason why registration is not required """
class AlreadyRegisteredError(RegistrationNotNeededError):
""" Raise to indicate the reason why registration is not required """
class UnknownUserError(Exception):
""" Raise to indicate that authentication failed """
class IncorrectCredentialsError(Exception):

View File

@ -7,7 +7,6 @@ from common.utils.exceptions import (
InvalidRegistrationCredentialsError,
)
from monkey_island.cc.environment.environment_config import EnvironmentConfig
from monkey_island.cc.environment.user_creds import UserCreds
logger = logging.getLogger(__name__)
@ -23,34 +22,6 @@ class Environment(object, metaclass=ABCMeta):
self._config = config
self._testing = False # Assume env is not for unit testing.
def get_user(self):
return self._config.user_creds
def needs_registration(self) -> bool:
try:
needs_registration = self._try_needs_registration()
return needs_registration
except (AlreadyRegisteredError) as e:
logger.info(e)
return False
def try_add_user(self, credentials: UserCreds):
if not credentials:
raise InvalidRegistrationCredentialsError("Missing part of credentials.")
if self._try_needs_registration():
self._config.add_user(credentials)
logger.info(f"New user {credentials.username} registered!")
def _try_needs_registration(self) -> bool:
if self._is_registered():
raise AlreadyRegisteredError(
"User has already been registered. " "Reset credentials or login."
)
return True
def _is_registered(self) -> bool:
return self._config and self._config.user_creds
@property
def testing(self):
return self._testing

View File

@ -4,14 +4,11 @@ import json
import os
from typing import Dict
from monkey_island.cc.environment.user_creds import UserCreds
class EnvironmentConfig:
def __init__(self, file_path):
self._server_config_path = os.path.expanduser(file_path)
self.server_config = None
self.user_creds = None
self.aws = None
self._load_from_file(self._server_config_path)
@ -24,7 +21,7 @@ class EnvironmentConfig:
self._load_from_json(config_content)
def _load_from_json(self, config_json: str) -> EnvironmentConfig:
def _load_from_json(self, config_json: str):
data = json.loads(config_json)
self._load_from_dict(data["environment"])
@ -32,7 +29,6 @@ class EnvironmentConfig:
aws = dict_data["aws"] if "aws" in dict_data else None
self.server_config = dict_data["server_config"]
self.user_creds = _get_user_credentials_from_config(dict_data)
self.aws = aws
def save_to_file(self):
@ -50,16 +46,4 @@ class EnvironmentConfig:
}
if self.aws:
config_dict.update({"aws": self.aws})
config_dict.update(self.user_creds.to_dict())
return config_dict
def add_user(self, credentials: UserCreds):
self.user_creds = credentials
self.save_to_file()
def _get_user_credentials_from_config(dict_data: Dict):
username = dict_data.get("user", "")
password_hash = dict_data.get("password_hash", "")
return UserCreds(username, password_hash)

View File

@ -9,7 +9,7 @@ from jwt import PyJWTError
from common.utils.exceptions import IncorrectCredentialsError
from monkey_island.cc.resources.auth.credential_utils import get_username_password_from_request
from monkey_island.cc.services.authentication import AuthenticationService
from monkey_island.cc.services import AuthenticationService
logger = logging.getLogger(__name__)

View File

@ -3,9 +3,9 @@ import logging
import flask_restful
from flask import make_response, request
from common.utils.exceptions import InvalidRegistrationCredentialsError, RegistrationNotNeededError
from common.utils.exceptions import AlreadyRegisteredError, InvalidRegistrationCredentialsError
from monkey_island.cc.resources.auth.credential_utils import get_username_password_from_request
from monkey_island.cc.services.authentication import AuthenticationService
from monkey_island.cc.services import AuthenticationService
logger = logging.getLogger(__name__)
@ -20,5 +20,5 @@ class Registration(flask_restful.Resource):
try:
AuthenticationService.register_new_user(username, password)
return make_response({"error": ""}, 200)
except (InvalidRegistrationCredentialsError, RegistrationNotNeededError) as e:
except (InvalidRegistrationCredentialsError, AlreadyRegisteredError) as e:
return make_response({"error": str(e)}, 400)

View File

@ -55,7 +55,8 @@ def _create_secure_directory_windows(path: str):
@contextmanager
def open_new_securely_permissioned_file(path: str, mode: str = "w") -> Generator:
if is_windows_os():
fd = _get_file_descriptor_for_new_secure_file_windows(path)
# TODO: Switch from string to Path object to avoid this hack.
fd = _get_file_descriptor_for_new_secure_file_windows(str(path))
else:
fd = _get_file_descriptor_for_new_secure_file_linux(path)

View File

@ -0,0 +1,2 @@
from .authentication.authentication_service import AuthenticationService
from .authentication.json_file_user_datastore import JsonFileUserDatastore

View File

@ -1,39 +1,54 @@
import bcrypt
import monkey_island.cc.environment.environment_singleton as env_singleton
from common.utils.exceptions import IncorrectCredentialsError
from monkey_island.cc.environment.user_creds import UserCreds
from common.utils.exceptions import (
IncorrectCredentialsError,
InvalidRegistrationCredentialsError,
UnknownUserError,
)
from monkey_island.cc.server_utils.encryption import (
reset_datastore_encryptor,
unlock_datastore_encryptor,
)
from monkey_island.cc.setup.mongo.database_initializer import reset_database
from .i_user_datastore import IUserDatastore
from .user_creds import UserCreds
class AuthenticationService:
DATA_DIR = None
user_datastore = None
# TODO: A number of these services should be instance objects instead of
# static/singleton hybrids. At the moment, this requires invasive refactoring that's
# not a priority.
@classmethod
def initialize(cls, data_dir: str):
def initialize(cls, data_dir: str, user_datastore: IUserDatastore):
cls.DATA_DIR = data_dir
cls.user_datastore = user_datastore
@staticmethod
def needs_registration() -> bool:
return env_singleton.env.needs_registration()
@classmethod
def needs_registration(cls) -> bool:
return not cls.user_datastore.has_registered_users()
@classmethod
def register_new_user(cls, username: str, password: str):
if not username or not password:
raise InvalidRegistrationCredentialsError("Username or password can not be empty.")
credentials = UserCreds(username, _hash_password(password))
env_singleton.env.try_add_user(credentials)
cls.user_datastore.add_user(credentials)
cls._reset_datastore_encryptor(username, password)
reset_database()
@classmethod
def authenticate(cls, username: str, password: str):
if not _credentials_match_registered_user(username, password):
try:
registered_user = cls.user_datastore.get_user_credentials(username)
except UnknownUserError:
raise IncorrectCredentialsError()
if not _credentials_match_registered_user(username, password, registered_user):
raise IncorrectCredentialsError()
cls._unlock_datastore_encryptor(username, password)
@ -56,12 +71,9 @@ def _hash_password(plaintext_password: str) -> str:
return password_hash.decode()
def _credentials_match_registered_user(username: str, password: str) -> bool:
registered_user = env_singleton.env.get_user()
if not registered_user:
return False
def _credentials_match_registered_user(
username: str, password: str, registered_user: UserCreds
) -> bool:
return (registered_user.username == username) and _password_matches_hash(
password, registered_user.password_hash
)

View File

@ -0,0 +1,36 @@
import abc
from .user_creds import UserCreds
class IUserDatastore(metaclass=abc.ABCMeta):
"""
Allows user credentials to be stored and retrieved.
"""
@abc.abstractmethod
def has_registered_users(self) -> bool:
"""
Checks if there are any registered user.
:return: True if any users have been registered, False otherwise
:rtype: bool
"""
@abc.abstractmethod
def add_user(self, credentials: UserCreds):
"""
Adds a new user to the datastore.
:param UserCreds credentials: New user credentials to persistant storage.
:raises InvalidRegistrationCredentialsError: if the credentials are malformed
:raises AlreadyRegisteredError: if the user has already been registered
"""
@abc.abstractmethod
def get_user_credentials(self, username: str) -> UserCreds:
"""
Gets the user matching `username` from storage.
:param str username: The username for which credentials will be retrieved
:return: User credentials for username
:rtype: UserCreds
:raises UnknownUserError: if the username does not exist
"""

View File

@ -0,0 +1,58 @@
import json
from pathlib import Path
from common.utils.exceptions import (
AlreadyRegisteredError,
InvalidRegistrationCredentialsError,
UnknownUserError,
)
from monkey_island.cc.server_utils.file_utils import open_new_securely_permissioned_file
from .i_user_datastore import IUserDatastore
from .user_creds import UserCreds
CREDENTIALS_FILE = "credentials.json"
class JsonFileUserDatastore(IUserDatastore):
def __init__(self, data_dir: Path):
self._credentials = None
self._credentials_file = data_dir / CREDENTIALS_FILE
if self._credentials_file.exists():
self._credentials = self._load_from_file()
def _load_from_file(self) -> UserCreds:
with open(self._credentials_file, "r") as f:
credentials_dict = json.load(f)
return UserCreds(credentials_dict["user"], credentials_dict["password_hash"])
def has_registered_users(self) -> bool:
return self._credentials is not None
def add_user(self, credentials: UserCreds):
if credentials is None:
raise InvalidRegistrationCredentialsError("Credentials can not be 'None'")
elif not credentials.username:
raise InvalidRegistrationCredentialsError("Username can not be empty")
elif not credentials.password_hash:
raise InvalidRegistrationCredentialsError("Password hash can not be empty")
if self._credentials:
raise AlreadyRegisteredError(
"User has already been registered. Reset credentials or login."
)
self._credentials = credentials
self._store_credentials_to_file()
def _store_credentials_to_file(self):
with open_new_securely_permissioned_file(self._credentials_file, "w") as f:
json.dump(self._credentials.to_dict(), f, indent=2)
def get_user_credentials(self, username: str) -> UserCreds:
if self._credentials is None or self._credentials.username != username:
raise UnknownUserError(f"User {username} does not exist.")
return self._credentials

View File

@ -1,9 +1,10 @@
from monkey_island.cc.services.authentication import AuthenticationService
from monkey_island.cc.services.post_breach_files import PostBreachFilesService
from monkey_island.cc.services.run_local_monkey import LocalMonkeyRunService
from . import AuthenticationService, JsonFileUserDatastore
def initialize_services(data_dir):
PostBreachFilesService.initialize(data_dir)
LocalMonkeyRunService.initialize(data_dir)
AuthenticationService.initialize(data_dir)
AuthenticationService.initialize(data_dir, JsonFileUserDatastore(data_dir))

View File

@ -0,0 +1,4 @@
{
"user": "new_user",
"password_hash": "new_hash"
}

View File

@ -1,9 +0,0 @@
{
"environment" : {
"server_config": "password",
"user": "test"
},
"mongodb": {
"start_mongodb": true
}
}

View File

@ -1,11 +0,0 @@
{
"log_level": "NOTICE",
"environment" : {
"server_config": "password",
"user": "test",
"password_hash": "abcdef"
},
"mongodb": {
"start_mongodb": true
}
}

View File

@ -3,16 +3,6 @@ import os
import pytest
@pytest.fixture(scope="module")
def with_credentials(server_configs_dir):
return os.path.join(server_configs_dir, "server_config_with_credentials.json")
@pytest.fixture(scope="module")
def no_credentials(server_configs_dir):
return os.path.join(server_configs_dir, "server_config_no_credentials.json")
@pytest.fixture(scope="module")
def partial_credentials(server_configs_dir):
return os.path.join(server_configs_dir, "server_config_partial_credentials.json")

View File

@ -1,91 +1,10 @@
import os
import tempfile
from typing import Dict
from unittest import TestCase
from unittest.mock import MagicMock, patch
import pytest
from common.utils.exceptions import AlreadyRegisteredError, InvalidRegistrationCredentialsError
from monkey_island.cc.environment import Environment, EnvironmentConfig, UserCreds
WITH_CREDENTIALS = None
NO_CREDENTIALS = None
PARTIAL_CREDENTIALS = None
USER_CREDENTIALS = UserCreds(username="test", password_hash="1231234")
# This fixture is a dirty hack that can be removed once these tests are converted from
# unittest to pytest. Instead, the appropriate fixtures from conftest.py can be used.
@pytest.fixture(scope="module", autouse=True)
def configure_resources(server_configs_dir):
global WITH_CREDENTIALS
global NO_CREDENTIALS
global PARTIAL_CREDENTIALS
WITH_CREDENTIALS = os.path.join(server_configs_dir, "server_config_with_credentials.json")
NO_CREDENTIALS = os.path.join(server_configs_dir, "server_config_no_credentials.json")
PARTIAL_CREDENTIALS = os.path.join(server_configs_dir, "server_config_partial_credentials.json")
def get_tmp_file():
with tempfile.NamedTemporaryFile(delete=False) as f:
return f.name
class StubEnvironmentConfig(EnvironmentConfig):
def __init__(self, server_config, deployment, user_creds):
self.server_config = server_config
self.deployment = deployment
self.user_creds = user_creds
self.server_config_path = get_tmp_file()
def __del__(self):
os.remove(self.server_config_path)
from monkey_island.cc.environment import Environment, EnvironmentConfig
class TestEnvironment(TestCase):
class EnvironmentCredentialsRequired(Environment):
def __init__(self):
config = StubEnvironmentConfig("test", "test", None)
super().__init__(config)
class EnvironmentAlreadyRegistered(Environment):
def __init__(self):
config = StubEnvironmentConfig("test", "test", UserCreds("test_user", "test_secret"))
super().__init__(config)
@patch.object(target=EnvironmentConfig, attribute="save_to_file", new=MagicMock())
def test_try_add_user(self):
env = TestEnvironment.EnvironmentCredentialsRequired()
credentials = USER_CREDENTIALS
env.try_add_user(credentials)
credentials = UserCreds(username="test", password_hash="")
with self.assertRaises(InvalidRegistrationCredentialsError):
env.try_add_user(credentials)
def test_try_needs_registration(self):
env = TestEnvironment.EnvironmentAlreadyRegistered()
with self.assertRaises(AlreadyRegisteredError):
env._try_needs_registration()
env = TestEnvironment.EnvironmentCredentialsRequired()
self.assertTrue(env._try_needs_registration())
def test_needs_registration(self):
env = TestEnvironment.EnvironmentCredentialsRequired()
self._test_bool_env_method("needs_registration", env, WITH_CREDENTIALS, False)
self._test_bool_env_method("needs_registration", env, NO_CREDENTIALS, True)
self._test_bool_env_method("needs_registration", env, PARTIAL_CREDENTIALS, True)
def test_is_registered(self):
env = TestEnvironment.EnvironmentCredentialsRequired()
self._test_bool_env_method("_is_registered", env, WITH_CREDENTIALS, True)
self._test_bool_env_method("_is_registered", env, NO_CREDENTIALS, False)
self._test_bool_env_method("_is_registered", env, PARTIAL_CREDENTIALS, False)
def _test_bool_env_method(
self, method_name: str, env: Environment, config: Dict, expected_result: bool
):

View File

@ -5,7 +5,6 @@ import shutil
import pytest
from monkey_island.cc.environment.environment_config import EnvironmentConfig
from monkey_island.cc.environment.user_creds import UserCreds
@pytest.fixture
@ -13,15 +12,6 @@ def config_file(tmpdir):
return os.path.join(tmpdir, "test_config.json")
def test_get_with_credentials(with_credentials):
config_dict = EnvironmentConfig(with_credentials).to_dict()
assert len(config_dict.keys()) == 3
assert config_dict["server_config"] == "password"
assert config_dict["user"] == "test"
assert config_dict["password_hash"] == "abcdef"
def test_get_with_no_credentials(no_credentials):
config_dict = EnvironmentConfig(no_credentials).to_dict()
@ -29,16 +19,8 @@ def test_get_with_no_credentials(no_credentials):
assert config_dict["server_config"] == "password"
def test_get_with_partial_credentials(partial_credentials):
config_dict = EnvironmentConfig(partial_credentials).to_dict()
assert len(config_dict.keys()) == 2
assert config_dict["server_config"] == "password"
assert config_dict["user"] == "test"
def test_save_to_file(config_file, with_credentials):
shutil.copyfile(with_credentials, config_file)
def test_save_to_file(config_file, no_credentials):
shutil.copyfile(no_credentials, config_file)
environment_config = EnvironmentConfig(config_file)
environment_config.aws = "test_aws"
@ -48,43 +30,3 @@ def test_save_to_file(config_file, with_credentials):
from_file = json.load(f)
assert environment_config.to_dict() == from_file["environment"]
def test_save_to_file_preserve_log_level(config_file, with_credentials):
shutil.copyfile(with_credentials, config_file)
environment_config = EnvironmentConfig(config_file)
environment_config.aws = "test_aws"
environment_config.save_to_file()
with open(config_file, "r") as f:
from_file = json.load(f)
assert "log_level" in from_file
assert from_file["log_level"] == "NOTICE"
def test_add_user(config_file, with_credentials):
new_user = "new_user"
new_password_hash = "fedcba"
new_user_creds = UserCreds(new_user, new_password_hash)
shutil.copyfile(with_credentials, config_file)
environment_config = EnvironmentConfig(config_file)
environment_config.add_user(new_user_creds)
with open(config_file, "r") as f:
from_file = json.load(f)
assert len(from_file["environment"].keys()) == 3
assert from_file["environment"]["user"] == new_user
assert from_file["environment"]["password_hash"] == new_password_hash
def test_user(with_credentials):
environment_config = EnvironmentConfig(with_credentials)
user = environment_config.user_creds
assert user.username == "test"
assert user.password_hash == "abcdef"

View File

@ -3,7 +3,7 @@ from unittest.mock import MagicMock
import pytest
from common.utils.exceptions import InvalidRegistrationCredentialsError, RegistrationNotNeededError
from common.utils.exceptions import AlreadyRegisteredError, InvalidRegistrationCredentialsError
REGISTRATION_URL = "/api/registration"
@ -59,9 +59,7 @@ def test_invalid_credentials(make_registration_request, mock_authentication_serv
def test_registration_not_needed(make_registration_request, mock_authentication_service):
mock_authentication_service.register_new_user = MagicMock(
side_effect=RegistrationNotNeededError()
)
mock_authentication_service.register_new_user = MagicMock(side_effect=AlreadyRegisteredError())
registration_request_body = "{}"
response = make_registration_request(registration_request_body)

View File

@ -1,4 +1,4 @@
from monkey_island.cc.environment.user_creds import UserCreds
from monkey_island.cc.services.authentication.user_creds import UserCreds
TEST_USER = "Test"
TEST_HASH = "abc1231234"

View File

@ -0,0 +1,184 @@
from unittest.mock import MagicMock
import pytest
from common.utils.exceptions import (
AlreadyRegisteredError,
IncorrectCredentialsError,
InvalidRegistrationCredentialsError,
UnknownUserError,
)
from monkey_island.cc.services import AuthenticationService
from monkey_island.cc.services.authentication import authentication_service
from monkey_island.cc.services.authentication.i_user_datastore import IUserDatastore
from monkey_island.cc.services.authentication.user_creds import UserCreds
USERNAME = "user1"
PASSWORD = "test"
PASSWORD_HASH = "$2b$12$YsGjjuJFddYJ6z5S5/nMCuKkCzKHB1AWY9SXkQ02i25d8TgdhIRS2"
class MockUserDatastore(IUserDatastore):
def __init__(self, has_registered_users, add_user, get_user_credentials):
self._has_registered_users = has_registered_users
self._add_user = add_user
self._get_user_credentials = get_user_credentials
def has_registered_users(self):
return self._has_registered_users()
def add_user(self, credentials: UserCreds):
return self._add_user(credentials)
def get_user_credentials(self, username: str) -> UserCreds:
return self._get_user_credentials(username)
@pytest.fixture
def mock_reset_datastore_encryptor():
return MagicMock()
@pytest.fixture
def mock_reset_database():
return MagicMock()
@pytest.fixture
def mock_unlock_datastore_encryptor():
return MagicMock()
@pytest.fixture(autouse=True)
def patch_datastore_utils(
monkeypatch,
mock_reset_datastore_encryptor,
mock_reset_database,
mock_unlock_datastore_encryptor,
):
monkeypatch.setattr(
authentication_service, "reset_datastore_encryptor", mock_reset_datastore_encryptor
)
monkeypatch.setattr(authentication_service, "reset_database", mock_reset_database)
monkeypatch.setattr(
authentication_service, "unlock_datastore_encryptor", mock_unlock_datastore_encryptor
)
def test_needs_registration__true(tmp_path):
has_registered_users = False
mock_user_datastore = MockUserDatastore(lambda: has_registered_users, None, None)
a_s = AuthenticationService()
a_s.initialize(tmp_path, mock_user_datastore)
assert a_s.needs_registration()
def test_needs_registration__false(tmp_path):
has_registered_users = True
mock_user_datastore = MockUserDatastore(lambda: has_registered_users, None, None)
a_s = AuthenticationService()
a_s.initialize(tmp_path, mock_user_datastore)
assert not a_s.needs_registration()
@pytest.mark.parametrize("error", [InvalidRegistrationCredentialsError, AlreadyRegisteredError])
def test_register_new_user__fails(
tmp_path, mock_reset_datastore_encryptor, mock_reset_database, error
):
mock_user_datastore = MockUserDatastore(lambda: True, MagicMock(side_effect=error), None)
a_s = AuthenticationService()
a_s.initialize(tmp_path, mock_user_datastore)
with pytest.raises(error):
a_s.register_new_user(USERNAME, PASSWORD)
mock_reset_datastore_encryptor.assert_not_called()
mock_reset_database.assert_not_called()
def test_register_new_user__empty_password_fails(
tmp_path, mock_reset_datastore_encryptor, mock_reset_database
):
mock_user_datastore = MockUserDatastore(lambda: False, None, None)
a_s = AuthenticationService()
a_s.initialize(tmp_path, mock_user_datastore)
with pytest.raises(InvalidRegistrationCredentialsError):
a_s.register_new_user(USERNAME, "")
mock_reset_datastore_encryptor.assert_not_called()
mock_reset_database.assert_not_called()
def test_register_new_user(tmp_path, mock_reset_datastore_encryptor, mock_reset_database):
mock_add_user = MagicMock()
mock_user_datastore = MockUserDatastore(lambda: False, mock_add_user, None)
a_s = AuthenticationService()
a_s.initialize(tmp_path, mock_user_datastore)
a_s.register_new_user(USERNAME, PASSWORD)
assert mock_add_user.call_args[0][0].username == USERNAME
assert mock_add_user.call_args[0][0].password_hash != PASSWORD
mock_reset_datastore_encryptor.assert_called_once()
assert mock_reset_datastore_encryptor.call_args[0][1] != USERNAME
mock_reset_database.assert_called_once()
def test_authenticate__success(tmp_path, mock_unlock_datastore_encryptor):
mock_user_datastore = MockUserDatastore(
lambda: True,
None,
lambda _: UserCreds(USERNAME, PASSWORD_HASH),
)
a_s = AuthenticationService()
a_s.initialize(tmp_path, mock_user_datastore)
# If authentication fails, this function will raise an exception and the test will fail.
a_s.authenticate(USERNAME, PASSWORD)
mock_unlock_datastore_encryptor.assert_called_once()
@pytest.mark.parametrize(
("username", "password"), [("wrong_username", PASSWORD), (USERNAME, "wrong_password")]
)
def test_authenticate__failed_wrong_credentials(
tmp_path, mock_unlock_datastore_encryptor, username, password
):
mock_user_datastore = MockUserDatastore(
lambda: True,
None,
lambda _: UserCreds(USERNAME, PASSWORD_HASH),
)
a_s = AuthenticationService()
a_s.initialize(tmp_path, mock_user_datastore)
with pytest.raises(IncorrectCredentialsError):
a_s.authenticate(username, password)
mock_unlock_datastore_encryptor.assert_not_called()
def test_authenticate__failed_no_registered_user(tmp_path, mock_unlock_datastore_encryptor):
mock_user_datastore = MockUserDatastore(
lambda: True, None, MagicMock(side_effect=UnknownUserError)
)
a_s = AuthenticationService()
a_s.initialize(tmp_path, mock_user_datastore)
with pytest.raises(IncorrectCredentialsError):
a_s.authenticate(USERNAME, PASSWORD)
mock_unlock_datastore_encryptor.assert_not_called()

View File

@ -0,0 +1,111 @@
import os
import stat
import pytest
from tests.monkey_island.utils import assert_windows_permissions
from common.utils.exceptions import (
AlreadyRegisteredError,
InvalidRegistrationCredentialsError,
UnknownUserError,
)
from monkey_island.cc.server_utils.file_utils import is_windows_os
from monkey_island.cc.services.authentication.json_file_user_datastore import (
CREDENTIALS_FILE,
JsonFileUserDatastore,
)
from monkey_island.cc.services.authentication.user_creds import UserCreds
USERNAME = "test"
PASSWORD_HASH = "DEADBEEF"
@pytest.fixture
def empty_datastore(tmp_path):
return JsonFileUserDatastore(tmp_path)
@pytest.fixture
def populated_datastore(data_for_tests_dir):
return JsonFileUserDatastore(data_for_tests_dir)
@pytest.fixture
def credentials_file_path(tmp_path):
return tmp_path / CREDENTIALS_FILE
def test_has_registered_users_pre_registration(empty_datastore):
assert not empty_datastore.has_registered_users()
def test_has_registered_users_after_registration(populated_datastore):
assert populated_datastore.has_registered_users()
def test_add_user(empty_datastore, credentials_file_path):
datastore = empty_datastore
datastore.add_user(UserCreds(USERNAME, PASSWORD_HASH))
assert datastore.has_registered_users()
assert credentials_file_path.exists()
@pytest.mark.skipif(is_windows_os(), reason="Tests Posix (not Windows) permissions.")
def test_add_user__term_posix(empty_datastore, credentials_file_path):
empty_datastore.add_user(UserCreds(USERNAME, PASSWORD_HASH))
st = os.stat(credentials_file_path)
expected_mode = stat.S_IRUSR | stat.S_IWUSR
actual_mode = st.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
assert expected_mode == actual_mode
@pytest.mark.skipif(not is_windows_os(), reason="Tests Windows (not Posix) permissions.")
def test_add_user__term_windows(empty_datastore, credentials_file_path):
datastore = empty_datastore
datastore.add_user(UserCreds(USERNAME, PASSWORD_HASH))
assert_windows_permissions(str(credentials_file_path))
def test_add_user__None_creds(empty_datastore):
with pytest.raises(InvalidRegistrationCredentialsError):
empty_datastore.add_user(None)
def test_add_user__empty_username(empty_datastore):
with pytest.raises(InvalidRegistrationCredentialsError):
empty_datastore.add_user(UserCreds("", PASSWORD_HASH))
def test_add_user__empty_password_hash(empty_datastore):
with pytest.raises(InvalidRegistrationCredentialsError):
empty_datastore.add_user(UserCreds(USERNAME, ""))
def test_add_user__already_registered(populated_datastore):
with pytest.raises(AlreadyRegisteredError):
populated_datastore.add_user(UserCreds("new_user", "new_hash"))
def test_get_user_credentials_from_file(tmp_path):
empty_datastore = JsonFileUserDatastore(tmp_path)
empty_datastore.add_user(UserCreds(USERNAME, PASSWORD_HASH))
populated_datastore = JsonFileUserDatastore(tmp_path)
stored_user = populated_datastore.get_user_credentials(USERNAME)
assert stored_user.username == USERNAME
assert stored_user.password_hash == PASSWORD_HASH
def test_get_unknown_user(populated_datastore):
with pytest.raises(UnknownUserError):
populated_datastore.get_user_credentials("unregistered_user")
def test_get_user_credentials__no_user_registered(empty_datastore):
with pytest.raises(UnknownUserError):
empty_datastore.get_user_credentials("unregistered_user")