From 3b094d0478e813a1a5b9d2cf94cf3c5dbedc3f41 Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Mon, 14 Mar 2022 12:11:28 -0400 Subject: [PATCH] Agent: Move test for successful login to PowerShellClient The current powershell client does not alert the caller that login was unsuccessful until an attempt is made to execute a command. This is likely a detail that is specific to the underlying pypsrp. This detail should be abstracted away from the PowerShellExploiter so that the PowerShellExploiter is not dealing with implementation details of the PowerShellClient. --- monkey/infection_monkey/exploit/powershell.py | 40 ++++++------------- .../powershell_utils/powershell_client.py | 4 ++ .../exploit/test_powershell.py | 37 +++++++---------- 3 files changed, 32 insertions(+), 49 deletions(-) diff --git a/monkey/infection_monkey/exploit/powershell.py b/monkey/infection_monkey/exploit/powershell.py index 7466a115e..e8999545a 100644 --- a/monkey/infection_monkey/exploit/powershell.py +++ b/monkey/infection_monkey/exploit/powershell.py @@ -128,7 +128,7 @@ class PowerShellExploiter(HostExploiter): encryption=ENCRYPTION_AUTO, ssl=use_ssl, ) - # TODO: Report login attempt + # TODO: Report login attempt or find a better way of detecting if SSL is enabled PowerShellClient(self.host.ip_addr, credentials, auth_options) @@ -138,36 +138,22 @@ class PowerShellExploiter(HostExploiter): for (creds, opts) in zip(credentials, auth_options): try: client = PowerShellClient(self.host.ip_addr, creds, opts) - if self._is_client_auth_valid(creds, client): - return client - except Exception as ex: - logger.warning( - "An unexpected error occurred while trying to authenticate " - f"to {self.host.ip_addr}: {ex}" + logger.info( + f"Successfully logged into {self.host.ip_addr} using Powershell. User: " + f"{creds.username}, Secret Type: {creds.secret_type.name}" ) + self._report_login_attempt(True, creds) + return client + except Exception as ex: + logger.debug( + f"Error logging into {self.host.ip_addr} using Powershell. User: " + f"{creds.username}, SecretType: {creds.secret_type.name} -- Error: {ex}" + ) + self._report_login_attempt(False, creds) + return None - def _is_client_auth_valid(self, creds: Credentials, client: IPowerShellClient) -> bool: - try: - # attempt to execute dir command to know if authentication was successful - client.execute_cmd("dir") - - logger.info( - f"Successfully logged into {self.host.ip_addr} using Powershell. User: " - f"{creds.username}, Secret Type: {creds.secret_type.name}" - ) - self._report_login_attempt(True, creds) - - return True - except Exception as ex: # noqa: F841 - logger.debug( - f"Error logging into {self.host.ip_addr} using Powershell. User: " - f"{creds.username}, SecretType: {creds.secret_type.name} -- Error: {ex}" - ) - self._report_login_attempt(False, creds) - return False - def _report_login_attempt(self, result: bool, credentials: Credentials): if credentials.secret_type in [SecretType.PASSWORD, SecretType.CACHED]: self.report_login_attempt(result, credentials.username, password=credentials.secret) diff --git a/monkey/infection_monkey/exploit/powershell_utils/powershell_client.py b/monkey/infection_monkey/exploit/powershell_utils/powershell_client.py index 7993687de..82c073ff2 100644 --- a/monkey/infection_monkey/exploit/powershell_utils/powershell_client.py +++ b/monkey/infection_monkey/exploit/powershell_utils/powershell_client.py @@ -83,6 +83,10 @@ class PowerShellClient(IPowerShellClient): connection_timeout=CONNECTION_TIMEOUT, ) + # Attempt to execute dir command to know if authentication was successful. This will raise + # an exception if authentication was not successful. + self.execute_cmd("dir") + def execute_cmd(self, cmd: str) -> str: output, _, _ = self._client.execute_cmd(cmd) return output diff --git a/monkey/tests/unit_tests/infection_monkey/exploit/test_powershell.py b/monkey/tests/unit_tests/infection_monkey/exploit/test_powershell.py index de1fa265a..5af0dd617 100644 --- a/monkey/tests/unit_tests/infection_monkey/exploit/test_powershell.py +++ b/monkey/tests/unit_tests/infection_monkey/exploit/test_powershell.py @@ -114,7 +114,6 @@ def authenticate(mock_client): def test_successful_copy(monkeypatch, powershell_exploiter, powershell_arguments): mock_client = MagicMock() - mock_client.return_value.copy_file = MagicMock(return_value=True) monkeypatch.setattr(powershell, "PowerShellClient", mock_client) @@ -137,7 +136,6 @@ def test_failed_copy(monkeypatch, powershell_exploiter, powershell_arguments): def test_failed_monkey_execution(monkeypatch, powershell_exploiter, powershell_arguments): mock_client = MagicMock() - mock_client.copy_file = MagicMock(return_value=True) mock_client.execute_cmd_as_detached_process = MagicMock( side_effect=Exception("EXECUTION FAILED") ) @@ -151,29 +149,24 @@ def test_failed_monkey_execution(monkeypatch, powershell_exploiter, powershell_a assert "execute" in exploit_result.error_message -def test_login_attemps_correctly_reported(monkeypatch, powershell_exploiter, powershell_arguments): - mock_client = MagicMock() - mock_client.return_value.copy_file = MagicMock(return_value=True) - - # execute_cmd method will throw exceptions for 5 first calls. - # 6-th call doesn't throw an exception == credentials successful - execute_cmd_returns = [Exception, Exception, Exception, Exception, Exception, True] - mock_client.return_value.execute_cmd = MagicMock(side_effect=execute_cmd_returns) - +def test_login_attempts_correctly_reported(monkeypatch, powershell_exploiter, powershell_arguments): + # 1st call is for determining HTTP/HTTPs. 6 remaining calls are actual login attempts. the 6th + # login attempt doesn't throw an exception, signifying that login with credentials was + # successful. + connection_attempts = [True, Exception, Exception, Exception, Exception, Exception, True] + mock_client = MagicMock(side_effect=connection_attempts) monkeypatch.setattr(powershell, "PowerShellClient", mock_client) - powershell_exploiter.exploit_host(**powershell_arguments) + exploit_result = powershell_exploiter.exploit_host(**powershell_arguments) - # Total 6 attempts reported, 5 failed and 1 succeeded - assert len(powershell_exploiter.exploit_attempts) == len(execute_cmd_returns) - assert ( - len([attempt for attempt in powershell_exploiter.exploit_attempts if not attempt["result"]]) - == 5 - ) - assert ( - len([attempt for attempt in powershell_exploiter.exploit_attempts if attempt["result"]]) - == 1 - ) + successful_attempts = [attempt for attempt in exploit_result.attempts if attempt["result"]] + unsuccessful_attempts = [ + attempt for attempt in exploit_result.attempts if not attempt["result"] + ] + + assert len(exploit_result.attempts) == 6 + assert len(unsuccessful_attempts) == 5 + assert len(successful_attempts) == 1 def test_build_monkey_execution_command():