Island, UT: fix ssh key processing, add unit tests

This commit is contained in:
vakarisz 2022-02-23 14:00:56 +02:00
parent ddb227b181
commit 9396ac7512
2 changed files with 83 additions and 22 deletions

View File

@ -1,7 +1,9 @@
from common.common_consts.credentials_type import CredentialComponentType
from typing import Mapping
from monkey_island.cc.models import Monkey
from monkey_island.cc.server_utils.encryption import get_datastore_encryptor
from monkey_island.cc.services.config import ConfigService
from monkey_island.cc.services.telemetry.processing.credentials import Credentials
class SSHKeyProcessingError(ValueError):
@ -10,33 +12,38 @@ class SSHKeyProcessingError(ValueError):
super().__init__(self.msg)
def process_ssh_key(credentials: dict, monkey_guid: str):
if len(credentials["identities"]) != 1:
def process_ssh_key(keypair: Mapping, credentials: Credentials):
if len(credentials.identities) != 1:
raise SSHKeyProcessingError(
f'SSH credentials have {len(credentials["identities"])}' f" users associated with it!"
f"SSH credentials have {len(credentials.identities)}" f" users associated with " f"it!"
)
for ssh_key in credentials["secrets"]:
if not ssh_key["credential_type"] == CredentialComponentType.SSH_KEYPAIR.value:
raise SSHKeyProcessingError("SSH credentials contain secrets that are not keypairs")
if not ssh_key["public_key"] or not ssh_key["private_key"]:
if not _contains_both_keys(keypair):
raise SSHKeyProcessingError("Private or public key missing!")
# TODO SSH key should be associated with IP that monkey exploited
ip = Monkey.get_single_monkey_by_guid(monkey_guid).ip_addresses[0]
username = credentials["identities"][0]["username"]
ip = Monkey.get_single_monkey_by_guid(credentials.monkey_guid).ip_addresses[0]
username = credentials.identities[0]["username"]
encrypt_system_info_ssh_keys(ssh_key)
encrypted_keys = _encrypt_ssh_keys(keypair)
ConfigService.ssh_add_keys(
user=username,
public_key=ssh_key["public_key"],
private_key=ssh_key["private_key"],
public_key=encrypted_keys["public_key"],
private_key=encrypted_keys["private_key"],
ip=ip,
)
def encrypt_system_info_ssh_keys(ssh_key: dict):
def _contains_both_keys(ssh_key: Mapping) -> bool:
try:
return ssh_key["public_key"] and ssh_key["private_key"]
except KeyError:
return False
def _encrypt_ssh_keys(ssh_key: Mapping) -> Mapping:
encrypted_keys = {}
for field in ["public_key", "private_key"]:
ssh_key[field] = get_datastore_encryptor().encrypt(ssh_key[field])
encrypted_keys[field] = get_datastore_encryptor().encrypt(ssh_key[field])
return encrypted_keys

View File

@ -0,0 +1,54 @@
from copy import deepcopy
import dpath.util
import pytest
from tests.unit_tests.monkey_island.cc.services.telemetry.processing.credentials.conftest import (
CREDENTIAL_TELEM_TEMPLATE,
)
from common.config_value_paths import SSH_KEYS_PATH, USER_LIST_PATH
from monkey_island.cc.models import Monkey
from monkey_island.cc.services.config import ConfigService
from monkey_island.cc.services.telemetry.processing.credentials.credentials_parser import (
parse_credentials,
)
fake_monkey_guid = "272405690278083"
fake_ip_address = "192.168.56.1"
fake_private_key = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAACmFlczI1N\n"
fake_partial_secret = {"private_key": fake_private_key, "credential_type": "ssh_keypair"}
fake_username = "ubuntu"
fake_public_key = (
"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC1u2+50OFRnzOGHpWo69"
"tc02oMXudeML7pOl7rqXLmdxuj monkey@krk-wpas5"
)
fake_secret_full = {
"private_key": fake_private_key,
"public_key": fake_public_key,
"credential_type": "ssh_keypair",
}
fake_identity = {"username": fake_username, "credential_type": "username"}
ssh_telem = deepcopy(CREDENTIAL_TELEM_TEMPLATE)
ssh_telem["data"] = [{"identities": [fake_identity], "secrets": [fake_secret_full]}]
@pytest.fixture
def insert_fake_monkey():
monkey = Monkey(guid=fake_monkey_guid, ip_addresses=[fake_ip_address])
monkey.save()
@pytest.mark.usefixtures("uses_encryptor", "uses_database", "fake_mongo", "insert_fake_monkey")
def test_ssh_credential_parsing():
parse_credentials(ssh_telem)
config = ConfigService.get_config(should_decrypt=True)
ssh_keypairs = dpath.util.get(config, SSH_KEYS_PATH)
assert len(ssh_keypairs) == 1
assert ssh_keypairs[0]["private_key"] == fake_private_key
assert ssh_keypairs[0]["public_key"] == fake_public_key
assert ssh_keypairs[0]["user"] == fake_username
assert ssh_keypairs[0]["ip"] == fake_ip_address
assert fake_username in dpath.util.get(config, USER_LIST_PATH)