Merge pull request #1803 from guardicore/1788-powershell-ssl-detection
PowerShell SSL detection
This commit is contained in:
commit
8ad31593b1
|
@ -3,19 +3,13 @@ from pathlib import Path
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
from infection_monkey.exploit.HostExploiter import HostExploiter
|
from infection_monkey.exploit.HostExploiter import HostExploiter
|
||||||
from infection_monkey.exploit.powershell_utils.auth_options import (
|
from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions, get_auth_options
|
||||||
AUTH_NEGOTIATE,
|
|
||||||
ENCRYPTION_AUTO,
|
|
||||||
AuthOptions,
|
|
||||||
get_auth_options,
|
|
||||||
)
|
|
||||||
from infection_monkey.exploit.powershell_utils.credentials import (
|
from infection_monkey.exploit.powershell_utils.credentials import (
|
||||||
Credentials,
|
Credentials,
|
||||||
SecretType,
|
SecretType,
|
||||||
get_credentials,
|
get_credentials,
|
||||||
)
|
)
|
||||||
from infection_monkey.exploit.powershell_utils.powershell_client import (
|
from infection_monkey.exploit.powershell_utils.powershell_client import (
|
||||||
AuthenticationError,
|
|
||||||
IPowerShellClient,
|
IPowerShellClient,
|
||||||
PowerShellClient,
|
PowerShellClient,
|
||||||
)
|
)
|
||||||
|
@ -28,10 +22,6 @@ from infection_monkey.utils.threading import interruptible_iter
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class PowerShellRemotingDisabledError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class RemoteAgentCopyError(Exception):
|
class RemoteAgentCopyError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -49,13 +39,11 @@ class PowerShellExploiter(HostExploiter):
|
||||||
self._client = None
|
self._client = None
|
||||||
|
|
||||||
def _exploit_host(self):
|
def _exploit_host(self):
|
||||||
try:
|
if not self._any_powershell_port_is_open():
|
||||||
use_ssl = self._is_client_using_https()
|
message = "PowerShell Remoting appears to be disabled on the remote host"
|
||||||
except PowerShellRemotingDisabledError as e:
|
self.exploit_result.error_message = message
|
||||||
logger.info(e)
|
logger.debug(message)
|
||||||
self.exploit_result.error_message = (
|
|
||||||
"PowerShell Remoting appears to be disabled on the remote host"
|
|
||||||
)
|
|
||||||
return self.exploit_result
|
return self.exploit_result
|
||||||
|
|
||||||
credentials = get_credentials(
|
credentials = get_credentials(
|
||||||
|
@ -66,7 +54,7 @@ class PowerShellExploiter(HostExploiter):
|
||||||
is_windows_os(),
|
is_windows_os(),
|
||||||
)
|
)
|
||||||
|
|
||||||
auth_options = [get_auth_options(creds, use_ssl) for creds in credentials]
|
auth_options = [get_auth_options(creds, self.host) for creds in credentials]
|
||||||
|
|
||||||
self._client = self._authenticate_via_brute_force(credentials, auth_options)
|
self._client = self._authenticate_via_brute_force(credentials, auth_options)
|
||||||
|
|
||||||
|
@ -89,55 +77,21 @@ class PowerShellExploiter(HostExploiter):
|
||||||
|
|
||||||
return self.exploit_result
|
return self.exploit_result
|
||||||
|
|
||||||
def _is_client_using_https(self) -> bool:
|
def _any_powershell_port_is_open(self) -> bool:
|
||||||
try:
|
return self._http_powershell_port_is_open() or self._https_powershell_port_is_open()
|
||||||
logger.debug("Checking if powershell remoting is enabled over HTTP.")
|
|
||||||
self._try_http()
|
|
||||||
return False
|
|
||||||
except AuthenticationError:
|
|
||||||
return False
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"Powershell remoting over HTTP seems disabled: {e}")
|
|
||||||
|
|
||||||
try:
|
def _http_powershell_port_is_open(self) -> bool:
|
||||||
logger.debug("Checking if powershell remoting is enabled over HTTPS.")
|
return "tcp-5985" in self.host.services
|
||||||
self._try_https()
|
|
||||||
return True
|
|
||||||
except AuthenticationError:
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"Powershell remoting over HTTPS seems disabled: {e}")
|
|
||||||
raise PowerShellRemotingDisabledError("Powershell remoting seems to be disabled.")
|
|
||||||
|
|
||||||
def _try_http(self):
|
def _https_powershell_port_is_open(self) -> bool:
|
||||||
self._try_ssl_login(use_ssl=False)
|
return "tcp-5986" in self.host.services
|
||||||
|
|
||||||
def _try_https(self):
|
|
||||||
self._try_ssl_login(use_ssl=True)
|
|
||||||
|
|
||||||
def _try_ssl_login(self, use_ssl: bool):
|
|
||||||
# '.\' is machine qualifier if the user is in the local domain
|
|
||||||
# which happens if we try to exploit a machine on second hop
|
|
||||||
credentials = Credentials(
|
|
||||||
username=".\\dummy_username",
|
|
||||||
secret="dummy_password",
|
|
||||||
secret_type=SecretType.PASSWORD,
|
|
||||||
)
|
|
||||||
|
|
||||||
auth_options = AuthOptions(
|
|
||||||
auth_type=AUTH_NEGOTIATE,
|
|
||||||
encryption=ENCRYPTION_AUTO,
|
|
||||||
ssl=use_ssl,
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: Report login attempt or find a better way of detecting if SSL is enabled
|
|
||||||
client = PowerShellClient(self.host.ip_addr, credentials, auth_options)
|
|
||||||
client.connect()
|
|
||||||
|
|
||||||
def _authenticate_via_brute_force(
|
def _authenticate_via_brute_force(
|
||||||
self, credentials: List[Credentials], auth_options: List[AuthOptions]
|
self, credentials: List[Credentials], auth_options: List[AuthOptions]
|
||||||
) -> Optional[IPowerShellClient]:
|
) -> Optional[IPowerShellClient]:
|
||||||
for (creds, opts) in interruptible_iter(zip(credentials, auth_options), self.interrupt):
|
creds_opts_pairs = filter(self.check_ssl_setting_is_valid, zip(credentials, auth_options))
|
||||||
|
for (creds, opts) in interruptible_iter(creds_opts_pairs, self.interrupt):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
client = PowerShellClient(self.host.ip_addr, creds, opts)
|
client = PowerShellClient(self.host.ip_addr, creds, opts)
|
||||||
client.connect()
|
client.connect()
|
||||||
|
@ -159,6 +113,17 @@ class PowerShellExploiter(HostExploiter):
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def check_ssl_setting_is_valid(self, creds_opts_pair):
|
||||||
|
opts = creds_opts_pair[1]
|
||||||
|
|
||||||
|
if opts.ssl and not self._https_powershell_port_is_open():
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not opts.ssl and not self._http_powershell_port_is_open():
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def _report_login_attempt(self, result: bool, credentials: Credentials):
|
def _report_login_attempt(self, result: bool, credentials: Credentials):
|
||||||
if credentials.secret_type in [SecretType.PASSWORD, SecretType.CACHED]:
|
if credentials.secret_type in [SecretType.PASSWORD, SecretType.CACHED]:
|
||||||
self.report_login_attempt(result, credentials.username, password=credentials.secret)
|
self.report_login_attempt(result, credentials.username, password=credentials.secret)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from infection_monkey.exploit.powershell_utils.credentials import Credentials, SecretType
|
from infection_monkey.exploit.powershell_utils.credentials import Credentials, SecretType
|
||||||
|
from infection_monkey.model.host import VictimHost
|
||||||
|
|
||||||
AUTH_BASIC = "basic"
|
AUTH_BASIC = "basic"
|
||||||
AUTH_NEGOTIATE = "negotiate"
|
AUTH_NEGOTIATE = "negotiate"
|
||||||
|
@ -16,17 +17,27 @@ class AuthOptions:
|
||||||
ssl: bool
|
ssl: bool
|
||||||
|
|
||||||
|
|
||||||
def get_auth_options(credentials: Credentials, use_ssl: bool) -> AuthOptions:
|
def get_auth_options(credentials: Credentials, host: VictimHost) -> AuthOptions:
|
||||||
ssl = _get_ssl(credentials, use_ssl)
|
ssl = _get_ssl(credentials, host)
|
||||||
auth_type = _get_auth_type(credentials)
|
auth_type = _get_auth_type(credentials)
|
||||||
encryption = _get_encryption(credentials)
|
encryption = _get_encryption(credentials)
|
||||||
|
|
||||||
return AuthOptions(auth_type, encryption, ssl)
|
return AuthOptions(auth_type, encryption, ssl)
|
||||||
|
|
||||||
|
|
||||||
def _get_ssl(credentials: Credentials, use_ssl):
|
def _get_ssl(credentials: Credentials, host: VictimHost) -> bool:
|
||||||
# Passwordless login only works with SSL false, AUTH_BASIC and ENCRYPTION_NEVER
|
# Passwordless login only works with SSL false, AUTH_BASIC and ENCRYPTION_NEVER
|
||||||
return False if credentials.secret == "" else use_ssl
|
if credentials.secret == "":
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check if default PSRemoting ports are open. Prefer with SSL, if both are.
|
||||||
|
if "tcp-5986" in host.services: # Default for HTTPS
|
||||||
|
return True
|
||||||
|
|
||||||
|
if "tcp-5985" in host.services: # Default for HTTP
|
||||||
|
return False
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _get_auth_type(credentials: Credentials):
|
def _get_auth_type(credentials: Credentials):
|
||||||
|
|
|
@ -91,6 +91,8 @@ INTERNAL = {
|
||||||
3306,
|
3306,
|
||||||
7001,
|
7001,
|
||||||
8088,
|
8088,
|
||||||
|
5985,
|
||||||
|
5986,
|
||||||
],
|
],
|
||||||
"description": "List of TCP ports the monkey will check whether "
|
"description": "List of TCP ports the monkey will check whether "
|
||||||
"they're open",
|
"they're open",
|
||||||
|
|
|
@ -19,3 +19,37 @@ def patch_win32api_get_user_name(local_user):
|
||||||
win32api.NameSamCompatible = None
|
win32api.NameSamCompatible = None
|
||||||
|
|
||||||
sys.modules["win32api"] = win32api
|
sys.modules["win32api"] = win32api
|
||||||
|
|
||||||
|
|
||||||
|
def _create_windows_host(http_enabled, https_enabled):
|
||||||
|
host = MagicMock()
|
||||||
|
host.os = {"type": "windows"}
|
||||||
|
host.services = {}
|
||||||
|
|
||||||
|
if http_enabled:
|
||||||
|
host.services["tcp-5985"] = {}
|
||||||
|
|
||||||
|
if https_enabled:
|
||||||
|
host.services["tcp-5986"] = {}
|
||||||
|
|
||||||
|
return host
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def https_only_host():
|
||||||
|
return _create_windows_host(False, True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def http_only_host():
|
||||||
|
return _create_windows_host(True, False)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def http_and_https_both_enabled_host():
|
||||||
|
return _create_windows_host(True, True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def powershell_disabled_host():
|
||||||
|
return _create_windows_host(False, False)
|
||||||
|
|
|
@ -16,85 +16,96 @@ CREDENTIALS_LM_HASH = Credentials("user4", "LM_HASH:NONE", SecretType.LM_HASH)
|
||||||
CREDENTIALS_NT_HASH = Credentials("user5", "NONE:NT_HASH", SecretType.NT_HASH)
|
CREDENTIALS_NT_HASH = Credentials("user5", "NONE:NT_HASH", SecretType.NT_HASH)
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__ssl_true_with_password():
|
def test_get_auth_options__ssl_false_with_no_open_ports(powershell_disabled_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, use_ssl=True)
|
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, powershell_disabled_host)
|
||||||
|
assert auth_options.ssl is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_auth_options__ssl_true_with_password(https_only_host):
|
||||||
|
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, https_only_host)
|
||||||
|
|
||||||
assert auth_options.ssl
|
assert auth_options.ssl
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__ssl_true_empty_password():
|
def test_get_auth_options__ssl_preferred(http_and_https_both_enabled_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_EMPTY_PASSWORD, use_ssl=True)
|
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, http_and_https_both_enabled_host)
|
||||||
|
|
||||||
assert not auth_options.ssl
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__ssl_true_none_password():
|
|
||||||
auth_options = get_auth_options(CREDENTIALS_NONE_PASSWORD, use_ssl=True)
|
|
||||||
|
|
||||||
assert auth_options.ssl
|
assert auth_options.ssl
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__ssl_false_with_password():
|
def test_get_auth_options__ssl_true_empty_password(https_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_EMPTY_PASSWORD, https_only_host)
|
||||||
|
|
||||||
assert not auth_options.ssl
|
assert not auth_options.ssl
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__ssl_false_empty_password():
|
def test_get_auth_options__ssl_true_none_password(https_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_EMPTY_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_NONE_PASSWORD, https_only_host)
|
||||||
|
|
||||||
|
assert auth_options.ssl
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_auth_options__ssl_false_with_password(http_only_host):
|
||||||
|
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, http_only_host)
|
||||||
|
|
||||||
assert not auth_options.ssl
|
assert not auth_options.ssl
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__ssl_false_none_password():
|
def test_get_auth_options__ssl_false_empty_password(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_NONE_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_EMPTY_PASSWORD, http_only_host)
|
||||||
|
|
||||||
assert not auth_options.ssl
|
assert not auth_options.ssl
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__auth_type_with_password():
|
def test_get_auth_options__ssl_false_none_password(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_NONE_PASSWORD, http_only_host)
|
||||||
|
|
||||||
|
assert not auth_options.ssl
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_auth_options__auth_type_with_password(http_only_host):
|
||||||
|
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, http_only_host)
|
||||||
|
|
||||||
assert auth_options.auth_type == AUTH_NEGOTIATE
|
assert auth_options.auth_type == AUTH_NEGOTIATE
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__auth_type_empty_password():
|
def test_get_auth_options__auth_type_empty_password(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_EMPTY_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_EMPTY_PASSWORD, http_only_host)
|
||||||
|
|
||||||
assert auth_options.auth_type == AUTH_BASIC
|
assert auth_options.auth_type == AUTH_BASIC
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__auth_type_none_password():
|
def test_get_auth_options__auth_type_none_password(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_NONE_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_NONE_PASSWORD, http_only_host)
|
||||||
|
|
||||||
assert auth_options.auth_type == AUTH_NEGOTIATE
|
assert auth_options.auth_type == AUTH_NEGOTIATE
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__auth_type_with_LM_hash():
|
def test_get_auth_options__auth_type_with_LM_hash(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_LM_HASH, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_LM_HASH, http_only_host)
|
||||||
|
|
||||||
assert auth_options.auth_type == AUTH_NTLM
|
assert auth_options.auth_type == AUTH_NTLM
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__auth_type_with_NT_hash():
|
def test_get_auth_options__auth_type_with_NT_hash(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_NT_HASH, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_NT_HASH, http_only_host)
|
||||||
|
|
||||||
assert auth_options.auth_type == AUTH_NTLM
|
assert auth_options.auth_type == AUTH_NTLM
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__encryption_with_password():
|
def test_get_auth_options__encryption_with_password(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_WITH_PASSWORD, http_only_host)
|
||||||
|
|
||||||
assert auth_options.encryption == ENCRYPTION_AUTO
|
assert auth_options.encryption == ENCRYPTION_AUTO
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__encryption_empty_password():
|
def test_get_auth_options__encryption_empty_password(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_EMPTY_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_EMPTY_PASSWORD, http_only_host)
|
||||||
|
|
||||||
assert auth_options.encryption == ENCRYPTION_NEVER
|
assert auth_options.encryption == ENCRYPTION_NEVER
|
||||||
|
|
||||||
|
|
||||||
def test_get_auth_options__encryption_none_password():
|
def test_get_auth_options__encryption_none_password(http_only_host):
|
||||||
auth_options = get_auth_options(CREDENTIALS_NONE_PASSWORD, use_ssl=False)
|
auth_options = get_auth_options(CREDENTIALS_NONE_PASSWORD, http_only_host)
|
||||||
|
|
||||||
assert auth_options.encryption == ENCRYPTION_AUTO
|
assert auth_options.encryption == ENCRYPTION_AUTO
|
||||||
|
|
|
@ -5,8 +5,6 @@ from unittest.mock import MagicMock
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from infection_monkey.exploit import powershell
|
from infection_monkey.exploit import powershell
|
||||||
from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions
|
|
||||||
from infection_monkey.exploit.powershell_utils.credentials import Credentials
|
|
||||||
from infection_monkey.model.host import VictimHost
|
from infection_monkey.model.host import VictimHost
|
||||||
|
|
||||||
# Use the path_win32api_get_user_name fixture for all tests in this module
|
# Use the path_win32api_get_user_name fixture for all tests in this module
|
||||||
|
@ -19,19 +17,12 @@ NT_HASH_LIST = ["bogo_nt_1", "bogo_nt_2"]
|
||||||
DROPPER_TARGET_PATH_64 = "C:\\agent64"
|
DROPPER_TARGET_PATH_64 = "C:\\agent64"
|
||||||
|
|
||||||
|
|
||||||
class AuthenticationErrorForTests(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
mock_agent_repository = MagicMock()
|
mock_agent_repository = MagicMock()
|
||||||
mock_agent_repository.get_agent_binary.return_value = BytesIO(b"BINARY_EXECUTABLE")
|
mock_agent_repository.get_agent_binary.return_value = BytesIO(b"BINARY_EXECUTABLE")
|
||||||
|
|
||||||
victim_host = VictimHost("127.0.0.1")
|
|
||||||
victim_host.os["type"] = "windows"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def powershell_arguments():
|
def powershell_arguments(http_and_https_both_enabled_host):
|
||||||
options = {
|
options = {
|
||||||
"dropper_target_path_win_64": DROPPER_TARGET_PATH_64,
|
"dropper_target_path_win_64": DROPPER_TARGET_PATH_64,
|
||||||
"credentials": {
|
"credentials": {
|
||||||
|
@ -42,7 +33,7 @@ def powershell_arguments():
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
arguments = {
|
arguments = {
|
||||||
"host": victim_host,
|
"host": http_and_https_both_enabled_host,
|
||||||
"options": options,
|
"options": options,
|
||||||
"current_depth": 2,
|
"current_depth": 2,
|
||||||
"telemetry_messenger": MagicMock(),
|
"telemetry_messenger": MagicMock(),
|
||||||
|
@ -56,18 +47,13 @@ def powershell_arguments():
|
||||||
def powershell_exploiter(monkeypatch):
|
def powershell_exploiter(monkeypatch):
|
||||||
pe = powershell.PowerShellExploiter()
|
pe = powershell.PowerShellExploiter()
|
||||||
|
|
||||||
monkeypatch.setattr(powershell, "AuthenticationError", AuthenticationErrorForTests)
|
|
||||||
monkeypatch.setattr(powershell, "is_windows_os", lambda: True)
|
monkeypatch.setattr(powershell, "is_windows_os", lambda: True)
|
||||||
|
|
||||||
return pe
|
return pe
|
||||||
|
|
||||||
|
|
||||||
def test_powershell_disabled(monkeypatch, powershell_exploiter, powershell_arguments):
|
def test_powershell_disabled(powershell_exploiter, powershell_arguments, powershell_disabled_host):
|
||||||
mock_powershell_client = MagicMock()
|
powershell_arguments["host"] = powershell_disabled_host
|
||||||
mock_powershell_client.connect = MagicMock(side_effect=Exception)
|
|
||||||
monkeypatch.setattr(
|
|
||||||
powershell, "PowerShellClient", MagicMock(return_value=mock_powershell_client)
|
|
||||||
)
|
|
||||||
|
|
||||||
exploit_result = powershell_exploiter.exploit_host(**powershell_arguments)
|
exploit_result = powershell_exploiter.exploit_host(**powershell_arguments)
|
||||||
assert not exploit_result.exploitation_success
|
assert not exploit_result.exploitation_success
|
||||||
|
@ -75,15 +61,10 @@ def test_powershell_disabled(monkeypatch, powershell_exploiter, powershell_argum
|
||||||
assert "disabled" in exploit_result.error_message
|
assert "disabled" in exploit_result.error_message
|
||||||
|
|
||||||
|
|
||||||
def test_powershell_http(monkeypatch, powershell_exploiter, powershell_arguments):
|
def test_powershell_http(monkeypatch, powershell_exploiter, powershell_arguments, http_only_host):
|
||||||
def allow_http(_, credentials: Credentials, auth_options: AuthOptions):
|
powershell_arguments["host"] = http_only_host
|
||||||
if not auth_options.ssl:
|
|
||||||
raise AuthenticationErrorForTests
|
|
||||||
else:
|
|
||||||
raise Exception
|
|
||||||
|
|
||||||
mock_powershell_client = MagicMock()
|
mock_powershell_client = MagicMock()
|
||||||
mock_powershell_client.connect = MagicMock(side_effect=allow_http)
|
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
powershell, "PowerShellClient", MagicMock(return_value=mock_powershell_client)
|
powershell, "PowerShellClient", MagicMock(return_value=mock_powershell_client)
|
||||||
)
|
)
|
||||||
|
@ -94,29 +75,28 @@ def test_powershell_http(monkeypatch, powershell_exploiter, powershell_arguments
|
||||||
assert not call_args[0][2].ssl
|
assert not call_args[0][2].ssl
|
||||||
|
|
||||||
|
|
||||||
def test_powershell_https(monkeypatch, powershell_exploiter, powershell_arguments):
|
def test_powershell_https(monkeypatch, powershell_exploiter, powershell_arguments, https_only_host):
|
||||||
def allow_https(_, credentials: Credentials, auth_options: AuthOptions):
|
|
||||||
if auth_options.ssl:
|
|
||||||
raise AuthenticationErrorForTests
|
|
||||||
else:
|
|
||||||
raise Exception
|
|
||||||
|
|
||||||
mock_powershell_client = MagicMock()
|
mock_powershell_client = MagicMock()
|
||||||
mock_powershell_client.connect = MagicMock(side_effect=allow_https)
|
mock_powershell_client.connect = MagicMock(side_effect=Exception("Failed login"))
|
||||||
monkeypatch.setattr(
|
mock_powershell_client_constructor = MagicMock(return_value=mock_powershell_client)
|
||||||
powershell, "PowerShellClient", MagicMock(return_value=mock_powershell_client)
|
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client_constructor)
|
||||||
)
|
|
||||||
|
|
||||||
powershell_exploiter.exploit_host(**powershell_arguments)
|
powershell_exploiter.exploit_host(**powershell_arguments)
|
||||||
|
|
||||||
for call_args in mock_powershell_client.call_args_list:
|
non_ssl_calls = 0
|
||||||
if call_args[0][1].secret != "" and call_args[0][1].secret != "dummy_password":
|
for call_args in mock_powershell_client_constructor.call_args_list:
|
||||||
|
if call_args[0][1].secret != "":
|
||||||
assert call_args[0][2].ssl
|
assert call_args[0][2].ssl
|
||||||
|
else:
|
||||||
|
assert not call_args[0][2].ssl
|
||||||
|
non_ssl_calls += 1
|
||||||
|
|
||||||
|
assert non_ssl_calls > 0
|
||||||
|
|
||||||
|
|
||||||
def test_no_valid_credentials(monkeypatch, powershell_exploiter, powershell_arguments):
|
def test_no_valid_credentials(monkeypatch, powershell_exploiter, powershell_arguments):
|
||||||
mock_powershell_client = MagicMock()
|
mock_powershell_client = MagicMock()
|
||||||
mock_powershell_client.connect = MagicMock(side_effect=AuthenticationErrorForTests)
|
mock_powershell_client.connect = MagicMock(side_effect=Exception("Failed login"))
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
powershell, "PowerShellClient", MagicMock(return_value=mock_powershell_client)
|
powershell, "PowerShellClient", MagicMock(return_value=mock_powershell_client)
|
||||||
)
|
)
|
||||||
|
@ -127,16 +107,6 @@ def test_no_valid_credentials(monkeypatch, powershell_exploiter, powershell_argu
|
||||||
assert "Unable to authenticate" in exploit_result.error_message
|
assert "Unable to authenticate" in exploit_result.error_message
|
||||||
|
|
||||||
|
|
||||||
def authenticate(mock_client):
|
|
||||||
def inner(_, credentials: Credentials, auth_options: AuthOptions):
|
|
||||||
if credentials.username == "user1" and credentials.secret == "pass2":
|
|
||||||
return mock_client
|
|
||||||
else:
|
|
||||||
raise AuthenticationErrorForTests("Invalid credentials")
|
|
||||||
|
|
||||||
return inner
|
|
||||||
|
|
||||||
|
|
||||||
def test_successful_copy(monkeypatch, powershell_exploiter, powershell_arguments):
|
def test_successful_copy(monkeypatch, powershell_exploiter, powershell_arguments):
|
||||||
mock_client = MagicMock()
|
mock_client = MagicMock()
|
||||||
|
|
||||||
|
@ -188,11 +158,9 @@ def test_successful_propagation(monkeypatch, powershell_exploiter, powershell_ar
|
||||||
|
|
||||||
|
|
||||||
def test_login_attempts_correctly_reported(monkeypatch, powershell_exploiter, powershell_arguments):
|
def test_login_attempts_correctly_reported(monkeypatch, powershell_exploiter, powershell_arguments):
|
||||||
# 1st call is for determining HTTP/HTTPs. 6 remaining calls are actual login attempts. the 6th
|
# First 5 login attempts fail. The 6th is successful.
|
||||||
# login attempt doesn't throw an exception, signifying that login with credentials was
|
connection_attempts = [Exception, Exception, Exception, Exception, Exception, True]
|
||||||
# successful.
|
mock_powershell_client = MagicMock()
|
||||||
connection_attempts = [True, Exception, Exception, Exception, Exception, Exception, True]
|
|
||||||
mock_powershell_client = MagicMock(side_effect=connection_attempts)
|
|
||||||
mock_powershell_client.connect = MagicMock(side_effect=connection_attempts)
|
mock_powershell_client.connect = MagicMock(side_effect=connection_attempts)
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
powershell, "PowerShellClient", MagicMock(return_value=mock_powershell_client)
|
powershell, "PowerShellClient", MagicMock(return_value=mock_powershell_client)
|
||||||
|
@ -219,3 +187,22 @@ def test_build_monkey_execution_command():
|
||||||
|
|
||||||
assert f"-d {depth}" in cmd
|
assert f"-d {depth}" in cmd
|
||||||
assert executable_path in cmd
|
assert executable_path in cmd
|
||||||
|
|
||||||
|
|
||||||
|
def test_skip_http_only_logins(
|
||||||
|
monkeypatch, powershell_exploiter, powershell_arguments, https_only_host
|
||||||
|
):
|
||||||
|
# Only HTTPS is enabled on the destination, so we should never try to connect with "" empty
|
||||||
|
# password, since connection with empty password requires SSL == False.
|
||||||
|
powershell_arguments["host"] = https_only_host
|
||||||
|
|
||||||
|
mock_powershell_client = MagicMock()
|
||||||
|
mock_powershell_client.connect = MagicMock(side_effect=Exception("Failed login"))
|
||||||
|
mock_powershell_client_constructor = MagicMock(return_value=mock_powershell_client)
|
||||||
|
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client_constructor)
|
||||||
|
|
||||||
|
powershell_exploiter.exploit_host(**powershell_arguments)
|
||||||
|
|
||||||
|
for call_args in mock_powershell_client_constructor.call_args_list:
|
||||||
|
assert call_args[0][1].secret != ""
|
||||||
|
assert call_args[0][2].ssl
|
||||||
|
|
Loading…
Reference in New Issue