From 1b04844e5efa13fe46172365be4d34400d234f5d Mon Sep 17 00:00:00 2001 From: Mike Salvatore Date: Fri, 3 Dec 2021 10:21:10 -0500 Subject: [PATCH] Agent: Deduplicate stop logic in AutomatedMaster --- .../master/automated_master.py | 59 ++++++++++--------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/monkey/infection_monkey/master/automated_master.py b/monkey/infection_monkey/master/automated_master.py index d50f242c1..42ab4d285 100644 --- a/monkey/infection_monkey/master/automated_master.py +++ b/monkey/infection_monkey/master/automated_master.py @@ -1,7 +1,7 @@ import logging import threading import time -from typing import Dict, List +from typing import Any, Callable, Dict, List, Tuple from infection_monkey.i_control_channel import IControlChannel from infection_monkey.i_master import IMaster @@ -87,8 +87,12 @@ class AutomatedMaster(IMaster): config = self._control_channel.get_config() system_info_collector_thread = threading.Thread( - target=self._collect_system_info, - args=(config["system_info_collector_classes"],), + target=self._run_plugins, + args=( + config["system_info_collector_classes"], + "system info collector", + self._collect_system_info, + ), daemon=True, ) pba_thread = threading.Thread( @@ -112,7 +116,9 @@ class AutomatedMaster(IMaster): propagation_thread.join() payload_thread = threading.Thread( - target=self._run_payloads, args=(config["payloads"],), daemon=True + target=self._run_plugins, + args=(config["payloads"].items(), "payload", self._run_payload), + daemon=True, ) payload_thread.start() payload_thread.join() @@ -127,23 +133,12 @@ class AutomatedMaster(IMaster): if self._stop.is_set(): break - def _collect_system_info(self, enabled_collectors: List[str]): - logger.info("Running system info collectors") - - for collector in enabled_collectors: - if self._stop.is_set(): - logger.debug("Received a stop signal, skipping remaining system info collectors") - break - - logger.info(f"Running system info collector: {collector}") - - system_info_telemetry = {} - system_info_telemetry[collector] = self._puppet.run_sys_info_collector(collector) - self._telemetry_messenger.send_telemetry( - SystemInfoTelem({"collectors": system_info_telemetry}) - ) - - logger.info("Finished running system info collectors") + def _collect_system_info(self, collector: str): + system_info_telemetry = {} + system_info_telemetry[collector] = self._puppet.run_sys_info_collector(collector) + self._telemetry_messenger.send_telemetry( + SystemInfoTelem({"collectors": system_info_telemetry}) + ) def _run_pbas(self, enabled_pbas: List[str]): pass @@ -154,18 +149,24 @@ class AutomatedMaster(IMaster): def _propagate(self, config: Dict): pass - def _run_payloads(self, enabled_payloads: Dict[str, Dict]): - logger.info("Running payloads") - logger.debug(f"Found {len(enabled_payloads.keys())} payload(s) to run") + def _run_payload(self, payload: Tuple[str, Dict]): + name = payload[0] + options = payload[1] - for payload_name, options in enabled_payloads.items(): + self._puppet.run_payload(name, options, self._stop) + + def _run_plugins(self, plugin: List[Any], plugin_type: str, callback: Callable[[Any], None]): + logger.info(f"Running {plugin_type}s") + logger.debug(f"Found {len(plugin)} {plugin_type}(s) to run") + + for p in plugin: if self._stop.is_set(): - logger.debug("Received a stop signal, skipping remaining system info collectors") - break + logger.debug(f"Received a stop signal, skipping remaining {plugin_type}s") + return - self._puppet.run_payload(payload_name, options, self._stop) + callback(p) - logger.info("Finished running payloads") + logger.info(f"Finished running {plugin_type}s") def cleanup(self): pass