Agent: Improve responsiveness of AutomatedMaster shutdown

This commit is contained in:
Mike Salvatore 2021-12-02 10:11:19 -05:00
parent 9279d82adf
commit 4fc18ae750
1 changed files with 22 additions and 8 deletions

View File

@ -8,8 +8,10 @@ from infection_monkey.i_master import IMaster
from infection_monkey.i_puppet import IPuppet from infection_monkey.i_puppet import IPuppet
from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger from infection_monkey.telemetry.messengers.i_telemetry_messenger import ITelemetryMessenger
from infection_monkey.telemetry.system_info_telem import SystemInfoTelem from infection_monkey.telemetry.system_info_telem import SystemInfoTelem
from infection_monkey.utils.timer import Timer
CHECK_FOR_STOP_INTERVAL_SEC = 5 CHECK_ISLAND_FOR_STOP_COMMAND_INTERVAL_SEC = 5
CHECK_FOR_TERMINATE_INTERVAL_SEC = CHECK_ISLAND_FOR_STOP_COMMAND_INTERVAL_SEC / 5
SHUTDOWN_TIMEOUT = 5 SHUTDOWN_TIMEOUT = 5
logger = logging.getLogger() logger = logging.getLogger()
@ -28,6 +30,7 @@ class AutomatedMaster(IMaster):
self._stop = threading.Event() self._stop = threading.Event()
self._master_thread = threading.Thread(target=self._run_master_thread, daemon=True) self._master_thread = threading.Thread(target=self._run_master_thread, daemon=True)
self._simulation_thread = threading.Thread(target=self._run_simulation, daemon=True)
def start(self): def start(self):
logger.info("Starting automated breach and attack simulation") logger.info("Starting automated breach and attack simulation")
@ -48,22 +51,33 @@ class AutomatedMaster(IMaster):
self._master_thread.join() self._master_thread.join()
def _run_master_thread(self): def _run_master_thread(self):
_simulation_thread = threading.Thread(target=self._run_simulation, daemon=True) self._simulation_thread.start()
_simulation_thread.start()
while (not self._stop.is_set()) and _simulation_thread.is_alive(): self._wait_for_master_stop_condition()
time.sleep(CHECK_FOR_STOP_INTERVAL_SEC)
self._check_for_stop()
logger.debug("Waiting for the simulation thread to stop") logger.debug("Waiting for the simulation thread to stop")
_simulation_thread.join(SHUTDOWN_TIMEOUT) self._simulation_thread.join(SHUTDOWN_TIMEOUT)
if _simulation_thread.is_alive(): if self._simulation_thread.is_alive():
logger.warn("Timed out waiting for the simulation to stop") logger.warn("Timed out waiting for the simulation to stop")
# Since the master thread is a Daemon thread, it will be forcefully # Since the master thread is a Daemon thread, it will be forcefully
# killed when the program exits. # killed when the program exits.
logger.warn("Forcefully killing the simulation") logger.warn("Forcefully killing the simulation")
def _wait_for_master_stop_condition(self):
timer = Timer()
timer.set(CHECK_ISLAND_FOR_STOP_COMMAND_INTERVAL_SEC)
while self._master_thread_should_run():
if timer.is_expired():
self._check_for_stop()
timer.reset()
time.sleep(CHECK_FOR_TERMINATE_INTERVAL_SEC)
def _master_thread_should_run(self):
return (not self._stop.is_set()) and self._simulation_thread.is_alive()
def _run_simulation(self): def _run_simulation(self):
config = self._control_channel.get_config() config = self._control_channel.get_config()