Merge branch 'machine-agent-node-models' into develop
This commit is contained in:
commit
19d6333f2c
|
@ -34,6 +34,7 @@ pymongo = "*"
|
|||
cryptography = "*"
|
||||
semantic-version = "*"
|
||||
pypubsub = "*"
|
||||
pydantic = "*"
|
||||
|
||||
[dev-packages]
|
||||
virtualenv = ">=20.0.26"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{
|
||||
"_meta": {
|
||||
"hash": {
|
||||
"sha256": "82ce85b7c4c14c663026a659d3f35f869af944ba5dd52ed8cfe6ff83b20e9633"
|
||||
"sha256": "2793120cfe1c233af7954e784b18ea46fbb8a0630dc1df7f8f977ae0dc5d7c52"
|
||||
},
|
||||
"pipfile-spec": 6,
|
||||
"requires": {
|
||||
|
@ -540,6 +540,47 @@
|
|||
],
|
||||
"version": "==2.21"
|
||||
},
|
||||
"pydantic": {
|
||||
"hashes": [
|
||||
"sha256:1061c6ee6204f4f5a27133126854948e3b3d51fcc16ead2e5d04378c199b2f44",
|
||||
"sha256:19b5686387ea0d1ea52ecc4cffb71abb21702c5e5b2ac626fd4dbaa0834aa49d",
|
||||
"sha256:2bd446bdb7755c3a94e56d7bdfd3ee92396070efa8ef3a34fab9579fe6aa1d84",
|
||||
"sha256:328558c9f2eed77bd8fffad3cef39dbbe3edc7044517f4625a769d45d4cf7555",
|
||||
"sha256:32e0b4fb13ad4db4058a7c3c80e2569adbd810c25e6ca3bbd8b2a9cc2cc871d7",
|
||||
"sha256:3ee0d69b2a5b341fc7927e92cae7ddcfd95e624dfc4870b32a85568bd65e6131",
|
||||
"sha256:4aafd4e55e8ad5bd1b19572ea2df546ccace7945853832bb99422a79c70ce9b8",
|
||||
"sha256:4b3946f87e5cef3ba2e7bd3a4eb5a20385fe36521d6cc1ebf3c08a6697c6cfb3",
|
||||
"sha256:4de71c718c9756d679420c69f216776c2e977459f77e8f679a4a961dc7304a56",
|
||||
"sha256:5565a49effe38d51882cb7bac18bda013cdb34d80ac336428e8908f0b72499b0",
|
||||
"sha256:5803ad846cdd1ed0d97eb00292b870c29c1f03732a010e66908ff48a762f20e4",
|
||||
"sha256:5da164119602212a3fe7e3bc08911a89db4710ae51444b4224c2382fd09ad453",
|
||||
"sha256:615661bfc37e82ac677543704437ff737418e4ea04bef9cf11c6d27346606044",
|
||||
"sha256:78a4d6bdfd116a559aeec9a4cfe77dda62acc6233f8b56a716edad2651023e5e",
|
||||
"sha256:7d0f183b305629765910eaad707800d2f47c6ac5bcfb8c6397abdc30b69eeb15",
|
||||
"sha256:7ead3cd020d526f75b4188e0a8d71c0dbbe1b4b6b5dc0ea775a93aca16256aeb",
|
||||
"sha256:84d76ecc908d917f4684b354a39fd885d69dd0491be175f3465fe4b59811c001",
|
||||
"sha256:8cb0bc509bfb71305d7a59d00163d5f9fc4530f0881ea32c74ff4f74c85f3d3d",
|
||||
"sha256:91089b2e281713f3893cd01d8e576771cd5bfdfbff5d0ed95969f47ef6d676c3",
|
||||
"sha256:9c9e04a6cdb7a363d7cb3ccf0efea51e0abb48e180c0d31dca8d247967d85c6e",
|
||||
"sha256:a8c5360a0297a713b4123608a7909e6869e1b56d0e96eb0d792c27585d40757f",
|
||||
"sha256:afacf6d2a41ed91fc631bade88b1d319c51ab5418870802cedb590b709c5ae3c",
|
||||
"sha256:b34ba24f3e2d0b39b43f0ca62008f7ba962cff51efa56e64ee25c4af6eed987b",
|
||||
"sha256:bd67cb2c2d9602ad159389c29e4ca964b86fa2f35c2faef54c3eb28b4efd36c8",
|
||||
"sha256:c0f5e142ef8217019e3eef6ae1b6b55f09a7a15972958d44fbd228214cede567",
|
||||
"sha256:cdb4272678db803ddf94caa4f94f8672e9a46bae4a44f167095e4d06fec12979",
|
||||
"sha256:d70916235d478404a3fa8c997b003b5f33aeac4686ac1baa767234a0f8ac2326",
|
||||
"sha256:d8ce3fb0841763a89322ea0432f1f59a2d3feae07a63ea2c958b2315e1ae8adb",
|
||||
"sha256:e0b214e57623a535936005797567231a12d0da0c29711eb3514bc2b3cd008d0f",
|
||||
"sha256:e631c70c9280e3129f071635b81207cad85e6c08e253539467e4ead0e5b219aa",
|
||||
"sha256:e78578f0c7481c850d1c969aca9a65405887003484d24f6110458fb02cca7747",
|
||||
"sha256:f0ca86b525264daa5f6b192f216a0d1e860b7383e3da1c65a1908f9c02f42801",
|
||||
"sha256:f1a68f4f65a9ee64b6ccccb5bf7e17db07caebd2730109cb8a95863cfa9c4e55",
|
||||
"sha256:fafe841be1103f340a24977f61dee76172e4ae5f647ab9e7fd1e1fca51524f08",
|
||||
"sha256:ff68fc85355532ea77559ede81f35fff79a6a5543477e168ab3a381887caea76"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==1.9.2"
|
||||
},
|
||||
"pyinstaller": {
|
||||
"hashes": [
|
||||
"sha256:24035eb9fffa2e3e288b4c1c9710043819efc7203cae5c8c573bec16f4a8e98f",
|
||||
|
@ -833,7 +874,7 @@
|
|||
"sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02",
|
||||
"sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"
|
||||
],
|
||||
"markers": "python_version < '3.8'",
|
||||
"markers": "python_version >= '3.7'",
|
||||
"version": "==4.3.0"
|
||||
},
|
||||
"urllib3": {
|
||||
|
@ -959,7 +1000,7 @@
|
|||
"sha256:7614553711ee97490f732126dc077f8d0ae084ebc6a96e23db1482afabdb2c51",
|
||||
"sha256:ff56f4892c1c4bf0d814575ea23471c230d544203c7748e8c68f0089478d48eb"
|
||||
],
|
||||
"markers": "python_version >= '3.6'",
|
||||
"markers": "python_full_version >= '3.6.0'",
|
||||
"version": "==2.10.3"
|
||||
},
|
||||
"black": {
|
||||
|
@ -1251,7 +1292,7 @@
|
|||
"sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159",
|
||||
"sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"
|
||||
],
|
||||
"markers": "python_version >= '3.6'",
|
||||
"markers": "python_full_version >= '3.6.0'",
|
||||
"version": "==1.0.0"
|
||||
},
|
||||
"py": {
|
||||
|
@ -1283,7 +1324,7 @@
|
|||
"sha256:5eb116118f9612ff1ee89ac96437bb6b49e8f04d8a13b514ba26f620208e26eb",
|
||||
"sha256:dc9c10fb40944260f6ed4c688ece0cd2048414940f1cea51b8b226318411c519"
|
||||
],
|
||||
"markers": "python_version >= '3.6'",
|
||||
"markers": "python_full_version >= '3.6.0'",
|
||||
"version": "==2.12.0"
|
||||
},
|
||||
"pyparsing": {
|
||||
|
@ -1407,7 +1448,7 @@
|
|||
"sha256:d412243dfb797ae3ec2b59eca0e52dac12e75a241bf0e4eb861e450d06c6ed07",
|
||||
"sha256:f5f8bb2d0d629f398bf47d0d69c07bc13b65f75a81ad9e2f71a63d4b7a2f6db2"
|
||||
],
|
||||
"markers": "python_version >= '3.6'",
|
||||
"markers": "python_full_version >= '3.6.0'",
|
||||
"version": "==2.0.0"
|
||||
},
|
||||
"sphinxcontrib-jsmath": {
|
||||
|
@ -1493,7 +1534,7 @@
|
|||
"sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02",
|
||||
"sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"
|
||||
],
|
||||
"markers": "python_version < '3.8'",
|
||||
"markers": "python_version >= '3.7'",
|
||||
"version": "==4.3.0"
|
||||
},
|
||||
"urllib3": {
|
||||
|
|
|
@ -9,3 +9,7 @@ from .pba_results import PbaResults
|
|||
from monkey_island.cc.models.report.report import Report
|
||||
from .simulation import Simulation, SimulationSchema, IslandMode
|
||||
from .user_credentials import UserCredentials
|
||||
from .machine import Machine, MachineID
|
||||
from .communication_type import CommunicationType
|
||||
from .node import Node
|
||||
from .agent import Agent
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from uuid import UUID
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from . import MachineID
|
||||
from .base_models import MutableBaseModel
|
||||
|
||||
|
||||
class Agent(MutableBaseModel):
|
||||
id: UUID = Field(..., allow_mutation=False)
|
||||
machine_id: MachineID = Field(..., allow_mutation=False)
|
||||
start_time: datetime = Field(..., allow_mutation=False)
|
||||
stop_time: Optional[datetime]
|
||||
parent_id: UUID = Field(..., allow_mutation=False)
|
||||
cc_server: str = Field(default="")
|
||||
log_contents: str = Field(default="")
|
|
@ -0,0 +1,51 @@
|
|||
import json
|
||||
from typing import Sequence
|
||||
|
||||
from pydantic import BaseModel, Extra, ValidationError
|
||||
|
||||
|
||||
class InfectionMonkeyModelConfig:
|
||||
underscore_attrs_are_private = True
|
||||
extra = Extra.forbid
|
||||
|
||||
|
||||
class InfectionMonkeyBaseModel(BaseModel):
|
||||
class Config(InfectionMonkeyModelConfig):
|
||||
pass
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
try:
|
||||
super().__init__(**kwargs)
|
||||
except ValidationError as err:
|
||||
# TLDR: This exception handler allows users of this class to be decoupled from pydantic.
|
||||
#
|
||||
# When validation of a pydantic object fails, pydantic raises a `ValidationError`, which
|
||||
# is a `ValueError`, even if the real cause was a `TypeError`. Furthermore, allowing
|
||||
# `pydantic.ValueError` to be raised would couple other modules to pydantic, which is
|
||||
# undesirable. This exception handler re-raises the first validation error that pydantic
|
||||
# encountered. This allows users of these models to `except` `TypeError` or `ValueError`
|
||||
# and handle them. Pydantic-specific errors are still raised, but they inherit from
|
||||
# `TypeError` or `ValueError`.
|
||||
e = err.raw_errors[0]
|
||||
while isinstance(e, Sequence):
|
||||
e = e[0]
|
||||
|
||||
raise e.exc
|
||||
|
||||
# We need to be able to convert our models to fully simplified dictionaries. The
|
||||
# `BaseModel.dict()` does not support this. There is a proposal to add a `simplify` keyword
|
||||
# argument to `dict()` to support this. See
|
||||
# https://github.com/pydantic/pydantic/issues/951#issuecomment-552463606. The hope is that we
|
||||
# can override `dict()` with an implementation of `simplify` and remove it when the feature gets
|
||||
# merged. If the feature doesn't get merged, or the interface is changed, this function can
|
||||
# continue to serve as a wrapper until we can update all references to it.
|
||||
def dict(self, simplify=False, **kwargs):
|
||||
if simplify:
|
||||
return json.loads(self.json())
|
||||
return BaseModel.dict(self, **kwargs)
|
||||
|
||||
|
||||
class MutableBaseModel(InfectionMonkeyBaseModel):
|
||||
class Config(InfectionMonkeyModelConfig):
|
||||
allow_mutation = True
|
||||
validate_assignment = True
|
|
@ -0,0 +1,15 @@
|
|||
from enum import Enum
|
||||
|
||||
|
||||
class CommunicationType(Enum):
|
||||
"""
|
||||
An Enum representing different types of communication between agents and the Island
|
||||
|
||||
This Enum represents the different was agents can communicate with each other and with the
|
||||
Island. The value of each member is the member's name in all lower-case characters.
|
||||
"""
|
||||
|
||||
SCANNED = "scanned"
|
||||
EXPLOITED = "exploited"
|
||||
CC = "cc"
|
||||
CC_TUNNEL = "cc_tunnel"
|
|
@ -0,0 +1,24 @@
|
|||
from ipaddress import IPv4Interface
|
||||
from typing import Optional, Sequence
|
||||
|
||||
from pydantic import Field, PositiveInt, validator
|
||||
|
||||
from common import OperatingSystems
|
||||
|
||||
from .base_models import MutableBaseModel
|
||||
from .transforms import make_immutable_sequence
|
||||
|
||||
MachineID = PositiveInt
|
||||
|
||||
|
||||
class Machine(MutableBaseModel):
|
||||
id: MachineID = Field(..., allow_mutation=False)
|
||||
hardware_id: Optional[PositiveInt]
|
||||
network_interfaces: Sequence[IPv4Interface]
|
||||
operating_system: OperatingSystems
|
||||
operating_system_version: str
|
||||
hostname: str
|
||||
|
||||
_make_immutable_sequence = validator("network_interfaces", pre=True, allow_reuse=True)(
|
||||
make_immutable_sequence
|
||||
)
|
|
@ -0,0 +1,18 @@
|
|||
from typing import Sequence, Tuple
|
||||
|
||||
from pydantic import Field, validator
|
||||
|
||||
from . import CommunicationType, MachineID
|
||||
from .base_models import MutableBaseModel
|
||||
from .transforms import make_immutable_nested_sequence
|
||||
|
||||
ConnectionsSequence = Sequence[Tuple[MachineID, Sequence[CommunicationType]]]
|
||||
|
||||
|
||||
class Node(MutableBaseModel):
|
||||
machine_id: MachineID = Field(..., allow_mutation=False)
|
||||
connections: ConnectionsSequence
|
||||
|
||||
_make_immutable_nested_sequence = validator("connections", pre=True, allow_reuse=True)(
|
||||
make_immutable_nested_sequence
|
||||
)
|
|
@ -0,0 +1,36 @@
|
|||
from typing import Any, MutableSequence, Sequence, Union
|
||||
|
||||
|
||||
def make_immutable_nested_sequence(sequence_or_element: Union[Sequence, Any]) -> Sequence:
|
||||
"""
|
||||
Take a Sequence of Sequences (or other types) and return an immutable copy
|
||||
|
||||
Takes a Sequence of Sequences, for example `List[List[int, float]]]` and returns an immutable
|
||||
copy. Note that if the Sequence does not contain other sequences, `make_sequence_immutable()`
|
||||
will be more performant.
|
||||
|
||||
:param sequence_or_element: A nested sequence or an element from within a nested sequence
|
||||
:return: An immutable copy of the sequence if `sequence_or_element` is a Sequence, otherwise
|
||||
just return `sequence_or_element`
|
||||
"""
|
||||
if isinstance(sequence_or_element, str):
|
||||
return sequence_or_element
|
||||
|
||||
if isinstance(sequence_or_element, Sequence):
|
||||
return tuple(map(make_immutable_nested_sequence, sequence_or_element))
|
||||
|
||||
return sequence_or_element
|
||||
|
||||
|
||||
def make_immutable_sequence(sequence: Sequence):
|
||||
"""
|
||||
Take a Sequence and return an immutable copy
|
||||
|
||||
:param sequence: A Sequence to create an immutable copy from
|
||||
:return: An immutable copy of `sequence`
|
||||
"""
|
||||
|
||||
if isinstance(sequence, MutableSequence):
|
||||
return tuple(sequence)
|
||||
|
||||
return sequence
|
|
@ -0,0 +1,133 @@
|
|||
from datetime import datetime, timezone
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
|
||||
from monkey_island.cc.models import Agent
|
||||
|
||||
AGENT_ID = UUID("012e7238-7b81-4108-8c7f-0787bc3f3c10")
|
||||
PARENT_ID = UUID("0fc9afcb-1902-436b-bd5c-1ad194252484")
|
||||
|
||||
AGENT_OBJECT_DICT = {
|
||||
"id": AGENT_ID,
|
||||
"machine_id": 2,
|
||||
"parent_id": PARENT_ID,
|
||||
"start_time": datetime.fromtimestamp(1660848408, tz=timezone.utc),
|
||||
}
|
||||
|
||||
AGENT_SIMPLE_DICT = {
|
||||
"id": str(AGENT_ID),
|
||||
"machine_id": 2,
|
||||
"parent_id": str(PARENT_ID),
|
||||
"start_time": "2022-08-18T18:46:48+00:00",
|
||||
}
|
||||
|
||||
|
||||
def test_constructor__defaults_from_objects():
|
||||
a = Agent(**AGENT_OBJECT_DICT)
|
||||
|
||||
assert a.stop_time is None
|
||||
assert a.cc_server == ""
|
||||
|
||||
|
||||
def test_constructor__defaults_from_simple_dict():
|
||||
a = Agent(**AGENT_SIMPLE_DICT)
|
||||
|
||||
assert a.stop_time is None
|
||||
assert a.cc_server == ""
|
||||
assert a.log_contents == ""
|
||||
|
||||
|
||||
def test_to_dict():
|
||||
a = Agent(**AGENT_OBJECT_DICT)
|
||||
agent_simple_dict = AGENT_SIMPLE_DICT.copy()
|
||||
agent_simple_dict["stop_time"] = None
|
||||
agent_simple_dict["cc_server"] = ""
|
||||
agent_simple_dict["log_contents"] = ""
|
||||
|
||||
assert a.dict(simplify=True) == agent_simple_dict
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, value",
|
||||
[
|
||||
("id", 1),
|
||||
("machine_id", "not-an-int"),
|
||||
("start_time", None),
|
||||
("stop_time", []),
|
||||
("parent_id", 2.1),
|
||||
("cc_server", []),
|
||||
("log_contents", None),
|
||||
],
|
||||
)
|
||||
def test_construct_invalid_field__type_error(key, value):
|
||||
invalid_type_dict = AGENT_SIMPLE_DICT.copy()
|
||||
invalid_type_dict[key] = value
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
Agent(**invalid_type_dict)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, value",
|
||||
[
|
||||
("machine_id", -1),
|
||||
("start_time", "not-a-datetime"),
|
||||
("stop_time", "not-a-datetime"),
|
||||
],
|
||||
)
|
||||
def test_construct_invalid_field__value_error(key, value):
|
||||
invalid_value_dict = AGENT_SIMPLE_DICT.copy()
|
||||
invalid_value_dict[key] = value
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
Agent(**invalid_value_dict)
|
||||
|
||||
|
||||
def test_id_immutable():
|
||||
a = Agent(**AGENT_SIMPLE_DICT)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
a.id = PARENT_ID
|
||||
|
||||
|
||||
def test_machine_id_immutable():
|
||||
a = Agent(**AGENT_SIMPLE_DICT)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
a.machine_id = 10
|
||||
|
||||
|
||||
def test_start_time_immutable():
|
||||
a = Agent(**AGENT_SIMPLE_DICT)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
a.start_time = 100
|
||||
|
||||
|
||||
def test_parent_id_immutable():
|
||||
a = Agent(**AGENT_SIMPLE_DICT)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
a.parent_id = AGENT_ID
|
||||
|
||||
|
||||
def test_stop_time_set_validated():
|
||||
a = Agent(**AGENT_SIMPLE_DICT)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
a.stop_time = "testing!"
|
||||
|
||||
|
||||
def test_cc_server_set_validated():
|
||||
a = Agent(**AGENT_SIMPLE_DICT)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
a.cc_server = None
|
||||
|
||||
|
||||
def test_log_contents_set_validated():
|
||||
a = Agent(**AGENT_SIMPLE_DICT)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
a.log_contents = None
|
|
@ -0,0 +1,160 @@
|
|||
import uuid
|
||||
from ipaddress import IPv4Interface
|
||||
from types import MappingProxyType
|
||||
from typing import MutableSequence
|
||||
|
||||
import pytest
|
||||
|
||||
from common import OperatingSystems
|
||||
from monkey_island.cc.models import Machine
|
||||
|
||||
MACHINE_OBJECT_DICT = MappingProxyType(
|
||||
{
|
||||
"id": 1,
|
||||
"hardware_id": uuid.getnode(),
|
||||
"network_interfaces": [IPv4Interface("10.0.0.1/24"), IPv4Interface("192.168.5.32/16")],
|
||||
"operating_system": OperatingSystems.WINDOWS,
|
||||
"operating_system_version": "eXtra Problems",
|
||||
"hostname": "my.host",
|
||||
}
|
||||
)
|
||||
|
||||
MACHINE_SIMPLE_DICT = MappingProxyType(
|
||||
{
|
||||
"id": 1,
|
||||
"hardware_id": uuid.getnode(),
|
||||
"network_interfaces": ["10.0.0.1/24", "192.168.5.32/16"],
|
||||
"operating_system": "windows",
|
||||
"operating_system_version": "eXtra Problems",
|
||||
"hostname": "my.host",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def test_constructor():
|
||||
# Raises exception_on_failure
|
||||
Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
|
||||
def test_from_dict():
|
||||
# Raises exception_on_failure
|
||||
Machine(**MACHINE_SIMPLE_DICT)
|
||||
|
||||
|
||||
def test_to_dict():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
assert m.dict(simplify=True) == dict(MACHINE_SIMPLE_DICT)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, value",
|
||||
[
|
||||
("id", "not-an-int"),
|
||||
("hardware_id", "not-an-int"),
|
||||
("network_interfaces", "not-a-list"),
|
||||
("operating_system", 2.1),
|
||||
("operating_system", "bsd"),
|
||||
("operating_system_version", {}),
|
||||
("hostname", []),
|
||||
],
|
||||
)
|
||||
def test_construct_invalid_field__type_error(key, value):
|
||||
invalid_type_dict = MACHINE_SIMPLE_DICT.copy()
|
||||
invalid_type_dict[key] = value
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
Machine(**invalid_type_dict)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, value",
|
||||
[
|
||||
("id", -1),
|
||||
("hardware_id", 0),
|
||||
("network_interfaces", [1, "stuff", 3]),
|
||||
("network_interfaces", ["10.0.0.1/16", 2, []]),
|
||||
],
|
||||
)
|
||||
def test_construct_invalid_field__value_error(key, value):
|
||||
invalid_type_dict = MACHINE_SIMPLE_DICT.copy()
|
||||
invalid_type_dict[key] = value
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
Machine(**invalid_type_dict)
|
||||
|
||||
|
||||
def test_construct__extra_fields_forbidden():
|
||||
extra_field_dict = MACHINE_SIMPLE_DICT.copy()
|
||||
extra_field_dict["extra_field"] = 99 # red balloons
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
Machine(**extra_field_dict)
|
||||
|
||||
|
||||
def test_id_immutable():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
with pytest.raises(TypeError):
|
||||
m.id = 2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("hardware_id", [None, 1, 100])
|
||||
def test_hardware_id_set_valid_value(hardware_id):
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
# Raises exception_on_failure
|
||||
m.hardware_id = hardware_id
|
||||
|
||||
|
||||
def test_hardware_id_validate_on_set():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
with pytest.raises(ValueError):
|
||||
m.hardware_id = -50
|
||||
|
||||
|
||||
def test_network_interfaces_set_valid_value():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
# Raises exception_on_failure
|
||||
m.network_interfaces = [IPv4Interface("172.1.2.3/24")]
|
||||
|
||||
|
||||
def test_network_interfaces_set_invalid_value():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
m.network_interfaces = [IPv4Interface("172.1.2.3/24"), None]
|
||||
|
||||
|
||||
def test_network_interfaces_sequence_is_immutable():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
assert not isinstance(m.network_interfaces, MutableSequence)
|
||||
|
||||
|
||||
def test_operating_system_set_valid_value():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
# Raises exception_on_failure
|
||||
m.operating_system = OperatingSystems.LINUX
|
||||
|
||||
|
||||
def test_operating_system_set_invalid_value():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
m.operating_system = "MacOS"
|
||||
|
||||
|
||||
def test_set_operating_system_version():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
# Raises exception_on_failure
|
||||
m.operating_system_version = "1234"
|
||||
|
||||
|
||||
def test_set_hostname():
|
||||
m = Machine(**MACHINE_OBJECT_DICT)
|
||||
|
||||
# Raises exception_on_failure
|
||||
m.operating_system_version = "wopr"
|
|
@ -0,0 +1,96 @@
|
|||
from typing import MutableSequence
|
||||
|
||||
import pytest
|
||||
|
||||
from monkey_island.cc.models import CommunicationType, Node
|
||||
|
||||
|
||||
def test_constructor():
|
||||
machine_id = 1
|
||||
connections = (
|
||||
(6, (CommunicationType.SCANNED,)),
|
||||
(7, (CommunicationType.SCANNED, CommunicationType.EXPLOITED)),
|
||||
)
|
||||
n = Node(
|
||||
machine_id=1,
|
||||
connections=connections,
|
||||
)
|
||||
|
||||
assert n.machine_id == machine_id
|
||||
assert n.connections == connections
|
||||
|
||||
|
||||
def test_serialization():
|
||||
node_dict = {
|
||||
"machine_id": 1,
|
||||
"connections": [
|
||||
[
|
||||
6,
|
||||
["cc", "scanned"],
|
||||
],
|
||||
[7, ["exploited", "cc_tunnel"]],
|
||||
],
|
||||
}
|
||||
n = Node(**node_dict)
|
||||
|
||||
assert n.dict(simplify=True) == node_dict
|
||||
|
||||
|
||||
def test_machine_id_immutable():
|
||||
n = Node(machine_id=1, connections=[])
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
n.machine_id = 2
|
||||
|
||||
|
||||
def test_machine_id__invalid_type():
|
||||
with pytest.raises(TypeError):
|
||||
Node(machine_id=None, connections=[])
|
||||
|
||||
|
||||
def test_machine_id__invalid_value():
|
||||
with pytest.raises(ValueError):
|
||||
Node(machine_id=-5, connections=[])
|
||||
|
||||
|
||||
def test_connections__mutable():
|
||||
n = Node(machine_id=1, connections=[])
|
||||
|
||||
# Raises exception on failure
|
||||
n.connections = [(5, []), (7, [])]
|
||||
|
||||
|
||||
def test_connections__invalid_machine_id():
|
||||
n = Node(machine_id=1, connections=[])
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
n.connections = [(5, []), (-5, [])]
|
||||
|
||||
|
||||
def test_connections__recursively_immutable():
|
||||
n = Node(
|
||||
machine_id=1,
|
||||
connections=[
|
||||
[6, [CommunicationType.SCANNED]],
|
||||
[7, [CommunicationType.SCANNED, CommunicationType.EXPLOITED]],
|
||||
],
|
||||
)
|
||||
|
||||
assert not isinstance(n.connections, MutableSequence)
|
||||
assert not isinstance(n.connections[0], MutableSequence)
|
||||
assert not isinstance(n.connections[1], MutableSequence)
|
||||
assert not isinstance(n.connections[0][1], MutableSequence)
|
||||
assert not isinstance(n.connections[1][1], MutableSequence)
|
||||
|
||||
|
||||
def test_connections__set_invalid_communications_type():
|
||||
connections = (
|
||||
[
|
||||
[8, [CommunicationType.SCANNED, "invalid_comm_type"]],
|
||||
],
|
||||
)
|
||||
|
||||
n = Node(machine_id=1, connections=[])
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
n.connections = connections
|
|
@ -0,0 +1,53 @@
|
|||
from itertools import zip_longest
|
||||
import pytest
|
||||
from typing import MutableSequence, Sequence
|
||||
|
||||
from monkey_island.cc.models.transforms import (
|
||||
make_immutable_nested_sequence,
|
||||
make_immutable_sequence,
|
||||
)
|
||||
|
||||
|
||||
def test_make_immutable_sequence__list():
|
||||
mutable_sequence = [1, 2, 3]
|
||||
immutable_sequence = make_immutable_sequence(mutable_sequence)
|
||||
|
||||
assert isinstance(immutable_sequence, Sequence)
|
||||
assert not isinstance(immutable_sequence, MutableSequence)
|
||||
assert_sequences_equal(mutable_sequence, immutable_sequence)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"mutable_sequence", [
|
||||
[1, 2, 3],
|
||||
[[1, 2, 3], [4, 5, 6]],
|
||||
[[1, 2, 3, [4, 5, 6]], [4, 5, 6]],
|
||||
[8, [5.3, "invalid_comm_type"]]]
|
||||
)
|
||||
def test_make_immutable_nested_sequence(mutable_sequence):
|
||||
immutable_sequence = make_immutable_nested_sequence(mutable_sequence)
|
||||
|
||||
assert isinstance(immutable_sequence, Sequence)
|
||||
assert not isinstance(immutable_sequence, MutableSequence)
|
||||
assert_sequences_equal(mutable_sequence, immutable_sequence)
|
||||
|
||||
|
||||
def assert_sequence_immutable_recursive(sequence: Sequence):
|
||||
assert not isinstance(sequence, MutableSequence)
|
||||
|
||||
for s in sequence:
|
||||
if isinstance(s, str):
|
||||
continue
|
||||
|
||||
if isinstance(s, Sequence):
|
||||
assert_sequence_immutable_recursive(s)
|
||||
assert not isinstance(s, MutableSequence)
|
||||
|
||||
|
||||
def assert_sequences_equal(a: Sequence, b: Sequence):
|
||||
assert len(a) == len(b)
|
||||
for i, j in zip_longest(a, b):
|
||||
if isinstance(i, str) or not isinstance(i, Sequence):
|
||||
assert i == j
|
||||
else:
|
||||
assert_sequences_equal(i, j)
|
|
@ -209,6 +209,15 @@ _serialize_credentials # unused method (monkey/common/credentials/credentials:6
|
|||
|
||||
# Models
|
||||
_make_simulation # unused method (monkey/monkey_island/cc/models/simulation.py:19
|
||||
operating_system_version
|
||||
_make_sequence_immutable
|
||||
Agent
|
||||
machine_id
|
||||
stop_time
|
||||
parent_id
|
||||
cc_server
|
||||
hardware_id
|
||||
connections
|
||||
|
||||
# TODO DELETE AFTER RESOURCE REFACTORING
|
||||
|
||||
|
@ -274,3 +283,16 @@ serialize
|
|||
event
|
||||
deserialize
|
||||
serialized_event
|
||||
|
||||
# pydantic base models
|
||||
underscore_attrs_are_private
|
||||
extra
|
||||
allow_mutation
|
||||
validate_assignment
|
||||
|
||||
# CommunicationType
|
||||
CommunicationType
|
||||
SCANNED
|
||||
EXPLOITED
|
||||
CC
|
||||
CC_TUNNEL
|
||||
|
|
Loading…
Reference in New Issue