forked from p34709852/monkey
Common: Reorder scan and exploit configuration classes
This commit is contained in:
parent
2c4069ae1b
commit
5845bb73af
|
@ -3,16 +3,16 @@ from .agent_configuration import (
|
||||||
PluginConfigurationSchema,
|
PluginConfigurationSchema,
|
||||||
CustomPBAConfiguration,
|
CustomPBAConfiguration,
|
||||||
CustomPBAConfigurationSchema,
|
CustomPBAConfigurationSchema,
|
||||||
ExploitationOptionsConfiguration,
|
|
||||||
ExploitationOptionsConfigurationSchema,
|
|
||||||
ExploiterConfiguration,
|
|
||||||
ExploiterConfigurationSchema,
|
|
||||||
ExploitationConfiguration,
|
|
||||||
ExploitationConfigurationSchema,
|
|
||||||
ICMPScanConfiguration,
|
ICMPScanConfiguration,
|
||||||
ICMPScanConfigurationSchema,
|
ICMPScanConfigurationSchema,
|
||||||
TCPScanConfiguration,
|
TCPScanConfiguration,
|
||||||
TCPScanConfigurationSchema,
|
TCPScanConfigurationSchema,
|
||||||
ScanTargetConfiguration,
|
ScanTargetConfiguration,
|
||||||
ScanTargetConfigurationSchema,
|
ScanTargetConfigurationSchema,
|
||||||
|
ExploitationOptionsConfiguration,
|
||||||
|
ExploitationOptionsConfigurationSchema,
|
||||||
|
ExploiterConfiguration,
|
||||||
|
ExploiterConfigurationSchema,
|
||||||
|
ExploitationConfiguration,
|
||||||
|
ExploitationConfigurationSchema,
|
||||||
)
|
)
|
||||||
|
|
|
@ -46,48 +46,6 @@ class ExploitationOptionsConfiguration:
|
||||||
http_ports: List[int]
|
http_ports: List[int]
|
||||||
|
|
||||||
|
|
||||||
class ExploitationOptionsConfigurationSchema(Schema):
|
|
||||||
http_ports = fields.List(fields.Int())
|
|
||||||
|
|
||||||
@post_load
|
|
||||||
def _make_exploitation_options_configuration(self, data, **kwargs):
|
|
||||||
return ExploitationOptionsConfiguration(**data)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
|
||||||
class ExploiterConfiguration:
|
|
||||||
name: str
|
|
||||||
options: Dict
|
|
||||||
supported_os: List[OperatingSystems]
|
|
||||||
|
|
||||||
|
|
||||||
class ExploiterConfigurationSchema(Schema):
|
|
||||||
name = fields.Str()
|
|
||||||
options = fields.Mapping()
|
|
||||||
supported_os = fields.List(EnumField(OperatingSystems))
|
|
||||||
|
|
||||||
@post_load
|
|
||||||
def _make_exploiter_configuration(self, data, **kwargs):
|
|
||||||
return ExploiterConfiguration(**data)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
|
||||||
class ExploitationConfiguration:
|
|
||||||
options: ExploitationOptionsConfiguration
|
|
||||||
brute_force: List[ExploiterConfiguration]
|
|
||||||
vulnerability: List[ExploiterConfiguration]
|
|
||||||
|
|
||||||
|
|
||||||
class ExploitationConfigurationSchema(Schema):
|
|
||||||
options = fields.Nested(ExploitationOptionsConfigurationSchema)
|
|
||||||
brute_force = fields.List(fields.Nested(ExploiterConfigurationSchema))
|
|
||||||
vulnerability = fields.List(fields.Nested(ExploiterConfigurationSchema))
|
|
||||||
|
|
||||||
@post_load
|
|
||||||
def _make_exploitation_options_configuration(self, data, **kwargs):
|
|
||||||
return ExploitationConfiguration(**data)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class ScanTargetConfiguration:
|
class ScanTargetConfiguration:
|
||||||
blocked_ips: List[str]
|
blocked_ips: List[str]
|
||||||
|
@ -133,3 +91,45 @@ class TCPScanConfigurationSchema(Schema):
|
||||||
@post_load
|
@post_load
|
||||||
def _make_tcp_scan_configuration(self, data, **kwargs):
|
def _make_tcp_scan_configuration(self, data, **kwargs):
|
||||||
return TCPScanConfiguration(**data)
|
return TCPScanConfiguration(**data)
|
||||||
|
|
||||||
|
|
||||||
|
class ExploitationOptionsConfigurationSchema(Schema):
|
||||||
|
http_ports = fields.List(fields.Int())
|
||||||
|
|
||||||
|
@post_load
|
||||||
|
def _make_exploitation_options_configuration(self, data, **kwargs):
|
||||||
|
return ExploitationOptionsConfiguration(**data)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class ExploiterConfiguration:
|
||||||
|
name: str
|
||||||
|
options: Dict
|
||||||
|
supported_os: List[OperatingSystems]
|
||||||
|
|
||||||
|
|
||||||
|
class ExploiterConfigurationSchema(Schema):
|
||||||
|
name = fields.Str()
|
||||||
|
options = fields.Mapping()
|
||||||
|
supported_os = fields.List(EnumField(OperatingSystems))
|
||||||
|
|
||||||
|
@post_load
|
||||||
|
def _make_exploiter_configuration(self, data, **kwargs):
|
||||||
|
return ExploiterConfiguration(**data)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class ExploitationConfiguration:
|
||||||
|
options: ExploitationOptionsConfiguration
|
||||||
|
brute_force: List[ExploiterConfiguration]
|
||||||
|
vulnerability: List[ExploiterConfiguration]
|
||||||
|
|
||||||
|
|
||||||
|
class ExploitationConfigurationSchema(Schema):
|
||||||
|
options = fields.Nested(ExploitationOptionsConfigurationSchema)
|
||||||
|
brute_force = fields.List(fields.Nested(ExploiterConfigurationSchema))
|
||||||
|
vulnerability = fields.List(fields.Nested(ExploiterConfigurationSchema))
|
||||||
|
|
||||||
|
@post_load
|
||||||
|
def _make_exploitation_options_configuration(self, data, **kwargs):
|
||||||
|
return ExploitationConfiguration(**data)
|
||||||
|
|
|
@ -45,6 +45,47 @@ def test_custom_pba_configuration_schema():
|
||||||
assert config.windows_filename == windows_filename
|
assert config.windows_filename == windows_filename
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_target_configuration():
|
||||||
|
blocked_ips = ["10.0.0.1", "192.168.1.1"]
|
||||||
|
inaccessible_subnets = ["172.0.0.0/24", "172.2.2.0/24", "192.168.56.0/24"]
|
||||||
|
local_network_scan = True
|
||||||
|
subnets = ["10.0.0.2", "10.0.0.2/16"]
|
||||||
|
scan_target_config = {
|
||||||
|
"blocked_ips": blocked_ips,
|
||||||
|
"inaccessible_subnets": inaccessible_subnets,
|
||||||
|
"local_network_scan": local_network_scan,
|
||||||
|
"subnets": subnets,
|
||||||
|
}
|
||||||
|
schema = ScanTargetConfigurationSchema()
|
||||||
|
|
||||||
|
config = schema.load(scan_target_config)
|
||||||
|
|
||||||
|
assert config.blocked_ips == blocked_ips
|
||||||
|
assert config.inaccessible_subnets == inaccessible_subnets
|
||||||
|
assert config.local_network_scan == local_network_scan
|
||||||
|
assert config.subnets == subnets
|
||||||
|
|
||||||
|
|
||||||
|
def test_icmp_scan_configuration_schema():
|
||||||
|
timeout_ms = 2525
|
||||||
|
schema = ICMPScanConfigurationSchema()
|
||||||
|
|
||||||
|
config = schema.load({"timeout_ms": timeout_ms})
|
||||||
|
|
||||||
|
assert config.timeout_ms == timeout_ms
|
||||||
|
|
||||||
|
|
||||||
|
def test_tcp_scan_configuration_schema():
|
||||||
|
timeout_ms = 2525
|
||||||
|
ports = [8080, 443]
|
||||||
|
schema = TCPScanConfigurationSchema()
|
||||||
|
|
||||||
|
config = schema.load({"timeout_ms": timeout_ms, "ports": ports})
|
||||||
|
|
||||||
|
assert config.timeout_ms == timeout_ms
|
||||||
|
assert config.ports == ports
|
||||||
|
|
||||||
|
|
||||||
def test_exploitation_options_configuration_schema():
|
def test_exploitation_options_configuration_schema():
|
||||||
ports = [1, 2, 3]
|
ports = [1, 2, 3]
|
||||||
schema = ExploitationOptionsConfigurationSchema()
|
schema = ExploitationOptionsConfigurationSchema()
|
||||||
|
@ -98,44 +139,3 @@ def test_exploitation_configuration():
|
||||||
|
|
||||||
assert isinstance(config, ExploitationConfiguration)
|
assert isinstance(config, ExploitationConfiguration)
|
||||||
assert config_dict == exploitation_config
|
assert config_dict == exploitation_config
|
||||||
|
|
||||||
|
|
||||||
def test_scan_target_configuration():
|
|
||||||
blocked_ips = ["10.0.0.1", "192.168.1.1"]
|
|
||||||
inaccessible_subnets = ["172.0.0.0/24", "172.2.2.0/24", "192.168.56.0/24"]
|
|
||||||
local_network_scan = True
|
|
||||||
subnets = ["10.0.0.2", "10.0.0.2/16"]
|
|
||||||
scan_target_config = {
|
|
||||||
"blocked_ips": blocked_ips,
|
|
||||||
"inaccessible_subnets": inaccessible_subnets,
|
|
||||||
"local_network_scan": local_network_scan,
|
|
||||||
"subnets": subnets,
|
|
||||||
}
|
|
||||||
schema = ScanTargetConfigurationSchema()
|
|
||||||
|
|
||||||
config = schema.load(scan_target_config)
|
|
||||||
|
|
||||||
assert config.blocked_ips == blocked_ips
|
|
||||||
assert config.inaccessible_subnets == inaccessible_subnets
|
|
||||||
assert config.local_network_scan == local_network_scan
|
|
||||||
assert config.subnets == subnets
|
|
||||||
|
|
||||||
|
|
||||||
def test_icmp_scan_configuration_schema():
|
|
||||||
timeout_ms = 2525
|
|
||||||
schema = ICMPScanConfigurationSchema()
|
|
||||||
|
|
||||||
config = schema.load({"timeout_ms": timeout_ms})
|
|
||||||
|
|
||||||
assert config.timeout_ms == timeout_ms
|
|
||||||
|
|
||||||
|
|
||||||
def test_tcp_scan_configuration_schema():
|
|
||||||
timeout_ms = 2525
|
|
||||||
ports = [8080, 443]
|
|
||||||
schema = TCPScanConfigurationSchema()
|
|
||||||
|
|
||||||
config = schema.load({"timeout_ms": timeout_ms, "ports": ports})
|
|
||||||
|
|
||||||
assert config.timeout_ms == timeout_ms
|
|
||||||
assert config.ports == ports
|
|
||||||
|
|
Loading…
Reference in New Issue