monkey/envs/monkey_zoo/blackbox/analyzers/zerologon_analyzer.py

77 lines
3.4 KiB
Python
Raw Normal View History

from pprint import pformat
2021-04-07 05:55:44 +08:00
from typing import List
2022-07-28 17:52:42 +08:00
from common.credentials import CredentialComponentType, Credentials
from envs.monkey_zoo.blackbox.analyzers.analyzer import Analyzer
from envs.monkey_zoo.blackbox.analyzers.analyzer_log import AnalyzerLog
from envs.monkey_zoo.blackbox.island_client.monkey_island_client import MonkeyIslandClient
# Query for telemetry collection to see if password restoration was successful
2021-04-06 21:19:27 +08:00
TELEM_QUERY = {
"telem_category": "exploit",
"data.exploiter": "ZerologonExploiter",
"data.info.password_restored": True,
}
class ZerologonAnalyzer(Analyzer):
def __init__(self, island_client: MonkeyIslandClient, expected_credentials: List[str]):
self.island_client = island_client
self.expected_credentials = expected_credentials
self.log = AnalyzerLog(self.__class__.__name__)
def analyze_test_results(self):
self.log.clear()
is_creds_gathered = self._analyze_credential_gathering()
is_creds_restored = self._analyze_credential_restore()
return is_creds_gathered and is_creds_restored
def _analyze_credential_gathering(self) -> bool:
2022-07-28 17:52:42 +08:00
propagation_credentials = self.island_client.get_propagation_credentials()
credentials_on_island = ZerologonAnalyzer._get_relevant_credentials(propagation_credentials)
return self._is_all_credentials_in_list(credentials_on_island)
@staticmethod
2022-07-28 17:52:42 +08:00
def _get_relevant_credentials(propagation_credentials: Credentials) -> List[str]:
credentials_on_island = set()
for credentials in propagation_credentials:
if credentials.identity.credential_type is CredentialComponentType.USERNAME:
credentials_on_island.update([credentials.identity.username])
if credentials.secret.credential_type is CredentialComponentType.NT_HASH:
credentials_on_island.update([credentials.secret.nt_hash])
if credentials.secret.credential_type is CredentialComponentType.LM_HASH:
credentials_on_island.update([credentials.secret.lm_hash])
return list(credentials_on_island)
2021-04-06 21:19:27 +08:00
def _is_all_credentials_in_list(self, all_creds: List[str]) -> bool:
credentials_missing = [cred for cred in self.expected_credentials if cred not in all_creds]
self._log_creds_not_gathered(credentials_missing)
return not credentials_missing
def _log_creds_not_gathered(self, missing_creds: List[str]):
if not missing_creds:
self.log.add_entry("Zerologon exploiter gathered all credentials expected.")
else:
for cred in missing_creds:
self.log.add_entry(f"Credential Zerologon exploiter failed to gathered:{cred}.")
def _analyze_credential_restore(self) -> bool:
cred_restore_telems = self.island_client.find_telems_in_db(TELEM_QUERY)
self._log_credential_restore(cred_restore_telems)
return bool(cred_restore_telems)
def _log_credential_restore(self, telem_list: List[dict]):
if telem_list:
2021-04-06 21:19:27 +08:00
self.log.add_entry(
"Zerologon exploiter telemetry contains indicators that credentials "
"were successfully restored."
)
else:
2021-04-06 21:19:27 +08:00
self.log.add_entry(
"Credential restore failed or credential restore "
"telemetry not found on the Monkey Island."
)
self.log.add_entry(f"Query for credential restore telem: {pformat(TELEM_QUERY)}")