Agent: Move test for successful login to PowerShellClient

The current powershell client does not alert the caller that login was
unsuccessful until an attempt is made to execute a command. This is
likely a detail that is specific to the underlying pypsrp. This detail
should be abstracted away from the PowerShellExploiter so that the
PowerShellExploiter is not dealing with implementation details of the
PowerShellClient.
This commit is contained in:
Mike Salvatore 2022-03-14 12:11:28 -04:00 committed by Ilija Lazoroski
parent df572d84c0
commit 3b094d0478
3 changed files with 32 additions and 49 deletions

View File

@ -128,7 +128,7 @@ class PowerShellExploiter(HostExploiter):
encryption=ENCRYPTION_AUTO, encryption=ENCRYPTION_AUTO,
ssl=use_ssl, ssl=use_ssl,
) )
# TODO: Report login attempt # TODO: Report login attempt or find a better way of detecting if SSL is enabled
PowerShellClient(self.host.ip_addr, credentials, auth_options) PowerShellClient(self.host.ip_addr, credentials, auth_options)
@ -138,35 +138,21 @@ class PowerShellExploiter(HostExploiter):
for (creds, opts) in zip(credentials, auth_options): for (creds, opts) in zip(credentials, auth_options):
try: try:
client = PowerShellClient(self.host.ip_addr, creds, opts) client = PowerShellClient(self.host.ip_addr, creds, opts)
if self._is_client_auth_valid(creds, client):
return client
except Exception as ex:
logger.warning(
"An unexpected error occurred while trying to authenticate "
f"to {self.host.ip_addr}: {ex}"
)
return None
def _is_client_auth_valid(self, creds: Credentials, client: IPowerShellClient) -> bool:
try:
# attempt to execute dir command to know if authentication was successful
client.execute_cmd("dir")
logger.info( logger.info(
f"Successfully logged into {self.host.ip_addr} using Powershell. User: " f"Successfully logged into {self.host.ip_addr} using Powershell. User: "
f"{creds.username}, Secret Type: {creds.secret_type.name}" f"{creds.username}, Secret Type: {creds.secret_type.name}"
) )
self._report_login_attempt(True, creds)
return True self._report_login_attempt(True, creds)
except Exception as ex: # noqa: F841 return client
except Exception as ex:
logger.debug( logger.debug(
f"Error logging into {self.host.ip_addr} using Powershell. User: " f"Error logging into {self.host.ip_addr} using Powershell. User: "
f"{creds.username}, SecretType: {creds.secret_type.name} -- Error: {ex}" f"{creds.username}, SecretType: {creds.secret_type.name} -- Error: {ex}"
) )
self._report_login_attempt(False, creds) self._report_login_attempt(False, creds)
return False
return None
def _report_login_attempt(self, result: bool, credentials: Credentials): def _report_login_attempt(self, result: bool, credentials: Credentials):
if credentials.secret_type in [SecretType.PASSWORD, SecretType.CACHED]: if credentials.secret_type in [SecretType.PASSWORD, SecretType.CACHED]:

View File

@ -83,6 +83,10 @@ class PowerShellClient(IPowerShellClient):
connection_timeout=CONNECTION_TIMEOUT, connection_timeout=CONNECTION_TIMEOUT,
) )
# Attempt to execute dir command to know if authentication was successful. This will raise
# an exception if authentication was not successful.
self.execute_cmd("dir")
def execute_cmd(self, cmd: str) -> str: def execute_cmd(self, cmd: str) -> str:
output, _, _ = self._client.execute_cmd(cmd) output, _, _ = self._client.execute_cmd(cmd)
return output return output

View File

@ -114,7 +114,6 @@ def authenticate(mock_client):
def test_successful_copy(monkeypatch, powershell_exploiter, powershell_arguments): def test_successful_copy(monkeypatch, powershell_exploiter, powershell_arguments):
mock_client = MagicMock() mock_client = MagicMock()
mock_client.return_value.copy_file = MagicMock(return_value=True)
monkeypatch.setattr(powershell, "PowerShellClient", mock_client) monkeypatch.setattr(powershell, "PowerShellClient", mock_client)
@ -137,7 +136,6 @@ def test_failed_copy(monkeypatch, powershell_exploiter, powershell_arguments):
def test_failed_monkey_execution(monkeypatch, powershell_exploiter, powershell_arguments): def test_failed_monkey_execution(monkeypatch, powershell_exploiter, powershell_arguments):
mock_client = MagicMock() mock_client = MagicMock()
mock_client.copy_file = MagicMock(return_value=True)
mock_client.execute_cmd_as_detached_process = MagicMock( mock_client.execute_cmd_as_detached_process = MagicMock(
side_effect=Exception("EXECUTION FAILED") side_effect=Exception("EXECUTION FAILED")
) )
@ -151,29 +149,24 @@ def test_failed_monkey_execution(monkeypatch, powershell_exploiter, powershell_a
assert "execute" in exploit_result.error_message assert "execute" in exploit_result.error_message
def test_login_attemps_correctly_reported(monkeypatch, powershell_exploiter, powershell_arguments): def test_login_attempts_correctly_reported(monkeypatch, powershell_exploiter, powershell_arguments):
mock_client = MagicMock() # 1st call is for determining HTTP/HTTPs. 6 remaining calls are actual login attempts. the 6th
mock_client.return_value.copy_file = MagicMock(return_value=True) # login attempt doesn't throw an exception, signifying that login with credentials was
# successful.
# execute_cmd method will throw exceptions for 5 first calls. connection_attempts = [True, Exception, Exception, Exception, Exception, Exception, True]
# 6-th call doesn't throw an exception == credentials successful mock_client = MagicMock(side_effect=connection_attempts)
execute_cmd_returns = [Exception, Exception, Exception, Exception, Exception, True]
mock_client.return_value.execute_cmd = MagicMock(side_effect=execute_cmd_returns)
monkeypatch.setattr(powershell, "PowerShellClient", mock_client) monkeypatch.setattr(powershell, "PowerShellClient", mock_client)
powershell_exploiter.exploit_host(**powershell_arguments) exploit_result = powershell_exploiter.exploit_host(**powershell_arguments)
# Total 6 attempts reported, 5 failed and 1 succeeded successful_attempts = [attempt for attempt in exploit_result.attempts if attempt["result"]]
assert len(powershell_exploiter.exploit_attempts) == len(execute_cmd_returns) unsuccessful_attempts = [
assert ( attempt for attempt in exploit_result.attempts if not attempt["result"]
len([attempt for attempt in powershell_exploiter.exploit_attempts if not attempt["result"]]) ]
== 5
) assert len(exploit_result.attempts) == 6
assert ( assert len(unsuccessful_attempts) == 5
len([attempt for attempt in powershell_exploiter.exploit_attempts if attempt["result"]]) assert len(successful_attempts) == 1
== 1
)
def test_build_monkey_execution_command(): def test_build_monkey_execution_command():