UT: Modify tests to use new pydantic configurations
TODO: Fix error handling and some assertions (tuple/list stuff)
This commit is contained in:
parent
275237c3f7
commit
6d29829808
|
@ -1,6 +1,3 @@
|
|||
import json
|
||||
from copy import deepcopy
|
||||
|
||||
import pytest
|
||||
from marshmallow import ValidationError
|
||||
from tests.common.example_agent_configuration import (
|
||||
|
@ -28,42 +25,32 @@ from tests.common.example_agent_configuration import (
|
|||
WINDOWS_FILENAME,
|
||||
)
|
||||
|
||||
from common.agent_configuration import AgentConfiguration, InvalidConfigurationError
|
||||
from common.agent_configuration.agent_sub_configuration_schemas import (
|
||||
CustomPBAConfigurationSchema,
|
||||
ExploitationConfigurationSchema,
|
||||
ExploitationOptionsConfigurationSchema,
|
||||
ICMPScanConfigurationSchema,
|
||||
NetworkScanConfigurationSchema,
|
||||
PluginConfigurationSchema,
|
||||
PropagationConfigurationSchema,
|
||||
ScanTargetConfigurationSchema,
|
||||
TCPScanConfigurationSchema,
|
||||
)
|
||||
from common.agent_configuration import InvalidConfigurationError
|
||||
from common.agent_configuration.agent_configuration import Pydantic___AgentConfiguration
|
||||
from common.agent_configuration.agent_sub_configurations import (
|
||||
CustomPBAConfiguration,
|
||||
ExploitationConfiguration,
|
||||
NetworkScanConfiguration,
|
||||
PluginConfiguration,
|
||||
PropagationConfiguration,
|
||||
Pydantic___CustomPBAConfiguration,
|
||||
Pydantic___ExploitationConfiguration,
|
||||
Pydantic___ExploitationOptionsConfiguration,
|
||||
Pydantic___ICMPScanConfiguration,
|
||||
Pydantic___NetworkScanConfiguration,
|
||||
Pydantic___PluginConfiguration,
|
||||
Pydantic___PropagationConfiguration,
|
||||
Pydantic___ScanTargetConfiguration,
|
||||
Pydantic___TCPScanConfiguration,
|
||||
)
|
||||
|
||||
INVALID_PORTS = [[-1, 1, 2], [1, 2, 99999]]
|
||||
|
||||
|
||||
def test_build_plugin_configuration():
|
||||
schema = PluginConfigurationSchema()
|
||||
|
||||
config = schema.load(PLUGIN_CONFIGURATION)
|
||||
config = Pydantic___PluginConfiguration(**PLUGIN_CONFIGURATION)
|
||||
|
||||
assert config.name == PLUGIN_NAME
|
||||
assert config.options == PLUGIN_OPTIONS
|
||||
|
||||
|
||||
def test_custom_pba_configuration_schema():
|
||||
schema = CustomPBAConfigurationSchema()
|
||||
|
||||
config = schema.load(CUSTOM_PBA_CONFIGURATION)
|
||||
config = Pydantic___CustomPBAConfiguration(**CUSTOM_PBA_CONFIGURATION)
|
||||
|
||||
assert config.linux_command == LINUX_COMMAND
|
||||
assert config.linux_filename == LINUX_FILENAME
|
||||
|
@ -72,12 +59,10 @@ def test_custom_pba_configuration_schema():
|
|||
|
||||
|
||||
def test_custom_pba_configuration_schema__empty_filenames_allowed():
|
||||
schema = CustomPBAConfigurationSchema()
|
||||
|
||||
empty_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy()
|
||||
empty_filename_configuration.update({"linux_filename": "", "windows_filename": ""})
|
||||
|
||||
config = schema.load(empty_filename_configuration)
|
||||
config = Pydantic___CustomPBAConfiguration(**empty_filename_configuration)
|
||||
|
||||
assert config.linux_command == LINUX_COMMAND
|
||||
assert config.linux_filename == ""
|
||||
|
@ -87,32 +72,26 @@ def test_custom_pba_configuration_schema__empty_filenames_allowed():
|
|||
|
||||
@pytest.mark.parametrize("linux_filename", ["/", "/abc/", "\0"])
|
||||
def test_custom_pba_configuration_schema__invalid_linux_filename(linux_filename):
|
||||
schema = CustomPBAConfigurationSchema()
|
||||
|
||||
invalid_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy()
|
||||
invalid_filename_configuration["linux_filename"] = linux_filename
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
schema.load(invalid_filename_configuration)
|
||||
Pydantic___CustomPBAConfiguration(**invalid_filename_configuration)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"windows_filename", ["CON", "CON.txt", "con.abc.pdf", " ", "abc.", "a?b", "d\\e"]
|
||||
)
|
||||
def test_custom_pba_configuration_schema__invalid_windows_filename(windows_filename):
|
||||
schema = CustomPBAConfigurationSchema()
|
||||
|
||||
invalid_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy()
|
||||
invalid_filename_configuration["windows_filename"] = windows_filename
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
schema.load(invalid_filename_configuration)
|
||||
Pydantic___CustomPBAConfiguration(**invalid_filename_configuration)
|
||||
|
||||
|
||||
def test_scan_target_configuration():
|
||||
schema = ScanTargetConfigurationSchema()
|
||||
|
||||
config = schema.load(SCAN_TARGET_CONFIGURATION)
|
||||
config = Pydantic___ScanTargetConfiguration(**SCAN_TARGET_CONFIGURATION)
|
||||
|
||||
assert config.blocked_ips == tuple(BLOCKED_IPS)
|
||||
assert config.inaccessible_subnets == tuple(INACCESSIBLE_SUBNETS)
|
||||
|
@ -121,27 +100,21 @@ def test_scan_target_configuration():
|
|||
|
||||
|
||||
def test_icmp_scan_configuration_schema():
|
||||
schema = ICMPScanConfigurationSchema()
|
||||
|
||||
config = schema.load(ICMP_CONFIGURATION)
|
||||
config = Pydantic___ICMPScanConfiguration(**ICMP_CONFIGURATION)
|
||||
|
||||
assert config.timeout == TIMEOUT
|
||||
|
||||
|
||||
def test_icmp_scan_configuration_schema__negative_timeout():
|
||||
schema = ICMPScanConfigurationSchema()
|
||||
|
||||
negative_timeout_configuration = ICMP_CONFIGURATION.copy()
|
||||
negative_timeout_configuration["timeout"] = -1
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
schema.load(negative_timeout_configuration)
|
||||
Pydantic___ICMPScanConfiguration(**negative_timeout_configuration)
|
||||
|
||||
|
||||
def test_tcp_scan_configuration_schema():
|
||||
schema = TCPScanConfigurationSchema()
|
||||
|
||||
config = schema.load(TCP_SCAN_CONFIGURATION)
|
||||
config = Pydantic___TCPScanConfiguration(**TCP_SCAN_CONFIGURATION)
|
||||
|
||||
assert config.timeout == TIMEOUT
|
||||
assert config.ports == tuple(PORTS)
|
||||
|
@ -149,29 +122,23 @@ def test_tcp_scan_configuration_schema():
|
|||
|
||||
@pytest.mark.parametrize("ports", INVALID_PORTS)
|
||||
def test_tcp_scan_configuration_schema__ports_out_of_range(ports):
|
||||
schema = TCPScanConfigurationSchema()
|
||||
|
||||
invalid_ports_configuration = TCP_SCAN_CONFIGURATION.copy()
|
||||
invalid_ports_configuration["ports"] = ports
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
schema.load(invalid_ports_configuration)
|
||||
Pydantic___TCPScanConfiguration(**invalid_ports_configuration)
|
||||
|
||||
|
||||
def test_tcp_scan_configuration_schema__negative_timeout():
|
||||
schema = TCPScanConfigurationSchema()
|
||||
|
||||
negative_timeout_configuration = TCP_SCAN_CONFIGURATION.copy()
|
||||
negative_timeout_configuration["timeout"] = -1
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
schema.load(negative_timeout_configuration)
|
||||
Pydantic___TCPScanConfiguration(**negative_timeout_configuration)
|
||||
|
||||
|
||||
def test_network_scan_configuration():
|
||||
schema = NetworkScanConfigurationSchema()
|
||||
|
||||
config = schema.load(NETWORK_SCAN_CONFIGURATION)
|
||||
config = Pydantic___NetworkScanConfiguration(**NETWORK_SCAN_CONFIGURATION)
|
||||
|
||||
assert config.tcp.ports == tuple(TCP_SCAN_CONFIGURATION["ports"])
|
||||
assert config.tcp.timeout == TCP_SCAN_CONFIGURATION["timeout"]
|
||||
|
@ -186,79 +153,69 @@ def test_network_scan_configuration():
|
|||
|
||||
def test_exploitation_options_configuration_schema():
|
||||
ports = [1, 2, 3]
|
||||
schema = ExploitationOptionsConfigurationSchema()
|
||||
|
||||
config = schema.load({"http_ports": ports})
|
||||
config = Pydantic___ExploitationOptionsConfiguration(**{"http_ports": ports})
|
||||
|
||||
assert config.http_ports == tuple(ports)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ports", INVALID_PORTS)
|
||||
def test_exploitation_options_configuration_schema__ports_out_of_range(ports):
|
||||
schema = ExploitationOptionsConfigurationSchema()
|
||||
|
||||
invalid_ports_configuration = {"http_ports": ports}
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
schema.load(invalid_ports_configuration)
|
||||
Pydantic___ExploitationOptionsConfiguration(**invalid_ports_configuration)
|
||||
|
||||
|
||||
def test_exploiter_configuration_schema():
|
||||
name = "bond"
|
||||
options = {"gun": "Walther PPK", "car": "Aston Martin DB5"}
|
||||
schema = PluginConfigurationSchema()
|
||||
|
||||
config = schema.load({"name": name, "options": options})
|
||||
config = Pydantic___PluginConfiguration(**{"name": name, "options": options})
|
||||
|
||||
assert config.name == name
|
||||
assert config.options == options
|
||||
|
||||
|
||||
def test_exploitation_configuration():
|
||||
schema = ExploitationConfigurationSchema()
|
||||
config = Pydantic___ExploitationConfiguration(**EXPLOITATION_CONFIGURATION)
|
||||
config_dict = config.dict()
|
||||
|
||||
config = schema.load(EXPLOITATION_CONFIGURATION)
|
||||
config_dict = schema.dump(config)
|
||||
|
||||
assert isinstance(config, ExploitationConfiguration)
|
||||
assert isinstance(config, Pydantic___ExploitationConfiguration)
|
||||
assert config_dict == EXPLOITATION_CONFIGURATION
|
||||
|
||||
|
||||
def test_propagation_configuration():
|
||||
schema = PropagationConfigurationSchema()
|
||||
config = Pydantic___PropagationConfiguration(**PROPAGATION_CONFIGURATION)
|
||||
config_dict = config.dict()
|
||||
|
||||
config = schema.load(PROPAGATION_CONFIGURATION)
|
||||
config_dict = schema.dump(config)
|
||||
|
||||
assert isinstance(config, PropagationConfiguration)
|
||||
assert isinstance(config.network_scan, NetworkScanConfiguration)
|
||||
assert isinstance(config.exploitation, ExploitationConfiguration)
|
||||
assert isinstance(config, Pydantic___PropagationConfiguration)
|
||||
assert isinstance(config.network_scan, Pydantic___NetworkScanConfiguration)
|
||||
assert isinstance(config.exploitation, Pydantic___ExploitationConfiguration)
|
||||
assert config.maximum_depth == 5
|
||||
assert config_dict == PROPAGATION_CONFIGURATION
|
||||
|
||||
|
||||
def test_propagation_configuration__invalid_maximum_depth():
|
||||
schema = PropagationConfigurationSchema()
|
||||
|
||||
negative_maximum_depth_configuration = PROPAGATION_CONFIGURATION.copy()
|
||||
negative_maximum_depth_configuration["maximum_depth"] = -1
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
schema.load(negative_maximum_depth_configuration)
|
||||
Pydantic___PropagationConfiguration(**negative_maximum_depth_configuration)
|
||||
|
||||
|
||||
def test_agent_configuration():
|
||||
config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
|
||||
config_json = AgentConfiguration.to_json(config)
|
||||
config = Pydantic___AgentConfiguration(**AGENT_CONFIGURATION)
|
||||
config_dict = config.dict()
|
||||
|
||||
assert isinstance(config, AgentConfiguration)
|
||||
assert isinstance(config, Pydantic___AgentConfiguration)
|
||||
assert config.keep_tunnel_open_time == 30
|
||||
assert isinstance(config.custom_pbas, CustomPBAConfiguration)
|
||||
assert isinstance(config.post_breach_actions[0], PluginConfiguration)
|
||||
assert isinstance(config.credential_collectors[0], PluginConfiguration)
|
||||
assert isinstance(config.payloads[0], PluginConfiguration)
|
||||
assert isinstance(config.propagation, PropagationConfiguration)
|
||||
assert json.loads(config_json) == AGENT_CONFIGURATION
|
||||
assert isinstance(config.custom_pbas, Pydantic___CustomPBAConfiguration)
|
||||
assert isinstance(config.post_breach_actions[0], Pydantic___PluginConfiguration)
|
||||
assert isinstance(config.credential_collectors[0], Pydantic___PluginConfiguration)
|
||||
assert isinstance(config.payloads[0], Pydantic___PluginConfiguration)
|
||||
assert isinstance(config.propagation, Pydantic___PropagationConfiguration)
|
||||
assert config_dict == AGENT_CONFIGURATION
|
||||
|
||||
|
||||
def test_agent_configuration__negative_keep_tunnel_open_time():
|
||||
|
@ -266,41 +223,12 @@ def test_agent_configuration__negative_keep_tunnel_open_time():
|
|||
negative_keep_tunnel_open_time_configuration["keep_tunnel_open_time"] = -1
|
||||
|
||||
with pytest.raises(InvalidConfigurationError):
|
||||
AgentConfiguration.from_mapping(negative_keep_tunnel_open_time_configuration)
|
||||
Pydantic___AgentConfiguration(**negative_keep_tunnel_open_time_configuration)
|
||||
|
||||
|
||||
def test_incorrect_type():
|
||||
valid_config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
|
||||
valid_config = Pydantic___AgentConfiguration(**AGENT_CONFIGURATION)
|
||||
with pytest.raises(InvalidConfigurationError):
|
||||
valid_config_dict = valid_config.__dict__
|
||||
valid_config_dict["keep_tunnel_open_time"] = "not_a_float"
|
||||
AgentConfiguration(**valid_config_dict)
|
||||
|
||||
|
||||
def test_to_from_mapping():
|
||||
config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
|
||||
|
||||
assert AgentConfiguration.to_mapping(config) == AGENT_CONFIGURATION
|
||||
|
||||
|
||||
def test_from_mapping__invalid_data():
|
||||
dict_ = deepcopy(AGENT_CONFIGURATION)
|
||||
dict_["payloads"] = "payloads"
|
||||
|
||||
with pytest.raises(InvalidConfigurationError):
|
||||
AgentConfiguration.from_mapping(dict_)
|
||||
|
||||
|
||||
def test_to_from_json():
|
||||
original_config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION)
|
||||
config_json = AgentConfiguration.to_json(original_config)
|
||||
|
||||
assert AgentConfiguration.from_json(config_json) == original_config
|
||||
|
||||
|
||||
def test_from_json__invalid_data():
|
||||
invalid_dict = deepcopy(AGENT_CONFIGURATION)
|
||||
invalid_dict["payloads"] = "payloads"
|
||||
|
||||
with pytest.raises(InvalidConfigurationError):
|
||||
AgentConfiguration.from_json(json.dumps(invalid_dict))
|
||||
Pydantic___AgentConfiguration(**valid_config_dict)
|
||||
|
|
Loading…
Reference in New Issue