forked from p15670423/monkey
Island: Fix serialization/deserialization of Machine.network_services
This commit is contained in:
parent
d8cf5d33dd
commit
8799a60f47
|
@ -1,19 +1,45 @@
|
|||
import json
|
||||
from ipaddress import IPv4Interface
|
||||
from typing import Mapping, Optional, Sequence
|
||||
from typing import Any, Dict, Mapping, Optional, Sequence
|
||||
|
||||
from pydantic import Field, validator
|
||||
|
||||
from common import OperatingSystem
|
||||
from common.base_models import MutableInfectionMonkeyBaseModel
|
||||
from common.base_models import MutableInfectionMonkeyBaseModel, MutableInfectionMonkeyModelConfig
|
||||
from common.transforms import make_immutable_sequence
|
||||
from common.types import HardwareID, NetworkService, SocketAddress
|
||||
|
||||
from . import MachineID
|
||||
|
||||
|
||||
def _serialize_network_services(machine_dict: Dict, *, default):
|
||||
machine_dict["network_services"] = {
|
||||
str(addr): val for addr, val in machine_dict["network_services"].items()
|
||||
}
|
||||
return json.dumps(machine_dict, default=default)
|
||||
|
||||
|
||||
class Machine(MutableInfectionMonkeyBaseModel):
|
||||
"""Represents machines, VMs, or other network nodes discovered by Infection Monkey"""
|
||||
|
||||
class Config(MutableInfectionMonkeyModelConfig):
|
||||
json_dumps = _serialize_network_services
|
||||
|
||||
@validator("network_services", pre=True)
|
||||
def _socketaddress_from_string(cls, v: Any) -> Any:
|
||||
if not isinstance(v, Mapping):
|
||||
# Let pydantic's type validation handle this
|
||||
return v
|
||||
|
||||
new_network_services = {}
|
||||
for addr, service in v.items():
|
||||
if isinstance(addr, SocketAddress):
|
||||
new_network_services[addr] = service
|
||||
else:
|
||||
new_network_services[SocketAddress.from_string(addr)] = service
|
||||
|
||||
return new_network_services
|
||||
|
||||
id: MachineID = Field(..., allow_mutation=False)
|
||||
"""Uniquely identifies the machine within the island"""
|
||||
|
||||
|
@ -35,7 +61,7 @@ class Machine(MutableInfectionMonkeyBaseModel):
|
|||
hostname: str = ""
|
||||
"""The hostname of the machine"""
|
||||
|
||||
network_services: Mapping[SocketAddress, NetworkService]
|
||||
network_services: Mapping[SocketAddress, NetworkService] = Field(default_factory=dict)
|
||||
"""All network services found running on the machine"""
|
||||
|
||||
_make_immutable_sequence = validator("network_interfaces", pre=True, allow_reuse=True)(
|
||||
|
|
|
@ -6,8 +6,12 @@ from typing import MutableSequence
|
|||
import pytest
|
||||
|
||||
from common import OperatingSystem
|
||||
from common.types import NetworkService, SocketAddress
|
||||
from monkey_island.cc.models import Machine
|
||||
|
||||
SOCKET_ADDR_1 = "192.168.1.10:5000"
|
||||
SOCKET_ADDR_2 = "192.168.1.10:8080"
|
||||
|
||||
MACHINE_OBJECT_DICT = MappingProxyType(
|
||||
{
|
||||
"id": 1,
|
||||
|
@ -17,6 +21,10 @@ MACHINE_OBJECT_DICT = MappingProxyType(
|
|||
"operating_system": OperatingSystem.WINDOWS,
|
||||
"operating_system_version": "eXtra Problems",
|
||||
"hostname": "my.host",
|
||||
"network_services": {
|
||||
SocketAddress.from_string(SOCKET_ADDR_1): NetworkService.UNKNOWN,
|
||||
SocketAddress.from_string(SOCKET_ADDR_2): NetworkService.UNKNOWN,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -26,9 +34,13 @@ MACHINE_SIMPLE_DICT = MappingProxyType(
|
|||
"hardware_id": uuid.getnode(),
|
||||
"island": True,
|
||||
"network_interfaces": ["10.0.0.1/24", "192.168.5.32/16"],
|
||||
"operating_system": "windows",
|
||||
"operating_system": OperatingSystem.WINDOWS.value,
|
||||
"operating_system_version": "eXtra Problems",
|
||||
"hostname": "my.host",
|
||||
"network_services": {
|
||||
SOCKET_ADDR_1: NetworkService.UNKNOWN.value,
|
||||
SOCKET_ADDR_2: NetworkService.UNKNOWN.value,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -60,6 +72,11 @@ def test_to_dict():
|
|||
("operating_system", "bsd"),
|
||||
("operating_system_version", {}),
|
||||
("hostname", []),
|
||||
("network_services", 42),
|
||||
("network_services", [SOCKET_ADDR_1]),
|
||||
("network_services", None),
|
||||
("network_services", {SOCKET_ADDR_1: "Hello"}),
|
||||
("network_services", {SocketAddress.from_string(SOCKET_ADDR_1): "Hello"}),
|
||||
],
|
||||
)
|
||||
def test_construct_invalid_field__type_error(key, value):
|
||||
|
@ -77,6 +94,7 @@ def test_construct_invalid_field__type_error(key, value):
|
|||
("hardware_id", 0),
|
||||
("network_interfaces", [1, "stuff", 3]),
|
||||
("network_interfaces", ["10.0.0.1/16", 2, []]),
|
||||
("network_services", {"192.168.": NetworkService.UNKNOWN.value}),
|
||||
],
|
||||
)
|
||||
def test_construct_invalid_field__value_error(key, value):
|
||||
|
@ -230,3 +248,19 @@ def test_hostname_default_value():
|
|||
m = Machine(**missing_hostname_dict)
|
||||
|
||||
assert m.hostname == ""
|
||||
|
||||
|
||||
def test_set_network_services_validates():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
m.network_services = {"not-an-ip": NetworkService.UNKNOWN.value}
|
||||
|
||||
|
||||
def test_set_network_services_default_value():
|
||||
missing_network_services = MACHINE_OBJECT_DICT.copy()
|
||||
del missing_network_services["network_services"]
|
||||
|
||||
m = Machine(**missing_network_services)
|
||||
|
||||
assert m.network_services == {}
|
||||
|
|
|
@ -343,4 +343,6 @@ CC_TUNNEL
|
|||
# TODO remove when 2267 is done
|
||||
NetworkServiceNameEnum.UNKNOWN
|
||||
Machine.network_services
|
||||
Machine.config.json_dumps
|
||||
Machine._socketaddress_from_string
|
||||
Node.tcp_connections
|
||||
|
|
Loading…
Reference in New Issue