Merge pull request #1656 from guardicore/1598-concrete-puppet

Add `load_plugin` functions and create a concrete puppet class
This commit is contained in:
Mike Salvatore 2021-12-14 10:55:32 -05:00 committed by GitHub
commit 8423a064bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 85 additions and 2 deletions

View File

@ -2,7 +2,9 @@ import abc
import threading
from collections import namedtuple
from enum import Enum
from typing import Dict
from typing import Dict, Tuple
from infection_monkey.puppet.plugin_type import PluginType
class PortStatus(Enum):
@ -10,6 +12,10 @@ class PortStatus(Enum):
CLOSED = 2
class UnknownPluginError(Exception):
pass
ExploiterResultData = namedtuple("ExploiterResultData", ["result", "info", "attempts"])
PingScanData = namedtuple("PingScanData", ["response_received", "os"])
PortScanData = namedtuple("PortScanData", ["port", "status", "banner", "service"])
@ -18,6 +24,14 @@ PostBreachData = namedtuple("PostBreachData", ["command", "result"])
class IPuppet(metaclass=abc.ABCMeta):
@abc.abstractmethod
def load_plugin(self, plugin: object, plugin_type: PluginType) -> None:
"""
Loads a plugin into the puppet.
:param object plugin: The plugin object to load
:param PluginType plugin_type: The type of plugin being loaded
"""
@abc.abstractmethod
def run_sys_info_collector(self, name: str) -> Dict:
"""
@ -91,7 +105,9 @@ class IPuppet(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def run_payload(self, name: str, options: Dict, interrupt: threading.Event) -> None:
def run_payload(
self, name: str, options: Dict, interrupt: threading.Event
) -> Tuple[None, bool, str]:
"""
Runs a payload
:param str name: The name of the payload to run

View File

@ -11,6 +11,7 @@ from infection_monkey.i_puppet import (
PortStatus,
PostBreachData,
)
from infection_monkey.puppet.plugin_type import PluginType
DOT_1 = "10.0.0.1"
DOT_2 = "10.0.0.2"
@ -21,6 +22,9 @@ logger = logging.getLogger()
class MockPuppet(IPuppet):
def load_plugin(self, plugin: object, plugin_type: PluginType) -> None:
logger.debug(f"load_plugin({plugin}, {plugin_type})")
def run_sys_info_collector(self, name: str) -> Dict:
logger.debug(f"run_sys_info_collector({name})")
# TODO: More collectors

View File

@ -0,0 +1,9 @@
from enum import Enum
class PluginType(Enum):
EXPLOITER = "Exploiter"
FINGERPRINTER = "Fingerprinter"
PAYLOAD = "Payload"
POST_BREACH_ACTION = "PBA"
SYSTEM_INFO_COLLECTOR = "SystemInfoCollector"

View File

@ -0,0 +1,54 @@
import logging
import threading
from typing import Dict, Tuple
from infection_monkey.i_puppet import (
ExploiterResultData,
FingerprintData,
IPuppet,
PingScanData,
PortScanData,
PostBreachData,
)
from infection_monkey.puppet.plugin_type import PluginType
logger = logging.getLogger()
class Puppet(IPuppet):
def load_plugin(self, plugin: object, plugin_type: PluginType) -> None:
pass
def run_sys_info_collector(self, name: str) -> Dict:
pass
def run_pba(self, name: str, options: Dict) -> PostBreachData:
pass
def ping(self, host: str, timeout: float = 1) -> PingScanData:
pass
def scan_tcp_port(self, host: str, port: int, timeout: float = 3) -> PortScanData:
pass
def fingerprint(
self,
name: str,
host: str,
ping_scan_data: PingScanData,
port_scan_data: Dict[int, PortScanData],
) -> FingerprintData:
pass
def exploit_host(
self, name: str, host: str, options: Dict, interrupt: threading.Event
) -> ExploiterResultData:
pass
def run_payload(
self, name: str, options: Dict, interrupt: threading.Event
) -> Tuple[None, bool, str]:
pass
def cleanup(self) -> None:
pass