forked from p15670423/monkey
Island: Add JsonFileUserDatastore
This commit is contained in:
parent
52f461f425
commit
8dfacbc099
|
@ -1 +1,2 @@
|
|||
from .authentication.authentication_service import AuthenticationService
|
||||
from .authentication.json_file_user_datastore import JsonFileUserDatastore
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from common.utils.exceptions import (
|
||||
AlreadyRegisteredError,
|
||||
InvalidRegistrationCredentialsError,
|
||||
UnknownUserError,
|
||||
)
|
||||
from monkey_island.cc.environment.user_creds import UserCreds
|
||||
|
||||
from .i_user_datastore import IUserDatastore
|
||||
|
||||
CREDENTIALS_FILE = "credentials.json"
|
||||
|
||||
|
||||
class JsonFileUserDatastore(IUserDatastore):
|
||||
def __init__(self, data_dir: Path):
|
||||
self._credentials = None
|
||||
self._credentials_file = data_dir / CREDENTIALS_FILE
|
||||
|
||||
if self._credentials_file.exists():
|
||||
self._credentials = self._load_from_file()
|
||||
|
||||
def _load_from_file(self) -> UserCreds:
|
||||
with open(self._credentials_file, "r") as f:
|
||||
credentials_dict = json.load(f)
|
||||
|
||||
return UserCreds(credentials_dict["user"], credentials_dict["password_hash"])
|
||||
|
||||
def has_registered_users(self) -> bool:
|
||||
return self._credentials is not None
|
||||
|
||||
def add_user(self, credentials: UserCreds):
|
||||
if credentials is None:
|
||||
raise InvalidRegistrationCredentialsError("Credentials can not be 'None'")
|
||||
elif not credentials.username:
|
||||
raise InvalidRegistrationCredentialsError("Username can not be empty")
|
||||
elif not credentials.password_hash:
|
||||
raise InvalidRegistrationCredentialsError("Password hash can not be empty")
|
||||
|
||||
if self._credentials:
|
||||
raise AlreadyRegisteredError(
|
||||
"User has already been registered. Reset credentials or login."
|
||||
)
|
||||
|
||||
self._credentials = credentials
|
||||
self._store_credentials_to_file()
|
||||
|
||||
def _store_credentials_to_file(self):
|
||||
with open(self._credentials_file, "w") as f:
|
||||
json.dump(self._credentials.to_dict(), f, indent=2)
|
||||
|
||||
def get_user_credentials(self, username: str) -> UserCreds:
|
||||
if self._credentials.username != username:
|
||||
raise UnknownUserError(f"User {username} does not exist.")
|
||||
|
||||
return self._credentials
|
|
@ -0,0 +1,4 @@
|
|||
{
|
||||
"user": "new_user",
|
||||
"password_hash": "new_hash"
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
import pytest
|
||||
|
||||
from common.utils.exceptions import (
|
||||
AlreadyRegisteredError,
|
||||
InvalidRegistrationCredentialsError,
|
||||
UnknownUserError,
|
||||
)
|
||||
from monkey_island.cc.environment.user_creds import UserCreds
|
||||
from monkey_island.cc.services.authentication.json_file_user_datastore import (
|
||||
CREDENTIALS_FILE,
|
||||
JsonFileUserDatastore,
|
||||
)
|
||||
|
||||
USERNAME = "test"
|
||||
PASSWORD_HASH = "DEADBEEF"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_datastore(tmp_path):
|
||||
return JsonFileUserDatastore(tmp_path)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def populated_datastore(data_for_tests_dir):
|
||||
return JsonFileUserDatastore(data_for_tests_dir)
|
||||
|
||||
|
||||
def test_has_registered_users_pre_registration(empty_datastore):
|
||||
assert not empty_datastore.has_registered_users()
|
||||
|
||||
|
||||
def test_has_registered_users_after_registration(populated_datastore):
|
||||
assert populated_datastore.has_registered_users()
|
||||
|
||||
|
||||
def test_add_user(empty_datastore, tmp_path):
|
||||
datastore = empty_datastore
|
||||
|
||||
datastore.add_user(UserCreds(USERNAME, PASSWORD_HASH))
|
||||
assert datastore.has_registered_users()
|
||||
assert (tmp_path / CREDENTIALS_FILE).exists()
|
||||
|
||||
|
||||
def test_add_user__None_creds(empty_datastore):
|
||||
with pytest.raises(InvalidRegistrationCredentialsError):
|
||||
empty_datastore.add_user(None)
|
||||
|
||||
|
||||
def test_add_user__empty_username(empty_datastore):
|
||||
with pytest.raises(InvalidRegistrationCredentialsError):
|
||||
empty_datastore.add_user(UserCreds("", PASSWORD_HASH))
|
||||
|
||||
|
||||
def test_add_user__empty_password_hash(empty_datastore):
|
||||
with pytest.raises(InvalidRegistrationCredentialsError):
|
||||
empty_datastore.add_user(UserCreds(USERNAME, ""))
|
||||
|
||||
|
||||
def test_add_user__already_registered(populated_datastore):
|
||||
with pytest.raises(AlreadyRegisteredError):
|
||||
populated_datastore.add_user(UserCreds("new_user", "new_hash"))
|
||||
|
||||
|
||||
def test_get_user_credentials_from_file(tmp_path):
|
||||
empty_datastore = JsonFileUserDatastore(tmp_path)
|
||||
empty_datastore.add_user(UserCreds(USERNAME, PASSWORD_HASH))
|
||||
|
||||
populated_datastore = JsonFileUserDatastore(tmp_path)
|
||||
stored_user = populated_datastore.get_user_credentials(USERNAME)
|
||||
|
||||
assert stored_user.username == USERNAME
|
||||
assert stored_user.password_hash == PASSWORD_HASH
|
||||
|
||||
|
||||
def test_get_unknown_user(populated_datastore):
|
||||
with pytest.raises(UnknownUserError):
|
||||
populated_datastore.get_user_credentials("unregistered_user")
|
Loading…
Reference in New Issue