Agent: Deduplicate stop logic in AutomatedMaster

This commit is contained in:
Mike Salvatore 2021-12-03 10:21:10 -05:00
parent 839157a822
commit 1b04844e5e
1 changed files with 30 additions and 29 deletions

View File

@ -1,7 +1,7 @@
import logging import logging
import threading import threading
import time import time
from typing import Dict, List from typing import Any, Callable, Dict, List, Tuple
from infection_monkey.i_control_channel import IControlChannel from infection_monkey.i_control_channel import IControlChannel
from infection_monkey.i_master import IMaster from infection_monkey.i_master import IMaster
@ -87,8 +87,12 @@ class AutomatedMaster(IMaster):
config = self._control_channel.get_config() config = self._control_channel.get_config()
system_info_collector_thread = threading.Thread( system_info_collector_thread = threading.Thread(
target=self._collect_system_info, target=self._run_plugins,
args=(config["system_info_collector_classes"],), args=(
config["system_info_collector_classes"],
"system info collector",
self._collect_system_info,
),
daemon=True, daemon=True,
) )
pba_thread = threading.Thread( pba_thread = threading.Thread(
@ -112,7 +116,9 @@ class AutomatedMaster(IMaster):
propagation_thread.join() propagation_thread.join()
payload_thread = threading.Thread( payload_thread = threading.Thread(
target=self._run_payloads, args=(config["payloads"],), daemon=True target=self._run_plugins,
args=(config["payloads"].items(), "payload", self._run_payload),
daemon=True,
) )
payload_thread.start() payload_thread.start()
payload_thread.join() payload_thread.join()
@ -127,24 +133,13 @@ class AutomatedMaster(IMaster):
if self._stop.is_set(): if self._stop.is_set():
break break
def _collect_system_info(self, enabled_collectors: List[str]): def _collect_system_info(self, collector: str):
logger.info("Running system info collectors")
for collector in enabled_collectors:
if self._stop.is_set():
logger.debug("Received a stop signal, skipping remaining system info collectors")
break
logger.info(f"Running system info collector: {collector}")
system_info_telemetry = {} system_info_telemetry = {}
system_info_telemetry[collector] = self._puppet.run_sys_info_collector(collector) system_info_telemetry[collector] = self._puppet.run_sys_info_collector(collector)
self._telemetry_messenger.send_telemetry( self._telemetry_messenger.send_telemetry(
SystemInfoTelem({"collectors": system_info_telemetry}) SystemInfoTelem({"collectors": system_info_telemetry})
) )
logger.info("Finished running system info collectors")
def _run_pbas(self, enabled_pbas: List[str]): def _run_pbas(self, enabled_pbas: List[str]):
pass pass
@ -154,18 +149,24 @@ class AutomatedMaster(IMaster):
def _propagate(self, config: Dict): def _propagate(self, config: Dict):
pass pass
def _run_payloads(self, enabled_payloads: Dict[str, Dict]): def _run_payload(self, payload: Tuple[str, Dict]):
logger.info("Running payloads") name = payload[0]
logger.debug(f"Found {len(enabled_payloads.keys())} payload(s) to run") options = payload[1]
for payload_name, options in enabled_payloads.items(): self._puppet.run_payload(name, options, self._stop)
def _run_plugins(self, plugin: List[Any], plugin_type: str, callback: Callable[[Any], None]):
logger.info(f"Running {plugin_type}s")
logger.debug(f"Found {len(plugin)} {plugin_type}(s) to run")
for p in plugin:
if self._stop.is_set(): if self._stop.is_set():
logger.debug("Received a stop signal, skipping remaining system info collectors") logger.debug(f"Received a stop signal, skipping remaining {plugin_type}s")
break return
self._puppet.run_payload(payload_name, options, self._stop) callback(p)
logger.info("Finished running payloads") logger.info(f"Finished running {plugin_type}s")
def cleanup(self): def cleanup(self):
pass pass