Common: Freeze lists to tuples in agent configuration

This commit is contained in:
vakarisz 2022-07-26 15:57:48 +03:00
parent 81101d4213
commit 91e8ce62db
7 changed files with 65 additions and 33 deletions

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Any, List, Mapping
from marshmallow import Schema, fields
from marshmallow import Schema, fields, post_load
from marshmallow.exceptions import MarshmallowError
from .agent_sub_configuration_schemas import (
@ -16,6 +16,7 @@ from .agent_sub_configurations import (
PluginConfiguration,
PropagationConfiguration,
)
from .utils import freeze_lists
class InvalidConfigurationError(Exception):
@ -58,8 +59,7 @@ class AgentConfiguration:
"""
try:
config_dict = AgentConfigurationSchema().load(config_mapping)
return AgentConfiguration(**config_dict)
return AgentConfigurationSchema().load(config_mapping)
except MarshmallowError as err:
raise InvalidConfigurationError(str(err))
@ -74,8 +74,7 @@ class AgentConfiguration:
AgentConfiguration
"""
try:
config_dict = AgentConfigurationSchema().loads(config_json)
return AgentConfiguration(**config_dict)
return AgentConfigurationSchema().loads(config_json)
except MarshmallowError as err:
raise InvalidConfigurationError(str(err))
@ -107,3 +106,8 @@ class AgentConfigurationSchema(Schema):
credential_collectors = fields.List(fields.Nested(PluginConfigurationSchema))
payloads = fields.List(fields.Nested(PluginConfigurationSchema))
propagation = fields.Nested(PropagationConfigurationSchema)
@post_load
@freeze_lists
def _make_agent_configuration(self, data, **kwargs):
return AgentConfiguration(**data)

View File

@ -11,6 +11,7 @@ from .agent_sub_configurations import (
ScanTargetConfiguration,
TCPScanConfiguration,
)
from .utils import freeze_lists
class CustomPBAConfigurationSchema(Schema):
@ -40,6 +41,7 @@ class ScanTargetConfigurationSchema(Schema):
subnets = fields.List(fields.Str())
@post_load
@freeze_lists
def _make_scan_target_configuration(self, data, **kwargs):
return ScanTargetConfiguration(**data)
@ -57,6 +59,7 @@ class TCPScanConfigurationSchema(Schema):
ports = fields.List(fields.Int())
@post_load
@freeze_lists
def _make_tcp_scan_configuration(self, data, **kwargs):
return TCPScanConfiguration(**data)
@ -68,6 +71,7 @@ class NetworkScanConfigurationSchema(Schema):
targets = fields.Nested(ScanTargetConfigurationSchema)
@post_load
@freeze_lists
def _make_network_scan_configuration(self, data, **kwargs):
return NetworkScanConfiguration(**data)
@ -76,6 +80,7 @@ class ExploitationOptionsConfigurationSchema(Schema):
http_ports = fields.List(fields.Int())
@post_load
@freeze_lists
def _make_exploitation_options_configuration(self, data, **kwargs):
return ExploitationOptionsConfiguration(**data)
@ -86,6 +91,7 @@ class ExploitationConfigurationSchema(Schema):
vulnerability = fields.List(fields.Nested(PluginConfigurationSchema))
@post_load
@freeze_lists
def _make_exploitation_options_configuration(self, data, **kwargs):
return ExploitationConfiguration(**data)

View File

@ -13,7 +13,7 @@ from .agent_sub_configurations import (
TCPScanConfiguration,
)
PBAS = [
PBAS = (
"CommunicateAsBackdoorUser",
"ModifyShellStartupFiles",
"HiddenFiles",
@ -23,14 +23,14 @@ PBAS = [
"Timestomping",
"AccountDiscovery",
"ProcessListCollection",
]
)
CREDENTIAL_COLLECTORS = ["MimikatzCollector", "SSHCollector"]
CREDENTIAL_COLLECTORS = ("MimikatzCollector", "SSHCollector")
PBA_CONFIGURATION = [PluginConfiguration(pba, {}) for pba in PBAS]
CREDENTIAL_COLLECTOR_CONFIGURATION = [
PBA_CONFIGURATION = tuple(PluginConfiguration(pba, {}) for pba in PBAS)
CREDENTIAL_COLLECTOR_CONFIGURATION = tuple(
PluginConfiguration(collector, {}) for collector in CREDENTIAL_COLLECTORS
]
)
RANSOMWARE_OPTIONS = {
"encryption": {
@ -40,13 +40,13 @@ RANSOMWARE_OPTIONS = {
"other_behaviors": {"readme": True},
}
PAYLOAD_CONFIGURATION = [PluginConfiguration("ransomware", RANSOMWARE_OPTIONS)]
PAYLOAD_CONFIGURATION = tuple([PluginConfiguration("ransomware", RANSOMWARE_OPTIONS)])
CUSTOM_PBA_CONFIGURATION = CustomPBAConfiguration(
linux_command="", linux_filename="", windows_command="", windows_filename=""
)
TCP_PORTS = [
TCP_PORTS = (
22,
80,
135,
@ -64,37 +64,38 @@ TCP_PORTS = [
8983,
9200,
9600,
]
)
TCP_SCAN_CONFIGURATION = TCPScanConfiguration(timeout=3.0, ports=TCP_PORTS)
ICMP_CONFIGURATION = ICMPScanConfiguration(timeout=1.0)
HTTP_PORTS = [80, 443, 7001, 8008, 8080, 8983, 9200, 9600]
FINGERPRINTERS = [
HTTP_PORTS = (80, 443, 7001, 8008, 8080, 8983, 9200, 9600)
FINGERPRINTERS = (
PluginConfiguration("elastic", {}),
PluginConfiguration("http", {"http_ports": HTTP_PORTS}),
# Plugin configuration option contents are not converted to tuples
PluginConfiguration("http", {"http_ports": list(HTTP_PORTS)}),
PluginConfiguration("mssql", {}),
PluginConfiguration("smb", {}),
PluginConfiguration("ssh", {}),
]
)
SCAN_TARGET_CONFIGURATION = ScanTargetConfiguration([], [], True, [])
SCAN_TARGET_CONFIGURATION = ScanTargetConfiguration(tuple(), tuple(), True, tuple())
NETWORK_SCAN_CONFIGURATION = NetworkScanConfiguration(
TCP_SCAN_CONFIGURATION, ICMP_CONFIGURATION, FINGERPRINTERS, SCAN_TARGET_CONFIGURATION
)
EXPLOITATION_OPTIONS_CONFIGURATION = ExploitationOptionsConfiguration(HTTP_PORTS)
BRUTE_FORCE_EXPLOITERS = [
BRUTE_FORCE_EXPLOITERS = (
PluginConfiguration("MSSQLExploiter", {}),
PluginConfiguration("PowerShellExploiter", {}),
PluginConfiguration("SSHExploiter", {}),
PluginConfiguration("SmbExploiter", {"smb_download_timeout": 30}),
PluginConfiguration("WmiExploiter", {"smb_download_timeout": 30}),
]
)
VULNERABILITY_EXPLOITERS = [
VULNERABILITY_EXPLOITERS = (
PluginConfiguration("Log4ShellExploiter", {}),
PluginConfiguration("HadoopExploiter", {}),
]
)
EXPLOITATION_CONFIGURATION = ExploitationConfiguration(
EXPLOITATION_OPTIONS_CONFIGURATION, BRUTE_FORCE_EXPLOITERS, VULNERABILITY_EXPLOITERS
@ -116,5 +117,5 @@ DEFAULT_AGENT_CONFIGURATION = AgentConfiguration(
)
DEFAULT_RANSOMWARE_AGENT_CONFIGURATION = dataclasses.replace(
DEFAULT_AGENT_CONFIGURATION, post_breach_actions=[]
DEFAULT_AGENT_CONFIGURATION, post_breach_actions=tuple()
)

View File

@ -0,0 +1,13 @@
from functools import wraps
from typing import Callable
from common.utils.code_utils import freeze_lists_in_dict
def freeze_lists(function: Callable):
@wraps(function)
def wrapper(self, data, **kwargs):
data = freeze_lists_in_dict(data)
return function(self, data, **kwargs)
return wrapper

View File

@ -48,3 +48,10 @@ def del_key(mapping: MutableMapping[T, Any], key: T):
del mapping[key]
except KeyError:
pass
def freeze_lists_in_dict(mapping: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
for key, value in mapping.items():
if type(value) == list:
mapping[key] = tuple(value)
return mapping

View File

@ -73,10 +73,10 @@ def test_scan_target_configuration():
config = schema.load(SCAN_TARGET_CONFIGURATION)
assert config.blocked_ips == BLOCKED_IPS
assert config.inaccessible_subnets == INACCESSIBLE_SUBNETS
assert config.blocked_ips == tuple(BLOCKED_IPS)
assert config.inaccessible_subnets == tuple(INACCESSIBLE_SUBNETS)
assert config.local_network_scan == LOCAL_NETWORK_SCAN
assert config.subnets == SUBNETS
assert config.subnets == tuple(SUBNETS)
def test_icmp_scan_configuration_schema():
@ -93,7 +93,7 @@ def test_tcp_scan_configuration_schema():
config = schema.load(TCP_SCAN_CONFIGURATION)
assert config.timeout == TIMEOUT
assert config.ports == PORTS
assert config.ports == tuple(PORTS)
def test_network_scan_configuration():
@ -101,15 +101,15 @@ def test_network_scan_configuration():
config = schema.load(NETWORK_SCAN_CONFIGURATION)
assert config.tcp.ports == TCP_SCAN_CONFIGURATION["ports"]
assert config.tcp.ports == tuple(TCP_SCAN_CONFIGURATION["ports"])
assert config.tcp.timeout == TCP_SCAN_CONFIGURATION["timeout"]
assert config.icmp.timeout == ICMP_CONFIGURATION["timeout"]
assert config.fingerprinters[0].name == FINGERPRINTERS[0]["name"]
assert config.fingerprinters[0].options == FINGERPRINTERS[0]["options"]
assert config.targets.blocked_ips == BLOCKED_IPS
assert config.targets.inaccessible_subnets == INACCESSIBLE_SUBNETS
assert config.targets.blocked_ips == tuple(BLOCKED_IPS)
assert config.targets.inaccessible_subnets == tuple(INACCESSIBLE_SUBNETS)
assert config.targets.local_network_scan == LOCAL_NETWORK_SCAN
assert config.targets.subnets == SUBNETS
assert config.targets.subnets == tuple(SUBNETS)
def test_exploitation_options_configuration_schema():
@ -118,7 +118,7 @@ def test_exploitation_options_configuration_schema():
config = schema.load({"http_ports": ports})
assert config.http_ports == ports
assert config.http_ports == tuple(ports)
def test_exploiter_configuration_schema():

View File

@ -194,6 +194,7 @@ _make_icmp_scan_configuration # unused method (monkey/common/configuration/agen
_make_tcp_scan_configuration # unused method (monkey/common/configuration/agent_configuration.py:122)
_make_network_scan_configuration # unused method (monkey/common/configuration/agent_configuration.py:110)
_make_propagation_configuration # unused method (monkey/common/configuration/agent_configuration.py:167)
_make_agent_configuration # \common\agent_configuration\agent_configuration.py:110: unused method '_make_agent_configuration'
# Credentials
_strip_credential_type # unused method (monkey/common/credentials/password.py:18)