Merge pull request #2117 from guardicore/2004-freeze-lists

2004 freeze lists
This commit is contained in:
Mike Salvatore 2022-07-26 10:50:21 -04:00 committed by GitHub
commit 2d86f1a3f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 73 additions and 41 deletions

View File

@ -1,11 +1,12 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, List, Mapping from typing import Any, Mapping, Tuple
from marshmallow import Schema, fields from marshmallow import Schema, fields
from marshmallow.exceptions import MarshmallowError from marshmallow.exceptions import MarshmallowError
from ..utils.code_utils import freeze_lists_in_mapping
from .agent_sub_configuration_schemas import ( from .agent_sub_configuration_schemas import (
CustomPBAConfigurationSchema, CustomPBAConfigurationSchema,
PluginConfigurationSchema, PluginConfigurationSchema,
@ -33,9 +34,9 @@ class InvalidConfigurationError(Exception):
class AgentConfiguration: class AgentConfiguration:
keep_tunnel_open_time: float keep_tunnel_open_time: float
custom_pbas: CustomPBAConfiguration custom_pbas: CustomPBAConfiguration
post_breach_actions: List[PluginConfiguration] post_breach_actions: Tuple[PluginConfiguration, ...]
credential_collectors: List[PluginConfiguration] credential_collectors: Tuple[PluginConfiguration, ...]
payloads: List[PluginConfiguration] payloads: Tuple[PluginConfiguration, ...]
propagation: PropagationConfiguration propagation: PropagationConfiguration
def __post_init__(self): def __post_init__(self):
@ -59,6 +60,7 @@ class AgentConfiguration:
try: try:
config_dict = AgentConfigurationSchema().load(config_mapping) config_dict = AgentConfigurationSchema().load(config_mapping)
config_dict = freeze_lists_in_mapping(config_dict)
return AgentConfiguration(**config_dict) return AgentConfiguration(**config_dict)
except MarshmallowError as err: except MarshmallowError as err:
raise InvalidConfigurationError(str(err)) raise InvalidConfigurationError(str(err))
@ -75,6 +77,7 @@ class AgentConfiguration:
""" """
try: try:
config_dict = AgentConfigurationSchema().loads(config_json) config_dict = AgentConfigurationSchema().loads(config_json)
config_dict = freeze_lists_in_mapping(config_dict)
return AgentConfiguration(**config_dict) return AgentConfiguration(**config_dict)
except MarshmallowError as err: except MarshmallowError as err:
raise InvalidConfigurationError(str(err)) raise InvalidConfigurationError(str(err))

View File

@ -11,6 +11,7 @@ from .agent_sub_configurations import (
ScanTargetConfiguration, ScanTargetConfiguration,
TCPScanConfiguration, TCPScanConfiguration,
) )
from .utils import freeze_lists
class CustomPBAConfigurationSchema(Schema): class CustomPBAConfigurationSchema(Schema):
@ -40,6 +41,7 @@ class ScanTargetConfigurationSchema(Schema):
subnets = fields.List(fields.Str()) subnets = fields.List(fields.Str())
@post_load @post_load
@freeze_lists
def _make_scan_target_configuration(self, data, **kwargs): def _make_scan_target_configuration(self, data, **kwargs):
return ScanTargetConfiguration(**data) return ScanTargetConfiguration(**data)
@ -57,6 +59,7 @@ class TCPScanConfigurationSchema(Schema):
ports = fields.List(fields.Int()) ports = fields.List(fields.Int())
@post_load @post_load
@freeze_lists
def _make_tcp_scan_configuration(self, data, **kwargs): def _make_tcp_scan_configuration(self, data, **kwargs):
return TCPScanConfiguration(**data) return TCPScanConfiguration(**data)
@ -68,6 +71,7 @@ class NetworkScanConfigurationSchema(Schema):
targets = fields.Nested(ScanTargetConfigurationSchema) targets = fields.Nested(ScanTargetConfigurationSchema)
@post_load @post_load
@freeze_lists
def _make_network_scan_configuration(self, data, **kwargs): def _make_network_scan_configuration(self, data, **kwargs):
return NetworkScanConfiguration(**data) return NetworkScanConfiguration(**data)
@ -76,6 +80,7 @@ class ExploitationOptionsConfigurationSchema(Schema):
http_ports = fields.List(fields.Int()) http_ports = fields.List(fields.Int())
@post_load @post_load
@freeze_lists
def _make_exploitation_options_configuration(self, data, **kwargs): def _make_exploitation_options_configuration(self, data, **kwargs):
return ExploitationOptionsConfiguration(**data) return ExploitationOptionsConfiguration(**data)
@ -86,6 +91,7 @@ class ExploitationConfigurationSchema(Schema):
vulnerability = fields.List(fields.Nested(PluginConfigurationSchema)) vulnerability = fields.List(fields.Nested(PluginConfigurationSchema))
@post_load @post_load
@freeze_lists
def _make_exploitation_options_configuration(self, data, **kwargs): def _make_exploitation_options_configuration(self, data, **kwargs):
return ExploitationConfiguration(**data) return ExploitationConfiguration(**data)

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict, List from typing import Dict, Tuple
@dataclass(frozen=True) @dataclass(frozen=True)
@ -18,10 +18,10 @@ class PluginConfiguration:
@dataclass(frozen=True) @dataclass(frozen=True)
class ScanTargetConfiguration: class ScanTargetConfiguration:
blocked_ips: List[str] blocked_ips: Tuple[str, ...]
inaccessible_subnets: List[str] inaccessible_subnets: Tuple[str, ...]
local_network_scan: bool local_network_scan: bool
subnets: List[str] subnets: Tuple[str, ...]
@dataclass(frozen=True) @dataclass(frozen=True)
@ -32,27 +32,27 @@ class ICMPScanConfiguration:
@dataclass(frozen=True) @dataclass(frozen=True)
class TCPScanConfiguration: class TCPScanConfiguration:
timeout: float timeout: float
ports: List[int] ports: Tuple[int, ...]
@dataclass(frozen=True) @dataclass(frozen=True)
class NetworkScanConfiguration: class NetworkScanConfiguration:
tcp: TCPScanConfiguration tcp: TCPScanConfiguration
icmp: ICMPScanConfiguration icmp: ICMPScanConfiguration
fingerprinters: List[PluginConfiguration] fingerprinters: Tuple[PluginConfiguration, ...]
targets: ScanTargetConfiguration targets: ScanTargetConfiguration
@dataclass(frozen=True) @dataclass(frozen=True)
class ExploitationOptionsConfiguration: class ExploitationOptionsConfiguration:
http_ports: List[int] http_ports: Tuple[int, ...]
@dataclass(frozen=True) @dataclass(frozen=True)
class ExploitationConfiguration: class ExploitationConfiguration:
options: ExploitationOptionsConfiguration options: ExploitationOptionsConfiguration
brute_force: List[PluginConfiguration] brute_force: Tuple[PluginConfiguration, ...]
vulnerability: List[PluginConfiguration] vulnerability: Tuple[PluginConfiguration, ...]
@dataclass(frozen=True) @dataclass(frozen=True)

View File

@ -13,7 +13,7 @@ from .agent_sub_configurations import (
TCPScanConfiguration, TCPScanConfiguration,
) )
PBAS = [ PBAS = (
"CommunicateAsBackdoorUser", "CommunicateAsBackdoorUser",
"ModifyShellStartupFiles", "ModifyShellStartupFiles",
"HiddenFiles", "HiddenFiles",
@ -23,14 +23,14 @@ PBAS = [
"Timestomping", "Timestomping",
"AccountDiscovery", "AccountDiscovery",
"ProcessListCollection", "ProcessListCollection",
] )
CREDENTIAL_COLLECTORS = ["MimikatzCollector", "SSHCollector"] CREDENTIAL_COLLECTORS = ("MimikatzCollector", "SSHCollector")
PBA_CONFIGURATION = [PluginConfiguration(pba, {}) for pba in PBAS] PBA_CONFIGURATION = tuple(PluginConfiguration(pba, {}) for pba in PBAS)
CREDENTIAL_COLLECTOR_CONFIGURATION = [ CREDENTIAL_COLLECTOR_CONFIGURATION = tuple(
PluginConfiguration(collector, {}) for collector in CREDENTIAL_COLLECTORS PluginConfiguration(collector, {}) for collector in CREDENTIAL_COLLECTORS
] )
RANSOMWARE_OPTIONS = { RANSOMWARE_OPTIONS = {
"encryption": { "encryption": {
@ -40,13 +40,13 @@ RANSOMWARE_OPTIONS = {
"other_behaviors": {"readme": True}, "other_behaviors": {"readme": True},
} }
PAYLOAD_CONFIGURATION = [PluginConfiguration("ransomware", RANSOMWARE_OPTIONS)] PAYLOAD_CONFIGURATION = tuple([PluginConfiguration("ransomware", RANSOMWARE_OPTIONS)])
CUSTOM_PBA_CONFIGURATION = CustomPBAConfiguration( CUSTOM_PBA_CONFIGURATION = CustomPBAConfiguration(
linux_command="", linux_filename="", windows_command="", windows_filename="" linux_command="", linux_filename="", windows_command="", windows_filename=""
) )
TCP_PORTS = [ TCP_PORTS = (
22, 22,
80, 80,
135, 135,
@ -64,37 +64,38 @@ TCP_PORTS = [
8983, 8983,
9200, 9200,
9600, 9600,
] )
TCP_SCAN_CONFIGURATION = TCPScanConfiguration(timeout=3.0, ports=TCP_PORTS) TCP_SCAN_CONFIGURATION = TCPScanConfiguration(timeout=3.0, ports=TCP_PORTS)
ICMP_CONFIGURATION = ICMPScanConfiguration(timeout=1.0) ICMP_CONFIGURATION = ICMPScanConfiguration(timeout=1.0)
HTTP_PORTS = [80, 443, 7001, 8008, 8080, 8983, 9200, 9600] HTTP_PORTS = (80, 443, 7001, 8008, 8080, 8983, 9200, 9600)
FINGERPRINTERS = [ FINGERPRINTERS = (
PluginConfiguration("elastic", {}), PluginConfiguration("elastic", {}),
PluginConfiguration("http", {"http_ports": HTTP_PORTS}), # Plugin configuration option contents are not converted to tuples
PluginConfiguration("http", {"http_ports": list(HTTP_PORTS)}),
PluginConfiguration("mssql", {}), PluginConfiguration("mssql", {}),
PluginConfiguration("smb", {}), PluginConfiguration("smb", {}),
PluginConfiguration("ssh", {}), PluginConfiguration("ssh", {}),
] )
SCAN_TARGET_CONFIGURATION = ScanTargetConfiguration([], [], True, []) SCAN_TARGET_CONFIGURATION = ScanTargetConfiguration(tuple(), tuple(), True, tuple())
NETWORK_SCAN_CONFIGURATION = NetworkScanConfiguration( NETWORK_SCAN_CONFIGURATION = NetworkScanConfiguration(
TCP_SCAN_CONFIGURATION, ICMP_CONFIGURATION, FINGERPRINTERS, SCAN_TARGET_CONFIGURATION TCP_SCAN_CONFIGURATION, ICMP_CONFIGURATION, FINGERPRINTERS, SCAN_TARGET_CONFIGURATION
) )
EXPLOITATION_OPTIONS_CONFIGURATION = ExploitationOptionsConfiguration(HTTP_PORTS) EXPLOITATION_OPTIONS_CONFIGURATION = ExploitationOptionsConfiguration(HTTP_PORTS)
BRUTE_FORCE_EXPLOITERS = [ BRUTE_FORCE_EXPLOITERS = (
PluginConfiguration("MSSQLExploiter", {}), PluginConfiguration("MSSQLExploiter", {}),
PluginConfiguration("PowerShellExploiter", {}), PluginConfiguration("PowerShellExploiter", {}),
PluginConfiguration("SSHExploiter", {}), PluginConfiguration("SSHExploiter", {}),
PluginConfiguration("SmbExploiter", {"smb_download_timeout": 30}), PluginConfiguration("SmbExploiter", {"smb_download_timeout": 30}),
PluginConfiguration("WmiExploiter", {"smb_download_timeout": 30}), PluginConfiguration("WmiExploiter", {"smb_download_timeout": 30}),
] )
VULNERABILITY_EXPLOITERS = [ VULNERABILITY_EXPLOITERS = (
PluginConfiguration("Log4ShellExploiter", {}), PluginConfiguration("Log4ShellExploiter", {}),
PluginConfiguration("HadoopExploiter", {}), PluginConfiguration("HadoopExploiter", {}),
] )
EXPLOITATION_CONFIGURATION = ExploitationConfiguration( EXPLOITATION_CONFIGURATION = ExploitationConfiguration(
EXPLOITATION_OPTIONS_CONFIGURATION, BRUTE_FORCE_EXPLOITERS, VULNERABILITY_EXPLOITERS EXPLOITATION_OPTIONS_CONFIGURATION, BRUTE_FORCE_EXPLOITERS, VULNERABILITY_EXPLOITERS
@ -116,5 +117,5 @@ DEFAULT_AGENT_CONFIGURATION = AgentConfiguration(
) )
DEFAULT_RANSOMWARE_AGENT_CONFIGURATION = dataclasses.replace( DEFAULT_RANSOMWARE_AGENT_CONFIGURATION = dataclasses.replace(
DEFAULT_AGENT_CONFIGURATION, post_breach_actions=[] DEFAULT_AGENT_CONFIGURATION, post_breach_actions=tuple()
) )

View File

@ -0,0 +1,13 @@
from functools import wraps
from typing import Callable
from common.utils.code_utils import freeze_lists_in_mapping
def freeze_lists(function: Callable):
@wraps(function)
def wrapper(self, data, **kwargs):
data = freeze_lists_in_mapping(data)
return function(self, data, **kwargs)
return wrapper

View File

@ -1,4 +1,5 @@
import queue import queue
from collections.abc import MutableSequence
from typing import Any, List, MutableMapping, TypeVar from typing import Any, List, MutableMapping, TypeVar
T = TypeVar("T") T = TypeVar("T")
@ -48,3 +49,10 @@ def del_key(mapping: MutableMapping[T, Any], key: T):
del mapping[key] del mapping[key]
except KeyError: except KeyError:
pass pass
def freeze_lists_in_mapping(mapping: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
for key, value in mapping.items():
if isinstance(value, MutableSequence):
mapping[key] = tuple(value)
return mapping

View File

@ -73,10 +73,10 @@ def test_scan_target_configuration():
config = schema.load(SCAN_TARGET_CONFIGURATION) config = schema.load(SCAN_TARGET_CONFIGURATION)
assert config.blocked_ips == BLOCKED_IPS assert config.blocked_ips == tuple(BLOCKED_IPS)
assert config.inaccessible_subnets == INACCESSIBLE_SUBNETS assert config.inaccessible_subnets == tuple(INACCESSIBLE_SUBNETS)
assert config.local_network_scan == LOCAL_NETWORK_SCAN assert config.local_network_scan == LOCAL_NETWORK_SCAN
assert config.subnets == SUBNETS assert config.subnets == tuple(SUBNETS)
def test_icmp_scan_configuration_schema(): def test_icmp_scan_configuration_schema():
@ -93,7 +93,7 @@ def test_tcp_scan_configuration_schema():
config = schema.load(TCP_SCAN_CONFIGURATION) config = schema.load(TCP_SCAN_CONFIGURATION)
assert config.timeout == TIMEOUT assert config.timeout == TIMEOUT
assert config.ports == PORTS assert config.ports == tuple(PORTS)
def test_network_scan_configuration(): def test_network_scan_configuration():
@ -101,15 +101,15 @@ def test_network_scan_configuration():
config = schema.load(NETWORK_SCAN_CONFIGURATION) config = schema.load(NETWORK_SCAN_CONFIGURATION)
assert config.tcp.ports == TCP_SCAN_CONFIGURATION["ports"] assert config.tcp.ports == tuple(TCP_SCAN_CONFIGURATION["ports"])
assert config.tcp.timeout == TCP_SCAN_CONFIGURATION["timeout"] assert config.tcp.timeout == TCP_SCAN_CONFIGURATION["timeout"]
assert config.icmp.timeout == ICMP_CONFIGURATION["timeout"] assert config.icmp.timeout == ICMP_CONFIGURATION["timeout"]
assert config.fingerprinters[0].name == FINGERPRINTERS[0]["name"] assert config.fingerprinters[0].name == FINGERPRINTERS[0]["name"]
assert config.fingerprinters[0].options == FINGERPRINTERS[0]["options"] assert config.fingerprinters[0].options == FINGERPRINTERS[0]["options"]
assert config.targets.blocked_ips == BLOCKED_IPS assert config.targets.blocked_ips == tuple(BLOCKED_IPS)
assert config.targets.inaccessible_subnets == INACCESSIBLE_SUBNETS assert config.targets.inaccessible_subnets == tuple(INACCESSIBLE_SUBNETS)
assert config.targets.local_network_scan == LOCAL_NETWORK_SCAN assert config.targets.local_network_scan == LOCAL_NETWORK_SCAN
assert config.targets.subnets == SUBNETS assert config.targets.subnets == tuple(SUBNETS)
def test_exploitation_options_configuration_schema(): def test_exploitation_options_configuration_schema():
@ -118,7 +118,7 @@ def test_exploitation_options_configuration_schema():
config = schema.load({"http_ports": ports}) config = schema.load({"http_ports": ports})
assert config.http_ports == ports assert config.http_ports == tuple(ports)
def test_exploiter_configuration_schema(): def test_exploiter_configuration_schema():

View File

@ -194,6 +194,7 @@ _make_icmp_scan_configuration # unused method (monkey/common/configuration/agen
_make_tcp_scan_configuration # unused method (monkey/common/configuration/agent_configuration.py:122) _make_tcp_scan_configuration # unused method (monkey/common/configuration/agent_configuration.py:122)
_make_network_scan_configuration # unused method (monkey/common/configuration/agent_configuration.py:110) _make_network_scan_configuration # unused method (monkey/common/configuration/agent_configuration.py:110)
_make_propagation_configuration # unused method (monkey/common/configuration/agent_configuration.py:167) _make_propagation_configuration # unused method (monkey/common/configuration/agent_configuration.py:167)
_make_agent_configuration # \common\agent_configuration\agent_configuration.py:110: unused method '_make_agent_configuration'
# Credentials # Credentials
_strip_credential_type # unused method (monkey/common/credentials/password.py:18) _strip_credential_type # unused method (monkey/common/credentials/password.py:18)