diff --git a/monkey/monkey_island/cc/services/__init__.py b/monkey/monkey_island/cc/services/__init__.py index 83cbf3492..b3dcba9de 100644 --- a/monkey/monkey_island/cc/services/__init__.py +++ b/monkey/monkey_island/cc/services/__init__.py @@ -1 +1,2 @@ from .authentication.authentication_service import AuthenticationService +from .authentication.json_file_user_datastore import JsonFileUserDatastore diff --git a/monkey/monkey_island/cc/services/authentication/json_file_user_datastore.py b/monkey/monkey_island/cc/services/authentication/json_file_user_datastore.py new file mode 100644 index 000000000..7dd8d82b7 --- /dev/null +++ b/monkey/monkey_island/cc/services/authentication/json_file_user_datastore.py @@ -0,0 +1,57 @@ +import json +from pathlib import Path + +from common.utils.exceptions import ( + AlreadyRegisteredError, + InvalidRegistrationCredentialsError, + UnknownUserError, +) +from monkey_island.cc.environment.user_creds import UserCreds + +from .i_user_datastore import IUserDatastore + +CREDENTIALS_FILE = "credentials.json" + + +class JsonFileUserDatastore(IUserDatastore): + def __init__(self, data_dir: Path): + self._credentials = None + self._credentials_file = data_dir / CREDENTIALS_FILE + + if self._credentials_file.exists(): + self._credentials = self._load_from_file() + + def _load_from_file(self) -> UserCreds: + with open(self._credentials_file, "r") as f: + credentials_dict = json.load(f) + + return UserCreds(credentials_dict["user"], credentials_dict["password_hash"]) + + def has_registered_users(self) -> bool: + return self._credentials is not None + + def add_user(self, credentials: UserCreds): + if credentials is None: + raise InvalidRegistrationCredentialsError("Credentials can not be 'None'") + elif not credentials.username: + raise InvalidRegistrationCredentialsError("Username can not be empty") + elif not credentials.password_hash: + raise InvalidRegistrationCredentialsError("Password hash can not be empty") + + if self._credentials: + raise AlreadyRegisteredError( + "User has already been registered. Reset credentials or login." + ) + + self._credentials = credentials + self._store_credentials_to_file() + + def _store_credentials_to_file(self): + with open(self._credentials_file, "w") as f: + json.dump(self._credentials.to_dict(), f, indent=2) + + def get_user_credentials(self, username: str) -> UserCreds: + if self._credentials.username != username: + raise UnknownUserError(f"User {username} does not exist.") + + return self._credentials diff --git a/monkey/tests/data_for_tests/credentials.json b/monkey/tests/data_for_tests/credentials.json new file mode 100644 index 000000000..440f7fc8b --- /dev/null +++ b/monkey/tests/data_for_tests/credentials.json @@ -0,0 +1,4 @@ +{ + "user": "new_user", + "password_hash": "new_hash" +} diff --git a/monkey/tests/unit_tests/monkey_island/cc/services/test_json_file_user_datastore.py b/monkey/tests/unit_tests/monkey_island/cc/services/test_json_file_user_datastore.py new file mode 100644 index 000000000..35b342a9f --- /dev/null +++ b/monkey/tests/unit_tests/monkey_island/cc/services/test_json_file_user_datastore.py @@ -0,0 +1,77 @@ +import pytest + +from common.utils.exceptions import ( + AlreadyRegisteredError, + InvalidRegistrationCredentialsError, + UnknownUserError, +) +from monkey_island.cc.environment.user_creds import UserCreds +from monkey_island.cc.services.authentication.json_file_user_datastore import ( + CREDENTIALS_FILE, + JsonFileUserDatastore, +) + +USERNAME = "test" +PASSWORD_HASH = "DEADBEEF" + + +@pytest.fixture +def empty_datastore(tmp_path): + return JsonFileUserDatastore(tmp_path) + + +@pytest.fixture +def populated_datastore(data_for_tests_dir): + return JsonFileUserDatastore(data_for_tests_dir) + + +def test_has_registered_users_pre_registration(empty_datastore): + assert not empty_datastore.has_registered_users() + + +def test_has_registered_users_after_registration(populated_datastore): + assert populated_datastore.has_registered_users() + + +def test_add_user(empty_datastore, tmp_path): + datastore = empty_datastore + + datastore.add_user(UserCreds(USERNAME, PASSWORD_HASH)) + assert datastore.has_registered_users() + assert (tmp_path / CREDENTIALS_FILE).exists() + + +def test_add_user__None_creds(empty_datastore): + with pytest.raises(InvalidRegistrationCredentialsError): + empty_datastore.add_user(None) + + +def test_add_user__empty_username(empty_datastore): + with pytest.raises(InvalidRegistrationCredentialsError): + empty_datastore.add_user(UserCreds("", PASSWORD_HASH)) + + +def test_add_user__empty_password_hash(empty_datastore): + with pytest.raises(InvalidRegistrationCredentialsError): + empty_datastore.add_user(UserCreds(USERNAME, "")) + + +def test_add_user__already_registered(populated_datastore): + with pytest.raises(AlreadyRegisteredError): + populated_datastore.add_user(UserCreds("new_user", "new_hash")) + + +def test_get_user_credentials_from_file(tmp_path): + empty_datastore = JsonFileUserDatastore(tmp_path) + empty_datastore.add_user(UserCreds(USERNAME, PASSWORD_HASH)) + + populated_datastore = JsonFileUserDatastore(tmp_path) + stored_user = populated_datastore.get_user_credentials(USERNAME) + + assert stored_user.username == USERNAME + assert stored_user.password_hash == PASSWORD_HASH + + +def test_get_unknown_user(populated_datastore): + with pytest.raises(UnknownUserError): + populated_datastore.get_user_credentials("unregistered_user")