Agent: Add long timeout to clear command history PBA

This commit is contained in:
Ilija Lazoroski 2022-03-30 19:41:37 +02:00 committed by Mike Salvatore
parent 649404d50f
commit 2e389cc87e
2 changed files with 40 additions and 19 deletions

View File

@ -1,7 +1,8 @@
import subprocess import subprocess
from typing import Dict from typing import Dict, Iterable, Tuple
from common.common_consts.post_breach_consts import POST_BREACH_CLEAR_CMD_HISTORY from common.common_consts.post_breach_consts import POST_BREACH_CLEAR_CMD_HISTORY
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from infection_monkey.i_puppet.i_puppet import PostBreachData from infection_monkey.i_puppet.i_puppet import PostBreachData
from infection_monkey.post_breach.clear_command_history.clear_command_history import ( from infection_monkey.post_breach.clear_command_history.clear_command_history import (
get_commands_to_clear_command_history, get_commands_to_clear_command_history,
@ -14,7 +15,7 @@ class ClearCommandHistory(PBA):
def __init__(self, telemetry_messenger: ITelemetryMessenger): def __init__(self, telemetry_messenger: ITelemetryMessenger):
super().__init__(telemetry_messenger, name=POST_BREACH_CLEAR_CMD_HISTORY) super().__init__(telemetry_messenger, name=POST_BREACH_CLEAR_CMD_HISTORY)
def run(self, options: Dict): def run(self, options: Dict) -> Iterable[PostBreachData]:
results = [pba.run() for pba in self.clear_command_history_pba_list()] results = [pba.run() for pba in self.clear_command_history_pba_list()]
if results: if results:
# `self.command` is empty here # `self.command` is empty here
@ -22,11 +23,11 @@ class ClearCommandHistory(PBA):
return self.pba_data return self.pba_data
def clear_command_history_pba_list(self): def clear_command_history_pba_list(self) -> Iterable[PBA]:
return self.CommandHistoryPBAGenerator().get_clear_command_history_pbas() return self.CommandHistoryPBAGenerator().get_clear_command_history_pbas()
class CommandHistoryPBAGenerator: class CommandHistoryPBAGenerator:
def get_clear_command_history_pbas(self): def get_clear_command_history_pbas(self) -> Iterable[PBA]:
( (
cmds_for_linux, cmds_for_linux,
command_history_files_for_linux, command_history_files_for_linux,
@ -52,13 +53,18 @@ class ClearCommandHistory(PBA):
linux_cmd=linux_cmds, linux_cmd=linux_cmds,
) )
def run(self): def run(self) -> Tuple[str, bool]:
if self.command: if self.command:
try: try:
output = subprocess.check_output( # noqa: DUO116 output = subprocess.check_output( # noqa: DUO116
self.command, stderr=subprocess.STDOUT, shell=True self.command,
stderr=subprocess.STDOUT,
shell=True,
timeout=LONG_REQUEST_TIMEOUT,
).decode() ).decode()
return output, True return output, True
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as err:
# Return error output of the command # Return error output of the command
return e.output.decode(), False return err.output.decode(), False
except subprocess.TimeoutExpired as err:
return str(err), False

View File

@ -1,11 +1,17 @@
import logging
import subprocess import subprocess
from typing import Iterable
from common.common_consts.post_breach_consts import POST_BREACH_CLEAR_CMD_HISTORY
from common.common_consts.timeouts import LONG_REQUEST_TIMEOUT
from infection_monkey.utils.environment import is_windows_os from infection_monkey.utils.environment import is_windows_os
logger = logging.getLogger(__name__)
def get_linux_commands_to_clear_command_history():
def get_linux_commands_to_clear_command_history() -> Iterable[str]:
if is_windows_os(): if is_windows_os():
return "" return []
TEMP_HIST_FILE = "$HOME/monkey-temp-hist-file" TEMP_HIST_FILE = "$HOME/monkey-temp-hist-file"
@ -20,7 +26,7 @@ def get_linux_commands_to_clear_command_history():
] ]
def get_linux_command_history_files(): def get_linux_command_history_files() -> Iterable[str]:
if is_windows_os(): if is_windows_os():
return [] return []
@ -41,17 +47,26 @@ def get_linux_command_history_files():
return STARTUP_FILES return STARTUP_FILES
def get_linux_usernames(): def get_linux_usernames() -> Iterable[str]:
if is_windows_os(): if is_windows_os():
return [] return []
# get list of usernames # get list of usernames
try:
USERS = ( USERS = (
subprocess.check_output( # noqa: DUO116 subprocess.check_output( # noqa: DUO116
"cut -d: -f1,3 /etc/passwd | egrep ':[0-9]{4}$' | cut -d: -f1", shell=True "cut -d: -f1,3 /etc/passwd | egrep ':[0-9]{4}$' | cut -d: -f1",
shell=True,
timeout=LONG_REQUEST_TIMEOUT,
) )
.decode() .decode()
.split("\n")[:-1] .split("\n")[:-1]
) )
return USERS return USERS
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as err:
logger.error(
f"An exception occured on fetching linux usernames,"
f"PBA: {POST_BREACH_CLEAR_CMD_HISTORY}: {str(err)}"
)
return []