From 6d29829808d69229439f4fffdd6876af4819e917 Mon Sep 17 00:00:00 2001 From: Shreya Malviya Date: Mon, 29 Aug 2022 14:19:25 +0530 Subject: [PATCH] UT: Modify tests to use new pydantic configurations TODO: Fix error handling and some assertions (tuple/list stuff) --- .../test_agent_configuration.py | 166 +++++------------- 1 file changed, 47 insertions(+), 119 deletions(-) diff --git a/monkey/tests/unit_tests/common/agent_configuration/test_agent_configuration.py b/monkey/tests/unit_tests/common/agent_configuration/test_agent_configuration.py index 22a09b87b..883b5f6d2 100644 --- a/monkey/tests/unit_tests/common/agent_configuration/test_agent_configuration.py +++ b/monkey/tests/unit_tests/common/agent_configuration/test_agent_configuration.py @@ -1,6 +1,3 @@ -import json -from copy import deepcopy - import pytest from marshmallow import ValidationError from tests.common.example_agent_configuration import ( @@ -28,42 +25,32 @@ from tests.common.example_agent_configuration import ( WINDOWS_FILENAME, ) -from common.agent_configuration import AgentConfiguration, InvalidConfigurationError -from common.agent_configuration.agent_sub_configuration_schemas import ( - CustomPBAConfigurationSchema, - ExploitationConfigurationSchema, - ExploitationOptionsConfigurationSchema, - ICMPScanConfigurationSchema, - NetworkScanConfigurationSchema, - PluginConfigurationSchema, - PropagationConfigurationSchema, - ScanTargetConfigurationSchema, - TCPScanConfigurationSchema, -) +from common.agent_configuration import InvalidConfigurationError +from common.agent_configuration.agent_configuration import Pydantic___AgentConfiguration from common.agent_configuration.agent_sub_configurations import ( - CustomPBAConfiguration, - ExploitationConfiguration, - NetworkScanConfiguration, - PluginConfiguration, - PropagationConfiguration, + Pydantic___CustomPBAConfiguration, + Pydantic___ExploitationConfiguration, + Pydantic___ExploitationOptionsConfiguration, + Pydantic___ICMPScanConfiguration, + Pydantic___NetworkScanConfiguration, + Pydantic___PluginConfiguration, + Pydantic___PropagationConfiguration, + Pydantic___ScanTargetConfiguration, + Pydantic___TCPScanConfiguration, ) INVALID_PORTS = [[-1, 1, 2], [1, 2, 99999]] def test_build_plugin_configuration(): - schema = PluginConfigurationSchema() - - config = schema.load(PLUGIN_CONFIGURATION) + config = Pydantic___PluginConfiguration(**PLUGIN_CONFIGURATION) assert config.name == PLUGIN_NAME assert config.options == PLUGIN_OPTIONS def test_custom_pba_configuration_schema(): - schema = CustomPBAConfigurationSchema() - - config = schema.load(CUSTOM_PBA_CONFIGURATION) + config = Pydantic___CustomPBAConfiguration(**CUSTOM_PBA_CONFIGURATION) assert config.linux_command == LINUX_COMMAND assert config.linux_filename == LINUX_FILENAME @@ -72,12 +59,10 @@ def test_custom_pba_configuration_schema(): def test_custom_pba_configuration_schema__empty_filenames_allowed(): - schema = CustomPBAConfigurationSchema() - empty_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy() empty_filename_configuration.update({"linux_filename": "", "windows_filename": ""}) - config = schema.load(empty_filename_configuration) + config = Pydantic___CustomPBAConfiguration(**empty_filename_configuration) assert config.linux_command == LINUX_COMMAND assert config.linux_filename == "" @@ -87,32 +72,26 @@ def test_custom_pba_configuration_schema__empty_filenames_allowed(): @pytest.mark.parametrize("linux_filename", ["/", "/abc/", "\0"]) def test_custom_pba_configuration_schema__invalid_linux_filename(linux_filename): - schema = CustomPBAConfigurationSchema() - invalid_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy() invalid_filename_configuration["linux_filename"] = linux_filename with pytest.raises(ValidationError): - schema.load(invalid_filename_configuration) + Pydantic___CustomPBAConfiguration(**invalid_filename_configuration) @pytest.mark.parametrize( "windows_filename", ["CON", "CON.txt", "con.abc.pdf", " ", "abc.", "a?b", "d\\e"] ) def test_custom_pba_configuration_schema__invalid_windows_filename(windows_filename): - schema = CustomPBAConfigurationSchema() - invalid_filename_configuration = CUSTOM_PBA_CONFIGURATION.copy() invalid_filename_configuration["windows_filename"] = windows_filename with pytest.raises(ValidationError): - schema.load(invalid_filename_configuration) + Pydantic___CustomPBAConfiguration(**invalid_filename_configuration) def test_scan_target_configuration(): - schema = ScanTargetConfigurationSchema() - - config = schema.load(SCAN_TARGET_CONFIGURATION) + config = Pydantic___ScanTargetConfiguration(**SCAN_TARGET_CONFIGURATION) assert config.blocked_ips == tuple(BLOCKED_IPS) assert config.inaccessible_subnets == tuple(INACCESSIBLE_SUBNETS) @@ -121,27 +100,21 @@ def test_scan_target_configuration(): def test_icmp_scan_configuration_schema(): - schema = ICMPScanConfigurationSchema() - - config = schema.load(ICMP_CONFIGURATION) + config = Pydantic___ICMPScanConfiguration(**ICMP_CONFIGURATION) assert config.timeout == TIMEOUT def test_icmp_scan_configuration_schema__negative_timeout(): - schema = ICMPScanConfigurationSchema() - negative_timeout_configuration = ICMP_CONFIGURATION.copy() negative_timeout_configuration["timeout"] = -1 with pytest.raises(ValidationError): - schema.load(negative_timeout_configuration) + Pydantic___ICMPScanConfiguration(**negative_timeout_configuration) def test_tcp_scan_configuration_schema(): - schema = TCPScanConfigurationSchema() - - config = schema.load(TCP_SCAN_CONFIGURATION) + config = Pydantic___TCPScanConfiguration(**TCP_SCAN_CONFIGURATION) assert config.timeout == TIMEOUT assert config.ports == tuple(PORTS) @@ -149,29 +122,23 @@ def test_tcp_scan_configuration_schema(): @pytest.mark.parametrize("ports", INVALID_PORTS) def test_tcp_scan_configuration_schema__ports_out_of_range(ports): - schema = TCPScanConfigurationSchema() - invalid_ports_configuration = TCP_SCAN_CONFIGURATION.copy() invalid_ports_configuration["ports"] = ports with pytest.raises(ValidationError): - schema.load(invalid_ports_configuration) + Pydantic___TCPScanConfiguration(**invalid_ports_configuration) def test_tcp_scan_configuration_schema__negative_timeout(): - schema = TCPScanConfigurationSchema() - negative_timeout_configuration = TCP_SCAN_CONFIGURATION.copy() negative_timeout_configuration["timeout"] = -1 with pytest.raises(ValidationError): - schema.load(negative_timeout_configuration) + Pydantic___TCPScanConfiguration(**negative_timeout_configuration) def test_network_scan_configuration(): - schema = NetworkScanConfigurationSchema() - - config = schema.load(NETWORK_SCAN_CONFIGURATION) + config = Pydantic___NetworkScanConfiguration(**NETWORK_SCAN_CONFIGURATION) assert config.tcp.ports == tuple(TCP_SCAN_CONFIGURATION["ports"]) assert config.tcp.timeout == TCP_SCAN_CONFIGURATION["timeout"] @@ -186,79 +153,69 @@ def test_network_scan_configuration(): def test_exploitation_options_configuration_schema(): ports = [1, 2, 3] - schema = ExploitationOptionsConfigurationSchema() - config = schema.load({"http_ports": ports}) + config = Pydantic___ExploitationOptionsConfiguration(**{"http_ports": ports}) assert config.http_ports == tuple(ports) @pytest.mark.parametrize("ports", INVALID_PORTS) def test_exploitation_options_configuration_schema__ports_out_of_range(ports): - schema = ExploitationOptionsConfigurationSchema() - invalid_ports_configuration = {"http_ports": ports} with pytest.raises(ValidationError): - schema.load(invalid_ports_configuration) + Pydantic___ExploitationOptionsConfiguration(**invalid_ports_configuration) def test_exploiter_configuration_schema(): name = "bond" options = {"gun": "Walther PPK", "car": "Aston Martin DB5"} - schema = PluginConfigurationSchema() - config = schema.load({"name": name, "options": options}) + config = Pydantic___PluginConfiguration(**{"name": name, "options": options}) assert config.name == name assert config.options == options def test_exploitation_configuration(): - schema = ExploitationConfigurationSchema() + config = Pydantic___ExploitationConfiguration(**EXPLOITATION_CONFIGURATION) + config_dict = config.dict() - config = schema.load(EXPLOITATION_CONFIGURATION) - config_dict = schema.dump(config) - - assert isinstance(config, ExploitationConfiguration) + assert isinstance(config, Pydantic___ExploitationConfiguration) assert config_dict == EXPLOITATION_CONFIGURATION def test_propagation_configuration(): - schema = PropagationConfigurationSchema() + config = Pydantic___PropagationConfiguration(**PROPAGATION_CONFIGURATION) + config_dict = config.dict() - config = schema.load(PROPAGATION_CONFIGURATION) - config_dict = schema.dump(config) - - assert isinstance(config, PropagationConfiguration) - assert isinstance(config.network_scan, NetworkScanConfiguration) - assert isinstance(config.exploitation, ExploitationConfiguration) + assert isinstance(config, Pydantic___PropagationConfiguration) + assert isinstance(config.network_scan, Pydantic___NetworkScanConfiguration) + assert isinstance(config.exploitation, Pydantic___ExploitationConfiguration) assert config.maximum_depth == 5 assert config_dict == PROPAGATION_CONFIGURATION def test_propagation_configuration__invalid_maximum_depth(): - schema = PropagationConfigurationSchema() - negative_maximum_depth_configuration = PROPAGATION_CONFIGURATION.copy() negative_maximum_depth_configuration["maximum_depth"] = -1 with pytest.raises(ValidationError): - schema.load(negative_maximum_depth_configuration) + Pydantic___PropagationConfiguration(**negative_maximum_depth_configuration) def test_agent_configuration(): - config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION) - config_json = AgentConfiguration.to_json(config) + config = Pydantic___AgentConfiguration(**AGENT_CONFIGURATION) + config_dict = config.dict() - assert isinstance(config, AgentConfiguration) + assert isinstance(config, Pydantic___AgentConfiguration) assert config.keep_tunnel_open_time == 30 - assert isinstance(config.custom_pbas, CustomPBAConfiguration) - assert isinstance(config.post_breach_actions[0], PluginConfiguration) - assert isinstance(config.credential_collectors[0], PluginConfiguration) - assert isinstance(config.payloads[0], PluginConfiguration) - assert isinstance(config.propagation, PropagationConfiguration) - assert json.loads(config_json) == AGENT_CONFIGURATION + assert isinstance(config.custom_pbas, Pydantic___CustomPBAConfiguration) + assert isinstance(config.post_breach_actions[0], Pydantic___PluginConfiguration) + assert isinstance(config.credential_collectors[0], Pydantic___PluginConfiguration) + assert isinstance(config.payloads[0], Pydantic___PluginConfiguration) + assert isinstance(config.propagation, Pydantic___PropagationConfiguration) + assert config_dict == AGENT_CONFIGURATION def test_agent_configuration__negative_keep_tunnel_open_time(): @@ -266,41 +223,12 @@ def test_agent_configuration__negative_keep_tunnel_open_time(): negative_keep_tunnel_open_time_configuration["keep_tunnel_open_time"] = -1 with pytest.raises(InvalidConfigurationError): - AgentConfiguration.from_mapping(negative_keep_tunnel_open_time_configuration) + Pydantic___AgentConfiguration(**negative_keep_tunnel_open_time_configuration) def test_incorrect_type(): - valid_config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION) + valid_config = Pydantic___AgentConfiguration(**AGENT_CONFIGURATION) with pytest.raises(InvalidConfigurationError): valid_config_dict = valid_config.__dict__ valid_config_dict["keep_tunnel_open_time"] = "not_a_float" - AgentConfiguration(**valid_config_dict) - - -def test_to_from_mapping(): - config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION) - - assert AgentConfiguration.to_mapping(config) == AGENT_CONFIGURATION - - -def test_from_mapping__invalid_data(): - dict_ = deepcopy(AGENT_CONFIGURATION) - dict_["payloads"] = "payloads" - - with pytest.raises(InvalidConfigurationError): - AgentConfiguration.from_mapping(dict_) - - -def test_to_from_json(): - original_config = AgentConfiguration.from_mapping(AGENT_CONFIGURATION) - config_json = AgentConfiguration.to_json(original_config) - - assert AgentConfiguration.from_json(config_json) == original_config - - -def test_from_json__invalid_data(): - invalid_dict = deepcopy(AGENT_CONFIGURATION) - invalid_dict["payloads"] = "payloads" - - with pytest.raises(InvalidConfigurationError): - AgentConfiguration.from_json(json.dumps(invalid_dict)) + Pydantic___AgentConfiguration(**valid_config_dict)