Merge pull request #2229 from guardicore/2217-pydantic-for-agent-configuration

Agent configuration with pydantic
This commit is contained in:
Shreya Malviya 2022-09-02 12:32:31 +05:30 committed by GitHub
commit 3ced1d97d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 377 additions and 647 deletions

View File

@ -5,7 +5,6 @@ from typing import Sequence, Union
from bson import json_util
from common.agent_configuration import AgentConfiguration
from common.credentials import Credentials
from envs.monkey_zoo.blackbox.island_client.monkey_island_requests import MonkeyIslandRequests
from envs.monkey_zoo.blackbox.test_configurations.test_configuration import TestConfiguration
@ -51,7 +50,7 @@ class MonkeyIslandClient(object):
def _import_config(self, test_configuration: TestConfiguration):
response = self.requests.put_json(
"api/agent-configuration",
json=AgentConfiguration.to_mapping(test_configuration.agent_configuration),
json=test_configuration.agent_configuration.dict(simplify=True),
)
if response.ok:
LOGGER.info("Configuration is imported.")

View File

@ -1,3 +1,5 @@
import dataclasses
from common.agent_configuration import AgentConfiguration, PluginConfiguration
from common.credentials import Credentials, Password, Username
@ -60,7 +62,7 @@ def _add_subnets(agent_configuration: AgentConfiguration) -> AgentConfiguration:
def _add_credential_collectors(agent_configuration: AgentConfiguration) -> AgentConfiguration:
return add_credential_collectors(
agent_configuration, [PluginConfiguration("MimikatzCollector", {})]
agent_configuration, [PluginConfiguration(name="MimikatzCollector", options={})]
)
@ -76,22 +78,24 @@ def _add_http_ports(agent_configuration: AgentConfiguration) -> AgentConfigurati
return add_http_ports(agent_configuration, HTTP_PORTS)
test_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 1)
test_configuration = _add_exploiters(test_configuration)
test_configuration = _add_fingerprinters(test_configuration)
test_configuration = _add_subnets(test_configuration)
test_configuration = _add_tcp_ports(test_configuration)
test_configuration = _add_credential_collectors(test_configuration)
test_configuration = _add_http_ports(test_configuration)
test_agent_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 1)
test_agent_configuration = _add_exploiters(test_agent_configuration)
test_agent_configuration = _add_fingerprinters(test_agent_configuration)
test_agent_configuration = _add_subnets(test_agent_configuration)
test_agent_configuration = _add_tcp_ports(test_agent_configuration)
test_agent_configuration = _add_credential_collectors(test_agent_configuration)
test_agent_configuration = _add_http_ports(test_agent_configuration)
depth_1_a_test_configuration = replace_agent_configuration(
noop_test_configuration, test_configuration
)
CREDENTIALS = (
Credentials(Username("m0nk3y"), None),
Credentials(None, Password("Ivrrw5zEzs")),
Credentials(None, Password("Xk8VDTsC")),
)
depth_1_a_test_configuration = replace_propagation_credentials(
depth_1_a_test_configuration, CREDENTIALS
depth_1_a_test_configuration = dataclasses.replace(noop_test_configuration)
replace_agent_configuration(
test_configuration=depth_1_a_test_configuration, agent_configuration=test_agent_configuration
)
replace_propagation_credentials(
test_configuration=depth_1_a_test_configuration, propagation_credentials=CREDENTIALS
)

View File

@ -1,3 +1,5 @@
import dataclasses
from common.agent_configuration import AgentConfiguration, PluginConfiguration
from common.credentials import Credentials, Password, Username
@ -34,20 +36,20 @@ def _add_tcp_ports(agent_configuration: AgentConfiguration) -> AgentConfiguratio
return add_tcp_ports(agent_configuration, ports)
test_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 2)
test_configuration = _add_exploiters(test_configuration)
test_configuration = _add_subnets(test_configuration)
test_configuration = _add_tcp_ports(test_configuration)
depth_2_a_test_configuration = replace_agent_configuration(
noop_test_configuration, test_configuration
)
test_agent_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 2)
test_agent_configuration = _add_exploiters(test_agent_configuration)
test_agent_configuration = _add_subnets(test_agent_configuration)
test_agent_configuration = _add_tcp_ports(test_agent_configuration)
CREDENTIALS = (
Credentials(Username("m0nk3y"), None),
Credentials(None, Password("^NgDvY59~8")),
)
depth_2_a_test_configuration = replace_propagation_credentials(
depth_2_a_test_configuration, CREDENTIALS
depth_2_a_test_configuration = dataclasses.replace(noop_test_configuration)
replace_agent_configuration(
test_configuration=depth_2_a_test_configuration, agent_configuration=test_agent_configuration
)
replace_propagation_credentials(
test_configuration=depth_2_a_test_configuration, propagation_credentials=CREDENTIALS
)

View File

@ -1,3 +1,5 @@
import dataclasses
from common.agent_configuration import AgentConfiguration, PluginConfiguration
from common.credentials import Credentials, NTHash, Password, Username
@ -48,16 +50,11 @@ def _add_tcp_ports(agent_configuration: AgentConfiguration) -> AgentConfiguratio
return add_tcp_ports(agent_configuration, ports)
test_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 3)
test_configuration = set_keep_tunnel_open_time(test_configuration, 20)
test_configuration = _add_exploiters(test_configuration)
test_configuration = _add_subnets(test_configuration)
test_configuration = _add_tcp_ports(test_configuration)
depth_3_a_test_configuration = replace_agent_configuration(
noop_test_configuration, test_configuration
)
test_agent_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 3)
test_agent_configuration = set_keep_tunnel_open_time(test_agent_configuration, 20)
test_agent_configuration = _add_exploiters(test_agent_configuration)
test_agent_configuration = _add_subnets(test_agent_configuration)
test_agent_configuration = _add_tcp_ports(test_agent_configuration)
CREDENTIALS = (
Credentials(Username("m0nk3y"), None),
@ -70,6 +67,11 @@ CREDENTIALS = (
Credentials(None, NTHash("5da0889ea2081aa79f6852294cba4a5e")),
Credentials(None, NTHash("50c9987a6bf1ac59398df9f911122c9b")),
)
depth_3_a_test_configuration = replace_propagation_credentials(
depth_3_a_test_configuration, CREDENTIALS
depth_3_a_test_configuration = dataclasses.replace(noop_test_configuration)
replace_agent_configuration(
test_configuration=depth_3_a_test_configuration, agent_configuration=test_agent_configuration
)
replace_propagation_credentials(
test_configuration=depth_3_a_test_configuration, propagation_credentials=CREDENTIALS
)

View File

@ -12,7 +12,9 @@ from common.agent_configuration import (
from . import TestConfiguration
_custom_pba_configuration = CustomPBAConfiguration("", "", "", "")
_custom_pba_configuration = CustomPBAConfiguration(
linux_command="", linux_filename="", windows_command="", windows_filename=""
)
_tcp_scan_configuration = TCPScanConfiguration(timeout=3.0, ports=[])
_icmp_scan_configuration = ICMPScanConfiguration(timeout=1.0)

View File

@ -1,3 +1,5 @@
import dataclasses
from common.agent_configuration import AgentConfiguration, PluginConfiguration
from .noop import noop_test_configuration
@ -30,11 +32,13 @@ def _add_tcp_ports(agent_configuration: AgentConfiguration) -> AgentConfiguratio
return add_tcp_ports(agent_configuration, ports)
test_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 1)
test_configuration = _add_exploiters(test_configuration)
test_configuration = _add_subnets(test_configuration)
test_configuration = _add_tcp_ports(test_configuration)
test_agent_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 1)
test_agent_configuration = _add_exploiters(test_agent_configuration)
test_agent_configuration = _add_subnets(test_agent_configuration)
test_agent_configuration = _add_tcp_ports(test_agent_configuration)
powershell_credentials_reuse_test_configuration = replace_agent_configuration(
noop_test_configuration, test_configuration
powershell_credentials_reuse_test_configuration = dataclasses.replace(noop_test_configuration)
replace_agent_configuration(
test_configuration=powershell_credentials_reuse_test_configuration,
agent_configuration=test_agent_configuration,
)

View File

@ -1,3 +1,5 @@
import dataclasses
from common.agent_configuration import AgentConfiguration, PluginConfiguration
from common.credentials import Credentials, NTHash, Password, Username
@ -33,16 +35,11 @@ def _add_tcp_ports(agent_configuration: AgentConfiguration) -> AgentConfiguratio
return add_tcp_ports(agent_configuration, ports)
test_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 3)
test_configuration = set_keep_tunnel_open_time(test_configuration, 20)
test_configuration = _add_exploiters(test_configuration)
test_configuration = _add_subnets(test_configuration)
test_configuration = _add_tcp_ports(test_configuration)
smb_pth_test_configuration = replace_agent_configuration(
noop_test_configuration, test_configuration
)
test_agent_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 3)
test_agent_configuration = set_keep_tunnel_open_time(test_agent_configuration, 20)
test_agent_configuration = _add_exploiters(test_agent_configuration)
test_agent_configuration = _add_subnets(test_agent_configuration)
test_agent_configuration = _add_tcp_ports(test_agent_configuration)
CREDENTIALS = (
Credentials(Username("Administrator"), None),
@ -54,6 +51,11 @@ CREDENTIALS = (
Credentials(None, NTHash("5da0889ea2081aa79f6852294cba4a5e")),
Credentials(None, NTHash("50c9987a6bf1ac59398df9f911122c9b")),
)
smb_pth_test_configuration = replace_propagation_credentials(
smb_pth_test_configuration, CREDENTIALS
smb_pth_test_configuration = dataclasses.replace(noop_test_configuration)
replace_agent_configuration(
test_configuration=smb_pth_test_configuration, agent_configuration=test_agent_configuration
)
replace_propagation_credentials(
test_configuration=smb_pth_test_configuration, propagation_credentials=CREDENTIALS
)

View File

@ -1,18 +1,8 @@
from dataclasses import replace
from typing import Sequence, Tuple
from common.agent_configuration import (
AgentConfiguration,
ExploitationConfiguration,
ExploitationOptionsConfiguration,
NetworkScanConfiguration,
PluginConfiguration,
PropagationConfiguration,
ScanTargetConfiguration,
)
from common.agent_configuration import AgentConfiguration, PluginConfiguration
from common.credentials import Credentials
from . import TestConfiguration
from envs.monkey_zoo.blackbox.test_configurations.test_configuration import TestConfiguration
def add_exploiters(
@ -20,133 +10,91 @@ def add_exploiters(
brute_force: Sequence[PluginConfiguration] = [],
vulnerability: Sequence[PluginConfiguration] = [],
) -> AgentConfiguration:
exploitation_configuration = replace(
agent_configuration.propagation.exploitation,
brute_force=brute_force,
vulnerability=vulnerability,
)
return replace_exploitation_configuration(agent_configuration, exploitation_configuration)
agent_configuration_copy = agent_configuration.copy()
agent_configuration_copy.propagation.exploitation.brute_force = brute_force
agent_configuration_copy.propagation.exploitation.vulnerability = vulnerability
return agent_configuration_copy
def add_fingerprinters(
agent_configuration: AgentConfiguration, fingerprinters: Sequence[PluginConfiguration]
) -> AgentConfiguration:
network_scan_configuration = replace(
agent_configuration.propagation.network_scan, fingerprinters=fingerprinters
)
return replace_network_scan_configuration(agent_configuration, network_scan_configuration)
agent_configuration_copy = agent_configuration.copy()
agent_configuration_copy.propagation.network_scan.fingerprinters = fingerprinters
return agent_configuration_copy
def add_tcp_ports(
agent_configuration: AgentConfiguration, tcp_ports: Sequence[int]
) -> AgentConfiguration:
tcp_scan_configuration = replace(
agent_configuration.propagation.network_scan.tcp, ports=tuple(tcp_ports)
)
network_scan_configuration = replace(
agent_configuration.propagation.network_scan, tcp=tcp_scan_configuration
)
return replace_network_scan_configuration(agent_configuration, network_scan_configuration)
agent_configuration_copy = agent_configuration.copy()
agent_configuration_copy.propagation.network_scan.tcp.ports = tuple(tcp_ports)
return agent_configuration_copy
def add_subnets(
agent_configuration: AgentConfiguration, subnets: Sequence[str]
) -> AgentConfiguration:
scan_target_configuration = replace(
agent_configuration.propagation.network_scan.targets, subnets=subnets
)
return replace_scan_target_configuration(agent_configuration, scan_target_configuration)
agent_configuration_copy = agent_configuration.copy()
agent_configuration_copy.propagation.network_scan.targets.subnets = subnets
return agent_configuration_copy
def add_credential_collectors(
agent_configuration: AgentConfiguration, credential_collectors: Sequence[PluginConfiguration]
) -> AgentConfiguration:
return replace(agent_configuration, credential_collectors=tuple(credential_collectors))
agent_configuration_copy = agent_configuration.copy()
agent_configuration_copy.credential_collectors = tuple(credential_collectors)
return agent_configuration_copy
def add_http_ports(
agent_configuration: AgentConfiguration, http_ports: Sequence[int]
) -> AgentConfiguration:
exploitation_options_configuration = agent_configuration.propagation.exploitation.options
exploitation_options_configuration = replace(
exploitation_options_configuration, http_ports=http_ports
)
return replace_exploitation_options_configuration(
agent_configuration, exploitation_options_configuration
)
agent_configuration_copy = agent_configuration.copy()
agent_configuration_copy.propagation.exploitation.options.http_ports = http_ports
return agent_configuration_copy
def set_keep_tunnel_open_time(
agent_configuration: AgentConfiguration, keep_tunnel_open_time: int
) -> AgentConfiguration:
return replace(agent_configuration, keep_tunnel_open_time=keep_tunnel_open_time)
agent_configuration_copy = agent_configuration.copy()
agent_configuration_copy.keep_tunnel_open_time = keep_tunnel_open_time
return agent_configuration_copy
def set_maximum_depth(
agent_configuration: AgentConfiguration, maximum_depth: int
) -> AgentConfiguration:
propagation_configuration = replace(
agent_configuration.propagation, maximum_depth=maximum_depth
)
return replace_propagation_configuration(agent_configuration, propagation_configuration)
agent_configuration_copy = agent_configuration.copy()
agent_configuration_copy.propagation.maximum_depth = maximum_depth
def replace_exploitation_configuration(
agent_configuration: AgentConfiguration, exploitation_configuration: ExploitationConfiguration
) -> AgentConfiguration:
propagation_configuration = replace(
agent_configuration.propagation, exploitation=exploitation_configuration
)
return replace_propagation_configuration(agent_configuration, propagation_configuration)
def replace_scan_target_configuration(
agent_configuration: AgentConfiguration, scan_target_configuration: ScanTargetConfiguration
) -> AgentConfiguration:
network_scan_configuration = replace(
agent_configuration.propagation.network_scan, targets=scan_target_configuration
)
return replace_network_scan_configuration(agent_configuration, network_scan_configuration)
def replace_network_scan_configuration(
agent_configuration: AgentConfiguration, network_scan_configuration: NetworkScanConfiguration
) -> AgentConfiguration:
propagation_configuration = replace(
agent_configuration.propagation, network_scan=network_scan_configuration
)
return replace_propagation_configuration(agent_configuration, propagation_configuration)
def replace_propagation_configuration(
agent_configuration: AgentConfiguration, propagation_configuration: PropagationConfiguration
) -> AgentConfiguration:
return replace(agent_configuration, propagation=propagation_configuration)
def replace_exploitation_options_configuration(
agent_configuration: AgentConfiguration,
exploitation_options_configuration: ExploitationOptionsConfiguration,
) -> AgentConfiguration:
exploitation_configuration = agent_configuration.propagation.exploitation
exploitation_configuration = replace(
exploitation_configuration, options=exploitation_options_configuration
)
return replace_exploitation_configuration(agent_configuration, exploitation_configuration)
return agent_configuration_copy
def replace_agent_configuration(
test_configuration: TestConfiguration, agent_configuration: AgentConfiguration
) -> TestConfiguration:
return replace(test_configuration, agent_configuration=agent_configuration)
):
test_configuration.agent_configuration = agent_configuration
def replace_propagation_credentials(
test_configuration: TestConfiguration, propagation_credentials: Tuple[Credentials, ...]
):
return replace(test_configuration, propagation_credentials=propagation_credentials)
test_configuration.propagation_credentials = propagation_credentials

View File

@ -1,3 +1,5 @@
import dataclasses
from common.agent_configuration import AgentConfiguration, PluginConfiguration
from common.credentials import Credentials, Password, Username
@ -31,7 +33,7 @@ def _add_subnets(agent_configuration: AgentConfiguration) -> AgentConfiguration:
def _add_credential_collectors(agent_configuration: AgentConfiguration) -> AgentConfiguration:
return add_credential_collectors(
agent_configuration, [PluginConfiguration("MimikatzCollector", {})]
agent_configuration, [PluginConfiguration(name="MimikatzCollector", options={})]
)
@ -40,17 +42,12 @@ def _add_tcp_ports(agent_configuration: AgentConfiguration) -> AgentConfiguratio
return add_tcp_ports(agent_configuration, ports)
test_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 1)
test_configuration = _add_exploiters(test_configuration)
test_configuration = _add_subnets(test_configuration)
test_configuration = _add_credential_collectors(test_configuration)
test_configuration = _add_tcp_ports(test_configuration)
test_configuration = _add_credential_collectors(test_configuration)
wmi_mimikatz_test_configuration = replace_agent_configuration(
noop_test_configuration, test_configuration
)
test_agent_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 1)
test_agent_configuration = _add_exploiters(test_agent_configuration)
test_agent_configuration = _add_subnets(test_agent_configuration)
test_agent_configuration = _add_credential_collectors(test_agent_configuration)
test_agent_configuration = _add_tcp_ports(test_agent_configuration)
test_agent_configuration = _add_credential_collectors(test_agent_configuration)
CREDENTIALS = (
Credentials(Username("Administrator"), None),
@ -59,6 +56,11 @@ CREDENTIALS = (
Credentials(None, Password("Ivrrw5zEzs")),
Credentials(None, Password("Password1!")),
)
wmi_mimikatz_test_configuration = replace_propagation_credentials(
wmi_mimikatz_test_configuration, CREDENTIALS
wmi_mimikatz_test_configuration = dataclasses.replace(noop_test_configuration)
replace_agent_configuration(
test_configuration=wmi_mimikatz_test_configuration, agent_configuration=test_agent_configuration
)
replace_propagation_credentials(
test_configuration=wmi_mimikatz_test_configuration, propagation_credentials=CREDENTIALS
)

View File

@ -1,3 +1,5 @@
import dataclasses
from common.agent_configuration import AgentConfiguration, PluginConfiguration
from .noop import noop_test_configuration
@ -27,11 +29,12 @@ def _add_subnets(agent_configuration: AgentConfiguration) -> AgentConfiguration:
return add_subnets(agent_configuration, subnets)
test_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 1)
test_configuration = _add_exploiters(test_configuration)
test_configuration = _add_tcp_ports(test_configuration)
test_configuration = _add_subnets(test_configuration)
test_agent_configuration = set_maximum_depth(noop_test_configuration.agent_configuration, 1)
test_agent_configuration = _add_exploiters(test_agent_configuration)
test_agent_configuration = _add_tcp_ports(test_agent_configuration)
test_agent_configuration = _add_subnets(test_agent_configuration)
zerologon_test_configuration = replace_agent_configuration(
noop_test_configuration, test_configuration
zerologon_test_configuration = dataclasses.replace(noop_test_configuration)
replace_agent_configuration(
test_configuration=zerologon_test_configuration, agent_configuration=test_agent_configuration
)

View File

@ -1,4 +1,4 @@
from .agent_configuration import AgentConfiguration, InvalidConfigurationError
from .agent_configuration import AgentConfiguration
from .agent_sub_configurations import (
CustomPBAConfiguration,
PluginConfiguration,

View File

@ -1,17 +1,9 @@
from __future__ import annotations
from typing import Tuple
from dataclasses import dataclass
from typing import Any, Mapping, Tuple
from pydantic import confloat
from marshmallow import Schema, fields, validate
from marshmallow.exceptions import MarshmallowError
from common.base_models import MutableInfectionMonkeyBaseModel
from ..utils.code_utils import freeze_lists_in_mapping
from .agent_sub_configuration_schemas import (
CustomPBAConfigurationSchema,
PluginConfigurationSchema,
PropagationConfigurationSchema,
)
from .agent_sub_configurations import (
CustomPBAConfiguration,
PluginConfiguration,
@ -19,107 +11,10 @@ from .agent_sub_configurations import (
)
class InvalidConfigurationError(Exception):
def __init__(self, message: str):
self._message = message
def __str__(self) -> str:
return (
f"Cannot construct an AgentConfiguration object with the supplied, invalid data: "
f"{self._message}"
)
@dataclass(frozen=True)
class AgentConfiguration:
"""
A configuration for Infection Monkey agents
Attributes:
:param keep_tunnel_open_time: Maximum time in seconds to keep a tunnel open after
the last exploit
:param custom_pbas: Configuration for custom post-breach actions
:param post_breach_actions: Configuration for post-breach actions
:param credential_collectors: Configuration for credential collectors
:param payloads: Configuration for payloads
:param propagation: Configuration for propagation
"""
keep_tunnel_open_time: float
class AgentConfiguration(MutableInfectionMonkeyBaseModel):
keep_tunnel_open_time: confloat(ge=0)
custom_pbas: CustomPBAConfiguration
post_breach_actions: Tuple[PluginConfiguration, ...]
credential_collectors: Tuple[PluginConfiguration, ...]
payloads: Tuple[PluginConfiguration, ...]
propagation: PropagationConfiguration
def __post_init__(self):
# This will raise an exception if the object is invalid. Calling this in __post__init()
# makes it impossible to construct an invalid object
try:
AgentConfigurationSchema().dump(self)
except Exception as err:
raise InvalidConfigurationError(str(err))
@staticmethod
def from_mapping(config_mapping: Mapping[str, Any]) -> AgentConfiguration:
"""
Construct an AgentConfiguration from a Mapping
:param config_mapping: A Mapping that represents an AgentConfiguration
:return: An AgentConfiguration
:raises: InvalidConfigurationError if the provided Mapping does not represent a valid
AgentConfiguration
"""
try:
config_dict = AgentConfigurationSchema().load(config_mapping)
config_dict = freeze_lists_in_mapping(config_dict)
return AgentConfiguration(**config_dict)
except MarshmallowError as err:
raise InvalidConfigurationError(str(err))
@staticmethod
def from_json(config_json: str) -> AgentConfiguration:
"""
Construct an AgentConfiguration from a JSON string
:param config_json: A JSON string that represents an AgentConfiguration
:return: An AgentConfiguration
:raises: InvalidConfigurationError if the provided JSON does not represent a valid
AgentConfiguration
"""
try:
config_dict = AgentConfigurationSchema().loads(config_json)
config_dict = freeze_lists_in_mapping(config_dict)
return AgentConfiguration(**config_dict)
except MarshmallowError as err:
raise InvalidConfigurationError(str(err))
@staticmethod
def to_mapping(config: AgentConfiguration) -> Mapping[str, Any]:
"""
Serialize an AgentConfiguration to a Mapping
:param config: An AgentConfiguration
:return: A Mapping that represents the AgentConfiguration
"""
return AgentConfigurationSchema().dump(config)
@staticmethod
def to_json(config: AgentConfiguration) -> str:
"""
Serialize an AgentConfiguration to JSON
:param config: An AgentConfiguration
:return: A JSON string that represents the AgentConfiguration
"""
return AgentConfigurationSchema().dumps(config)
class AgentConfigurationSchema(Schema):
keep_tunnel_open_time = fields.Float(validate=validate.Range(min=0))
custom_pbas = fields.Nested(CustomPBAConfigurationSchema)
post_breach_actions = fields.List(fields.Nested(PluginConfigurationSchema))
credential_collectors = fields.List(fields.Nested(PluginConfigurationSchema))
payloads = fields.List(fields.Nested(PluginConfigurationSchema))
propagation = fields.Nested(PropagationConfigurationSchema)

View File

@ -1,112 +0,0 @@
from marshmallow import Schema, fields, post_load, validate
from .agent_sub_configurations import (
CustomPBAConfiguration,
ExploitationConfiguration,
ExploitationOptionsConfiguration,
ICMPScanConfiguration,
NetworkScanConfiguration,
PluginConfiguration,
PropagationConfiguration,
ScanTargetConfiguration,
TCPScanConfiguration,
)
from .utils import freeze_lists
from .validators import (
validate_ip,
validate_linux_filename,
validate_subnet_range,
validate_windows_filename,
)
class CustomPBAConfigurationSchema(Schema):
linux_command = fields.Str()
linux_filename = fields.Str(validate=validate_linux_filename)
windows_command = fields.Str()
windows_filename = fields.Str(validate=validate_windows_filename)
@post_load
def _make_custom_pba_configuration(self, data, **kwargs):
return CustomPBAConfiguration(**data)
class PluginConfigurationSchema(Schema):
name = fields.Str()
options = fields.Mapping()
@post_load
def _make_plugin_configuration(self, data, **kwargs):
return PluginConfiguration(**data)
class ScanTargetConfigurationSchema(Schema):
blocked_ips = fields.List(fields.Str(validate=validate_ip))
inaccessible_subnets = fields.List(fields.Str(validate=validate_subnet_range))
local_network_scan = fields.Bool()
subnets = fields.List(fields.Str(validate=validate_subnet_range))
@post_load
@freeze_lists
def _make_scan_target_configuration(self, data, **kwargs):
return ScanTargetConfiguration(**data)
class ICMPScanConfigurationSchema(Schema):
timeout = fields.Float(validate=validate.Range(min=0))
@post_load
def _make_icmp_scan_configuration(self, data, **kwargs):
return ICMPScanConfiguration(**data)
class TCPScanConfigurationSchema(Schema):
timeout = fields.Float(validate=validate.Range(min=0))
ports = fields.List(fields.Int(validate=validate.Range(min=0, max=65535)))
@post_load
@freeze_lists
def _make_tcp_scan_configuration(self, data, **kwargs):
return TCPScanConfiguration(**data)
class NetworkScanConfigurationSchema(Schema):
tcp = fields.Nested(TCPScanConfigurationSchema)
icmp = fields.Nested(ICMPScanConfigurationSchema)
fingerprinters = fields.List(fields.Nested(PluginConfigurationSchema))
targets = fields.Nested(ScanTargetConfigurationSchema)
@post_load
@freeze_lists
def _make_network_scan_configuration(self, data, **kwargs):
return NetworkScanConfiguration(**data)
class ExploitationOptionsConfigurationSchema(Schema):
http_ports = fields.List(fields.Int(validate=validate.Range(min=0, max=65535)))
@post_load
@freeze_lists
def _make_exploitation_options_configuration(self, data, **kwargs):
return ExploitationOptionsConfiguration(**data)
class ExploitationConfigurationSchema(Schema):
options = fields.Nested(ExploitationOptionsConfigurationSchema)
brute_force = fields.List(fields.Nested(PluginConfigurationSchema))
vulnerability = fields.List(fields.Nested(PluginConfigurationSchema))
@post_load
@freeze_lists
def _make_exploitation_options_configuration(self, data, **kwargs):
return ExploitationConfiguration(**data)
class PropagationConfigurationSchema(Schema):
maximum_depth = fields.Int(validate=validate.Range(min=0))
network_scan = fields.Nested(NetworkScanConfigurationSchema)
exploitation = fields.Nested(ExploitationConfigurationSchema)
@post_load
def _make_propagation_configuration(self, data, **kwargs):
return PropagationConfiguration(**data)

View File

@ -1,9 +1,18 @@
from dataclasses import dataclass
from typing import Dict, Tuple
from pydantic import PositiveFloat, conint, validator
@dataclass(frozen=True)
class CustomPBAConfiguration:
from common.base_models import MutableInfectionMonkeyBaseModel
from .validators import (
validate_ip,
validate_linux_filename,
validate_subnet_range,
validate_windows_filename,
)
class CustomPBAConfiguration(MutableInfectionMonkeyBaseModel):
"""
A configuration for custom post-breach actions
@ -24,9 +33,18 @@ class CustomPBAConfiguration:
windows_command: str
windows_filename: str
@validator("linux_filename")
def linux_filename_valid(cls, filename):
validate_linux_filename(filename)
return filename
@dataclass(frozen=True)
class PluginConfiguration:
@validator("windows_filename")
def windows_filename_valid(cls, filename):
validate_windows_filename(filename)
return filename
class PluginConfiguration(MutableInfectionMonkeyBaseModel):
"""
A configuration for plugins
@ -52,8 +70,7 @@ class PluginConfiguration:
options: Dict
@dataclass(frozen=True)
class ScanTargetConfiguration:
class ScanTargetConfiguration(MutableInfectionMonkeyBaseModel):
"""
Configuration of network targets to scan and exploit
@ -73,9 +90,23 @@ class ScanTargetConfiguration:
local_network_scan: bool
subnets: Tuple[str, ...]
@validator("blocked_ips", each_item=True)
def blocked_ips_valid(cls, ip):
validate_ip(ip)
return ip
@dataclass(frozen=True)
class ICMPScanConfiguration:
@validator("inaccessible_subnets", each_item=True)
def inaccessible_subnets_valid(cls, subnet_range):
validate_subnet_range(subnet_range)
return subnet_range
@validator("subnets", each_item=True)
def subnets_valid(cls, subnet_range):
validate_subnet_range(subnet_range)
return subnet_range
class ICMPScanConfiguration(MutableInfectionMonkeyBaseModel):
"""
A configuration for ICMP scanning
@ -83,11 +114,10 @@ class ICMPScanConfiguration:
:param timeout: Maximum time in seconds to wait for a response from the target
"""
timeout: float
timeout: PositiveFloat
@dataclass(frozen=True)
class TCPScanConfiguration:
class TCPScanConfiguration(MutableInfectionMonkeyBaseModel):
"""
A configuration for TCP scanning
@ -96,12 +126,11 @@ class TCPScanConfiguration:
:param ports: Ports to scan
"""
timeout: float
ports: Tuple[int, ...]
timeout: PositiveFloat
ports: Tuple[conint(ge=0, le=65535), ...]
@dataclass(frozen=True)
class NetworkScanConfiguration:
class NetworkScanConfiguration(MutableInfectionMonkeyBaseModel):
"""
A configuration for network scanning
@ -118,8 +147,7 @@ class NetworkScanConfiguration:
targets: ScanTargetConfiguration
@dataclass(frozen=True)
class ExploitationOptionsConfiguration:
class ExploitationOptionsConfiguration(MutableInfectionMonkeyBaseModel):
"""
A configuration for exploitation options
@ -127,11 +155,10 @@ class ExploitationOptionsConfiguration:
:param http_ports: HTTP ports to exploit
"""
http_ports: Tuple[int, ...]
http_ports: Tuple[conint(ge=0, le=65535), ...]
@dataclass(frozen=True)
class ExploitationConfiguration:
class ExploitationConfiguration(MutableInfectionMonkeyBaseModel):
"""
A configuration for exploitation
@ -146,8 +173,7 @@ class ExploitationConfiguration:
vulnerability: Tuple[PluginConfiguration, ...]
@dataclass(frozen=True)
class PropagationConfiguration:
class PropagationConfiguration(MutableInfectionMonkeyBaseModel):
"""
A configuration for propagation
@ -159,6 +185,6 @@ class PropagationConfiguration:
:param exploitation: Configuration for exploitation
"""
maximum_depth: int
maximum_depth: conint(ge=0)
network_scan: NetworkScanConfiguration
exploitation: ExploitationConfiguration

View File

@ -1,5 +1,3 @@
import dataclasses
from . import AgentConfiguration
from .agent_sub_configurations import (
CustomPBAConfiguration,
@ -27,9 +25,9 @@ PBAS = (
CREDENTIAL_COLLECTORS = ("MimikatzCollector", "SSHCollector")
PBA_CONFIGURATION = tuple(PluginConfiguration(pba, {}) for pba in PBAS)
PBA_CONFIGURATION = tuple(PluginConfiguration(name=pba, options={}) for pba in PBAS)
CREDENTIAL_COLLECTOR_CONFIGURATION = tuple(
PluginConfiguration(collector, {}) for collector in CREDENTIAL_COLLECTORS
PluginConfiguration(name=collector, options={}) for collector in CREDENTIAL_COLLECTORS
)
RANSOMWARE_OPTIONS = {
@ -41,7 +39,7 @@ RANSOMWARE_OPTIONS = {
"other_behaviors": {"readme": True},
}
PAYLOAD_CONFIGURATION = tuple([PluginConfiguration("ransomware", RANSOMWARE_OPTIONS)])
PAYLOAD_CONFIGURATION = tuple([PluginConfiguration(name="ransomware", options=RANSOMWARE_OPTIONS)])
CUSTOM_PBA_CONFIGURATION = CustomPBAConfiguration(
linux_command="", linux_filename="", windows_command="", windows_filename=""
@ -71,35 +69,42 @@ TCP_SCAN_CONFIGURATION = TCPScanConfiguration(timeout=3.0, ports=TCP_PORTS)
ICMP_CONFIGURATION = ICMPScanConfiguration(timeout=1.0)
HTTP_PORTS = (80, 443, 7001, 8008, 8080, 8983, 9200, 9600)
FINGERPRINTERS = (
PluginConfiguration("elastic", {}),
PluginConfiguration(name="elastic", options={}),
# Plugin configuration option contents are not converted to tuples
PluginConfiguration("http", {"http_ports": list(HTTP_PORTS)}),
PluginConfiguration("mssql", {}),
PluginConfiguration("smb", {}),
PluginConfiguration("ssh", {}),
PluginConfiguration(name="http", options={"http_ports": list(HTTP_PORTS)}),
PluginConfiguration(name="mssql", options={}),
PluginConfiguration(name="smb", options={}),
PluginConfiguration(name="ssh", options={}),
)
SCAN_TARGET_CONFIGURATION = ScanTargetConfiguration(tuple(), tuple(), True, tuple())
SCAN_TARGET_CONFIGURATION = ScanTargetConfiguration(
blocked_ips=tuple(), inaccessible_subnets=tuple(), local_network_scan=True, subnets=tuple()
)
NETWORK_SCAN_CONFIGURATION = NetworkScanConfiguration(
TCP_SCAN_CONFIGURATION, ICMP_CONFIGURATION, FINGERPRINTERS, SCAN_TARGET_CONFIGURATION
tcp=TCP_SCAN_CONFIGURATION,
icmp=ICMP_CONFIGURATION,
fingerprinters=FINGERPRINTERS,
targets=SCAN_TARGET_CONFIGURATION,
)
EXPLOITATION_OPTIONS_CONFIGURATION = ExploitationOptionsConfiguration(HTTP_PORTS)
EXPLOITATION_OPTIONS_CONFIGURATION = ExploitationOptionsConfiguration(http_ports=HTTP_PORTS)
BRUTE_FORCE_EXPLOITERS = (
PluginConfiguration("MSSQLExploiter", {}),
PluginConfiguration("PowerShellExploiter", {}),
PluginConfiguration("SSHExploiter", {}),
PluginConfiguration("SmbExploiter", {"smb_download_timeout": 30}),
PluginConfiguration("WmiExploiter", {"smb_download_timeout": 30}),
PluginConfiguration(name="MSSQLExploiter", options={}),
PluginConfiguration(name="PowerShellExploiter", options={}),
PluginConfiguration(name="SSHExploiter", options={}),
PluginConfiguration(name="SmbExploiter", options={"smb_download_timeout": 30}),
PluginConfiguration(name="WmiExploiter", options={"smb_download_timeout": 30}),
)
VULNERABILITY_EXPLOITERS = (
PluginConfiguration("Log4ShellExploiter", {}),
PluginConfiguration("HadoopExploiter", {}),
PluginConfiguration(name="Log4ShellExploiter", options={}),
PluginConfiguration(name="HadoopExploiter", options={}),
)
EXPLOITATION_CONFIGURATION = ExploitationConfiguration(
EXPLOITATION_OPTIONS_CONFIGURATION, BRUTE_FORCE_EXPLOITERS, VULNERABILITY_EXPLOITERS
options=EXPLOITATION_OPTIONS_CONFIGURATION,
brute_force=BRUTE_FORCE_EXPLOITERS,
vulnerability=VULNERABILITY_EXPLOITERS,
)
PROPAGATION_CONFIGURATION = PropagationConfiguration(
@ -117,6 +122,6 @@ DEFAULT_AGENT_CONFIGURATION = AgentConfiguration(
propagation=PROPAGATION_CONFIGURATION,
)
DEFAULT_RANSOMWARE_AGENT_CONFIGURATION = dataclasses.replace(
DEFAULT_AGENT_CONFIGURATION, post_breach_actions=tuple()
DEFAULT_RANSOMWARE_AGENT_CONFIGURATION = DEFAULT_AGENT_CONFIGURATION.copy(
update={"post_breach_actions": tuple()}
)

View File

@ -1,13 +0,0 @@
from functools import wraps
from typing import Callable
from common.utils.code_utils import freeze_lists_in_mapping
def freeze_lists(function: Callable):
@wraps(function)
def wrapper(self, data, **kwargs):
data = freeze_lists_in_mapping(data)
return function(self, data, **kwargs)
return wrapper

View File

@ -1,24 +1,22 @@
import re
from pathlib import PureWindowsPath
from marshmallow import ValidationError
_valid_windows_filename_regex = re.compile(r"^[^<>:\"\\\/|?*]*[^<>:\"\\\/|?* \.]+$|^$")
_valid_linux_filename_regex = re.compile(r"^[^\0/]*$")
def validate_linux_filename(linux_filename: str):
if not re.match(_valid_linux_filename_regex, linux_filename):
raise ValidationError(f"Invalid Unix filename {linux_filename}: illegal characters")
raise ValueError(f"Invalid Unix filename {linux_filename}: illegal characters")
def validate_windows_filename(windows_filename: str):
_validate_windows_filename_not_reserved(windows_filename)
if not re.match(_valid_windows_filename_regex, windows_filename):
raise ValidationError(f"Invalid Windows filename {windows_filename}: illegal characters")
raise ValueError(f"Invalid Windows filename {windows_filename}: illegal characters")
def _validate_windows_filename_not_reserved(windows_filename: str):
# filename shouldn't start with any of these and be followed by a period
if PureWindowsPath(windows_filename).is_reserved():
raise ValidationError(f"Invalid Windows filename {windows_filename}: reserved name used")
raise ValueError(f"Invalid Windows filename {windows_filename}: reserved name used")

View File

@ -1,38 +1,36 @@
import re
from ipaddress import AddressValueError, IPv4Address, IPv4Network, NetmaskValueError
from marshmallow import ValidationError
def validate_subnet_range(subnet_range: str):
try:
return validate_ip(subnet_range)
except ValidationError:
except ValueError:
pass
try:
return validate_ip_range(subnet_range)
except ValidationError:
except ValueError:
pass
try:
return validate_ip_network(subnet_range)
except ValidationError:
except ValueError:
pass
try:
return validate_hostname(subnet_range)
except ValidationError:
raise ValidationError(f"Invalid subnet range {subnet_range}")
except ValueError:
raise ValueError(f"Invalid subnet range {subnet_range}")
def validate_hostname(hostname: str):
# Based on hostname syntax: https://www.rfc-editor.org/rfc/rfc1123#page-13
hostname_segments = hostname.split(".")
if any((part.endswith("-") or part.startswith("-") for part in hostname_segments)):
raise ValidationError(f"Hostname segment can't start or end with a hyphen: {hostname}")
raise ValueError(f"Hostname segment can't start or end with a hyphen: {hostname}")
if not any((char.isalpha() for char in hostname_segments[-1])):
raise ValidationError(f"Last segment of a hostname must contain a letter: {hostname}")
raise ValueError(f"Last segment of a hostname must contain a letter: {hostname}")
valid_characters_pattern = r"^[A-Za-z0-9\-]+$"
valid_characters_regex = re.compile(valid_characters_pattern)
@ -41,21 +39,21 @@ def validate_hostname(hostname: str):
)
if not all(matches):
raise ValidationError(f"Hostname contains invalid characters: {hostname}")
raise ValueError(f"Hostname contains invalid characters: {hostname}")
def validate_ip_network(ip_network: str):
try:
IPv4Network(ip_network, strict=False)
except (NetmaskValueError, AddressValueError):
raise ValidationError(f"Invalid IPv4 network {ip_network}")
raise ValueError(f"Invalid IPv4 network {ip_network}")
def validate_ip_range(ip_range: str):
ip_range = ip_range.replace(" ", "")
ips = ip_range.split("-")
if len(ips) != 2:
raise ValidationError(f"Invalid IP range {ip_range}")
raise ValueError(f"Invalid IP range {ip_range}")
validate_ip(ips[0])
validate_ip(ips[1])
@ -64,4 +62,4 @@ def validate_ip(ip: str):
try:
IPv4Address(ip)
except AddressValueError:
raise ValidationError(f"Invalid IP address {ip}")
raise ValueError(f"Invalid IP address {ip}")

View File

@ -1,5 +1,4 @@
import queue
from collections.abc import MutableSequence
from typing import Any, List, MutableMapping, TypeVar
T = TypeVar("T")
@ -49,10 +48,3 @@ def del_key(mapping: MutableMapping[T, Any], key: T):
del mapping[key]
except KeyError:
pass
def freeze_lists_in_mapping(mapping: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
for key, value in mapping.items():
if isinstance(value, MutableSequence):
mapping[key] = tuple(value)
return mapping

View File

@ -32,9 +32,3 @@ class FindingWithoutDetailsError(Exception):
class DomainControllerNameFetchError(FailedExploitationError):
"""Raise on failed attempt to extract domain controller's name"""
# TODO: This has been replaced by common.configuration.InvalidConfigurationError. Use that error
# instead and remove this one.
class InvalidConfigurationError(Exception):
"""Raise when configuration is invalid"""

View File

@ -93,9 +93,11 @@ class ControlChannel(IControlChannel):
)
response.raise_for_status()
logger.debug(f"Received configuration:\n{pformat(json.loads(response.text))}")
config_dict = json.loads(response.text)
return AgentConfiguration.from_json(response.text)
logger.debug(f"Received configuration:\n{pformat(config_dict)}")
return AgentConfiguration(**config_dict)
except (
json.JSONDecodeError,
requests.exceptions.ConnectionError,

View File

@ -94,7 +94,7 @@ class Exploiter:
# This order allows exploiter-specific options to
# override general options for all exploiters.
options = {**exploitation_config.options.__dict__, **exploiter.options}
extended_exploiters.append(PluginConfiguration(exploiter.name, options))
extended_exploiters.append(PluginConfiguration(name=exploiter.name, options=options))
return extended_exploiters

View File

@ -1,5 +1,4 @@
import logging
from dataclasses import replace
from ipaddress import IPv4Interface
from queue import Queue
from threading import Event
@ -93,9 +92,9 @@ class Propagator:
modified_options = fingerprinter.options.copy()
modified_options["http_ports"] = list(http_ports)
modified_fingerprinters[i] = replace(fingerprinter, options=modified_options)
modified_fingerprinters[i] = fingerprinter.copy(update={"options": modified_options})
return replace(network_scan, fingerprinters=modified_fingerprinters)
return network_scan.copy(update={"fingerprinters": modified_fingerprinters})
def _scan_network(self, scan_config: NetworkScanConfiguration, stop: Event):
logger.info("Starting network scan")

View File

@ -1,4 +1,5 @@
import io
import json
from common.agent_configuration import AgentConfiguration
from monkey_island.cc import repository
@ -23,14 +24,14 @@ class FileAgentConfigurationRepository(IAgentConfigurationRepository):
with self._file_repository.open_file(AGENT_CONFIGURATION_FILE_NAME) as f:
configuration_json = f.read().decode()
return AgentConfiguration.from_json(configuration_json)
return AgentConfiguration(**json.loads(configuration_json))
except repository.FileNotFoundError:
return self._default_agent_configuration
except Exception as err:
raise RetrievalError(f"Error retrieving the agent configuration: {err}")
def store_configuration(self, agent_configuration: AgentConfiguration):
configuration_json = AgentConfiguration.to_json(agent_configuration)
configuration_json = agent_configuration.json()
self._file_repository.save_file(
AGENT_CONFIGURATION_FILE_NAME, io.BytesIO(configuration_json.encode())

View File

@ -5,7 +5,6 @@ from flask import make_response, request
from common.agent_configuration.agent_configuration import (
AgentConfiguration as AgentConfigurationObject,
)
from common.agent_configuration.agent_configuration import InvalidConfigurationError
from monkey_island.cc.repository import IAgentConfigurationRepository
from monkey_island.cc.resources.AbstractResource import AbstractResource
from monkey_island.cc.resources.request_authentication import jwt_required
@ -20,17 +19,17 @@ class AgentConfiguration(AbstractResource):
# Used by the agent. Can't secure
def get(self):
configuration = self._agent_configuration_repository.get_configuration()
configuration_json = AgentConfigurationObject.to_json(configuration)
return make_response(configuration_json, 200)
configuration_dict = configuration.dict(simplify=True)
return make_response(configuration_dict, 200)
@jwt_required
def put(self):
try:
configuration_object = AgentConfigurationObject.from_mapping(request.json)
configuration_object = AgentConfigurationObject(**request.json)
self._agent_configuration_repository.store_configuration(configuration_object)
# API Spec: Should return 204 (NO CONTENT)
return make_response({}, 200)
except (InvalidConfigurationError, json.JSONDecodeError) as err:
except (ValueError, TypeError, json.JSONDecodeError) as err:
return make_response(
{"error": f"Invalid configuration supplied: {err}"},
400,

View File

@ -1,5 +1,4 @@
import logging
from dataclasses import replace
from http import HTTPStatus
from flask import Response, make_response, request, send_file
@ -102,12 +101,9 @@ class PBAFileUpload(AbstractResource):
agent_configuration = self._agent_configuration_repository.get_configuration()
if target_os == LINUX_PBA_TYPE:
custom_pbas = replace(agent_configuration.custom_pbas, linux_filename=safe_filename)
agent_configuration.custom_pbas.linux_filename = safe_filename
else:
custom_pbas = replace(agent_configuration.custom_pbas, windows_filename=safe_filename)
updated_agent_configuration = replace(agent_configuration, custom_pbas=custom_pbas)
self._agent_configuration_repository.store_configuration(updated_agent_configuration)
agent_configuration.custom_pbas.windows_filename = safe_filename
@jwt_required
def delete(self, target_os):

View File

@ -1,8 +1,4 @@
import json
from copy import deepcopy
import pytest
from marshmallow import ValidationError
from tests.common.example_agent_configuration import (
AGENT_CONFIGURATION,
BLOCKED_IPS,
@ -28,42 +24,31 @@ from tests.common.example_agent_configuration import (
WINDOWS_FILENAME,
)
from common.agent_configuration import AgentConfiguration, InvalidConfigurationError
from common.agent_configuration.agent_sub_configuration_schemas import (
CustomPBAConfigurationSchema,
ExploitationConfigurationSchema,
ExploitationOptionsConfigurationSchema,
ICMPScanConfigurationSchema,
NetworkScanConfigurationSchema,
PluginConfigurationSchema,
PropagationConfigurationSchema,
ScanTargetConfigurationSchema,
TCPScanConfigurationSchema,
)
from common.agent_configuration.agent_configuration import AgentConfiguration
from common.agent_configuration.agent_sub_configurations import (
CustomPBAConfiguration,
ExploitationConfiguration,
ExploitationOptionsConfiguration,
ICMPScanConfiguration,
NetworkScanConfiguration,
PluginConfiguration,
PropagationConfiguration,
ScanTargetConfiguration,
TCPScanConfiguration,
)
INVALID_PORTS = [[-1, 1, 2], [1, 2, 99999]]
def test_build_plugin_configuration():
schema = PluginConfigurationSchema()
config = schema.load(PLUGIN_CONFIGURATION)
config = PluginConfiguration(**PLUGIN_CONFIGURATION)
assert config.name == PLUGIN_NAME
assert config.options == PLUGIN_OPTIONS
def test_custom_pba_configuration_schema():
schema = CustomPBAConfigurationSchema()
config = schema.load(CUSTOM_PBA_CONFIGURATION)
config = CustomPBAConfiguration(**CUSTOM_PBA_CONFIGURATION)
assert config.linux_command == LINUX_COMMAND
assert config.linux_filename == LINUX_FILENAME
@ -72,12 +57,10 @@ def test_custom_pba_configuration_schema():
def test_custom_pba_configuration_schema__empty_filenames_allowed():
schema = CustomPBAConfigurationSchema()
empty_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy()
empty_filename_configuration.update({"linux_filename": "", "windows_filename": ""})
config = schema.load(empty_filename_configuration)
config = CustomPBAConfiguration(**empty_filename_configuration)
assert config.linux_command == LINUX_COMMAND
assert config.linux_filename == ""
@ -87,32 +70,26 @@ def test_custom_pba_configuration_schema__empty_filenames_allowed():
@pytest.mark.parametrize("linux_filename", ["/", "/abc/", "\0"])
def test_custom_pba_configuration_schema__invalid_linux_filename(linux_filename):
schema = CustomPBAConfigurationSchema()
invalid_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy()
invalid_filename_configuration["linux_filename"] = linux_filename
with pytest.raises(ValidationError):
schema.load(invalid_filename_configuration)
with pytest.raises(ValueError):
CustomPBAConfiguration(**invalid_filename_configuration)
@pytest.mark.parametrize(
"windows_filename", ["CON", "CON.txt", "con.abc.pdf", " ", "abc.", "a?b", "d\\e"]
)
def test_custom_pba_configuration_schema__invalid_windows_filename(windows_filename):
schema = CustomPBAConfigurationSchema()
invalid_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy()
invalid_filename_configuration["windows_filename"] = windows_filename
with pytest.raises(ValidationError):
schema.load(invalid_filename_configuration)
with pytest.raises(ValueError):
CustomPBAConfiguration(**invalid_filename_configuration)
def test_scan_target_configuration():
schema = ScanTargetConfigurationSchema()
config = schema.load(SCAN_TARGET_CONFIGURATION)
config = ScanTargetConfiguration(**SCAN_TARGET_CONFIGURATION)
assert config.blocked_ips == tuple(BLOCKED_IPS)
assert config.inaccessible_subnets == tuple(INACCESSIBLE_SUBNETS)
@ -120,28 +97,51 @@ def test_scan_target_configuration():
assert config.subnets == tuple(SUBNETS)
def test_icmp_scan_configuration_schema():
schema = ICMPScanConfigurationSchema()
@pytest.mark.parametrize("invalid_blocked_ip_list", [["abc"], [1]])
def test_scan_target_configuration__invalid_blocked_ips(invalid_blocked_ip_list):
invalid_blocked_ips = SCAN_TARGET_CONFIGURATION.copy()
invalid_blocked_ips["blocked_ips"] = invalid_blocked_ip_list
config = schema.load(ICMP_CONFIGURATION)
with pytest.raises(ValueError):
ScanTargetConfiguration(**invalid_blocked_ips)
@pytest.mark.parametrize(
"invalid_inaccessible_subnets_list", [["1-2-3"], ["0.0.0.0/33"], ["www.invalid-.com"]]
)
def test_scan_target_configuration__invalid_inaccessible_subnets(invalid_inaccessible_subnets_list):
invalid_inaccessible_subnets = SCAN_TARGET_CONFIGURATION.copy()
invalid_inaccessible_subnets["inaccessible_subnets"] = invalid_inaccessible_subnets_list
with pytest.raises(ValueError):
ScanTargetConfiguration(**invalid_inaccessible_subnets)
@pytest.mark.parametrize("invalid_subnets_list", [["1-2-3"], ["0.0.0.0/33"], ["www.invalid-.com"]])
def test_scan_target_configuration__invalid_subnets(invalid_subnets_list):
invalid_subnets = SCAN_TARGET_CONFIGURATION.copy()
invalid_subnets["subnets"] = invalid_subnets_list
with pytest.raises(ValueError):
ScanTargetConfiguration(**invalid_subnets)
def test_icmp_scan_configuration_schema():
config = ICMPScanConfiguration(**ICMP_CONFIGURATION)
assert config.timeout == TIMEOUT
def test_icmp_scan_configuration_schema__negative_timeout():
schema = ICMPScanConfigurationSchema()
negative_timeout_configuration = ICMP_CONFIGURATION.copy()
negative_timeout_configuration["timeout"] = -1
with pytest.raises(ValidationError):
schema.load(negative_timeout_configuration)
with pytest.raises(ValueError):
ICMPScanConfiguration(**negative_timeout_configuration)
def test_tcp_scan_configuration_schema():
schema = TCPScanConfigurationSchema()
config = schema.load(TCP_SCAN_CONFIGURATION)
config = TCPScanConfiguration(**TCP_SCAN_CONFIGURATION)
assert config.timeout == TIMEOUT
assert config.ports == tuple(PORTS)
@ -149,29 +149,23 @@ def test_tcp_scan_configuration_schema():
@pytest.mark.parametrize("ports", INVALID_PORTS)
def test_tcp_scan_configuration_schema__ports_out_of_range(ports):
schema = TCPScanConfigurationSchema()
invalid_ports_configuration = TCP_SCAN_CONFIGURATION.copy()
invalid_ports_configuration["ports"] = ports
with pytest.raises(ValidationError):
schema.load(invalid_ports_configuration)
with pytest.raises(ValueError):
TCPScanConfiguration(**invalid_ports_configuration)
def test_tcp_scan_configuration_schema__negative_timeout():
schema = TCPScanConfigurationSchema()
negative_timeout_configuration = TCP_SCAN_CONFIGURATION.copy()
negative_timeout_configuration["timeout"] = -1
with pytest.raises(ValidationError):
schema.load(negative_timeout_configuration)
with pytest.raises(ValueError):
TCPScanConfiguration(**negative_timeout_configuration)
def test_network_scan_configuration():
schema = NetworkScanConfigurationSchema()
config = schema.load(NETWORK_SCAN_CONFIGURATION)
config = NetworkScanConfiguration(**NETWORK_SCAN_CONFIGURATION)
assert config.tcp.ports == tuple(TCP_SCAN_CONFIGURATION["ports"])
assert config.tcp.timeout == TCP_SCAN_CONFIGURATION["timeout"]
@ -186,49 +180,39 @@ def test_network_scan_configuration():
def test_exploitation_options_configuration_schema():
ports = [1, 2, 3]
schema = ExploitationOptionsConfigurationSchema()
config = schema.load({"http_ports": ports})
config = ExploitationOptionsConfiguration(http_ports=ports)
assert config.http_ports == tuple(ports)
@pytest.mark.parametrize("ports", INVALID_PORTS)
def test_exploitation_options_configuration_schema__ports_out_of_range(ports):
schema = ExploitationOptionsConfigurationSchema()
invalid_ports_configuration = {"http_ports": ports}
with pytest.raises(ValidationError):
schema.load(invalid_ports_configuration)
with pytest.raises(ValueError):
ExploitationOptionsConfiguration(http_ports=ports)
def test_exploiter_configuration_schema():
name = "bond"
options = {"gun": "Walther PPK", "car": "Aston Martin DB5"}
schema = PluginConfigurationSchema()
config = schema.load({"name": name, "options": options})
config = PluginConfiguration(name=name, options=options)
assert config.name == name
assert config.options == options
def test_exploitation_configuration():
schema = ExploitationConfigurationSchema()
config = schema.load(EXPLOITATION_CONFIGURATION)
config_dict = schema.dump(config)
config = ExploitationConfiguration(**EXPLOITATION_CONFIGURATION)
config_dict = config.dict(simplify=True)
assert isinstance(config, ExploitationConfiguration)
assert config_dict == EXPLOITATION_CONFIGURATION
def test_propagation_configuration():
schema = PropagationConfigurationSchema()
config = schema.load(PROPAGATION_CONFIGURATION)
config_dict = schema.dump(config)
config = PropagationConfiguration(**PROPAGATION_CONFIGURATION)
config_dict = config.dict(simplify=True)
assert isinstance(config, PropagationConfiguration)
assert isinstance(config.network_scan, NetworkScanConfiguration)
@ -238,18 +222,25 @@ def test_propagation_configuration():
def test_propagation_configuration__invalid_maximum_depth():
schema = PropagationConfigurationSchema()
negative_maximum_depth_configuration = PROPAGATION_CONFIGURATION.copy()
negative_maximum_depth_configuration["maximum_depth"] = -1
with pytest.raises(ValidationError):
schema.load(negative_maximum_depth_configuration)
with pytest.raises(ValueError):
PropagationConfiguration(**negative_maximum_depth_configuration)
def test_propagation_configuration__maximum_depth_zero():
maximum_depth_zero_configuration = PROPAGATION_CONFIGURATION.copy()
maximum_depth_zero_configuration["maximum_depth"] = 0
pc = PropagationConfiguration(**maximum_depth_zero_configuration)
assert pc.maximum_depth == 0
def test_agent_configuration():
config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
config_json = AgentConfiguration.to_json(config)
config = AgentConfiguration(**AGENT_CONFIGURATION)
config_dict = config.dict(simplify=True)
assert isinstance(config, AgentConfiguration)
assert config.keep_tunnel_open_time == 30
@ -258,49 +249,29 @@ def test_agent_configuration():
assert isinstance(config.credential_collectors[0], PluginConfiguration)
assert isinstance(config.payloads[0], PluginConfiguration)
assert isinstance(config.propagation, PropagationConfiguration)
assert json.loads(config_json) == AGENT_CONFIGURATION
assert config_dict == AGENT_CONFIGURATION
def test_agent_configuration__negative_keep_tunnel_open_time():
def test_agent_configuration__negative_keep_tunnel_open_time_zero():
keep_tunnel_open_time_zero_configuration = AGENT_CONFIGURATION.copy()
keep_tunnel_open_time_zero_configuration["keep_tunnel_open_time"] = 0
ac = AgentConfiguration(**keep_tunnel_open_time_zero_configuration)
assert ac.keep_tunnel_open_time == 0
def test_agent_configuration__keep_tunnel_open_time():
negative_keep_tunnel_open_time_configuration = AGENT_CONFIGURATION.copy()
negative_keep_tunnel_open_time_configuration["keep_tunnel_open_time"] = -1
with pytest.raises(InvalidConfigurationError):
AgentConfiguration.from_mapping(negative_keep_tunnel_open_time_configuration)
with pytest.raises(ValueError):
AgentConfiguration(**negative_keep_tunnel_open_time_configuration)
def test_incorrect_type():
valid_config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
with pytest.raises(InvalidConfigurationError):
valid_config = AgentConfiguration(**AGENT_CONFIGURATION)
with pytest.raises(TypeError):
valid_config_dict = valid_config.__dict__
valid_config_dict["keep_tunnel_open_time"] = "not_a_float"
AgentConfiguration(**valid_config_dict)
def test_to_from_mapping():
config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
assert AgentConfiguration.to_mapping(config) == AGENT_CONFIGURATION
def test_from_mapping__invalid_data():
dict_ = deepcopy(AGENT_CONFIGURATION)
dict_["payloads"] = "payloads"
with pytest.raises(InvalidConfigurationError):
AgentConfiguration.from_mapping(dict_)
def test_to_from_json():
original_config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
config_json = AgentConfiguration.to_json(original_config)
assert AgentConfiguration.from_json(config_json) == original_config
def test_from_json__invalid_data():
invalid_dict = deepcopy(AGENT_CONFIGURATION)
invalid_dict["payloads"] = "payloads"
with pytest.raises(InvalidConfigurationError):
AgentConfiguration.from_json(json.dumps(invalid_dict))

View File

@ -1,5 +1,4 @@
import pytest
from marshmallow import ValidationError
from common.agent_configuration.validators.ip_ranges import validate_ip, validate_subnet_range
@ -11,7 +10,7 @@ def test_validate_ip_valid(ip):
@pytest.mark.parametrize("ip", ["1.1.1", "257.256.255.255", "1.1.1.1.1"])
def test_validate_ip_invalid(ip):
with pytest.raises(ValidationError):
with pytest.raises(ValueError):
validate_ip(ip)
@ -22,7 +21,7 @@ def test_validate_subnet_range__ip_valid(ip):
@pytest.mark.parametrize("ip", ["1.1.1", "257.256.255.255", "1.1.1.1.1"])
def test_validate_subnet_range__ip_invalid(ip):
with pytest.raises(ValidationError):
with pytest.raises(ValueError):
validate_subnet_range(ip)
@ -42,7 +41,7 @@ def test_validate_subnet_range__ip_range_valid(ip_range):
],
)
def test_validate_subnet_range__ip_range_invalid(ip_range):
with pytest.raises(ValidationError):
with pytest.raises(ValueError):
validate_subnet_range(ip_range)
@ -55,7 +54,7 @@ def test_validate_subnet_range__hostname_valid(hostname):
"hostname", ["hy&!he.host", "čili-peppers.are-hot", "one.two-", "one-.two", "one@two", ""]
)
def test_validate_subnet_range__hostname_invalid(hostname):
with pytest.raises(ValidationError):
with pytest.raises(ValueError):
validate_subnet_range(hostname)
@ -66,5 +65,5 @@ def test_validate_subnet_range__cidr_valid(cidr_range):
@pytest.mark.parametrize("cidr_range", ["1.1.1/24", "1.1.1.1/-1", "1.1.1.1/33", "1.1.1.1/222"])
def test_validate_subnet_range__cidr_invalid(cidr_range):
with pytest.raises(ValidationError):
with pytest.raises(ValueError):
validate_subnet_range(cidr_range)

View File

@ -39,10 +39,10 @@ def scan_config(default_agent_configuration):
PluginConfiguration(name="SSHFinger", options={}),
]
scan_config = NetworkScanConfiguration(
tcp_config,
icmp_config,
fingerprinter_config,
default_agent_configuration.propagation.network_scan.targets,
tcp=tcp_config,
icmp=icmp_config,
fingerprinters=fingerprinter_config,
targets=default_agent_configuration.propagation.network_scan.targets,
)
return scan_config

View File

@ -145,15 +145,15 @@ def get_propagation_config(
default_agent_configuration, scan_target_config: ScanTargetConfiguration
):
network_scan = NetworkScanConfiguration(
default_agent_configuration.propagation.network_scan.tcp,
default_agent_configuration.propagation.network_scan.icmp,
default_agent_configuration.propagation.network_scan.fingerprinters,
scan_target_config,
tcp=default_agent_configuration.propagation.network_scan.tcp,
icmp=default_agent_configuration.propagation.network_scan.icmp,
fingerprinters=default_agent_configuration.propagation.network_scan.fingerprinters,
targets=scan_target_config,
)
propagation_config = PropagationConfiguration(
default_agent_configuration.propagation.maximum_depth,
network_scan,
default_agent_configuration.propagation.exploitation,
maximum_depth=default_agent_configuration.propagation.maximum_depth,
network_scan=network_scan,
exploitation=default_agent_configuration.propagation.exploitation,
)
return propagation_config

View File

@ -12,7 +12,7 @@ def repository(default_agent_configuration):
def test_store_agent_config(repository):
agent_configuration = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
agent_configuration = AgentConfiguration(**AGENT_CONFIGURATION)
repository.store_configuration(agent_configuration)
retrieved_agent_configuration = repository.get_configuration()
@ -36,7 +36,7 @@ def test_get_agent_config_retrieval_error(default_agent_configuration):
def test_reset_to_default(repository, default_agent_configuration):
agent_configuration = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
agent_configuration = AgentConfiguration(**AGENT_CONFIGURATION)
repository.store_configuration(agent_configuration)
repository.reset_to_default()

View File

@ -26,14 +26,15 @@ def flask_client(build_flask_client):
def test_agent_configuration_endpoint(flask_client):
resp = flask_client.put(
AGENT_CONFIGURATION_URL,
json=AgentConfiguration.to_mapping(AGENT_CONFIGURATION),
json=AgentConfiguration(**AGENT_CONFIGURATION).dict(simplify=True),
follow_redirects=True,
)
assert resp.status_code == 200
resp = flask_client.get(AGENT_CONFIGURATION_URL)
assert resp.status_code == 200
assert json.loads(resp.data) == AGENT_CONFIGURATION
assert AgentConfiguration(**json.loads(resp.data)) == AgentConfiguration(**AGENT_CONFIGURATION)
def test_agent_configuration_invalid_config(flask_client):

View File

@ -1,4 +1,3 @@
from dataclasses import replace
from unittest.mock import MagicMock
import pytest
@ -18,12 +17,10 @@ WINDOWS_FILENAME = "windows_pba_file.ps1"
@pytest.fixture
def agent_configuration(default_agent_configuration: AgentConfiguration) -> AgentConfiguration:
custom_pbas = replace(
default_agent_configuration.custom_pbas,
linux_filename=LINUX_FILENAME,
windows_filename=WINDOWS_FILENAME,
custom_pbas = default_agent_configuration.custom_pbas.copy(
update={"linux_filename": LINUX_FILENAME, "windows_filename": WINDOWS_FILENAME},
)
return replace(default_agent_configuration, custom_pbas=custom_pbas)
return default_agent_configuration.copy(update={"custom_pbas": custom_pbas})
@pytest.fixture

View File

@ -3,6 +3,12 @@ Everything in this file is what Vulture found as dead code but either isn't real
dead or is kept deliberately. Referencing these in a file like this makes sure that
Vulture doesn't mark these as dead again.
"""
from common.agent_configuration.agent_sub_configurations import (
CustomPBAConfiguration,
ScanTargetConfiguration,
)
from common.credentials import Credentials
from common.utils import IJSONSerializable
from infection_monkey.exploit.log4shell_utils.ldap_server import LDAPServerFactory
from monkey_island.cc.models import Report
from monkey_island.cc.models.networkmap import Arc, NetworkMap
@ -294,6 +300,11 @@ underscore_attrs_are_private
extra
allow_mutation
validate_assignment
CustomPBAConfiguration.linux_filename_valid
CustomPBAConfiguration.windows_filename_valid
ScanTargetConfiguration.blocked_ips_valid
ScanTargetConfiguration.inaccessible_subnets_valid
ScanTargetConfiguration.subnets_valid
# CommunicationType
CommunicationType
@ -301,3 +312,6 @@ SCANNED
EXPLOITED
CC
CC_TUNNEL
Credentials.from_json
IJSONSerializable.from_json