Agent: Usa agent config object instead of dict automated_master.py

This commit is contained in:
vakaris_zilius 2022-06-23 15:07:33 +00:00 committed by Mike Salvatore
parent 86ed174d74
commit ab67853192
1 changed files with 17 additions and 9 deletions

View File

@ -1,8 +1,9 @@
import logging
import threading
import time
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional, Tuple
from common.configuration import PluginConfiguration
from common.utils import Timer
from infection_monkey.credential_store import ICredentialsStore
from infection_monkey.i_control_channel import IControlChannel, IslandCommunicationError
@ -188,17 +189,17 @@ class AutomatedMaster(IMaster):
pba_thread.join()
def _collect_credentials(self, collector: str):
credentials = self._puppet.run_credential_collector(collector, {})
def _collect_credentials(self, collector: PluginConfiguration):
credentials = self._puppet.run_credential_collector(collector.name, collector.options)
if credentials:
self._telemetry_messenger.send_telemetry(CredentialsTelem(credentials))
else:
logger.debug(f"No credentials were collected by {collector}")
def _run_pba(self, pba: Tuple[str, Dict]):
name = pba[0]
options = pba[1]
def _run_pba(self, pba: PluginConfiguration):
name = pba.name
options = pba.options
for pba_data in self._puppet.run_pba(name, options):
self._telemetry_messenger.send_telemetry(PostBreachTelem(pba_data))
@ -210,15 +211,22 @@ class AutomatedMaster(IMaster):
self._puppet.run_payload(name, options, self._stop)
def _run_pbas(
self, plugins: Iterable[Any], callback: Callable[[Any], None], custom_pba_options: Mapping
self,
plugins: List[PluginConfiguration],
callback: Callable[[Any], None],
custom_pba_options: Dict,
):
self._run_plugins(plugins, "post-breach action", callback)
if custom_pba_is_enabled(custom_pba_options):
self._run_plugins([("CustomPBA", custom_pba_options)], "post-breach action", callback)
self._run_plugins(
[PluginConfiguration(name="CustomPBA", options=custom_pba_options)],
"post-breach action",
callback,
)
def _run_plugins(
self, plugins: Iterable[Any], plugin_type: str, callback: Callable[[Any], None]
self, plugins: List[PluginConfiguration], plugin_type: str, callback: Callable[[Any], None]
):
logger.info(f"Running {plugin_type}s")
logger.debug(f"Found {len(plugins)} {plugin_type}(s) to run")